Elasticsearch Integration and Basic Operation Examples

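1. Introduction to Elasticsearch

1.1 Overview

Elasticsearch (ES) is an open-source, distributed full-text search engine built on Lucene and exposed through a RESTful interface. ES is also a distributed document database in which every field can be indexed and searched. It scales horizontally to hundreds of servers and can store and process petabytes of data, so large volumes of data can be stored, searched, and analyzed in very little time. It is typically used as the engine behind applications with complex search requirements.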

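1.2 Download and Installation

Local installation and server deployment of Elasticsearch are covered in two of my other blog posts, so I won't repeat them here.

Official documentation: https://www.elastic.co/guide/cn/elasticsearch/guide/2.x/index.html

Download: https://www.elastic.co/cn/downloads/elasticsearch

Video tutorial: 尚硅谷-ElasticSearch教程入门到精通 (Atguigu's "ElasticSearch tutorial, from beginner to mastery"; companion study materials and notes are available in its comment section)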

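1.3 Key Concepts

To quickly grasp the key concepts of ES and how it differs from a traditional relational database, it helps to compare it with MySQL in a few respects.

[1] Different structure names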

Elasticsearch    MySQL
Field            Attribute (column)
Document         Record (row)
Type             Table
Index            Database

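Note: Types are deprecated in ES 7.0 and later. An index has only one default type, _doc. With types gone, database and table merge into one: an Index can be regarded as the counterpart of either a MySQL Database or a MySQL Table.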

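[2] ES searches in a distributed fashion; traditional databases search by traversal

ES supports sharding and replication, which makes horizontal partitioning and scaling straightforward; replication gives ES high availability and high throughput.

In ES, you can specify the number of shards when you create an index. Each shard is itself a fully functional, independent index that can be placed on any node in the cluster. Sharding has the following advantages:

  • It lets you horizontally split and scale your content volume.
  • It lets you run distributed, parallel operations across shards, which improves performance and throughput.
  • How shards are distributed, and how their documents are aggregated back into search responses, is managed entirely by Elasticsearch.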
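
As a rough illustration of how documents end up spread across shards, here is a minimal sketch. It assumes the simplified routing rule "hash of the routing value (the document ID by default) modulo the number of primary shards". The class and method names are hypothetical, and String.hashCode() is only a stand-in for the Murmur3 hash that Elasticsearch uses internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; this is not Elasticsearch code.
final class ShardRoutingSketch {

    // Picks a shard for a document ID.
    // Assumption: String.hashCode() stands in for Elasticsearch's Murmur3 hash.
    static int shardFor(String documentId, int numberOfPrimaryShards) {
        // floorMod keeps the result in [0, numberOfPrimaryShards) even for negative hashes
        return Math.floorMod(documentId.hashCode(), numberOfPrimaryShards);
    }

    // Groups document IDs by the shard they would be routed to.
    static Map<Integer, List<String>> distribute(List<String> ids, int shards) {
        Map<Integer, List<String>> byShard = new TreeMap<>();
        for (String id : ids) {
            byShard.computeIfAbsent(shardFor(id, shards), k -> new ArrayList<>()).add(id);
        }
        return byShard;
    }

    public static void main(String[] args) {
        List<String> ids = List.of("1001", "1002", "1003", "1004", "1005");
        // Prints which IDs land on which of the 3 shards
        System.out.println(distribute(ids, 3));
    }
}
```

Because the shard is derived from the number of primary shards, the same ID always maps to the same shard as long as that number stays unchanged.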

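[3] ES uses an inverted index; traditional databases use B+ tree indexes

Suppose a document (identified by id) is made up of many words (identified by value). Each word may appear many times in the same document and may also appear in different documents.

  • Forward index: looks at words from the document's point of view. It records which words each document contains, how many times each word occurs (term frequency), and where it occurs (offset from the start of the document). [id -> value]

  • Inverted index: looks at documents from the word's point of view. It records which documents each word appears in (document IDs), how many times it occurs in each of those documents (term frequency), and where it occurs (offset from the start of that document). [value -> id]

By default, ES indexes every field, and text fields are indexed with an inverted index.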
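
The two directions can be sketched in a few lines of Java. This is only an illustration under simplifying assumptions: the class name is hypothetical, tokenization is a plain lowercase whitespace split rather than a real analyzer, and positions are word positions rather than character offsets. It is not how Lucene stores its index.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the two index directions; not Lucene's storage format.
final class InvertedIndexSketch {

    // Splits text into lowercase words on whitespace (a stand-in for a real analyzer).
    static List<String> tokenize(String text) {
        List<String> tokens = new ArrayList<>();
        for (String token : text.toLowerCase().trim().split("\\s+")) {
            if (!token.isEmpty()) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    // Forward index: document id -> words in order of appearance.
    static Map<Integer, List<String>> buildForward(Map<Integer, String> docs) {
        Map<Integer, List<String>> forward = new LinkedHashMap<>();
        docs.forEach((id, text) -> forward.put(id, tokenize(text)));
        return forward;
    }

    // Inverted index: word -> (document id -> word positions in that document).
    // The length of each position list is the term frequency.
    static Map<String, Map<Integer, List<Integer>>> buildInverted(Map<Integer, String> docs) {
        Map<String, Map<Integer, List<Integer>>> inverted = new TreeMap<>();
        docs.forEach((id, text) -> {
            List<String> tokens = tokenize(text);
            for (int pos = 0; pos < tokens.size(); pos++) {
                inverted.computeIfAbsent(tokens.get(pos), w -> new TreeMap<>())
                        .computeIfAbsent(id, d -> new ArrayList<>())
                        .add(pos);
            }
        });
        return inverted;
    }

    public static void main(String[] args) {
        Map<Integer, String> docs = new LinkedHashMap<>();
        docs.put(1, "search engine for search");
        docs.put(2, "distributed search");
        System.out.println(buildForward(docs));
        System.out.println(buildInverted(docs));
    }
}
```

Looking up a word in the inverted map goes straight to the documents that contain it, whereas the forward map would have to be scanned document by document.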

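1.4 Use Cases

[1] Full-text search

  • Elasticsearch started out with full-text search. It packages the Lucene library into a data product and hides Lucene's many complicated settings, which makes life easier for developers.

[2] Application queries

  • Querying is what Elasticsearch does best. Built on the inverted index as its core algorithm, its query performance is generally stronger than that of B-Tree based data products, relational databases in particular. The gain in retrieval efficiency becomes especially noticeable once the data volume exceeds tens of millions or hundreds of millions of records.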

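[3] Big data

  • Elasticsearch has become one of the important components through which big data platforms expose queries. A big data platform runs iterative computation over the raw data and then writes the results to a database for querying, especially large volumes of detail records.

[4] Log search

  • The well-known ELK stack refers to Elasticsearch, Logstash, and Kibana, a product combination designed specifically for log collection, storage, and querying.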

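[5] Monitoring

  • Metrics monitoring. Elasticsearch entered this field relatively late but at a good time. Thanks to its inverted index core, it also supports time-series workloads with quite good performance, and in terms of functionality it more than holds its own against dedicated time-series databases.

[6] Machine learning

  • Many data products have integrated machine learning, but Elasticsearch ships it as an actual product feature that is simple to use and works on a what-you-see-is-what-you-get basis. Other data products merely bundle algorithm packages and leave users to build much of the supporting application themselves.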

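2. Integration with Spring Boot

The complete example code is open-sourced on GitHub: https://github.com/Logistic98/es-springboot-demo

2.1 Pulling in Project Dependencies

Use Maven to pull in the project dependencies. Note that the versions of the server artifact and the high-level client must match the version of the Elasticsearch service you deployed.

pom.xml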

<!-- Centrally managed version properties -->
<properties>
    <elasticsearch.version>7.14.1</elasticsearch.version>
    <elasticsearch.rest.high.level.client.version>7.14.1</elasticsearch.rest.high.level.client.version>
    <httpclient.version>4.5.5</httpclient.version>
    <httpcore.version>4.4.9</httpcore.version>
</properties>

<!-- Elasticsearch server -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<!-- Elasticsearch high-level REST client -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elasticsearch.rest.high.level.client.version}</version>
</dependency>
<!-- HttpClient -->
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>${httpclient.version}</version>
</dependency>
<!-- HttpCore -->
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpcore</artifactId>
    <version>${httpcore.version}</version>
</dependency>

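2.2 Writing the Configuration Files

I enabled X-Pack security when setting up my ES service, so a username and password are required; if you have not enabled it, they are not needed.

application.properties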

## Elasticsearch configuration
elasticsearch.host=127.0.0.1
elasticsearch.port=9200
elasticsearch.username=username
elasticsearch.password=password

/config/ElasticsearchConfiguration.java

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfiguration {

    @Value("${elasticsearch.host}")
    private String host;

    @Value("${elasticsearch.port}")
    private int port;

    @Value("${elasticsearch.username}")
    private String username;

    @Value("${elasticsearch.password}")
    private String password;

    @Bean(destroyMethod = "close", name = "client")
    public RestHighLevelClient initRestClient() {
        // Credentials provider used for authentication
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        // Set the username and password
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        // Build the REST client and attach the credentials provider to its HTTP client
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port))
                .setHttpClientConfigCallback(httpClientBuilder ->
                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        return new RestHighLevelClient(builder);
    }
}
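
For intuition about what the username and password turn into on the wire, here is a minimal sketch. It assumes the cluster uses HTTP Basic authentication, where the Authorization header carries the Base64 encoding of "username:password". The class and method names are hypothetical, and the REST client builds this header itself, so the code is for understanding only and is not needed in the project.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical illustration of the HTTP Basic scheme; the ES client does this internally.
final class BasicAuthHeaderSketch {

    // Builds the value of the Authorization header for the given credentials.
    static String basicAuthHeader(String username, String password) {
        String credentials = username + ":" + password;
        String encoded = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Uses the placeholder values from application.properties above
        System.out.println("Authorization: " + basicAuthHeader("username", "password"));
    }
}
```

Base64 is an encoding, not encryption, so these credentials are only protected in transit if the connection itself uses HTTPS.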

3. Basic CRUD Operations

This article only covers how to work with ES from Java; operating ES directly through HTTP requests is skipped.

3.1 Index Operations

3.1.1 Creating an Index

// Assumes CreateIndexRequest and CreateIndexResponse from org.elasticsearch.client.indices
@Override
public boolean createIndex(String index) throws IOException {
    CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
    CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
    // true when the cluster acknowledged the index creation
    return createIndexResponse.isAcknowledged();
}

3.1.2 Querying an Index

// Assumes GetIndexRequest and GetIndexResponse from org.elasticsearch.client.indices
@Override
public String[] queryIndex(String index) throws IOException {
    GetIndexRequest queryIndexRequest = new GetIndexRequest(index);
    GetIndexResponse getIndexResponse = client.indices().get(queryIndexRequest, RequestOptions.DEFAULT);
    // Names of the indices that matched the request
    return getIndexResponse.getIndices();
}

3.1.3 Deleting an Index

@Override
public boolean deleteIndex(String index) throws IOException {
    DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index);
    AcknowledgedResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
    // true when the cluster acknowledged the deletion
    return deleteIndexResponse.isAcknowledged();
}

3.1.4 Checking Whether an Index Exists

GetIndexRequest getIndexRequest = new GetIndexRequest(Constant.INDEX);
boolean exists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);

3.2 Document Operations

The shared files are as follows:

/constant/Constant.java

public interface Constant {
    // Name of the index used by all document examples
    String INDEX = "user";
}

/pojo/UserDocument.java

import lombok.Data;

// Lombok's @Data generates the getters and setters used in the examples
@Data
public class UserDocument {
    private String id;
    private String name;
    private String sex;
    private Integer age;
    private String city;
}

3.2.1 Creating a Document

// JSON in these examples is assumed to be com.alibaba.fastjson.JSON
@Override
public Boolean createDocument(UserDocument document) throws Exception {
    String id = document.getId();
    IndexRequest indexRequest = new IndexRequest(Constant.INDEX)
            .id(id)
            .source(JSON.toJSONString(document), XContentType.JSON);
    IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
    // CREATED is returned only for a new document; overwriting an existing id yields OK, so this returns false
    return indexResponse.status().equals(RestStatus.CREATED);
}

3.2.2 Querying a Document

// logger is assumed to be a logger field declared on the enclosing class
@Override
public UserDocument queryDocument(String id) throws IOException {
    GetRequest getRequest = new GetRequest(Constant.INDEX, id);
    GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
    // Falls back to an empty UserDocument (all fields null) when the id is not found
    UserDocument result = new UserDocument();
    if (getResponse.isExists()) {
        String sourceAsString = getResponse.getSourceAsString();
        result = JSON.parseObject(sourceAsString, UserDocument.class);
    } else {
        logger.error("No document found for this id");
    }
    return result;
}

3.2.3 Updating a Document

@Override
public Boolean updateDocument(UserDocument document) throws Exception {
    String id = document.getId();
    UpdateRequest updateRequest = new UpdateRequest();
    updateRequest.index(Constant.INDEX).id(id);
    // Partial update: the fields in this JSON are merged into the existing document
    updateRequest.doc(JSON.toJSONString(document), XContentType.JSON);
    UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
    return updateResponse.status().equals(RestStatus.OK);
}

3.2.4 Deleting a Document

@Override
public String deleteDocument(String id) throws Exception {
    DeleteRequest deleteRequest = new DeleteRequest(Constant.INDEX, id);
    DeleteResponse response = client.delete(deleteRequest, RequestOptions.DEFAULT);
    // Result name, for example DELETED or NOT_FOUND
    return response.getResult().name();
}

3.2.5 Bulk Document Operations

Bulk create:

@Override
public Boolean bulkCreateDocument(List<UserDocument> documents) throws IOException {
    BulkRequest bulkRequest = new BulkRequest();
    for (UserDocument document : documents) {
        String id = document.getId();
        IndexRequest indexRequest = new IndexRequest(Constant.INDEX)
                .id(id)
                .source(JSON.toJSONString(document), XContentType.JSON);
        bulkRequest.add(indexRequest);
    }
    BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
    // The overall status can be OK even when individual items failed, so check the items instead
    return !bulkResponse.hasFailures();
}

Bulk delete:

@Override
public Boolean bulkDeleteDocument(List<UserDocument> documents) throws Exception {
    BulkRequest bulkRequest = new BulkRequest();
    for (UserDocument document : documents) {
        String id = document.getId();
        bulkRequest.add(new DeleteRequest(Constant.INDEX, id));
    }
    BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
    // The overall status can be OK even when individual items failed, so check the items instead
    return !bulkResponse.hasFailures();
}

3.2.6 Querying All Documents

@Override
public List<UserDocument> queryAllDocument() throws IOException {
    SearchRequest getAllRequest = new SearchRequest();
    getAllRequest.indices(Constant.INDEX);
    // match_all matches every document, but a search returns only the first 10 hits by default;
    // call size(...) on the SearchSourceBuilder to fetch more
    getAllRequest.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
    SearchResponse getAllResponse = client.search(getAllRequest, RequestOptions.DEFAULT);
    SearchHits hits = getAllResponse.getHits();
    List<UserDocument> result = new ArrayList<>();
    for (SearchHit hit : hits) {
        result.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
    }
    return result;
}

3.2.7 Filtering Query Results

Post-processing the results of a match-all query (sorting by a field, filtering fields):

@Override
public List<UserDocument> queryFilterDocument() throws IOException {
    SearchRequest request = new SearchRequest();
    request.indices(Constant.INDEX);
    SearchSourceBuilder builder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
    // Sort by age, highest first
    builder.sort("age", SortOrder.DESC);
    // Empty includes keeps all fields; the excluded fields (id, city) come back as null in UserDocument
    String[] excludes = {"id", "city"};
    String[] includes = {};
    builder.fetchSource(includes, excludes);
    request.source(builder);
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    SearchHits hits = response.getHits();
    List<UserDocument> result = new ArrayList<>();
    for (SearchHit hit : hits) {
        result.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
    }
    return result;
}

3.2.8 Paginated Queries

@Override
public List<UserDocument> queryPageDocument(int from, int size) throws IOException {
    SearchRequest getPartRequest = new SearchRequest();
    getPartRequest.indices(Constant.INDEX);
    SearchSourceBuilder builder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
    builder.from(from); // Start offset: (current page number - 1) * page size
    builder.size(size); // Number of documents per page
    getPartRequest.source(builder);
    SearchResponse response = client.search(getPartRequest, RequestOptions.DEFAULT);
    SearchHits hits = response.getHits();
    List<UserDocument> result = new ArrayList<>();
    for (SearchHit hit : hits) {
        result.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
    }
    return result;
}
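
The offset arithmetic mentioned in the comment above can be wrapped in a small helper. This is a sketch with a hypothetical class name. It also assumes the default index.max_result_window of 10,000, which caps from + size for this style of paging; an index may be configured with a different value.

```java
// Hypothetical helper; only the arithmetic comes from the pagination example above.
final class PageOffsetSketch {

    // Assumption: the index uses the default index.max_result_window setting.
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    // Converts a 1-based page number into the zero-based "from" offset.
    static int fromOffset(int pageNumber, int pageSize) {
        if (pageNumber < 1 || pageSize < 1) {
            throw new IllegalArgumentException("pageNumber and pageSize must be >= 1");
        }
        return Math.multiplyExact(pageNumber - 1, pageSize);
    }

    // Returns true when from + size stays within the default result window.
    static boolean withinResultWindow(int pageNumber, int pageSize) {
        return (long) fromOffset(pageNumber, pageSize) + pageSize <= DEFAULT_MAX_RESULT_WINDOW;
    }

    public static void main(String[] args) {
        int from = fromOffset(3, 20); // third page, 20 documents per page
        System.out.println("from=" + from + ", allowed=" + withinResultWindow(3, 20));
    }
}
```

A caller would pass fromOffset(page, size) and size to queryPageDocument. For paging deeper than the window allows, a different mechanism such as search_after is the usual route.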

3.2.9 Conditional Queries

Single-condition query:

@Override
public List<UserDocument> querySingleConditionDocument(String name) throws IOException {
    SearchRequest request = new SearchRequest();
    request.indices(Constant.INDEX);
    // termQuery compares the value as-is, without analysis; on an analyzed text field
    // a mixed-case value may not match the lowercased terms stored in the index
    request.source(new SearchSourceBuilder().query(QueryBuilders.termQuery("name", name)));
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    SearchHits hits = response.getHits();
    List<UserDocument> result = new ArrayList<>();
    for (SearchHit hit : hits) {
        result.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
    }
    return result;
}
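
The difference between a term lookup (used above) and a match lookup (used in the combined query) can be sketched without a cluster. This is a simplified model with hypothetical names: the analyze method merely lowercases and splits on non-alphanumeric characters, as a stand-in for a real analyzer, and the match lookup uses OR semantics across the analyzed tokens.

```java
import java.util.LinkedHashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical model of term vs. match lookups; not Elasticsearch code.
final class TermVsMatchSketch {

    // Stand-in analyzer: lowercase, then split on anything that is not a letter or digit.
    static Set<String> analyze(String text) {
        Set<String> terms = new LinkedHashSet<>();
        for (String token : text.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}]+")) {
            if (!token.isEmpty()) {
                terms.add(token);
            }
        }
        return terms;
    }

    // Term-style lookup: the query value is compared as-is against the indexed terms.
    static boolean termMatches(Set<String> indexedTerms, String query) {
        return indexedTerms.contains(query);
    }

    // Match-style lookup: the query is analyzed first; any analyzed token may hit.
    static boolean matchMatches(Set<String> indexedTerms, String query) {
        for (String token : analyze(query)) {
            if (indexedTerms.contains(token)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Set<String> indexed = analyze("Tom Smith"); // terms stored for an analyzed field
        System.out.println("term 'Tom':  " + termMatches(indexed, "Tom"));
        System.out.println("term 'tom':  " + termMatches(indexed, "tom"));
        System.out.println("match 'Tom': " + matchMatches(indexed, "Tom"));
    }
}
```

In this model the term lookup for "Tom" misses because only the lowercased "tom" was indexed, which is why exact-value lookups are usually run against a keyword field rather than an analyzed text field.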

Combined-condition query:

@Override
public List<UserDocument> queryCombinationConditionDocument(String name, String city) throws IOException {
    SearchRequest request = new SearchRequest();
    request.indices(Constant.INDEX);
    SearchSourceBuilder builder = new SearchSourceBuilder();
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    // name must match, and city must not match
    boolQueryBuilder.must(QueryBuilders.matchQuery("name", name));
    boolQueryBuilder.mustNot(QueryBuilders.matchQuery("city", city));
    builder.query(boolQueryBuilder);
    request.source(builder);
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    SearchHits hits = response.getHits();
    List<UserDocument> result = new ArrayList<>();
    for (SearchHit hit : hits) {
        result.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
    }
    return result;
}

3.2.10 Range Queries

@Override
public List<UserDocument> queryRangeDocument(int minAge, int maxAge) throws IOException {
    SearchRequest request = new SearchRequest();
    request.indices(Constant.INDEX);
    SearchSourceBuilder builder = new SearchSourceBuilder();
    RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("age");
    // gt: greater than, gte: greater than or equal, lt: less than, lte: less than or equal
    // Here the range is minAge <= age < maxAge
    rangeQuery.gte(minAge);
    rangeQuery.lt(maxAge);
    builder.query(rangeQuery);
    request.source(builder);
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    SearchHits hits = response.getHits();
    List<UserDocument> result = new ArrayList<>();
    for (SearchHit hit : hits) {
        result.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
    }
    return result;
}

3.2.11 Fuzzy Queries

@Override
public List<UserDocument> queryFuzzyDocument(String name) throws IOException {
    SearchRequest request = new SearchRequest();
    request.indices(Constant.INDEX);
    SearchSourceBuilder builder = new SearchSourceBuilder();
    // Fuzziness.ONE allows terms that are at most one edit away from the given name
    builder.query(QueryBuilders.fuzzyQuery("name", name).fuzziness(Fuzziness.ONE));
    request.source(builder);
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    SearchHits hits = response.getHits();
    List<UserDocument> result = new ArrayList<>();
    for (SearchHit hit : hits) {
        result.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
    }
    return result;
}
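
To make the notion of "one edit away" concrete, here is a minimal sketch of the classic Levenshtein edit distance with a hypothetical class name. It is an approximation of what the fuzzy query measures: as far as I know, Elasticsearch by default also treats swapping two adjacent characters as a single edit, which plain Levenshtein counts as two.

```java
// Hypothetical illustration of edit distance; not the algorithm Elasticsearch runs internally.
final class EditDistanceSketch {

    // Classic Levenshtein distance: insertions, deletions, and substitutions each cost 1.
    static int levenshtein(String a, String b) {
        int[] prev = new int[b.length() + 1];
        int[] curr = new int[b.length() + 1];
        for (int j = 0; j <= b.length(); j++) {
            prev[j] = j; // distance from the empty prefix of a
        }
        for (int i = 1; i <= a.length(); i++) {
            curr[0] = i; // distance to the empty prefix of b
            for (int j = 1; j <= b.length(); j++) {
                int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
                curr[j] = Math.min(Math.min(curr[j - 1] + 1, prev[j] + 1), prev[j - 1] + cost);
            }
            int[] tmp = prev;
            prev = curr;
            curr = tmp;
        }
        return prev[b.length()];
    }

    // Returns true when the query is within maxEdits of the indexed term.
    static boolean withinFuzziness(String indexedTerm, String query, int maxEdits) {
        return levenshtein(indexedTerm, query) <= maxEdits;
    }

    public static void main(String[] args) {
        System.out.println(withinFuzziness("zhangsan", "zhangsen", 1)); // one substitution
        System.out.println(withinFuzziness("zhangsan", "zhengsen", 1)); // two substitutions
    }
}
```

With a fuzziness of one, a query for "zhangsen" would still reach a document named "zhangsan" in this model, while "zhengsen" would not.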
