千家信息网

Java Spring boot整合RabbitMQ如何实现B2B2C小程序电子商务

发表于:2025-12-07 作者:千家信息网编辑
千家信息网最后更新 2025年12月07日,小编给大家分享一下Java Spring boot整合RabbitMQ如何实现B2B2C小程序电子商务,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!之前我们发送和接收到队列中的消息,
千家信息网最后更新 2025年12月07日Java Spring boot整合RabbitMQ如何实现B2B2C小程序电子商务

小编给大家分享一下Java Spring boot整合RabbitMQ如何实现B2B2C小程序电子商务,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!

之前我们发送和接收到队列中的消息,现在是时候在 RabbitMQ 中引入完整的消息传递模式了。

让我们快速回顾一下之前了解的内容:

生产者(producer):发送消息的程序

队列(queue):存储消息的缓冲器

消费者(consumer):接收消息的程序

RabbitMQ 消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。

发布者(producer)只需要把消息发送给一个交换器(exchange)。交换器非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换器必须知道如何处理它接收到的消息,是应该推送到指定的队列还是多个队列,或者是直接忽略消息。这些规则是通过交换器类型(exchange type)来定义的。

有几个可供选择的交换器类型:direct, topic, headers 和 fanout。我们在这里主要说明最后一个 --fanout。fanout exchange 很简单,你可能从名字上就能猜测出来,它把消息发送给它所知道的所有队列。这正是我们所需要的。

先创建一个 fanout 类型的交换器,命名为"tut.fanout"

@Profile({"tut3", "pub-sub"})@Configurationpublic class Tut3Config { /**     * 定义一个 Exchange     *     * @return FanoutExchange     */     @Bean    public FanoutExchange queue() {        return new FanoutExchange("tut.fanout");    }    /**     * 消费者一端的配置:queues、bindings     */    @Profile("receiver")    private static class ReceiverConfig {        @Bean         public Queue autoDeleteQueue1() {            return new AnonymousQueue();        }        @Bean         public Queue autoDeleteQueue2() {            return new AnonymousQueue();        }        @Bean         public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) {            return BindingBuilder.bind(autoDeleteQueue1).to(fanout);        }        @Bean         public Binding binding2(FanoutExchange fanout, Queue autoDeleteQueue2) {            return BindingBuilder.bind(autoDeleteQueue2).to(fanout);        }        @Bean         public Tut3Receiver receiver() {            return new Tut3Receiver();        }    }    @Bean @Profile("sender")    public Tut3Sender sender() {        return new Tut3Sender();    }}

和之前的两个教程一样,我们定义了两个 profiles(tut3、pub-sub)以保证我们能运行指定的示例。然后创建了一个消费者端的配置,并在其中定义了两个 AnonymousQueue 和两个 Binding 以便将相应的队列和交换器绑定起来。

交换器列表

rabbitmqctl 能够列出服务器上所有的交换器。这个列表中有一些叫做 amq.* 的交换器。这些都是默认创建的,不过这时候你还不需要使用他们。

$ sudo rabbitmqctl list_exchanges

Listing exchanges …

logs fanout

amq.direct direct

amq.topic topic

amq.fanout fanout

amq.headers headers

…done.

未命名交换器(Nameless exchange)

在之前,我们对交换一无所知,但依然能够将消息发送到队列。这是怎么回事?因为我们使用了默认的交换器,它是用空字符串("")来标识的。

我们之前是这样发送消息的:

template.convertAndSend(queue.getName(), message);

而具体源码如下,可以看出之前我们使用的是默认的空字符定义的交换器

// RabbitTemplate@Overridepublic void convertAndSend(String routingKey, final Object object) throws AmqpException {convertAndSend(this.exchange, routingKey, object, (CorrelationData) null);}private volatile String exchange = DEFAULT_EXCHANGE;/\*_ Alias for amq.direct default exchange. _/private static final String DEFAULT_EXCHANGE = "";

临时队列(Temporary queues)

我们之前使用的是具有指定名称的队列(hello 和 work-queues)。能够命名队列对我们而言至关重要 -- 我们需要将工作进程指向同一个队列。当我们想要在生产者和消费者之间共享队列时,给队列一个名字很重要。

但是我们的日志记录器并不是这样。我们希望记录到所有的日志消息,而不仅仅是它们的一部分。我们也只对当前的消息感兴趣,而对旧的消息不感兴趣。为了解决这个问题,我们需要做两件事情。

首先,每当我们连接到 RabbitMQ,我们需要一个新的空的队列。我们可以创建一个具有随机名称的队列,或者最好让服务器为我们选择一个随机的队列名。

其次,当与消费者(consumer)断开连接的时候,这个队列应当被立即删除。

在 Spring AMQP 中,我们可以使用 AnonymousQueue 来作为临时队列。它是一个非持久化的、独占的、可自动删除的队列。

