Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这期内容当中小编将会给大家带来有关Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。使用Tb
千家信息网最后更新 2025年12月02日Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的
这期内容当中小编将会给大家带来有关Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
使用Tbale&SQL与Flink Kafka连接器将数据写入kafka的消息队列
示例环境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
InsertToKafka.java
package com.flink.examples.kafka;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.StatementSet;import org.apache.flink.table.api.TableResult;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/** * @Description 使用Tbale&SQL与Flink Elasticsearch连接器将数据写入kafka的消息队列 */public class InsertToKafka { /** 官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html format:用于反序列化和序列化Kafka消息的格式。支持的格式包括'csv','json','avro','debezium-json'和'canal-json'。 */ static String table_sql = "CREATE TABLE KafkaTable (\n" + " `user_id` BIGINT,\n" + " `item_id` BIGINT,\n" + " `behavior` STRING,\n" + " `ts` TIMESTAMP(3)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'user_behavior',\n" + " 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'scan.startup.mode' = 'earliest-offset',\n" + " 'format' = 'json'\n" + ")"; public static void main(String[] args) throws Exception { //构建StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //默认流时间方式 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //构建EnvironmentSettings 并指定Blink Planner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //构建StreamTableEnvironment StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); //注册kafka数据维表 tEnv.executeSql(table_sql); //时间格式处理,参考阿里文档 //https://www.alibabacloud.com/help/zh/faq-detail/64813.htm?spm=a2c63.q38357.a3.3.697c13523NZiIN String sql = "insert into KafkaTable (user_id,item_id,behavior,ts) values(1,1,'normal', TO_TIMESTAMP(FROM_UNIXTIME( " + System.currentTimeMillis() + " / 1000, 'yyyy-MM-dd HH:mm:ss')))"; // 第一种方式:直接执行sql// TableResult tableResult = tEnv.executeSql(sql); //第二种方式:声明一个操作集合来执行sql StatementSet stmtSet = tEnv.createStatementSet(); stmtSet.addInsertSql(sql); TableResult tableResult = stmtSet.execute(); tableResult.print(); }}打印结果
+-------------------------------------------+| default_catalog.default_database.my_users |+-------------------------------------------+| -1 |+-------------------------------------------+1 row in set
上述就是小编为大家分享的Flink中TableAPI 、 SQL 与 Kafka消息插入是怎么实现的了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。
消息
数据
示例
方式
格式
内容
序列
时间
模块
环境
连接器
队列
分析
参考
专业
中小
内容丰富
官方
就是
数据源
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
天津九宫格网络技术有限公司
大学网络技术宣传
中国网络安全监测公司
网络安全工程师考试锦鲤
西安11月校招软件开发信息
gprs模块服务器
国家宣讲网络安全知识
检索效率低的数据库
网络安全法开始湿湿的时间
贵州省2021年网络安全
人间地狱进服务器后闪退
管家婆辉煌v7.2用什么数据库
宜兴辅助软件开发
初学者怎么提高网络技术
怎么将数据库字段设计为自增长
软件开发管培生是做啥的
财务软件开发者需要懂财务吗
网络安全主题硬笔书法作品
郑州数据库培训哪里有
海康nvr存储服务器
阿里云 服务器 密码
供电公司网络安全防护措施
广东省服务器托管公司云主机
写网络安全手抄报文字
网络安全综合防控体系建设方案
网络与信息和软件开发哪个好
软件开发项目政府技术评审
dbc数据库安装错误
服务器身份
CRPS服务器电源品牌