Elasticsearch reindex及Java使用sliceScorll优化查询的方法
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章主要讲解了"Elasticsearch reindex及Java使用sliceScorll优化查询的方法",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究
千家信息网最后更新 2025年12月02日Elasticsearch reindex及Java使用sliceScorll优化查询的方法
这篇文章主要讲解了"Elasticsearch reindex及Java使用sliceScorll优化查询的方法",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Elasticsearch reindex及Java使用sliceScorll优化查询的方法"吧!
Reindex会将一个索引的数据复制到另一个已存在的索引,但是并不会复制原索引的mapping(映射)、shard(分片)、replicas(副本)等配置信息。
简单实例如下
POST _reindex{ "source": { "remote": { "host": "http://otherhost:9200", // 远程es的ip和port列表 "socket_timeout": "1m", "connect_timeout": "10s" // 超时时间设置 }, "index": "my_index_name", // 源索引名称 "query": { // 满足条件的数据 "match": { "test": "data" } } }, "dest": { "index": "dest_index_name" // 目标索引名称 }}具体详细的使用参考
ElasticSearch 6.3版本 Document APIs之Reindex API
elasticsearch 基础 -- ReIndex
在java中对于reindexapi没有找到,于是作者采用了别名转换和全Index查询加上bulk插入的方式对于索引进行迁移。
但是转移数据实在太慢,所以使用了slice对scorll查询进行优化
Java slice scorll reindex 基于5.6版本
多线程reindex
具体开启线程数根据Index分片数进行调整,最好和主分片数相同,本例子为五个分片,同时还使用了别名转换对索引进行无缝衔接避免数据正常插入读取
//建新索引 createUserRecordIndex(newIndexName, typeName); //筛选时间 BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); RangeQueryBuilder rangeQueryBuilder; rangeQueryBuilder = QueryBuilders.rangeQuery("createTime") .gte(DateUtil.format(DateUtil.parse(createBeginDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT)) .lte(DateUtil.format(DateUtil.parse(createEndDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT)); boolQueryBuilder.must(rangeQueryBuilder); try { //多线程处理查询请求 List list = new ArrayList<>(); for (int i = 0; i < 5; i++) { SliceBuilder sliceBuilder = new SliceBuilder(i, 5); SearchResponse response = EsBuildersServiceUtil.getESClient() .prepareSearch(userRecordAlias) .setTypes(userRecordType) .setQuery(boolQueryBuilder) .setSize(1000).setScroll(new TimeValue(10000)) .slice(sliceBuilder) .execute() .actionGet(); SliceQuery sliceQuery = new SliceQuery(newIndexName, typeName, response); Future submit = threadPoolTaskExecutor.submit(sliceQuery); list.add(submit); } for (Future future : list) { future.get(); } } catch (Exception e) { log.error("reindex error =", e); throw new MembershipDataException(MembershipDataErrorCode.ES_INDEX_CONVERT_ERROR); } try { //别名转换 EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases().removeAlias(oldIndexName, userRecordAlias).execute().actionGet(); EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases().addAlias(newIndexName, userRecordAlias).execute().actionGet(); } catch (Exception e) { log.error(" convertAlias error =", e); throw new MembershipDataException(MembershipDataErrorCode.ES_ALIASES_CONVERT_ERROR); } slice线程
class SliceQuery implements Callable { private String newIndexName; private String typeName; private SearchResponse response; private SliceQuery(String newIndexName, String typeName, SearchResponse response) { this.newIndexName = newIndexName; this.typeName = typeName; this.response = response; } @Override public Void call() { //获取总数量 long totalCount = response.getHits().getTotalHits(); //计算总次数,每次搜索数量为分片数*设置的size大小 int page = (int) totalCount / 1000; operateRecordList(response, newIndexName, typeName); for (int i = 0; i < page; i++) { //再次发送请求,并使用上次搜索结果的ScrollId response = EsBuildersServiceUtil.getESClient().prepareSearchScroll(response.getScrollId()) .setScroll(new TimeValue(10000)).execute() .actionGet(); operateRecordList(response, newIndexName, typeName); } return null; } }批量插入
/** * 从查询数据中获取并批量插入Index * * @param response * @param indexName * @param typeName */ private void operateRecordList(SearchResponse response, String indexName, String typeName) { try { SearchHits hits = response.getHits(); List list = new ArrayList<>(); for (SearchHit hit : hits) { String sourceAsString = hit.getSourceAsString(); list.add(JSON.parseObject(sourceAsString, AddUserRecordRequest.class)); } //批量插入 saveBulkRecord(list, indexName, typeName); } catch (Exception e) { log.error("operateRecordList error =", e); throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR); } } /** * 批量插入 * * @param list * @param indexName * @param typeName */ private void saveBulkRecord(List list, String indexName, String typeName) { try { BulkRequestBuilder bulkRequest = EsBuildersServiceUtil.getESClient().prepareBulk(); for (AddUserRecordRequest recordRequest : list) { JSONObject json = JSONObject.fromObject(recordRequest); bulkRequest.add(EsBuildersServiceUtil.getESClient() .prepareIndex(indexName, typeName) .setSource(json)); } if (list.size() > 0) { bulkRequest.execute().actionGet(); } } catch (Exception e) { log.error("saveBulkRecord error =", e); throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR); } } 感谢各位的阅读,以上就是"Elasticsearch reindex及Java使用sliceScorll优化查询的方法"的内容了,经过本文的学习后,相信大家对Elasticsearch reindex及Java使用sliceScorll优化查询的方法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
查询
索引
数据
方法
线程
别名
学习
内容
名称
数量
时间
版本
搜索
相同
作者
例子
信息
再次
副本
同时
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
魔兽影之哀伤服务器
组织机构数据库设计
dhcp实体服务器
厦门龙脉网络技术有限公司
马云推荐的软件开发书籍
数据库怎么导入系统里
怎样提高访问内部服务器的带宽
服务好的app软件开发
安卓服务器怎么发验证码
免视频软件开发
上海质量软件开发推广
济南微网互联网络科技有限公司
2020年网络安全任务
软件开发外包项目不付款
网络安全word图标
神雕侠侣2几个区一个服务器
网络安全证书删了会怎么样
h2数据库无法打开
set协议是网络安全吗
服务器安全狗 二合一
河南网络安全宣传启动仪式
军队如何守好网络安全防线
厦门龙脉网络技术有限公司
丰台区网络安全培训
sql数据库个人版
当前网络安全防范的措施
软件开发述职报告怎么写
软件开发系统流程
网络安全的治理机构是什么
最强蜗牛服务器排行榜