Elasticsearch (1): Getting Started
Elasticsearch (2): Search
Elasticsearch (3): API Calls
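Java API
- Java REST Client
  - Java Low Level REST Client
    - Low-level REST client that talks to the cluster over HTTP; callers marshal the request JSON and parse the response JSON themselves
    - Compatible with all ES versions
  - Java High Level REST Client
    - Built on the low-level client; adds APIs that marshal request JSON and parse response JSON
    - The client version should match the ES server version, otherwise version problems can occur
    - Long the officially recommended client; deprecated since 7.15.0 in favor of the Java Client
- Java Client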
Low Level REST Client
Add the Maven dependency:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.15.2</version>
</dependency>
A simple demo:
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")).build();
Request request = new Request("GET", "/");
// request.setEntity(new NStringEntity("{\"json\":\"text\"}", ContentType.APPLICATION_JSON));
// request.setJsonEntity("{\"json\":\"text\"}");
// request.addParameter("key", "value");
// Synchronous
Response response = restClient.performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println(responseBody);
restClient.close();
Asynchronous:
Cancellable cancellable = restClient.performRequestAsync(request,
new ResponseListener() {
@Override
public void onSuccess(Response response) {
// doSomething
}
@Override
public void onFailure(Exception exception) {
// doSomething
}
});
// Cancel: the client aborts the HTTP request
// cancellable.cancel();
The RestClient can be configured when it is built:
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
// Default headers sent with every request, e.g. Authorization
Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
builder.setDefaultHeaders(defaultHeaders);
// Listener notified whenever a node fails
builder.setFailureListener(new RestClient.FailureListener() {
@Override
public void onFailure(Node node) {
// doSomething
}
});
// Timeouts
builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
return requestConfigBuilder.setConnectTimeout(5000)
.setSocketTimeout(60000);
}
});
// Authentication
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("user", "user-password"));
// HTTP client settings: credentials provider and number of I/O threads
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
// httpClientBuilder.disableAuthCaching();
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
}
});
RestClient restClient = builder.build();
Options can also be set on an individual request:
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
// Header
builder.addHeader("Authorization", TOKEN);
// response consumer
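// 30 MB buffer limit; the limit is an int, and 30 * 1024 * 1024 * 1024 would overflow to a negative value
builder.setHttpAsyncResponseConsumerFactory(new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));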
RequestOptions COMMON_OPTIONS = builder.build();
request.setOptions(COMMON_OPTIONS);
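The response buffer limit is given as a number of bytes, and to my knowledge the constructor takes an int, so a size expression written with int literals can wrap around silently. A small stand-alone check of the arithmetic (the class and method names are hypothetical, and nothing here touches the client library):

```java
// Stand-alone check of buffer-size arithmetic; BufferLimitCheck is a hypothetical name.
class BufferLimitCheck {
    // Four int factors: the product wraps around before it is ever used.
    static int thirtyGiBAsInt() {
        return 30 * 1024 * 1024 * 1024;
    }

    // The same product in long arithmetic keeps its true value.
    static long thirtyGiBAsLong() {
        return 30L * 1024 * 1024 * 1024;
    }

    // 30 MiB fits comfortably in an int.
    static int thirtyMiB() {
        return 30 * 1024 * 1024;
    }

    public static void main(String[] args) {
        System.out.println(thirtyGiBAsInt());   // -2147483648
        System.out.println(thirtyGiBAsLong());  // 32212254720
        System.out.println(thirtyMiB());        // 31457280
    }
}
```

A limit in the tens of megabytes stays well inside the int range, which is why sizes like 30 MB are safe to write inline.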
Sending requests concurrently with the async API:
final CountDownLatch latch = new CountDownLatch(documents.length);
for (int i = 0; i < documents.length; i++) {
Request request = new Request("PUT", "/posts/doc/" + i);
request.setEntity(documents[i]);
restClient.performRequestAsync(request,
new ResponseListener() {
@Override
public void onSuccess(Response response) {
// doSomething
latch.countDown();
}
@Override
public void onFailure(Exception exception) {
// doSomething
latch.countDown();
}
}
);
}
latch.await();
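The latch above only works because both callbacks count down. A stand-alone sketch of the same pattern using only the JDK, where fakeSend is a hypothetical stand-in for performRequestAsync and odd-numbered requests are made to fail on purpose:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-alone model of the latch pattern; no Elasticsearch classes are involved.
class LatchDemo {
    // Completes asynchronously; odd-numbered "requests" fail so both callbacks are exercised.
    static CompletableFuture<String> fakeSend(int i) {
        return CompletableFuture.supplyAsync(() -> {
            if (i % 2 == 1) {
                throw new IllegalStateException("request " + i + " failed");
            }
            return "ok " + i;
        });
    }

    // Sends n requests and returns {successes, failures} once every callback has fired.
    static int[] sendAll(int n) {
        CountDownLatch latch = new CountDownLatch(n);
        AtomicInteger ok = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            fakeSend(i).whenComplete((body, error) -> {
                if (error == null) {
                    ok.incrementAndGet();
                } else {
                    failed.incrementAndGet();
                }
                // Count down on success AND failure, otherwise await() never returns.
                latch.countDown();
            });
        }
        try {
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for callbacks");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return new int[] {ok.get(), failed.get()};
    }

    public static void main(String[] args) {
        int[] result = sendAll(6);
        System.out.println("ok=" + result[0] + ", failed=" + result[1]); // ok=3, failed=3
    }
}
```

Waiting with a timeout rather than a bare await() is a design choice of this sketch: it keeps a lost callback from blocking the caller forever.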
Using TLS:
// Option 1: load an existing truststore
Path trustStorePath = Paths.get("/path/to/truststore.p12");
KeyStore truststore = KeyStore.getInstance("pkcs12");
try (InputStream is = Files.newInputStream(trustStorePath)) {
truststore.load(is, keyStorePass.toCharArray());
}
// Option 2: build a truststore from a CA certificate
Path caCertificatePath = Paths.get("/path/to/ca.crt");
CertificateFactory factory = CertificateFactory.getInstance("X.509");
Certificate trustedCa;
try (InputStream is = Files.newInputStream(caCertificatePath)) {
trustedCa = factory.generateCertificate(is);
}
KeyStore trustStore = KeyStore.getInstance("pkcs12");
trustStore.load(null, null);
trustStore.setCertificateEntry("ca", trustedCa);
SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(trustStore, null);
final SSLContext sslContext = sslBuilder.build();
RestClient.builder(new HttpHost("localhost", 9200, "https"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setSSLContext(sslContext);
}
});
Sniffer: automatically discovers nodes from a running Elasticsearch cluster and sets them on an existing RestClient instance.
High Level REST Client
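Version compatibility: the client can talk to any node on the same major version with an equal or newer minor version.
Again, add the Maven dependency: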
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.15.2</version>
</dependency>
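With spring-boot 2.1.8.RELEASE this dependency fails with NoClassDefFoundError because the versions have to line up, so the calls below were tested in a new non-Spring Boot project.
Connecting and closing: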
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")));
client.close();
Indices
Several ways to build an analyze request for testing analyzers:
AnalyzeRequest request = AnalyzeRequest.withGlobalAnalyzer("english", "Some text to analyze", "Some more text to analyze");
// Custom analyzer
Map<String, Object> stopFilter = new HashMap<>();
stopFilter.put("type", "stop");
stopFilter.put("stopwords", new String[]{"to"});
AnalyzeRequest request2 = AnalyzeRequest.buildCustomAnalyzer("standard")
.addCharFilter("html_strip")
.addTokenFilter("lowercase")
.addTokenFilter(stopFilter)
.build("Some text to analyze");
AnalyzeRequest request3 = AnalyzeRequest.buildCustomNormalizer()
.addTokenFilter("lowercase")
.build("<b>BaR</b>");
AnalyzeRequest request4 = AnalyzeRequest.withIndexAnalyzer(
"my_index",
"my_analyzer",
"some text to analyze"
);
AnalyzeRequest request5 = AnalyzeRequest.withNormalizer(
"my_index",
"my_normalizer",
"some text to analyze"
);
AnalyzeRequest request6 = AnalyzeRequest.withField("my_index", "my_field", "some text to analyze");
request.explain(true);
// request.attributes("keyword", "type");
AnalyzeResponse response = client.indices().analyze(request3, RequestOptions.DEFAULT);
if (response.detail() != null) {
for (AnalyzeResponse.AnalyzeToken token : response.detail().analyzer().getTokens() ) {
System.out.println(token.getTerm());
}
}
if (response.getTokens() != null) {
response.getTokens().forEach(token -> System.out.println(token.getTerm()));
}
Several ways to create an index:
CreateIndexRequest request = new CreateIndexRequest("twitter");
// Settings
request.settings(Settings.builder()
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 2)
);
// Mapping, option 1: JSON string
request.mapping("{\n" +
" \"properties\": {\n" +
" \"message\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" }\n" +
"}",
XContentType.JSON);
// Mapping, option 2: Map
Map<String, Object> message = new HashMap<>();
message.put("type", "text");
Map<String, Object> properties = new HashMap<>();
properties.put("message", message);
Map<String, Object> mapping = new HashMap<>();
mapping.put("properties", properties);
request.mapping(mapping);
// Mapping, option 3: XContentBuilder
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.startObject("properties");
{
builder.startObject("message");
{
builder.field("type", "text");
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
request.mapping(builder);
// Alias
request.alias(new Alias("twitter_alias"));
// Or supply settings, mappings, and aliases together as one JSON source
request.source("{\n" +
" \"settings\" : {\n" +
" \"number_of_shards\" : 1,\n" +
" \"number_of_replicas\" : 0\n" +
" },\n" +
" \"mappings\" : {\n" +
" \"properties\" : {\n" +
" \"message\" : { \"type\" : \"text\" }\n" +
" }\n" +
" },\n" +
" \"aliases\" : {\n" +
" \"twitter_alias\" : {}\n" +
" }\n" +
"}", XContentType.JSON);
// Timeout and other options
request.setTimeout(TimeValue.timeValueMinutes(2));
// Synchronous
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
System.out.println(createIndexResponse.index() + ":" + createIndexResponse.isAcknowledged() + "," + createIndexResponse.isShardsAcknowledged());
Deleting an index
try {
DeleteIndexRequest request = new DeleteIndexRequest("twitter");
AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);
System.out.println(deleteIndexResponse.isAcknowledged());
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.NOT_FOUND) {
System.out.println("NOT_FOUND");
}
}
Updating an index mapping
PutMappingRequest request = new PutMappingRequest("twitter");
request.source(
"{\n" +
" \"properties\": {\n" +
" \"message\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" }\n" +
"}",
XContentType.JSON);
AcknowledgedResponse putMappingResponse = client.indices().putMapping(request, RequestOptions.DEFAULT);
Indexing a document
Several ways to build the request:
IndexRequest request = new IndexRequest("test");
request.id("1");
String jsonString = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2021-12-01\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
request.source(jsonString, XContentType.JSON);
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("test").id("1").source(jsonMap);
IndexRequest indexRequest2 = new IndexRequest("test")
.id("1")
.source("user", "kimchy",
"postDate", new Date(),
"message", "trying out Elasticsearch");
// Timeout
request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");
// Refresh policy
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
// Version; an exact version cannot be specified when the op type is create
request.version(1);
request.versionType(VersionType.EXTERNAL);
// Op type; with create, a duplicate id causes a conflict
request.opType(DocWriteRequest.OpType.CREATE);
request.opType("create");
// Synchronous
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
// Asynchronous
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
// doSomething
}
@Override
public void onFailure(Exception e) {
// doSomething
}
};
client.indexAsync(request, RequestOptions.DEFAULT, listener);
Printing the response:
System.out.println("index: " + indexResponse.getIndex() + ", id: " + indexResponse.getId());
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
System.out.println("created");
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
System.out.println("updated");
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
System.out.println("less");
} else {
System.out.println("success");
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
System.out.println("fail: " + failure.reason());
}
}
Output of the first run:
index: test, id: 1
created
success
Output of the second run:
index: test, id: 1
updated
success
Simulating a version conflict
IndexRequest request = new IndexRequest("test").id("1").source("field", "value")
.setIfSeqNo(10L).setIfPrimaryTerm(20);
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
System.out.println(response.getResult());
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
System.out.println("conflict");
}
} catch (Exception e) {
System.out.println(e);
}
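The request above conflicts because the sequence number and primary term it names do not match the stored document. A simplified in-memory model of that check, written as a sketch only (SeqNoModel and its methods are hypothetical and say nothing about how Elasticsearch implements it internally):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified in-memory model of if_seq_no / if_primary_term checks.
class SeqNoModel {
    // Per-document state: sequence number and primary term of the last write.
    static final class Doc {
        final long seqNo;
        final long primaryTerm;
        final String source;

        Doc(long seqNo, long primaryTerm, String source) {
            this.seqNo = seqNo;
            this.primaryTerm = primaryTerm;
            this.source = source;
        }
    }

    private final Map<String, Doc> docs = new HashMap<>();
    private final long primaryTerm = 1;
    private long nextSeqNo = 0;

    // Unconditional write: always succeeds and takes the next sequence number.
    Doc index(String id, String source) {
        Doc doc = new Doc(nextSeqNo++, primaryTerm, source);
        docs.put(id, doc);
        return doc;
    }

    // Conditional write: succeeds only if the caller names the latest seqNo and primaryTerm.
    boolean indexIf(String id, String source, long ifSeqNo, long ifPrimaryTerm) {
        Doc current = docs.get(id);
        if (current == null || current.seqNo != ifSeqNo || current.primaryTerm != ifPrimaryTerm) {
            return false; // this is where the real server would answer 409 CONFLICT
        }
        index(id, source);
        return true;
    }

    public static void main(String[] args) {
        SeqNoModel store = new SeqNoModel();
        Doc first = store.index("1", "v1");
        System.out.println(store.indexIf("1", "v2", first.seqNo, first.primaryTerm)); // true
        System.out.println(store.indexIf("1", "v3", first.seqNo, first.primaryTerm)); // false, seqNo is stale
        System.out.println(store.indexIf("1", "v3", 10L, 20L));                       // false
    }
}
```

The second conditional write fails because the first one already advanced the document's sequence number, which is the situation a concurrent writer would run into.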
A conflict also occurs when the opType is create and a document with the same id already exists in the index:
IndexRequest request = new IndexRequest("test").id("1").source("field", "value")
.opType(DocWriteRequest.OpType.CREATE);
Get
GetRequest getRequest = new GetRequest("test", "1");
// Do not fetch _source
// getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
// Choose which fields to include or exclude
String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
// String[] includes = Strings.EMPTY_ARRAY;
// String[] excludes = new String[]{"message"};
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
getRequest.fetchSourceContext(fetchSourceContext);
// Other options
// getRequest.routing("routing");
// getRequest.storedFields("message");
// Synchronous
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
System.out.println(getResponse.getIndex() + "/" + getResponse.getId() + ": " + getResponse.getFields());
if (getResponse.isExists()) {
getResponse.getSource().forEach((k, v) -> System.out.println(k + ": " + v));
String sourceAsString = getResponse.getSourceAsString();
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
}
// Asynchronous
ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
System.out.println(getResponse.isExists());
}
@Override
public void onFailure(Exception e) {
e.printStackTrace();
}
};
client.getAsync(getRequest, RequestOptions.DEFAULT, listener);
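If the index does not exist, an ElasticsearchException is thrown:
try {
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {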
if (e.status() == RestStatus.NOT_FOUND) {
System.out.println("404");
}
}
Requesting a specific version that differs from the stored document's version also throws an exception:
try {
GetRequest request = new GetRequest("posts", "1").version(2);
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.CONFLICT) {
System.out.println("409");
}
}
To fetch only the _source field, use GetSourceRequest, which works much like GetRequest:
GetSourceRequest getSourceRequest = new GetSourceRequest("test","1");
GetSourceResponse response = client.getSource(getSourceRequest, RequestOptions.DEFAULT);
response.getSource().forEach((k, v) -> System.out.println(k + ": " + v));
EXISTS
To check only whether a document exists:
GetRequest getRequest = new GetRequest("test", "1");
getRequest.fetchSourceContext(new FetchSourceContext(false));
getRequest.storedFields("_none_");
// Synchronous
boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
// Asynchronous
// client.existsAsync(getRequest, RequestOptions.DEFAULT, listener);
System.out.println("exists: " + exists);
Delete
DeleteRequest request = new DeleteRequest("test", "1"); // .setIfSeqNo(100).setIfPrimaryTerm(2)
request.timeout("2m");
// Synchronous
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
System.out.println("404");
return;
}
System.out.println(deleteResponse.getIndex() + "/" + deleteResponse.getId() + ": " + deleteResponse.getVersion());
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
System.out.println("less");
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
System.out.println(failure.reason());
}
}
// Asynchronous
ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
System.out.println(deleteResponse.getShardInfo().getSuccessful());
}
@Override
public void onFailure(Exception e) {
System.out.println("onFailure");
}
};
client.deleteAsync(request, RequestOptions.DEFAULT, listener);
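Deleting a document that does not exist is reported as DocWriteResponse.Result.NOT_FOUND. As with get, a version conflict throws an ElasticsearchException with status RestStatus.CONFLICT.
Update
Several ways to supply the partial document: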
// String jsonString = "{" +
// "\"updated\":\"2021-12-01\"," +
// "\"reason\":\"daily update\"" +
// "}";
// UpdateRequest request = new UpdateRequest("test", "1").doc(jsonString, XContentType.JSON);
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("total", 10);
jsonMap.put("message", "new update message");
UpdateRequest request = new UpdateRequest("test", "1").doc(jsonMap);
// XContentBuilder builder = XContentFactory.jsonBuilder();
// builder.startObject();
// {
// builder.timeField("updated", new Date());
// builder.field("reason", "daily update");
// }
// builder.endObject();
// UpdateRequest request = new UpdateRequest("test", "1").doc(builder);
// UpdateRequest request = new UpdateRequest("test", "1")
// .doc("updated", new Date(), "reason", "daily update");
// Upsert
// String jsonString = "{\"created\":\"2017-01-01\"}";
// request.upsert(jsonString, XContentType.JSON);
Setting options:
request.fetchSource(true);
request.docAsUpsert(true);
// request.timeout("1s");
// request.retryOnConflict(3);
// String[] includes = new String[]{"updated", "r*"};
// String[] excludes = Strings.EMPTY_ARRAY;
// request.fetchSource(new FetchSourceContext(true, includes, excludes));
// request.detectNoop(false);
// ..
Executing the update and handling the result:
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
// Asynchronous
// client.updateAsync(request, RequestOptions.DEFAULT, listener);
System.out.println(updateResponse.getIndex() + "/" + updateResponse.getId() + ": " + updateResponse.getVersion());
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
System.out.println("CREATED");
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
System.out.println("UPDATED"); // UPDATED
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
System.out.println("DELETED");
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
System.out.println("NOOP");
}
GetResult result = updateResponse.getGetResult();
if (result.isExists()) {
String sourceAsString = result.sourceAsString();
Map<String, Object> sourceAsMap = result.sourceAsMap();
sourceAsMap.forEach((k, v) -> System.out.println(k + ": " + v));
} else {
System.out.println("not exists");
}
ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
System.out.println("less");
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
System.out.println("failure: " + failure.reason());
}
}
Using a script
UpdateRequest request = new UpdateRequest("test", "1");
// Inline script
Map<String, Object> parameters = Collections.singletonMap("count", 4);
Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.total += params.count", parameters);
request.script(inline);
// Stored script
// Script stored = new Script( ScriptType.STORED, null, "script_id", parameters);
// request.script(stored);
request.scriptedUpsert(true);
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
System.out.println(updateResponse.getResult()); // UPDATED
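Updating a document that does not exist (with no upsert) throws an ElasticsearchException with status RestStatus.NOT_FOUND; a version conflict throws one with RestStatus.CONFLICT.
Term vectors
Term vectors are statistics about the terms in a document's fields.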
TermVectorsRequest request = new TermVectorsRequest("test", "1");
request.setFields("user");
XContentBuilder docBuilder = XContentFactory.jsonBuilder();
docBuilder.startObject().field("message", "trying out Elasticsearch").endObject();
TermVectorsRequest request2 = new TermVectorsRequest("test", docBuilder);
// Synchronous
TermVectorsResponse response = client.termvectors(request, RequestOptions.DEFAULT);
TermVectorsResponse response2 = client.termvectors(request2, RequestOptions.DEFAULT);
Multi-document operations
Bulk
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("book").id("1").source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("book").id("2").source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("book").id("3").source(XContentType.JSON,"field", "baz"));
BulkRequest request2 = new BulkRequest();
request2.add(new DeleteRequest("book", "3"));
request2.add(new UpdateRequest("book", "2").doc(XContentType.JSON,"other", "test"));
request2.add(new IndexRequest("book").id("4").source(XContentType.JSON,"field", "baz"));
// Synchronous
BulkResponse bulkResponse = client.bulk(request2, RequestOptions.DEFAULT);
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
}
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
switch (bulkItemResponse.getOpType()) {
case INDEX:
case CREATE:
IndexResponse indexResponse = (IndexResponse) itemResponse;
break;
case UPDATE:
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
break;
case DELETE:
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}
The bulk request can be configured in the same way and can also be executed asynchronously.
BulkProcessor simplifies bulk calls:
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
System.out.println("before");
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
System.out.println("after: " + response.hasFailures());
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
System.out.println("failure");
}
};
BulkProcessor.Builder builder = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener, "bulk-processor-name");
builder.setBulkActions(500);
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
builder.setConcurrentRequests(0);
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
BulkProcessor bulkProcessor = builder.build();
IndexRequest one = new IndexRequest("book").id("4")
.source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("book").id("5")
.source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("book").id("6")
.source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");
bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
System.out.println(terminated);
bulkProcessor.close();
Multi-get
MultiGetRequest request = new MultiGetRequest();
request.add(new MultiGetRequest.Item("book", "1").fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));
request.add(new MultiGetRequest.Item("book", "2"));
String[] includes = new String[] {"foo", "*r"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("book", "3").fetchSourceContext(fetchSourceContext));
// By default returned fields live in _source; a field shows up under fields only if it was mapped with store set to true when the index was created
request.add(new MultiGetRequest.Item("book", "4").storedFields("title"));
// Synchronous
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
for (MultiGetItemResponse item : response.getResponses()) {
System.out.println(JSON.toJSONString(item));
}
By-query operations
Update by query:
UpdateByQueryRequest request = new UpdateByQueryRequest("test", "test2..");
request.setQuery(new TermQueryBuilder("user", "kimchy"));
request.setConflicts("proceed");
request.setMaxDocs(10);
// request.setBatchSize(100);
request.setScript(new Script(ScriptType.INLINE, "painless",
"if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
Collections.emptyMap()));
// request.setSlices(2);
// request.setScroll(TimeValue.timeValueMinutes(10));
request.setRouting("=cat");
request.setTimeout(TimeValue.timeValueMinutes(2));
request.setRefresh(true);
// Synchronous
BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);
Delete by query works similarly:
DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2");
request.setQuery(new TermQueryBuilder("user", "kimchy"));
// ...
BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);
Multi term vectors
// Option 1: add individual requests
MultiTermVectorsRequest request = new MultiTermVectorsRequest();
TermVectorsRequest tvrequest1 = new TermVectorsRequest("test", "1");
tvrequest1.setFields("user");
request.add(tvrequest1);
XContentBuilder docBuilder = XContentFactory.jsonBuilder();
docBuilder.startObject().field("user", "??").endObject();
TermVectorsRequest tvrequest2 = new TermVectorsRequest("test", docBuilder);
request.add(tvrequest2);
// Option 2: one template request applied to several ids
TermVectorsRequest tvrequestTemplate = new TermVectorsRequest("test", "fake_id");
tvrequestTemplate.setFields("user");
String[] ids = {"1", "2"};
MultiTermVectorsRequest request2 = new MultiTermVectorsRequest(ids, tvrequestTemplate);
// Synchronous
MultiTermVectorsResponse response = client.mtermvectors(request2, RequestOptions.DEFAULT);
List<TermVectorsResponse> tvresponseList = response.getTermVectorsResponses();
if (tvresponseList != null) {
for (TermVectorsResponse tvresponse : tvresponseList) {
System.out.println(tvresponse);
}
}
Search
// SearchRequest searchRequest = new SearchRequest();
// searchRequest.indices("test");
SearchRequest searchRequest = new SearchRequest("test");
// searchRequest.routing("routing");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// sourceBuilder .query(QueryBuilders.matchAllQuery());
sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy2"));
sourceBuilder.from(0);
sourceBuilder.size(5);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
// sourceBuilder.sort(new FieldSortBuilder("id").order(SortOrder.ASC));
// sourceBuilder.fetchSource(false);
// String[] includeFields = new String[] {"title", "innerObject.*"};
// String[] excludeFields = new String[] {"user"};
// sourceBuilder.fetchSource(includeFields, excludeFields);
searchRequest.source(sourceBuilder);
// Synchronous
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(searchResponse.status().getStatus());
SearchHits hits = searchResponse.getHits();
System.out.println(hits.getTotalHits().value);
for (SearchHit hit : hits.getHits()) {
System.out.println("============ " + hit.getIndex() + ": " + hit.getId());
hit.getSourceAsMap().forEach((k, v) -> System.out.println(k + ": " + v));
System.out.println(hit.getHighlightFields());
}
// Aggregations aggregations = searchResponse.getAggregations();
// Suggest suggest = searchResponse.getSuggest();
// Map<String, ProfileShardResult> profilingResults = searchResponse.getProfileResults();
Other ways to build a query, plus highlighting, aggregations, and suggestions:
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("user", "kimchy");
matchQueryBuilder.fuzziness(Fuzziness.AUTO);
matchQueryBuilder.prefixLength(3);
matchQueryBuilder.maxExpansions(10);
QueryBuilder matchQueryBuilder2 = QueryBuilders.matchQuery("user", "kimchy")
.fuzziness(Fuzziness.AUTO)
.prefixLength(3)
.maxExpansions(10);
searchSourceBuilder.query(matchQueryBuilder);
// searchSourceBuilder.profile(true);
// Highlighting
HighlightBuilder highlightBuilder = new HighlightBuilder();
HighlightBuilder.Field highlightTitle = new HighlightBuilder.Field("title");
highlightTitle.highlighterType("unified");
highlightBuilder.field(highlightTitle);
HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
highlightBuilder.field(highlightUser);
searchSourceBuilder.highlighter(highlightBuilder);
// Aggregations
TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company").field("company.keyword");
aggregation.subAggregation(AggregationBuilders.avg("average_age").field("age"));
searchSourceBuilder.aggregation(aggregation);
// Suggestions
SuggestionBuilder termSuggestionBuilder = SuggestBuilders.termSuggestion("user").text("kmichy");
SuggestBuilder suggestBuilder = new SuggestBuilder();
suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
searchSourceBuilder.suggest(suggestBuilder);
Scroll search:
SearchRequest searchRequest = new SearchRequest("test");
// ...
searchRequest.scroll(TimeValue.timeValueMinutes(1L));
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();
// While searchHits is non-empty, keep scrolling with the scrollId
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(TimeValue.timeValueSeconds(30));
SearchResponse searchScrollResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT);
scrollId = searchScrollResponse.getScrollId();
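The snippet above shows a single scroll step; in practice the step repeats until a page comes back empty. A stand-alone model of that loop, assuming nothing about the client beyond "each call returns the next page" (FakeScroll is hypothetical and stands in for search() followed by scroll()):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory model of the scroll loop; no Elasticsearch classes are involved.
class ScrollLoopDemo {
    static final class FakeScroll {
        private final List<String> docs;
        private final int pageSize;
        private int position = 0;

        FakeScroll(List<String> docs, int pageSize) {
            this.docs = docs;
            this.pageSize = pageSize;
        }

        // Returns the next page; an empty page means the scroll is exhausted.
        List<String> nextPage() {
            int end = Math.min(position + pageSize, docs.size());
            List<String> page = new ArrayList<>(docs.subList(position, end));
            position = end;
            return page;
        }
    }

    // Drains the scroll: process a page, fetch the next one, stop at the first empty page.
    static List<String> drain(FakeScroll scroll) {
        List<String> seen = new ArrayList<>();
        List<String> page = scroll.nextPage();   // plays the role of the initial search
        while (!page.isEmpty()) {
            seen.addAll(page);
            page = scroll.nextPage();            // plays the role of each follow-up scroll call
        }
        return seen;
    }

    public static void main(String[] args) {
        FakeScroll scroll = new FakeScroll(Arrays.asList("a", "b", "c", "d", "e"), 2);
        System.out.println(drain(scroll)); // [a, b, c, d, e]
    }
}
```

With the real client the loop body would also refresh the scrollId from each response, as the snippet above does.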
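Clearing the scroll:
ClearScrollRequest request = new ClearScrollRequest();
request.addScrollId(scrollId);
ClearScrollResponse clearScrollResponse = client.clearScroll(request, RequestOptions.DEFAULT);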
Search templates
SearchTemplateRequest request = new SearchTemplateRequest();
request.setRequest(new SearchRequest("test"));
// request.setScriptType(ScriptType.STORED);
// request.setScript("message_search");
request.setScriptType(ScriptType.INLINE);
request.setScript("{" +
" \"query\": { \"match\" : { \"{{field}}\" : \"{{value}}\" } }," +
" \"size\" : \"{{size}}\"" +
"}");
Map<String, Object> scriptParams = new HashMap<>();
scriptParams.put("field", "message");
scriptParams.put("value", "Elasticsearch");
scriptParams.put("size", 5);
request.setScriptParams(scriptParams);
request.setExplain(true);
request.setProfile(true);
SearchTemplateResponse response = client.searchTemplate(request, RequestOptions.DEFAULT);
SearchResponse searchResponse = response.getResponse();
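A search template is a Mustache script whose {{name}} placeholders are filled from the script params before the query runs. To make the rendered query visible, here is a naive substitution sketch; it is a stand-in for Mustache written for illustration only (TemplateRenderDemo and render are hypothetical names, and real Mustache does far more than plain replacement):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Naive {{name}} substitution, used only to show what the rendered query looks like.
class TemplateRenderDemo {
    // Replaces every {{key}} in the template with the string form of its parameter value.
    static String render(String template, Map<String, Object> params) {
        String result = template;
        for (Map.Entry<String, Object> entry : params.entrySet()) {
            result = result.replace("{{" + entry.getKey() + "}}", String.valueOf(entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        String template =
            "{\"query\": {\"match\": {\"{{field}}\": \"{{value}}\"}}, \"size\": \"{{size}}\"}";
        Map<String, Object> params = new LinkedHashMap<>();
        params.put("field", "message");
        params.put("value", "Elasticsearch");
        params.put("size", 5);
        System.out.println(render(template, params));
        // {"query": {"match": {"message": "Elasticsearch"}}, "size": "5"}
    }
}
```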
To get only the number of matching documents, use CountRequest:
CountRequest countRequest = new CountRequest("test");
QueryBuilder queryBuilder = QueryBuilders.matchQuery("user", "kimchy2");
countRequest.query(queryBuilder);
CountResponse countResponse = client.count(countRequest, RequestOptions.DEFAULT);
System.out.println(countResponse.status());
System.out.println(countResponse.getCount());
Java Client
Two ways to create the client:
// Option 1: from a low-level RestClient
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();
RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
// Option 2: reuse the low-level client of an existing RestHighLevelClient
RestClientBuilder httpClientBuilder = RestClient.builder(new HttpHost("localhost", 9200));
RestHighLevelClient hlrc = new RestHighLevelClient(httpClientBuilder);
RestClientTransport transport = new RestClientTransport(hlrc.getLowLevelClient(), new JacksonJsonpMapper());
ElasticsearchClient esClient = new ElasticsearchClient(transport);
Creating an index
CreateIndexResponse res = client.indices().create(c -> c.index("products"));
CreateIndexResponse createResponse = client.indices()
.create(createIndexBuilder -> createIndexBuilder
.index("my-index") // index names must be lowercase
.aliases("abc", aliasBuilder -> aliasBuilder.isWriteIndex(true))
// .mappings(JsonValue)
);
Search
SearchResponse<Object> search = client.search(s -> s
.index("test")
.query(q -> q
.term(t -> t
.field("user")
.value("kimchy3")
)),
Object.class);
for (Hit<Object> hit: search.hits().hits()) {
System.out.println(hit.source());
}