溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

rabbitmq template實(shí)現(xiàn)發(fā)送消息

發(fā)布時(shí)間:2020-06-18 14:22:10 來源:億速云 閱讀:688 作者:元一 欄目:編程語(yǔ)言

前言

rabbitmq template:消息模板。這是spring整合rabbit提供的消息模板。是進(jìn)行發(fā)送消息的關(guān)鍵類。該類提供了豐富的發(fā)送方法,包括可靠性投遞消息方法、回調(diào)監(jiān)聽消息接口ConfirmCallback、返回值確認(rèn)接口ReturnCallBack等等。同樣我們需要注入到spring容器中,然后就可以想其他bean那樣正常使用了。

之前的配置是這樣的:

@Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        template.setMessageConverter(new Jackson2JsonMessageConverter());
        template.setMandatory(true);

        ...
        return template;
    }

要發(fā)送出去的消息vo是這樣的:

@Data
public class TestVO {
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date testDate;
}

然后,出現(xiàn)的問題就是,消息體里,時(shí)間比當(dāng)前時(shí)間少了8個(gè)小時(shí)。

{"testDate":"2019-12-27 05:45:26"}

原因

我們是這么使用rabbitmq template的:

@Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RedisRepository redisRepository;

    /**
     * 發(fā)送消息
     * @param exchange 交換機(jī)名稱
     * @param routingKey 路由鍵
     * @param msgMbject 消息體,無需序列化,會(huì)自動(dòng)序列化為json
     */
    public void send(String exchange, String routingKey, final Object msgMbject) {
        CorrelationData correlationData = new CorrelationData(GUID.generate());
        CachedMqMessageForConfirm cachedMqMessageForConfirm = new CachedMqMessageForConfirm(exchange, routingKey, msgMbject);
        redisRepository.saveCacheMessageForConfirms(correlationData,cachedMqMessageForConfirm);
        //核心代碼:這里,發(fā)送出去的msgObject其實(shí)就是一個(gè)vo或者dto,rabbitmqTemplate會(huì)自動(dòng)幫我們轉(zhuǎn)為json
        rabbitTemplate.convertAndSend(exchange,routingKey,msgMbject,correlationData);
    }

注釋里我解釋了,rabbitmq會(huì)自動(dòng)做轉(zhuǎn)換,轉(zhuǎn)換用的就是jackson。

跟進(jìn)源碼也能一探究竟:

org.springframework.amqp.rabbit.core.RabbitTemplate#convertAndSend

    @Override
    public void convertAndSend(String exchange, String routingKey, final Object object,
            @Nullable CorrelationData correlationData) throws AmqpException {
        // 這里調(diào)用了convertMessageIfNecessary(object)
        send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
    }
調(diào)用了convertMessageIfNessary:

protected Message convertMessageIfNecessary(final Object object) {
        if (object instanceof Message) {
            return (Message) object;
        }
        // 獲取消息轉(zhuǎn)換器
        return getRequiredMessageConverter().toMessage(object, new MessageProperties());
    }

獲取消息轉(zhuǎn)換器的代碼如下:

private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
        MessageConverter converter = getMessageConverter();
        if (converter == null) {
            throw new AmqpIllegalStateException(
                    "No 'messageConverter' specified. Check configuration of RabbitTemplate.");
        }
        return converter;
    }

getMessageConverter就是獲取rabbitmqTemplate 類中的一個(gè)field。

public MessageConverter getMessageConverter() {
        return this.messageConverter;
    }
我們只要看哪里對(duì)它進(jìn)行賦值即可。

然后我想起來,就是在我們業(yè)務(wù)代碼里賦值的:

@Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // 下面這里賦值了。。。差點(diǎn)搞忘了
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        template.setMandatory(true);

        return template;
    }

反正呢,總體來說,就是rabbitmqTemplate 會(huì)使用我們自定義的messageConverter轉(zhuǎn)換message后再發(fā)送。

時(shí)區(qū)問題,很好重現(xiàn),源碼在:

https://gitee.com/ckl111/all-simple-demo-in-work/tree/master/jackson-demo

@Data
public class TestVO {
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date testDate;
}

測(cè)試代碼:

@org.junit.Test
    public void normal() throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        TestVO vo = new TestVO();
        vo.setTestDate(new Date());
        String value = mapper.writeValueAsString(vo);
        System.out.println(value);
    }

輸出:

{"testDate":"2019-12-27 05:45:26"}

解決辦法

指定默認(rèn)時(shí)區(qū)配置

@org.junit.Test
    public void specifyDefaultTimezone() throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        SerializationConfig oldSerializationConfig = mapper.getSerializationConfig();
        /**
         * 新的序列化配置,要配置時(shí)區(qū)
         */
        String timeZone = "GMT+8";
        SerializationConfig newSerializationConfig = oldSerializationConfig.with(TimeZone.getTimeZone(timeZone));

        mapper.setConfig(newSerializationConfig);
        TestVO vo = new TestVO();
        vo.setTestDate(new Date());
        String value = mapper.writeValueAsString(vo);
        System.out.println(value);
    }

在field上加注解

@Data
public class TestVoWithTimeZone {
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date testDate;
}
我們這里,新增了timezone,手動(dòng)指定了時(shí)區(qū)配置。

測(cè)試代碼:

@org.junit.Test
    public void specifyTimezoneOnField() throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        TestVoWithTimeZone vo = new TestVoWithTimeZone();
        vo.setTestDate(new Date());
        String value = mapper.writeValueAsString(vo);
        System.out.println(value);
    }

上面兩種的輸出都是正確的。

這里沒有去分析源碼,簡(jiǎn)單說一下,在序列化的時(shí)候,會(huì)有一個(gè)序列化配置;這個(gè)配置由兩部分組成:默認(rèn)配置+這個(gè)類自定義的配置。 自定義配置會(huì)覆蓋默認(rèn)配置。

我們的第二種方式,就是修改了默認(rèn)配置;第三種方式,就是使用自定義配置覆蓋默認(rèn)配置。

jackson 還挺重要,尤其是 spring cloud 全家桶, feign 也用了這個(gè), restTemplate 也用了,還有 Spring MVC 里的 httpmessageConverter 有興趣的同學(xué),去看下面這個(gè)地方就可以了。
rabbitmq template實(shí)現(xiàn)發(fā)送消息

rabbitmq template實(shí)現(xiàn)發(fā)送消息

如果對(duì)JsonFormat的處理感興趣,可以看下面的地方:

com.fasterxml.jackson.annotation.JsonFormat.Value#Value(com.fasterxml.jackson.annotation.JsonFormat) (打個(gè)斷點(diǎn)在這里,然后跑個(gè)test就到這里了)
rabbitmq template實(shí)現(xiàn)發(fā)送消息

rabbitmq template實(shí)現(xiàn)發(fā)送消息

總結(jié)
差點(diǎn)忘了,針對(duì)rabbitmq template的問題,最終我們的解決方案就是:

@Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        ObjectMapper mapper = new ObjectMapper();
        SerializationConfig oldSerializationConfig = mapper.getSerializationConfig();
        /**
         * 新的序列化配置,要配置時(shí)區(qū)
         */
        String timeZone = environment.getProperty(CadModuleConstants.SPRING_JACKSON_TIME_ZONE);
        SerializationConfig newSerializationConfig = oldSerializationConfig.with(TimeZone.getTimeZone(timeZone));

        mapper.setConfig(newSerializationConfig);

        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(mapper);

        template.setMessageConverter(messageConverter);
        template.setMandatory(true);

        ...設(shè)置callback啥的
        return template;
    }
向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI