如何将数据按指定格式存入zookeeper
发表于:2025-11-07 作者:千家信息网编辑
千家信息网最后更新 2025年11月07日,这篇文章主要讲解了"如何将数据按指定格式存入zookeeper",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何将数据按指定格式存入zookeeper
千家信息网最后更新 2025年11月07日如何将数据按指定格式存入zookeeper
这篇文章主要讲解了"如何将数据按指定格式存入zookeeper",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何将数据按指定格式存入zookeeper"吧!
环境:
scala版本:2.11.8
zookeeper版本:3.4.5-cdh6.7.0
package com.ruozedata.zkimport java.util.concurrent.TimeUnitimport org.apache.curator.framework.CuratorFrameworkFactoryimport org.apache.curator.framework.recipes.locks.InterProcessMuteximport org.apache.curator.retry.ExponentialBackoffRetryimport org.slf4j.LoggerFactoryimport scala.collection.JavaConversions._import scala.collection.mutable/** * Created by ganwei on 2018/08/21 * 要求: * 1 通过storeOffsets方法把数据存入zookeeper中。 * 存储格式: * /consumers/G322/offsets/ruoze_offset_topic/partition/0 * /consumers/G322/offsets/ruoze_offset_topic/partition/1 * /consumers/G322/offsets/ruoze_offset_topic/partition/2 * 2 通过obtainOffsets方法把存入的数据读取出来 * 输出格式: * topic:ruoze_offset_topic partition:0 offset:7 * topic:ruoze_offset_topic partition:1 offset:3 * topic:ruoze_offset_topic partition:2 offset:5 */object ZkConnectApp{ val LOG = LoggerFactory.getLogger(ZkConnectApp.getClass) val client = { val client = CuratorFrameworkFactory .builder .connectString("172.16.100.31:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .namespace("consumers") .build() client.start() client } def lock(path: String)(body: => Unit) { val lock = new InterProcessMutex(client, path) lock.acquire() try { body } finally { lock.release() } } def tryDo(path: String)(body: => Unit): Boolean = { val lock = new InterProcessMutex(client, path) if (!lock.acquire(10, TimeUnit.SECONDS)) { LOG.info(s"不能获得锁 {$path},已经有任务在运行,本次任务退出") return false } try { LOG.info("获准运行") body true } finally { lock.release() LOG.info(s"释放锁 {$path}") } } //zookeeper创建路径 def ensurePathExists(path: String): Unit = { if (client.checkExists().forPath(path) == null) { client.create().creatingParentsIfNeeded().forPath(path) } } /** * OffsetRange类定义(偏移量对象) * 用于存储偏移量 */ case class OffsetRange( val topic:String, // 主题 val partition:Int, // 分区 val fromOffset:Long, // 起始偏移量 val utilOffset:Long // 终止偏移量 ) /** * zookeeper存储offset的方法 * 写入格式: * /consumers/G322/offsets/ruoze_offset_topic/partition/0 * /consumers/G322/offsets/ruoze_offset_topic/partition/1 * /consumers/G322/offsets/ruoze_offset_topic/partition/2 * @param OffsetsRanges * @param groupName */ def storeOffsets(OffsetsRanges:Array[OffsetRange],groupName:String)={ val offsetRootPath = s"/"+groupName if (client.checkExists().forPath(offsetRootPath) == null) { client.create().creatingParentsIfNeeded().forPath(offsetRootPath) } for(els <- OffsetsRanges ){ val data = String.valueOf(els.utilOffset).getBytes val path = s"$offsetRootPath/offsets/${els.topic}/partition/${els.partition}" // 创建路径 ensurePathExists(path) // 写入数据 client.setData().forPath(path, data) } } /** * TopicAndPartition类定义(偏移量key对象) * 用于提取偏移量 */ case class TopicAndPartition( topic:String, // 主题 partition:Int // 分区 ) /** * zookeeper提取offset的方法 * @param topic * @param groupName * @return */ def obtainOffsets(topic:String,groupName:String):Map[TopicAndPartition,Long]={ // 定义一个空的HashMap val maps = mutable.HashMap[TopicAndPartition,Long]() // offset的路径 val offsetRootPath = s"/"+groupName+"/offsets/"+topic+"/partition" // 判断路径是否存在 val stat = client.checkExists().forPath(s"$offsetRootPath") if (stat == null ){ println(stat) // 路径不存在 就将路径打印在控制台,检查路径 }else{ // 获取 offsetRootPath路径下一级的所有子目录 // 我们这里是获取的所有分区 val children = client.getChildren.forPath(s"$offsetRootPath") // 遍历所有的分区 for ( lines <- children ){ // 获取分区的数据 val data = new String(client.getData().forPath(s"$offsetRootPath/"+lines)).toLong // 将 topic partition 和数据赋值给 maps maps(TopicAndPartition(topic,lines.toInt)) = data } } // 按partition排序后 返回map对象 maps.toList.sortBy(_._1.partition).toMap } def main(args: Array[String]) { //定义初始化数据 val off1 = OffsetRange("ruoze_offset_topic",0,0,7) val off2 = OffsetRange("ruoze_offset_topic",1,0,3) val off3 = OffsetRange("ruoze_offset_topic",2,0,5) val arr = Array(off1,off2,off3) //获取到namespace// println(client.getNamespace) // 创建路径// val offsetRootPath = "/G322"// if (client.checkExists().forPath(offsetRootPath) == null) {// client.create().creatingParentsIfNeeded().forPath(offsetRootPath)// } //存储值 storeOffsets(arr,"G322") //获取值 /** * 输出格式: * topic:ruoze_offset_topic partition:0 offset:7 * topic:ruoze_offset_topic partition:1 offset:3 * topic:ruoze_offset_topic partition:2 offset:5 */ val result = obtainOffsets("ruoze_offset_topic","G322") for (map <- result){ println("topic:"+map._1.topic+"\t" +"partition:"+map._1.partition+"\t"+"offset:"+map._2) } }}感谢各位的阅读,以上就是"如何将数据按指定格式存入zookeeper"的内容了,经过本文的学习后,相信大家对如何将数据按指定格式存入zookeeper这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
路径
格式
偏移
方法
存储
对象
学习
主题
任务
内容
版本
输出
运行
子目
子目录
就是
思路
情况
控制台
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
身边的网络安全手抄报
美团软件开发实习招聘流程
购物软件开发实习项目内容
软件开发风险因素
我的世界搭路练习服务器在哪
spes服务器
网络安全技术专升本
电脑自带时间服务器地址
数据库系统概论数据对象的创建
vb软件开发流程
榆次展厅大屏导航软件开发公司
sap底层数据库 唯一id
云跟数据库的区别吗
学计算机网络技术理由
迈瑞社招软件开发面试流程
湖北拓商网络技术公司招聘
双路卷积神经网络技术图片
adsl vpn服务器
小度与网络安全
扫描服务器开放的端口
世界互联网大会金融科技
公安厅公共信息网络安全监察
植物大战僵尸ol服务器
asu数据库包括nature吗
浅谈多媒体软件开发流程
软件开发测试工作前景
逐风网络安全实验室
街道网络安全风险评估报告
软件开发前后端怎么连接
第一届网鼎杯网络安全大赛