spark 与flume 1.6.0的示例代码
发表于:2025-11-07 作者:千家信息网编辑
千家信息网最后更新 2025年11月07日,小编给大家分享一下spark 与flume 1.6.0的示例代码,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!package hgs.spark.streamingimport or
千家信息网最后更新 2025年11月07日spark 与flume 1.6.0的示例代码
小编给大家分享一下spark 与flume 1.6.0的示例代码,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
package hgs.spark.streamingimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.streaming.StreamingContextimport org.apache.spark.streaming.Secondsimport org.apache.spark.streaming.flume.FlumeUtilsimport org.apache.spark.storage.StorageLevelimport org.apache.spark.HashPartitioner/* pom.xml中加入如下配置 ** *//*flume的conf文件a1.sources=r1a1.sinks=k1a1.channels=c1a1.sources.r1.type=spooldira1.sources.r1.spoolDir=/home/logsa1.sources.r1.fileHeader=truea1.sinks.k1.type=avroa1.sinks.k1.hostname= 192.168.1.9a1.sinks.k1.port= 8888a1.channels.c1.type=memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100a1.sources.r1.channels=c1a1.sinks.k1.channel=c1#the command to start a agent#bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template*/object SparkStreamingFlumePush { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("flume-push").setMaster("local[2]"); val sc = new SparkContext(conf) val ssc = new StreamingContext(sc,Seconds(5)) ssc.checkpoint("d:\\checkpoint") val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{ //iter.flatMap(it=>Some(it._2.sum+it._3.getOrElse(0)).map((it._1,_)))//方式一 //iter.flatMap{case(x,y,z)=>{Some(y.sum+z.getOrElse(0)).map((x,_))}}//方式二 iter.flatMap(it=>Some(it._1,(it._2.sum.toInt+it._3.getOrElse(0))))//方式三 } //总共有两种获取数据的方式,push和poll,这种是push即flume将数据推送给spark 该出的ip、port是spark的ip地址和port val rds = FlumeUtils.createStream(ssc, "192.168.1.9", 8888, StorageLevel.MEMORY_ONLY) val result = rds.flatMap(x=>(new String(x.event.getBody.array())).split(" ")) .map(x=>(x,1)) .updateStateByKey(updateFunc, new HashPartitioner(sc.defaultMinPartitions), true) result.print() ssc.start() ssc.awaitTermination() }} org.apache.spark spark-streaming-flume_2.11 2.1.0
package hgs.spark.streamingimport org.apache.spark.streaming.StreamingContextimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.streaming.Secondsimport org.apache.spark.streaming.flume.FlumeUtilsimport java.net.InetAddressimport java.net.InetSocketAddressimport org.apache.spark.storage.StorageLevelimport org.apache.spark.HashPartitioner//spark支持1.6.0的flume版本/* pom.xml中加入如下配置 ** *//* * flume配置a1.sources = r1a1.sinks = k1a1.channels = c1a1.sources.r1.type=spooldira1.sources.r1.spoolDir = /home/logsa1.sources.r1.fileHeader = truea1.sinks.k1.type=org.apache.spark.streaming.flume.sink.SparkSinka1.sinks.k1.hostname=192.168.6.129a1.sinks.k1.port = 8888a1.channels.c1.type=memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity=100a1.sources.r1.channels=c1a1.sinks.k1.channel = c1#the command to start a agent#bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template*///同时需要如下三个包 将三个包放到flume的classpath下面/* groupId = org.apache.spark artifactId = spark-streaming-flume-sink_2.11 version = 2.1.0 groupId = org.scala-lang artifactId = scala-library version = 2.11.7 groupId = org.apache.commons artifactId = commons-lang3 version = 3.5*/ object SparkStreamingFlumePoll { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("flume-push").setMaster("local[2]"); val sc = new SparkContext(conf) val ssc = new StreamingContext(sc,Seconds(5)) ssc.checkpoint("d:\\checkpoint") val ipSeq = Seq(new InetSocketAddress("192.168.6.129",8888)) //这种方式通过spark从flume拉取数据 val rds = FlumeUtils.createPollingStream(ssc, ipSeq, StorageLevel.MEMORY_AND_DISK) val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{ //iter.flatMap(it=>Some(it._2.sum+it._3.getOrElse(0)).map((it._1,_)))//方式一 //iter.flatMap{case(x,y,z)=>{Some(y.sum+z.getOrElse(0)).map((x,_))}}//方式二 iter.flatMap(it=>Some(it._1,(it._2.sum.toInt+it._3.getOrElse(0))))//方式三 } val result = rds.flatMap(x=>(new String(x.event.getBody.array())).split(" ")) .map(x=>(x,1)) .updateStateByKey(updateFunc, new HashPartitioner(sc.defaultMinPartitions), true) result.print() ssc.start() ssc.awaitTermination() }}//遇到的错误 scala-library包在flume 的lib下面本来就有,包重复导致的冲突,删除一个/*18 Oct 2018 20:58:32,123 WARN [Spark Sink Processor Thread - 10] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80) - Error while processing transaction.java.lang.IllegalStateException: begin() called when transaction is OPEN! at com.google.common.base.Preconditions.checkState(Preconditions.java:145) at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131) at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114) at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113) at scala.Option.foreach(Option.scala:236) at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113) at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243) at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)18 Oct 2018 20:58:32,128 WARN [Spark Sink Processor Thread - 10] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:59) - Spark was unable to successfully process the events. Transaction is being rolled back.18 Oct 2018 20:58:32,128 WARN [New I/O worker #1] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:59) - Received an error batch - no events were received from channel! */ org.apache.spark spark-streaming-flume_2.11 2.1.0
看完了这篇文章,相信你对"spark 与flume 1.6.0的示例代码"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!
方式
数据
配置
代码
示例
三个
篇文章
中加
同时
地址
完了
文件
更多
版本
知识
行业
资讯
资讯频道
错误
频道
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库安全访问的代码
石家庄久盈网络技术有限公司
苏铭服务器价格
网络安全大会开幕时间
网络安全风险等级划分标准
北上广软件开发师招聘条件
南宁市广科网络技术有限公司
数据库列插入
编辑数据库个人总结
保定地区软件开发公司
cpu服务器视频
cs go 怎么开服务器
六年级网络安全手抄报模板
网络安全主要存在问题
华为手机激活显示网络服务器繁忙
工控网络安全领域厂家
开发设立官网与服务器
广东分享在线网络技术有限公司
软件开发相关方需求
模拟城市怎么使用服务器数据
苏州联想服务器技术指导
软件开发中台化
豆瓣服务器多大
国产网络安全整机价格
网络安全专项检查动员部署会
软件开发过程中的模型
爱屋网络技术
软件开发质量管理国内外现状
战地一无法连接到多人服务器
软件开发中的环境