@Beanpublic Queue autoDeleteQueue1() {    return new AnonymousQueue();}@Beanpublic Queue autoDeleteQueue2() {    return new AnonymousQueue();}

此时我们的队列名字看起来会像这样:amq.gen-JzTY20BRgKO-HjmUJj0wLg

绑定(Bindings)

我们已经创建了一个 fanout exchange 和两个队列,现在我们需要告诉交换所将消息发送到我们的队列。交换器和队列之间的关系称为绑定。在上面的 Tut3Config 中,您可以看到我们有两个绑定,每个 AnonymousQueue 都有一个绑定。

@Beanpublic Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) {    return BindingBuilder.bind(autoDeleteQueue1).to(fanout);}

绑定列表

你可以使用 rabbitmqctl list_bindings 列出所有现存的绑定。

rabbitmqctl list_bindings

代码整合

生产者

发出消息的生产者程序与以前的没有多大区别。最重要的变化是我们现在要发布消息给我们的 fanout exchange,而不是默认的交换器。发送时我们需要提供一个 routingKey,但是对于 fanout exchange,这个值将被忽略。

public class Tut3Sender {@Autowirdprivate AmqpTemplate template;@Overrideprivate FanoutExchange fanout; private int dots = 0; private int count = 0;@Scheduled(fixedDelay = 1000, initialDelay = 500)    public void send() {        StringBuilder builder = new StringBuilder("Hello");        if (dots++ == 3) {            dots = 1;        }        for (int i = 0; i < dots; i++) {            builder.append('.');        }        builder.append(Integer.toString(++count));        String message = builder.toString();        template.convertAndSend(fanout.getName(), "", message);        System.out.println(" [x] Sent '" + message + "'");    }}

说明:

禁止发布到不存在的交换器
如果没有任何队列绑定到交换器,消息将丢失
消费者 了解springcloud架构可以加求求:三五三六二四七二五九
我们定义两个消费者分别监听两个队列

public class Tut3Receiver {    @RabbitListener(queues = "#{autoDeleteQueue1.name}")    public void receiver1(String in) throws InterruptedException {        receive(in, 1);    }    @RabbitListener(queues = "#{autoDeleteQueue2.name}")    public void receiver2(String in) throws InterruptedException {        receive(in, 2);    }    private void receive(String in, int instance) throws InterruptedException {        StopWatch watch = new StopWatch();        watch.start();        System.out.println("instance " + instance + " [x] Received '" + in + "'");        doWork(in);        watch.stop();        System.out.println("instance " + instance + " [x] Done in "                + watch.getTotalTimeSeconds() + "s");    }    private void doWork(String in) throws InterruptedException {        for (char ch : in.toCharArray()) {            if (ch == '.') {                Thread.sleep(1000);            }        }    }}

运行

maven 编译

mvn clean package -Dmaven.test.skip=true

运行(建议先运行消费者,再运行生产者)

java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut3,receiver --tutorial.client.duration=60000

java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut3,sender --tutorial.client.duration=60000

输出

// Sender

Ready … running for 60000ms

[x] Sent 'Hello.1'

[x] Sent 'Hello…2'

[x] Sent 'Hello…3'

// Receiver

Ready … running for 60000ms

instance 2 [x] Received 'Hello.1'

instance 1 [x] Received 'Hello.1'

instance 2 [x] Done in 1.002s

instance 1 [x] Done in 1.002s

instance 2 [x] Received 'Hello…2'

instance 1 [x] Received 'Hello…2'

instance 2 [x] Done in 2.003s

instance 1 [x] Done in 2.003s

instance 1 [x] Received 'Hello…3'

instance 2 [x] Received 'Hello…3'

instance 1 [x] Done in 3.011s

instance 2 [x] Done in 3.011s

看完了这篇文章,相信你对"Java Spring boot整合RabbitMQ如何实现B2B2C小程序电子商务"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!

队列 消息 交换器 两个 消费者 消费 程序 运行 发布者 生产者 生产 整合 重要 名字 类型 商务 电子 电子商务 之间 兴趣 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 济南成人软件开发培训机构 网络安全流量编排 小程序页面显示云数据库照片 软件开发增值税多少钱 无线网络安全性变成无 长汀优爱网络技术有限公司 数据库2000超级口令 计算机软件在网络安全中的应用 数据库建立索引是干啥的 上海服务软件开发学习 网络安全监管审查 中国商品诚信数据库怎样 辽宁互联网新科技 mc服务器地图转换 宿城区自动化网络技术哪家好 数据库评估期已过怎么激活 服务器容易受损吗 青浦区常规软件开发厂家价格 服务器是怎样应付原料涨价呢 完美世界电竞连不上服务器 数据库表之间的复制数据结构 上海翰友网络技术有限公司电话 浅谈数据库原理与应用 仙游县达埔服务器 华为武研所软件开发 c 数据库管理工具 服务器机柜加盟 服务器如何输入指令进入桌面 无线网络技术的现状 摄像头怎样添加到视频服务器
0