利用flink统计消息回复情况
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,其中用到了滑动窗口函数大小30秒,间隔15秒,且大于窗口10秒的数据,被丢弃。(实际业务这三个值 应为是 10 分钟,1分钟,5分钟)。代码先记录一下public static void main(S
千家信息网最后更新 2025年12月03日利用flink统计消息回复情况
其中用到了滑动窗口函数大小30秒,间隔15秒,且大于窗口10秒的数据,被丢弃。(实际业务这三个值 应为是 10 分钟,1分钟,5分钟)。代码先记录一下
public static void main(String[] arg) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().enableSysoutLogging();//开启Sysout打日志 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //设置窗口的时间单位为process time Properties props = new Properties(); props.put("bootstrap.servers", "kafkaip:9092"); props.put("group.id", "metric-group4"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); //value 反序列化 DataStreamSource dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>( "im-message-topic3", //kafka topic new SimpleStringSchema(), // String 序列化 props)).setParallelism(1); DataStream bean3DataStream = dataStreamSource.map(new MapFunction() { @Override public Message map(String value) throws Exception { logger.info("receive msg:"+value); JSONObject jsonObject =JSONObject.parseObject(value); Message s= new Message( jsonObject.getString("sessionId"), jsonObject.getString("fromUid"), jsonObject.getString("toUid"), jsonObject.getString("chatType"), jsonObject.getString("type"), jsonObject.getString("msgId"), jsonObject.getString("msg"), jsonObject.getLong("timestampSend") ); return s; } }); //设置水印,并过滤数据 DataStream bean3DataStreamWithAssignTime = bean3DataStream.assignTimestampsAndWatermarks(new TruckTimestamp()).timeWindowAll(Time.seconds(30),Time.seconds(15)).apply(new AllWindowFunction() { @Override public void apply(TimeWindow window, Iterable values, Collector out) throws Exception { for (Message t: values) { logger.info("window start time:"+new Date(window.getStart()).toString()); logger.info("real time:"+new Date(t.getTimestampSend()).toString()); if(t.getTimestampSend() appendStream =tableEnv.toAppendStream(tb3, Row.class);// appendStream.addSink(new Sink()); //对过滤后的数据,使用正则匹配数据 Table tb2 = tableEnv.sqlQuery( "SELECT " + " * " + "FROM myTable" + " " + "MATCH_RECOGNIZE ( " + "PARTITION BY sessionId " + "ORDER BY rowtime " + "MEASURES " + "e2.timestampSend as answerTime, "+ "LAST(e1.timestampSend) as customer_event_time, " + "e2.fromUid as empUid, " + "e1.timestampSend as askTime," + "1 as total_talk " + "ONE ROW PER MATCH " + "AFTER MATCH SKIP TO LAST e2 " + "PATTERN (e1+ e2+?) " + "DEFINE " + "e1 as e1.type = 'yonghu', " + "e2 as e2.type = 'guanjia' " + ")"+ "" ); DataStream appendStream2 =tableEnv.toAppendStream(tb2, Row.class); appendStream2.addSink(new Sink2()); env.execute("msg v5"); } public static class TruckTimestamp extends AscendingTimestampExtractor { private static final long serialVersionUID = 1L; @Override public long extractAscendingTimestamp(Message element) { return element.getTimestampSend(); } } public static class Sink implements SinkFunction { /** * */ private static final long serialVersionUID = 1L; @Override public void invoke(Row value) throws Exception { System.out.println(new Date().toString()+"orinal time:"+value.toString()); } } public static class Sink2 implements SinkFunction { /** * */ private static final long serialVersionUID = 1L; @Override public void invoke(Row value) throws Exception { System.out.println(new Date().toString()+"new time:"+value.toString()); } }
数据
序列
三个
业务
代码
函数
单位
大小
实际
日志
时间
正则
水印
情况
消息
统计
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
公司组建网络技术部方案
徐州企业网络安全
平板连接服务器时失败
数据库er模型转换关系模型
拉曼数据库软件
西城区单路服务器
.dat文件导入到数据库
淄博多轩社网络技术有限公司
中国网络安全认可体系
山海异兽录主播服务器
项目软件开发工程师 职责
软件开发外包付款比例
贵州北斗校时服务器云空间
剑灵一区哪个服务器好
阿里云服务器手机上怎么用
民歌数据库建设方案
中专二年级网络技术考试
软件开发服务的进项从哪儿来
网络安全的扩展知识讨论
阜阳联想服务器内存条推荐商家
嵌入式软件开发和后端开发
崂山区电商软件开发公司有哪些
数据库考研
服务器 温度监控
rebro软件开发商日本
自制数据储存服务器
财务软件数据库占用内存大吗
网络安全防护技术培训
服务器卖钱
杭州宇视软件开发公司