千家信息网

Flink Connectors怎么连接Redis

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章主要介绍"Flink Connectors怎么连接Redis",在日常操作中,相信很多人在Flink Connectors怎么连接Redis问题上存在疑惑,小编查阅了各式资料,整理出简单好用的
千家信息网最后更新 2025年12月03日Flink Connectors怎么连接Redis

这篇文章主要介绍"Flink Connectors怎么连接Redis",在日常操作中,相信很多人在Flink Connectors怎么连接Redis问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Flink Connectors怎么连接Redis"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

通过使用Flink DataStream Connectors 数据流连接器连接到Redis缓存数据库,并提供数据流输入与输出操作;

示例环境

java.version: 1.8.xflink.version: 1.11.1redis:3.2

示例数据源 (项目码云下载)

Flink 系例 之 搭建开发环境与数据

示例模块 (pom.xml)

Flink 系例 之 DataStream Connectors 与 示例模块

数据流输入

DataStreamSource.java

package com.flink.examples.redis;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.JedisPoolConfig;import redis.clients.jedis.Protocol;/** * @Description 从redis中读取数据输出到DataStream数据流中 */public class DataStreamSource {    /**     * 官方文档:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/     */    public static void main(String[] args) throws Exception {        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        String key = "props";        //实现RichSourceFunction抽象方法,加载数据源数据到流中        DataStream> dataStream = env.addSource(new RichSourceFunction>(){            private JedisPool jedisPool = null;            @Override            public void run(SourceContext> ctx) throws Exception {                jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379, Protocol.DEFAULT_TIMEOUT);                Jedis jedis = jedisPool.getResource();                try{                    ctx.collect(Tuple2.of(key, jedis.get(key)));                }catch(Exception e){                    e.printStackTrace();                }finally{                    if (jedis != null){                        //用完即关,内部会做判断,如果存在数据源与池,则回滚到池中                        jedis.close();                    }                }            }            @Override            public void cancel() {                try {                    super.close();                }catch(Exception e){                }                if (jedisPool != null){                    jedisPool.close();                    jedisPool = null;                }            }        });        dataStream.print();        env.execute("flink redis source");    }}

数据流输出

DataStreamSink.java

package com.flink.examples.redis;import org.apache.commons.lang3.RandomUtils;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.redis.RedisSink;import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;/** * @Description 将数据流写入到redis中 */public class DataStreamSink {    /**     * 官方文档:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/     */    public static void main(String[] args) throws Exception {        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //1.写入数据到流中        String [] words = new String[]{"props","student","build","name","execute"};        DataStream> sourceStream = env.fromElements(words).map(new MapFunction>() {            @Override            public Tuple2 map(String v) throws Exception {                return Tuple2.of(v, RandomUtils.nextInt(1000, 9999));            }        });        sourceStream.print();        //2.实例化FlinkJedisPoolConfig 配置redis        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();        //3.写入到redis,实例化RedisSink,并通过flink的addSink的方式将flink计算的结果插入到redis        sourceStream.addSink(new RedisSink<>(conf, new RedisMapper>(){            @Override            public RedisCommandDescription getCommandDescription() {                return new RedisCommandDescription(RedisCommand.SET, null);                //通过实例化传参,设置hash值的key                //return new RedisCommandDescription(RedisCommand.HSET, key);            }            @Override            public String getKeyFromData(Tuple2 tuple2) {                return tuple2.f0;            }            @Override            public String getValueFromData(Tuple2 tuple2) {                return tuple2.f1.toString();            }        }));        env.execute("flink redis sink");    }}

数据展示

到此,关于"Flink Connectors怎么连接Redis"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

数据 数据流 示例 学习 实例 数据源 输出 官方 文档 方法 更多 模块 环境 帮助 输入 实用 接下来 数据库 文章 方式 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 服务器上数据库连接异常 宁波市镇海有米网络技术有限公司 论文数据库表能分在两页上吗 软件开发一般提成多少 如何建域服务器 长宁区品牌数据库系统职能 手机如何连接日本服务器 安徽智能还款软件开发 erp服务器能放云上 不通过表单提交数据库 柳州网络安全等级保护 lol服务器界面不显示多少ms 南京软件开发服务平台 河北链家网络技术有限公司 山西智能软件开发销售价格 济南互联网络科技有限公司 兰州安卓软件开发最新招聘信息 江苏极光网络技术有限责任公司 网络安全法对等保测评规定 中秋节视频软件开发 数据库设计技术路线 西格数据库 一般数据库用户表主键是啥 迷你世界ice服务器的账号 软件开发过程中的问题统计 网络安全专题讲座用英语怎么说 法律禁止的危害网络安全的行为 数据库删除缓存后如何恢复 网络技术工作计划书 搭建数据库的公司
0