千家信息网

FLINK 1.12 upsertSql怎么使用

发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,这篇文章主要讲解了"FLINK 1.12 upsertSql怎么使用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"FLINK 1.12 upsertSql怎么使用"吧!
千家信息网最后更新 2025年12月01日FLINK 1.12 upsertSql怎么使用

这篇文章主要讲解了"FLINK 1.12 upsertSql怎么使用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"FLINK 1.12 upsertSql怎么使用"吧!

package com.konka.dsp;import org.apache.flink.api.common.JobExecutionResult;import org.apache.flink.api.common.restartstrategy.RestartStrategies;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.connector.jdbc.dialect.MySQLDialect;import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;import org.apache.flink.connector.jdbc.table.JdbcUpsertTableSink;import org.apache.flink.formats.json.JsonFormatFactory;import org.apache.flink.formats.json.canal.CanalJsonFormatFactory;import org.apache.flink.shaded.curator4.org.apache.curator.framework.schema.Schema;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.*;import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.expressions.TimeIntervalUnit;import org.apache.flink.table.types.DataType;import org.apache.flink.types.Row;import org.apache.flink.util.CloseableIterator;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.time.Duration;import java.util.concurrent.ExecutionException;import static org.apache.flink.table.api.Expressions.*;public class SalesOrderStream {    private static Logger log = LoggerFactory.getLogger(SalesOrderStream.class.getName());    public static Table report(Table transactions) {        return transactions.select(                $("customer_name"),                $("created_date"),                $("total_amount"))                .groupBy($("customer_name"),$("created_date"))                .select(                        $("customer_name"),      
                  $("total_amount").sum().as("total_amount"),                        $("created_date")                        );    }    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment                .getExecutionEnvironment();//        env.setParallelism(4);//        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);     // set default parallelism to 4//        tEnv.executeSql("CREATE TABLE sales_order_header_stream (\n" +////                "   id  BIGINT not null,\n" +//                "    customer_name STRING,\n"+////                "    dsp_org_name STRING,\n"+//                "    total_amount      DECIMAL(38,2),\n" +////                "    total_discount      DECIMAL(16,2),\n" +////                "    pay_amount      DECIMAL(16,2),\n" +////                "    total_amount      DECIMAL(16,2),\n" +//                "    created_date TIMESTAMP(3)\n" +//                ") WITH (\n" +//                " 'connector' = 'mysql-cdc',\n" +//                " 'hostname' = '192.168.8.73',\n" +//                " 'port' = '4000',\n"+//                " 'username' = 'flink',\n"+//                " 'password' = 'flink',\n"+//                " 'database-name' = 'dspdev',\n"+//                " 'table-name' = 'sales_order_header'\n"+//                ")");        //pay_type,over_sell        tEnv.executeSql("CREATE TABLE sales_order_header_stream (\n" +                        " `id` BIGINT,\n"+                        " `total_amount` DECIMAL(16,2) ,\n"+                        " `customer_name` STRING,\n"+                        " `order_no` STRING,\n"+                        " `doc_type` STRING,\n"+                        " `sales_org` STRING,\n"+                        " `distr_chan` STRING,\n"+                        " `division` STRING,\n"+                        " `sales_grp` STRING,\n"+                  
      " `sales_off` STRING,\n"+                        " `purch_no_c` STRING,\n"+                        " `purch_date` STRING,\n"+                        " `sold_to` STRING,\n"+                        " `ship_to` STRING,\n"+                        " `r3_sales_order` STRING,\n"+                        " `created_by_employee_name` STRING,\n"+                        " `created_by_dept_name` STRING,\n"+                        " `created_by_dept_name` STRING,\n"+                        " `is_enable` BIGINT,\n"+                        " `is_delete` BIGINT,\n"+                        " `sale_order_status` STRING,\n"+                        " `created_by_parent_dept_name` STRING,\n"+                        " `total_discount` DECIMAL(16,2),\n"+                        " `customer_sapcode` STRING,\n"+                        " `sold_to_name` STRING,\n"+                        " `ship_to_name` STRING,\n"+                        " `total_discount_amount` DECIMAL(16,2),\n"+                        " `other_discount` DECIMAL(16,2),\n"+                        " `other_amount` DECIMAL(16,2),\n"+                        " `pay_amount` DECIMAL(16,2),\n"+                        " `dsp_org_name` STRING,\n"+                        " `delivery_address` STRING,\n"+                        " `delivery_person` STRING,\n"+                        " `delivery_phone` STRING,\n"+                        " `pay_type` STRING,\n"+                        " `over_sell` STRING,\n"+                        " `created_date` TIMESTAMP(3),\n"+                        " PRIMARY KEY (`id`) NOT ENFORCED "+                ") WITH (\n" +                "'connector' = 'kafka',\n"+                "'topic' = 'canal-data',\n"+                "'properties.bootstrap.servers' = '192.168.8.71:9092',\n"+                "'properties.group.id' = 'test',\n"+                "'format' = 'canal-json'\n"+                ")");//        tEnv.executeSql("CREATE TABLE total_day_report (\n" +//                "    customer_name 
