通过脚本修改接口

updateByQuery 最简单的用法是更新单个索引中的每个文档而不更改源文件。这种用法还可以获取新的属性或其它映射。

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").abortOnVersionConflict(false);
BulkByScrollResponse response = updateByQuery.get();

调用 updateByQuery 方法会优先获取索引的快照,然后使用内部版本索引所有的文档。

当文档正在建立索引时,如果快照版本发生了改变,那么这个文档的版本会发生冲突。

当版本匹配时, updateByQuery 方法会更新文档并增加版本号。

所有的修改和查询都失败的时候, updateByQuery 方法就会停止。 这些失败的原因可以从 BulkByScrollResponse#getIndexingFailures 中获得。 但是所有成功的更新操作保留并且不会回滚。当第一次发生失败而导致终止时,响应会包含批量请求生成的所有失败消息。

为了防止版本冲突而导致 updateByQuery 方法停止,请设置 abortOnVersionConflict(false)。 第一个例子就是这样做的,因为这个例子是想获取映射的修改,并且版本冲突意味着冲突文档在 updateByQuery 的开始和尝试更新修改文档之间更新。这样很好,因为修改操作会获取更新的映射。

UpdateByQueryRequestBuilder 接口支持过滤更新的文档,并且可以限制更新的数量,还可以通过脚本更新文档:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .filter(QueryBuilders.termQuery("level", "awesome"))
    .size(1000)
    .script(new Script(ScriptType.INLINE, "ctx._source.awesome = 'absolutely'", "painless", Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

UpdateByQueryRequestBuilder 还支持直接获取用于查询文档的语句。你可以用它来改变默认的滚动数量,或者修改匹配文档的请求。

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .source().setSize(500);
BulkByScrollResponse response = updateByQuery.get();

您还可以将条数和顺序结合使用,来限制需要更新的文档:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").size(100)
    .source().addSort("cat", SortOrder.DESC);
BulkByScrollResponse response = updateByQuery.get();

除了修改文档的 _source 字段之外,您还可以使用脚本来修改操作,类似修改 Update接口

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .script(new Script(
        ScriptType.INLINE,
        "if (ctx._source.awesome == 'absolutely) {"
            + "  ctx.op='noop'"
            + "} else if (ctx._source.awesome == 'lame') {"
            + "  ctx.op='delete'"
            + "} else {"
            + "ctx._source.awesome = 'absolutely'}",
        "painless",
        Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

Update接口 一样,您可以设置 ctx.op 来改变执行的操作。

  • noop:如果脚本没有改变,可以设置 ctx.op = "noop"updateByQuery 会从修改列表中将它忽略,并且请求体中的 noop 会增加。

  • delete:如果脚本决定删除,可以设置 ctx.op = "delete" 。 请求体中的 delete 会有报告。

ctx.op 设置其它值会报错。 ctx 设置其它字段也会报错。

这个接口不允许移动接触到的文档,只允许修改它。它就是这么设计的!我们没有规定从原始位置将文档移除。

你也可以一次对多个索引和类型执行这些操作,类似于 Search接口

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("foo", "bar").source().setTypes("a", "b");
BulkByScrollResponse response = updateByQuery.get();

如果你提供路由,那么该进程会将该路由的值复制到滚动查询中,从而将进程限制为匹配该路由值的分片:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source().setRouting("cat");
BulkByScrollResponse response = updateByQuery.get();

updateByQuery 也可以通过指定一个想这样的 pipeline 来使用 ingest 节点:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.setPipeline("hurray");
BulkByScrollResponse response = updateByQuery.get();

用于任务接口

你可以使用任务接口来获取所有正在运行的 updateByQuery 请求的状态:

ListTasksResponse tasksList = client.admin().cluster().prepareListTasks()
    .setActions(UpdateByQueryAction.NAME).setDetailed(true).get();
for (TaskInfo info: tasksList.getTasks()) {
    TaskId taskId = info.getTaskId();
    BulkByScrollTask.Status status = (BulkByScrollTask.Status) info.getStatus();
    // do stuff
}

使用上面显示的 TaskId ,您可以直接查找任务:

GetTaskResponse get = client.admin().cluster().prepareGetTask(taskId).get();

用于取消任务接口

所有 updateByQuery 可以使用任务取消接口:

// Cancel all update-by-query requests
client.admin().cluster().prepareCancelTasks().setActions(UpdateByQueryAction.NAME).get().getTasks();
// Cancel a specific update-by-query request
client.admin().cluster().prepareCancelTasks().setTaskId(taskId).get().getTasks();

使用 任务列表 接口可以查询 taskId 的值。

取消请求通常非常快,但也需要几秒钟。任务状态接口会继续列出任务,直到取消完成。

重新节流

使用 _rethrottle 接口修改正在运行修改的 requests_per_second 值:

RethrottleAction.INSTANCE.newRequestBuilder(client)
    .setTaskId(taskId)
    .setRequestsPerSecond(2.0f)
    .get();

使用 任务列表 接口可以查询 taskId 的值。

updateByQuery 一样,requests_per_second 可以设置任何正浮点值或 Float.POSITIVE_INFINITY 来设置节流级别。 加速进程的 requests_per_second 值立刻生效。要在完成当前批处理后减慢进程,可以设置 requests_per_second 来防止滚动。