ElasticSearch高阶实战(一)实时增量重建索引

这是我参与8月更文挑战的第6天,活动详情查看:8月更文挑战

前言

ElasticSearch对于词库管理以及重建索引方面功能并不是很完善,更新分词并不会重建文档的倒排索引,常规方法是对整个index进行全量数据删除后再全量重新插入触发重建倒排索引。本文,将给出一个方案,从词库的管理到重建文档的倒排索引,实现词库管理可视化,增量、低延迟重建倒排索引。

背景

我们在做中文搜索的时候,往往都会采用ElasticSearch作为搜索引擎。而ElasticSearch对中文内容搜索很不友好,需要额外安装IK中文分词器插件。IK中文分词器会有基础词库,但是由于业务的专业性,领域性,会造出一些新词出来。比如前几章说的“万元保”,以及网络新词“不明觉厉”等等。如果我们想把这些词添加到词库中,就需要配置IK的扩展词库。IK的扩展词库可以配置一个文件路径也可以是一个远程cdn地址,为了我们以后修改词方便,采用了远程cdn地址。

可视化词库管理

image.png
想要对词库进行增删改查,直接操作文件是不可行的。这里引入mongoDB,所有的增删改查都是针对mongoDB数据,只需要实现mongoDB与cdn数据的同步即可。功能简单不多赘述。
1、查mongo获取所有词,去重。
2、转成字符串行
3、上传、覆盖cdn文件
ElasticSearch会在一分钟内生效所有的词。

重建倒排索引

词库新增或者删除词,要及时去更新倒排索引,否则会导致搜索词和倒排索引不一致,进而导致搜不到结果等严重问题。针对大量词更新就需要全量更新倒排索引,一个两个词的更新则需要增量更新倒排索引提高效率。

全量更新倒排索引

image.png
上图是整体架构。搜索服务并不会去直接请求es上真实的index_1,而是请求alias别名。通过修改index和alias的对应关系,从而实现无感知切换index。通过reindex指令把index_1上的数据全量同步到index_2上,reindex的时候词典已经更新,所以index_2的倒排索引是根据新词典生成的。
1、查询别名(alias)下的index(index_1)
2、数据迁移 index_1 —reindex—> index_2
3、在index_2上新建别名,删除index_1上的别名
核心源码

public Boolean rebuildIndex() {
    //创建iyb-search-product-spare(备用库)
    createDefaultIndex(INDEX_SPARE, TYPE);
    //查询别名下的index
    JSONObject indexAliases = getIndexAliases(INDEX_SPARE);
    JSONObject aliases = indexAliases.getJSONObject(INDEX_SPARE).getJSONObject("aliases").getJSONObject(ALIAS);
    if (Objects.isNull(aliases)) {
        JSONObject indexAliases2 = getIndexAliases(INDEX);
        String aliases2 = indexAliases2.getJSONObject(INDEX).getJSONObject("aliases").getString(ALIAS);
        if (Objects.isNull(aliases2)) {
            //都没别名则给iyb-search-product添加
            return addAliases(INDEX, ALIAS);
        } else {
            //当前别名在iyb-search-product上
            //数据迁移 INDEX -> INDEX_SPARE
            reindex(INDEX, INDEX_SPARE);
            //在INDEX_SPARE上新建别名,添加成功,删除INDEX上的别名
            return addAliases(INDEX_SPARE, ALIAS) ? removeAliases(INDEX, ALIAS) : false;
        }
    } else {
        //当前别名在iyb-search-product-spare上
        reindex(INDEX_SPARE, INDEX);
        //在INDEX_SPARE上新建别名,添加成功,删除INDEX上的别名
        return addAliases(INDEX, ALIAS) ? removeAliases(INDEX_SPARE, ALIAS) : false;
    }
}
复制代码

增量重建倒排索引

大部分情况下,可能几天才会去变更一个词,而这个词可能只会出现在少部分文档里。这种情况我们还是采用全量模式的,往往造成了性能浪费。针对这种情况,需要实现一套增量重建倒排索引功能。

步骤分析

要实现增量,第一步就是需要知道这次增量有哪些词。上文介绍了,词我们用mongoDB进行了管理,只需要加一个变更状态(changeStatus)字段,每次新增、删除、修改都将changeStatus置为true,便可统计出增量的词了。
第二步,需要知道哪些文档用到了变更词。这里使用ES的multi_match+ik_max_word来实现(也可以使用regexp)。
第三步,增量更新。采用ES的_update_by_query,根据第二步的查询条件用_update_by_query,将会把查出来的结果去更新倒排索引。
第四部,索引重建后,将changeStatus置为false,完成本次增量更新索引。
核心源码(ES客户端用的是jest)

public Boolean updateAnalyzer() {
    //判断是否有词变更
    Long changeWordSetCount = getChangeWordSetCount();
    if (changeWordSetCount > 0) {
        //es分词字典更新
        Criteria criteria = Criteria.where(BaseField.IS_DELETED).is(BaseEnum.NO.getStatus());
        List<SearchTagDO> searchTagDOList = searchTagDao.queryByCondition(criteria);
        Set<String> tagNameSet = searchTagDOList.stream().map(SearchTagDO::getTagName).collect(Collectors.toSet());
        StringBuilder content = new StringBuilder();
        tagNameSet.forEach(s -> content.append(s).append("\n"));
        OssTemplate.uploadFile(ES_IK_FILE, content.toString());
        //延迟一分钟,确保es集群ik词典全部执行完毕
        try {
            Thread.sleep(1000 * 60);
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
        //获取变更词
        Set<String> changeWordSet = getChangeWordSet();
        //执行增量重建索引
        QueryChain boolChain = QueryChain.defaultQueryChain();
        if (CollectionUtils.isNotEmpty(changeWordSet)) {
            changeWordSet.forEach(keyStr -> boolChain.or(com.baoyun.iyb.search.elastic.query.QueryChain.multiMatch(keyStr, PRODUCT_NAME, COMPANY, TYPE_NAMES, TAG_TYPE_NAMES, TAG_NAMES)));
        }
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolChain.getQueryBuilder());
        UpdateByQuery updateByQuery = new UpdateByQuery.Builder(searchSourceBuilder.toString()).addIndex(ALIAS).addType(TYPE).build();
        JsonObject re = (UpdateByQueryResult) execute(updateByQuery).getJsonObject();
        log.info(String.format("本次更新索引结果。需更新%s条结果,成功更新%s条结果", re.get("total").getAsLong(), re.get("updated").getAsLong()));
        //变更状态置为false
        deleteChangeWordSet();
    }
    return true;
}
复制代码

总结与思考

像阿里的opensearch就自带这些功能,你可以把本文看作成ES实现版。其实大部分opensearch都可以用ES来实现。本文也只是简单实现,提供个大体思路和方案。对大数据量的index没做过多的处理,欢迎评论一起讨论。

思考

ES的IK分词是不具有隔离性的,也就是说一套ES集群下,只有一套词库,作用所有index上。但是有些情况下,由于业务关系,index之间的关联度不高,甚至可以说毫无关系。也就会导致有些词是不希望作用在某些index上的。那怎么实现像opensearch那样可以维护多词库实现隔离呢?欢迎评论一起讨论。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享