千家信息网

Spark UDF变长参数的方法是什么

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章主要介绍"Spark UDF变长参数的方法是什么",在日常操作中,相信很多人在Spark UDF变长参数的方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答
千家信息网最后更新 2025年12月02日Spark UDF变长参数的方法是什么

这篇文章主要介绍"Spark UDF变长参数的方法是什么",在日常操作中,相信很多人在Spark UDF变长参数的方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Spark UDF变长参数的方法是什么"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

引子

变长参数对于我们来说并不陌生,在Java里我们这么写

public void varArgs(String... args)

在Scala里我们这么写

def varArgs(cols: String*): String

而在Spark里,很多时候我们有自己的业务逻辑,现成的functions满足不了我们的需求,而当我们需要处理同一行的多个列,将其经过我们自己的逻辑合并为一个列时,变长参数及其变种实现可以给我们提供帮助。

但是在Spark UDF里我们是 无法使用变长参数传值 的,但之所以本文以变长参数开头,是因为需求起于它,而通过对它进行变换,我们可以使用变长参数或Seq类型来接收参数。

下面通过Spark-Shell来做演示,以下三种方法都可以做到多列传参,分别是

  • 变长参数(接受array类型)

  • Seq类型参数(接受array类型)

  • Row类型参数(接受struct类型)

变长参数类型的UDF

定义UDF方法

def myConcatVarargs(sep: String, cols: String*): String = cols.filter(_ != null).mkString(sep)

注册UDF函数

由于变长参数只能通过方法定义,所以这里使用部分应用函数来转换

val myConcatVarargsUDF = udf(myConcatVarargs _)

可以看到该UDF的定义如下

UserDefinedFunction(,StringType,List(StringType, ArrayType(StringType,true)))

也即变长参数转换为了ArrayType,而且函数是只包括两个参数,所以变长参数列表由此也可看出无法使用的。

变长参数列表传值

我们构造一个DataFrame如下

val df = sc.parallelize(Array(("aa", "bb", "cc"),("dd","ee","ff"))).toDF("A", "B", "C")

然后直接传入多个String类型的列到myConcatVarargsUDF

df.select(myConcatVarargsUDF(lit("-"), col("A"), col("B"), col("C"))).show

结果出现如下报错

java.lang.ClassCastException: anonfun$1 cannot be cast to scala.Function4

由此可以看出,使用变长参数列表的方式Spark是不支持的,它会被识别为四个参数的函数,而UDF确是被定义为两个参数而不是四个参数的函数!

变换:使用array()转换做第二个参数

我们使用Spark提供的array() function来转换参数为Array类型

df.select(myConcatVarargsUDF(lit("-"), array(col("A"), col("B"), col("C")))).show

结果如下

+-------------------+ |UDF(-,array(A,B,C))| +-------------------+ |           aa-bb-cc| |           dd-ee-ff| +-------------------+

由此可以看出,使用变长参数构造的UDF方法,可以通过构造Array的方式传参,来达到多列合并的目的。

使用Seq类型参数的UDF

上面提到,变长参数***被转为ArrayType,那不禁要想我们为嘛不使用Array或List类型呢?

实际上在UDF里,类型并不是我们可以随意定义的,比如使用List和Array就是不行的,我们自己定义的类型也是不行的,因为这涉及到数据的序列化和反序列化。

以Array/List为示例的错误

下面以Array类型为示例

定义函数

val myConcatArray = (cols: Array[String], sep: String) => cols.filter(_ != null).mkString(sep)

注册UDF

val myConcatArrayUDF = udf(myConcatArray)

可以看到给出的UDF签名是

UserDefinedFunction(,StringType,List())

应用UDF

df.select(myConcatArrayUDF(array(col("A"), col("B"), col("C")), lit("-"))).show

会发现报错

scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Ljava.lang.String

同样List作为参数类型也会报错,因为反序列化的时候无法构建对象,所以List和Array是无法直接作为UDF的参数类型的

以Seq做参数类型

定义调用如下

val myConcatSeq = (cols: Seq[Any], sep: String) => cols.filter(_ != null).mkString(sep)  val myConcatSeqUDF = udf(myConcatSeq)  df.select(myConcatSeqUDF(array(col("A"), col("B"), col("C")), lit("-"))).show

结果如下

+-------------------+ |UDF(array(A,B,C),-)| +-------------------+ |           aa-bb-cc| |           dd-ee-ff| +-------------------+

使用Row类型参数的UDF

我们可以使用Spark functions里struct方法构造结构体类型传参,然后用Row类型接UDF的参数,以达到多列传值的目的。

def myConcatRow: ((Row, String) => String) = (row, sep) => row.toSeq.filter(_ != null).mkString(sep)  val myConcatRowUDF = udf(myConcatRow)  df.select(myConcatRowUDF(struct(col("A"), col("B"), col("C")), lit("-"))).show

可以看到UDF的签名如下

UserDefinedFunction(,StringType,List())

结果如下

+--------------------+ |UDF(struct(A,B,C),-)| +--------------------+ |            aa-bb-cc| |            dd-ee-ff| +--------------------+

使用Row类型还可以使用模式提取,用起来会更方便

row match {   case Row(aa:String, bb:Int) => }

***

对于上面三种方法,变长参数和Seq类型参数都需要array的函数包装为ArrayType,而使用Row类型的话,则需要struct函数构建结构体类型,其实都是为了数据的序列化和反序列化。三种方法中,Row的方式更灵活可靠,而且支持不同类型并且可以明确使用模式提取,用起来相当方便。

而由此我们也可以看出,UDF不支持List和Array类型的参数,同时 自定义参数类型 如果没有混合Spark的特质实现序列化和反序列化,那么在UDF里也是 无法用作参数类型 的。当然,Seq类型是可以 的,可以接多列的数组传值。

此外,我们也可以使用柯里化来达到多列传参的目的,只是不同参数个数需要定义不同的UDF了。

到此,关于"Spark UDF变长参数的方法是什么"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

参数 类型 方法 函数 序列 由此 结果 学习 不同 列传 方式 目的 帮助 支持 不行 两个 多个 数据 时候 更多 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 专业的计算机网络技术有哪些 服务器2008什么时候发布 数据库外键关联 宝山区销售软件开发 软件开发属于大学哪个专业 富土康资讯及网络安全试卷 软件开发 测试合同范本 数据库不能自动备份 苏州常见软件开发售后服务 北京通讯软件开发设施价格优惠 智能ai回答软件开发 提取数据库密码 ios开发用软件开发 天涯明月刀开新服务器 区块链数据库怎么存取数据 深信服软件开发面试牛客网 江汉哪里有软件开发方案 企业信息数据库 报送异常数据 湖北网络安全宣传中心 集群和服务器配置 画网络安全人人有责手抄报 银行用的服务器一般用哪个公司的 具有丰富的软件开发经验 R语言修改数据库列类型 河南专升本网络技术2021 拓道互联网科技 乌鲁木齐网络安全培训简单易学 北京正规软件开发技术指导 广东咨询网络技术排名靠前 信息化网络安全主题教育调研
0