Elasticsearch(三) API调用

Posted by ZhouJ000 on December 30, 2021

Elasticsearch(一) 入门
Elasticsearch(二) 搜索
Elasticsearch(三) API调用

Java Api

  • Java REST Client
    • Java Low Level REST Client
      • 低级别的REST客户端,通过http与集群交互,用户需自己编组请求JSON串,及解析响应JSON串
      • 兼容所有ES版本
    • Java High Level REST Client
      • 高级别的REST客户端,基于低级别的REST客户端,增加了编组请求JSON串、解析响应JSON串等相关api
      • 使用的版本需要保持和ES服务端的版本一致,否则会有版本问题
      • 官方推荐
  • Java Client

Low Level REST Client

引入maven

<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>elasticsearch-rest-client</artifactId>
	<version>7.15.2</version>
</dependency>

简单的demo:

RestClient restClient = RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http")).build();

Request request = new Request("GET", "/");
// request.setEntity(new NStringEntity("{\"json\":\"text\"}", ContentType.APPLICATION_JSON));
// request.setJsonEntity("{\"json\":\"text\"}");
// request.addParameter("key", "value");
// 同步
Response response = restClient.performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println(responseBody);

restClient.close();

异步:

Cancellable cancellable = restClient.performRequestAsync(request,
    new ResponseListener() {
        @Override
        public void onSuccess(Response response) {
            // doSomething
        }

        @Override
        public void onFailure(Exception exception) {
            // doSomething
        }
});
// 取消,客户端终止http请求
// cancellable.cancel();

可以在创建RestClient时进行一些配置:

RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
// 全局的header,比如Authorization等
Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
builder.setDefaultHeaders(defaultHeaders);
// 添加节点失败通知Listener
builder.setFailureListener(new RestClient.FailureListener() {
    @Override
    public void onFailure(Node node) {
        // doSomething
    }
});
// 设置超时
builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
	@Override
	public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
		return requestConfigBuilder.setConnectTimeout(5000)
								   .setSocketTimeout(60000);
	}
});
// 验证
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("user", "user-password"));
// 使者线程id
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
	@Override
	public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
		// httpClientBuilder.disableAuthCaching();
		return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
								.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
	}
});

RestClient restClient = builder.build();

可以再request上进行配置:

RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
// Header
builder.addHeader("Authorization", TOKEN); 
// response consumer
builder.setHttpAsyncResponseConsumerFactory(new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));
RequestOptions COMMON_OPTIONS = builder.build();

request.setOptions(COMMON_OPTIONS);

异步并发:

final CountDownLatch latch = new CountDownLatch(documents.length);
for (int i = 0; i < documents.length; i++) {
    Request request = new Request("PUT", "/posts/doc/" + i);
    request.setEntity(documents[i]);
    restClient.performRequestAsync(request,
            new ResponseListener() {
                @Override
                public void onSuccess(Response response) {
					// doSomething
                    latch.countDown();
                }

                @Override
                public void onFailure(Exception exception) {
                    // doSomething
                    latch.countDown();
                }
            }
    );
}
latch.await();

使用TLS:

// 第一种
Path trustStorePath = Paths.get("/path/to/truststore.p12");
KeyStore truststore = KeyStore.getInstance("pkcs12");
try (InputStream is = Files.newInputStream(trustStorePath)) {
	truststore.load(is, keyStorePass.toCharArray());
}
// 第二种
Path caCertificatePath = Paths.get("/path/to/ca.crt");
CertificateFactory factory = CertificateFactory.getInstance("X.509");
Certificate trustedCa;
try (InputStream is = Files.newInputStream(caCertificatePath)) {
	trustedCa = factory.generateCertificate(is);
}
KeyStore trustStore = KeyStore.getInstance("pkcs12");
trustStore.load(null, null);
trustStore.setCertificateEntry("ca", trustedCa);


SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(trustStore, null);
final SSLContext sslContext = sslBuilder.build();
RestClient.builder(new HttpHost("localhost", 9200, "https"))
		.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
			@Override
			public HttpAsyncClientBuilder customizeHttpClient(
					HttpAsyncClientBuilder httpClientBuilder) {
				return httpClientBuilder.setSSLContext(sslContext);
			}
		});

嗅探器(Sniffer): 允许从运行的elasticsearch集群中自动发现节点,并将其设置到现有的RestClient实例中

High Level REST Client

版本向后兼容

同样引入maven

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.15.2</version>
</dependency>

与spring-boot的2.1.8.RELEASE版本会有报错,NoClassDefFoundError,需要版本号对应,这里新建了个非spring boot项目测试接口

连接与关闭:

RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http")));

client.close();

索引

测试的几种方法:

AnalyzeRequest request = AnalyzeRequest.withGlobalAnalyzer("english", "Some text to analyze", "Some more text to analyze");

// 自定义分析器
Map<String, Object> stopFilter = new HashMap<>();
stopFilter.put("type", "stop");
stopFilter.put("stopwords", new String[]{"to"});
AnalyzeRequest request2 = AnalyzeRequest.buildCustomAnalyzer("standard")
		.addCharFilter("html_strip")
		.addTokenFilter("lowercase")
		.addTokenFilter(stopFilter)
		.build("Some text to analyze");

AnalyzeRequest request3 = AnalyzeRequest.buildCustomNormalizer()
		.addTokenFilter("lowercase")
		.build("<b>BaR</b>");

AnalyzeRequest request4 = AnalyzeRequest.withIndexAnalyzer(
		"my_index",
		"my_analyzer",
		"some text to analyze"
);
AnalyzeRequest request5 = AnalyzeRequest.withNormalizer(
		"my_index",
		"my_normalizer",
		"some text to analyze"
);

AnalyzeRequest request6 = AnalyzeRequest.withField("my_index", "my_field", "some text to analyze");

request.explain(true);
// request.attributes("keyword", "type");

AnalyzeResponse response = client.indices().analyze(request3, RequestOptions.DEFAULT);
if (response.detail() != null) {
	for (AnalyzeResponse.AnalyzeToken token : response.detail().analyzer().getTokens() ) {
		System.out.println(token.getTerm());
	}
}
if (response.getTokens() != null) {
	response.getTokens().forEach(token -> System.out.println(token.getTerm()));
}

创建索引的几种方法:

CreateIndexRequest request = new CreateIndexRequest("twitter");
// 设置settings
request.settings(Settings.builder()
		.put("index.number_of_shards", 3)
		.put("index.number_of_replicas", 2)
);

// 设置mapping方法1
request.mapping("{\n" +
				"  \"properties\": {\n" +
				"    \"message\": {\n" +
				"      \"type\": \"text\"\n" +
				"    }\n" +
				"  }\n" +
				"}",
XContentType.JSON);

// 设置mapping方法2
Map<String, Object> message = new HashMap<>();
message.put("type", "text");
Map<String, Object> properties = new HashMap<>();
properties.put("message", message);
Map<String, Object> mapping = new HashMap<>();
mapping.put("properties", properties);
request.mapping(mapping);

// 设置mapping方法3
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
	builder.startObject("properties");
	{
		builder.startObject("message");
		{
			builder.field("type", "text");
		}
		builder.endObject();
	}
	builder.endObject();
}
builder.endObject();
request.mapping(builder);

// 设置别名
request.alias(new Alias("twitter_alias"));


// 直接json设置
request.source("{\n" +
		"    \"settings\" : {\n" +
		"        \"number_of_shards\" : 1,\n" +
		"        \"number_of_replicas\" : 0\n" +
		"    },\n" +
		"    \"mappings\" : {\n" +
		"        \"properties\" : {\n" +
		"            \"message\" : { \"type\" : \"text\" }\n" +
		"        }\n" +
		"    },\n" +
		"    \"aliases\" : {\n" +
		"        \"twitter_alias\" : {}\n" +
		"    }\n" +
		"}", XContentType.JSON);
		
// 超时等设置
request.setTimeout(TimeValue.timeValueMinutes(2)); 		

