Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章将为大家详细讲解有关Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。使用
千家信息网最后更新 2025年12月02日Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取
这篇文章将为大家详细讲解有关Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
使用Tbale&SQL与Flink Kafka连接器从kafka的消息队列中获取数据
示例环境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
SelectToKafka.java
package com.flink.examples.kafka;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;/** * @Description 使用Tbale&SQL与Flink Kafka连接器从kafka的消息队列中获取数据 */public class SelectToKafka { /** 官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html 开始偏移位置 config选项scan.startup.mode指定Kafka使用者的启动模式。有效的枚举是: group-offsets:从特定消费者组的ZK / Kafka经纪人中的承诺抵消开始。 earliest-offset:从最早的偏移量开始。 latest-offset:从最新的偏移量开始。 timestamp:从每个分区的用户提供的时间戳开始。 specific-offsets:从每个分区的用户提供的特定偏移量开始。 默认选项值group-offsets表示从ZK / Kafka经纪人中最后提交的偏移量消费 一致性保证 sink.semantic选项来选择三种不同的操作模式: NONE:Flink不能保证任何事情。产生的记录可能会丢失或可以重复。 AT_LEAST_ONCE (默认设置):这样可以确保不会丢失任何记录(尽管它们可以重复)。 EXACTLY_ONCE:Kafka事务将用于提供一次精确的语义。每当您使用事务写入Kafka时,请不要忘记为使用Kafka记录的任何应用程序设置所需的设置isolation.level(read_committed 或read_uncommitted-后者是默认值)。 */ static String table_sql = "CREATE TABLE KafkaTable (\n" + " `user_id` BIGINT,\n" + " `item_id` BIGINT,\n" + " `behavior` STRING,\n" + " `ts` TIMESTAMP(3)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'user_behavior',\n" + " 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'scan.startup.mode' = 'earliest-offset',\n" + " 'format' = 'json'\n" + ")"; public static void main(String[] args) throws Exception { //构建StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //默认流时间方式 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //构建EnvironmentSettings 并指定Blink Planner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //构建StreamTableEnvironment StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); //注册kafka数据维表 tEnv.executeSql(table_sql); String sql = "select user_id,item_id,behavior,ts from KafkaTable"; Table table = tEnv.sqlQuery(sql); //打印字段结构 table.printSchema(); //table 转成 dataStream 流 DataStream behaviorStream = tEnv.toAppendStream(table, Row.class); behaviorStream.print(); env.execute(); }}
打印结果
root |-- user_id: BIGINT |-- item_id: BIGINT |-- behavior: STRING |-- ts: TIMESTAMP(3)3> 1,1,normal,2021-01-26T10:25:44
关于Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
数据
偏移
消息
示例
事务
内容
文章
时间
更多
模块
模式
环境
用户
知识
篇文章
经纪人
连接器
队列
保证
参考
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
互联网及其网络安全
定制软件开发开发公司
检验记录数据库
问道清理角色物品数据库
判断网站数据库
sql堆叠注入数据库
公司数据库坏了怎么办
ifix历史服务器停止工作
mysql数据库卡发
网络安全开始施行的时间是
无锡进口刀片服务器哪家好
电子商务数据库课件
传世数据库功能注释
请帮忙连接服务器
外国网络安全人才培养
上海镇江用友u9软件开发
安卓部署大数据库
服务器硬盘插在电脑上不显示
知网数据库以外
软件开发工程师一般薪资
合肥学校网络安全
网络安全代理公司
中职学校计算机网络技术
网络安全对孩子有什么危害
数据库的基本命令有哪些
软件开发用例
网络技术投资成本构成
要玩娱乐网络技术有限公司
中间软件开发工程师好考吗
比威网络技术有限公司王华