使用Bulk Processor
Bulk Processor 类提供了一个简单的接口来自动执行批处理操作,它可以设置的规则有:请求数,请求数据大小,周期。
要使用它,首先要创建一个BulkProcessor实例:
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
BulkProcessor bulkProcessor = BulkProcessor.builder(
//添加ES client
client,
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) {
//在批处理之前执行,其中request.numberOfActions()方法可以获取批处理的数量
...
}
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) {
//在批处理之后执行,其中response.hasFailures()方法可以查看失败的请求
...
}
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) {
//在批处理执行失败,或异常的时候执行
...
}
})
//到达10000个请求就立刻执行批处理
.setBulkActions(10000)
//到达5MB的大小就立刻执行批处理
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
//每隔5秒就立刻执行批处理
.setFlushInterval(TimeValue.timeValueSeconds(5))
//设置执行批处理请求时允许的最大并发数。0代表每次只执行一个请求,1代表允许1条并发请求
.setConcurrentRequests(1)
//设置异常策略,间隔100ms重试一次,一共重试3次。当批处理执行的时候抛出EsRejectedExecutionException异常的时候,会启动重试策略。原因可能是硬件资源太低。如想关闭该策略,可以传入BackoffPolicy.noBackoff()
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
默认情况,BulkProcessor的配置如下:
-
1000个请求
-
5mb大小
-
没有间隔
-
允许1个并发请求
-
每50ms重试一次,一共重试8次,总时间大概5.1秒左右
添加请求
现在可以向Bulk Processor中添加请求了
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
关闭Bulk Processor
当确认所有的文档全部发送到BulkProcessor之后,可以使用awaitClose或close方法关闭它。
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
或者
bulkProcessor.close();
如果设置了flushInterval,上面两个方法都可以发送剩余的文档并且禁用发送任务。 如果开启了并发请求策略,awaitClose方法会等待所指定的时间,当所有批处理请求全部完成时,会返回true, 如果在指定时间内,仍有批处理请求未完成,则会返回false。 close方法不会等待批处理请求是否完成,而是直接取消剩余的所有请求。
测试Bulk Processor
如果你正在测试ES并且使用的是BulkProcessor来批量添加数据,最好将并发请求数量设置为0,这样批处理请求将会以同步的方式执行:
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
.setBulkActions(10000)
.setConcurrentRequests(0)
.build();
// 添加请求
bulkProcessor.add(/* Your requests */);
// 发送所有剩余请求
bulkProcessor.flush();
// 在不需要的时候关闭bulkProcessor
bulkProcessor.close();
// 刷新indices
client.admin().indices().prepareRefresh().get();
// 现在可以开始查询了!
client.prepareSearch().get();