千家信息网

RabbitMq的5种开发方案是什么

发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,这篇文章将为大家详细讲解有关RabbitMq的5种开发方案是什么,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。1、安装windows安装rabbitm
千家信息网最后更新 2025年12月01日RabbitMq的5种开发方案是什么

这篇文章将为大家详细讲解有关RabbitMq的5种开发方案是什么,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

1、安装

windows安装rabbitmq

mac安装rabbitmq 需要注意的是,mac安装rabbitmq,启动的时候命令前,需要加 sudo,不然会报错误

2、rabbitmq 开发概念词

2.1 Producer(生产者)

2.2 Consumer(消费者)

2.3 Exchange(交换机)

2.4 Queue(队列)

2.5 rountingKey(交换机与队列之间的关系)

3、RabbitMQ开发方案

官网的6中模式,可以点开这个网址,显示6中模式,第6中模式RPC远程调用我们不需要用该模式,所以我们只要关注前五种就可以了。

接下来我们就直接简单教学rabbitmq的简单使用

3.0 代码讲解
/** * 声明队列,五个参数列表,如果直接使用默认channel.queueDeclare("queue"),那么其他参数都会自动默认设置属性,所以一般我们几乎都默认它 * String queue  队列名称 * boolean durable 队列是需要持久化,意思就是rabbitmq重启的时候,如果不是持久化,那么该队列就会消失,默认true * boolean exclusive 如果你想创建一个只有自己可见的队列,不允许其它用户访问RabbitMQ允许你将一个Queue声明成为排他性的,只对首次声明它的连接(Connection)可见,会在其连接断开的时候自动删除。所以我们开发中一般不需要此操作,默认false * boolean autoDelete 消息是需要持久化,意思就是rabbitmq重启的时候,如果为true,那么关闭期间接受的消息就会自动消失,默认false * Map arguments 这个参数我们只会在使用延迟队列中才会用到,就是延迟队列的相关配置等属性 * 方法channel.queueDelete("queue")等同于channel.queueDeclare("queue",true,false,false,null); */channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** * 绑定队列到交换机  * String queue 队列名 * String exchange 交换机名 * String routingKey 绑定关系 如 大头儿子,小头爸爸 他们的rountingKey就是父子 */channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
3.1 pom.xml

版本随意,本博客任何版本高版本也行。

   com.rabbitmq   amqp-client   3.4.1   org.slf4j   slf4j-log4j12   1.7.7   org.apache.commons   commons-lang3   3.3.2   org.springframework.amqp   spring-rabbit   1.4.0.RELEASE
