Canal1.1.4中怎么使用RocketMQ将MySQL同步到Redis
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,今天就跟大家聊聊有关Canal1.1.4中怎么使用RocketMQ将MySQL同步到Redis,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。一
千家信息网最后更新 2025年12月02日Canal1.1.4中怎么使用RocketMQ将MySQL同步到Redis
今天就跟大家聊聊有关Canal1.1.4中怎么使用RocketMQ将MySQL同步到Redis,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
一、Canal使用RocketMQ同步MySQL
Canal结合RocketMQ同步MySQL
二、 同步数据到Redis
2.1 安装Redis
略
2.2 Redis配置
略
2.3 SpringBoot配置
2.3.1 引入依赖
com.alibaba.otter canal.client 1.1.4 org.apache.rocketmq rocketmq-spring-boot-starter 2.0.2 javax.persistence persistence-api
2.3.2 通用代码
SQLType.java
import lombok.AccessLevel;import lombok.NoArgsConstructor;/** * Canal监听SQL类型 * * @author Yu * @date 2019/09/08 00:18 **/@NoArgsConstructor(access = AccessLevel.PRIVATE)public class SQLType { /**插入*/ public static final String INSERT = "INSERT"; /**更新*/ public static final String UPDATE = "UPDATE"; /**删除*/ public static final String DELETE = "DELETE";}User.java
import lombok.Data;import javax.persistence.Id;import java.io.Serializable;/** * UserPo对象 * * @author Yu * @date 2019/09/08 14:13 **/@Datapublic class User implements Serializable { private static final long serialVersionUID = -6845801275112259322L; @Id private Integer uid; private String username; private String password; private String sex;}CanalSynService.java
import com.alibaba.otter.canal.protocol.FlatMessage;import java.util.Collection;/** * Canal同步服务 * * @author Yu * @date 2019/09/08 00:00 **/public interface CanalSynService{ /** * 处理数据 * * @param flatMessage CanalMQ数据 */ void process(FlatMessage flatMessage); /** * DDL语句处理 * * @param flatMessage CanalMQ数据 */ void ddl(FlatMessage flatMessage); /** * 插入 * * @param list 新增数据 */ void insert(Collection list); /** * 更新 * * @param list 更新数据 */ void update(Collection list); /** * 删除 * * @param list 删除数据 */ void delete(Collection list);}
AbstractCanalMQ2RedisService.java
import com.alibaba.otter.canal.protocol.FlatMessage;import com.google.common.collect.Sets;import com.taco.springcloud.canal.constant.SQLType;import com.taco.springcloud.core.component.ApplicationContextHolder;import com.taco.springcloud.core.exception.BizException;import com.taco.springcloud.core.exception.constants.BaseApiCodeEnum;import com.taco.springcloud.core.utils.JsonUtil;import com.taco.springcloud.redis.utils.RedisUtils;import lombok.extern.slf4j.Slf4j;import org.springframework.data.redis.connection.RedisConnection;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.serializer.RedisSerializer;import org.springframework.util.ReflectionUtils;import javax.annotation.Resource;import javax.persistence.Id;import java.lang.reflect.Field;import java.lang.reflect.ParameterizedType;import java.util.*;/** * 抽象CanalMQ通用处理服务 * * @author Yu * @date 2019/09/08 00:05 **/@Slf4jpublic abstract class AbstractCanalMQ2RedisServiceimplements CanalSynService { @Resource private RedisTemplate redisTemplate; @Resource private RedisUtils redisUtils; private Class cache; /** * 获取Model名称 * * @return Model名称 */ protected abstract String getModelName(); @Override public void process(FlatMessage flatMessage) { if(flatMessage.getIsDdl()) { ddl(flatMessage); return; } Set data = getData(flatMessage); if(SQLType.INSERT.equals(flatMessage.getType())) { insert(data); } if(SQLType.UPDATE.equals(flatMessage.getType())) { update(data); } if(SQLType.DELETE.equals(flatMessage.getType())) { delete(data); } } @Override public void ddl(FlatMessage flatMessage) { //TODO : DDL需要同步,删库清空,更新字段处理 } @Override public void insert(Collection list) { insertOrUpdate(list); } @Override public void update(Collection list) { insertOrUpdate(list); } private void insertOrUpdate(Collection list) { redisTemplate.executePipelined( (RedisConnection redisConnection) -> { for (T data : list) { String key = getWrapRedisKey(data); RedisSerializer keySerializer = redisTemplate.getKeySerializer(); RedisSerializer valueSerializer = redisTemplate.getValueSerializer(); redisConnection.set(keySerializer.serialize(key), valueSerializer.serialize(data)); } return null; }); } @Override public void delete(Collection list) { Set keys = Sets.newHashSetWithExpectedSize(list.size()); for (T data : list) { keys.add(getWrapRedisKey(data)); } //Set keys = list.stream().map(this::getWrapRedisKey).collect(Collectors.toSet()); redisUtils.delAll(keys); } /** * 封装redis的key * * @param t 原对象 * @return key */ protected String getWrapRedisKey(T t) { return new StringBuilder() .append(ApplicationContextHolder.getApplicationName()) .append(":") .append(getModelName()) .append(":") .append(getIdValue(t)) .toString(); } /** * 获取类泛型 * * @return 泛型Class */ protected Class getTypeArguement() { if(cache == null) { cache = (Class ) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]; } return cache; } /** * 获取Object标有@Id注解的字段值 * * @param t 对象 * @return id值 */ protected Object getIdValue(T t) { Field fieldOfId = getIdField(); ReflectionUtils.makeAccessible(fieldOfId); return ReflectionUtils.getField(fieldOfId, t); } /** * 获取Class标有@Id注解的字段名称 * * @return id字段名称 */ protected Field getIdField() { Class clz = getTypeArguement(); Field[] fields = clz.getDeclaredFields(); for (Field field : fields) { Id annotation = field.getAnnotation(Id.class); if (annotation != null) { return field; } } log.error("PO类未设置@Id注解"); throw new BizException(BaseApiCodeEnum.FAIL); } /** * 转换Canal的FlatMessage中data成泛型对象 * * @param flatMessage Canal发送MQ信息 * @return 泛型对象集合 */ protected Set getData(FlatMessage flatMessage) { List
TestUsersConsumer.java
import com.alibaba.otter.canal.protocol.FlatMessage;import com.taco.springcloud.canal.model.User;import com.taco.springcloud.canal.service.AbstractCanalMQ2RedisService;import lombok.Getter;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;@Slf4j@Service@RocketMQMessageListener(topic = "test_users", consumerGroup = "users")public class TestUsersConsumer extends AbstractCanalMQ2RedisServiceimplements RocketMQListener { @Getter private String modelName = "user"; @Override public void onMessage(FlatMessage s) { process(s); }}
看完上述内容,你们对Canal1.1.4中怎么使用RocketMQ将MySQL同步到Redis有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
同步
数据
对象
名称
字段
处理
更新
内容
注解
服务
配置
代码
信息
更多
知识
篇文章
类型
行业
语句
资讯
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
连接docker中的数据库
uk数据库表审查失败
腾讯轻量云服务器装08系统
安卓应用软件开发有哪些流
网络安全等级评定中心
淄博应用软件开发公司
公共基础网络安全
打印机网络安全密码是多少
软件开发公司怎么看好坏
苏州国开行软件开发中心秘闻
kali关闭X服务器
余姚手机软件开发哪家好
小银行软件开发
tplink服务器不响应
系统设计与软件开发
柒喜互联网科技公司
客户机服务器是什么拓扑结构
四川大学网络安全奖学金
街道网络安全规划
前车辅助系统传感器无数据库
连接docker中的数据库
端口转发访问内网ftp服务器
网络安全示例
深圳视频安防软件开发服务
甲骨文美国数据库
广东智慧党建软件开发软件
崇明区数据金融网络技术服务优势
关于ai网络安全手抄报上的句子
武汉国家网络安全创新基地投资
北京通信定位软件开发