基于ElasticSearch的全文搜索

什么是全文检索

全文搜索引擎是目前广泛应用的主流搜索引擎。它的工作原理是计算机索引程序通过扫描文章中的每一个词,对每一个词建立一个索引,指明该词在文章中出现的次数和位置,当用户查询时,检索程序就根据事先建立的索引进行查找,根据权重将查找的结果反馈给用户的检索方式。这个过程类似于通过字典中的检索字表查字的过程

技术选型

Lucene

Lucene是一套用于全文检索和搜索的开放源代码程序库,由Apache软件基金会支持和提供。Lucene提供了一个简单却强大的应用程序接口,能够做全文索引和搜索,在Java开发环境里Lucene是一个成熟的免费开放源代码工具;就其本身而论,Lucene是现在并且是这几年,最受欢迎的免费Java信息检索程序库

Solr

Apache Solr是一个基于Lucene的Java库构建的开源搜索平台。它以用户友好的方式提供Apache Lucene的搜索功能。作为一个行业参与者近十年,它是一个成熟的产品,拥有强大而广泛的用户社区。它提供分布式索引,复制,负载平衡查询以及自动故障转移和恢复。如果它被正确部署然后管理得好,它就能够成为一个高度可靠,可扩展且容错的搜索引擎。很多互联网巨头,如Netflix,eBay,Instagram和亚马逊(CloudSearch)都使用Solr,因为它能够索引和搜索多个站点

ElasticSearch

ElasticSearch是一个开源(Apache 2许可证),是一个基于Apache Lucene库构建的RESTful搜索引擎。它是JAVA应用,ElasticSearch是在Solr之后几年推出的。它提供了一个分布式,多租户能力的全文搜索引擎,具有HTTP Web界面(REST)和无架构JSON文档。ElasticSearch的官方客户端库提供Java,Groovy,PHP,Ruby,Perl,Python,.NET和Javascript

ElasticSearch和Apache Solr底层都是用到了Lucene库,Lucene只是一个搜索框架,只有在自研搜索产品时才考虑直接使用它。而对比ElasticSearch和Apache Solr,性能上两者相差伯仲,而ES具有易安装易扩展分布式的特性,所以采用ElasticSearch作为我们服务的搜索引擎

安装ElasticSearch

利用docker启动

拉取镜像,启动容器,注意配置环境变量:单节点模式discovery.type=single-node

1
2
docker pull docker.elastic.co/elasticsearch/elasticsearch:7.2.0
docker run -d --name es -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.2.0

进入容器中,在文件config/elasticsearch.yml中加入跨域配置,这样外部请求才能服务ES,修改配置后重启容器才能生效

1
docker exec -it es /bin/bash
1
2
http.cors.enabled: true
http.cors.allow-origin: "*"

利用docker-compose启动

下面是我本地配置的单实例,多实例集群可以参考官方文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
version: '2.2'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.2.0
container_name: elasticsearch
environment:
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- "discovery.zen.ping.unicast.hosts=elasticsearch"
- discovery.type=single-node
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- ./esdata:/usr/share/elasticsearch/data
- ./config:/usr/share/elasticsearch/config
- ./plugins:/usr/share/elasticsearch/plugins
ports:
- 9200:9200
- 9300:9300

通过tar包启动

image

官方下载地址下载tar包到本地,解压后,到目录下运行bin/elasticsearch

1
2
cd elasticsearch-7.2.0
bin/elasticsearch

安装中文分词器IK

ElasticSearch自带的默认分词器standard会把中文拆分,为了适配中文搜索,因此需要安装中文分词器elasticsearch-analysis-ik,这里是IK的GitHub地址,里面有详细使用说明

下载安装

image

注意ElasticSearch和ik版本要匹配,在release中下载zip,并解压到your-es-root/plugins/ik目录下即可。再启动ElasticSearch服务会看到如下输出

1
plugins [analysis-ik]

两种模式:ik_max_word、ik_smart

  • ik_max_word: 会将文本做最细粒度的拆分,比如会将“中华人民共和国国歌”拆分为“中华人民共和国,中华人民,中华,华人,人民共和国,人民,人,民,共和国,共和,和,国国,国歌”,会穷尽各种可能的组合,适合 Term Query

  • ik_smart: 会做最粗粒度的拆分,比如会将“中华人民共和国国歌”拆分为“中华人民共和国,国歌”,适合 Phrase 查询

自定义分词词典

elasticsearch-7.2.0/plugins/ik/config目录增加一个my.dic,词语与词语直接需要换行,注意是UTF-8编码

1
2
3
touch my.dic
echo 猪齿鱼 > my.dic
cat my.dic

修改elasticsearch-7.2.0/plugins/ik/config/IKAnalyzer.cfg.xml文件后重启即可生效

1
2
3
4
5
6
7
8
9
10
11
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典 -->
<entry key="ext_dict">my.dic</entry>
<!--用户可以在这里配置自己的扩展停止词字典-->
<entry key="ext_stopwords"></entry>
<!--用户可以在这里配置远程扩展字典 -->
<!-- <entry key="remote_ext_dict">words_location</entry> -->
<!--用户可以在这里配置远程扩展停止词字典-->
<!-- <entry key="remote_ext_stopwords">words_location</entry> -->
</properties>

ElasticSearch基本概念

存储结构

Elasticsearch集群可以包含多个索引(indices)(数据库),每一个索引可以包含多个类型(types)(表),每一个类型包含多个文档(documents)(行),然后每个文档包含多个字段(Fields)(列),存储结构是index->type->doc(包含fields),类似于关系型数据库的数据库->表->数据(包含字段),而在ES7.0之后,官方去掉了type类型,原因大概如下,具体可以参考官方解释

我们都知道elasticsearch是基于Lucene开发的搜索引擎,而ES中不同type下名称相同的filed最终在Lucene中的处理方式是一样的。举个例子,两个不同type下的两个user_name,在ES同一个索引下其实被认为是同一个filed,你必须在两个不同的type中定义相同的filed映射。否则,不同type中的相同字段名称就会在处理中出现冲突的情况,导致Lucene处理效率下降

面向文档

Elasticsearch是面向文档(document oriented)的,这意味着它可以存储整个对象或文档(document)。然而它不仅仅是存储,还会索引(index)每个文档的内容使之可以被搜索。在Elasticsearch中,你可以对文档(而非成行成列的数据)进行索引、搜索、排序、过滤。这种理解数据的方式与以往完全不同,这也是Elasticsearch能够执行复杂的全文搜索的原因之一

查询与过滤

查询语句过滤语句非常相似,但是它们由于使用目的不同而有差异

  • 一条过滤语句会询问每个文档的字段值是否包含着特定值

  • 一条查询语句会计算每个文档与查询语句的相关性,会给出一个相关性评分_score,并且 按照相关性对匹配到的文档进行排序。 这种评分方式非常适用于一个没有完全配置结果的全文本搜索

性能差异

使用过滤语句得到的结果集:一个简单的文档列表,快速匹配运算并存入内存是十分方便的,每个文档仅需要1个字节。这些缓存的过滤结果集与后续请求的结合使用是非常高效的

查询语句不仅要查找相匹配的文档,还需要计算每个文档的相关性,所以一般来说查询语句要比 过滤语句更耗时,并且查询结果也不可缓存。幸亏有了倒排索引,一个只匹配少量文档的简单查询语句在百万级文档中的查询效率会与一条经过缓存 的过滤语句旗鼓相当,甚至略占上风。 但是一般情况下,一条经过缓存的过滤查询要远胜一条查询语句的执行效率。过滤语句的目的就是缩小匹配的文档结果集。

