业务消费者
RabbitConsumer
复制代码12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182JAVApackage com.example.rabitmqdemo.mydemo.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 监听业务交换机
* @author JeWang
*/
@Component
@Slf4j
public class RabbitConsumer {
/**
* 监听业务队列1
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "company")
public void company(Message message, Channel channel) throws IOException {
try{
System.out.println("次数" + message.getMessageProperties().getDeliveryTag());
channel.basicQos(1);
Thread.sleep(2000);
String s = new String(message.getBody());
log.info("处理消息"+s);
//下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机
//String str = null;
//str.split("1");
//处理成功,确认应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("处理消息时发生异常:"+e.getMessage());
Boolean redelivered = message.getMessageProperties().getRedelivered();
if(redelivered){
log.error("异常重试次数已到达设置次数,将发送到死信交换机");
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}else {
log.error("消息即将返回队列处理重试");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
/**
* 监听业务队列2
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "project")
public void project(Message message, Channel channel) throws IOException {
try{
System.out.println("次数" + message.getMessageProperties().getDeliveryTag());
channel.basicQos(1);
Thread.sleep(2000);
String s = new String(message.getBody());
log.info("处理消息"+s);
//下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机
//String str = null;
//str.split("1");
//处理成功,确认应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("处理消息时发生异常:"+e.getMessage());
Boolean redelivered = message.getMessageProperties().getRedelivered();
if(redelivered){
log.error("异常重试次数已到达设置次数,将发送到死信交换机");
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}else {
log.error("消息即将返回队列处理重试");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
死信消费者
RabbitConsumer
复制代码123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354JAVApackage com.example.rabitmqdemo.mydemo.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 监听死信交换机
* @author JeWang
*/
@Component
@Slf4j
public class RabbitConsumerDead {
/**
* 处理死信队列1
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "company_dead")
public void company_dead(Message message, Channel channel) throws IOException {
try{
channel.basicQos(1);
String s = new String(message.getBody());
log.info("处理死信"+s);
//在此处记录到数据库、报警之类的操作
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("接收异常:"+e.getMessage());
}
}
/**
* 处理死信队列2
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "project_dead")
public void project_dead(Message message, Channel channel) throws IOException {
try{
channel.basicQos(1);
String s = new String(message.getBody());
log.info("处理死信"+s);
//在此处记录到数据库、报警之类的操作
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("接收异常:"+e.getMessage());
}
}
}