文档 API

这部分讲的是这些CRUD API:

所有的CRUD API 都是单index API。 index 参数只可以接受单独的index,或者是使用 alias 来指向这个index。

对JSON文档建立索引

Index API 可以在指定的index中将一个结构化的JSON文档建立索引,并使其可以被查询到。

生成JSON格式的文档

一般有下面几种方式来生成一个JSON文档:

  • 手动使用 byte[]String 类型拼接一个JSON

  • 使用 Map 类型会自动转换成对应的JSON

  • 使用第三方库将你的Bean序列化成Json格式,例如 Jackson 库等

  • 使用内置的 XContentFactory.jsonBuilder() 生成JSON

在内部,所有类型最终都会转换成 byte[] 类型(所以String类型最后也会转成 byte[] 类型)。 因此,如果你的对象已经是 byte[] 类型了,那么就直接使用它吧。 当然官方内置的 jsonBuilder 是一个高度优化后的工具,你可以使用它来直接构造 byte[]

手动拼接

没什么困难的,唯一需要注意的是通过 Date Format 对日期进行格式化。

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";
使用 Map

Map是一个键值对的集合,它可以等价于一个JSON结构:

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
序列化 JavaBean

你可以使用 Jackson 将你的Bean序列化成JSON。 请将 Jackson Databind 加入你的项目中。 然后你就可以使用 ObjectMapper 来序列化你的Bean了:

import com.fasterxml.jackson.databind.*;

// 实例化一个 json mapper
ObjectMapper mapper = new ObjectMapper(); // 只创建一次,重复使用

// 生成 json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
使用 Elasticsearch 助手

Elasticsearch 提供了一个内置的助手来帮助你生成JSON内容。

import static org.elasticsearch.common.xcontent.XContentFactory.*;

XContentBuilder builder = jsonBuilder()
    .startObject()
        .field("user", "kimchy")
        .field("postDate", new Date())
        .field("message", "trying out Elasticsearch")
    .endObject()

注意,你也可以使用 startArray(String)endArray() 方法来添加一个数组。 顺便说一下,field 方法可以接受很多对象类型。 你可以直接传一些 number,date 或者是其它 XContentBuilder 对象。

如果你想查看生成的JSON内容,可以使用 Strings.toString() 方法。

import org.elasticsearch.common.Strings;

String json = Strings.toString(builder);

对文档建立索引

下面就是将一个JSON文档建立索引的例子,其中index为twitter,type为 _doc ,id的值是1:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

IndexResponse response = client.prepareIndex("twitter", "_doc", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        .get();

注意,你也可以对一个JSON字符串类型的文档建立索引,并且 ID 也不是必须的:

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";

IndexResponse response = client.prepareIndex("twitter", "_doc")
        .setSource(json, XContentType.JSON)
        .get();

结果可以在 IndexResponse 中查看:

// Index 名称
String _index = response.getIndex();
// Type 名称
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (如果你是新建的这个文档,会得到1)
long _version = response.getVersion();
// 当前实例的存储状态
RestStatus status = response.status();

有关index操作的更多信息,请查看 REST index

根据ID获取文档

Get API 可以根据 id 从 index 中获取JSON文档。 下面就是一个获取JSON格式文档的例子,它的条件是index为twitter,type是 _doc ,id是1:

GetResponse response = client.prepareGet("twitter", "_doc", "1").get();

有关Get操作的更多信息,请查看 REST get

删除文档

Delete API 可以在指定的 index 中根据ID删除一个JSON类型的文档。 下面就是一个删除JSON文档的例子,它的index是twitter,type是 _doc ,id是1:

DeleteResponse response = client.prepareDelete("twitter", "_doc", "1").get();

有关Delete操作的更多信息,请查看 delete API

根据条件删除文档

Delete By Query API 可以根据查询的结果删除一组文档:

BulkByScrollResponse response =
  DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male")) (1)
    .source("persons") (2)
    .get(); (3)
long deleted = response.getDeleted(); (4)
1 query
2 index
3 执行操作
4 删除文档的数量

由于这个操作可能会运行很长时间,如果你希望异步执行,可以使用 execute 代替 get 来执行,并提供一个listener,比如:

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male")) (1)
    .source("persons") (2)
    .execute(new ActionListener<BulkByScrollResponse>() { (3)
        @Override
        public void onResponse(BulkByScrollResponse response) {
            long deleted = response.getDeleted(); (4)
        }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });
1 query
2 index
3 执行操作
4 删除文档的数量

更新文档

你可以创建一个 UpdateRequest 并发送给client:

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("_doc");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

或者也可以使用 prepareUpdate() 方法:

client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script(
            "ctx._source.gender = \"male\"", (1)
            ScriptService.ScriptType.INLINE, null, null))
        .get();

