# 1. Elasticsearch简介
# 1.1 基本介绍
ElasticSearch(ES)是一个基于Lucene构建的开源、分布式、RESTful接口的全文搜索引擎。ES还是一个分布式文档数据库,其中每个字段均可被索引,而且每个字段的数据均可被搜索,ES能够横向扩展至数以百计的服务器存储以及处理PB级的数据。可以在极短的时间内存储、搜索和分析大量的数据。通常作为具有复杂搜索场景情况下的引擎。
# 1.2 关键概念
为了快速了解ES的关键概念及与传统关系型数据库的不同,可以与MySQL从几个方面做个对比。
[1] 结构名称不同
ElasticSearch | MySQL |
---|---|
字段(Field) | 属性(列) |
文档(Document) | 记录(行) |
类型(Type) | 表 |
索引(Index) | 数据库 |
注:ES 在7.0以及之后的版本中 Type 被废弃了。一个 index 中只有一个默认的 type,即 _doc。被废弃后,库表合一,Index 既可以被认为是对应 MySQL 的 Database,也可以认为是对应的 Table。
[2] ES分布式搜索,传统数据库遍历式搜索
ES支持分片和复制,从而方便水平分割和扩展,复制保证了ES的高可用与高吞吐。
在ES中,当你创建一个索引的时候,你可以指定你想要的分片的数量。每个分片本身也是一个功能完善并且独立的索引,索引可以被放置到集群中的任何节点上。分片优点:
- 允许你水平分割/扩展你的内容容量。
- 允许你在分片之上进行分布式的、并行的操作,进而提高性能/吞吐量。
- 分片的分布,它的文档怎样聚合回搜索请求,完全由Elasticsearch管理。
[3] ES采用倒排索引,传统数据库采用B+树索引
假设一个文档(用id标识)是有许多的单词(用value标识)组成的,每个单词可能同一个文档中重复出现很多次,也可能出现在不同的文档中。
正排索引:从文档角度看其中的单词,表示每个文档都含有哪些单词,以及每个单词出现了多少次(词频)及其出现位置(相对于文档首部的偏移量)。【即id ---> value】
倒排索引:从单词角度看文档,标识每个单词分别在那些文档中出现(文档ID),以及在各自的文档中每个单词分别出现了多少次(词频)- 及其出现位置(相对于该文档首部的偏移量)。【即value ---> id】
ES中为所有字段默认都建了倒排索引。
# 1.3 倒排索引
[1] 正排索引与倒排索引
所谓正排索引,就是以文本为key,以分词的列表为value,通过检索文本信息来找到对应的分词列表。
然而在实际进行搜索时所需要的结果恰恰与之相反,即以分词列表为key,检索包含该词条的文本,这种结构即为倒排索引。
[2] ES倒排索引机制
在ES的倒排索引机制中有四个重要的名词:Term、Term Dictionary、Term Index、Posting List。
Term(词条):词条是索引里面最小的存储和查询单元。一段文本经过分析器分析以后就会输出一串词条。一般来说英文语境中词条是一个单词,中文语境中一个词条是分词后的一个词组。
此处涉及到分词器,分词器的作用是将一段文字分解为若干个词组,不同的分词器使用的分词算法不同,得到的分词结果也不同。
Term Dictionary(词典):词典是词条的集合,顾名思义,词典中维护的是Term。词典一般是由文本集合中出现过的所有词条所组成的集合。
Term Index(词条索引):由于词典中维护着文本中所有的词条,为了在其中更快的找到某个词条,我们为词条建立索引。通过压缩算法,词条索引的大小只有所有词条的几十分之一,因此词条索引可以存储在内存中,因此可以提供更快的查找速度。
Posting List(倒排表):倒排表记录的是词条出现在哪些文档里,以及出现的位置和频率等信息。倒排表中的每条记录称为一个倒排项(posting)。
将以上概念类比到词典中,Term相当于词典中的词语,Term Dictionary相当于词典本身,Term Index相当于词典的目录。
假设现在我们输入系统多段文本,经过分词器分词后得到以下词条:elastic、flink、hadoop、kafka、spark,我们使用ES进行全文搜索时,如图所示,系统首先会通过Term Index找到该Term在Term Dictionary中的位置,再通过倒排索引结构找到对应的Posting,从而定位到该词组在文本中的位置,完成一次搜索。
# 1.4 分片与副本
[1] 分片(shard)
- 当有大量的文档时,由于内存的限制、磁盘处理能力不足、无法足够快的响应客户端的请求等,一个节点可能不够。这种情况下,数据可以分为较小的分片。每个分片放到不同的服务器上。
- 当你查询的索引分布在多个分片上时,ES会把查询发送给每个相关的分片,并将结果组合在一起,而应用程序并不知道分片的存在。
[2] 副本(Replia)
- 为提高查询吞吐量或实现高可用性,可以使用副本。
- 副本是一个分片的精确复制,每个分片可以有零个或多个副本。ES中可以有许多相同的分片,其中之一被选择更改索引操作,这种特殊的分片称为主分片。 当主分片丢失时,如:该分片所在的数据不可用时,集群将副本提升为新的主分片。
[3] 分片与副本的区别
- 当你分片设置为5,数据量为30G时,es会自动帮我们把数据均衡地分配到5个分片上,即每个分片大概有6G数据,当你查询数据时,ES会把查询发送给每个相关的分片,并将结果组合在一起。
- 而副本,就是对分布在5个分片的数据进行复制。因为分片是把数据进行分割而已,数据依然只有一份,这样的目的是保障查询的高效性,副本则是多复制几份分片的数据,这样的目的是保障数据的高可靠性,防止数据丢失。
注意:索引建立后,分片个数是不可以更改的,想要调整就只能删除索引重建。
# 1.5 元字段介绍与业务字段类型
ES文档字段分为两类:
- 元字段(Meta-field) :不需要用户定义,在任一文档中都存在, 如
_id
、_index
、_type
等 - 业务字段: 用户自定义的字段。
元字段在名称上有一个显著特征,就是以下划线_
开头,有些字段只是为了存储,会出现在文档检索的结果中,却不能通过这个字段本身做检索,如_source
;有些字段则只是为了索引,他会创建出一个索引,用户可以在这个索引上检索文档,但这个字段却不会出现在最终的检索结果中, 如_all
字段。 且不是所有的字段都是默认开启的,有些元字段需要在索引中配置开启才可使用。
下面是一些元字段的介绍:
业务字段我们才会考虑ES支持的数据类型:
- 字符串类型:text、keyword(当一个字段是要被全文搜索的,设置text类型,字段内容会被分析,text类型的字段不用于排序,很少用于聚合;keyword类型的字段只能通过精确值搜索到,如果字段需要进行过滤、排序、聚合,设置keyword类型)
- 整数类型:long、integer、short、byte(在满足需求的情况下,尽可能选择范围小的数据类型,字段的长度越短,索引和搜索的效率越高)
- 浮点类型:double--64位双精度IEEE 754浮点类型,float--32位单精度IEEE 754浮点类型,half_float:16位半精度IEEE 754浮点类型,scaled_float : 缩放类型的的浮点数
- 逻辑类型 :boolean
- 日期类型 :date(支持的格式“日期格式的字符串”、“long类型的毫秒数”、“integer的秒数”
- 范围类型 :range范围类型要求字段值描述的是一个数值、日期或IP地址的范围(具体类型又包含:integer_range、float_range、long_range、double_range、data_range、ip_range)
- 二进制类型 :binary(二进制字段是指用base64来表示索引中存储的二进制数据,可用来存储二进制形式的数据,例如图像)
- 复合类型:数组类型 array、对象类型 object(JSON格式对象数据)、嵌套类型 nested 、地理坐标类型 geo_point、地理地图 geo_shape 、IP类型ip、范围类型 completion、令牌计数类型 token_count、附件类型 attachment、抽取类型 percolator
# 1.6 适用情形
[1] 全文检索:Elasticsearch 靠全文检索起步,将 Lucene 开发包做成一个数据产品,屏蔽了 Lucene 各种复杂的设置,为开发人员提供了便利。
[2] 应用查询:Elasticsearch 最擅长的就是查询,基于倒排索引核心算法,查询性能强于 B-Tree 类型所有数据产品,尤其是关系型数据库方面。当数据量超过千万或者上亿时,数据检索的效率非常明显。
[3] 大数据领域:Elasticserach 已经成为大数据平台对外提供查询的重要组成部分之一。大数据平台将原始数据经过迭代计算,之后结果输出到一个数据库提供查询,特别是大批量的明细数据。
[4] 日志检索:著名的 ELK 三件套,讲的就是 Elasticsearch,Logstash,Kibana,专门针对日志采集、存储、查询设计的产品组合。
[5] 监控领域:指标监控,Elasticsearch 进入此领域比较晚,却赶上了好时代,Elasticsearch 由于其倒排索引核心算法,也是支持时序数据场景的,性能也是相当不错的,在功能性上完全压住时序数据库。
[6] 机器学习:很多数据产品都集成了,Elasticsearch真正将机器学习落地成为一个产品 ,简化使用,所见即所得。而不像其它数据产品,仅仅集成算法包,使用者还必须开发很多应用支持。
# 2. Docker-ElasticSearch环境搭建
# 2.1 拉取镜像并运行容器
# 2.1.1 部署命令
$ docker pull elasticsearch:7.16.2
$ docker run -d --name es \
-p 9200:9200 -p 9300:9300 \
-v /root/docker/es/data:/usr/share/elasticsearch/data \
-v /root/docker/es/config:/usr/share/elasticsearch/config \
-v /root/docker/es/plugins:/usr/share/elasticsearch/plugins \
-e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms1g -Xmx1g" \
elasticsearch:7.16.2
$ docker update es --restart=always
2
3
4
5
6
7
8
9
# 2.1.2 进入容器进行配置
这时使用docker ps
命令查看虽然运行起来了,但还无法访问,需要进入容器内部修改配置解决跨域问题。
$ docker ps
$ docker exec -it es /bin/bash
$ cd config
$ chmod o+w elasticsearch.yml
$ vi elasticsearch.yml
2
3
4
5
其中,在 elasticsearch.yml 文件的末尾添加以下三行代码(前两行如果开启则代表允许跨域,出于安全考虑把它关了,第三行开启xpack安全认证)
# http.cors.enabled: true
# http.cors.allow-origin: "*"
xpack.security.enabled: true
2
3
然后把权限修改回来,重启容器,设置账号密码,浏览器访问http://IP:9200
地址即可(用 elastic账号 和自己设置的密码登录即可)
$ chmod o-w elasticsearch.yml
$ exit
$ docker restart es
$ docker exec -it es /bin/bash
$ ./bin/elasticsearch-setup-passwords interactive // 然后设置一大堆账号密码
2
3
4
5
# 2.1.3 注意事项
1)Elasticsearch请选择7.16.0之后的版本,之前的所有版本都使用了易受攻击的 Log4j版本,存在严重安全漏洞。
2)ES_JAVA_OPTS="-Xms1g -Xmx1g"
只是一个示例,内存设置的少了会导致数据查询速度变慢,具体设置多少要根据业务需求来定,一般而言公司的实际项目要设置8g内存以上。
# 2.1.4 数据挂载遇到的问题
[1] 数据锁定问题
报错信息:
java.lang.IllegalStateException: failed to obtain node locks, tried [[/usr/share/elasticsearch/data]] with lock id [0]; maybe these locations are not writable or multiple nodes were started without increasing
产生原因:ES在运行时会在
/data/nodes/具体分片
目录里生成一个node.lock
文件,由于我是在运行期scp过来的挂载数据,这个也被拷贝过来了,导致数据被锁定。解决办法:删掉
/data/nodes/具体分片/node.lock
文件即可
[2] data目录权限问题
- 解决办法:进入容器内部,把data目录的权限设置为777即可
[3] 集群与单节点问题
- 解决办法:修改
config/elasticsearch.yml
里的集群配置即可,如果原来是集群,现在要单节点,就把集群配置去掉。
[4] 堆内存配置问题
报错信息:
initial heap size [8589934592] not equal to maximum heap size [17179869184]; this can cause resize pauses
解决办法:-Xms 与 -Xmx 设置成相同大小的内存。
# 2.2 可视化管理ES
# 2.2.1 使用Elasticvue浏览器插件
可借助 Elasticvue (opens new window) Chrome插件实现ES数据库的可视化管理,支持所有版本ES。
# 2.2.2 使用ElasticHD可视化面板
ElasticHD支持所有版本ES,特色功能是支持“SQL转DSL”。
项目地址:https://github.com/qax-os/ElasticHD (opens new window)
$ docker run -d --name elastichd -p 9800:9800 containerize/elastichd
$ docker update elastichd --restart=always
2
浏览器打开http://ip:9800/
地址,即可访问面板,在左上角配置ES连接信息即可。如果是带鉴权的ES,按照http://user:[email protected]:9800
配置ES连接信息即可。
在Tools——SQL Convert DSL处,可以编写SQL生成操作ES的DSL语句(作为辅助手段使用,一些复杂的SQL可能不适用)
另注:也可以使用一些在线工具进行转换,例如,https://printlove.cn/tools/sql2es (opens new window)、http://sql2dsl.atotoa.com (opens new window)
# 2.2.3 安装Kibana可视化插件
下载与ES版本相同的Kibana
$ mkdir -p /root/kibana
$ cd /root/kibana
$ wget https://artifacts.elastic.co/downloads/kibana/kibana-7.16.2-linux-x86_64.tar.gz
$ tar -zxvf kibana-7.16.2-linux-x86_64.tar.gz
$ cd /root/kibana/kibana-7.16.2-linux-x86_64
$ vi /config/kibana.yml
2
3
4
5
6
修改配置文件内容如下(用不到的我这里给删掉了,原配置文件有着很详尽的英文说明):
server.port: 5601
server.host: "ip"
elasticsearch.hosts: ["http://ip:9200"]
elasticsearch.username: "username"
elasticsearch.password: "password"
i18n.locale: "zh-CN"
2
3
4
5
6
启动kibana:
$ cd /root/kibana/kibana-7.16.2-linux-x86_64/bin # 进入可执行目录
$ nohup /root/kibana/kibana-7.16.2-linux-x86_64/bin/kibana & # 启动kibana
2
说明:如果是root用户,会报Kibana should not be run as root. Use --allow-root to continue.
的错误,建议切换别的用户去执行,如果就是想用root用户启动,则使用nohup /root/docker/kibana/kibana-7.16.2-linux-x86_64/bin/kibana --allow-root &
。
启动成功后,浏览器打开http://ip:5601/
地址,用es的用户名和密码进行登录,就可以使用了。
关闭kibana:
$ ps -ef | grep kibana
$ kill -9 [PID]
2
# 2.3 安装ik分词器插件
IK 分析插件将 Lucene IK 分析器集成到 elasticsearch 中,支持自定义字典。
安装方式:挂载目录或者进容器下载(版本一定不要安装错,不然就进不去容器了)
方式一:去Releases下载对应ES版本的ik分词器插件,然后上传到plugins目录将其挂载到容器内。
方式二:进入容器内直接下载对应ES版本的ik分词器插件,并放到相应目录。
$ docker exec -it es /bin/bash $ apt-get install -y wget $ wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.16.2/elasticsearch-analysis-ik-7.16.2.zip $ unzip -o -d /usr/share/elasticsearch/elasticsearch-analysis-ik-7.16.2 /usr/share/elasticsearch/elasticsearch-analysis-ik-7.16.2.zip $ rm –f elasticsearch-analysis-ik-7.16.2.zip $ mv /usr/share/elasticsearch/elasticsearch-analysis-ik-7.16.2 /usr/share/elasticsearch/plugins/ik $ exit $ docker restart es
1
2
3
4
5
6
7
8
测试方式:可以进行存在测试和功能测试
$ docker exec -it es /bin/bash
$ cd /usr/share/elasticsearch/bin
$ elasticsearch-plugin list
2
3
ik分词器有2种算法:ik_smart和ik_max_word,下面我们通过postman工具来测试ik分词器的分词算法。
[1] 测试ik_smart分词
请求url:http://ip:9200/_analyze 请求方式:get
请求参数:
{
"analyzer":"ik_smart",
"text":"我爱你,特靠谱"
}
2
3
4
[2] 测试ik_max_word分词
请求url:http://ip:9200/_analyze 请求方式:get
请求参数:
{
"analyzer":"ik_max_word",
"text":"我爱你,特靠谱"
}
2
3
4
上面测试例子可以看到,不管是ik_smart还是ik_max_word算法,都不认为"特靠谱"是一个关键词(ik分词器的自带词库中没有有"特靠谱"这个词),所以将这个词拆成了三个词:特、靠、谱。
自定义词库:ik分词器会把分词库中没有的中文按每个字进行拆分。如果不想被拆分,那么就需要维护一套自己的分词库。
Step1:进入
ik分词器路径/config
目录,新建一个my.dic
文件,添加一些关键词,如"特靠谱"、"靠谱"等,每一行就是一个关键词。Step2:修改配置文件
IKAnalyzer.cfg.xml
,配置<entry key="ext_dict"></entry>
。<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"> <properties> <comment>IK Analyzer 扩展配置</comment> <!--用户可以在这里配置自己的扩展字典 --> <entry key="ext_dict">my.dic</entry> <!--用户可以在这里配置自己的扩展停止词字典--> <entry key="ext_stopwords"></entry> <!--用户可以在这里配置远程扩展字典 --> <!-- <entry key="remote_ext_dict">words_location</entry> --> <!--用户可以在这里配置远程扩展停止词字典--> <!-- <entry key="remote_ext_stopwords">words_location</entry> --> </properties>
1
2
3
4
5
6
7
8
9
10
11
12
13Step3:重启ES,并再次使用Postman测试上述请求,发现"特靠谱"、"靠谱"等将其视为一个词了。
# 2.4 使用curl命令操作ES
# 2.4.1 索引操作
// 查询所有索引
$ curl -u 用户名:密码 http://ip:port/_cat/indices
// 删除索引(包含结构)
$ curl -u 用户名:密码 -XDELETE http://ip:port/索引名
// 清空索引(不包含结构,即删除所有文档)
$ curl -u 用户名:密码 -XPOST 'http://ip:port/索引名/_delete_by_query?refresh&slices=5&pretty' -H 'Content-Type: application/json' -d'{"query": {"match_all": {}}}'
// 创建索引
$ curl -u 用户名:密码 -XPUT 'http://ip:port/索引名' -H 'Content-Type: application/json' -d'
{
"settings" : {
"index" : {
"number_of_shards" : "5",
"number_of_replicas" : "1"
}
},
"mappings" : {
"properties" : {
"post_date": {
"type": "date"
},
"tags": {
"type": "keyword"
},
"title" : {
"type" : "text"
}
}
}
}'
// 修改索引
$ curl -u 用户名:密码 -XPUT 'http://ip:port/索引名/_mapping' -H 'Content-Type: application/json' -d'
{
"properties" : {
"post_date": {
"type": "date"
},
"tags_modify": {
"type": "keyword"
},
"title" : {
"type" : "text"
},
"content": {
"type": "text"
}
}
}'
// 调整副本数量(分片数量不可调整,要修改就只能删除索引重建)
$ curl -u 用户名:密码 -XPUT 'ip:port/索引名/_settings' -H 'Content-Type: application/json' -d '
{
"index": {
"number_of_replicas": "0"
}
}'
// 查看单个索引信息(可以查看到单个索引的数据量)
$ curl -u 用户名:密码 -XGET 'http://ip:port/_cat/indices/index_1?v'
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open index_1 aado9-iGRFGN9twQb040ds 5 1 28800345 0 3gb 1.5gb
// 按照文档数量排序索引(可以查看到所有索引的数据量)
$ curl -u 用户名:密码 -XGET 'http://ip:port/_cat/indices?v&s=docs.count:desc'
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
注意事项:创建索引时,有的教程在“mappings”里嵌套了“_doc”,会报如下错误,这是因为版本 7.x 不再支持映射类型,将其删除即可。
{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"The mapping definition cannot be nested under a type [_doc] unless include_type_name is set to true."}],"type":"illegal_argument_exception","reason":"The mapping definition cannot be nested under a type [_doc] unless include_type_name is set to true."},"status":400}%
# 2.4.2 文档操作
// 根据_id查询文档
$ curl -u 用户名:密码 -XGET 'http://ip:port/索引名/_doc/1'
// 新增和修改文档
$ curl -u 用户名:密码 -H "Content-Type:application/json" -XPOST 'http://ip:port/索引名/_doc/1' -d '
{
"msgId": "10010",
"title": "test-title",
"content": "test-content",
"isDeleted": 0,
"publishTime": "1586707200000",
"insertTime": "1668212021000",
"updateTime": "1678687631000"
}'
// 根据_id删除文档
$ curl -u 用户名:密码 -XDELETE "http://ip:port/索引名/_doc/1"
// 查询所有数据
$ curl -u 用户名:密码 -H "Content-Type:application/json" -XGET http://ip:port/索引名/_search?pretty -d '{"query":{"match_all":{}}}'
// 查询指定条数的数据
$ curl -u 用户名:密码 -H "Content-Type:application/json" -XGET http://ip:port/索引名/_search?pretty -d '{"query":{"match_all":{}},"size":2}'
// 查询指定列数据
$ curl -u 用户名:密码 -H "Content-Type:application/json" -XGET http://ip:port/索引名/_search?pretty -d '{"query":{"match_all":{}},"_source":["publishTime","updateTime"]}'
// 查询数据并排序
$ curl -u 用户名:密码 -H "Content-Type:application/json" -XGET http://ip:port/索引名/_search?pretty -d '{"query":{"match_all":{}},"sort":{"_id":{"order":"desc"}}}'
// 匹配查询
$ curl -u 用户名:密码 -H "Content-Type:application/json" -XGET http://ip:port/索引名/_search?pretty -d '{"query":{"match":{"title":"test"}}}'
// 精准查询
$ curl -u 用户名:密码 -H "Content-Type:application/json" -XGET http://ip:port/索引名/_search?pretty -d '{"query":{"term":{"title.keyword":"test-title"}}}'
// 范围查询
$ curl -u 用户名:密码 -H "Content-Type:application/json" -XGET http://ip:port/索引名/_search?pretty -d '{"query":{"range":{"msgId":{"gt":"1","lte":"20000"}}}}'
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 3. ElasticSearch与Springboot的整合
# 3.1 依赖及配置文件
# 3.1.1 配置项目依赖
使用Maven拉取项目依赖,注意服务端与高级客户端的版本要与你搭建的Elasticsearch服务版本一致。
pom.xml
<!--统一管理全局变量-->
<properties>
<elasticsearch.version>7.16.2</elasticsearch.version>
<elasticsearch.rest.high.level.client.version>7.16.2</elasticsearch.rest.high.level.client.version>
</properties>
<!-- elasticsearch 服务端 -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<!-- elasticsearch 高级客户端 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.rest.high.level.client.version}</version>
</dependency>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
注意事项:
1)Elasticsearch请选择7.16.0之后的版本,之前的所有版本都使用了易受攻击的 Log4j2版本,存在严重安全漏洞。
2)如果缺失 httpcore 和 httpclient 依赖,可手动进行添加。
<!--统一管理全局变量-->
<properties>
<httpclient.version>4.5.5</httpclient.version>
<httpcore.version>4.4.9</httpcore.version>
</properties>
<!-- HttpClient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<!-- HttpCore -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>${httpcore.version}</version>
</dependency>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 3.1.2 编写配置文件
我搭建ES服务时开启了xpack安全验证,所以是需要账号密码的,没开启的话就不需要。
application.properties
## elasticsearch配置
elasticsearch.host=127.0.0.1
elasticsearch.port=9200
elasticsearch.username=username
elasticsearch.password=password
2
3
4
5
/config/ElasticsearchConfiguration.java
@Configuration
public class ElasticsearchConfiguration {
@Value("${elasticsearch.host}")
private String host;
@Value("${elasticsearch.port}")
private int port;
@Value("${elasticsearch.username}")
private String username;
@Value("${elasticsearch.password}")
private String password;
@Bean(destroyMethod = "close", name = "client")
public RestHighLevelClient initRestClient() {
// 用户认证对象
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// 设置账号密码
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
// 创建rest client对象
RestClientBuilder builder = RestClient.builder(
new HttpHost(host, port))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider);
}
});
return new RestHighLevelClient(builder);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 3.2 基本增删查改
完整示例代码已在Github上开源:https://github.com/Logistic98/es-springboot-demo (opens new window)
# 3.2.1 索引操作
[1] 创建索引
@Override
public boolean createIndex(String index) throws IOException {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
return createIndexResponse.isAcknowledged();
}
2
3
4
5
6
[2] 查询索引
@Override
public String[] queryIndex(String index) throws IOException {
GetIndexRequest queryIndexRequest = new GetIndexRequest(index);
GetIndexResponse getIndexResponse = client.indices().get(queryIndexRequest, RequestOptions.DEFAULT);
return getIndexResponse.getIndices();
}
2
3
4
5
6
[3] 删除索引
@Override
public boolean deleteIndex(String index) throws IOException {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index);
AcknowledgedResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
return deleteIndexResponse.isAcknowledged();
}
2
3
4
5
6
[4] 检查索引是否存在
GetIndexRequest getIndexRequest = new GetIndexRequest (Constant.INDEX);
boolean exists = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
2
# 3.2.2 文档操作
公共文件如下:
/constant/Constant.java
public interface Constant {
String INDEX = "user";
}
2
3
/pojo/UserDocument.java
@Data
public class UserDocument {
private String id;
private String name;
private String sex;
private Integer age;
private String city;
}
2
3
4
5
6
7
8
[1] 新增文档
@Override
public Boolean createDocument(UserDocument document) throws Exception {
String id = document.getId();
IndexRequest indexRequest = new IndexRequest(Constant.INDEX)
.id(id)
.source(JSON.toJSONString(document), XContentType.JSON);
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
return indexResponse.status().equals(RestStatus.CREATED);
}
2
3
4
5
6
7
8
9
注:如果没加.id(id)
部分,则会自动生成一个20位的 UUID 作为 _id 字段。
[2] 查询文档
@Override
public UserDocument queryDocument(String id) throws IOException {
GetRequest getRequest = new GetRequest(Constant.INDEX, id);
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
UserDocument result = new UserDocument();
if (getResponse.isExists()) {
String sourceAsString = getResponse.getSourceAsString();
result = JSON.parseObject(sourceAsString, UserDocument.class);
} else {
logger.error("没有找到该 id 的文档");
}
return result;
}
2
3
4
5
6
7
8
9
10
11
12
13
[3] 修改文档
@Override
public Boolean updateDocument(UserDocument document) throws Exception {
String id = document.getId();
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index(Constant.INDEX).id(id);
updateRequest.doc(JSON.toJSONString(document), XContentType.JSON);
UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
return updateResponse.status().equals(RestStatus.OK);
}
2
3
4
5
6
7
8
9
[4] 删除文档
@Override
public String deleteDocument(String id) throws Exception {
DeleteRequest deleteRequest = new DeleteRequest(Constant.INDEX, id);
DeleteResponse response = client.delete(deleteRequest, RequestOptions.DEFAULT);
return response.getResult().name();
}
2
3
4
5
6
[5] 批量操作文档
批量新增:
@Override
public Boolean bulkCreateDocument(List<UserDocument> documents) throws IOException {
BulkRequest bulkRequest = new BulkRequest();
for (UserDocument document : documents) {
String id = document.getId();
IndexRequest indexRequest = new IndexRequest(Constant.INDEX)
.id(id)
.source(JSON.toJSONString(document), XContentType.JSON);
bulkRequest.add(indexRequest);
}
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
return bulkResponse.status().equals(RestStatus.OK);
}
2
3
4
5
6
7
8
9
10
11
12
13
批量删除:
@Override
public Boolean bulkDeleteDocument(List<UserDocument> documents) throws Exception {
BulkRequest bulkRequest = new BulkRequest();
for (UserDocument document : documents) {
String id = document.getId();
bulkRequest.add(new DeleteRequest().index(Constant.INDEX).id(id));
}
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
return bulkResponse.status().equals(RestStatus.OK);
}
2
3
4
5
6
7
8
9
10
[6] 全量查询文档
@Override
public List<UserDocument> queryAllDocument() throws IOException {
SearchRequest getAllRequest = new SearchRequest();
getAllRequest.indices(Constant.INDEX);
getAllRequest.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
SearchResponse getAllResponse = client.search(getAllRequest, RequestOptions.DEFAULT);
SearchHits hits = getAllResponse.getHits();
List<UserDocument> result = new ArrayList<>();
for ( SearchHit hit : hits ) {
result.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
}
return result;
}
2
3
4
5
6
7
8
9
10
11
12
13
[7] 查询结果过滤
全量查询文档结果处理(字段排序、过滤字段)
@Override
public List<UserDocument> queryFilterDocument() throws IOException {
SearchRequest request = new SearchRequest();
request.indices(Constant.INDEX);
SearchSourceBuilder builder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
builder.sort("age", SortOrder.DESC);
String[] excludes = {"id","city"};
String[] includes = {};
builder.fetchSource(includes, excludes);
request.source(builder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
List<UserDocument> result = new ArrayList<>();
for ( SearchHit hit : hits ) {
result.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
}
return result;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[8] 分页查询文档
@Override
public List<UserDocument> queryPageDocument(int from, int size) throws IOException {
SearchRequest getPartRequest = new SearchRequest();
getPartRequest.indices(Constant.INDEX);
SearchSourceBuilder builder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
builder.from(from); // 分页起始位置,(当前页码-1)*每页显示数据条数
builder.size(size); // 每页展示条数
getPartRequest.source(builder);
SearchResponse response = client.search(getPartRequest, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
List<UserDocument> result = new ArrayList<>();
for ( SearchHit hit : hits ) {
result.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
}
return result;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[9] 条件查询文档
单条件查询:
@Override
public List<UserDocument> querySingleConditionDocument(String name) throws IOException {
SearchRequest request = new SearchRequest();
request.indices(Constant.INDEX);
request.source(new SearchSourceBuilder().query(QueryBuilders.termQuery("name", name)));
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
List<UserDocument> result = new ArrayList<>();
for ( SearchHit hit : hits ) {
result.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
}
return result;
}
2
3
4
5
6
7
8
9
10
11
12
13
组合条件查询:
@Override
public List<UserDocument> queryCombinationConditionDocument(String name,String city) throws IOException {
SearchRequest request = new SearchRequest();
request.indices(Constant.INDEX);
SearchSourceBuilder builder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.matchQuery("name", name));
boolQueryBuilder.mustNot(QueryBuilders.matchQuery("city", city));
builder.query(boolQueryBuilder);
request.source(builder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
List<UserDocument> result = new ArrayList<>();
for ( SearchHit hit : hits ) {
result.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
}
return result;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
注意事项:matchQuery搜索的时候,首先会解析查询字符串,进行分词然后再查询;而termQuery,输入的查询内容是什么,就会按照什么去查询,并不会解析查询内容。
[10] 范围查询文档
@Override
public List<UserDocument> queryRangeDocument(int minAge, int maxAge) throws IOException {
SearchRequest request = new SearchRequest();
request.indices(Constant.INDEX);
SearchSourceBuilder builder = new SearchSourceBuilder();
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("age");
// gt大于,gte大于等于,lt小于,lte小于等于
rangeQuery.gte(minAge);
rangeQuery.lt(maxAge);
builder.query(rangeQuery);
request.source(builder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
List<UserDocument> result = new ArrayList<>();
for ( SearchHit hit : hits ) {
result.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
}
return result;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[11] 模糊查询文档
@Override
public List<UserDocument> queryFuzzyDocument(String name) throws IOException {
SearchRequest request = new SearchRequest();
request.indices(Constant.INDEX);
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.fuzzyQuery("name", name).fuzziness(Fuzziness.ONE)); // 模糊字段偏移量
request.source(builder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
List<UserDocument> result = new ArrayList<>();
for ( SearchHit hit : hits ) {
result.add(JSON.parseObject(hit.getSourceAsString(), UserDocument.class));
}
return result;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 3.3 通用方法封装
为了减少代码重复,实际开发中会将常用的ES方法进行封装,使用时直接传参调用即可。
其中 pom.xml
、application.properties
、/config/ElasticsearchConfiguration.java
等文件的配置同上。
/service/ElasticSearchService.java的代码如下:
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScriptScoreQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* 通用ElasticSearch工具接口
*/
@Slf4j
@Service
public class ElasticSearchService {
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
* 共用方法:ElasticSearch通用查询
* @param searchSourceBuilder
* @param index
* @return
*/
private SearchResponse pageQuerySearchResponse(SearchSourceBuilder searchSourceBuilder, String index) {
SearchRequest searchRequest = new SearchRequest()
.source(searchSourceBuilder)
.indices(index);
SearchResponse searchResponse = null;
try {
searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
return searchResponse;
}
/**
* 共用方法:ElasticSearch批量操作(新增、删除)数据
*
* @param bulkRequest
*/
private void esBatch(BulkRequest bulkRequest) {
try {
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
if (bulkResponse.hasFailures()) {
log.error("bulk错误信息:{}", bulkResponse.buildFailureMessage());
}
} catch (Exception e) {
log.error("批量操作es数据错误", e);
}
}
/**
* 查询指定条件的数据
* @param indexName 索引名称
* @param sortField 排序字段
* @param sortType 排序类别
* @param page 页码
* @param rows 每页大小
* @param boolQueryBuilder 查询条件
* @param includeFields 返回字段
* @param excludeFields 排除字段
* @return
*/
public SearchResponse search(String indexName, String sortField, String sortType, Integer page, Integer rows, BoolQueryBuilder boolQueryBuilder,
String[] includeFields, String[] excludeFields) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.from((page - 1) * rows)
.size(rows)
.fetchSource(includeFields, excludeFields)
.trackTotalHits(true);
SortOrder so = SortOrder.DESC;
if (ObjectUtil.isNotEmpty(sortType) && "asc".equals(sortType)) {
so = SortOrder.ASC;
}
searchSourceBuilder.sort(sortField, so);
searchSourceBuilder.query(boolQueryBuilder);
return pageQuerySearchResponse(searchSourceBuilder, indexName);
}
/**
* 查询单条数据
* @param indexName 索引名称
* @param boolQueryBuilder 查询条件
* @return
*/
public Map<String, Object> searchOne(String indexName, BoolQueryBuilder boolQueryBuilder) {
Map<String, Object> resultMap = new HashMap();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().from(0).size(1);
searchSourceBuilder.query(boolQueryBuilder);
SearchResponse searchResponse = pageQuerySearchResponse(searchSourceBuilder, indexName);
if (searchResponse.getHits().getTotalHits().value > 0) {
SearchHit searchHit = searchResponse.getHits().getHits()[0];
resultMap = searchHit.getSourceAsMap();
}
return resultMap;
}
/**
* 获取指定索引下的数据量
*
* @param indexName
* @return
*/
public Long getCountByIndex(String indexName) {
Long totalHites = 0L;
if (!StrUtil.isEmpty(indexName)) {
try {
CountRequest countRequest = new CountRequest(indexName);
totalHites = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT).getCount();
} catch (Exception e) {
e.printStackTrace();
}
}
return totalHites;
}
/**
* 查询聚合数据
* @param indexName 索引名称
* @param boolQueryBuilder 查询条件
* @param aggregationBuilder 聚合条件
* @param includeFields 返回字段
* @param excludeFields 排除字段
* @return
*/
public SearchResponse aggsSearch(String indexName, BoolQueryBuilder boolQueryBuilder, AggregationBuilder aggregationBuilder,
String[] includeFields, String[] excludeFields) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.size(0)
.fetchSource(includeFields, excludeFields)
.trackTotalHits(true);
searchSourceBuilder.aggregation(aggregationBuilder);
searchSourceBuilder.query(boolQueryBuilder);
return pageQuerySearchResponse(searchSourceBuilder, indexName);
}
/**
* 根据图片特征向量实现以图搜图(字段类型需为dense_vector)
* @param indexName
* @param scriptScoreQueryBuilder
* @param score
* @param page
* @param rows
* @param includeFields
* @param excludeFields
* @return
*/
public SearchResponse imageSearch(String indexName, ScriptScoreQueryBuilder scriptScoreQueryBuilder, Float score, Integer page, Integer rows, String[] includeFields, String[] excludeFields) {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().trackTotalHits(true);
sourceBuilder.query(scriptScoreQueryBuilder);
sourceBuilder.from((page - 1) * rows);
sourceBuilder.size(rows);
sourceBuilder.minScore(score);
sourceBuilder.timeout(new TimeValue(120, TimeUnit.SECONDS));
sourceBuilder.fetchSource(includeFields, excludeFields);
return pageQuerySearchResponse(sourceBuilder, indexName);
}
/**
* 批量添加数据(自动生成id)
* @param indexName
* @param dataList
*/
public void addBatchDataAutoId(String indexName, List<Map> dataList) {
BulkRequest request = new BulkRequest();
try {
int count = 0;
for (Map map : dataList) {
request.add(new IndexRequest(indexName).source(map));
count++;
if (count == 1000) {
esBatch(request);
count = 0;
}
}
if (count != 0) {
esBatch(request);
}
} catch (Exception e) {
log.error("生成BulkRequest错误", e);
}
}
/**
* 批量添加数据(主动指定id)
* @param indexName
* @param dataList
*/
public void addBatchData(String indexName, List<Map> dataList) {
BulkRequest request = new BulkRequest();
try {
int count = 0;
for (Map map : dataList) {
String id = map.get("id").toString();
request.add(new IndexRequest(indexName).id(id).source(map));
count++;
if (count == 1000) {
esBatch(request);
count = 0;
}
}
if (count != 0) {
esBatch(request);
}
} catch (Exception e) {
log.error("生成BulkRequest错误", e);
}
}
/**
* 批量删除数据
* @param indexName
* @param idList
*/
public void deleteBatchData(String indexName, List<String> idList) {
BulkRequest request = new BulkRequest();
try {
int count = 0;
for (String id : idList) {
request.add(new DeleteRequest().index(indexName).id(id));
count++;
if (count == 1000) {
esBatch(request);
count = 0;
}
}
if (count != 0) {
esBatch(request);
}
} catch (Exception e) {
log.error("生成BulkRequest错误", e);
}
}
/**
* 修改数据
* @param indexName
* @param dataMap
*/
public void update(String indexName, Map dataMap) {
String contentId = dataMap.get("id").toString();
UpdateRequest updateRequest = new UpdateRequest(indexName, "_doc", contentId);
updateRequest.doc(JSONUtil.parse(dataMap.get("data")).toString(), XContentType.JSON);
try {
restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# 3.4 常见问题
# 3.4.1 依赖版本不正确的问题
问题情景:使用上述通用封装的“修改数据”方法时,意外报错Handler dispatch failed; nested exception is java.lang.NoClassDefFoundError: org/elasticsearch/xcontent/XContentType
,是依赖问题。
问题排查:首先我排查了引用的项目依赖,发现其中有 7.16.2 和 7.12.1 两个版本,实际用的是 7.12.1 版本,那个版本缺东西所以报错,但是我并没有配置过7.12.1版本的ES。查阅资料后发现,是 2.5.4 版本的 spring-boot-starter-parent 里默认指定了ES版本为7.12.1,所以导致了该问题。
Step1:找到项目总 pom.xml ,找到 spring-boot-starter-parent,选中后按 Ctrl + 单击进行跳转
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.5.4</version> <relativePath/> <!-- lookup parent from repository --> </parent>
1
2
3
4
5
6Step2:在跳转文件 spring-boot-starter-parent-2.5.4.pom 里找到 spring-boot-dependencies,选中后按 Ctrl + 单击进行跳转
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.5.4</version> </parent>
1
2
3
4
5Step3:在跳转文件 spring-boot-dependencies-2.5.4.pom 里我们可以看到默认使用了 7.12.1 版本的ES
<properties> <elasticsearch.version>7.12.1</elasticsearch.version> </properties>
1
2
3
问题解决:找到了问题所在,我们只需要在项目总 pom.xml 里指定一下我们需要的 ES 版本即可。
<properties>
<!--2.5.4版本的 spring-boot-starter-parent 默认使用的es依赖为 7.12.1,因此这里需要手动指定-->
<elasticsearch.version>7.16.2</elasticsearch.version>
</properties>
2
3
4
# 3.4.2 ES出错返回404状态码的问题
问题情景:前端在请求 业务API 时返回了404状态码,一开始以为是请求路径不对,后来发现是涉及ES的业务代码出错导致的。
问题原因:Java对ES的请求API进行了封装,ES代码出错实际对应的是调用API请求ES出错,所以就返回了404。
问题解决:查看日志找到开头的错误原因进行修复即可(ES出错的日志特别多,可能会超出默认控制台的显示上限,查看不到有效的报错信息,这里修改IDEA的/安装目录/bin/idea.properties
配置文件,可以取消限制,具体操作以MacOS为例如下)
Step1:打开 Finder 通过 Application 找到 IDEA 应用,右键点击“显示包内容”。
Step2:修改
/安装目录/bin/idea.properties
配置文件,将idea.cycle.buffer.size
设置为 disabled#--------------------------------------------------------------------- # This option controls console cyclic buffer: keeps the console output size not higher than the specified buffer size (KiB). # Older lines are deleted. In order to disable cycle buffer use idea.cycle.buffer.size=disabled #--------------------------------------------------------------------- #idea.cycle.buffer.size=1024 idea.cycle.buffer.size=disabled
1
2
3
4
5
6Step3:重启IDEA即可(如果还不好使的话,可能是插件影响了,比如 Grep console,卸载掉就好了)
# 4. Python操作ElasticSearch
# 4.1 数据导入导出
Step1:安装依赖并编写配置文件
$ pip install elasticsearch==7.16.2 // 注意要和服务端的ES版本尽量相近(实测7.16.2、8.4.1是没问题的)
config.ini(把ES连接信息换成自己的)
[SOURCE_ES]
# IP地址
host = 111.111.111.111
# 端口号
port = 9200
# 用户名
user = your_es_user
# 密码
password = your_es_password
# 连接超时时间(ms)
timeout = 60
# 滚动查询的超时时间,这里设置为10分钟
scroll = 10m
# 单次查询的条数
size = 1000
# 索引列表(多个之间用","分隔)
index_list = index_1,index_2
[TARGET_ES]
# IP地址
host = 111.111.111.111
# 端口号
port = 9200
# 用户名
user = your_es_user
# 密码
password = your_es_password
# 连接超时时间(ms)
timeout = 60
# 单次批量插入数据量
step = 1000
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
注:多个索引之间用英文逗号分隔(逗号后面有没有空格都无所谓,读取配置时会进行处理)
log.py
# -*- coding: utf-8 -*-
import logging
logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# 输出到控制台
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)
# 输出到文件
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("./es_data.log")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Step2:编写ES导入导出脚本并执行
export_es_data.py
# -*- coding: utf-8 -*-
import json
import os
import sys
import time
from decimal import Decimal
from elasticsearch import Elasticsearch
from configparser import ConfigParser
from log import logger
# 将ES查询出来list写入到json文件
def write_list_to_json(list, json_file_path):
with open(json_file_path, 'w', encoding='utf-8') as f:
json.dump(list, f, ensure_ascii=False)
# 将符合条件的ES数据查出保存为json
def es_export_json(es_connect, es_size, es_scroll, index_list, original_data_path, now_time):
for i in index_list:
logger.info("正在保存{}索引的数据".format(i))
query = {
"range": {
"update_time": {
# 小于等于本次读取开始时间
"lte": now_time
}
}
}
logger.info("全量导出,时间范围为{}之前".format(now_time))
try:
source_list = []
# 滚动查询符合条件的所有es数据
page = es_connect.search(index=i, query=query, size=es_size, scroll=es_scroll)
for hit in page['hits']['hits']:
source_data = hit['_source']
source_data['_id'] = hit['_id']
source_list.append(source_data)
# 游标用于输出es查询出的所有结果
sid = page['_scroll_id']
# es查询出的结果总量
scroll_size = page['hits']['total']['value']
while (scroll_size > 0):
page = es_connect.scroll(scroll_id=sid, scroll=es_scroll)
sid = page['_scroll_id']
scroll_size = len(page['hits']['hits'])
for hit in page['hits']['hits']:
source_data = hit['_source']
source_data['_id'] = hit['_id']
source_list.append(source_data)
json_file_path = original_data_path + "/" + str(i) + ".json"
if len(source_list) != 0:
write_list_to_json(source_list, json_file_path)
logger.info('{}索引的数据已保存至{}路径,导出的数据总量为{}'.format(str(i), json_file_path, str(len(source_list))))
else:
logger.info('{}索引无更新'.format(str(i)))
except Exception as e:
logger.error("ES索引数据导出至JSON文件的过程出错:{}".format(e))
# 将符合条件的ES数据查出保存为json--调用入口
def export_es_data_main(source_export_dict, original_data_path, now_time):
es_connect = Elasticsearch(
hosts=[str(source_export_dict['es_host']) + ":" + str(source_export_dict['es_port'])],
http_auth=(str(source_export_dict['es_user']), str(source_export_dict['es_password'])),
request_timeout=int(source_export_dict['es_timeout'])
)
index_list = ''.join(source_export_dict['es_index_list'].split()).split(",")
es_size = int(source_export_dict['es_size'])
es_scroll = str(source_export_dict['es_scroll'])
es_export_json(es_connect, es_size, es_scroll, index_list, original_data_path, now_time)
# 读取配置文件
def read_config(config_path):
cfg = ConfigParser()
cfg.read(config_path, encoding='utf-8')
section_list = cfg.sections()
config_dict = {}
for section in section_list:
section_item = cfg.items(section)
for item in section_item:
config_dict[item[0]] = item[1]
return config_dict
if __name__ == '__main__':
# 获取基础路径并读取配置信息
base_path = os.getcwd()
logger.info("基础路径:{}".format(base_path))
config_path = base_path + '/config.ini'
logger.info("配置文件路径:{}".format(config_path))
source_export_dict = {}
try:
source_export_dict = read_config(config_path)
except:
logger.error("读取配置文件出错,程序已终止执行")
sys.exit()
# 导出ES数据并统计耗时
start_time = time.time()
start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
logger.info("----------开始导出源数据----------")
export_es_data_main(source_export_dict, base_path, start_time_str)
end_time = time.time()
time_consuming_str = str(Decimal(str((end_time - start_time) * 1000)).quantize(Decimal('0.00'))) + 'ms'
logger.info("----------导出源数据已完成,共耗时:{}----------".format(time_consuming_str))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import_es_data.py
# -*- coding: utf-8 -*-
import glob
import os
import sys
import time
from decimal import Decimal
from elasticsearch import Elasticsearch, helpers
from configparser import ConfigParser
from log import logger
# 读取json文件并批量写入ES(不存在则插入,存在则更新)
def read_json_batch_import_es(es_connect, json_path, index_name, es_timeout, es_step):
with open(json_path, 'r', encoding='utf-8') as file:
json_str = file.read()
# json_str中可能会存在null字符串表示空值,但是python里面没有null这个关键字,需要将null定义为变量名,赋值python里面的None
null = None # 不要删掉这行看似无用的代码,否则导入时遇到空值会报错 name 'null' is not defined
# 将字符串形式的列表数据转成列表数据
json_list = eval(json_str)
# 按照配置文件中的步长进行写入,缓解批量写入的压力
length = len(json_list)
for i in range(0, length, es_step):
# 要写入的数据长度大于步长,那么就分批写入
if i + es_step < length:
actions = []
for j in range(i, i + es_step):
# 先把导入时添加的"_id"的值取出来
new_id = json_list[j]['_id']
del json_list[j]["_id"] # 删除导入时添加的"_id"
action = {
"_index": str(index_name),
"_id": str(new_id),
"_source": json_list[j]
}
actions.append(action)
helpers.bulk(es_connect, actions, request_timeout=es_timeout)
# 要写入的数据小于步长,那么就一次性写入
else:
actions = []
for j in range(i, length):
# 先把导入时添加的"_id"的值取出来
new_id = json_list[j]['_id']
del json_list[j]["_id"] # 删除导入时添加的"_id"
action = {
"_index": str(index_name),
"_id": str(new_id),
"_source": json_list[j]
}
actions.append(action)
helpers.bulk(es_connect, actions, request_timeout=es_timeout)
logger.info("{}索引插入了{}条数据".format(str(index_name), str(length)))
# 将json数据文件导入到ES--调用入口
def import_es_data_main(target_import_dict, original_data_path):
es_timeout = int(target_import_dict['es_timeout'])
es_step = int(target_import_dict['es_step'])
es_connect = Elasticsearch(
hosts=[str(target_import_dict['es_host']) + ":" + str(target_import_dict['es_port'])],
http_auth=(str(target_import_dict['es_user']), str(target_import_dict['es_password'])),
request_timeout=es_timeout
)
json_path_list = glob.glob(original_data_path + '/*.json')
for json_path in json_path_list:
file_dir, file_full_name = os.path.split(json_path)
index_name, file_ext = os.path.splitext(file_full_name)
read_json_batch_import_es(es_connect, json_path, index_name, es_timeout, es_step)
os.remove(json_path) # 数据导入完成后删除json数据文件
# 读取配置文件
def read_config(config_path):
cfg = ConfigParser()
cfg.read(config_path, encoding='utf-8')
section_list = cfg.sections()
config_dict = {}
for section in section_list:
section_item = cfg.items(section)
for item in section_item:
config_dict[item[0]] = item[1]
return config_dict
if __name__ == '__main__':
# 获取基础路径并读取配置信息
base_path = os.getcwd()
logger.info("基础路径:{}".format(base_path))
config_path = base_path + '/config.ini'
logger.info("配置文件路径:{}".format(config_path))
target_import_dict = {}
try:
target_import_dict = read_config(config_path)
except:
logger.error("读取配置文件出错,程序已终止执行")
sys.exit()
# 导出ES数据并统计耗时
start_time = time.time()
start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
logger.info("----------开始导入源数据----------")
import_es_data_main(target_import_dict, base_path)
end_time = time.time()
time_consuming_str = str(Decimal(str((end_time - start_time) * 1000)).quantize(Decimal('0.00'))) + 'ms'
logger.info("----------导入源数据已完成,共耗时:{}----------".format(time_consuming_str))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
注意:json_str中可能会存在null字符串表示空值,但是python里面没有null这个关键字,需要将null定义为变量名,赋值python里面的None。
null = None # 不要删掉这行看似无用的代码,否则导入时遇到空值会报错 name 'null' is not defined
# 4.2 滚动查询数据
ES默认限制10000条查询限制,想要查询出符合条件的所有数据可以使用滚动查询。
# -*- coding: utf-8 -*-
from elasticsearch import Elasticsearch
es_connect = Elasticsearch(
hosts="ip:port",
http_auth=("your_user", "your_password"),
request_timeout=60
)
source_list = []
# Elasticsearch 需要保持搜索的上下文环境多久,游标查询过期时间为10分钟(10m)
query = {
"match_all": {}
}
page = es_connect.search(index="your_index", query=query, size=1000, scroll='10m')
for hit in page['hits']['hits']:
source_data = hit['_source']
source_data['_id'] = hit['_id']
source_list.append(source_data)
# 游标用于输出es查询出的所有结果
sid = page['_scroll_id']
# es查询出的结果总量
scroll_size = page['hits']['total']['value']
while (scroll_size > 0):
page = es_connect.scroll(scroll_id=sid, scroll='10m')
sid = page['_scroll_id']
scroll_size = len(page['hits']['hits'])
for hit in page['hits']['hits']:
source_data = hit['_source']
source_data['_id'] = hit['_id']
source_list.append(source_data)
print(len(source_list))
print(source_list)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 5. 参考资料
[1] 安装elasticsearch-analysis-ik分词器插件 from CSDN (opens new window)
[2] Elasticsearch可视化工具 from CSDN (opens new window)
[3] Elasticsearch:权威指南 from 官方文档 (opens new window)
[4] Java高级REST客户端使用指南 from 官方文档 (opens new window)
[5] 尚硅谷-ElasticSearch教程入门到精通 from Bilibili (opens new window)
[7] Elasticsearch客户端基本身份验证 from 官方文档 (opens new window)
[8] ES 既是搜索引擎又是数据库?真的有那么全能吗?from InfoQ (opens new window)
[9] Elasticsearch学习笔记 from CSDN (opens new window)
[10] ES termQuery和matchQuery区别浅析 from CSDN (opens new window)
[11] SpringBoot查看和修改依赖的版本 from CSDN (opens new window)
[12] Elasticsearch基本CURD操作 from 知乎 (opens new window)
[13] NameError:名称“null”或“json”未在 Python 中定义 from bobbyhadz blog (opens new window)
[14] ElasticSearch——倒排索引和正向索引 from 稀土掘金 (opens new window)
[15] ES中的倒排索引机制 from CoolCode (opens new window)