过滤语句

  • term:主要用于精确匹配哪些值,比如数字,日期,布尔值或 not_analyzed的字符串(未经分析的文本数据类型)
  • terms:过滤多个字段
  • range:过滤允许我们按照指定范围查找一批数据
  • exists/missing:过滤可以用于查找文档中是否包含指定字段或没有某个字段,类似于SQL语句中的IS_NULL条件
  • bool过滤:可以用来合并多个过滤条件查询结果的布尔逻辑,它包含一下操作符:
    must :: 多个查询条件的完全匹配,相当于 and。
    must_not :: 多个查询条件的相反匹配,相当于 not。
    should :: 至少有一个查询条件匹配, 相当于 or

查询语句

  • match_all:可以查询到所有文档,是没有查询条件下的默认语句
  • match:查询是一个标准查询,不管你需要全文本查询还是精确查询基本上都要用到它
  • multi_match:查询允许你做match查询的基础上同时搜索多个字段
  • bool查询:与bool过滤相似,用于合并多个查询子句。不同的是,bool 过滤可以直接给出是否匹配成功, 而bool 查询要计算每一个查询子句的 _score (相关性分值)

9200与9300端口

  • 9200端口:ES节点和外部通讯使用
  • 9300端口:ES集群内,节点之间通讯使用

match、match_phrase和match_phrase_prefix

对于同一个数据集,三者检索出来的结果集数量不一样,所以两者有什么区别呢?

  • match

对于给定的搜索关键词十年之前,match搜索会将关键词拆分为十年之前,然后用关键词进行全文检索,进行搜索评分,同时包含十年之前的频率最高的排名最前

image

  • match_phrase

match_phrase是短语搜索,亦即它会将给定的短语(phrase)当成一个完整的查询条件。当使用match_phrase进行搜索的时候,你的结果集中,所有的Document都必须包含你指定的查询词组,这有点类似关系型数据库的like搜索,既要同时包含十年之前的文档,并且还有一个属性slop,slop值设置在搜索关键字被拆分搜索时,多个关键字间隔多少个其他字符从可以被匹配。

image

  • match_phrase_prefix

同样是短语搜索,但是它允许最后一个关键词可以进行前缀模糊匹配,例如当我们搜索He like a时,会匹配到He like apple

类似like查询

1
2
3
4
MoreLikeThisQueryBuilder queryBuilder = QueryBuilders
.moreLikeThisQuery("name")// 要匹配的字段, 不填默认_all
.like("西游")// 匹配的文本
.minTermFreq(1);// 文本最少出现的次数(默认是2,我们设为1)

通配符查询

通配符查询, 支持*匹配任何字符序列, 包括空。避免*开始, 会检索大量内容造成效率缓慢,单个字符用?

1
QueryBuilder queryBuilder = QueryBuilders.wildcardQuery("user", "ki*hy");

正则表达式匹配查询

1
QueryBuilders.regexpQuery("name", "c[a-z]{1}b?")

模糊查询

模糊查询,这个用到了莱文斯坦算法,用于计算两个字符串的相似度,如果在设置的相似度之内,那么会被命中。这里如果把相似度改为Fuzziness.ZERO,那么yb和cyb的相似度是1,所以是命中失败的,默认是Fuzziness.AUTO

1
QueryBuilders.fuzzyQuery("name", "yb").fuzziness(Fuzziness.ONE)

更多概念请查看手册

更多的基本概念,请参考ElasticSearch权威指南(中文版),注意该中文指南的版本比较旧,要看最新版ES的可以去看官方英文文档列表

同步数据:logstash-input-jdbc

要通过ElasticSearch实现数据检索,首先要将数据导入ElasticSearch,并实现数据源与ElasticSearch数据同步。这里使用的数据源是MySql数据库,目前MySql与ElasticSearch常用的同步机制大多是基于插件实现的,常用的插件包括:logstash-input-jdbcgo-mysql-elasticsearchelasticsearch-jdbc

相信大家都听说过ELK,ELK 是 elastic 公司旗下三款产品ElasticSearch、Logstash、Kibana的首字母组合,也即Elastic Stack包含ElasticSearch、Logstash、Kibana、Beats。ELK提供了一整套解决方案,并且都是开源软件,之间互相配合使用,完美衔接,高效的满足了很多场合的应用,是目前主流的一种日志系统

  • ElasticSearch

    一个基于 JSON 的分布式的搜索和分析引擎,作为 ELK 的核心,它集中存储数据,用来搜索、分析、存储日志。它是分布式的,可以横向扩容,可以自动发现,索引自动分片

  • Logstash:

    一个动态数据收集管道,支持以 TCP/UDP/HTTP 多种方式收集数据(也可以接受 Beats 传输来的数据),并对数据做进一步丰富或提取字段处理。用来采集日志,把日志解析为json格式交给ElasticSearch

  • Kibana:

    一个数据可视化组件,将收集的数据进行可视化展示(各种报表、图形化数据),并提供配置、管理 ELK 的界面

logstash-input-jdbclogstash的一个同步mysql数据到es的插件,logstash的下载地址

安装

安装logstash后通过命令安装logstash-input-jdbc插件

1
2
cd /logstash-7.2.0/bin
./logstash-plugin install logstash-input-jdbc

配置

logstash-7.2.0/config文件夹下新建jdbc.conf,输入配置属性可以参考官方文档: Jdbc input plugin,输出配置属性可以参考官方文档:Elasticsearch output plugin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# 输入部分
input {
stdin {}
jdbc {
# mysql数据库驱动
jdbc_driver_library => "/Users/chenshinan/work/tools/logstash-7.2.0/config/mysql-connector-java.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
# mysql数据库链接,数据库名
jdbc_connection_string => "jdbc:mysql://localhost:3306/knowledgebase_service"
# mysql数据库用户名,密码
jdbc_user => "root"
jdbc_password => "root"
# 设置监听间隔 各字段含义(分、时、天、月、年),全部为*默认含义为每分钟更新一次
schedule => "* * * * *"
# 分页
jdbc_paging_enabled => "true"
# 分页大小
jdbc_page_size => "50000"
#clean_run => "true"
# sql语句执行文件,也可直接使用 statement => 'select * from t_employee'
statement_filepath => "/Users/chenshinan/work/tools/logstash-7.2.0/config/jdbc.sql"
# statement => 'select * from kb_page'
# elasticsearch索引类型名
# type => "xx"
}
}

# 过滤部分(不是必须项)
filter {
json {
source => "message"
remove_field => ["message"]
}
}

# 输出部分
output {
elasticsearch {
# elasticsearch索引名
index => "knowledge_page"
# 使用input中的type作为elasticsearch索引下的类型名
# document_type => "%{type}" # <- use the type from each input
# elasticsearch的ip和端口号
hosts => "localhost:9200"
# 同步mysql中数据id作为elasticsearch中文档id
document_id => "%{id}"
# 使用自定义的index模版
template_overwrite => true
# index模版的地址
template => "/Users/chenshinan/work/tools/logstash-7.2.0/template/logstash.json"
}
stdout {
codec => json_lines
}
}

:sql_last_value为上一次更新的最后时刻值,在sql中加入:sql_last_value可以用于做增量更新

1
2
3
4
5
6
7
8
select p.id,
p.title,
pc.CONTENT,
p.project_id,
p.organization_id
from kb_page p
left join kb_page_content pc on pc.VERSION_ID = p.LATEST_VERSION_ID
where p.LAST_UPDATE_DATE > :sql_last_value OR pc.LAST_UPDATE_DATE > :sql_last_value

运行

1
2
3
4
5
cd logstash-7.2.0
# 检查配置文件语法是否正确
bin/logstash -f config/jdbc.conf --config.test_and_exit
# 启动,--config.reload.automatic: 会自动重新加载配置文件内容
bin/logstash -f config/jdbc.conf --config.reload.automatic

同步数据采用自定义的index模版

索引模板,简而言之,是一种复用机制。当新建一个Elasticsearch 索引时,自动匹配模板,完成索引的基础部分搭建。属性内容可以参考初探 Elasticsearch Index Template(索引模板)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
{
"order": 0,
"version": 60001,
"index_patterns": [
"knowledge*" //匹配的index规则
],
"settings": {
"index": {
"number_of_shards": "1",
"refresh_interval": "5s",
"analysis.analyzer.default.tokenizer": "ik_smart" //自定义分词器
}
},
"mappings": {
"dynamic_templates": [
{
"message_field": {
"path_match": "message",
"mapping": {
"norms": false,
"type": "text"
},
"match_mapping_type": "string"
}
},
{
"string_fields": {
"mapping": {
"norms": false,
"type": "text",
"analyzer": "ik_smart", //自定义分词器
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
}
},
"match_mapping_type": "string",
"match": "*"
}
}
],
"properties": {
"@timestamp": {
"type": "date"
},
"geoip": {
"dynamic": true,
"properties": {
"ip": {
"type": "ip"
},
"latitude": {
"type": "half_float"
},
"location": {
"type": "geo_point"
},
"longitude": {
"type": "half_float"
}
}
},
"@version": {
"type": "keyword"
}
}
},
"aliases": {}
}

Java High Level REST Client

基于HTTP协议,以JSON为数据交互格式的RESTful API。其他所有程序语言都可以使用RESTful API,通过9200端口的与Elasticsearch进行通信,你可以使用你喜欢的WEB客户端,事实上,如你所见,你甚至可以通过curl命令与Elasticsearch通信。Elasticsearch使用Java开发并使用Lucene作为其核心来实现所有索引和搜索的功能,但是它的目的是通过简单的RESTful API来隐藏Lucene的复杂性,从而让全文搜索变得简单。不过,Elasticsearch不仅仅是Lucene和全文搜索,我们还能这样去描述它:

分布式的实时文件存储,每个字段都被索引并可被搜索
分布式的实时分析搜索引擎
可以扩展到上百台服务器,处理PB级结构化或非结构化数据

而且,所有的这些功能被集成到一个服务里面,你的应用可以通过简单的RESTful API、各种语言的客户端甚至命令行与之交互,这里介绍一下官方推荐的Java High Level REST Client,详细参考ES官方REST-Client文档

maven依赖

1
2
3
4
5
6
7
8
9
10
11
<!--elasticsearch client-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.2.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.2.0</version>
</dependency>

若出现java.lang.NoSuchMethodError: org.elasticsearch.action.index.IndexRequest.ifSeqNo()J异常,请参考讨论,依赖版本的问题

配置客户端信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Configuration
public class ElasticsearchRestClientConfig {
public static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchRestClientConfig.class);
private static final int ADDRESS_LENGTH = 2;
private static final String HTTP_SCHEME = "http";

/**
* 使用冒号隔开ip和端口
*/
@Value("${elasticsearch.ip}")
String[] ipAddress;

@Bean
public RestClientBuilder restClientBuilder() {
HttpHost[] hosts = Arrays.stream(ipAddress)
.map(this::makeHttpHost)
.filter(Objects::nonNull)
.toArray(HttpHost[]::new);
LOGGER.info("hosts:{}", Arrays.toString(hosts));
return RestClient.builder(hosts);
}


@Bean(name = "highLevelClient")
public RestHighLevelClient highLevelClient(@Autowired RestClientBuilder restClientBuilder) {
return new RestHighLevelClient(restClientBuilder);
}


private HttpHost makeHttpHost(String s) {
assert StringUtils.isNotEmpty(s);
String[] address = s.split(":");
if (address.length == ADDRESS_LENGTH) {
String ip = address[0];
int port = Integer.parseInt(address[1]);
return new HttpHost(ip, port, HTTP_SCHEME);
} else {
return null;
}
}
}

判断index索引是否存在

1
2
3
4
5
6
7
8
9
10
11
public Boolean indexExist(String index) {
GetIndexRequest request;
Boolean exists = false;
try {
request = new GetIndexRequest(index);
exists = highLevelClient.indices().exists(request, RequestOptions.DEFAULT);
} catch (Exception e) {
LOGGER.error("elasticsearch indexExist, error:{}", e.getMessage());
}
return exists;
}

创建index索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void createIndex(String index) {
CreateIndexRequest request = new CreateIndexRequest(index);
//设置索引的settings,设置默认分词
request.settings(Settings.builder().put("analysis.analyzer.default.tokenizer", "ik_smart"));
try {
CreateIndexResponse createIndexResponse = highLevelClient.indices()
.create(request, RequestOptions.DEFAULT);
if (!createIndexResponse.isAcknowledged()) {
LOGGER.error("elasticsearch createIndex the response is acknowledged");
} else {
LOGGER.info("elasticsearch createIndex successful");
}
} catch (Exception e) {
LOGGER.error("elasticsearch createIndex, error:{}", e.getMessage());
}
}

创建/更新文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void createOrUpdatePage(String index, Long id, PageSyncDTO page) {
IndexRequest request = new IndexRequest(index);
request.id(String.valueOf(id));
Map<String, Object> jsonMap = new HashMap<>(5);
jsonMap.put(BaseStage.ES_PAGE_FIELD_PAGE_ID, page.getId());
jsonMap.put(BaseStage.ES_PAGE_FIELD_TITLE, page.getTitle());
jsonMap.put(BaseStage.ES_PAGE_FIELD_CONTENT, page.getContent());
jsonMap.put(BaseStage.ES_PAGE_FIELD_PROJECT_ID, page.getProjectId());
jsonMap.put(BaseStage.ES_PAGE_FIELD_ORGANIZATION_ID, page.getOrganizationId());
request.source(jsonMap);
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
LOGGER.info("elasticsearch createOrUpdatePage successful, pageId:{}", id);
}
@Override
public void onFailure(Exception e) {
LOGGER.error("elasticsearch createOrUpdatePage failure, pageId:{}, error:{}", id, e.getMessage());
pageMapper.updateSyncEsByPageId(id, false);
}
};
highLevelClient.indexAsync(request, RequestOptions.DEFAULT, listener);
}

删除页面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void deletePage(String index, Long id) {
DeleteRequest request = new DeleteRequest(index, String.valueOf(id));
ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
LOGGER.info("elasticsearch deletePage successful, pageId:{}", id);
}
@Override
public void onFailure(Exception e) {
LOGGER.error("elasticsearch deletePage failure, pageId:{}, error:{}", id, e.getMessage());
}
};
highLevelClient.deleteAsync(request, RequestOptions.DEFAULT, listener);
}

利用BulkProcessor批量创建页面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public void batchCreatePage(String index, List<PageSyncDTO> pages) {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
LOGGER.info("elasticsearch batchCreatePage {} time, starting...", request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
LOGGER.info("elasticsearch batchCreatePage {} time, successful", request.numberOfActions());
pageMapper.updateSyncEs();
}
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
LOGGER.error("elasticsearch batchCreatePage {} time, error:{}", request.numberOfActions(), failure.getMessage());
}
};
BulkProcessor bulkProcessor = BulkProcessor.builder(
(request, bulkListener) ->
highLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener)
.setBulkActions(500)
.setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
.setConcurrentRequests(0)
.setFlushInterval(TimeValue.timeValueSeconds(10L))
.setBackoffPolicy(BackoffPolicy
.constantBackoff(TimeValue.timeValueSeconds(1L), 3))
.build();
for (PageSyncDTO page : pages) {
Map<String, Object> jsonMap = new HashMap<>(5);
jsonMap.put(BaseStage.ES_PAGE_FIELD_PAGE_ID, page.getId());
jsonMap.put(BaseStage.ES_PAGE_FIELD_TITLE, page.getTitle());
jsonMap.put(BaseStage.ES_PAGE_FIELD_CONTENT, page.getContent());
jsonMap.put(BaseStage.ES_PAGE_FIELD_PROJECT_ID, page.getProjectId());
jsonMap.put(BaseStage.ES_PAGE_FIELD_ORGANIZATION_ID, page.getOrganizationId());
IndexRequest request = new IndexRequest(index).id(String.valueOf(page.getId()))
.source(jsonMap);
bulkProcessor.add(request);
}
}

