华企号 后端开发 RabbitMQ消息确认机制-可靠抵达

RabbitMQ消息确认机制-可靠抵达

 

消息发送到被消费的流程:

JAVA的生产端的发送数据—–>Broker(消息服务器)——–>达到Exchange交换机————->通过路由键到达Queue<——JAVA消费端监听并消费

这里P->B、E->Q属于生产端确认,Q->C是属于消费端确认,

生产端消息确认机制
1.yml配置

rabbitmq:
host: xxxxxxxx
virtual-host: /
port: xxx
#开启发送端确认p->b
publisher-confirms: true
#开启消息抵达队列的确认 e->q
publisher-returns: true
#只要消息到达队列,以异步的方式优先回调我们这个returnconfirm
template:
mandatory: true

2.生产端消息确认机制自定义配置代码
下面这些配置代码属于回调!

/**
* @description: Rabbit核心配置
* @author TAO
* @date 2020/8/4 20:48
*/
@Configuration
public class MyRabbitConfig {

//讲对象序列化为JSON
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}

@Autowired
RabbitTemplate rabbitTemplate;

//定制RabbitTemplate
@PostConstruct//MyRabbitConfig对象创建完成后执行这个初始化方法
public void initRabbitTemplate(){
//设置发送消息确认回调p->b
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){

/**
* @param correlationData 当前消息的唯一关联数据 ,这个是消息的唯一id
* @param ack 消息是否成功收到
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println(“confirm…correlationData=>”+correlationData+”———-ack==>”+ack+”——–cause ==>”+cause);
}
});

//设置消息抵达队列的失败回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息没有投递给指定的队列,就会触发这个失败回调
* @param message 投递失败的消息
* @param replyCode 回复的状态码
* @param replyText 回复的文本内容
* @param exchange 消息发送的交换机
* @param routingKey 消息走的路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(“returnedMessage–>”+message+”\nreplyCode–>”+replyCode+”\nexchange–>”+exchange+”\nroutingKey–>”+routingKey);
}
});
}
}

3.编写发送消息请求
方便测试

@RestController
public class SendMQController {

@Autowired
RabbitTemplate rabbitTemplate;

@GetMapping(“/send”)
public String send(){
//new CorrelationData(UUID.randomUUID().toString())指定消息的唯一id
//项目实际情况还会将消息的唯一id存入数据库中,用作后期队列中的消息消费情况做对比
String id=UUID.randomUUID().toString();
System.out.println(“消息发送中…消息唯一id”+id);
rabbitTemplate.convertAndSend(“hello-java-exchange”, “hello.jasva”, “我是一条消息”,new CorrelationData(id));
return “ok”;
}
}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
4.查看效果

 

消费端确认机制
消费端是默认自动确认的,只要消息接收到,客户端就会自动确认消息,RabbitMQ就会移除这个消息

1.编写yml配置文件

rabbitmq:
host: xxx
virtual-host: /
port: xx
#开启发送端确认p->b
publisher-confirms: true
#开启消息抵达队列的确认 e->q
publisher-returns: true
#只要消息到达队列,以异步的方式优先回调我们这个returnconfirm
template:
mandatory: true
#手动签收消息
listener:
simple:
acknowledge-mode: manual
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
消息监听处代码编写

@Service
@RabbitListener(queues={“hello-java-queue”})//标注这个类可以监听这个队列中的所有消息
public class RabbitImpl {

//我们收到很多消息,自动回复给服务器ack,只有一个消息处理了,然后服务器宕机了,发生消息丢失
//这时我们就需要手动确认模式
@RabbitHandler
public void recieveMessage(Message message, Channel channel){

long deliveryTag=message.getMessageProperties().getDeliveryTag();

try {
//deliveryTag签收的消息标签
//multiple 是否批量签收
channel.basicAck(deliveryTag,false);//签收

//deliveryTag签收的消息标签
//multiple 是否批量签收
//是否从新入队 (将这条拒收的消息又重新存放带队列中)
channel.basicNack(deliveryTag,false,false);//拒收-支持批量
//deliveryTag签收的消息标签
//multiple 是否批量签收
channel.basicReject(deliveryTag,false);//拒收-不支持批量
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(“接收到消息=>”+message);

}
}

消费者获取到消息,成功处理,可 回复Ack给Broker
-basic.ack用于肯定确认;broker将移除此消息
-basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量
-basic.reject用于否定确认;同上,但不能批量
默认自动ack,消息被消费者收到,就会从broker的queue中移除”queue无消费者,消息依然会被存储,直到消费者消费,消费者收到消息,默认会自动ack,但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式
-消息处理成功, ack(),接受下一个消息,此消息broker就会移除
-消息处理失败, nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
-消息一直没有调用ack/nack方法, broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人

 

作者: 华企网通王鹏程序员

我是程序员王鹏,热爱互联网软件开发和设计,专注于大数据、数据分析、数据库、php、java、python、scala、k8s、docker等知识总结。 我的座右铭:"业精于勤荒于嬉,行成于思毁于随"
上一篇
下一篇

发表回复

联系我们

联系我们

028-84868647

在线咨询: QQ交谈

邮箱: tech@68v8.com

工作时间:周一至周五,9:00-17:30,节假日休息

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

关注微博
返回顶部