Flink中怎么自定义Redis的Sink函数
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这期内容当中小编将会给大家带来有关Flink中怎么自定义Redis的Sink函数,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。1.添加redis对应pom依赖
千家信息网最后更新 2025年12月03日Flink中怎么自定义Redis的Sink函数
这期内容当中小编将会给大家带来有关Flink中怎么自定义Redis的Sink函数,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
1.添加redis对应pom依赖
org.apache.bahir flink-connector-redis_2.11 1.0
2.主函数代码:
package com.hadoop.ljs.flink110.redis;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.redis.RedisSink;import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import scala.Tuple2;/*** @author: Created By lujisen* @company ChinaUnicom Software JiNan* @date: 2020-05-02 10:30* @version: v1.0* @description: com.hadoop.ljs.flink110.redis*/public class RedisSinkMain {public static void main(String[] args) throws Exception {StreamExecutionEnvironment senv =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamsource = senv.socketTextStream("localhost", 9000); DataStreamfilter = source.filter(new FilterFunction () { @Overridepublic boolean filter(String value) throws Exception {if (null == value || value.split(",").length != 2) {return false;}return true;}});DataStream> keyValue = filter.map(new MapFunction >() { @Overridepublic Tuple2map(String value) throws Exception { String[] split = value.split(",");return new Tuple2<>(split[0], split[1]);}});//创建redis的配置 单机redis用FlinkJedisPoolConfig,集群redis需要用FlinkJedisClusterConfigFlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder().setHost("worker2.hadoop.ljs").setPort(6379).setPassword("123456a?").build();keyValue.addSink(new RedisSink>(redisConf, new RedisMapper >() { @Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET,"table1");}@Overridepublic String getKeyFromData(Tuple2data) { return data._1;}@Overridepublic String getValueFromData(Tuple2data) { return data._2;}}));/*启动执行*/senv.execute();}}
3.函数测试
1).window端scoket发送数据

2.redis结果验证

上述就是小编为大家分享的Flink中怎么自定义Redis的Sink函数了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。
函数
内容
分析
专业
中小
代码
内容丰富
单机
就是
数据
文章
更多
知识
篇文章
结果
行业
角度
资讯
资讯频道
集群
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
服务器退信
数据库触发安全性
gis数据库
高级网络技术实训心得500字
湖北省网络技术安全
电视在哪看dns服务器设置
电脑网络安全主要内容
数据库的时钟
暗黑修仙数据库列表
查看db怪物数据库
贵州工业服务器云主机
数据库操作日志查看
服务器 直流供电
法迅网络技术
酷音铃声软件开发者
软件开发技术特点范文
鄞州一站式软件开发商
mc手机服务器添加模组
百惠科技 精彩互联网电视
宁夏干部网络技术培训
快速app软件开发
马鞍山联想服务器硬盘价格推荐
城市宏观数据库
通讯录系统数据库设计
网络技术所学专业
便宜百独服务器租用
360快搜网络技术有限公司
武汉服务管理软件开发
软件开发哪个城市好
青岛凯斯特网络技术