千家信息网

如何进行spark python编程

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本篇文章给大家分享的是有关如何进行spark python编程,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。spark应用程序结构Spa
千家信息网最后更新 2025年12月03日如何进行spark python编程

本篇文章给大家分享的是有关如何进行spark python编程,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

spark应用程序结构

Spark应用程序可分两部分:driver部分和executor部分初始化SparkContext和主体程序

A:driver部分

driver部分主要是对SparkContext进行配置、初始化以及关闭。初始化SparkContext是为了构建Spark应用程序的运行环境,在初始化SparkContext,要先导入一些Spark的类和隐式转换;在executor部分运行完毕后,需要将SparkContext关闭。

B:executor部分

Spark应用程序的executor部分是对数据的处理,数据分三种:

  • 原生数据,包含输入的数据和输出的数据

    • 生成Scala标量数据,如count(返回RDD中元素的个数)、reduce、fold/aggregate;返回几个标量,如take(返回前几个元素)。

    • 生成Scala集合数据集,如collect(把RDD中的所有元素倒入 Scala集合类型)、lookup(查找对应key的所有值)。

    • 生成hadoop数据集,如saveAsTextFile、saveAsSequenceFile

    • scala集合数据集,如Array(1,2,3,4,5),Spark使用parallelize方法转换成RDD。

    • hadoop数据集,Spark支持存储在hadoop上的文件和hadoop支持的其他文件系统,如本地文件、HBase、SequenceFile和Hadoop的输入格式。例如Spark使用txtFile方法可以将本地文件或HDFS文件转换成RDD。

    • 对于输入原生数据,Spark目前提供了两种:

    • 对于输出数据,Spark除了支持以上两种数据,还支持scala标量

  • RDD,Spark进行并行运算的基本单位,其细节参见RDD 细解。RDD提供了四种算子:

    • 窄依赖算子

    • 宽依赖算子,宽依赖会涉及shuffle类,在DAG图解析时以此为边界产生Stage,如图所示。

    • 输入输出一对一的算子,且结果RDD的分区结构不变,主要是map、flatMap;

    • 输入输出一对一,但结果RDD的分区结构发生了变化,如union、coalesce;

    • 从输入中选择部分元素的算子,如filter、distinct、subtract、sample。

    • 对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey;

    • 对两个RDD基于key进行join和重组,如join、cogroup。

    • 输入算子,将原生数据转换成RDD,如parallelize、txtFile等

    • 转换算子,最主要的算子,是Spark生成DAG图的对象,转换算子并不立即执行,在触发行动算子后再提交给driver处理,生成DAG图 --> Stage --> Task --> Worker执行。按转化算子在DAG图中作用,可以分成两种:

    • 缓存算子,对于要多次使用的RDD,可以缓冲加快运行速度,对重要数据可以采用多备份缓存。

    • 行动算子,将运算结果RDD转换成原生数据,如count、reduce、collect、saveAsTextFile等。

  • 共享变量,在Spark运行时,一个函数传递给RDD内的patition操作时,该函数所用到的变量在每个运算节点上都复制并维护了一份,并且各个节点之间不会相互影响。但是在Spark Application中,可能需要共享一些变量,提供Task或驱动程序使用。Spark提供了两种共享变量:

    • 广播变量,可以缓存到各个节点的共享变量,通常为只读,使用方法:

    • >>> from pyspark.context import SparkContext                    >>> sc = SparkContext('local', 'test')                           >>> b = sc.broadcast([1, 2, 3, 4, 5])                                    >>> b.value[1, 2, 3, 4, 5]                                                        >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
    • 累计器,只支持加法操作的变量,可以实现计数器和变量求和。用户可以调用SparkContext.accumulator(v)创建一个初始值为v的累加器,而运行在集群上的Task可以使用"+="操作,但这些任务却不能读取;只有驱动程序才能获取累加器的值。使用方法:

python编程

实验项目

sogou日志数据分析

实验数据来源:sogou精简版数据下载地址

数据格式说明:

访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL

其中,用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID。

以上数据格式是官方说明,实际上该数据集中排名和顺序号之间不是\t分割,而是空格分割。

一个session内查询次数最多的用户的session与相应的查询次数

import sys  from pyspark import SparkContext    if __name__ == "__main__":      if len(sys.argv) != 2:          print >> sys.stderr, "Usage: SogouC "          exit(-1)      sc = SparkContext(appName="SogouC")      sgRDD = sc.textFile(sys.argv[1])      print sgRDD.filter(lambda line : len(line.split('\t')) == 5).map(lambda line : (line.split('\t')[1],1)).reduceByKey(lambda x , y : x + y ).map(lambda pair : (pair[1],pair[0])).sortByKey(False).map(lambda pair : (pair[1],pair[0])).take(10)      sc.stop()

虚拟集群中任意节点运行命令:./bin/spark-submit --master spark://hadoop1:7077 --executor-memory 3g --driver-memory 1g SogouC.py hdfs://hadoop1:8000/dataguru/data/mini.txt

运行结果:[(u'11579135515147154', 431), (u'6383499980790535', 385), (u'7822241147182134', 370), (u'900755558064074', 335), (u'12385969593715146', 226), (u'519493440787543', 223), (u'787615177142486', 214), (u'502949445189088', 210), (u'2501320721983056', 208), (u'9165829432475153', 201)]

以上就是如何进行spark python编程,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

数据 算子 部分 变量 用户 输入 程序 运行 文件 结果 支持 生成 元素 应用程序 方法 节点 应用 查询 输出 编程 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 数据库异地备份国家标准 常州斑马网络技术有限公司 有外国节点的服务器 服务器异常开不了机 西方网络安全控制 vps服务器多少钱 数据库sql视图的作用 泉州市网络安全平台招标 数据库下载库 深圳市迈集客网络技术有限公司 网络安全的方法有哪些 呼市新华互联网科技学校多大 写关于产品数据库 穿越火线网通区哪个服务器最火 伊美莱斯网络技术有限公司 江阴正睿服务器维修中心热线 张掖青少年网络安全知识竞赛 诚实守信网络安全主题班会记录 php用户登陆 不用数据库 win7数据库集中度 上海惊瑜互联网科技有限公司 平面设计和网络技术贴吧 苏州铭阳互联网科技有限公司 怎么查看服务器的安全日志 宁夏智慧社区软件开发哪儿好 魔兽世界服务器人口查询 赛季服 3dmax 渲染服务器搭建 select注解多个数据库 正规的服务器搬迁收费 深圳软件开发公司伺服驱动器
0