Flink中Connectors如何连接RabbitMq
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章给大家分享的是有关Flink中Connectors如何连接RabbitMq的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。通过使用Flink DataStream C
千家信息网最后更新 2025年12月02日Flink中Connectors如何连接RabbitMq
这篇文章给大家分享的是有关Flink中Connectors如何连接RabbitMq的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
通过使用Flink DataStream Connectors 数据流连接器连接到RabbitMq消息队列中间件,并提供数据流输入与输出操作;
示例环境
java.version: 1.8.xflink.version: 1.11.1rabbitMq:3.5.7
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
示例模块 (pom.xml)
Flink 系例 之 DataStream Connectors 与 示例模块
数据流输入
DataStreamSource.java
package com.flink.examples.rabbitmq;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/** * @Description 从MQ中获取数据并输出到DataStream流中 */public class DataStreamSource { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("127.0.0.1") .setPort(5672) .setUserName("admin") .setPassword("admin") .setVirtualHost("datastream") .build(); final DataStream stream = env .addSource(new RMQSource( connectionConfig, "test", true, new SimpleStringSchema())) .setParallelism(1); stream.print(); env.execute("flink rabbitMq source"); }} 数据流输出
DataStreamSink.java
package com.flink.examples.rabbitmq;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/** * @Description 将DataStream流中的数据输出到rabbitMq队列中 */public class DataStreamSink { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("127.0.0.1") .setPort(5672) .setUserName("admin") .setPassword("admin") .setVirtualHost("datastream") .build(); String [] words = new String[]{"props","student","build","name","execute"}; final DataStream stream = env.fromElements(words); stream.addSink(new RMQSink(connectionConfig,"test",new SimpleStringSchema())); env.execute("flink rabbitMq sink"); }} 数据展示
感谢各位的阅读!关于"Flink中Connectors如何连接RabbitMq"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
数据
数据流
示例
输出
内容
官方
文档
更多
模块
环境
篇文章
队列
输入
不错
实用
中间件
数据源
文章
消息
看吧
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
稀土数据库
服务器的备件更换
苏州江苏高性能服务器代理厂家
新开的南宁软件开发公司
web 日志管理服务器
数据库 哪个好
深圳软件开发者怎么报价
奕聪软件开发有限公司
在线网络技术
通讯网络安全总结
升腾服务器厂家
斗罗大陆终极魂技服务器下载
网络安全手抄报有文字没有坏人
网络技术迅速发展的原因
表彰软件开发人员
软件开发 代码评审
c 数据库插入数据
建立网络安全保护措施的目的
软件开发专业难学吗
圣泽科技互联网
知网数据库招标采购
空间数据库教程课后习题
计算机网络技术只能做网管吗
对于网络安全方面违法
我的世界方块研究所服务器下载
网络安全查题软件
武汉erp软件开发公司
微信小程序做开发数据库
实时式数据库价格
圣泽科技互联网