全文检索

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public List<FullTextSearchResultDTO> fullTextSearch(Long organizationId, Long projectId, String searchStr) {
List<FullTextSearchResultDTO> results = new ArrayList<>();
SearchRequest searchRequest = new SearchRequest(BaseStage.ES_PAGE_INDEX);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolBuilder = new BoolQueryBuilder();
if (organizationId != null) {
boolBuilder.filter(new TermQueryBuilder(BaseStage.ES_PAGE_FIELD_ORGANIZATION_ID, String.valueOf(organizationId)));
}
if (projectId != null) {
boolBuilder.filter(new TermQueryBuilder(BaseStage.ES_PAGE_FIELD_PROJECT_ID, String.valueOf(projectId)));
} else {
boolBuilder.mustNot(QueryBuilders.existsQuery(BaseStage.ES_PAGE_FIELD_PROJECT_ID));
}
boolBuilder.must(QueryBuilders.boolQuery().should(QueryBuilders.matchQuery(BaseStage.ES_PAGE_FIELD_TITLE, searchStr))
.should(QueryBuilders.matchQuery(BaseStage.ES_PAGE_FIELD_CONTENT, searchStr)));
//.should(QueryBuilders.matchPhraseQuery(BaseStage.ES_PAGE_FIELD_CONTENT, searchStr)));
//.should(QueryBuilders.matchPhrasePrefixQuery(BaseStage.ES_PAGE_FIELD_CONTENT, searchStr)));
sourceBuilder.query(boolBuilder);
// 设置分页查询
sourceBuilder.from(0);
sourceBuilder.size(20);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
// 高亮设置
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.requireFieldMatch(true).field(BaseStage.ES_PAGE_FIELD_TITLE).field(BaseStage.ES_PAGE_FIELD_CONTENT)
.preTags("<span style=\"color:#F44336\" >").postTags("</span>")
.fragmentSize(50)
.noMatchSize(50);
sourceBuilder.highlighter(highlightBuilder);
searchRequest.source(sourceBuilder);
SearchResponse response;
try {
response = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
Arrays.stream(response.getHits().getHits())
.forEach(hit -> {
Map<String, Object> map = hit.getSourceAsMap();
Object proIdObj = map.get(BaseStage.ES_PAGE_FIELD_PROJECT_ID);
Object orgIdObj = map.get(BaseStage.ES_PAGE_FIELD_ORGANIZATION_ID);
Object titleObj = map.get(BaseStage.ES_PAGE_FIELD_TITLE);
Long pageId = Long.parseLong(hit.getId());
Long esProjectId = proIdObj != null ? Long.parseLong(String.valueOf(proIdObj)) : null;
Long esOrganizationId = orgIdObj != null ? Long.parseLong(String.valueOf(orgIdObj)) : null;
String title = titleObj != null ? String.valueOf(titleObj) : "";
FullTextSearchResultDTO resultDTO = new FullTextSearchResultDTO(pageId, title, null, esProjectId, esOrganizationId);
//设置评分
resultDTO.setScore(hit.getScore());
//取高亮结果
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
HighlightField highlight = highlightFields.get(BaseStage.ES_PAGE_FIELD_CONTENT);
if (highlight != null) {
Text[] fragments = highlight.fragments();
if (fragments != null) {
String fragmentString = fragments[0].string();
resultDTO.setHighlightContent(fragmentString);
} else {
resultDTO.setHighlightContent("");
}
} else {
resultDTO.setHighlightContent("");
}
results.add(resultDTO);
});
LOGGER.info("全文搜索结果:组织ID:{},项目ID:{},命中{},搜索内容:{}", organizationId, projectId, response.getHits().getTotalHits(), searchStr);
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
return results;
}

对于搜索结果中对单篇文档的内容高亮

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public String searchById(Long organizationId, Long projectId, String index, Long id, String searchStr, Integer contentLength) {
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolBuilder = new BoolQueryBuilder();
if (organizationId != null) {
boolBuilder.filter(new TermQueryBuilder(BaseStage.ES_PAGE_FIELD_ORGANIZATION_ID, String.valueOf(organizationId)));
}
if (projectId != null) {
boolBuilder.filter(new TermQueryBuilder(BaseStage.ES_PAGE_FIELD_PROJECT_ID, String.valueOf(projectId)));
} else {
boolBuilder.mustNot(QueryBuilders.existsQuery(BaseStage.ES_PAGE_FIELD_PROJECT_ID));
}
boolBuilder.filter(QueryBuilders.termQuery(BaseStage.ES_PAGE_FIELD_PAGE_ID, String.valueOf(id)));
boolBuilder.must(QueryBuilders.boolQuery().should(QueryBuilders.matchQuery(BaseStage.ES_PAGE_FIELD_TITLE, searchStr))
.should(QueryBuilders.matchQuery(BaseStage.ES_PAGE_FIELD_CONTENT, searchStr)));
sourceBuilder.query(boolBuilder);
sourceBuilder.from(0);
sourceBuilder.size(1);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
// 高亮设置
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.requireFieldMatch(true).field(BaseStage.ES_PAGE_FIELD_TITLE).field(BaseStage.ES_PAGE_FIELD_CONTENT)
.preTags("<span style=\"color:#F44336\" >").postTags("</span>")
.numOfFragments(1)
//拉取出全文内容
.fragmentSize(contentLength)
.noMatchSize(contentLength);
sourceBuilder.highlighter(highlightBuilder);
searchRequest.source(sourceBuilder);
String fragmentString = null;
SearchResponse response;
try {
response = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
for (SearchHit hit : response.getHits().getHits()) {
//取高亮结果
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
HighlightField highlight = highlightFields.get(BaseStage.ES_PAGE_FIELD_CONTENT);
if (highlight != null) {
Text[] fragments = highlight.fragments();
if (fragments != null) {
fragmentString = fragments[0].string();
}
}
}
LOGGER.info("单篇文档关键词匹配:组织ID:{},项目ID:{},文章ID:{},命中{},搜索内容:{}", organizationId, projectId, id, response.getHits().getTotalHits(), searchStr);
} catch (Exception e) {
LOGGER.error("elasticsearch searchById failure, pageId:{}, error:{}", id, e.getMessage());
}
return fragmentString;
}

通过滚动游标查询出所有数据

参考官方文档Search Scroll API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void searchAll(String index, String searchStr) {
final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
SearchRequest searchRequest = new SearchRequest(index);
searchRequest.scroll(scroll);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(matchQuery("content", searchStr));
searchSourceBuilder.size(10);
searchRequest.source(searchSourceBuilder);
SearchResponse response;
try {
response = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
String scrollId = response.getScrollId();
SearchHit[] searchHits = response.getHits().getHits();
List<SearchHit> hits = new ArrayList<>();
hits.addAll(Arrays.asList(searchHits));
while (searchHits != null && searchHits.length > 0) {
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(scroll);
response = highLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
scrollId = response.getScrollId();
searchHits = response.getHits().getHits();
hits.addAll(Arrays.asList(searchHits));
}
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
ClearScrollResponse clearScrollResponse = highLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
boolean succeeded = clearScrollResponse.isSucceeded();
System.out.println(succeeded+":"+hits.size());
} catch (IOException e) {
e.printStackTrace();
}
}

参考文献