3.2 工具类准备
import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;public class ConnectionUtil {    public static Connection getConnection() throws Exception {        //定义连接工厂        ConnectionFactory factory = new ConnectionFactory();        //设置服务地址        factory.setHost("localhost");        //端口        factory.setPort(5672);        //设置账号信息,用户名、密码、vhost        factory.setVirtualHost("/");        factory.setUsername("guest");        factory.setPassword("guest");        // 通过工程获取连接        Connection connection = factory.newConnection();        return connection;    }}
3.3 简单模式(simple)

理解:该rabbit服务器只有一个单一的队列

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Send {    private final static String QUEUE_NAME = "test_queue";    public static void main(String[] argv) throws Exception {        // 获取到连接以及mq通道        Connection connection = ConnectionUtil.getConnection();        // 从连接中创建通道        Channel channel = connection.createChannel();        // 声明(创建)队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 消息内容        String message = "Hello World!";        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());        System.out.println(" [x] Sent '" + message + "'");        //关闭通道和连接        channel.close();        connection.close();    }}
public class Recv {    private final static String QUEUE_NAME = "mujiutian_queue";    public static void main(String[] argv) throws Exception {        // 获取到连接以及mq通道        Connection connection = ConnectionUtil.getConnection();        //创建channel        Channel channel = connection.createChannel();        //创建队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 定义队列的消费者        QueueingConsumer consumer = new QueueingConsumer(channel);        //监听队列,发送消息        channel.basicConsume(QUEUE_NAME, true, consumer);        // 获取消息,该线程一直进行        while (true) {            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String message = new String(delivery.getBody());            System.out.println("获取到消息"+message);        }    }}

先打开rece的main函数,这样就可以执行send发送消息。

3.4 工作模式(work)

理解:rabbit服务器有一个exchange(交换机),该交换机下有两个队列,总共发送了100条消息,A队列效率高可以抢到80条消息给消费者,B队列只能抢到20条消息给发送者,他们的总和是100条,为工作模式。

先执行,先执行消费者main函数,在看结果图:

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Send {    private final static String QUEUE_NAME = "test_queue_work";    public static void main(String[] argv) throws Exception {        // 获取到连接以及mq通道        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        for (int i = 0; i < 100; i++) {            // 消息内容            String message = "" + i;            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());            System.out.println(" [x] Sent '" + message + "'");        }        channel.close();        connection.close();    }}
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.QueueingConsumer;public class Recv {    private final static String QUEUE_NAME = "test_queue_work";    public static void main(String[] argv) throws Exception {        // 获取到连接以及mq通道        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 同一时刻服务器只会发一条消息给消费者        channel.basicQos(10);        // 定义队列的消费者        QueueingConsumer consumer = new QueueingConsumer(channel);        // 监听队列,手动返回完成        channel.basicConsume(QUEUE_NAME, false, consumer);        // 获取消息        while (true) {            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String message = new String(delivery.getBody());            System.out.println("收到消息'" + message + "'");            // 返回确认状态            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        }    }}
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.QueueingConsumer;public class Recv2 {    private final static String QUEUE_NAME = "test_queue_work";    public static void main(String[] argv) throws Exception {        // 获取到连接以及mq通道        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 同一时刻服务器只会发一条消息给消费者        channel.basicQos(1);        // 定义队列的消费者        QueueingConsumer consumer = new QueueingConsumer(channel);        // 监听队列,手动返回完成状态        channel.basicConsume(QUEUE_NAME, false, consumer);        // 获取消息        while (true) {            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String message = new String(delivery.getBody());            System.out.println(" [x] Received '" + message + "'");            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        }    }}

如图:

// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);

跟这个有关系,这也就是我们所说的工作模式

3.5 订阅模式(Publish/Subribe)

理解:就是群发,该交换机下的所有队列,都会接受相同的所有交换机发来的消息,类似于qq群一样

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Send {    private final static String EXCHANGE_NAME = "test_exchange_fanout";    public static void main(String[] argv) throws Exception {        // 获取到连接以及mq通道        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        // 声明exchange        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");        // 消息内容        String message = "Hello World!";        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());        System.out.println(" [x] Sent '" + message + "'");        channel.close();        connection.close();    }}
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.QueueingConsumer;public class Recv {    private final static String QUEUE_NAME = "test_queue_work";    private final static String EXCHANGE_NAME = "test_exchange_fanout";    public static void main(String[] argv) throws Exception {        // 获取到连接以及mq通道        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 绑定队列到交换机        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");        // 同一时刻服务器只会发一条消息给消费者        channel.basicQos(1);        // 定义队列的消费者        QueueingConsumer consumer = new QueueingConsumer(channel);        // 监听队列,手动返回完成        channel.basicConsume(QUEUE_NAME, false, consumer);        // 获取消息        while (true) {            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String message = new String(delivery.getBody());            System.out.println(" [x] Received '" + message + "'");            Thread.sleep(10);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        }    }}
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.QueueingConsumer;public class Recv2 {    private final static String QUEUE_NAME = "test_queue_work2";    private final static String EXCHANGE_NAME = "test_exchange_fanout";    public static void main(String[] argv) throws Exception {        // 获取到连接以及mq通道        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 绑定队列到交换机        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");        // 同一时刻服务器只会发一条消息给消费者        channel.basicQos(1);        // 定义队列的消费者        QueueingConsumer consumer = new QueueingConsumer(channel);        // 监听队列,手动返回完成        channel.basicConsume(QUEUE_NAME, false, consumer);        // 获取消息        while (true) {            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String message = new String(delivery.getBody());            System.out.println(" [x] Received '" + message + "'");            Thread.sleep(10);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        }    }}
3.6 路由模式(Routing)

理解就是:你是人,如果性别是女,请进女厕,是男性,请进男厕,如同,一个交换机下面的多个队列,根据rountingKey判断该接受的消息

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Send {    private final static String EXCHANGE_NAME = "test_exchange_direct";    public static void main(String[] argv) throws Exception {        // 获取到连接以及mq通道        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        // 声明exchange        channel.exchangeDeclare(EXCHANGE_NAME, "direct");        // 消息内容        String message = "Hello World!";        channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes());        System.out.println(" [x] Sent '" + message + "'");        channel.close();        connection.close();    }}
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.QueueingConsumer;public class Recv {    private final static String QUEUE_NAME = "test_queue_work";    private final static String EXCHANGE_NAME = "test_exchange_direct";    public static void main(String[] argv) throws Exception {        // 获取到连接以及mq通道        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 绑定队列到交换机        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");        // 同一时刻服务器只会发一条消息给消费者        channel.basicQos(1);        // 定义队列的消费者        QueueingConsumer consumer = new QueueingConsumer(channel);        // 监听队列,手动返回完成        channel.basicConsume(QUEUE_NAME, false, consumer);        // 获取消息        while (true) {            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String message = new String(delivery.getBody());            System.out.println(" [x] Received '" + message + "'");            Thread.sleep(10);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        }    }}
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.QueueingConsumer;public class Recv2 {    private final static String QUEUE_NAME = "test_queue_work2";    private final static String EXCHANGE_NAME = "test_exchange_direct";    public static void main(String[] argv) throws Exception {        // 获取到连接以及mq通道        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 绑定队列到交换机        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");        // 同一时刻服务器只会发一条消息给消费者        channel.basicQos(1);        // 定义队列的消费者        QueueingConsumer consumer = new QueueingConsumer(channel);        // 监听队列,手动返回完成        channel.basicConsume(QUEUE_NAME, false, consumer);        // 获取消息        while (true) {            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String message = new String(delivery.getBody());            System.out.println(" [x] Received '" + message + "'");            Thread.sleep(10);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        }    }}
3.7 通配符模式(Topics)

理解就是:路径aas.# A队列接受这种所有路径,B队列接受路径下的所有队列aab.#

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;public class Send {    private final static String EXCHANGE_NAME = "test_exchange_topic";    public static void main(String[] argv) throws Exception {        // 获取到连接以及mq通道        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        // 声明exchange        channel.exchangeDeclare(EXCHANGE_NAME, "topic");        // 消息内容        String message = "Hello World!";        channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());        System.out.println(" [x] Sent '" + message + "'");        channel.close();        connection.close();    }}
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.QueueingConsumer;public class Recv {    private final static String QUEUE_NAME = "test_queue_topic_work";    private final static String EXCHANGE_NAME = "test_exchange_topic";    public static void main(String[] argv) throws Exception {        // 获取到连接以及mq通道        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 绑定队列到交换机        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");        // 同一时刻服务器只会发一条消息给消费者        channel.basicQos(1);        // 定义队列的消费者        QueueingConsumer consumer = new QueueingConsumer(channel);        // 监听队列,手动返回完成        channel.basicConsume(QUEUE_NAME, false, consumer);        // 获取消息        while (true) {            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String message = new String(delivery.getBody());            System.out.println(" [x] Received '" + message + "'");            Thread.sleep(10);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        }    }}
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.QueueingConsumer;public class Recv2 {    private final static String QUEUE_NAME = "test_queue_topic_work2";    private final static String EXCHANGE_NAME = "test_exchange_topic";    public static void main(String[] argv) throws Exception {        // 获取到连接以及mq通道        Connection connection = ConnectionUtil.getConnection();        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 绑定队列到交换机        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");        // 同一时刻服务器只会发一条消息给消费者        channel.basicQos(1);        // 定义队列的消费者        QueueingConsumer consumer = new QueueingConsumer(channel);        // 监听队列,手动返回完成        channel.basicConsume(QUEUE_NAME, false, consumer);        // 获取消息        while (true) {            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String message = new String(delivery.getBody());            System.out.println(" [x] Received '" + message + "'");            Thread.sleep(10);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);        }    }}

关于RabbitMq的5种开发方案是什么就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

0