FlinkSQL怎么搭建
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,本篇内容主要讲解"FlinkSQL怎么搭建",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"FlinkSQL怎么搭建"吧!1.背景由于公司内部需求较多,并不想
千家信息网最后更新 2025年12月02日FlinkSQL怎么搭建
本篇内容主要讲解"FlinkSQL怎么搭建",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"FlinkSQL怎么搭建"吧!
1.背景
由于公司内部需求较多,并不想每次都写一个 streaming 程序,故而开始搭建 flinksql 平台,基于 jdk1.8,flink1.12.x
2.效果
传一个 sql 文件给 jar 包,然后 sql 文件内的 sql 将自动执行
3. jar 包 vs web 界面
调研了基于 web 的 zeppline
zeppline 设计的初衷其实是为了交互式分析
基于 zeppline rest api 与现有的监控不兼容,需要修改现有监控的代码
虽然带有 web 界面的对用户很是友好,对于分析人员来说,是一个不错的选择,但对于开发人员来说,真正的线上长时间的运行程序,开发成 HA 的 server 还是有必要的
基于以上 3 点最终选择 jar 作为最终的方式
4. 使用
将 sql 写入 xxx.sql 文件中,如
CREATE TEMPORARY FUNCTION MillisecondsToDateStr AS 'io.github.shengjk.udf.MillisecondsToDateStr' LANGUAGE JAVA;-- ExecutionCheckpointingOptionsset execution.checkpointing.mode=EXACTLY_ONCE;set execution.checkpointing.timeout=30 min;-- 30minset execution.checkpointing.interval=1 min ; -- 1minset execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;-- ExecutionConfigOptionsset table.exec.state.ttl=1 day; -- 1 dayset table.exec.mini-batch.enabled=true; -- enable mini-batch optimizationset table.exec.mini-batch.allow-latency=1 s; -- 1sset table.exec.mini-batch.size=1000;set table.exec.sink.not-null-enforcer=drop;-- -- dadadadadadaCREATE TABLE orders( status int, courier_id bigint, id bigint, finish_time BIGINT)WITH ( 'connector' = 'kafka','topic' = 'canal_monitor_order', 'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup', 'format' = 'ss-canal-json','ss-canal-json.table.include' = 'orders','scan.startup.mode' = 'earliest-offset');-- flink.partition-discovery.interval-millis;CREATE TABLE infos( info_index int, order_id bigint)WITH ( 'connector' = 'kafka','topic' = 'canal_monitor_order', 'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup', 'format' = 'ss-canal-json','ss-canal-json.table.include' = 'infos','scan.startup.mode' = 'earliest-offset');CREATE TABLE redisCache( finishOrders BIGINT, courier_id BIGINT, dayStr String)WITH ( 'connector' = 'redis', 'hostPort'='localhost:6400', 'keyType'='hash', 'keyTemplate'='test2_${courier_id}', 'fieldTemplate'='${dayStr}', 'valueNames'='finishOrders', 'expireTime'='259200');create view temp asselect o.courier_id, (CASE WHEN sum(infosMaxIndex.info_index) is null then 0 else sum(infosMaxIndex.info_index) end) finishOrders, o.status, dayStrfrom ((select courier_id, id, last_value(status) status, MillisecondsToDateStr(finish_time, 'yyyyMMdd') dayStr from orders where status = 60 group by courier_id, id, MillisecondsToDateStr(finish_time, 'yyyyMMdd'))) oleft join (select max(info_index) info_index, order_id from infos group by order_id) infosMaxIndex on o.id = infosMaxIndex.order_idgroup by o.courier_id, o.status, dayStr;INSERT INTO redisCache SELECT finishOrders,courier_id,dayStr FROM temp;将 flinksql-platform 打包并上传至服务器
将必要的 connector jar 放入到相应的目录下
执行,如
flink-1.12.0/bin/flink run -p 3 -yt ./flinkjar/ -C file:///home/shengjk/flinkjar/test-udf.jar -C file:///home/shengjk/flinkjar/jedis-2.10.2.jar -m yarn-cluster -ynm sqlDemo -c io.github.shengjk.Main ./flinksql-platform-1.0-SNAPSHOT.jar --sqlPath ./xxx.sql
其中
-C 添加 udfJar 等第三方 jar 包 -C 参数apply到了client端生成的JobGraph里,然后提交JobGraph来运行的
-yt 目录 将 udfJar 等第三方 jar 包提交到 TaskManager 上
到此,相信大家对"FlinkSQL怎么搭建"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
文件
必要
人员
内容
界面
目录
程序
第三方
分析
学习
开发
监控
运行
选择
不错
实用
更深
交互式
代码
公司
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
原生软件开发是什么意思
高级工业网络技术
山东软件开发多不多
长沙oa软件开发平台
千千静听歌词服务器最新补丁
网络安全属于软件开发吗
数据库设计的基本原则简单
网络技术应用 高二
网络安全防护的数学模型
网络安全的主要特点不包括
宽带连接服务器无反应
越光宝盒连接不上通讯服务器
西安交通大学软件开发基础
家用共享服务器
服务器电源 拆
云平台的管理范围有服务器吗
网络安全防护加固
网络安全字眼
揭阳无线软件开发平均价格
上海软件开发培训中心
视易系统网关服务器的作用
网络是一种单纯的网络技术
mysql数据库登录不进
电脑总是服务器连接失败
有嵌入式软件开发
石家庄设备管理软件开发
关于网络安全保障监管
波兰网络安全公司
删除数据库 判刑
黄石市网络安全知识