首页 > 社交 > 科普中国

SpringBoot整合RabbitMQ实战附加死信交换机

常驻编辑 科普中国 2022-06-11 死信   交换机   队列   绑定   实战   异常   次数   声明   消息   代码   业务

业务消费者

RabbitConsumerQpJ拜客生活常识网

复制代码12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182JAVApackage com.example.rabitmqdemo.mydemo.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 监听业务交换机
 * @author JeWang
 */
@Component
@Slf4j
public class RabbitConsumer {
    /**
     * 监听业务队列1
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "company")
    public void company(Message message, Channel channel) throws IOException {
        try{
            System.out.println("次数" + message.getMessageProperties().getDeliveryTag());
            channel.basicQos(1);
            Thread.sleep(2000);
            String s = new String(message.getBody());
            log.info("处理消息"+s);
            //下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机
            //String str = null;
            //str.split("1");
            //处理成功,确认应答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("处理消息时发生异常:"+e.getMessage());
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            if(redelivered){
                log.error("异常重试次数已到达设置次数,将发送到死信交换机");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            }else {
                log.error("消息即将返回队列处理重试");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
    /**
     * 监听业务队列2
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "project")
    public void project(Message message, Channel channel) throws IOException {
        try{
            System.out.println("次数" + message.getMessageProperties().getDeliveryTag());
            channel.basicQos(1);
            Thread.sleep(2000);
            String s = new String(message.getBody());
            log.info("处理消息"+s);
            //下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机
            //String str = null;
            //str.split("1");
            //处理成功,确认应答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("处理消息时发生异常:"+e.getMessage());
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            if(redelivered){
                log.error("异常重试次数已到达设置次数,将发送到死信交换机");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            }else {
                log.error("消息即将返回队列处理重试");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}


死信消费者

RabbitConsumerQpJ拜客生活常识网

复制代码123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354JAVApackage com.example.rabitmqdemo.mydemo.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 监听死信交换机
 * @author JeWang
 */
@Component
@Slf4j
public class RabbitConsumerDead {
    /**
     * 处理死信队列1
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "company_dead")
    public void company_dead(Message message, Channel channel) throws IOException {
        try{
            channel.basicQos(1);
            String s = new String(message.getBody());
            log.info("处理死信"+s);
            //在此处记录到数据库、报警之类的操作
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("接收异常:"+e.getMessage());
        }
    }
    /**
     * 处理死信队列2
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "project_dead")
    public void project_dead(Message message, Channel channel) throws IOException {
        try{
            channel.basicQos(1);
            String s = new String(message.getBody());
            log.info("处理死信"+s);
            //在此处记录到数据库、报警之类的操作
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("接收异常:"+e.getMessage());
        }
    }
}
    

相关阅读:

  • 交换机如何使用(交换机买来可以直接使用吗)
  • 什么是交换机(交换机的通俗解释)
  • 交换机怎么连接(一二三层交换机的区别)-路由器和交换
  • 交换机怎么配置 华为路由器交换机配置?
  • 交换机与路由器的区别 不纯的博美三个月图片?
  • 华为防火墙路由器三层交换机的维护常用命令
  • 首款自校准可编程光子芯片面世
  • 如何购买家用路由器
  • 弱电网工们必看的网络基础知识大合集
  • 任正非的故事(三)
    • 网站地图 |
    • 声明:登载此文出于传递更多信息之目的,并不意味着赞同其观点或证实其描述。文章内容仅供参考,不做权威认证,如若验证其真实性,请咨询相关权威专业人士。