快速入门
本章将向您介绍,如何快速的上手底层REST客户端,从获取artifact并在应用中使用它。
Maven仓库
底层Java REST客户端已经托管在
Maven中央仓库
中。最低支持的Java版本为 1.8。
底层REST客户端的发布周期与Elasticsearch相同。
客户端版本根据Elasticsearch版本而来,第一个版本是 5.0.0-alpha4。
客户端和它正在进行通讯的Elasticsearch版本之间没有关系。
底层REST客户端与Elasticsearch的所有版本都兼容。
如果您正在寻找SNAPSHOT版本,可以在 Elastic Maven Snapshot仓库 中找到。
依赖
底层Java REST客户端内部使用 Apache Http Async Client 来发送http请求。 它会依赖下面这些包,也就是异步http客户端以及它自身传递过来的依赖项:
-
org.apache.httpcomponents:httpasyncclient
-
org.apache.httpcomponents:httpcore-nio
-
org.apache.httpcomponents:httpclient
-
org.apache.httpcomponents:httpcore
-
commons-codec:commons-codec
-
commons-logging:commons-logging
Shading
为了防止版本冲突,可以对依赖进行shade处理,然后将其打包到单独的jar包中(有时称其为 "uber JAR" 或 "fat JAR")。 对依赖进行shade处理包括获取它的内容(资源文件以及java class文件)然后重命名后将其放入底层Java REST客户端相同的JAR包里。 通过Gradle以及Maven的第三方插件都可以进行shade处理。
需要注意的是,对JAR包进行的shade处理也有影响。比如,对Commons Logging进行shade处理,意味着第三方后台日志也需要进行shade处理。
Maven 配置
这里是一个使用maven时的
Shade
插件配置,将下面的内容添加到 pom.xml 文件中:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals><goal>shade</goal></goals>
<configuration>
<relocations>
<relocation>
<pattern>org.apache.http</pattern>
<shadedPattern>hidden.org.apache.http</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.logging</pattern>
<shadedPattern>hidden.org.apache.logging</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons.codec</pattern>
<shadedPattern>hidden.org.apache.commons.codec</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons.logging</pattern>
<shadedPattern>hidden.org.apache.commons.logging</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Gradle 配置
这里是一个使用gradle时的
ShadowJar
插件配置,将下面的内容添加到 build.gradle 文件中:
shadowJar {
relocate 'org.apache.http', 'hidden.org.apache.http'
relocate 'org.apache.logging', 'hidden.org.apache.logging'
relocate 'org.apache.commons.codec', 'hidden.org.apache.commons.codec'
relocate 'org.apache.commons.logging', 'hidden.org.apache.commons.logging'
}
初始化
RestClient 实例可以通过其对应的 RestClientBuilder 构建,使用 RestClient#builder(HttpHost…) 静态方法即可。
仅需要与之进行通讯的主机地址作为参数,以
HttpHost
实例来提供,如下所示:
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")).build();
RestClient 类是县城安全的,理想情况下它的生命周期应该与使用它的应用相同。
在你不再使用的时候,一定要及时关闭,这样才能释放它所占用的系统资源、底层http客户端实例及其线程。
restClient.close();
RestClientBuilder 同时允许在构建 RestClient 实例的时候,配置下列可选参数:
RestClientBuilder builder = RestClient.builder(
new HttpHost("localhost", 9200, "http"));
Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
builder.setDefaultHeaders(defaultHeaders); (1)
| 1 | 针对所有请求设置默认的请求头,避免在每一个请求中手动的设置它们 |
RestClientBuilder builder = RestClient.builder(
new HttpHost("localhost", 9200, "http"));
builder.setFailureListener(new RestClient.FailureListener() {
@Override
public void onFailure(Node node) {
(1)
}
});
| 1 | 设置一个监听器,在节点每次出现故障时收到通知,并做出相应的处理。需要启用故障嗅探功能。 |
RestClientBuilder builder = RestClient.builder(
new HttpHost("localhost", 9200, "http"));
builder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS); (1)
| 1 | 设置节点选择器来过滤客户端将要发送请求的节点,也包括客户端本身的节点。在嗅探功能开启时,这会有助于避免向主节点发送请求。默认情况下,客户端会向已配置的所有节点发送请求。 |
RestClientBuilder builder = RestClient.builder(
new HttpHost("localhost", 9200, "http"));
builder.setRequestConfigCallback(
new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(
RequestConfig.Builder requestConfigBuilder) {
return requestConfigBuilder.setSocketTimeout(10000); (1)
}
});
| 1 | 设置针对默认请求配置修改后的回调函数(例如请求超时、身份认证、或者是其它在 org.apache.http.client.config.RequestConfig.Builder 中允许设置的值) |
RestClientBuilder builder = RestClient.builder(
new HttpHost("localhost", 9200, "http"));
builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setProxy(
new HttpHost("proxy", 9000, "http")); (1)
}
});
| 1 | 设置针对http客户端配置修改后的回调函数(例如ssl的加密,或者是其它在 org.apache.http.impl.nio.client.HttpAsyncClientBuilder 中允许设置的值) |
执行请求
在 RestClient 被创建后,可以通过执行 performRequest 或 performRequestAsync 方法发送请求。
performRequest 是同步的,当请求成功时会阻塞线程并返回 Response,当请求失败的时候会抛出异常。
而 performRequestAsync 是异步的,可以提前传入一个 ResponseListener 参数,
在请求成功后会用 Response 作为参数调用该函数,在失败后会用 Exception 作为参数调用。
这是同步执行的代码:
Request request = new Request(
"GET", (1)
"/"); (2)
Response response = restClient.performRequest(request);
| 1 | HTTP请求类型 (GET, POST, HEAD, 等等) |
| 2 | 服务器地址 |
然后这是异步的:
Request request = new Request(
"GET", (1)
"/"); (2)
Cancellable cancellable = restClient.performRequestAsync(request,
new ResponseListener() {
@Override
public void onSuccess(Response response) {
(3)
}
@Override
public void onFailure(Exception exception) {
(4)
}
});
| 1 | HTTP请求类型 (GET, POST, HEAD, 等等) |
| 2 | 服务器地址 |
| 3 | 响应处理 |
| 4 | 失败处理 |
你可以像这样向请求对象中添加参数:
request.addParameter("pretty", "true");
你可以像这样将请求参数设置为任意的 HttpEntity:
request.setEntity(new NStringEntity(
"{\"json\":\"text\"}",
ContentType.APPLICATION_JSON));
为 HttpEntity 指定 ContentType 很重要,因为在设置请求头的时候会用到它,这样 Elasticsearch 才能正确的进行解析。
|
你也可以将其设置成 String 类型,因为json的默认 ContentType 是 application/json。
request.setJsonEntity("{\"json\":\"text\"}");
请求的可选配置
RequestOptions 类会将请求的部分内容保存下来,这些内容可以在同一应用下的多个请求之间共享。
可以创建一个单例并在所有请求之间共享它:
private static final RequestOptions COMMON_OPTIONS;
static {
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
builder.addHeader("Authorization", "Bearer " + TOKEN); (1)
builder.setHttpAsyncResponseConsumerFactory( (2)
new HttpAsyncResponseConsumerFactory
.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));
COMMON_OPTIONS = builder.build();
}
| 1 | 为所有请求统一的添加headers |
| 2 | 定义响应的消费者 |
addHeader 用于身份授权所需的headers,还有在Elasticsearch之前设置代理。
这里不用设置 Content-Type,因为客户端会自动从 HttpEntity 中读取。
你可以设置 NodeSelector 来控制哪些节点会接受哪些请求。
推荐使用 NodeSelector.SKIP_DEDICATED_MASTERS。
你还可以为异步响应的缓存来自定义消费者。 默认情况下,消费者会对JVM堆上100MB大小响应进行缓存。 如果响应超过了这个限制,则请求就会失败。 当然,如果在你的环境中不允许像上面的示例(30G缓存)一样设置的话,你也可以降低这个最大值。
在创建单例后,你就可以在请求的时候使用它了:
request.setOptions(COMMON_OPTIONS);
你也可以根据每个请求自定义这些选项。 例如,下面就是一个添加header的例子:
RequestOptions.Builder options = COMMON_OPTIONS.toBuilder();
options.addHeader("cats", "knock things off of other things");
request.setOptions(options);
多个异步操作的并行处理
客户端更愿意并行的处理多个操作。
下面的示例就并行的对很多文档进行索引。
在真实项目中,你可以需要使用 _bulk API 替换它,这个例子仅仅是用来展示的。
final CountDownLatch latch = new CountDownLatch(documents.length);
for (int i = 0; i < documents.length; i++) {
Request request = new Request("PUT", "/posts/doc/" + i);
//let's assume that the documents are stored in an HttpEntity array
request.setEntity(documents[i]);
restClient.performRequestAsync(
request,
new ResponseListener() {
@Override
public void onSuccess(Response response) {
(1)
latch.countDown();
}
@Override
public void onFailure(Exception exception) {
(2)
latch.countDown();
}
}
);
}
latch.await();
| 1 | 处理返回的响应 |
| 2 | 处理由于通信异常,或者是响应码异常而返回的错误 |
取消异步请求
performRequestAsync 方法会返回一个 Cancellable(可以取消)的对象,它会暴露一个名为 cancel 的方法。
调用这个方法可以取消正在进行的请求。
取消请求操作会在底层http客户端中取消http请求。
而在服务器端,这一步操作不会自动地转换为取消操作,也就是说还需要通过API具体手动实现。
Cancellable 的实例是可选项,如果不需要的话,也可以忽略它,这不会影响安全。
一个典型用例就是将它与类似 Rx Java 或者是 Kotlin 中的 suspendCancellableCoRoutine 一起使用。
取消不再需要的请求可以避免 Elasticsearch 进行不必要的负担。
Request request = new Request("GET", "/posts/_search");
Cancellable cancellable = restClient.performRequestAsync(
request,
new ResponseListener() {
@Override
public void onSuccess(Response response) {
(1)
}
@Override
public void onFailure(Exception exception) {
(2)
}
}
);
cancellable.cancel();
| 1 | 处理返回的响应,防止在请求被取消之前就已经完成 |
| 2 | 处理返回的异常,很大可能是取消请求所引起的 CancellationException |
读取响应
Response 对象是执行同步 performRequest 方法或者是作为 ResponseListener#onSuccess(Response) 的参数所返回的,
其中包含http客户端以及一些额外的信息。
Response response = restClient.performRequest(new Request("GET", "/"));
RequestLine requestLine = response.getRequestLine(); (1)
HttpHost host = response.getHost(); (2)
int statusCode = response.getStatusLine().getStatusCode(); (3)
Header[] headers = response.getHeaders(); (4)
String responseBody = EntityUtils.toString(response.getEntity()); (5)
| 1 | 关于已经执行的请求信息 |
| 2 | 响应返回的Host |
| 3 | 响应状态行,你可以从中查看状态码 |
| 4 | 响应headers,也可以通过 getHeader(String) 方法根据header的名字查看 |
| 5 | 响应体被封装在 org.apache.http.HttpEntity 对象中 |
在执行请求的过程中,以下情况会抛出异常(或者是在 ResponseListener#onFailure(Exception) 中以参数的形式接受到):
IOException-
通信问题 (例如 SocketTimeoutException)
ResponseException-
接受到响应信息,但是状态码提示错误(非
2xx)。ResponseException异常是从有效http响应中得到的,因此其中可以获取返回的Response对象,让你读取返回的响应信息。
对于 404 状态码的 HEAD 请求是 不会 抛出 ResponseException 异常的,因为这是一个预期的 HEAD 响应,只不过表示未找到资源。
其它所有HTTP方法(比如 GET)针对 404 状态码的响应都会抛出 ResponseException 异常,除非使用 ignore 参数将 404 排除在外了。
ignore 是一个特殊的客户端参数,它不会发送到 Elasticsearch,并且包含一个以逗号分割的错误状态码列表。
它可以控制是否将某些错误状态码视为预期响应,而不是抛出异常。
例如,有时候一些get api在返回 404 状态码的时候,代表这没有找到该文档,
这时的响应体中不包含错误信息,而是正常返回的get api响应,只不过因为没有找到而没有带上文档。
|
请注意,底层客户端不会暴露任何对json的构造以及解析工具。 用户可以自由的选择他们喜欢的库来实现。
底层的Apache Async Http Client根据不同的 org.apache.http.HttpEntity 实现而提供不同格式(流、字节数组、字符串等)的请求体。
至于读取响应体,HttpEntity#getContent 方法可以非常方便的从之前返回的缓存响应体中读取 InputStream。
作为替代方案,可以提供一个自定义的 org.apache.http.nio.protocol.HttpAsyncResponseConsumer 用来控制字节的读取和缓冲方式。
日志
Java REST 客户端使用的日志库和 Apache Async Http Client 使用的相同,都是 Apache Commons Logging,同时它还支持许多流行的日志实现方案。
启用Jar包中包含的日志,其中 org.elasticsearch.client 包对应客户端本身,org.elasticsearch.client.sniffer 包对应嗅探功能。
还可以对请求启用日志追踪功能,以curl格式记录每个请求及其响应。
这在debug的时候很方便,比如执行一个请求后,查看它和之前的响应是否相同。
启用日志追踪 tracer 包来追踪日志记录,并将之打印出来。
注意这种类型的日志对性能有影响,不建议在生产环境中启用,而是仅在需要的时候临时开启。