Zookeeper Queue队列怎么实现
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本篇内容介绍了"Zookeeper Queue队列怎么实现 "的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有
千家信息网最后更新 2025年12月03日Zookeeper Queue队列怎么实现
本篇内容介绍了"Zookeeper Queue队列怎么实现 "的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
1: Barries: 栅栏,见面知意。
2:Queue:Queue也就是我们所说的队列
1:Barries:
1.1: 是指所有的现场都达到 barrier后才能进行后续的计算
1.2:所有的线程都完成自己的计算以后才能离开barrier
进入栅栏: 1,新建一个根节点 "/root" 2, 想进入barrier的线程在 "/root"下建立一个字节点"/root/c-i" 3,循环监听"/root"孩子节点数的变化,每当其达到Size的时候就说明有Size个线程都已经达到了Barrier的要求。
2:Queue:就是指一个生产者或消费者的模型
离开Barrier 1: 想离开Barrier的现场删除掉在"/root" 下建立的子节点 2: 循环监听"/root" 孩子节点数目的变化,当Size减少到0的时候它就可以离开了。
3 :Queue 队列的实现
1 : 建立一个根节点"/root"2 : 生产线程在"/root" 下建立一个SEQUENTAIL的节点3 : 消费线程检查"/root" 如果没有就循环的监听"/root" 节点的变化,直到它有自己的子节点,删除序号最小子字节点。
package sync; import java.io.IOException;import java.net.InetAddress;import java.net.UnknownHostException;import java.nio.ByteBuffer;import java.util.List;import java.util.Random; import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.data.Stat; public class SyncPrimitive implements Watcher { static ZooKeeper zk = null; static Integer mutex; String root; //同步原语 SyncPrimitive(String address) { if (zk == null) { try { System.out.println("Starting ZK:"); //建立Zookeeper连接,并且指定watcher zk = new ZooKeeper(address, 3000, this); //初始化锁对象 mutex = new Integer(-1); System.out.println("Finished starting ZK:" + zk); } catch (IOException e) { System.out.println(e.toString()); zk = null; } } } @Override synchronized public void process(WatchedEvent event) { synchronized (mutex) { //有事件发生时,调用notify,使其他wait()点得以继续 mutex.notify(); } } static public class Barrier extends SyncPrimitive { int size; String name; Barrier(String address, String root, int size) { super(address); this.root = root; this.size = size; if (zk != null) { try { //一个barrier建立一个根目录 Stat s = zk.exists(root, false); //不注册watcher if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out .println("keeper exception when instantiating queue:" + e.toString()); } catch (InterruptedException e) { System.out.println("Interrupted exception."); } } try { //获取自己的主机名 name = new String(InetAddress.getLocalHost() .getCanonicalHostName().toString()); } catch (UnknownHostException e) { System.out.println(e.toString()); } } boolean enter() throws KeeperException, InterruptedException { //在根目录下创建一个子节点.create和delete都会触发children wathes,这样getChildren就会收到通知,process()就会被调用 zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //一直等,直到根目录下的子节点数目达到size时,函数退出 while (true) { synchronized (mutex) { List list = zk.getChildren(root, true); if (list.size() < size) { mutex.wait(); //释放mutex上的锁 } else { return true; } } } } boolean leave() throws KeeperException, InterruptedException { //删除自己创建的节点 zk.delete(root + "/" + name, 0); //一直等,直到根目录下有子节点时,函数退出 while (true) { synchronized (mutex) { List list = zk.getChildren(root, true); if (list.size() > 0) { mutex.wait(); } else { return true; } } } } } static public class Queue extends SyncPrimitive { Queue(String address, String name) { super(address); this.root = name; if (zk != null) { try { //一个queue建立一个根目录 Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out .println("keeper exception when instantiating queue:" + e.toString()); } catch (InterruptedException e) { System.out.println("Interrupted exception."); } } } //参数i是要创建节点的data boolean produce(int i) throws KeeperException, InterruptedException { ByteBuffer b = ByteBuffer.allocate(4); byte[] value; b.putInt(i); value = b.array(); //根目录下创建一个子节点,因为是SEQUENTIAL的,所以先创建的节点具有较小的序号 zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return true; } int consume() throws KeeperException, InterruptedException { int retvalue = -1; Stat stat = null; while (true) { synchronized (mutex) { List list = zk.getChildren(root, true); //并不能保证list[0]就是序号最小的 //如果根目录下没有子节点就一直等 if (list.size() == 0) { System.out.println("Going to wait"); mutex.wait(); } //找到序号最小的节点将其删除 else { Integer min = new Integer(list.get(0).substring(7)); for (String s : list) { Integer tmp = new Integer(s.substring(7)); if (tmp < min) min = tmp; } System.out.println("Temporary value:" + root + "/element" + min); byte[] b = zk.getData(root + "/element" + min, false, stat); zk.delete(root + "/element" + min, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } } } } public static void main(String[] args) { if (args[0].equals("qTest")) queueTest(args); else barrierTest(args); } private static void barrierTest(String[] args) { Barrier b = new Barrier(args[1], "/b1", new Integer(args[2])); try { boolean flag = b.enter(); System.out.println("Enter barrier:" + args[2]); if (!flag) System.out.println("Error when entering the barrier"); } catch (KeeperException e) { } catch (InterruptedException e) { } Random rand = new Random(); int r = rand.nextInt(100); for (int i = 0; i < r; i++) { try { Thread.sleep(100); } catch (InterruptedException e) { } } try { b.leave(); } catch (KeeperException e) { } catch (InterruptedException e) { } System.out.println("Left barrier"); } private static void queueTest(String[] args) { Queue q = new Queue(args[1], "/app1"); System.out.println("Input:" + args[1]); int i; Integer max = new Integer(args[2]); if (args[3].equals("p")) { System.out.println("Producer"); for (i = 0; i < max; i++) try { q.produce(10 + 1); } catch (KeeperException e) { } catch (InterruptedException e) { } } else { System.out.println("Consumer"); for (i = 0; i < max; i++) try { int r = q.consume(); System.out.println("Item:" + r); } catch (KeeperException e) { i--; } catch (InterruptedException e) { } } } } "Zookeeper Queue队列怎么实现 "的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
节点
根目录
线程
队列
序号
变化
循环
监听
最小
个子
内容
函数
字节
孩子
就是
数目
时候
更多
栅栏
知识
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
我的世界网易版JOJO服务器号
义乌市星野网络技术有限公司
2020年潍坊市网络安全
服务器怎么虚拟化主机
电脑总弹出服务器正在运行
自行软件开发会计分录
我的世界服务器运行进不去
网络安全问题约谈内容
分布式数据库书籍哪个好
江苏网络服务器机柜厂家云空间
网络安全文明上网手抄报一等奖
网络安全有必要做吗
杨硕河北省公安厅网络安全总队
国产嵌入式软件开发解决方案
镇江浪潮服务器
明日之后圣尤纳服务器在哪里
网络安全法和网络知识手抄报
广州新华互联网科技学校信息
除了sql还有什么数据库
csgo怎么切换国家服务器
php反序列化数据库
我的世界网易版JOJO服务器号
宽带显示未找到服务器怎么办
400 数据库
人人商城 消息提醒数据库
原数据库找不到了
数据库系统奔溃的原因
域服务器授权其他计算机远程
2个服务器共用一个显示器
紫光同创软件开发