MLSQL Stack如何让流调试更加简单详解
发表于:2025-11-10 作者:千家信息网编辑
千家信息网最后更新 2025年11月10日,前言有一位同学正在调研MLSQL Stack对流的支持。然后说了流调试其实挺困难的。经过实践,希望实现如下三点:能随时查看最新固定条数的Kafka数据调试结果(sink)能打印在web控制台流程序能自
千家信息网最后更新 2025年11月10日MLSQL Stack如何让流调试更加简单详解
前言
有一位同学正在调研MLSQL Stack对流的支持。然后说了流调试其实挺困难的。经过实践,希望实现如下三点:
- 能随时查看最新固定条数的Kafka数据
- 调试结果(sink)能打印在web控制台
- 流程序能自动推测json schema(现在spark是不行的)
实现这三个点之后,我发现调试确实就变得简单很多了。
流程
首先我新建了一个kaf_write.mlsql,里面方便我往Kafka里写数据:
set abc='''{ "x": 100, "y": 200, "z": 200 ,"dataType":"A group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}''';load jsonStr.`abc` as table1;select to_json(struct(*)) as value from table1 as table2;save append table2 as kafka.`wow` where kafka.bootstrap.servers="127.0.0.1:9092";这样我每次运行,数据就能写入到Kafka.
接着,我写完后,需要看看数据是不是真的都写进去了,写成了什么样子:
!kafkaTool sampleData 10 records from "127.0.0.1:9092" wow;
这句话表示,我要采样Kafka 10条Kafka数据,该Kafka的地址为127.0.0.1:9092,主题为wow.运行结果如下:
没有什么问题。接着我写一个非常简单的流式程序:
-- the stream name, should be uniq.set streamName="streamExample";-- use kafkaTool to infer schema from kafka!kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;load kafka.`wow` options kafka.bootstrap.servers="127.0.0.1:9092"as newkafkatable1;select * from newkafkatable1as table21;-- print in webConsole instead of terminal console.save append table21 as webConsole.`` options mode="Append"and duration="15"and checkpointLocation="/tmp/s-cpl4";
运行结果如下:
在终端我们也可以看到实时效果了。
补充
当然,MLSQL Stack 还有对流还有两个特别好地方,第一个是你可以对流的事件设置http协议的callback,以及对流的处理结果再使用批SQL进行处理,最后入库。参看如下脚本:
-- the stream name, should be uniq.set streamName="streamExample";-- mock some data.set data='''{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}''';-- load data as tableload jsonStr.`data` as datasource;-- convert table as stream sourceload mockStream.`datasource` options stepSizeRange="0-3"as newkafkatable1;-- aggregation select cast(value as string) as k from newkafkatable1as table21;!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";-- output the the result to console.save append table21 as custom.`` options mode="append"and duration="15"and sourceTable="jack"and code='''select count(*) as c from jack as newjack;save append newjack as parquet.`/tmp/jack`; '''and checkpointLocation="/tmp/cpl15";总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。
数据
结果
对流
运行
内容
程序
处理
学习
支持
不行
困难
三个
两个
事件
价值
前言
同学
地址
地方
实时
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全靠人民1000字征文
大容量服务器
生成对抗网络技术
我的世界怎么设置永恒服务器
安徽点亮网络技术有限公司
史诗级网络安全
服务器硬盘数据恢复完整教程
esh网络技术软件
教育软件开发推广
oracle数据库文件夹过大
osi网络安全机制
国家网络安全月演讲稿
大公司做金融软件开发
网络安全考试2018公需
网络安全的资料图文的
怎么看数据库系统时间
联想x3850服务器进bios
数据库mc和hd
hp服务器阵列卡
网易的服务器安全模式
四川工业软件开发正规平台
软件开发项目资源清单
怎么用命令导入数据库文件
防网络安全知识ppt课件
二本网络安全工作室好找工作吗
蒲公英服务器连接显示器
吴忠软件开发优化价格
ibm服务器面板指示灯闪烁
软件开发好还是网络管理好
怀化软件开发