spark+kafka+redis统计网站访问者IP
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,*目的是为了防采集。需要对网站的日志信息,进行一个实时的IP访问监控。1、kafka版本是最新的0.10.0.02、spark版本是1.613、下载对应的spark-streaming-kafka-a
千家信息网最后更新 2025年12月02日spark+kafka+redis统计网站访问者IP
*目的是为了防采集。需要对网站的日志信息,进行一个实时的IP访问监控。
1、kafka版本是最新的0.10.0.0
2、spark版本是1.61

3、下载对应的spark-streaming-kafka-assembly_2.10-1.6.1.jar放到spark目录下的lib目录下
下载地址 https://repo1.maven.org/maven2/org/apache/spark/

4、利用flume将nginx日志写入到kafka(后续补充)
5、编写python脚本,命名为test_spark_collect_ip.py
# coding:utf-8__author__ = 'chenhuachao''''利用pyspark连接kafka,统计访问者的IP信息,做出的一个实时的防采集'''import sysreload(sys)sys.setdefaultencoding('utf-8')import redisimport datetimefrom pyspark.streaming.kafka import KafkaUtilsfrom pyspark.streaming import StreamingContextfrom pyspark import SparkConf, SparkContextdef parse(logstring): try: infodict = eval(logstring.encode('utf-8')) ip =infodict.get('ip') assert infodict['tj-event'] == 'onload' assert ip return (ip) except: return ()def insert_redis(rdd): '''将符合条件的结果写入到redis''' conn = redis.Redis(host='redis的IP',port=6380) for i,j in rdd.collect(): print i,j if j >=3 and j != "": conn.sadd('cheating_ip_set_{0}'.format(datetime.datetime.now().strftime("%Y%m%d")),i) conn.expire('cheating_ip_set',86400)if __name__ == "__main__": topic = 'statis-detailinfo-pageevent' sc = SparkContext(appName="pyspark_kafka_streaming_chc") ssc = StreamingContext(sc,10) checkpointDirectory = '/tmp/checkpoint/cp3' ssc.checkpoint(checkpointDirectory) kvs = KafkaUtils.createDirectStream(ssc,['statis-detailinfo-pageevent'],kafkaParams={"auto.offset.reset": "largest","metadata.broker.list":"kafka-IP:9092,kafka-IP:9092"}) #kvs.map(lambda line:line[1]).map(lambda x:parse(x)).pprint() #这里用到了一个滑动窗口的概念,需要深入了解的可以参考http://www.kancloud.cn/kancloud/spark-programming-guide/51567 #ipcount = kvs.map(lambda line: line[1]).map(lambda x:parse(x)).map(lambda ip:(ip,1)).reduceByKey(lambda ips,num:ips+num) ipcount = kvs.map(lambda line: line[1]).map(lambda x:parse(x)).map(lambda ip:(ip,1)).reduceByKeyAndWindow(lambda ips,num:ips+num,30,10) # 预处理,如果需要多次计算则使用缓存 # 传入rdd进行循坏,即用于foreachRdd(insertRedis) ipcount.foreachRDD(insert_redis) # 各节点的rdd的循坏 # wordCounts.foreachRDD(lambda rdd: rdd.foreach(sendRecord)) ssc.start()6、执行命令
bin/spark-submit --jars lib/spark-streaming-kafka-assembly_2.10-1.6.1.jar test_spark_collect_ip.py
7、输出界面
8、更多信息,请参考spark的官网http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#module-pyspark.streaming.kafka
信息
实时
日志
版本
目录
参考
网站
访问者
统计
命令
地址
更多
条件
概念
界面
目的
结果
缓存
脚本
节点
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库输入输出接口设计
宁海直销软件开发项目管理
昆明系统软件开发
厦门网络安全科技馆
意识形式网络安全培训
最好最安全的服务器
软件开发外包平台有哪些
软件开发设计主管
国家科技厅互联网新闻
莱阳ios软件开发解决方案
互联网科技五大专业学院
aspx数据库
下列发生的网络安全事件可以
广州市软件开发公司电话
河北区媒体网络技术售后保障
宝鸡云途网络技术有限公司
长宁区创新数据库服务商收费标准
在下列数据库中不属于关系型
网络安全邓琦
计算机网络技术发展里程碑
和邮政快递有关的数据库系统
天锐绿盾如何转移服务器
无棣网络安全
中国网络安全企业50强
作业辅导软件开发
数据库跑代码
计算机网络技术不好
数据库运行调试
没有防火墙服务器对网络危害
网络安全的手指