client.prepareUpdate("ttl", "doc", "1")
        .setDoc(jsonBuilder() (2)
            .startObject()
                .field("gender", "male")
            .endObject())
        .get();
1 你的脚本。它也可以是存储中本地文件中,这种情况下,需要使用 ScriptService.ScriptType.FILE
2 将要和已有文档合并的文档。

Note that you can’t provide both script and doc.

通过脚本更新

Update API 允许通过一个脚本来更新文档:

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();

合并文档

Update API 也可以传入一部分文档,并将之合并(简单的递归合并,对象的内部合并,替换键值对和数组)到一个已存在的文档中。 例如:

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();

更新并插入

Update API 也支持 upsert 。如果该文档不存在,则会对该文档建立索引:

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest); (1)
client.update(updateRequest).get();
1 如果文档不存在, indexRequest 中的元素就会被建立索引

如果 index/_doc/1 这个文档已经存在了,在这个操作之后我们会有一个这样的文档:

{
    "name"  : "Joe Dalton",
    "gender": "male" (1)
}
1 这个字段是通过 update request 添加的

如果不存在,我们会得到一个新的文档:

{
    "name" : "Joe Smith",
    "gender": "male"
}

获取多个文档

Multi get API 允许通过 indexid 获取多个文档:

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "_doc", "1") (1)
    .add("twitter", "_doc", "2", "3", "4") (2)
    .add("another", "_doc", "foo") (3)
    .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { (4)
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) { (5)
        String json = response.getSourceAsString(); (6)
    }
}
1 通过单独的id获取
2 在同一个index中,可以通过多个id获取
3 也可以从其它的index中获取文档
4 遍历结果集
5 查看文档是否存在
6 获取 _source 字段

有关Multi get操作的更多信息,请查看REST multi get

批处理

Bulk API 允许在一次单独的请求中索引或删除多个文档。 下面是一个用法示例:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "_doc", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "_doc", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}

使用批处理工具

BulkProcessor 类提供了一个简单的 API 来自动执行批处理操作,它可以设置的规则有请求数、请求数据大小、周期。

要使用它,首先需要创建一个 BulkProcessor 实例:

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client, (1)
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } (2)

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } (3)

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } (4)
        })
        .setBulkActions(10000) (5)
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) (6)
        .setFlushInterval(TimeValue.timeValueSeconds(5)) (7)
        .setConcurrentRequests(1) (8)
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) (9)
        .build();
1 添加Elasticsearch client
2 在批处理执行之前被调用。你可以使用 request.numberOfActions() 方法来获取本次批处理的数量
3 在批处理执行之后被调用。你可以使用 response.hasFailures() 方法可以查看失败的请求
4 在批处理执行失败,或抛出 Throwable 的时候被调用
5 设置每次累计达10000个请求就立刻执行批处理
6 设置每次数据到达5MB的大小就立刻执行批处理
7 设置无论有几条请求,总之每隔5秒就立刻执行批处理
8 设置执行批处理请求时允许的最大并发数。0代表每次只执行一个请求,1代表允许1条并发请求
9 设置自定义补偿策略:第一次等待100ms,之后每次的间隔时间翻倍,一共重试3次。当批处理执行的时候抛出 EsRejectedExecutionException 异常的时候,会启动重试策略。原因可能是硬件资源太低。如想关闭该策略,可以传入 BackoffPolicy.noBackoff()

默认情况,BulkProcessor 的配置如下:

  • 1000 个请求

  • 5mb 大小

  • 没有间隔

  • 允许1个并发请求

  • 第一次等待50ms重试,之后每次的间隔时间翻倍,一共重试8次,总时间大概5.1s左右(这个5.1s是怎么算出来的?麻烦知道的告诉一下)

添加请求

然后你就可以很简单的向 BulkProcessor 中添加请求了:

bulkProcessor.add(new IndexRequest("twitter", "_doc", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "_doc", "2"));

关闭批处理工具

当所有文档全部加载到 BulkProcessor 之后,可以使用 awaitCloseclose 方法关闭它:

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

bulkProcessor.close();

