如何实现elasticsearch导入mysql数据
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章主要讲解了"如何实现elasticsearch导入mysql数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何实现elasticsearc
千家信息网最后更新 2025年12月03日如何实现elasticsearch导入mysql数据
这篇文章主要讲解了"如何实现elasticsearch导入mysql数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何实现elasticsearch导入mysql数据"吧!
一、基于elasticsearch的官方API批量导入
引入maven依赖
org.springframework.boot spring-boot-starter-parent 2.3.2.RELEASE org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-data-elasticsearch mysql mysql-connector-java runtime
jdbc连接类
public class DBHelper { public static final String url = "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai"; public static final String name = "com.mysql.cj.jdbc.Driver"; public static final String user = "root"; public static final String password = "root"; public static Connection conn = null; public static Connection getConn() { try { Class.forName(name); conn = DriverManager.getConnection(url, user, password);//获取连接 } catch (Exception e) { e.printStackTrace(); } return conn; }}导入逻辑
@Service("positionService")public class PositionService { @Autowired ElasticsearchRestTemplate elasticsearchTemplate; @Autowired RestHighLevelClient client; private static final String POSITIOIN_INDEX = "position"; public void importAll() throws IOException { writeMysqlDataToES(POSITIOIN_INDEX); } /** 讲数据批量写入ES中 */ private void writeMysqlDataToES(String tableName) { BulkProcessor bulkProcessor = getBulkProcessor(client); Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; try { conn = DBHelper.getConn(); System.out.println("Start handle data :" + tableName); String sql = "SELECT * from " + tableName; ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); // 根据自己需要 设置 ps.setFetchSize(20); rs = ps.executeQuery(); ResultSetMetaData colData = rs.getMetaData(); ArrayList> dataList = new ArrayList>(); // bulkProcessor 添加的数据支持的方式并不多,查看其api发现其支持map键值对的方式,故笔者在此将查出来的数据转换成hashMap方式 HashMap map = null; int count = 0; String c = null; String v = null; while (rs.next()) { count++; map = new HashMap(128); for (int i = 1; i <= colData.getColumnCount(); i++) { c = colData.getColumnName(i); v = rs.getString(c); map.put(c, v); } dataList.add(map); // 每1万条写一次,不足的批次的最后再一并提交 if (count % 10000 == 0) { System.out.println("Mysql handle data number : " + count); // 将数据添加到 bulkProcessor 中 for (HashMap hashMap2 : dataList) { bulkProcessor.add( new IndexRequest(POSITIOIN_INDEX).source(hashMap2)); } // 每提交一次便将map与list清空 map.clear(); dataList.clear(); } } // 处理未提交的数据 for (HashMap hashMap2 : dataList) { bulkProcessor.add( new IndexRequest(POSITIOIN_INDEX).source(hashMap2)); System.out.println(hashMap2); } System.out.println("-------------------------- Finally insert number total: " + count); // 将数据刷新到es, 注意这一步执行后并不会立即生效,取决于bulkProcessor设置的刷新时间 bulkProcessor.flush(); } catch (Exception e) { System.out.println(e.getMessage()); } finally { try { rs.close(); ps.close(); conn.close(); boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS); System.out.println(terminatedFlag); } catch (Exception e) { System.out.println(e.getMessage()); } } } private BulkProcessor getBulkProcessor(RestHighLevelClient client) { BulkProcessor bulkProcessor = null; try { BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { System.out.println("Try to insert data number : " + request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { System.out.println("************** Success insert data number : "+ request.numberOfActions() + " , id: " +executionId); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { System.out.println("Bulk is unsuccess : " + failure + ",executionId: " + executionId); } }; BiConsumer> bulkConsumer = (request, bulkListener) -> client .bulkAsync(request, RequestOptions.DEFAULT, bulkListener); BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener); builder.setBulkActions(5000); builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB)); builder.setConcurrentRequests(10); builder.setFlushInterval(TimeValue.timeValueSeconds(100L)); builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // 注意点:让参数设置生效 bulkProcessor = builder.build(); } catch (Exception e) { e.printStackTrace(); try { bulkProcessor.awaitClose(100L, TimeUnit.SECONDS); } catch (Exception e1) { System.out.println(e1.getMessage()); } } return bulkProcessor; }} 调用入口
@RestControllerpublic class PositionController { @Autowired PositionService positionService; @RequestMapping("query") public List导入的数据表
public class Position implements Serializable { //主键 private String id; //公司名称 private String companyName; //职位名称 private String positionName; //职位诱惑 private String positionAdvantage; //薪资 private String salary; //薪资下限 private int salaryMin; //薪资上限 private int salaryMax; //学历 private String education; //工作年限 private String workYear; //发布时间 private String publishTime; //工作城市 private String city; //工作地点 private String workAddress; //发布时间 @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date createTime; //工作模式 private String jobNature;}二、基于logstash导入
前提:安装好logstash
import.conf
input { stdin { } jdbc { jdbc_connection_string => "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai" jdbc_user => "root" jdbc_password => "root" jdbc_driver_library => "D:/mysql-connector-java-5.1.10.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "1000" statement_filepath => "D:/import.sql" }} filter { json { source => "message" remove_field => ["message"] }} output { elasticsearch { hosts => ["localhost:9200"] index => "position" document_type => "_doc" } stdout { codec => json_lines }}import.sql
select * from position
启动logstash
logstash -f ../import.conf
感谢各位的阅读,以上就是"如何实现elasticsearch导入mysql数据"的内容了,经过本文的学习后,相信大家对如何实现elasticsearch导入mysql数据这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
工作
方式
时间
薪资
学习
内容
名称
职位
支持
万条
上限
下限
入口
公司
前提
参数
取决于
地点
城市
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
停车场管理服务器配置
软件开发投标书
三级数据库技术没有编程题
阜阳手机软件开发定制公司
网络安全徽章图片
预约系统的数据库设计
服务器生存第七集
查看数据库的归档
科技互联网板块
网络安全宣传周准备活动
西南高防服务器
病毒数据库哪家工艺好
浙江诺友网络技术有限公司
网络安全的信息点是什么
宜兴加工软件开发技术参数
企业服务器erp
计算机网络技术第4版徐立新
软件开发者平台交流
c 两个串口同时收数据库
网络安全人才培养顶层设计
数据库管理系统比excel
nba2k22港服服务器
饥荒联机版怎么快速搜索服务器
阿里云服务器租用配置
云数据库mysql存储介质是
英语专业可以干软件开发吗
终端安全是网络安全
网线管理ipm服务器
阿克苏联想服务器维保公司
服务器出错_1是什么意思