// 同步
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
System.out.println(createIndexResponse.index() + ":" + createIndexResponse.isAcknowledged() + "," + createIndexResponse.isShardsAcknowledged());

删除索引

try {
	DeleteIndexRequest request = new DeleteIndexRequest("twitter"); 
	AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);
	System.out.println(deleteIndexResponse.isAcknowledged()));
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.NOT_FOUND) {
        System.out.println("NOT_FOUND");
    }
}

更新索引

PutMappingRequest request = new PutMappingRequest("twitter");
request.source(
    "{\n" +
    "  \"properties\": {\n" +
    "    \"message\": {\n" +
    "      \"type\": \"text\"\n" +
    "    }\n" +
    "  }\n" +
    "}", 
    XContentType.JSON);
AcknowledgedResponse putMappingResponse = client.indices().putMapping(request, RequestOptions.DEFAULT);

Index APIs

创建索引文档

几种创建方法:

IndexRequest request = new IndexRequest("test");
request.id("1");
String jsonString = "{" +
		"\"user\":\"kimchy\"," +
		"\"postDate\":\"2021-12-01\"," +
		"\"message\":\"trying out Elasticsearch\"" +
		"}";
request.source(jsonString, XContentType.JSON);

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("test").id("1").source(jsonMap);

IndexRequest indexRequest2 = new IndexRequest("test")
		.id("1")
		.source("user", "kimchy",
				"postDate", new Date(),
				"message", "trying out Elasticsearch");

				
// 超时时间
request.timeout(TimeValue.timeValueSeconds(1)); 
request.timeout("1s"); 
// 刷新规则
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for"); 
// 版本,指定create操作类型时不能精确指定
request.version(1);
request.versionType(VersionType.EXTERNAL);
// 操作类型,指定后id重复将发生冲突
request.opType(DocWriteRequest.OpType.CREATE); 
request.opType("create");


// 同步
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);				
// 异步
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
	@Override
	public void onResponse(IndexResponse indexResponse) {
		// doSomething
	}
	@Override
	public void onFailure(Exception e) {
		// doSomething
	}
};
client.indexAsync(request, RequestOptions.DEFAULT, listener);

打印返回结果:

System.out.println("index: " + indexResponse.getIndex() + ", id: " + indexResponse.getId());
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
	System.out.println("created");
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
	System.out.println("updated");
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
	System.out.println("less");
}
if (shardInfo.getFailed() > 0) {
	for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
		System.out.println("fail: " + failure.reason());
	}
}
第一次执行打印:
index: test, id: 1
created
success

第二次执行打印:
index: test, id: 1
updated
success

模拟版本冲突

IndexRequest request = new IndexRequest("test").id("1").source("field", "value")
                                    .setIfSeqNo(10L).setIfPrimaryTerm(20);
try {
	IndexResponse response = client.index(request, RequestOptions.DEFAULT);
	System.out.println(response.getResult());
} catch(ElasticsearchException e) {
	if (e.status() == RestStatus.CONFLICT) {
		System.out.println("conflict");
	}
} catch (Exception e) {
	System.out.println(e);
}

如果在opType为create时同索引下已有此id也会发生冲突

IndexRequest request = new IndexRequest("test").id("1").source("field", "value")
							.opType(DocWriteRequest.OpType.CREATE);

获取

GetRequest getRequest = new GetRequest("test", "1");
// 不获取source
// getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
// 取哪些
String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
// String[] includes = Strings.EMPTY_ARRAY;
// String[] excludes = new String[]{"message"};
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
getRequest.fetchSourceContext(fetchSourceContext);
// 其他配置
// getRequest.routing("routing");
// getRequest.storedFields("message");

// 同步
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
System.out.println(getResponse.getIndex() + "/" + getResponse.getId() + ": " + getResponse.getFields());
if (getResponse.isExists()) {
	getResponse.getSource().forEach((k, v) -> System.out.println(k + ": " + v));
	String sourceAsString = getResponse.getSourceAsString();
	Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
}