STRING,\n" +////                "    total_amount    DECIMAL(16,2),\n" +////                "    total_discount  DECIMAL(16,2),\n" +////                "    pay_amount      DECIMAL(16,2),\n" +//                "    total_amount    DECIMAL(16,2),\n" +//                "    created_date STRING,\n" +//                "    PRIMARY KEY (created_date) NOT ENFORCED" +//                ") WITH (\n" +//                "  'connector' = 'upsert-kafka',\n" +//                "  'topic' = 'customer_amount',\n" +//                "  'properties.bootstrap.servers' = '192.168.8.71:9092',\n"+//                "  'key.format' = 'json',\n"+//                "  'value.format' = 'json',\n"+//                "  'value.fields-include' = 'ALL'\n"+//                ")");        tEnv.executeSql("CREATE TABLE upsertSink (\n" +                "    customer_name STRING,\n" +//                "    total_amount    DECIMAL(16,2),\n" +//                "    total_discount  DECIMAL(16,2),\n" +//                "    pay_amount      DECIMAL(16,2),\n" +                "    total_amount    DECIMAL(16,2),\n" +                "    created_date STRING,\n" +                "    PRIMARY KEY (customer_name,created_date) NOT ENFORCED" +                ") WITH (\n" +                        "  'connector' = 'tidb',\n" +                        "  'tidb.database.url' = 'jdbc:mysql://192.168.8.73:4000/dspdev',\n" +                        "  'tidb.username' = 'flink',\n"+                        "  'tidb.password' = 'flink',\n"+                        "  'tidb.database.name' = 'dspdev',\n"+                        "  'tidb.table.name' = 'spend_report'\n"+//                "  'connector.type'='jdbc'," +//                "  'connector.url'='jdbc:mysql://192.168.8.73:4000/dspdev',\n" +//                "  'connector.username' = 'flink',\n"+//                "  'connector.password' = 'flink',\n"+//                "  'connector.table' = 'spend_report'" +                ")");//        TableSchema tableSche = 
TableSchema.builder()//                .field("customer_name",DataTypes.STRING().notNull())//                .field("total_amount",DataTypes.DECIMAL(16,2))//                .field("created_date",DataTypes.STRING().notNull()).build();////        JdbcOptions jdbcOptions = JdbcOptions.builder()//                .setDBUrl("jdbc:mysql://192.168.8.73:4000/dspdev")//                .setTableName("spend_report")//                .setUsername("flink")//                .setPassword("flink")//                .setDialect(new MySQLDialect())//                .setDriverName("com.mysql.jdbc.Driver")//                .build();//        JdbcUpsertTableSink jdbcUpsertTableSink = JdbcUpsertTableSink.builder()//                .setTableSchema(tableSche)//                .setOptions(jdbcOptions)//                .build();//        jdbcUpsertTableSink.setKeyFields(new String[]{"id"});        /**         * SINK End         *///        tEnv.re("spend_report",jdbcUpsertTableSink);        Table transactions = tEnv.from("sales_order_header_stream");//      tEnv.executeSql("delete from total_day_report");       tEnv.executeSql("insert into upsertSink select dsp_org_name as customer_name,cast(sum(t.pay_amount) as decimal(16,2)) as amount,DATE_FORMAT(t.created_date,'yyyy-MM-dd') as created_date from sales_order_header_stream t group by DATE_FORMAT(t.created_date,'yyyy-MM-dd'),dsp_org_name").print();//      tEnv.executeSql("insert into spend_report select * from total_day_report");//      Table transactions = tEnv.from("total_day_report");//      report(transactions).executeInsert("spend_report");        tEnv.execute("-----------");    }}

最后数据库结果如下:

每次都是更新替换,这样的话省去很多麻烦,不用转datastream在处理了,而且1.12支持upsert-kafka,最后数据叠加如下:

upsert-kafka上面已经体现

感谢各位的阅读,以上就是"FLINK 1.12 upsertSql怎么使用"的内容了,经过本文的学习后,相信大家对FLINK 1.12 upsertSql怎么使用这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是千家信息网,小编将为大家推送更多相关知识点的文章,欢迎关注!

学习 内容 数据 这样的话 不用 就是 思路 情况 数据库 文章 更多 知识 知识点 篇文章 结果 跟着 问题 麻烦 叠加 处理 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 关于网络安全搜题网 香港网络服务器购买安全吗 夸克web服务器拒绝了连接 华为网络技术大赛大纲 荒野日记连接不上服务器 数据库 全局变量 软件开发人越多效率越慢定律 信息网络安全侦查 网络安全法规定违法所得金额 浦东新区威力网络技术售后服务 广州师道互联网科技 15岁学习网络安全吗 数据库维护试题 智能电视服务器软件 自动售卖机软件开发企业 新疆阿渣互联网科技有限公司 运维数据库技术文档 手机网络安全动漫图 华科网络安全研究生 我的世界东京食种服务器 人事管理系统实现数据库设计 设计企业网络安全方案 服务器端渲染页面 服务器怎么重新设置路由器 联通网络技术研究院 面试 聊城炒股软件开发 计算机网络安全培训就业方向 延庆区常规软件开发平台怎么样 生存服务器可以加模组吗 软件开发需要用到哪几个证书
0