Spark-Streaming如何处理数据到mysql中
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,本篇内容主要讲解"Spark-Streaming如何处理数据到mysql中",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Spark-Streaming如何
千家信息网最后更新 2025年12月01日Spark-Streaming如何处理数据到mysql中
本篇内容主要讲解"Spark-Streaming如何处理数据到mysql中",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Spark-Streaming如何处理数据到mysql中"吧!
1.说明
数据表如下:
create database test;use test;DROP TABLE IF EXISTS car_gps;CREATE TABLE IF NOT EXISTS car_gps(deployNum VARCHAR(30) COMMENT '调度编号',plateNum VARCHAR(10) COMMENT '车牌号',timeStr VARCHAR(20) COMMENT '时间戳',lng VARCHAR(20) COMMENT '经度',lat VARCHAR(20) COMMENT '纬度',dbtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据入库时间',PRIMARY KEY(deployNum, plateNum, timeStr))
2.编写程序
首先引入mysql的驱动
mysql mysql-connector-java 5.1.44
2.1 jdbc写入mysql
package com.hoult.Streaming.workimport java.sql.{Connection, DriverManager, PreparedStatement}import java.util.Propertiesimport com.hoult.structed.bean.BusInfoimport org.apache.spark.sql.ForeachWriterclass JdbcHelper extends ForeachWriter[BusInfo] { var conn: Connection = _ var statement: PreparedStatement = _ override def open(partitionId: Long, epochId: Long): Boolean = { if (conn == null) { conn = JdbcHelper.openConnection } true } override def process(value: BusInfo): Unit = { //把数据写入mysql表中 val arr: Array[String] = value.lglat.split("_") val sql = "insert into car_gps(deployNum,plateNum,timeStr,lng,lat) values(?,?,?,?,?)" statement = conn.prepareStatement(sql) statement.setString(1, value.deployNum) statement.setString(2, value.plateNum) statement.setString(3, value.timeStr) statement.setString(4, arr(0)) statement.setString(5, arr(1)) statement.executeUpdate() } override def close(errorOrNull: Throwable): Unit = { if (null != conn) conn.close() if (null != statement) statement.close() }}object JdbcHelper { var conn: Connection = _ val url = "jdbc:mysql://hadoop1:3306/test?useUnicode=true&characterEncoding=utf8" val username = "root" val password = "123456" def openConnection: Connection = { if (null == conn || conn.isClosed) { val p = new Properties Class.forName("com.mysql.jdbc.Driver") conn = DriverManager.getConnection(url, username, password) } conn }}2.2 通过foreach来写入mysql
package com.hoult.Streaming.workimport com.hoult.structed.bean.BusInfoimport org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}object KafkaToJdbc { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME", "root") //1 获取sparksession val spark: SparkSession = SparkSession.builder() .master("local[*]") .appName(KafkaToJdbc.getClass.getName) .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ //2 定义读取kafka数据源 val kafkaDf: DataFrame = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "linux121:9092") .option("subscribe", "test_bus_info") .load() //3 处理数据 val kafkaValDf: DataFrame = kafkaDf.selectExpr("CAST(value AS STRING)") //转为ds val kafkaDs: Dataset[String] = kafkaValDf.as[String] //解析出经纬度数据,写入redis //封装为一个case class方便后续获取指定字段的数据 val busInfoDs: Dataset[BusInfo] = kafkaDs.map(BusInfo(_)).filter(_ != null) //将数据写入MySQL表 busInfoDs.writeStream .foreach(new JdbcHelper) .outputMode("append") .start() .awaitTermination() }}2.4 创建topic和从消费者端写入数据
kafka-topics.sh --zookeeper linux121:2181/myKafka --create --topic test_bus_info --partitions 2 --replication-factor 1kafka-console-producer.sh --broker-list linux121:9092 --topic test_bus_info
到此,相信大家对"Spark-Streaming如何处理数据到mysql中"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
数据
内容
时间
学习
实用
更深
兴趣
字段
实用性
实际
操作简单
数据源
数据表
方法
更多
朋友
消费者
程序
纬度
经度
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
消费者数据库技术员
网络安全知识答题答案查询
电脑c盘服务器运行失败
nosql数据库查询
java获取服务器信息
服务器端口是什么
网络安全江苏市场份额
发生内部服务器错误
北京网络安全大会郭盛华演讲
2014 数据库市场
重庆网络安全微视频
双色球 数据库 下载
浅谈网络安全问题
网络安全大赛世界冠军奖金
ad域服务器设置一个管理员
通讯软件开发市价
家用网络安全性是什么
安全风险数据库的内容
松江区企业数据库研发承诺守信
互联网科技如何
修改数据库的工具如何测试
无锡凌空网络技术
交大网络技术基础试卷
软件开发时代进步
成立网络安全公司
软件开发 英语自我介绍
数据库自增值
迅雷数据库
鲲鹏服务器价钱
php数据库列表页面