批处理
Bulk API 允许在一次单独的请求中索引或删除多个文档。 下面是一个用法示例:
import static org.elasticsearch.common.xcontent.XContentFactory.*;
BulkRequestBuilder bulkRequest = client.prepareBulk();
// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "_doc", "1")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
);
bulkRequest.add(client.prepareIndex("twitter", "_doc", "2")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "another post")
.endObject()
)
);
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
}
使用批处理工具
BulkProcessor
类提供了一个简单的 API 来自动执行批处理操作,它可以设置的规则有请求数、请求数据大小、周期。
要使用它,首先需要创建一个 BulkProcessor
实例:
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
BulkProcessor bulkProcessor = BulkProcessor.builder(
client, (1)
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) { ... } (2)
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) { ... } (3)
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) { ... } (4)
})
.setBulkActions(10000) (5)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) (6)
.setFlushInterval(TimeValue.timeValueSeconds(5)) (7)
.setConcurrentRequests(1) (8)
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) (9)
.build();
1 | 添加Elasticsearch client |
2 | 在批处理执行之前被调用。你可以使用 request.numberOfActions() 方法来获取本次批处理的数量 |
3 | 在批处理执行之后被调用。你可以使用 response.hasFailures() 方法可以查看失败的请求 |
4 | 在批处理执行失败,或抛出 Throwable 的时候被调用 |
5 | 设置每次累计达10000个请求就立刻执行批处理 |
6 | 设置每次数据到达5MB的大小就立刻执行批处理 |
7 | 设置无论有几条请求,总之每隔5秒就立刻执行批处理 |
8 | 设置执行批处理请求时允许的最大并发数。0代表每次只执行一个请求,1代表允许1条并发请求 |
9 | 设置自定义补偿策略:第一次等待100ms,之后每次的间隔时间翻倍,一共重试3次。当批处理执行的时候抛出 EsRejectedExecutionException 异常的时候,会启动重试策略。原因可能是硬件资源太低。如想关闭该策略,可以传入 BackoffPolicy.noBackoff() |
默认情况,BulkProcessor
的配置如下:
-
1000
个请求 -
5mb
大小 -
没有间隔
-
允许1个并发请求
-
第一次等待50ms重试,之后每次的间隔时间翻倍,一共重试8次,总时间大概5.1s左右(这个5.1s是怎么算出来的?麻烦知道的告诉一下)
添加请求
然后你就可以很简单的向 BulkProcessor
中添加请求了:
bulkProcessor.add(new IndexRequest("twitter", "_doc", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "_doc", "2"));
关闭批处理工具
当所有文档全部加载到 BulkProcessor
之后,可以使用 awaitClose
或 close
方法关闭它:
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
或
bulkProcessor.close();
如果设置了 flushInterval
,上面两个方法都会刷新剩余的文档并且禁用定时刷新任务。
如果开启了并发请求策略,awaitClose
方法会根据你所指定的时间作出等待,当规定时间内所有批处理请求全部完成时,会返回 true
;
如果在规定时间内,仍有批处理请求未完成,则会返回 false
。
close
方法不会等待批处理请求是否完成,而是直接取消剩余的所有请求。
在测试中使用批处理工具
如果你正使用 Elasticsearch 进行测试并且使用的是 BulkProcessor
来批量添加数据,那么最好将并发请求数量设置为 0
,
这样批处理请求将会以同步的方式执行:
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
.setBulkActions(10000)
.setConcurrentRequests(0)
.build();
// 添加请求
bulkProcessor.add(/* Your requests */);
// 刷新剩余请求
bulkProcessor.flush();
// 如果你不在需要它了则关闭掉
bulkProcessor.close();
// Refresh your indices
client.admin().indices().prepareRefresh().get();
// 现在可以开始检索啦!
client.prepareSearch().get();
全局参数
全局参数可以在 BulkRequest 和 BulkProcessor 上指定,类似于REST API。 这些全局参数用作默认值,可以被每个子请求的本地参数所覆盖。 有一些参数必须在子请求添加之前被设置, - index, type - ,并且必须在 BulkRequest 或 BulkProcessor 创建期间指定它们。 也有一些是可选的 - pipeline, routing - ,这些可以在批量发送之前的任意时间点指定。
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
.setGlobalIndex("tweets")
.setGlobalType("_doc")
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()) {
processor.add(new IndexRequest() (1)
.source(XContentType.JSON, "user", "some user"));
processor.add(new IndexRequest("blogs", "post_type", "1") (2)
.source(XContentType.JSON, "title", "some title"));
}
1 | BulkRequest 的全局参数将应用于子请求 |
2 | 子请求中的本地管道参数将覆盖BulkRequest中的全局参数 |
BulkRequest request = new BulkRequest();
request.pipeline("globalId");
request.add(new IndexRequest("test", "doc", "1")
.source(XContentType.JSON, "field", "bulk1")
.setPipeline("perIndexId")); (1)
request.add(new IndexRequest("test", "doc", "2")
.source(XContentType.JSON, "field", "bulk2")); (2)
1 | 子请求中的本地管道参数将覆盖BulkRequest中的全局参数 |
2 | BulkRequest 的全局参数将应用于子请求 |