// 异步
ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
	@Override
	public void onResponse(GetResponse getResponse) {
		System.out.println(getResponse.isExists());
	}

	@Override
	public void onFailure(Exception e) {
		e.printStackTrace();
	}
};
client.getAsync(getRequest, RequestOptions.DEFAULT, listener);

如果获取的索引不存在,会抛出ElasticsearchException

catch (ElasticsearchException e) {
	if (e.status() == RestStatus.NOT_FOUND) {
		System.out.println("404");
	}
}

当要获取指定version的文档,且已存在文档的version不一致时也会抛出异常

try {
    GetRequest request = new GetRequest("posts", "1").version(2);
    GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        System.out.println("409");
    }
}

只获取_source字段时可以使用GetSourceRequest,使用和GetRequest差不多

GetSourceRequest getSourceRequest = new GetSourceRequest("test","1");
GetSourceResponse response = client.getSource(getSourceRequest, RequestOptions.DEFAULT);
response.getSource().forEach((k, v) -> System.out.println(k + ": " + v));

EXISTS

如果只要判断是否存在

GetRequest getRequest = new GetRequest("test", "1");
getRequest.fetchSourceContext(new FetchSourceContext(false));
getRequest.storedFields("_none_");

// 同步
boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
// 异步
// client.existsAsync(getRequest, RequestOptions.DEFAULT, listener);
System.out.println("exists: " + exists);

删除

DeleteRequest request = new DeleteRequest("test", "1"); // .setIfSeqNo(100).setIfPrimaryTerm(2)
request.timeout("2m");

// 同步
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
	System.out.println("404");
	return;
}
System.out.println(deleteResponse.getIndex() + "/" + deleteResponse.getId() + ": " + deleteResponse.getVersion());
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
	System.out.println("less");
}
if (shardInfo.getFailed() > 0) {
	for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
		System.out.println(failure.reason());
	}
}

// 异步
ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
	@Override
	public void onResponse(DeleteResponse deleteResponse) {
		System.out.println(deleteResponse.getShardInfo().getSuccessful());
	}

	@Override
	public void onFailure(Exception e) {
		System.out.println("onFailure");
	}
};
client.deleteAsync(request, RequestOptions.DEFAULT, listener);

与获取一样,如果不存在的文档会返回DocWriteResponse.Result.NOT_FOUND的状态,如果version冲突返回RestStatus.CONFLICT

更新

部分修改的几种方法

// String jsonString = "{" +
//  	"\"updated\":\"2021-12-01\"," +
//  	"\"reason\":\"daily update\"" +
//  "}";
// UpdateRequest request = new UpdateRequest("test", "1").doc(jsonString, XContentType.JSON);

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("total", 10);
jsonMap.put("message", "new update message");
UpdateRequest request = new UpdateRequest("test", "1").doc(jsonMap);

// XContentBuilder builder = XContentFactory.jsonBuilder();
// builder.startObject();
// {
//     builder.timeField("updated", new Date());
//     builder.field("reason", "daily update");
// }
// builder.endObject();
// UpdateRequest request = new UpdateRequest("test", "1").doc(builder);

// UpdateRequest request = new UpdateRequest("test", "1")
//                          .doc("updated", new Date(), "reason", "daily update");

// Upsert
// String jsonString = "{\"created\":\"2017-01-01\"}";
// request.upsert(jsonString, XContentType.JSON);

设置参数

request.fetchSource(true);
request.docAsUpsert(true);
// request.timeout("1s");
// request.retryOnConflict(3);
// String[] includes = new String[]{"updated", "r*"};
// String[] excludes = Strings.EMPTY_ARRAY;
// request.fetchSource(new FetchSourceContext(true, includes, excludes));
// request.detectNoop(false);
// ..

执行更新,处理结果

UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
// 异步
// client.updateAsync(request, RequestOptions.DEFAULT, listener);

