Spark ALS实现的步骤是什么
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,这篇文章主要讲解了"Spark ALS实现的步骤是什么",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Spark ALS实现的步骤是什么"吧!spark
千家信息网最后更新 2025年12月01日Spark ALS实现的步骤是什么
这篇文章主要讲解了"Spark ALS实现的步骤是什么",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Spark ALS实现的步骤是什么"吧!
spark ALS算法是做个性推荐用的,它所需要的数据集是类似用户对商品的打分表之类的数据集。实现步骤主要以下几步:
1、定义输入数据
2、输入数据转换成评分数据格式,如case class Rating(user: Int, movie: Int, rating: Float)
3、设计ALS模型训练数据
4、计算推荐数据,存储起来供业务系统直接使用。
下面看看具体的代码:
package recommendimport org.apache.spark.sql.SparkSessionimport java.util.Propertiesimport org.apache.spark.rdd.RDDimport org.apache.spark.ml.evaluation.RegressionEvaluatorimport org.apache.spark.ml.recommendation.ALSimport org.apache.spark.ml.feature.StringIndexerimport org.apache.spark.sql.Datasetimport org.apache.spark.sql.Rowimport org.apache.spark.ml.feature.IndexToStringimport scala.collection.mutable.ArrayBufferimport org.apache.spark.TaskContextimport org.apache.spark.ml.Pipelineimport org.apache.spark.sql.SaveMode/** * 个性化推荐ALS算法 * 用户对资源的点击率作为评分 * */object Recommend { case class Rating(user: Int, movie: Int, rating: Float) def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("Java Spark MYSQL Recommend") .master("local") .config("es.nodes", "127.0.0.1") .config("es.port", "9200") .config("es.mapping.date.rich", "false") //不解析日期类型 .getOrCreate() trainModel(spark) spark.close() } def trainModel(spark: SparkSession): Unit = { import spark.implicits._ val MAX = 3 // 最大推荐数目 val rank = 10 // 向量大小,默认10 val iterations = 10 // 迭代次数,默认10 val url = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8" val table = "clicks" val user = "root" val pass = "123456" val props = new Properties() props.setProperty("user", user) // 设置用户名 props.setProperty("password", pass) // 设置密码 val clicks = spark.read.jdbc(url, table, props).repartition(4) clicks.createOrReplaceGlobalTempView("clicks") val agg = spark.sql("SELECT userId ,resId ,COUNT(id) AS clicks FROM global_temp.clicks GROUP BY userId,resId") val userIndexer = new StringIndexer() .setInputCol("userId") .setOutputCol("userIndex") val resIndexer = new StringIndexer() .setInputCol("resId") .setOutputCol("resIndex") val indexed1 = userIndexer.fit(agg).transform(agg) val indexed2 = resIndexer.fit(indexed1).transform(indexed1) indexed2.show() val ratings = indexed2.map(x => Rating(x.getDouble(3).toInt, x.getDouble(4).toInt, x.getLong(2).toFloat)) ratings.show() val Array(training, test) = ratings.randomSplit(Array(0.9, 0.1)) println("training:") training.show() println("test:") test.show() //隐性反馈和显示反馈 val als = new ALS() .setMaxIter(iterations) .setRegParam(0.01) .setImplicitPrefs(false) .setUserCol("user") .setItemCol("movie") .setRatingCol("rating") val model = als.fit(ratings) // Evaluate the model by computing the RMSE on the test data // Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics model.setColdStartStrategy("drop") val predictions = model.transform(test) val r2 = model.recommendForAllUsers(MAX) println(r2.schema) val result = r2.rdd.flatMap(row => { val userId = row.getInt(0) val arrayPredict: Seq[Row] = row.getSeq(1) var result = ArrayBuffer[Rating]() arrayPredict.foreach(rowPredict => { val p = rowPredict(0).asInstanceOf[Int] val score = rowPredict(1).asInstanceOf[Float] val sql = "insert into recommends(userId,resId,score) values (" + userId + "," + rowPredict(0) + "," + rowPredict(1) + ")" println("sql:" + sql) result.append(Rating(userId, p, score)) }) for (i <- result) yield { i } }) println("推荐结果RDD已展开") result.toDF().show() //资源id隐射 val resInt2Index = new IndexToString() .setInputCol("movie") .setOutputCol("resId") .setLabels(resIndexer.fit(indexed1).labels) //userId映射 val userInt2Index = new IndexToString() .setInputCol("user") .setOutputCol("userId") .setLabels(userIndexer.fit(agg).labels) val rc = userInt2Index.transform(resInt2Index.transform(result.toDF())) rc.show() rc.withColumnRenamed("rating","score").select("userId", "resId","score").write.mode(SaveMode.Overwrite) .format("jdbc") .option("url", url) .option("dbtable", "recommends") .option("user", user) .option("password", pass) .option("batchsize", "5000") .option("truncate", "true") .save println("finished!!!") }}DataFrame写入mysql还有另一种写法,就是原生写入:
//分区写推荐结果到mysql r2.foreachPartition(p => { @transient val conn = ConnectionPool.getConnection p.foreach(row => { val userId = row.getInt(0) val arrayPredict: Seq[Row] = row.getSeq(1) arrayPredict.foreach(rowPredict => { println(rowPredict(0) + "@" + rowPredict(1)) val sql = "insert into recommends(userId,resId,score) values (" + userId+"," + rowPredict(0)+","+ rowPredict(1) + ")" println("sql:"+sql) val stmt = conn.createStatement stmt.executeUpdate(sql) }) }) ConnectionPool.returnConnection(conn) })感谢各位的阅读,以上就是"Spark ALS实现的步骤是什么"的内容了,经过本文的学习后,相信大家对Spark ALS实现的步骤是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
推荐
步骤
用户
学习
个性
内容
就是
算法
结果
资源
评分
输入
最大
业务
代码
写法
向量
商品
大小
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
f116数据库
国家健康码数据库电话
cpu 软件开发
江苏电信dns服务器
eset快速生成web服务器
pg数据库最大容量
仙游有没有招聘网络安全职员
数据库技术编程
研究服务器安全的背景及意义
怎么使用数据库数据发文章
上海新能源网络技术厂家现货
海豚时序数据库
智慧城市互联网科技公司
生化围城服务器处于停战状态
游戏本适合用于软件开发吗
服务器配置
银川oa软件开发贵吗
网络安全宣传周怎么上网
电脑显示wf服务器故障
兰州数据库软件
厦门英九网络技术
网络安全数字矛盾空间
网络安全作战理论
以网络安全为抓手
高青物流竞价软件开发公司
数据库时间类型java怎么
珠海考试软件开发报价
游戏本适合用于软件开发吗
网络安全案例事件分析
oppo 网络技术