ElasticSearch 开发随笔

ElasticSearch 客户端选择

第一种

基于 TCP 协议(ES 的 9300 端口,用于集群通信),依赖 spring-data-elasticsearch:transport-api.jar,此方式的缺点如下:

  • SpringBoot 版本不同, 依赖的 transport-api.jar 版本也就不同,不能适配不同版本的 ES
  • 从 ES 7.x 版本开始,官方已经不建议使用 9300 端口来操作,而且 ES 8.x 以后就要移除该操作方式

第二种

基于 HTTP 协议(ES 的 9200 端口,用于 RESTful API),可选的客户端如下:

  • RestTemplate、HttpClient、OkHttp:直接发送 HTTP 请求,ES 的很多操作需要自己封装,使用起来比较麻烦
  • Elasticsearch-Rest-Client:官方的 Rest 客户端,分为 Java Low Level REST ClientJava High Level REST Client,API 层次分明,上手简单

客户端对比

客户端优点缺点说明
Java Low Level Rest Client 与 ES 版本之间没有关系,适用于作为所有版本 ES 的客户端可以看做是低级的 HTTP 客户端,没有封装过多的 ES 操作
Java High Level Rest Client 使用最多使用时必须与 ES 版本保持一致基于 Low Level Rest Client,但在 ES 7.15.0 版本之后被弃用
TransportClient 使用 Transport 端口 (9300) 进行通信,能够使用 ES 集群中的一些特性,性能最好 JAR 包版本必须与 ES 集群版本一致,ES 集群升级,客户端也要跟着升级到相同版本已过时,官方从 ES 7 版本开始不建议使用,ES 8 版本之后被移除
Elasticsearch Java API Client 最新的 ES 客户端文档较少

提示

关于更多的 Elasticsearch 客户端说明,建议阅读 官方文档

ElasticSearch 客户端使用案例

下面将简单介绍 SpringBoot 项目如何引入 Java High Level Rest Client,由于 SpringBoot Starter 默认依赖了某版本的 Elasticsearch,因此需要在 pom.xml 配置文件中使用 <elasticsearch.version> 来指定(覆盖) Elasticsearch 的实际版本号,否则会出现兼容性问题。

引入 Maven 坐标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<properties>
<elasticsearch.version>7.4.2</elasticsearch.version>
</properties>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.3</version>
<relativePath/>
</parent>

<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
</dependencies>

Java 配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticSearchConfig {

public static final RequestOptions COMMON_OPTIONS;

static {
// 基础配置信息
String token = "";
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
// builder.addHeader("Authorization", "Bearer " + token);
// builder.setHttpAsyncResponseConsumerFactory(
// new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));
COMMON_OPTIONS = builder.build();
}

/**
* 定义 ES 客户端
*
* @return ES 客户端
*/
@Bean
public RestHighLevelClient restHighLevelClient() {
// 指定ES的连接地址
RestClientBuilder builder = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http"));
return new RestHighLevelClient(builder);
}

}

Java 测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import com.alibaba.fastjson2.JSON;
import com.clay.gulimall.search.config.ElasticSearchConfig;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Avg;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Date;

@Slf4j
@SpringBootTest
public class ElasticSearchApiTest {

@Autowired
private RestHighLevelClient esClient;

/**
* 创建索引数据
*/
@Test
public void indexData() throws Exception {
IndexRequest request = new IndexRequest("posts").id("1")
.source("user", "Jim", "postDate", new Date(), "message", "trying out ElasticSearch");

IndexResponse indexResponse = esClient.index(request, ElasticSearchConfig.COMMON_OPTIONS);
log.info(JSON.toJSONString(indexResponse));
}

/**
* 聚合查询
* <p> 查询 address 中包含 mill 的所有人的年龄分布以及平均薪资
*/
@Test
public void searchData() throws Exception {
SearchRequest searchRequest = new SearchRequest();
// 指定索引
searchRequest.indices("bank");
// 检索条件
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery("address", "mill"));
// 按照年龄的分布进行聚合
TermsAggregationBuilder ageAgg = AggregationBuilders.terms("group_by_age").field("age").size(100);
searchSourceBuilder.aggregation(ageAgg);
// 计算所有人的平均薪资
AvgAggregationBuilder avgBalance = AggregationBuilders.avg("avgBalance").field("balance");
searchSourceBuilder.aggregation(avgBalance);
// 执行检索
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = esClient.search(searchRequest, ElasticSearchConfig.COMMON_OPTIONS);

// 获取搜索结果
SearchHits searchHits = searchResponse.getHits();
SearchHit[] hitArray = searchHits.getHits();
for (SearchHit hit : hitArray) {
String recored = hit.getSourceAsString();
log.info("id: {}, data: {}", hit.getId(), recored);
}

// 获取聚合结果 - 年龄的分布
Aggregations aggregations = searchResponse.getAggregations();
Terms terms = aggregations.get("group_by_age");
for (Terms.Bucket bucket : terms.getBuckets()) {
log.info("age: {}, total: {}", bucket.getKeyAsString(), bucket.getDocCount());
}

// 获取聚合结果 - 平均薪资
Avg avg = aggregations.get("avgBalance");
log.info("avg balance: {}", avg.getValue());

log.info("search params: {}\n", searchSourceBuilder.toString());
log.info("search result: {}\n", JSON.toJSONString(searchResponse));
}

}

上述的聚合查询代码,最终发出 HTTP 请求体内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
GET /bank/_search
{
"query": {
"match": {
"address": "mill"
}
},
"aggs": {
"group_by_age": {
"terms": {
"field": "age",
"size": 100
}
},
"avgBalance": {
"avg": {
"field": "balance"
}
}
}
}

ElasticSearch 日志分析技术栈

大型项目的日志分析一般有以下几种技术栈:

  • Kafka + ElasticSearch + Kibana

  • Logstash + ElasticSearch + Kibana

  • Filebeat + Logstash + ElasticSearch + Kibana

  • Filebeat + Kafka + Logstash + ElasticSearch + Kibana