System.out.println(updateResponse.getIndex() + "/" + updateResponse.getId() + ": " + updateResponse.getVersion());
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
	System.out.println("CREATED");
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
	System.out.println("UPDATED");	// UPDATED
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
	System.out.println("DELETED");
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
	System.out.println("NOOP");
}
GetResult result = updateResponse.getGetResult();
if (result.isExists()) {
	String sourceAsString = result.sourceAsString();
	Map<String, Object> sourceAsMap = result.sourceAsMap();
	sourceAsMap.forEach((k, v) -> System.out.println(k + ": " + v));
} else {
	System.out.println("not exists");
}
ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
	System.out.println("less");
}
if (shardInfo.getFailed() > 0) {
	for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
		System.out.println("failure: " + failure.reason());
	}
}

使用script脚本

UpdateRequest request = new UpdateRequest("test", "1");

// 使用inline脚本
Map<String, Object> parameters = Collections.singletonMap("count", 4);
Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.total += params.count", parameters);
request.script(inline);

// 使用已存储的脚本
// Script stored = new Script( ScriptType.STORED, null, "script_id", parameters);
// request.script(stored);

request.scriptedUpsert(true);

UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
System.out.println(updateResponse.getResult());		// UPDATED

同样,如果不存在的文档会返回DocWriteResponse.Result.NOT_FOUND的状态,如果version冲突返回RestStatus.CONFLICT

term查询

Term Vectors词条向量,是关于词的一些统计信息

TermVectorsRequest request = new TermVectorsRequest("test", "1");
request.setFields("user");

XContentBuilder docBuilder = XContentFactory.jsonBuilder();
docBuilder.startObject().field("message", "trying out Elasticsearch").endObject();
TermVectorsRequest request2 = new TermVectorsRequest("test", docBuilder);

// 同步
TermVectorsResponse response = client.termvectors(request, RequestOptions.DEFAULT);
TermVectorsResponse response2 = client.termvectors(request2, RequestOptions.DEFAULT);

批量文档操作

Bulk操作

BulkRequest request = new BulkRequest();
request.add(new IndexRequest("book").id("1").source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("book").id("2").source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("book").id("3").source(XContentType.JSON,"field", "baz"));

BulkRequest request2 = new BulkRequest();
request2.add(new DeleteRequest("book", "3"));
request2.add(new UpdateRequest("book", "2").doc(XContentType.JSON,"other", "test"));
request2.add(new IndexRequest("book").id("4").source(XContentType.JSON,"field", "baz"));

// 同步
BulkResponse bulkResponse = client.bulk(request2, RequestOptions.DEFAULT);
for (BulkItemResponse bulkItemResponse : bulkResponse) {
	if (bulkItemResponse.isFailed()) {
		BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
	}
	DocWriteResponse itemResponse = bulkItemResponse.getResponse();
	switch (bulkItemResponse.getOpType()) {
		case INDEX:
		case CREATE:
			IndexResponse indexResponse = (IndexResponse) itemResponse;
			break;
		case UPDATE:
			UpdateResponse updateResponse = (UpdateResponse) itemResponse;
			break;
		case DELETE:
			DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
	}
}
···
同样可以对request进行配置可以使用异步调用

