spark-sql 自定义函数
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,(1)自定义UDFobject SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Lo
千家信息网最后更新 2025年12月03日spark-sql 自定义函数
(1)自定义UDF
object SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext /** * 注册定义的UDF: * 这里的泛型[Int,String] * 第一个是返回值类型,后面可以是一个或者多个,是方法参数类型 */ sqlContext.udf.register[Int,String]("strLen",strLen) val sql= """ |select strLen("zhangsan") """.stripMargin spark.sql(sql).show() } //自定义UDF方法 def strLen(str:String):Integer={ str.length }}(2) 自定义UDAF
这里举的例子是实现一个count:
自定义UDAF类:
class MyCountUDAF extends UserDefinedAggregateFunction{ //该UDAF输入的数据类型 override def inputSchema: StructType = { StructType(List( StructField("age",DataTypes.IntegerType) )) } //在该UDAF中聚合的数据类型 override def bufferSchema: StructType = { StructType(List( StructField("age",DataTypes.IntegerType) )) } //该UDAF输出的数据类型 override def dataType: DataType = DataTypes.IntegerType //确定性判断,通常特定输入和输出的类型一致 override def deterministic: Boolean = true //buffer:计算过程中临时的存储了聚合结果的Buffer override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer.update(0,0) } /** * 分区内的数据聚合合并 * @param buffer:就是我们在initialize方法中声明初始化的临时缓冲区 * @param input:聚合操作新传入的值 */ override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { val oldValue=buffer.getInt(0) buffer.update(0,oldValue+1) } /** * 分区间的聚合 * @param buffer1:分区一聚合的临时结果 * @param buffer2;分区二聚合的临时结果 */ override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { val p1=buffer1.getInt(0) val p2=buffer2.getInt(0) buffer1.update(0,p1+p2) } //该聚合函数最终输出的值 override def evaluate(buffer: Row): Any = { buffer.get(0) }}调用:
object SparkSqlTest { def main(args: Array[String]): Unit = { //屏蔽多余的日志 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.project-spark").setLevel(Level.WARN) //构建编程入口 val conf: SparkConf = new SparkConf() conf.setAppName("SparkSqlTest") .setMaster("local[2]") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[Student])) val spark: SparkSession = SparkSession.builder().config(conf) .getOrCreate() //创建sqlcontext对象 val sqlContext: SQLContext = spark.sqlContext //注册UDAF sqlContext.udf.register("myCount",new MyCountUDAF()) val stuList = List( new Student("委xx", 18), new Student("吴xx", 18), new Student("戚xx", 18), new Student("王xx", 19), new Student("薛xx", 19) ) import spark.implicits._ val stuDS: Dataset[Student] = sqlContext.createDataset(stuList) stuDS.createTempView("student") val sql= """ |select myCount(1) counts |from student |group by age |order by counts """.stripMargin spark.sql(sql).show() }}case class Student(name:String,age:Int)
类型
数据
方法
结果
输出
入口
对象
日志
编程
输入
函数
一致
例子
参数
多个
就是
新传
确定性
缓冲区
过程
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
h2两个数据库同步更新
成都网络安全等级保护条例
linux 服务器性能
新华三国服务器市场分析
公共信息网络安全监察简称
经营范围科技软件开发
更新数据库锁表
机械软件开发的课题
攻方和守方的案例数据库
怎么看数据库的自动备份
服务软件开发商家
齐鲁软件园软件开发
电脑插网线怎么连接服务器
嘉定区网络营销软件开发诚信为本
江西昌圣网络技术有限公司
邮箱无法连接服务器照片
常州个人软件开发管理方法
路由器设置屏蔽lol服务器
互联网科技早报
顶呱呱网络技术有限公司
数据库怎么搬迁
闵行区网络技术服务咨询收费
收件服务器是什么意思qq邮箱
杭州网络安全培训价格
网络安全事项报告制度
网络技术计划的要素
生成导入数据库文件
收费软件怎么修改数据库
分析分析器与数据库的关系
数据库设模式