批处理

Bulk API 允许在一次单独的请求中索引或删除多个文档。 下面是一个用法示例:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "_doc", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "_doc", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}

使用批处理工具

BulkProcessor 类提供了一个简单的 API 来自动执行批处理操作,它可以设置的规则有请求数、请求数据大小、周期。

要使用它,首先需要创建一个 BulkProcessor 实例:

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client, (1)
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } (2)

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } (3)

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } (4)
        })
        .setBulkActions(10000) (5)
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) (6)
        .setFlushInterval(TimeValue.timeValueSeconds(5)) (7)
        .setConcurrentRequests(1) (8)
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) (9)
        .build();
1 添加Elasticsearch client
2 在批处理执行之前被调用。你可以使用 request.numberOfActions() 方法来获取本次批处理的数量
3 在批处理执行之后被调用。你可以使用 response.hasFailures() 方法可以查看失败的请求
4 在批处理执行失败,或抛出 Throwable 的时候被调用
5 设置每次累计达10000个请求就立刻执行批处理
6 设置每次数据到达5MB的大小就立刻执行批处理
7 设置无论有几条请求,总之每隔5秒就立刻执行批处理
8 设置执行批处理请求时允许的最大并发数。0代表每次只执行一个请求,1代表允许1条并发请求
9 设置自定义补偿策略:第一次等待100ms,之后每次的间隔时间翻倍,一共重试3次。当批处理执行的时候抛出 EsRejectedExecutionException 异常的时候,会启动重试策略。原因可能是硬件资源太低。如想关闭该策略,可以传入 BackoffPolicy.noBackoff()

默认情况,BulkProcessor 的配置如下:

  • 1000 个请求

  • 5mb 大小

  • 没有间隔

  • 允许1个并发请求

  • 第一次等待50ms重试,之后每次的间隔时间翻倍,一共重试8次,总时间大概5.1s左右(这个5.1s是怎么算出来的?麻烦知道的告诉一下)

添加请求

然后你就可以很简单的向 BulkProcessor 中添加请求了:

bulkProcessor.add(new IndexRequest("twitter", "_doc", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "_doc", "2"));

关闭批处理工具

当所有文档全部加载到 BulkProcessor 之后,可以使用 awaitCloseclose 方法关闭它:

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

bulkProcessor.close();

如果设置了 flushInterval ,上面两个方法都会刷新剩余的文档并且禁用定时刷新任务。 如果开启了并发请求策略,awaitClose 方法会根据你所指定的时间作出等待,当规定时间内所有批处理请求全部完成时,会返回 true ; 如果在规定时间内,仍有批处理请求未完成,则会返回 falseclose 方法不会等待批处理请求是否完成,而是直接取消剩余的所有请求。

在测试中使用批处理工具

如果你正使用 Elasticsearch 进行测试并且使用的是 BulkProcessor 来批量添加数据,那么最好将并发请求数量设置为 0 , 这样批处理请求将会以同步的方式执行:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// 添加请求
bulkProcessor.add(/* Your requests */);

// 刷新剩余请求
bulkProcessor.flush();

// 如果你不在需要它了则关闭掉
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// 现在可以开始检索啦!
client.prepareSearch().get();

全局参数

全局参数可以在 BulkRequest 和 BulkProcessor 上指定,类似于REST API。 这些全局参数用作默认值,可以被每个子请求的本地参数所覆盖。 有一些参数必须在子请求添加之前被设置, - index, type - ,并且必须在 BulkRequest 或 BulkProcessor 创建期间指定它们。 也有一些是可选的 - pipeline, routing - ,这些可以在批量发送之前的任意时间点指定。

try (BulkProcessor processor = initBulkProcessorBuilder(listener)
        .setGlobalIndex("tweets")
        .setGlobalType("_doc")
        .setGlobalRouting("routing")
        .setGlobalPipeline("pipeline_id")
        .build()) {


    processor.add(new IndexRequest() (1)
        .source(XContentType.JSON, "user", "some user"));
    processor.add(new IndexRequest("blogs", "post_type", "1") (2)
        .source(XContentType.JSON, "title", "some title"));
}
1 BulkRequest 的全局参数将应用于子请求
2 子请求中的本地管道参数将覆盖BulkRequest中的全局参数
BulkRequest request = new BulkRequest();
request.pipeline("globalId");

request.add(new IndexRequest("test", "doc", "1")
    .source(XContentType.JSON, "field", "bulk1")
    .setPipeline("perIndexId")); (1)

request.add(new IndexRequest("test", "doc", "2")
    .source(XContentType.JSON, "field", "bulk2")); (2)
1 子请求中的本地管道参数将覆盖BulkRequest中的全局参数
2 BulkRequest 的全局参数将应用于子请求