easysearch-client 2.0.x Java 客户端 API 使用
#
bulk 批量写入
#
// Bulk-index ten copies of a sample log document, then report the operations that failed.
// NOTE(review): indexName is not defined in this snippet; it is assumed to come from the
// surrounding code.
EasysearchClient client = SampleClient.create();
String json2 = "{"
        + " \"@timestamp\": \"2023-01-08T22:50:13.059Z\","
        + " \"agent\": {"
        + " \"version\": \"7.3.2\","
        + " \"type\": \"filebeat\","
        + " \"ephemeral_id\": \"3ff1f2c8-1f7f-48c2-b560-4272591b8578\","
        + " \"hostname\": \"ba-0226-msa-fbl-747db69c8d-ngff6\""
        + " }"
        + "}";
// Wrap the JSON once; the same JsonData instance is reused by every bulk operation
// instead of being rebuilt on each loop iteration.
JsonData document = JsonData.fromJson(json2);
BulkRequest.Builder br = new BulkRequest.Builder();
for (int i = 0; i < 10; i++) {
    // No id is set, so each operation indexes the document without an explicit id.
    br.operations(op -> op.index(idx -> idx.index(indexName).document(document)));
}
BulkResponse bulkResponse = client.bulk(br.build());
// errors() only tells us that at least one operation failed; items() still lists the
// successful operations too, so filter on the per-item error before printing.
if (bulkResponse.errors()) {
    for (BulkResponseItem item : bulkResponse.items()) {
        if (item.error() != null) {
            System.out.println(item.toString());
        }
    }
}
索引单个文档
#
// Index a single document straight from a raw JSON string, without mapping it to a Java type.
String json2 = "{"
        + " \"@timestamp\": \"2023-01-08T22:50:13.059Z\","
        + " \"agent\": {"
        + " \"version\": \"7.3.2\","
        + " \"type\": \"filebeat\","
        + " \"ephemeral_id\": \"3ff1f2c8-1f7f-48c2-b560-4272591b8578\","
        + " \"hostname\": \"ba-0226-msa-fbl-747db69c8d-ngff6\""
        + " }"
        + "}";
// withJson(...) reads the request content from the reader.
StringReader jsonSource = new StringReader(json2);
IndexRequest<JsonData> request = IndexRequest.of(builder -> builder
        .index("logs")
        .withJson(jsonSource)
);
IndexResponse response = client.index(request);
System.out.println(response);
// Alternative: bind the JSON to a typed object first and index that object.
LogEntry logEntry = mapper.readValue(json2, LogEntry.class);
// The agent's ephemeral id is used as the document id in both variants below.
String documentId = logEntry.getAgent().getEphemeralId();
IndexRequest<LogEntry> request2 = IndexRequest.of(builder -> builder
        .index(indexName)
        .id(documentId)
        .document(logEntry)
);
IndexResponse response2 = client.index(request2);
// Or: assemble the same request step by step with an explicit builder.
IndexRequest.Builder<LogEntry> indexReqBuilder = new IndexRequest.Builder<>();
indexReqBuilder.index(indexName);
indexReqBuilder.id(documentId);
indexReqBuilder.document(logEntry);
response2 = client.index(indexReqBuilder.build());
删除文档
#
// Delete a single document, addressed by index name and document id.
DeleteRequest deleteRequest = DeleteRequest.of(d -> d
        .index(indexName)
        .id("3ff1f2c8-1f7f-48c2-b560-4272591b8578")
);
DeleteResponse response = client.delete(deleteRequest);
deleteByQuery 删除
#
// Delete every document in the index whose agent.type matches "filebeat".
MatchQuery agentTypeMatch = new MatchQuery.Builder()
        .field("agent.type")
        .query("filebeat")
        .build();
DeleteByQueryRequest deleteByQueryRequest = DeleteByQueryRequest.of(d -> d
        .index(indexName)
        .query(q -> q.match(agentTypeMatch))
);
DeleteByQueryResponse response = client.deleteByQuery(deleteByQueryRequest);
更新文档
#
// Partially update one document by id, changing agent.type to "logstash".
UpdateRequest updateRequest = UpdateRequest.of(u -> u
        .index(indexName)
        .id("3ff1f2c8-1f7f-48c2-b560-4272591b8578")
        // A partial document is merged into the stored _source by key, so it has to mirror
        // the nested structure of the document. A flat "agent.type" key would be added as a
        // separate top-level key next to the existing "agent" object instead of replacing
        // the nested value.
        .doc(Map.of("agent", Map.of("type", "logstash")))
);
// Map.class tells the client how to deserialize any document returned in the response.
UpdateResponse<Map<String, Object>> response = client.update(updateRequest, Map.class);
updateByQuery 更新
#
// Select the documents to update: exact term match on agent.type == "filebeat".
Query query = Query.of(q -> q
        .term(term -> term
                .field("agent.type")
                .value(val -> val.stringValue("filebeat"))
        )
);
// Script parameters are passed separately from the script source.
Map<String, JsonData> scriptParams = Map.of("param1", JsonData.of("logstash"));
UpdateByQueryRequest updateByQueryRequest = UpdateByQueryRequest.of(u -> u
        .index(indexName)
        .query(query)
        .script(s -> s
                .inline(inline -> inline
                        .source("ctx._source.agent.type = params.param1")
                        .lang("painless")
                        .params(scriptParams)
                )
        )
        .refresh(true)
);
UpdateByQueryResponse response = client.updateByQuery(updateByQueryRequest);
// Print the number of documents the script updated.
System.out.println(response.updated());
搜索文档
#
// Search for documents whose agent.type is exactly "filebeat", newest first, and print them.
Query query = Query.of(q -> q
        .term(t -> t
                .field("agent.type")
                .value(v -> v.stringValue("filebeat"))
        )
);
// Sort by @timestamp in descending order.
SortOptions.Builder sb = new SortOptions.Builder();
SortOptions sortOptions = sb.field(fs -> fs.field("@timestamp").order(SortOrder.Desc)).build();
final SearchRequest.Builder searchReq = new SearchRequest.Builder().allowPartialSearchResults(false)
        .index(indexName)
        .size(10)
        .sort(sortOptions)
        .source(sc -> sc.fetch(true))
        .trackTotalHits(tr -> tr.enabled(true))
        .query(query);
// Hits are deserialized into LogEntry instances.
SearchResponse<LogEntry> searchResponse = client.search(searchReq.build(), LogEntry.class);
System.out.println(searchResponse.hits().total());
// Create the JSON mapper once and reuse it for every hit rather than building a new one
// on each iteration.
JacksonJsonpMapper jsonpMapper = new JacksonJsonpMapper();
for (Hit<LogEntry> hit : searchResponse.hits().hits()) {
    System.out.println(JsonData.of(hit.source()).toJson(jsonpMapper));
}