flink 多表join的例子
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,今天写了一个稍微复杂的例子, 实现了类似mysql group_concat 功能,记录一下MapToString 参考bug 那篇博客public static void main(String[]
千家信息网最后更新 2025年12月02日flink 多表join的例子
今天写了一个稍微复杂的例子, 实现了类似mysql group_concat 功能,记录一下
MapToString 参考bug 那篇博客
public static void main(String[] arg) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT()); tableEnv.registerFunction("mapToString", new MapToString()); getProjectInfo(env,tableEnv); getProject(env,tableEnv); joinTableProjectWithInfo(tableEnv); Table query = tableEnv.sqlQuery("select id, name, type from result_agg"); DataSet ds= tableEnv.toDataSet(query, Row.class); ds.print(); ds.writeAsText("/home/test", WriteMode.OVERWRITE); env.execute("multiple-table"); } public static void getProjectInfo(ExecutionEnvironment env,BatchTableEnvironment tableEnv) { TypeInformation[] fieldTypes = new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO }; String[] fieldNames = new String[] { "id", "type" }; RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://ip:3306/space?characterEncoding=utf8") .setUsername("user").setPassword("pwd") .setQuery("select project_fid, cast(project_info_type as CHAR) as type from project").setRowTypeInfo(rowTypeInfo).finish(); DataSource s = env.createInput(jdbcInputFormat); tableEnv.registerDataSet("project_info", s); aggProjectInfo(tableEnv,"project_info"); } public static void aggProjectInfo(BatchTableEnvironment tableEnv, String tableName) { Table tapiResult = tableEnv.scan(tableName); tapiResult.printSchema(); Table query = tableEnv.sqlQuery("select id, mapToString(collect(type)) as type from project_info group by id"); tableEnv.registerTable(tableName+"_agg", query); tapiResult = tableEnv.scan(tableName+"_agg"); tapiResult.printSchema(); } public static void getProject(ExecutionEnvironment env,BatchTableEnvironment tableEnv) { TypeInformation[] fieldTypes = new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO }; String[] fieldNames = new String[] { "pid", "name" }; RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://ip:3306/space?characterEncoding=utf8") .setUsername("user").setPassword("pwd") .setQuery("select fid, project_name from t_project").setRowTypeInfo(rowTypeInfo).finish(); DataSource s = env.createInput(jdbcInputFormat); tableEnv.registerDataSet("project", s); } public static void joinTableProjectWithInfo(BatchTableEnvironment tableEnv) { Table result =tableEnv.sqlQuery("select a.pid as id , a.name , b.type from project a inner join project_info_agg b on a.pid=b.id"); tableEnv.registerTable("result_agg", result); result.printSchema(); }
例子
复杂
功能
博客
参考
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
gta 服务器
sql多个数据库一起备份吗
网络安全与拒绝网络游戏教育
上海卓闲互动网络技术有限公司
护苗网络安全教育宣传
外贸系统软件开发公司
网络技术物流传输方式
软件开发培训学校怎么样
高级网络安全技术课程
黄山服务器机箱定做
数据库教程写得最好的
个人的网络安全如今形式
公益网络安全宣传官证书要钱吗
武汉软件开发中心
服务器管理员减点券
服务器的管理口
在全县网络安全会议上的发言
服务器远程登录问题
青浦区数据软件开发厂家价格
网络技术管理员岗位职责
皇派网络技术有限公司
西藏溢嘉年华互联网科技有限公司
租用别人服务器安全吗
c调用数据库
网络安全小黑板怎么划分
北大青鸟软件开发培训有哪些课程
网络安全风险变量增多
江苏引跑数据库
sms短信服务器
sun服务器开机