Flink如何读取数据源
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章主要为大家展示了"Flink如何读取数据源",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"Flink如何读取数据源"这篇文章吧。从集合中读取
千家信息网最后更新 2025年12月02日Flink如何读取数据源
这篇文章主要为大家展示了"Flink如何读取数据源",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"Flink如何读取数据源"这篇文章吧。
从集合中读取
private static void radFromCollection(String[] args) throws Exception { //将参数转成对象 MultipleParameterTool params = MultipleParameterTool.fromArgs(args); //创建批处理执行环境// ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //创建流程处理 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置每个算子的的并行度,默认为cup核数(测试环境下) env.setParallelism(2); //设置最大并行度 env.setMaxParallelism(6); //从集合中读取 List collectionData = Arrays.asList("a", "b", "c", "d"); DataStreamSource dataStreamSource = env.fromCollection(collectionData); //从数组中读取 // env.fromElements("a", "b", "c", "d"); dataStreamSource.print(); //dataStreamSource.addSink(new PrintSinkFunction<>()); env.execute(); } 从文件中读取
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSourcedataStreamSource = env.readTextFile("E:\\GIT\\flink-learn\\flink1\\word.txt", "utf-8"); dataStreamSource.print(); env.execute();
从kafka 中读取
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.put("bootstrap.servers", "10.1.5.130:9092"); properties.put("zookeeper.connect", "10.2.5.135:2181"); properties.put("group.id", "my-flink"); properties.put("auto.offset.reset", "latest"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); FlinkKafkaConsumer010 kafkaConsumer010 = new FlinkKafkaConsumer010<>( "flink",// topic new SimpleStringSchema(), properties ); DataStreamSource dataStreamSource = env.addSource(kafkaConsumer010); dataStreamSource.print(); env.execute(); 从自定义Source 中读取
实现
org.apache.flink.streaming.api.functions.source.SourceFunction
public static final class MyDataSource implements SourceFunction{ private Boolean running = true; @Override public void run(SourceContext sourceContext) throws Exception { Random random = new Random(); while (running) { double data = random.nextDouble() * 100; sourceContext.collectWithTimestamp(String.valueOf(data), System.currentTimeMillis()); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { this.running = false; } }
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSourcedataStreamSource = env.addSource(new MyDataSource()); dataStreamSource.print(); env.execute();
以上是"Flink如何读取数据源"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
数据
数据源
内容
篇文章
环境
学习
帮助
最大
参数
对象
数组
文件
易懂
更多
条理
核数
流程
知识
算子
编带
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库mdf文件打开失败
淳诺互联网科技
lol韩国服务器延迟
同时连接两个数据库
服务器手语
服务器有个灯link
滨州手机app软件开发费用
数据库dbml用在哪一层
服务器卡在正在使用中怎么办
南京正厚软件开发面试
软件开发服务加计抵减
软件开发创业目标
华为服务器出售的影响
怎样入侵迷你世界服务器
梦幻西游如何查看服务器人数
查手机服务器ip和端口网站
忍者必须死3玩哪个服务器
长沙市盈进高网络技术有限公司
vb6.0增删改查数据库
数据库运维形考任务一答案
腾讯云服务器只有管理员界面
完美国际服务器列表
天眼查达梦数据库
qt数据库新插入一行
一站式服务器搬家哪家强
邮箱为什么连接不上服务器
路由器首选dns用什么服务器
财务报表不能连接服务器怎么回事
临朐软件开发
瑞庭网络技术有限公司在哪