flink sql-clent MATCH_RECOGNIZE kafka 例子
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,环境 flink1.7.2增加flink1.7.2 的lib 中的jar, 否则会报类找不到avro-1.8.2.jar flink-connector-kafka-0.10_2
千家信息网最后更新 2025年12月02日flink sql-clent MATCH_RECOGNIZE kafka 例子
环境 flink1.7.2
增加flink1.7.2 的lib 中的jar, 否则会报类找不到
avro-1.8.2.jar flink-connector-kafka-0.10_2.12-1.7.2.jar flink-connector-kafka-base_2.12-1.7.2.jar flink-json-1.7.2.jar kafka-clients-0.11.0.0.jarflink-avro-1.7.2.jar flink-connector-kafka-0.11_2.12-1.7.2.jar flink-core-1.7.2.jar flink-python_2.12-1.7.2.jar log4j-1.2.17.jarflink-cep_2.12-1.7.2.jar flink-connector-kafka-0.9_2.12-1.7.2.jar flink-dist_2.12-1.7.2.jar flink-table_2.12-1.7.2.jar slf4j-log4j12-1.7.15.jar- 修改 sql-client-defaults.yaml 中的table 值
tables: - name: myTable type: source update-mode: append connector: property-version: 1 type: kafka version: 0.11 topic: im-message-topic2 startup-mode: earliest-offset properties: - key: bootstrap.servers value: kafkaip:9092 - key: group.id value: testGroup format: property-version: 1 type: json schema: "ROW(sessionId STRING, fromUid STRING, toUid STRING, chatType STRING, type STRING,msgId STRING, msg STRING, timestampSend STRING)" schema: - name: sessionId type: STRING - name: fromUid type: STRING - name: toUid type: STRING - name: chatType type: STRING - name: type type: STRING - name: msgId type: STRING - name: msg type: STRING - name: rowTime type: TIMESTAMP rowtime: timestamps: type: "from-field" from: "timestampSend" watermarks: type: "periodic-bounded" delay: "60" - name: procTime type: TIMESTAMP proctime: true- 运行
./bin/sql-client.sh embedded select * from myTable;然后使用 MATCH_RECOGNIZE 的sql
SELECT * FROM myTable MATCH_RECOGNIZE ( PARTITION BY sessionId ORDER BY rowTime MEASURES e2.procTime as answerTime, LAST(e1.procTime) as customer_event_time, e2.fromUid as empUid, e1.procTime as askTime, 1 as total_talk ONE ROW PER MATCH AFTER MATCH SKIP TO LAST e2 PATTERN (e1 e2) DEFINE e1 as e1.type = 'yonghu', e2 as e2.type = 'guanjia' );上面是使用sql-client 不用谢代码,当然也可以写代码,下面是对应的程序
public static void main(String[] arg) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); tableEnv.connect(new Kafka() .version("0.11") .topic("im-message-topic3") //.property("zookeeper.connect","") .property("bootstrap.servers","kafkaip:9092") .startFromEarliest() .sinkPartitionerRoundRobin()//Flink分区随机映射到kafka分区 ).withFormat(new Json() .failOnMissingField(false) .deriveSchema() ).withSchema(new Schema() .field("sessionId", Types.STRING).from("sessionId") .field("fromUid", Types.STRING).from("fromUid") .field("toUid", Types.STRING).from("toUid") .field("chatType", Types.STRING).from("chatType") .field("type", Types.STRING).from("type") .field("msgId", Types.STRING).from("msgId") .field("msg", Types.STRING).from("msg")// .field("timestampSend", Types.SQL_TIMESTAMP) .field("rowtime", Types.SQL_TIMESTAMP) .rowtime(new Rowtime() .timestampsFromField("timestampSend") .watermarksPeriodicBounded(1000) ) .field("proctime", Types.SQL_TIMESTAMP).proctime() ).inAppendMode().registerTableSource("myTable"); Table tb2 = tableEnv.sqlQuery( "SELECT " + "answerTime, customer_event_time, empUid, noreply_counts, total_talk " + "FROM myTable" + " " + "MATCH_RECOGNIZE ( " + "PARTITION BY sessionId " + "ORDER BY rowtime " + "MEASURES " + "e2.rowtime as answerTime, "+ "LAST(e1.rowtime) as customer_event_time, " + "e2.fromUid as empUid, " + "1 as noreply_counts, " + "e1.rowtime as askTime," + "1 as total_talk " + "ONE ROW PER MATCH " + "AFTER MATCH SKIP TO LAST e2 " + "PATTERN (e1 e2) " + "DEFINE " + "e1 as e1.type = 'yonghu', " + "e2 as e2.type = 'guanjia' " + ")"+ "" ); DataStream appendStream =tableEnv.toAppendStream(tb2, Row.class); System.out.println("schema is:"); tb2.printSchema(); appendStream.writeAsText("/usr/local/whk", WriteMode.OVERWRITE); logger.info("stream end"); Table tb3 = tableEnv.sqlQuery("select sessionId, type from myTable"); DataStream temp =tableEnv.toAppendStream(tb3, Row.class); tb3.printSchema(); temp.writeAsText("/usr/local/whk2", WriteMode.OVERWRITE); env.execute("msg test"); }
大功告成,其实里面坑很多。
注意:如果使用了 TimeCharacteristic.EventTime, 请不用再使用procTime。
不用
代码
大功告成
大功
环境
程序
会报
运行
例子
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
焦作网络技术参数
移动光猫服务器可以选择吗
江西网络安全周开幕式
网络安全应届生面试题
测试服务器在哪里设置
网络安全研究的主要目标是
陕西自由互联网科技有限公司
数据库防篡改监控
mysql 数据库监控
set协议是网络安全吗
魔龙之戒服务器
软件开发版本控制有哪些
网络安全宣传视频小学生舞蹈
ip中国代理服务器
易企秀的怎么收集数据库
网络安全与防范ppt课件
杭州源锦网络技术有限公司
切实强化网络安全保障
我的世界有名的服务器
校园网络安全厂家
怎样修改网络安全密匙
为什么网络安全做不到位
shp加载到现有数据库
湖南pdu服务器电源专卖店
技术手段避免数据库数据泄密
邯郸计算机应用软件开发哪家实惠
网络技术有限公司有什么职位
银发网络安全
思科网络安全章节测试答案
上海 分布式数据库 霍雷