第94课:SparkStreaming 实现广告计费系统中在线黑名单过滤实战
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,本期课程内容:在线黑名单过滤实现解析SparkStreaming实现在线黑名单过滤广告计费系统,是电商必不可少的一个功能点。为了防止恶意的广告点击(假设商户A和B同时在某电商做了广告,A和B为竞争对手
千家信息网最后更新 2025年12月02日第94课:SparkStreaming 实现广告计费系统中在线黑名单过滤实战
本期课程内容:
在线黑名单过滤实现解析
SparkStreaming实现在线黑名单过滤
广告计费系统,是电商必不可少的一个功能点。为了防止恶意的广告点击(假设商户A和B同时在某电商做了广告,A和B为竞争对手,那么如果A使用点击机器人进行对B的广告的恶意点击,那么B的广告费用将很快被用完),必须对广告点击进行黑名单过滤。
可以使用leftOuterJoin对目标数据和黑名单数据进行关联,将命中黑名单的数据过滤掉。
本文主要介绍的是DStream的transform函数的使用
SparkStreaming代码实现
package com.dt.spark.sparkapps.streamingimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}/** * 使用Scala开发集群运行的Spark 在线黑名单过滤程序 * Created by Limaoran on 2016/5/2. * 新浪微博:http://weibo.com/ilovepains/ * * 背景描述:在广告点击计费系统中,我们在线过滤掉黑名单的点击,进而保护广告商的利益, * 只进行有效的广告点击计费或者在防刷评分(或者流量)系统,过滤掉无效的投票或者评分或者流量; * 实现技术:使用transform Api直接基于RDD编程,进行join操作 */object OnlineBlackListFilter { def main(args: Array[String]) { /** * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息。 * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置 * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如 * 只有1G的内存)的初学者 */ val conf = new SparkConf() //创建SparkConf对象 conf.setAppName("OnlineBlackListFilter") //设置应用程序的名称,在程序运行的监控界面可以看到名称 conf.setMaster("spark://Master:7077") //此时,程序在Spark集群 val ssc = new StreamingContext(conf,Seconds(30)) /** * 黑名单数据准备,实际上黑名单一般都是动态的,例如在Redis或者数据库中,黑名单的生成往往有复杂的业务 * 逻辑,具体情况算法不同,但是在Spark Streaming进行处理的时候每次都能工访问完整的信息 */ val blackList = Array(("hadoop",true),("mahout",true)) val blackListRDD = ssc.sparkContext.parallelize(blackList,8) val adsClickStream = ssc.socketTextStream("Master" ,9999) /** * 此处模拟的广告点击的每条数据的格式为:time、name * 此处map操作的结果是name、(time,name)的格式 */ val adsClientStreamFormated = adsClickStream.map(ads=>(ads.split(" ")(1),ads)) adsClientStreamFormated.transform(userClickRDD => { //通过leftOuterJoin操作既保留了左侧用户广告点击内容的RDD的所有内容,又获得了相应点击内容是否在黑名单中 val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD) /** * 进行filter过滤的时候,其输入元素是一个Tuple:(name,((time,name), boolean)) * 其中第一个元素是黑名单的名称,第二元素的第二个元素是进行leftOuterJoin的时候是否存在在值 * 如果存在的话,表面当前广告点击是黑名单,需要过滤掉,否则的话则是有效点击内容; */ val validClicked = joinedBlackListRDD.filter(joinedItem=>{ if(joinedItem._2._2.getOrElse(false)){ false }else{ true } }) validClicked.map(validClick => {validClick._2._1}) }).print() /** * 计算后的有效数据一般都会写入Kafka中,下游的计费系统会从kafka中pull到有效数据进行计费 */ ssc.start() ssc.awaitTermination() }}将程序打包,并上传至spark集群
在spark-master节点,启动nc
root@spark-master:~# nc -lk 9999
运行OnlineBlacklistFilter程序
root@spark-master:~# /usr/local/spark-1.6.0/bin/spark-submit --class com.dt.spark.sparkapps.streaming.OnlineBlackListFilter --master spark://Master:7077 ./sparkApps.jar
在nc端输入数据
root@spark-master:~# nc -lk 999922555 spark124321 hadoop5555 Flink6666 HDFS2222 Kafka572231 Java66662 mahout
SparkStreaming运行结果:
16/05/02 08:28:00 INFO MapPartitionsRDD: Removing RDD 8 from persistence list-------------------------------------------5555 Flink6666 HDFS572231 Java22555 spark2222 Kafka
可见,结果已经将黑名单设置的hadoop和mathou过滤掉了。
在此程序的基础上,可以添加更复杂的业务逻辑规则,以满足企业的需求。
备注:
1、DT大数据梦工厂微信公众号DT_Spark
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains
黑名单
黑名
广告
数据
程序
运行
内容
系统
在线
有效
元素
集群
计费系统
名称
时候
结果
配置
复杂
业务
信息
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
山东移动宽带dns服务器
广州软件开发培训机构
泰鸿网络技术咨询
四川网络安全攻防演练
软件开发转技术还是转管理好
鹿泉区技术软件开发服务咨询报价
关系型数据库oracle
网络安全法相关词汇
产品数据库软件
两个系统共用一个数据库账户
小学生网络安全周教育视频
TcL软件开发在几个城市
大学生网络安全升旗主持稿
重庆英睿特互联网科技有限公司
密友软件开发
郑州中韩软件开发有限公司
华为刀片服务器一个多少钱
天津边缘计算服务器云主机
云象网络技术
连接oracle数据库用的工具
英雄联盟显示服务器在线状态
网络安全学习网
互联网医疗科技
网络安全所面临的主要攻击
it网络技术员岗们职责
泸州大二软件开发
禄劝专业性软件开发价格信息
真正做软件开发的上市公司
明日之后优化服务器好吗
dbvis数据库连接老断