千家信息网

Elasticsearch reindex及Java使用sliceScorll优化查询的方法

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章主要讲解了"Elasticsearch reindex及Java使用sliceScorll优化查询的方法",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究
千家信息网最后更新 2025年12月02日Elasticsearch reindex及Java使用sliceScorll优化查询的方法

这篇文章主要讲解了"Elasticsearch reindex及Java使用sliceScorll优化查询的方法",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Elasticsearch reindex及Java使用sliceScorll优化查询的方法"吧!

Reindex会将一个索引的数据复制到另一个已存在的索引,但是并不会复制原索引的mapping(映射)、shard(分片)、replicas(副本)等配置信息。

简单实例如下

POST _reindex{  "source": {    "remote": {      "host": "http://otherhost:9200", // 远程es的ip和port列表      "socket_timeout": "1m",      "connect_timeout": "10s"  // 超时时间设置    },    "index": "my_index_name", // 源索引名称    "query": {         // 满足条件的数据      "match": {        "test": "data"      }    }  },  "dest": {    "index": "dest_index_name"  // 目标索引名称  }}

具体详细的使用参考

ElasticSearch 6.3版本 Document APIs之Reindex API

elasticsearch 基础 -- ReIndex

在java中对于reindexapi没有找到,于是作者采用了别名转换和全Index查询加上bulk插入的方式对于索引进行迁移。

但是转移数据实在太慢,所以使用了slice对scorll查询进行优化

Java slice scorll reindex 基于5.6版本

多线程reindex

具体开启线程数根据Index分片数进行调整,最好和主分片数相同,本例子为五个分片,同时还使用了别名转换对索引进行无缝衔接避免数据正常插入读取

 //建新索引        createUserRecordIndex(newIndexName, typeName);        //筛选时间        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();        RangeQueryBuilder rangeQueryBuilder;        rangeQueryBuilder = QueryBuilders.rangeQuery("createTime")                .gte(DateUtil.format(DateUtil.parse(createBeginDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT))                .lte(DateUtil.format(DateUtil.parse(createEndDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT));        boolQueryBuilder.must(rangeQueryBuilder);        try {                   //多线程处理查询请求           List list = new ArrayList<>();            for (int i = 0; i < 5; i++) {                SliceBuilder sliceBuilder = new SliceBuilder(i, 5);                SearchResponse response = EsBuildersServiceUtil.getESClient()                        .prepareSearch(userRecordAlias)                        .setTypes(userRecordType)                        .setQuery(boolQueryBuilder)                        .setSize(1000).setScroll(new TimeValue(10000))                        .slice(sliceBuilder)                        .execute()                        .actionGet();                SliceQuery sliceQuery = new SliceQuery(newIndexName, typeName, response);                Future submit = threadPoolTaskExecutor.submit(sliceQuery);                list.add(submit);            }            for (Future future : list) {                future.get();            }        } catch (Exception e) {            log.error("reindex error =", e);            throw new MembershipDataException(MembershipDataErrorCode.ES_INDEX_CONVERT_ERROR);        }        try {            //别名转换            EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases().removeAlias(oldIndexName, userRecordAlias).execute().actionGet();            EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases().addAlias(newIndexName, userRecordAlias).execute().actionGet();        } catch (Exception e) {            log.error(" convertAlias error =", e);            throw new MembershipDataException(MembershipDataErrorCode.ES_ALIASES_CONVERT_ERROR);        }

slice线程

class SliceQuery implements Callable {        private String newIndexName;        private String typeName;        private SearchResponse response;        private SliceQuery(String newIndexName, String typeName, SearchResponse response) {            this.newIndexName = newIndexName;            this.typeName = typeName;            this.response = response;        }        @Override        public Void call() {            //获取总数量            long totalCount = response.getHits().getTotalHits();            //计算总次数,每次搜索数量为分片数*设置的size大小            int page = (int) totalCount / 1000;            operateRecordList(response, newIndexName, typeName);            for (int i = 0; i < page; i++) {                //再次发送请求,并使用上次搜索结果的ScrollId                response = EsBuildersServiceUtil.getESClient().prepareSearchScroll(response.getScrollId())                        .setScroll(new TimeValue(10000)).execute()                        .actionGet();                operateRecordList(response, newIndexName, typeName);            }            return null;        }    }

批量插入

  /**     * 从查询数据中获取并批量插入Index     *     * @param response     * @param indexName     * @param typeName     */    private void operateRecordList(SearchResponse response, String indexName, String typeName) {        try {            SearchHits hits = response.getHits();            List list = new ArrayList<>();            for (SearchHit hit : hits) {                String sourceAsString = hit.getSourceAsString();                list.add(JSON.parseObject(sourceAsString, AddUserRecordRequest.class));            }            //批量插入            saveBulkRecord(list, indexName, typeName);        } catch (Exception e) {            log.error("operateRecordList error =", e);            throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR);        }    }    /**     * 批量插入     *     * @param list     * @param indexName     * @param typeName     */    private void saveBulkRecord(List list, String indexName, String typeName) {        try {            BulkRequestBuilder bulkRequest = EsBuildersServiceUtil.getESClient().prepareBulk();            for (AddUserRecordRequest recordRequest : list) {                JSONObject json = JSONObject.fromObject(recordRequest);                bulkRequest.add(EsBuildersServiceUtil.getESClient()                        .prepareIndex(indexName, typeName)                        .setSource(json));            }            if (list.size() > 0) {                bulkRequest.execute().actionGet();            }        } catch (Exception e) {            log.error("saveBulkRecord error =", e);            throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR);        }    }

感谢各位的阅读,以上就是"Elasticsearch reindex及Java使用sliceScorll优化查询的方法"的内容了,经过本文的学习后,相信大家对Elasticsearch reindex及Java使用sliceScorll优化查询的方法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

0