如果设置了 flushInterval ,上面两个方法都会刷新剩余的文档并且禁用定时刷新任务。 如果开启了并发请求策略,awaitClose 方法会根据你所指定的时间作出等待,当规定时间内所有批处理请求全部完成时,会返回 true ; 如果在规定时间内,仍有批处理请求未完成,则会返回 falseclose 方法不会等待批处理请求是否完成,而是直接取消剩余的所有请求。

在测试中使用批处理工具

如果你正使用 Elasticsearch 进行测试并且使用的是 BulkProcessor 来批量添加数据,那么最好将并发请求数量设置为 0 , 这样批处理请求将会以同步的方式执行:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// 添加请求
bulkProcessor.add(/* Your requests */);

// 刷新剩余请求
bulkProcessor.flush();

// 如果你不在需要它了则关闭掉
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// 现在可以开始检索啦!
client.prepareSearch().get();

全局参数

全局参数可以在 BulkRequest 和 BulkProcessor 上指定,类似于REST API。 这些全局参数用作默认值,可以被每个子请求的本地参数所覆盖。 有一些参数必须在子请求添加之前被设置, - index, type - ,并且必须在 BulkRequest 或 BulkProcessor 创建期间指定它们。 也有一些是可选的 - pipeline, routing - ,这些可以在批量发送之前的任意时间点指定。

try (BulkProcessor processor = initBulkProcessorBuilder(listener)
        .setGlobalIndex("tweets")
        .setGlobalType("_doc")
        .setGlobalRouting("routing")
        .setGlobalPipeline("pipeline_id")
        .build()) {


    processor.add(new IndexRequest() (1)
        .source(XContentType.JSON, "user", "some user"));
    processor.add(new IndexRequest("blogs", "post_type", "1") (2)
        .source(XContentType.JSON, "title", "some title"));
}
1 BulkRequest 的全局参数将应用于子请求
2 子请求中的本地管道参数将覆盖BulkRequest中的全局参数
BulkRequest request = new BulkRequest();
request.pipeline("globalId");

request.add(new IndexRequest("test", "doc", "1")
    .source(XContentType.JSON, "field", "bulk1")
    .setPipeline("perIndexId")); (1)

request.add(new IndexRequest("test", "doc", "2")
    .source(XContentType.JSON, "field", "bulk2")); (2)
1 子请求中的本地管道参数将覆盖BulkRequest中的全局参数
2 BulkRequest 的全局参数将应用于子请求

通过条件更新文档

updateByQuery 最简单的用法是在不更改源的情况下更新同一个index中的所有文档。 这种用法还可以获取新的属性或其它网络映射的改变。

UpdateByQueryRequestBuilder updateByQuery =
  UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").abortOnVersionConflict(false);
BulkByScrollResponse response = updateByQuery.get();

调用 updateByQuery 方法会优先获取index的快照,然后使用`internal`版本索引所有的文档。

当文档正在建立索引时,如果快照版本发生了改变,那么这个文档的版本会发生冲突。

当版本匹配时, updateByQuery 方法会更新文档并增加版本号。

所有的修改和查询都失败的时候, updateByQuery 方法就会停止。 这些失败的原因可以从 BulkByScrollResponse#getIndexingFailures 中获得。 但是所有成功的更新操作都会保留并且不会回滚。当第一次发生失败而导致终止时,响应会包含批量请求生成的所有失败消息。

为了防止版本冲突而导致 updateByQuery 方法停止,请设置 abortOnVersionConflict(false) 。 第一个例子就是这样做的,因为这个例子是想获取网络映射的修改,并且版本冲突意味着冲突文档在 updateByQuery 的开始和尝试更新修改文档之间更新。这样很好,因为修改操作会获取更新后的网络映射。

UpdateByQueryRequestBuilder API 支持过滤更新后的文档,并且可以限制更新的数量,还可以通过脚本更新文档:

