spark访问hbase
发表于:2025-11-06 作者:千家信息网编辑
千家信息网最后更新 2025年11月06日,import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}import org.apache.hadoop.hbase.
千家信息网最后更新 2025年11月06日spark访问hbase
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}import org.apache.hadoop.hbase.mapreduce.TableInputFormatimport org.apache.spark.rdd.NewHadoopRDDval conf = HBaseConfiguration.create()conf.set(TableInputFormat.INPUT_TABLE, "tmp")var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])hBaseRDD.count()import scala.collection.JavaConverters._hBaseRDD.map(tuple => tuple._2).map(result => result.getColumn("cf".getBytes(), "val".getBytes())).map(keyValues => {( keyValues.asScala.reduceLeft { (a, b) => if (a.getTimestamp > b.getTimestamp) a else b }.getRow, keyValues.asScala.reduceLeft { (a, b) => if (a.getTimestamp > b.getTimestamp) a else b }.getValue)}).take(10)hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("cf".getBytes(), "val".getBytes()))).map(row => {( row._1.map(_.toChar).mkString, row._2.asScala.reduceLeft { (a, b) => if (a.getTimestamp > b.getTimestamp) a else b }.getValue.map(_.toChar).mkString)}).take(10)conf.set(TableInputFormat.INPUT_TABLE, "test1")//var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("lf".getBytes(), "app1".getBytes()))).map(row => if (row._2.size > 0) {( row._1.map(_.toChar).mkString, row._2.asScala.reduceLeft { (a, b) => if (a.getTimestamp > b.getTimestamp) a else b }.getValue.map(_.toInt).mkString)}).take(10)import java.nio.ByteBufferhBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("lf".getBytes(), "app1".getBytes()))).map(row => if (row._2.size > 0) {( row._1.map(_.toChar).mkString, ByteBuffer.wrap(row._2.asScala.reduceLeft { (a, b) => if (a.getTimestamp > b.getTimestamp) a else b }.getValue).getLong)}).take(10)//conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "lf")conf.set(TableInputFormat.SCAN_COLUMNS, "lf:app1")//var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])import java.nio.ByteBufferhBaseRDD.map(tuple => tuple._2).map(result => { ( result.getRow.map(_.toChar).mkString, ByteBuffer.wrap(result.value).getLong )}).take(10)val conf = HBaseConfiguration.create()conf.set(TableInputFormat.INPUT_TABLE, "test1")var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])var rows = hBaseRDD.map(tuple => tuple._2).map(result => result.getRow.map(_.toChar).mkString)rows.map(row => row.split("\\|")).map(r => if (r.length > 1) (r(0), r(1)) else (r(0), "") ).groupByKey.take(10)
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
关于暑假网络安全
服务器电源哪家优惠
大禹网络安全产品
梦幻西游服务器人多进不去咋办
mysql数据库历史
江苏无忧互联网科技有限公司
数据库all不等于in
怎么登陆云端服务器
猪八戒网可以接到软件开发单子吗
关爱青少年网络安全学校总结
企业上网行为管理服务器
映射到数据库中
免费代码服务器
计算机网络技术的学习
软件开发效率指标
大型监控系统流媒体服务器
影梭 服务器搭建
原神7开头服务器
湖北c语言软件开发服务
如何把数据库设置为电脑启动
青少年网络安全广播稿
人工网络安全
sftp服务器中文版
网络安全人员驻场一般驻多久
软件开发工资条
部队需要网络安全技术
五年级网络安全答题库
无线ac服务器管理页
杭州侣程网络技术有限公司官网
网关的数据怎么储存在数据库