ElasticSearch 客户端选择
第一种
基于 TCP 协议(ES 的 9300 端口,用于集群通信),依赖 spring-data-elasticsearch:transport-api.jar
,此方式的缺点如下:
- SpringBoot 版本不同, 依赖的
transport-api.jar
版本也就不同,不能适配不同版本的 ES - 从 ES
7.x
版本开始,官方已经不建议使用 9300 端口来操作,而且 ES 8.x
以后就要移除该操作方式
第二种
基于 HTTP 协议(ES 的 9200 端口,用于 RESTful API),可选的客户端如下:
- RestTemplate、HttpClient、OkHttp:直接发送 HTTP 请求,ES 的很多操作需要自己封装,使用起来比较麻烦
- Elasticsearch-Rest-Client:官方的 Rest 客户端,分为
Java Low Level REST Client
和 Java High Level REST Client
,API 层次分明,上手简单
客户端对比
客户端 | 优点 | 缺点 | 说明 |
---|
Java Low Level Rest Client | 与 ES 版本之间没有关系,适用于作为所有版本 ES 的客户端 | | 可以看做是低级的 HTTP 客户端,没有封装过多的 ES 操作 |
Java High Level Rest Client | 使用最多 | 使用时必须与 ES 版本保持一致 | 基于 Low Level Rest Client,但在 ES 7.15.0 版本之后被弃用 |
TransportClient | 使用 Transport 端口 (9300) 进行通信,能够使用 ES 集群中的一些特性,性能最好 | JAR 包版本必须与 ES 集群版本一致,ES 集群升级,客户端也要跟着升级到相同版本 | 已过时,官方从 ES 7 版本开始不建议使用,ES 8 版本之后被移除 |
Elasticsearch Java API Client | 最新的 ES 客户端 | 文档较少 | |
提示
关于更多的 Elasticsearch 客户端说明,建议阅读 官方文档。
ElasticSearch 客户端使用案例
下面将简单介绍 SpringBoot 项目如何引入 Java High Level Rest Client
,由于 SpringBoot Starter 默认依赖了某版本的 Elasticsearch,因此需要在 pom.xml
配置文件中使用 <elasticsearch.version>
来指定(覆盖) Elasticsearch 的实际版本号,否则会出现兼容性问题。
引入 Maven 坐标
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| <properties> <elasticsearch.version>7.4.2</elasticsearch.version> </properties>
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.3</version> <relativePath/> </parent>
<dependencies> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>${elasticsearch.version}</version> </dependency> </dependencies>
|
Java 配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import org.apache.http.HttpHost; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class ElasticSearchConfig { public static final RequestOptions COMMON_OPTIONS; static { String token = ""; RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); COMMON_OPTIONS = builder.build(); }
@Bean public RestHighLevelClient restHighLevelClient() { RestClientBuilder builder = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")); return new RestHighLevelClient(builder); } }
|
Java 测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
| import com.alibaba.fastjson2.JSON; import com.clay.gulimall.search.config.ElasticSearchConfig; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.Avg; import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;
import java.util.Date;
@Slf4j @SpringBootTest public class ElasticSearchApiTest { @Autowired private RestHighLevelClient esClient;
@Test public void indexData() throws Exception { IndexRequest request = new IndexRequest("posts").id("1") .source("user", "Jim", "postDate", new Date(), "message", "trying out ElasticSearch"); IndexResponse indexResponse = esClient.index(request, ElasticSearchConfig.COMMON_OPTIONS); log.info(JSON.toJSONString(indexResponse)); }
@Test public void searchData() throws Exception { SearchRequest searchRequest = new SearchRequest(); searchRequest.indices("bank"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchQuery("address", "mill")); TermsAggregationBuilder ageAgg = AggregationBuilders.terms("group_by_age").field("age").size(100); searchSourceBuilder.aggregation(ageAgg); AvgAggregationBuilder avgBalance = AggregationBuilders.avg("avgBalance").field("balance"); searchSourceBuilder.aggregation(avgBalance); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = esClient.search(searchRequest, ElasticSearchConfig.COMMON_OPTIONS); SearchHits searchHits = searchResponse.getHits(); SearchHit[] hitArray = searchHits.getHits(); for (SearchHit hit : hitArray) { String recored = hit.getSourceAsString(); log.info("id: {}, data: {}", hit.getId(), recored); } Aggregations aggregations = searchResponse.getAggregations(); Terms terms = aggregations.get("group_by_age"); for (Terms.Bucket bucket : terms.getBuckets()) { log.info("age: {}, total: {}", bucket.getKeyAsString(), bucket.getDocCount()); } Avg avg = aggregations.get("avgBalance"); log.info("avg balance: {}", avg.getValue()); log.info("search params: {}\n", searchSourceBuilder.toString()); log.info("search result: {}\n", JSON.toJSONString(searchResponse)); } }
|
上述的聚合查询代码,最终发出 HTTP 请求体内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| GET /bank/_search { "query": { "match": { "address": "mill" } }, "aggs": { "group_by_age": { "terms": { "field": "age", "size": 100 } }, "avgBalance": { "avg": { "field": "balance" } } } }
|
ElasticSearch 日志分析技术栈
大型项目的日志分析一般有以下几种技术栈:
- Kafka + ElasticSearch + Kibana
- Logstash + ElasticSearch + Kibana
- Filebeat + Logstash + ElasticSearch + Kibana
- Filebeat + Kafka + Logstash + ElasticSearch + Kibana