flink 从mysql 读取数据 放入kafka中 用于搜索全量
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,接着上一篇,将mysql的数据导入kafka中public static void main(String[] arg) throws Exception { TypeInformati
千家信息网最后更新 2025年12月02日flink 从mysql 读取数据 放入kafka中 用于搜索全量
接着上一篇,将mysql的数据导入kafka中
public static void main(String[] arg) throws Exception { TypeInformation[] fieldTypes = new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO }; String[] fieldNames = new String[] { "name", "address" }; RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://ip:3306/tablespace?characterEncoding=utf8") .setUsername("user").setPassword("root") .setQuery("select LOGIC_CODE, SHARE_LOG_CODE from table").setRowTypeInfo(rowTypeInfo).finish(); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource s = env.createInput(jdbcInputFormat); BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT()); tableEnv.registerDataSet("t2", s); Table tapiResult = tableEnv.scan("t2"); System.out.println("schema is:"); tapiResult.printSchema(); Table query = tableEnv.sqlQuery("select name, address from t2"); DataSet ds= tableEnv.toDataSet(query, Result.class); DataSet temp=ds.map(new MapFunction() { @Override public String map(Result result) throws Exception { String name = result.name; String value = result.address; return name+":->:"+value; } }); logger.info("read db end"); KafkaOutputFormat kafkaOutput = KafkaOutputFormat.buildKafkaOutputFormat() .setBootstrapServers("ip:9092").setTopic("search_test_whk").setAcks("all").setBatchSize("1000") .setBufferMemory("100000").setLingerMs("1").setRetries("2").finish(); temp.output(kafkaOutput); logger.info("write kafka end"); env.execute("Flink add data source"); }
数据
上一
搜索
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
法国关于网络安全的法律法规
数据库复杂表如何创建
6700k可以做服务器cpu吗
服务器安全策略密码过期
亚当模块数据库和电脑怎么连
代理服务器通讯设置失败
传输网络技术otn
学习通考试插件服务器超时
江西综合软件开发市价
日立服务器怎么查故障代码
网络安全 政务服务
分类(数据库术语)
万方数据库服务功能
数据库泵导入数据
四川程序软件开发费用是多少
手机网络错误无法连接服务器
dell服务器硬件管理
网络安全从哪学
安卓记事本软件开发
我国现行的网络安全
网络安全管理台账6
宜章安卓软件开发学校
国家网络安全投入资金
海南企业软件开发市价
崇明区一站式软件开发联系方式
网络安全课进校园征文手写
平板全屋定制软件开发
金华专技信息与网络安全考题
云主机服务器服务商
网络安全法律法规宣传活动总结