使用BulkProcessor可以进行简单的调用
```java
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
	@Override
	public void beforeBulk(long executionId, BulkRequest request) {
		System.out.println("before");
	}
	@Override
	public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
		System.out.println("after: " + response.hasFailures());
	}
	@Override
	public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
		System.out.println("failure");
	}
};

BulkProcessor.Builder builder = BulkProcessor.builder(
		(request, bulkListener) ->
				client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
		listener, "bulk-processor-name");
builder.setBulkActions(500);
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
builder.setConcurrentRequests(0);
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
BulkProcessor bulkProcessor = builder.build();

IndexRequest one = new IndexRequest("book").id("4")
		.source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("book").id("5")
		.source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("book").id("6")
		.source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");
bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
System.out.println(terminated);
bulkProcessor.close();

批量获取

MultiGetRequest request = new MultiGetRequest();
request.add(new MultiGetRequest.Item("book", "1").fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));
request.add(new MultiGetRequest.Item("book", "2"));
String[] includes = new String[] {"foo", "*r"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("book", "3").fetchSourceContext(fetchSourceContext));
// 默认查询数据,返回的属性字段都在_source中,需要在创建索引时设置字段的store属性为true,那么查询会再fields里显示
request.add(new MultiGetRequest.Item("book", "4").storedFields("title"));

// 同步
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
for (MultiGetItemResponse item : response.getResponses()) {
	System.out.println(JSON.toJSONString(item));
}

按查询批量操作

按照查询批量更新:

UpdateByQueryRequest request = new UpdateByQueryRequest("test", "test2..");
request.setQuery(new TermQueryBuilder("user", "kimchy"));
request.setConflicts("proceed");
request.setMaxDocs(10);
// request.setBatchSize(100);
request.setScript(new Script(ScriptType.INLINE, "painless",
				"if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
				Collections.emptyMap()));
// request.setSlices(2);
// request.setScroll(TimeValue.timeValueMinutes(10));
request.setRouting("=cat");
request.setTimeout(TimeValue.timeValueMinutes(2));
request.setRefresh(true);

// 同步
BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);

查询批量删除类似

DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2"); 
request.setQuery(new TermQueryBuilder("user", "kimchy"));
// ...
BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);

批量term查询

// 第一种
MultiTermVectorsRequest request = new MultiTermVectorsRequest();
TermVectorsRequest tvrequest1 = new TermVectorsRequest("test", "1");
tvrequest1.setFields("user");
request.add(tvrequest1);

XContentBuilder docBuilder = XContentFactory.jsonBuilder();
docBuilder.startObject().field("user", "??").endObject();
TermVectorsRequest tvrequest2 = new TermVectorsRequest("test", docBuilder);
request.add(tvrequest2);

// 第二种
TermVectorsRequest tvrequestTemplate = new TermVectorsRequest("test", "fake_id");
tvrequestTemplate.setFields("user");
String[] ids = {"1", "2"};
MultiTermVectorsRequest request2 = new MultiTermVectorsRequest(ids, tvrequestTemplate);

// 同步
MultiTermVectorsResponse response = client.mtermvectors(request2, RequestOptions.DEFAULT);
List<TermVectorsResponse> tvresponseList = response.getTermVectorsResponses();
if (tvresponseList != null) {
	for (TermVectorsResponse tvresponse : tvresponseList) {
		System.out.println(tvresponse);
	}
}

查询

// SearchRequest searchRequest = new SearchRequest();
// searchRequest.indices("test");
SearchRequest searchRequest = new SearchRequest("test");
// searchRequest.routing("routing");

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// sourceBuilder .query(QueryBuilders.matchAllQuery());
sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy2"));
sourceBuilder.from(0);
sourceBuilder.size(5);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
// sourceBuilder.sort(new FieldSortBuilder("id").order(SortOrder.ASC));
// sourceBuilder.fetchSource(false);
// String[] includeFields = new String[] {"title", "innerObject.*"};
// String[] excludeFields = new String[] {"user"};
// sourceBuilder.fetchSource(includeFields, excludeFields);
searchRequest.source(sourceBuilder);

// 同步
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(searchResponse.status().getStatus());
SearchHits hits = searchResponse.getHits();
System.out.println(hits.getTotalHits().value);
for (SearchHit hit : hits.getHits()) {
	System.out.println("============ " + hit.getIndex() + ": " + hit.getId());
	hit.getSourceAsMap().forEach((k, v) -> System.out.println(k + ": " + v));
	System.out.println(hit.getHighlightFields());
}
// Aggregations aggregations = searchResponse.getAggregations();
// Suggest suggest = searchResponse.getSuggest();
// Map<String, ProfileShardResult> profilingResults = searchResponse.getProfileResults();

其他构建方式,和高亮、聚合、查询建议

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("user", "kimchy");
matchQueryBuilder.fuzziness(Fuzziness.AUTO);
matchQueryBuilder.prefixLength(3);
matchQueryBuilder.maxExpansions(10);

QueryBuilder matchQueryBuilder2 = QueryBuilders.matchQuery("user", "kimchy")
		.fuzziness(Fuzziness.AUTO)
		.prefixLength(3)
		.maxExpansions(10);
searchSourceBuilder.query(matchQueryBuilder);
// searchSourceBuilder.profile(true);

// 高亮
HighlightBuilder highlightBuilder = new HighlightBuilder();
HighlightBuilder.Field highlightTitle = new HighlightBuilder.Field("title");
highlightTitle.highlighterType("unified");
highlightBuilder.field(highlightTitle);
HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
highlightBuilder.field(highlightUser);
searchSourceBuilder.highlighter(highlightBuilder);

// 聚合
TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company").field("company.keyword");
aggregation.subAggregation(AggregationBuilders.avg("average_age").field("age"));
searchSourceBuilder.aggregation(aggregation);

// 查询建议
SuggestionBuilder termSuggestionBuilder = SuggestBuilders.termSuggestion("user").text("kmichy");
SuggestBuilder suggestBuilder = new SuggestBuilder();
suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
searchSourceBuilder.suggest(suggestBuilder);

滚动查询:

SearchRequest searchRequest = new SearchRequest("test");
// ...
searchRequest.scroll(TimeValue.timeValueMinutes(1L)); 

SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();

// 如果searchHits不为空,使用scrollId滚动查询
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); 
scrollRequest.scroll(TimeValue.timeValueSeconds(30));
SearchResponse searchScrollResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT);
scrollId = searchScrollResponse.getScrollId();  

清除

ClearScrollRequest request = new ClearScrollRequest(); 
request.addScrollId(scrollId);

使用模板查询

SearchTemplateRequest request = new SearchTemplateRequest();
request.setRequest(new SearchRequest("test"));

// request.setScriptType(ScriptType.STORED);
// request.setScript("message_search");
request.setScriptType(ScriptType.INLINE);
request.setScript("{" +
		"  \"query\": { \"match\" : { \"\" : \"\" } }," +
		"  \"size\" : \"\"" +
		"}");
Map<String, Object> scriptParams = new HashMap<>();
scriptParams.put("field", "message");
scriptParams.put("value", "Elasticsearch");
scriptParams.put("size", 5);
request.setScriptParams(scriptParams);

request.setExplain(true);
request.setProfile(true);
SearchTemplateResponse response = client.searchTemplate(request, RequestOptions.DEFAULT);
SearchResponse searchResponse = response.getResponse();

如果只要查询数量,可以使用CountRequest

CountRequest countRequest = new CountRequest("test");
QueryBuilder queryBuilder = QueryBuilders.matchQuery("user", "kimchy2");
countRequest.query(queryBuilder);

CountResponse countResponse = client.count(countRequest, RequestOptions.DEFAULT);
System.out.println(countResponse.status());
System.out.println(countResponse.getCount());

Index APIs

Java Client

两种创建方法:

// 第一种
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();
RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);

// 第二种
RestClientBuilder httpClientBuilder = RestClient.builder(new HttpHost("localhost", 9200));
RestHighLevelClient hlrc = new RestHighLevelClient(httpClientBuilder);
RestClientTransport transport = new RestClientTransport(hlrc.getLowLevelClient(), new JacksonJsonpMapper());
ElasticsearchClient esClient = new ElasticsearchClient(transport);

创建索引

CreateIndexResponse res = client.indices().create(c -> c.index("products"));

CreateIndexResponse createResponse = client.indices()
		.create(createIndexBuilder -> createIndexBuilder
				.index("myIndex")
				.aliases("abc", aliasBuilder -> aliasBuilder.isWriteIndex(true))
				// .mappings(JsonValue)
		);

查询

SearchResponse<Object> search = client.search(s -> s
				.index("test")
				.query(q -> q
						.term(t -> t
								.field("user")
								.value("kimchy3")
						)),
		Object.class);

for (Hit<Object> hit: search.hits().hits()) {
	System.out.println(hit.source());
}