这篇教程关于Java中RabbitMQ的高级特性写得很实用,希望能帮到您。
RabbitMQ高级特性
1.消息的可靠投递在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或投递失败场景。RabbitMQ为我们提供了两种方式来控制消息的投递可靠性模式。 RabbitMQ整个消息投递的路径为:producer >rabbitMQ broker > exchange > queue > consumer - 消息从producer到exchange则会返回一个
confirmCallback - 消息从exchange到queue投递失败则会返回一个
returnCallback
利用这两个callback来控制消息的可靠性传递。
1.1 confirm 确认模式(1)开启确认模式 在创建连接工厂的时候要开启确认模式,关键字:publisher-confirms ,默认为false 。 <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true"/> (2)RabbitTemplate设置回调 @RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")public class ProducerTest { /** * 注入RabbitTemplate */ @Autowired private RabbitTemplate rabbitTemplate; /** * 测试默认的队列发送消息 */ @Test public void testConfirmCallback() throws InterruptedException { // 设置回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 回调方法 * @param correlationData 回调的相关数据。 * @param ack true 表示发送成功, false 发送失败 * @param cause 失败原因,ack==true->null */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("发送成功"); } else { System.out.println("发送失败,原因:" + cause); // 失败后处理流程 } } }); rabbitTemplate.convertAndSend("spring_queue", "hello world"); // 防止发送完成后,未完成回调关闭通道 Thread.sleep(5000); }} public void confirm(CorrelationData correlationData, boolean ack, String cause)
correlationData 参数,发送数据的时候可以携带上ack 是否发送成功,成功为true,失败为falsecause 失败的原因,成功时为null
Thread.sleep(5000); 防止发送完成后,未完成回调关闭通道
如果没有加上会 clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
1.2 return 回退模式(1)开启回退模式 <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-returns="true"/> (2)RabbitTemplate设置回调 @Test public void testReturnCallback() throws InterruptedException { // 设置交换机处理失败消息的模式 rabbitTemplate.setMandatory(true); // 设置回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 返回消息 * @param message 消息对象 * @param replyCode 错误码 * @param replyText 交换信息 * @param exchange 交换机 * @param routingKey 路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("消息对象:" + new String(message.getBody())); System.out.println("错误码:" + replyCode); System.out.println("交换信息:" + replyText); System.out.println("交换机:" + exchange); System.out.println("路由键:" + routingKey); } }); rabbitTemplate.convertAndSend("spring_direct_exchange", "direct_key_3", "spring_direct_exchange_direct_key_1"); // 防止发送完成后,未完成回调关闭通道 Thread.sleep(5000); } public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) - message 消息对象
- replyCode 错误码
- replyText 交换信息
- exchange 交换机
- routingKey 路由键
mandatory属性的优先级高于publisher-returns的优先级 mandatory结果为true、false时会忽略掉publisher-returns属性的值 mandatory结果为null(即不配置)时结果由publisher-returns确定
2.Consumer Ack(消费端)Ack指Acknowledge,确认。表示消费端接收到消息后的确认方式。 有三种确认方式: - 自动确认:
acknowledge="none" - 手动确认:
acknowledge="manual" - 根据异常情况确认:
acknowledge="auto"
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应message 从RabbitMQ的消息缓存中移除。 但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用``channel.basicAck(),手动签收,如果出现异常,则调用 channel.basicNack()`方法,让其自动重新发送消息。
2.1 设置手动签收(1)创建一个监听器接收消息 设置手动接收时,让监听器实现ChannelAwareMessageListener 接口 如果消息成功处理,则调用channel.basicAck() 如果消息处理失败,则调用 channel.basicNack() ,broker重新发送consumer /** * @author zhong * <p> * Consumer Ack机制 * 1.设置手动签收,acknowledge="manual" * 2.让监听器实现ChannelAwareMessageListener接口 * 3.如果消息成功处理,则调用channel.basicAck() * 4.如果消息处理失败,则调用 channel.basicNack(),broker重新发送consumer */@Componentpublic class AckSpringQueueListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); // 接收消息 System.out.println("Message:" + new String(message.getBody())); // 手动签收 /** * deliveryTag: 标识id * multiple: 确认所有消息 */ channel.basicAck(deliveryTag, true); // 手动拒绝 /** * requeue:如果被拒绝的消息应该被重新排队而不是被丢弃/死信 */ //channel.basicNack(deliveryTag, true, true); }} (2)设置手动,加入监听 设置手动签收,acknowledge=“manual” <context:component-scan base-package="org.example"/><rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" auto-declare="true"> <rabbit:listener ref="ackSpringQueueListener" queue-names="spring_queue"/></rabbit:listener-container>
3.消费端限流MQ一个作用就是削峰填谷,通过消费端限流实现。 消费端限流包括一下操作: |