首页 > 社交 > 科普中国

SpringBoot整合RabbitMQ实战附加死信交换机

常驻编辑 科普中国 2022-06-11 死信   交换机   队列   绑定   实战   异常   次数   声明   消息   代码   业务
arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD); //绑定该队列到死信交换机的队列1 arguments.put("x-dead-letter-routing-key",ROUTINGKEY_COMPANY_DEAD); return QueueBuilder.durable(QUEUE_COMPANY).withArguments(arguments).build(); } /** * 声明业务队列2 * * @return */ @Bean("queueProject") public Queue queueProject() { Map arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD); //绑定该队列到死信交换机的队列2 arguments.put("x-dead-letter-routing-key",ROUTINGKEY_PROJECT_DEAD); return QueueBuilder.durable(QUEUE_PROJECT).withArguments(arguments).build(); } /** * 声明死信队列1 * * @return */ @Bean("queueCompanyDead") public Queue queueCompanyDead() { return new Queue(QUEUE_COMPANY_DEAD); } /** * 声明死信队列2 * * @return */ @Bean("queueProjectDead") public Queue queueProjectDead() { return new Queue(QUEUE_PROJECT_DEAD); } /** * 绑定业务队列1和业务交换机 * @param queue * @param directExchange * @return */ @Bean public Binding bindingQueueCompany(@Qualifier("queueCompany") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY); } /** * 绑定业务队列2和业务交换机 * @param queue * @param directExchange * @return */ @Bean public Binding bindingQueueProject(@Qualifier("queueProject") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT); } /** * 绑定死信队列1和死信交换机 * @param queue * @param directExchange * @return */ @Bean public Binding bindingQueueCompanyDead(@Qualifier("queueCompanyDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY_DEAD); } /** * 绑定死信队列2和死信交换机 * @param queue * @param directExchange * @return */ @Bean public Binding bindingQueueProjectDead(@Qualifier("queueProjectDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT_DEAD); } }

生产者

RabbltProducerQpJ拜客生活常识网

复制代码123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687JAVApackage com.example.rabitmqdemo.mydemo.producer;

import com.example.rabitmqdemo.mydemo.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Component
@Slf4j
public class RabbltProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 初始化消息确认函数
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setMandatory(true);

    }

    /**
     * 发送消息服务器确认函数
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息发送成功" + correlationData);
        } else {
            System.out.println("消息发送失败:" + cause);
        }
    }

    /**
     * 消息发送失败,消息回调函数
     * @param returnedMessage
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        String str = new String(returnedMessage.getMessage().getBody());
        System.out.println("消息发送失败:" + str);
    }

    /**
     * 处理消息发送到队列1
     * @param str
     */
    public void sendCompany(String str){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,message,correlationData);
        //也可以用下面的方式
        //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,str,correlationData);
    }

    /**
     * 处理消息发送到队列2
     * @param str
     */
    public void sendProject(String str){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,message,correlationData);
        //也可以用下面的方式
        //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,str,correlationData);
    }
}

    

相关阅读:

  • 交换机如何使用(交换机买来可以直接使用吗)
  • 什么是交换机(交换机的通俗解释)
  • 交换机怎么连接(一二三层交换机的区别)-路由器和交换
  • 交换机怎么配置 华为路由器交换机配置?
  • 交换机与路由器的区别 不纯的博美三个月图片?
  • 华为防火墙路由器三层交换机的维护常用命令
  • 首款自校准可编程光子芯片面世
  • 如何购买家用路由器
  • 弱电网工们必看的网络基础知识大合集
  • 任正非的故事(三)
    • 网站地图 |
    • 声明:登载此文出于传递更多信息之目的,并不意味着赞同其观点或证实其描述。文章内容仅供参考,不做权威认证,如若验证其真实性,请咨询相关权威专业人士。