JMS消息队列ActiveMQ(发布/订阅模式)
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,消费者1(Consumer)--订阅(subcribe)-->主题(Topic)package com.java1234.activemq2;import javax.jms.Connection;i
千家信息网最后更新 2025年12月03日JMS消息队列ActiveMQ(发布/订阅模式)
消费者1(Consumer)--订阅(subcribe)-->主题(Topic)
package com.java1234.activemq2;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageConsumer;import javax.jms.Session;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/** * 消息消费者-消息订阅者一 * @author Administrator * */public class JMSConsumer { private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageConsumer messageConsumer; // 消息的消费者 // 实例化连接工厂 connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); try { connection=connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session // destination=session.createQueue("FirstQueue1"); // 创建连接的消息队列 destination=session.createTopic("FirstTopic1"); messageConsumer=session.createConsumer(destination); // 创建消息消费者 messageConsumer.setMessageListener(new Listener()); // 注册消息监听 } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }}package com.java1234.activemq2;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;/** * 消息监听-订阅者一 * @author Administrator * */public class Listener implements MessageListener{ @Override public void onMessage(Message message) { // TODO Auto-generated method stub try { System.out.println("订阅者一收到的消息:"+((TextMessage)message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }}消费者2(Consumer)--订阅(subcribe)-->主题(Topic)
生产者(Producer)--发布(publish)-->主题(Topic)
package com.java1234.activemq2;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;/** * 消息生产者-消息发布者 * @author Administrator * */public class JMSProducer { private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 private static final int SENDNUM=10; // 发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageProducer messageProducer; // 消息生产者 // 实例化连接工厂 connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL); try { connection=connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session // destination=session.createQueue("FirstQueue1"); // 创建消息队列 destination=session.createTopic("FirstTopic1");//创建主题 messageProducer=session.createProducer(destination); // 创建消息生产者 sendMessage(session, messageProducer); // 发送消息 session.commit(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally{ if(connection!=null){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } /** * 发送消息 * @param session * @param messageProducer * @throws Exception */ public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{ for(int i=0;i
消息
工厂
订阅
消费者
消费
主题
生产者
生产
订阅者
队列
地址
实例
密码
用户
用户名
目的
目的地
线程
监听
发布者
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
ftp服务器被动端口
计算机网络技术其他名称
站长之家怎么查解析服务器
海口网络技术优化方案
冬奥会期间企业网络安全
带数据库的学生管理系统
腾信互联网科技有限公司
移动的固网宽带服务器
网络安全为人民靠人民示意图
socket服务器编码
连接数据库表格
国内破产预测数据库
镇江数据网络技术怎么样
数据库读取排序
中国经济特区数据库
杭州软件开发公司贵吗
专业做政务软件开发的公司
软件开发学多少才能工作
网吧电脑网络安全么
数据库表的结构应该有
工商银行软件开发笔试题库
银川网络安全朝阳行业
武汉金融系统软件开发公司
车载导航软件开发介绍
加强中小学校园网络安全
技术好的触控拍照软件开发公司
网络技术服务费包括啥
马鞍山门店管理软件开发公司
中国的网络服务器在哪
网络安全产业将迎新一轮发展