Redisson中怎么实现一个延时消息组件
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,Redisson中怎么实现一个延时消息组件,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。定义主题队列注解@Target({Eleme
千家信息网最后更新 2025年12月03日Redisson中怎么实现一个延时消息组件
Redisson中怎么实现一个延时消息组件,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
定义主题队列注解
@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@Documented@Componentpublic @interface RMessage { /** * 消息队列 * @return */ String queue(); /** * 主题 * @return */ String topic() default "system";}springboot启动监听初始化任务队列与消息主题,消费者订阅主题
@Slf4j@Componentpublic class RMessageListener implements ApplicationListener{ /** * consumer monitoringMethod monitorMessage */ private final static String METHOD_MONITOR_MESSAGE = "monitorMessage"; /** * redisson topic name */ private final static String ATTRIBUTE_NAME_TOPIC = "topic"; /** * redisson messageQueue name */ private final static String ATTRIBUTE_NAME_QUEUE = "queue"; /** * redisson queue map */ public static Map > messageQueue = new ConcurrentHashMap<>(); /** * redisson offQueue map */ public static Map > offQueue = new ConcurrentHashMap<>(); /** * redisson topic map */ public static Map topicMap = new ConcurrentHashMap<>(); @Override public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) { ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false); provider.addIncludeFilter(new AnnotationTypeFilter(RMessage.class)); String basePackage = applicationStartedEvent.getSpringApplication().getMainApplicationClass().getPackage().getName(); Set beanDefinitions = provider.findCandidateComponents(basePackage); ConfigurableListableBeanFactory beanFactory = applicationStartedEvent.getApplicationContext().getBeanFactory(); mqInit(beanDefinitions, beanFactory); provider.clearCache(); provider.resetFilters(false); provider.addIncludeFilter(new AssignableTypeFilter(RMessageConsumer.class)); Set consumers = provider.findCandidateComponents(basePackage); consumerSubscribe(beanFactory, consumers); } /** * consumer subscription news * * @param beanFactory * @param consumers */ private void consumerSubscribe(ConfigurableListableBeanFactory beanFactory, Set consumers) { consumers.forEach(beanDefinition -> { log.info("rMessage init consumer {}",beanDefinition.getBeanClassName()); try { Object bean = beanFactory.getBean(Class.forName(beanDefinition.getBeanClassName())); Method method = bean.getClass().getMethod(METHOD_MONITOR_MESSAGE); ReflectionUtils.invokeMethod(method,bean); } catch (ClassNotFoundException | NoSuchMethodException e) { e.printStackTrace(); } }); } /** * Parameter initialization * * @param beanDefinitions * @param beanFactory */ private void mqInit(Set beanDefinitions,final ConfigurableListableBeanFactory beanFactory) { RedissonClient redissonClient = beanFactory.getBean(RedissonClient.class); beanDefinitions.stream().filter(beanDefinition -> beanDefinition instanceof AnnotatedBeanDefinition).forEach(beanDefinition->{ AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition)beanDefinition; AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata(); MergedAnnotation mergedAnnotation = annotationMetadata.getAnnotations().get(RMessage.class); String queryName = mergedAnnotation.getString(ATTRIBUTE_NAME_QUEUE); String topicName = mergedAnnotation.getString(ATTRIBUTE_NAME_TOPIC); String shortName = topicName+"."+queryName; RBlockingDeque super Serializable> blockingDeque = redissonClient.getBlockingDeque(shortName); messageQueue.put(shortName,blockingDeque); RDelayedQueue super Serializable> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); offQueue.put(shortName,delayedQueue); RTopic topic = redissonClient.getTopic(topicName); topicMap.put(shortName,topic); }); }}
抽象队列主题列表
public abstract class AbstractQueue { Map> offQueue = RMessageListener.offQueue; Map> messageQueue = RMessageListener.messageQueue; Map topicMap = RMessageListener.topicMap; protected RDelayedQueue super Serializable> getRDelayedQueue() { return offQueue.get(shortName()); } protected RBlockingDeque super Serializable> getMessageQueue() { return messageQueue.get(shortName()); } private String shortName() { Annotation[] annotations = this.getClass().getAnnotations(); RMessage rMessage = Arrays.stream(annotations).filter(annotation -> annotation instanceof RMessage) .map(annotation -> (RMessage)annotation).findAny().get(); String queryName = rMessage.queue(); String topicName = rMessage.topic(); return topicName+"."+queryName; } protected RTopic getTopic() { return topicMap.get(shortName()); }} 抽象生产者模板
@Slf4jpublic abstract class RMessageProducerextends AbstractQueue { /** * 发送延时消息 * @param message * @param delay * @param timeUnit */ public void sendMessage(T message, long delay, TimeUnit timeUnit) { log.info("rMessage sendMessage: {}, delayTime {}",message.toString(),delay+timeUnit.name()); super.getRDelayedQueue().offer(message,delay,timeUnit); super.getTopic().publish(this.hashCode()); } /** * 发送异步消息 * @param message */ public void sendMessage(T message) { this.sendMessage(message,0,TimeUnit.MILLISECONDS); }}
抽象消费者模板
@Slf4jpublic abstract class RMessageConsumerextends AbstractQueue { public void monitorMessage() { CompletableFuture.runAsync(this::pastConsumption); super.getTopic().addListener(Object.class,(c,m)-> { try { Object take = super.getMessageQueue().take(); log.info("rMessage receiveMessage: {}, receiving time {}",take.toString(), LocalDateTime.now()); this.useMessage((T)take); } catch (InterruptedException e) { e.printStackTrace(); } }); } protected abstract void useMessage(T message); public void pastConsumption() { while (super.getRDelayedQueue().size() > 0 || super.getMessageQueue().size() > 0) { try { Object take = super.getMessageQueue().take(); log.info("rMessage receiveMessage: {}, receiving time {}",take.toString(), LocalDateTime.now()); this.useMessage((T)take); } catch (InterruptedException e) { e.printStackTrace(); } } }}
具体使用
生产者
@RMessage(queue = "redisQuery",topic = "order")public class RedissonProducer extends RMessageProducer{}@RestController@RequestMapping("producer")@AllArgsConstructorpublic class ProducerController { private RedissonProducer redissonProducer; @PostMapping public String send() { HashMap map = new HashMap<>(); map.put("name","张三"); map.put("time", "测试顺序第二条"+LocalDateTime.now()); redissonProducer.sendMessage(map,5, TimeUnit.MINUTES); return "send msg"; }}
消费者
@RMessage(queue = "redisQuery",topic = "order")public class RedissonConsumer extends RMessageConsumer{ @Override protected void useMessage(HashMap message) { System.out.println("接收到消息:"+message); }}
关于 Redisson中怎么实现一个延时消息组件问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。
消息
主题
队列
消费者
问题
消费
组件
更多
模板
生产者
帮助
生产
解答
易行
简单易行
任务
内容
小伙
小伙伴
方法
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
安徽综合软件开发要多少钱
创建数据库设置数组
共享打印机和打印服务器区别
婴儿听力软件开发
彩虹六号固定服务器
网络安全教育的手抄报
烽火服务器上市公司
我的世界服务器怎么管理
北京国信诚商互联网科技
网络安全审计哪个品牌好
天涯数据库技术路线图
常用的网络安全工具
我的世界2b2t服务器入侵视频
信息技术软件开发规定
网络技术公司大全
伍佰万互联网科技怎么赚钱
网站换个服务器
网络安全的自救方法
数据库 逻辑删除
创建数据库总结
用户数据库使用教程
税盘数据库异常
如何知道买家的ip数据库
具前景的零信任网络安全
视频服务器人脸识别
星创软件开发官网
软科学成果数据库
武安无线网络技术
医院的信息科网络安全
无线网络技术有什么功能