UpdateByQueryRequestBuilder updateByQuery =
  UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .filter(QueryBuilders.termQuery("level", "awesome"))
    .size(1000)
    .script(new Script(ScriptType.INLINE,
        "ctx._source.awesome = 'absolutely'",
        "painless",
        Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

UpdateByQueryRequestBuilder 还支持直接获取用于查询文档的语句。 你可以用它来改变默认的滚动数量,或者修改匹配文档的请求。

UpdateByQueryRequestBuilder updateByQuery =
  UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .source()
    .setSize(500);
BulkByScrollResponse response = updateByQuery.get();

您还可以将 size 和排序结合使用,来限制需要更新的文档:

UpdateByQueryRequestBuilder updateByQuery =
  UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .size(100)
    .source()
    .addSort("cat", SortOrder.DESC);
BulkByScrollResponse response = updateByQuery.get();

除了修改文档的 _source 字段之外,您还可以使用脚本来做其它操作,比如 Update API:

UpdateByQueryRequestBuilder updateByQuery =
  UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .script(new Script(
        ScriptType.INLINE,
        "if (ctx._source.awesome == 'absolutely') {"
            + "  ctx.op='noop'"
            + "} else if (ctx._source.awesome == 'lame') {"
            + "  ctx.op='delete'"
            + "} else {"
            + "ctx._source.awesome = 'absolutely'}",
        "painless",
        Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

Update API 一样,您可以通过设置 ctx.op 来改变执行的操作:

noop

如果这个脚本不会修改任何文档,可以设置 ctx.op = "noop"updateByQuery 操作会从修改中忽略掉这个文档。 这个行为会让响应体中的 noop 计数器增加。

delete

如果这个脚本一定会删除文档,可以设置 ctx.op = "delete" 。 这样在响应体中的 deleted 计数器会有显示。

ctx.op 设置其它值会报错。 ctx 设置其它字段也会报错。

这个API不允许移动它能接触到的文档,只允许修改source。这是故意这样设计的!我们规定不允许从原始位置将文档移除。

你也可以一次在多个索引和类型上执行这些操作,类似于 search API :

UpdateByQueryRequestBuilder updateByQuery =
  UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("foo", "bar").source().setTypes("a", "b");
BulkByScrollResponse response = updateByQuery.get();

如果你提供了 routing 值,则该进程会将路由值复制到滚动查询中,从而将进程限制为匹配该路由值的分片:

UpdateByQueryRequestBuilder updateByQuery =
  UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source().setRouting("cat");
BulkByScrollResponse response = updateByQuery.get();

updateByQuery 也可以通过指定一个像这样的 pipeline 来使用 ingest 节点:

UpdateByQueryRequestBuilder updateByQuery =
  UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.setPipeline("hurray");
BulkByScrollResponse response = updateByQuery.get();

使用任务 API

你可以使用 Task API 来获取所有正在运行中的 update-by-query 请求状态:

ListTasksResponse tasksList = client.admin().cluster().prepareListTasks()
    .setActions(UpdateByQueryAction.NAME).setDetailed(true).get();
for (TaskInfo info: tasksList.getTasks()) {
    TaskId taskId = info.getTaskId();
    BulkByScrollTask.Status status =
        (BulkByScrollTask.Status) info.getStatus();
    // do stuff
}

使用上面显示的 TaskId 您可以直接地找到task:

GetTaskResponse get = client.admin().cluster().prepareGetTask(taskId).get();

取消任务 API

任何 Update By Query 可以使用 Task Cancel API 来取消:

// Cancel all update-by-query requests
client.admin().cluster().prepareCancelTasks()
    .setActions(UpdateByQueryAction.NAME).get().getTasks();
// Cancel a specific update-by-query request
client.admin().cluster().prepareCancelTasks()
    .setTaskId(taskId).get().getTasks();

使用 list tasks API 可以查询 taskId 的值。

取消请求通常非常快,但也需要几秒钟。任务状态API会继续列出任务,直到取消完成。

二次节流

使用 _rethrottle API 可以修改正在运行的 requests_per_second 值:

RethrottleAction.INSTANCE.newRequestBuilder(client)
    .setTaskId(taskId)
    .setRequestsPerSecond(2.0f)
    .get();

使用 list tasks API 可以查询 taskId 的值。

updateByQuery 一样,requests_per_second 可以设置成任何正浮点值来设置throttle的级别,或者使用 Float.POSITIVE_INFINITY 来禁止 throttling。 requests_per_second 值可以加速进程并立刻生效。 为防止滚动超时,要在完成当前批处理后设置 requests_per_second 来减慢进程。

重新建立索引

查看 reindex API

BulkByScrollResponse response =
    ReindexAction.INSTANCE.newRequestBuilder(client)
    .source("source_index")
    .destination("target_index")
    .filter(QueryBuilders.matchQuery("category", "xzy")) (1)
    .get();
1 可以提供一个查询语句来筛选 source index 到 target index 的文档。