您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關Canal1.1.4中怎么使用RocketMQ將MySQL同步到Redis,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
Canal結合RocketMQ同步MySQL
略
略
<!-- Canal client: provides the FlatMessage protocol class -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
<!-- RocketMQ Spring Boot starter: provides @RocketMQMessageListener / RocketMQListener -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>
<!-- Optional: include only if your project needs it (supplies javax.persistence.Id). -->
<!-- NOTE(review): no <version> is given here; presumably managed by a parent POM/BOM - verify. -->
<dependency>
    <groupId>javax.persistence</groupId>
    <artifactId>persistence-api</artifactId>
</dependency>
SQLType.java
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/**
 * Names of the SQL statement types that the Canal listener reacts to.
 *
 * <p>This is a pure constants holder; the generated no-args constructor is
 * private so the class cannot be instantiated from outside.
 *
 * @author Yu
 * @date 2019/09/08 00:18
 **/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class SQLType {

    /** Row insertion. */
    public static final String INSERT = "INSERT";

    /** Row update. */
    public static final String UPDATE = "UPDATE";

    /** Row deletion. */
    public static final String DELETE = "DELETE";
}
User.java
import lombok.Data;

import javax.persistence.Id;
import java.io.Serializable;

/**
 * User persistent object (PO).
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}.
 *
 * @author Yu
 * @date 2019/09/08 14:13
 **/
@Data
public class User implements Serializable {

    private static final long serialVersionUID = -6845801275112259322L;

    /** User id; the field carrying the {@code @Id} annotation. */
    @Id
    private Integer uid;

    /** Login name. */
    private String username;

    /** Password value as stored. */
    private String password;

    /** Sex. */
    private String sex;
}
CanalSynService.java
import com.alibaba.otter.canal.protocol.FlatMessage;

import java.util.Collection;

/**
 * Contract for a service that synchronises Canal change messages.
 *
 * @param <T> type of the objects passed to the insert/update/delete handlers
 * @author Yu
 * @date 2019/09/08 00:00
 **/
public interface CanalSynService<T> {

    /**
     * Entry point: handles one message received from Canal via MQ.
     *
     * @param flatMessage the Canal MQ message
     */
    void process(FlatMessage flatMessage);

    /**
     * Handles a DDL statement.
     *
     * @param flatMessage the Canal MQ message
     */
    void ddl(FlatMessage flatMessage);

    /**
     * Handles inserted data.
     *
     * @param list the newly added objects
     */
    void insert(Collection<T> list);

    /**
     * Handles updated data.
     *
     * @param list the updated objects
     */
    void update(Collection<T> list);

    /**
     * Handles deleted data.
     *
     * @param list the deleted objects
     */
    void delete(Collection<T> list);
}
AbstractCanalMQ2RedisService.java
import com.alibaba.otter.canal.protocol.FlatMessage; import com.google.common.collect.Sets; import com.taco.springcloud.canal.constant.SQLType; import com.taco.springcloud.core.component.ApplicationContextHolder; import com.taco.springcloud.core.exception.BizException; import com.taco.springcloud.core.exception.constants.BaseApiCodeEnum; import com.taco.springcloud.core.utils.JsonUtil; import com.taco.springcloud.redis.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.util.ReflectionUtils; import javax.annotation.Resource; import javax.persistence.Id; import java.lang.reflect.Field; import java.lang.reflect.ParameterizedType; import java.util.*; /** * 抽象CanalMQ通用處理服務(wù) * * @author Yu * @date 2019/09/08 00:05 **/ @Slf4j public abstract class AbstractCanalMQ2RedisService<T> implements CanalSynService<T> { @Resource private RedisTemplate<String, Object> redisTemplate; @Resource private RedisUtils redisUtils; private Class<T> cache; /** * 獲取Model名稱 * * @return Model名稱 */ protected abstract String getModelName(); @Override public void process(FlatMessage flatMessage) { if(flatMessage.getIsDdl()) { ddl(flatMessage); return; } Set<T> data = getData(flatMessage); if(SQLType.INSERT.equals(flatMessage.getType())) { insert(data); } if(SQLType.UPDATE.equals(flatMessage.getType())) { update(data); } if(SQLType.DELETE.equals(flatMessage.getType())) { delete(data); } } @Override public void ddl(FlatMessage flatMessage) { //TODO : DDL需要同步,刪庫清空,更新字段處理 } @Override public void insert(Collection<T> list) { insertOrUpdate(list); } @Override public void update(Collection<T> list) { insertOrUpdate(list); } private void insertOrUpdate(Collection<T> list) { redisTemplate.executePipelined( (RedisConnection redisConnection) -> { for (T data : list) { String key = 
getWrapRedisKey(data); RedisSerializer keySerializer = redisTemplate.getKeySerializer(); RedisSerializer valueSerializer = redisTemplate.getValueSerializer(); redisConnection.set(keySerializer.serialize(key), valueSerializer.serialize(data)); } return null; }); } @Override public void delete(Collection<T> list) { Set<String> keys = Sets.newHashSetWithExpectedSize(list.size()); for (T data : list) { keys.add(getWrapRedisKey(data)); } //Set<String> keys = list.stream().map(this::getWrapRedisKey).collect(Collectors.toSet()); redisUtils.delAll(keys); } /** * 封裝redis的key * * @param t 原對(duì)象 * @return key */ protected String getWrapRedisKey(T t) { return new StringBuilder() .append(ApplicationContextHolder.getApplicationName()) .append(":") .append(getModelName()) .append(":") .append(getIdValue(t)) .toString(); } /** * 獲取類泛型 * * @return 泛型Class */ protected Class<T> getTypeArguement() { if(cache == null) { cache = (Class<T>) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]; } return cache; } /** * 獲取Object標(biāo)有@Id注解的字段值 * * @param t 對(duì)象 * @return id值 */ protected Object getIdValue(T t) { Field fieldOfId = getIdField(); ReflectionUtils.makeAccessible(fieldOfId); return ReflectionUtils.getField(fieldOfId, t); } /** * 獲取Class標(biāo)有@Id注解的字段名稱 * * @return id字段名稱 */ protected Field getIdField() { Class<T> clz = getTypeArguement(); Field[] fields = clz.getDeclaredFields(); for (Field field : fields) { Id annotation = field.getAnnotation(Id.class); if (annotation != null) { return field; } } log.error("PO類未設(shè)置@Id注解"); throw new BizException(BaseApiCodeEnum.FAIL); } /** * 轉(zhuǎn)換Canal的FlatMessage中data成泛型對(duì)象 * * @param flatMessage Canal發(fā)送MQ信息 * @return 泛型對(duì)象集合 */ protected Set<T> getData(FlatMessage flatMessage) { List<Map<String, String>> sourceData = flatMessage.getData(); Set<T> targetData = Sets.newHashSetWithExpectedSize(sourceData.size()); for (Map<String, String> map : sourceData) { T t = 
JsonUtil.mapConvertPojo(map, getTypeArguement()); targetData.add(t); } return targetData; } }
TestUsersConsumer.java
import com.alibaba.otter.canal.protocol.FlatMessage; import com.taco.springcloud.canal.model.User; import com.taco.springcloud.canal.service.AbstractCanalMQ2RedisService; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Slf4j @Service @RocketMQMessageListener(topic = "test_users", consumerGroup = "users") public class TestUsersConsumer extends AbstractCanalMQ2RedisService<User> implements RocketMQListener<FlatMessage> { @Getter private String modelName = "user"; @Override public void onMessage(FlatMessage s) { process(s); } }
看完上述內容,你們對Canal1.1.4中怎么使用RocketMQ將MySQL同步到Redis有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。