文档 API
这部分讲的是这些CRUD API:
所有的CRUD API 都是单index API。 index 参数只可以接受单独的index,或者是使用 alias 来指向这个index。
|
对JSON文档建立索引
Index API 可以在指定的index中将一个结构化的JSON文档建立索引,并使其可以被查询到。
生成JSON格式的文档
一般有下面几种方式来生成一个JSON文档:
-
手动使用
byte[]
或String
类型拼接一个JSON -
使用
Map
类型会自动转换成对应的JSON -
使用第三方库将你的Bean序列化成Json格式,例如 Jackson 库等
-
使用内置的 XContentFactory.jsonBuilder() 生成JSON
在内部,所有类型最终都会转换成 byte[]
类型(所以String类型最后也会转成 byte[]
类型)。
因此,如果你的对象已经是 byte[]
类型了,那么就直接使用它吧。
当然官方内置的 jsonBuilder
是一个高度优化后的工具,你可以使用它来直接构造 byte[]
。
手动拼接
没什么困难的,唯一需要注意的是通过 Date Format 对日期进行格式化。
String json = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
使用 Map
Map是一个键值对的集合,它可以等价于一个JSON结构:
Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
序列化 JavaBean
你可以使用 Jackson 将你的Bean序列化成JSON。
请将 Jackson Databind 加入你的项目中。
然后你就可以使用 ObjectMapper
来序列化你的Bean了:
import com.fasterxml.jackson.databind.*;
// 实例化一个 json mapper
ObjectMapper mapper = new ObjectMapper(); // 只创建一次,重复使用
// 生成 json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
使用 Elasticsearch 助手
Elasticsearch 提供了一个内置的助手来帮助你生成JSON内容。
import static org.elasticsearch.common.xcontent.XContentFactory.*;
XContentBuilder builder = jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
注意,你也可以使用 startArray(String)
和 endArray()
方法来添加一个数组。
顺便说一下,field
方法可以接受很多对象类型。
你可以直接传一些 number,date 或者是其它 XContentBuilder 对象。
如果你想查看生成的JSON内容,可以使用 Strings.toString()
方法。
import org.elasticsearch.common.Strings;
String json = Strings.toString(builder);
对文档建立索引
下面就是将一个JSON文档建立索引的例子,其中index为twitter,type为 _doc
,id的值是1:
import static org.elasticsearch.common.xcontent.XContentFactory.*;
IndexResponse response = client.prepareIndex("twitter", "_doc", "1")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
.get();
注意,你也可以对一个JSON字符串类型的文档建立索引,并且 ID 也不是必须的:
String json = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
IndexResponse response = client.prepareIndex("twitter", "_doc")
.setSource(json, XContentType.JSON)
.get();
结果可以在 IndexResponse
中查看:
// Index 名称
String _index = response.getIndex();
// Type 名称
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (如果你是新建的这个文档,会得到1)
long _version = response.getVersion();
// 当前实例的存储状态
RestStatus status = response.status();
有关index操作的更多信息,请查看 REST index 。
根据ID获取文档
Get API 可以根据 id
从 index 中获取JSON文档。
下面就是一个获取JSON格式文档的例子,它的条件是index为twitter,type是 _doc
,id是1:
GetResponse response = client.prepareGet("twitter", "_doc", "1").get();
有关Get操作的更多信息,请查看 REST get 。
删除文档
Delete API 可以在指定的 index 中根据ID删除一个JSON类型的文档。
下面就是一个删除JSON文档的例子,它的index是twitter,type是 _doc
,id是1:
DeleteResponse response = client.prepareDelete("twitter", "_doc", "1").get();
有关Delete操作的更多信息,请查看 delete API 。
根据条件删除文档
Delete By Query API 可以根据查询的结果删除一组文档:
BulkByScrollResponse response =
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) (1)
.source("persons") (2)
.get(); (3)
long deleted = response.getDeleted(); (4)
1 | query |
2 | index |
3 | 执行操作 |
4 | 删除文档的数量 |
由于这个操作可能会运行很长时间,如果你希望异步执行,可以使用 execute
代替 get
来执行,并提供一个listener,比如:
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) (1)
.source("persons") (2)
.execute(new ActionListener<BulkByScrollResponse>() { (3)
@Override
public void onResponse(BulkByScrollResponse response) {
long deleted = response.getDeleted(); (4)
}
@Override
public void onFailure(Exception e) {
// Handle the exception
}
});
1 | query |
2 | index |
3 | 执行操作 |
4 | 删除文档的数量 |
更新文档
你可以创建一个 UpdateRequest
并发送给client:
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("_doc");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();
或者也可以使用 prepareUpdate()
方法:
client.prepareUpdate("ttl", "doc", "1")
.setScript(new Script(
"ctx._source.gender = \"male\"", (1)
ScriptService.ScriptType.INLINE, null, null))
.get();
client.prepareUpdate("ttl", "doc", "1")
.setDoc(jsonBuilder() (2)
.startObject()
.field("gender", "male")
.endObject())
.get();
1 | 你的脚本。它也可以是存储中本地文件中,这种情况下,需要使用 ScriptService.ScriptType.FILE |
2 | 将要和已有文档合并的文档。 |
Note that you can’t provide both script
and doc
.
通过脚本更新
Update API 允许通过一个脚本来更新文档:
UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
.script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();
合并文档
Update API 也可以传入一部分文档,并将之合并(简单的递归合并,对象的内部合并,替换键值对和数组)到一个已存在的文档中。 例如:
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();
更新并插入
Update API 也支持 upsert
。如果该文档不存在,则会对该文档建立索引:
IndexRequest indexRequest = new IndexRequest("index", "type", "1")
.source(jsonBuilder()
.startObject()
.field("name", "Joe Smith")
.field("gender", "male")
.endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject())
.upsert(indexRequest); (1)
client.update(updateRequest).get();
1 | 如果文档不存在, indexRequest 中的元素就会被建立索引 |
如果 index/_doc/1
这个文档已经存在了,在这个操作之后我们会有一个这样的文档:
{
"name" : "Joe Dalton",
"gender": "male" (1)
}
1 | 这个字段是通过 update request 添加的 |
如果不存在,我们会得到一个新的文档:
{
"name" : "Joe Smith",
"gender": "male"
}
获取多个文档
Multi get API 允许通过 index
和 id
获取多个文档:
MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
.add("twitter", "_doc", "1") (1)
.add("twitter", "_doc", "2", "3", "4") (2)
.add("another", "_doc", "foo") (3)
.get();
for (MultiGetItemResponse itemResponse : multiGetItemResponses) { (4)
GetResponse response = itemResponse.getResponse();
if (response.isExists()) { (5)
String json = response.getSourceAsString(); (6)
}
}
1 | 通过单独的id获取 |
2 | 在同一个index中,可以通过多个id获取 |
3 | 也可以从其它的index中获取文档 |
4 | 遍历结果集 |
5 | 查看文档是否存在 |
6 | 获取 _source 字段 |
有关Multi get操作的更多信息,请查看REST multi get。
批处理
Bulk API 允许在一次单独的请求中索引或删除多个文档。 下面是一个用法示例:
import static org.elasticsearch.common.xcontent.XContentFactory.*;
BulkRequestBuilder bulkRequest = client.prepareBulk();
// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "_doc", "1")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
);
bulkRequest.add(client.prepareIndex("twitter", "_doc", "2")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "another post")
.endObject()
)
);
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
}
使用批处理工具
BulkProcessor
类提供了一个简单的 API 来自动执行批处理操作,它可以设置的规则有请求数、请求数据大小、周期。
要使用它,首先需要创建一个 BulkProcessor
实例:
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
BulkProcessor bulkProcessor = BulkProcessor.builder(
client, (1)
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) { ... } (2)
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) { ... } (3)
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) { ... } (4)
})
.setBulkActions(10000) (5)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) (6)
.setFlushInterval(TimeValue.timeValueSeconds(5)) (7)
.setConcurrentRequests(1) (8)
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) (9)
.build();
1 | 添加Elasticsearch client |
2 | 在批处理执行之前被调用。你可以使用 request.numberOfActions() 方法来获取本次批处理的数量 |
3 | 在批处理执行之后被调用。你可以使用 response.hasFailures() 方法可以查看失败的请求 |
4 | 在批处理执行失败,或抛出 Throwable 的时候被调用 |
5 | 设置每次累计达10000个请求就立刻执行批处理 |
6 | 设置每次数据到达5MB的大小就立刻执行批处理 |
7 | 设置无论有几条请求,总之每隔5秒就立刻执行批处理 |
8 | 设置执行批处理请求时允许的最大并发数。0代表每次只执行一个请求,1代表允许1条并发请求 |
9 | 设置自定义补偿策略:第一次等待100ms,之后每次的间隔时间翻倍,一共重试3次。当批处理执行的时候抛出 EsRejectedExecutionException 异常的时候,会启动重试策略。原因可能是硬件资源太低。如想关闭该策略,可以传入 BackoffPolicy.noBackoff() |
默认情况,BulkProcessor
的配置如下:
-
1000
个请求 -
5mb
大小 -
没有间隔
-
允许1个并发请求
-
第一次等待50ms重试,之后每次的间隔时间翻倍,一共重试8次,总时间大概5.1s左右(这个5.1s是怎么算出来的?麻烦知道的告诉一下)
添加请求
然后你就可以很简单的向 BulkProcessor
中添加请求了:
bulkProcessor.add(new IndexRequest("twitter", "_doc", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "_doc", "2"));
关闭批处理工具
当所有文档全部加载到 BulkProcessor
之后,可以使用 awaitClose
或 close
方法关闭它:
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
或
bulkProcessor.close();
如果设置了 flushInterval
,上面两个方法都会刷新剩余的文档并且禁用定时刷新任务。
如果开启了并发请求策略,awaitClose
方法会根据你所指定的时间作出等待,当规定时间内所有批处理请求全部完成时,会返回 true
;
如果在规定时间内,仍有批处理请求未完成,则会返回 false
。
close
方法不会等待批处理请求是否完成,而是直接取消剩余的所有请求。
在测试中使用批处理工具
如果你正使用 Elasticsearch 进行测试并且使用的是 BulkProcessor
来批量添加数据,那么最好将并发请求数量设置为 0
,
这样批处理请求将会以同步的方式执行:
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
.setBulkActions(10000)
.setConcurrentRequests(0)
.build();
// 添加请求
bulkProcessor.add(/* Your requests */);
// 刷新剩余请求
bulkProcessor.flush();
// 如果你不在需要它了则关闭掉
bulkProcessor.close();
// Refresh your indices
client.admin().indices().prepareRefresh().get();
// 现在可以开始检索啦!
client.prepareSearch().get();
全局参数
全局参数可以在 BulkRequest 和 BulkProcessor 上指定,类似于REST API。 这些全局参数用作默认值,可以被每个子请求的本地参数所覆盖。 有一些参数必须在子请求添加之前被设置, - index, type - ,并且必须在 BulkRequest 或 BulkProcessor 创建期间指定它们。 也有一些是可选的 - pipeline, routing - ,这些可以在批量发送之前的任意时间点指定。
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
.setGlobalIndex("tweets")
.setGlobalType("_doc")
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()) {
processor.add(new IndexRequest() (1)
.source(XContentType.JSON, "user", "some user"));
processor.add(new IndexRequest("blogs", "post_type", "1") (2)
.source(XContentType.JSON, "title", "some title"));
}
1 | BulkRequest 的全局参数将应用于子请求 |
2 | 子请求中的本地管道参数将覆盖BulkRequest中的全局参数 |
BulkRequest request = new BulkRequest();
request.pipeline("globalId");
request.add(new IndexRequest("test", "doc", "1")
.source(XContentType.JSON, "field", "bulk1")
.setPipeline("perIndexId")); (1)
request.add(new IndexRequest("test", "doc", "2")
.source(XContentType.JSON, "field", "bulk2")); (2)
1 | 子请求中的本地管道参数将覆盖BulkRequest中的全局参数 |
2 | BulkRequest 的全局参数将应用于子请求 |
通过条件更新文档
updateByQuery
最简单的用法是在不更改源的情况下更新同一个index中的所有文档。
这种用法还可以获取新的属性或其它网络映射的改变。
UpdateByQueryRequestBuilder updateByQuery =
UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").abortOnVersionConflict(false);
BulkByScrollResponse response = updateByQuery.get();
调用 updateByQuery
方法会优先获取index的快照,然后使用`internal`版本索引所有的文档。
当文档正在建立索引时,如果快照版本发生了改变,那么这个文档的版本会发生冲突。 |
当版本匹配时, updateByQuery
方法会更新文档并增加版本号。
所有的修改和查询都失败的时候, updateByQuery
方法就会停止。
这些失败的原因可以从 BulkByScrollResponse#getIndexingFailures
中获得。
但是所有成功的更新操作都会保留并且不会回滚。当第一次发生失败而导致终止时,响应会包含批量请求生成的所有失败消息。
为了防止版本冲突而导致 updateByQuery
方法停止,请设置 abortOnVersionConflict(false)
。
第一个例子就是这样做的,因为这个例子是想获取网络映射的修改,并且版本冲突意味着冲突文档在 updateByQuery
的开始和尝试更新修改文档之间更新。这样很好,因为修改操作会获取更新后的网络映射。
UpdateByQueryRequestBuilder
API 支持过滤更新后的文档,并且可以限制更新的数量,还可以通过脚本更新文档:
UpdateByQueryRequestBuilder updateByQuery =
UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
.filter(QueryBuilders.termQuery("level", "awesome"))
.size(1000)
.script(new Script(ScriptType.INLINE,
"ctx._source.awesome = 'absolutely'",
"painless",
Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();
UpdateByQueryRequestBuilder
还支持直接获取用于查询文档的语句。
你可以用它来改变默认的滚动数量,或者修改匹配文档的请求。
UpdateByQueryRequestBuilder updateByQuery =
UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
.source()
.setSize(500);
BulkByScrollResponse response = updateByQuery.get();
您还可以将 size
和排序结合使用,来限制需要更新的文档:
UpdateByQueryRequestBuilder updateByQuery =
UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
.size(100)
.source()
.addSort("cat", SortOrder.DESC);
BulkByScrollResponse response = updateByQuery.get();
除了修改文档的 _source
字段之外,您还可以使用脚本来做其它操作,比如 Update API:
UpdateByQueryRequestBuilder updateByQuery =
UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
.script(new Script(
ScriptType.INLINE,
"if (ctx._source.awesome == 'absolutely') {"
+ " ctx.op='noop'"
+ "} else if (ctx._source.awesome == 'lame') {"
+ " ctx.op='delete'"
+ "} else {"
+ "ctx._source.awesome = 'absolutely'}",
"painless",
Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();
与 Update API 一样,您可以通过设置 ctx.op
来改变执行的操作:
noop
-
如果这个脚本不会修改任何文档,可以设置
ctx.op = "noop"
。updateByQuery
操作会从修改中忽略掉这个文档。 这个行为会让响应体中的noop
计数器增加。 delete
-
如果这个脚本一定会删除文档,可以设置
ctx.op = "delete"
。 这样在响应体中的deleted
计数器会有显示。
ctx.op
设置其它值会报错。 ctx
设置其它字段也会报错。
这个API不允许移动它能接触到的文档,只允许修改source。这是故意这样设计的!我们规定不允许从原始位置将文档移除。
你也可以一次在多个索引和类型上执行这些操作,类似于 search API :
UpdateByQueryRequestBuilder updateByQuery =
UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("foo", "bar").source().setTypes("a", "b");
BulkByScrollResponse response = updateByQuery.get();
如果你提供了 routing
值,则该进程会将路由值复制到滚动查询中,从而将进程限制为匹配该路由值的分片:
UpdateByQueryRequestBuilder updateByQuery =
UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source().setRouting("cat");
BulkByScrollResponse response = updateByQuery.get();
updateByQuery
也可以通过指定一个像这样的 pipeline
来使用 ingest 节点:
UpdateByQueryRequestBuilder updateByQuery =
UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.setPipeline("hurray");
BulkByScrollResponse response = updateByQuery.get();
使用任务 API
你可以使用 Task API 来获取所有正在运行中的 update-by-query 请求状态:
ListTasksResponse tasksList = client.admin().cluster().prepareListTasks()
.setActions(UpdateByQueryAction.NAME).setDetailed(true).get();
for (TaskInfo info: tasksList.getTasks()) {
TaskId taskId = info.getTaskId();
BulkByScrollTask.Status status =
(BulkByScrollTask.Status) info.getStatus();
// do stuff
}
使用上面显示的 TaskId
您可以直接地找到task:
GetTaskResponse get = client.admin().cluster().prepareGetTask(taskId).get();
取消任务 API
任何 Update By Query 可以使用 Task Cancel API 来取消:
// Cancel all update-by-query requests
client.admin().cluster().prepareCancelTasks()
.setActions(UpdateByQueryAction.NAME).get().getTasks();
// Cancel a specific update-by-query request
client.admin().cluster().prepareCancelTasks()
.setTaskId(taskId).get().getTasks();
使用 list tasks
API 可以查询 taskId
的值。
取消请求通常非常快,但也需要几秒钟。任务状态API会继续列出任务,直到取消完成。
二次节流
使用 _rethrottle
API 可以修改正在运行的 requests_per_second
值:
RethrottleAction.INSTANCE.newRequestBuilder(client)
.setTaskId(taskId)
.setRequestsPerSecond(2.0f)
.get();
使用 list tasks
API 可以查询 taskId
的值。
与 updateByQuery
一样,requests_per_second
可以设置成任何正浮点值来设置throttle的级别,或者使用 Float.POSITIVE_INFINITY
来禁止 throttling。
requests_per_second
值可以加速进程并立刻生效。
为防止滚动超时,要在完成当前批处理后设置 requests_per_second
来减慢进程。
重新建立索引
查看 reindex API。
BulkByScrollResponse response =
ReindexAction.INSTANCE.newRequestBuilder(client)
.source("source_index")
.destination("target_index")
.filter(QueryBuilders.matchQuery("category", "xzy")) (1)
.get();
1 | 可以提供一个查询语句来筛选 source index 到 target index 的文档。 |