您当前的位置:首页 > IT编程 > python
| C语言 | Java | VB | VC | python | Android | TensorFlow | C++ | oracle | 学术与代码 | cnn卷积神经网络 | gnn | 图像修复 | Keras | 数据集 | Neo4j | 自然语言处理 | 深度学习 | 医学CAD | 医学影像 | 超参数 | pointnet | pytorch | 异常检测 |

自学教程:关于Java中RabbitMQ的高级特性

51自学网 2023-07-22 10:36:21
  python
这篇教程关于Java中RabbitMQ的高级特性写得很实用,希望能帮到您。

RabbitMQ高级特性

1.消息的可靠投递

在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或投递失败场景。RabbitMQ为我们提供了两种方式来控制消息的投递可靠性模式。

  • confirm 确认模式
  • return 退回模式

RabbitMQ整个消息投递的路径为:producer>rabbitMQ broker> exchange > queue > consumer

  • 消息从producer到exchange则会返回一个confirmCallback
  • 消息从exchange到queue投递失败则会返回一个returnCallback

利用这两个callback来控制消息的可靠性传递。

1.1 confirm 确认模式

(1)开启确认模式

在创建连接工厂的时候要开启确认模式,关键字:publisher-confirms,默认为false

<rabbit:connection-factory id="connectionFactory"                            host="${rabbitmq.host}"                           port="${rabbitmq.port}"                           username="${rabbitmq.username}"                           password="${rabbitmq.password}"                           virtual-host="${rabbitmq.virtual-host}"                           publisher-confirms="true"/>

(2)RabbitTemplate设置回调

@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")public class ProducerTest {    /**     * 注入RabbitTemplate     */    @Autowired    private RabbitTemplate rabbitTemplate;    /**     * 测试默认的队列发送消息     */    @Test    public void testConfirmCallback() throws InterruptedException {        // 设置回调        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {            /**             * 回调方法             * @param correlationData 回调的相关数据。             * @param ack true 表示发送成功, false 发送失败             * @param cause 失败原因,ack==true->null             */            @Override            public void confirm(CorrelationData correlationData, boolean ack, String cause) {                if (ack) {                    System.out.println("发送成功");                } else {                    System.out.println("发送失败,原因:" + cause);                    // 失败后处理流程                }            }        });        rabbitTemplate.convertAndSend("spring_queue", "hello world");        // 防止发送完成后,未完成回调关闭通道        Thread.sleep(5000);    }}
  • public void confirm(CorrelationData correlationData, boolean ack, String cause)

    • correlationData 参数,发送数据的时候可以携带上
    • ack 是否发送成功,成功为true,失败为false
    • cause 失败的原因,成功时为null
  • Thread.sleep(5000);防止发送完成后,未完成回调关闭通道

    如果没有加上会

    clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)

1.2 return 回退模式

(1)开启回退模式

<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"                           port="${rabbitmq.port}"                           username="${rabbitmq.username}"                           password="${rabbitmq.password}"                           virtual-host="${rabbitmq.virtual-host}"                           publisher-returns="true"/>

(2)RabbitTemplate设置回调

@Test    public void testReturnCallback() throws InterruptedException {        // 设置交换机处理失败消息的模式        rabbitTemplate.setMandatory(true);        // 设置回调        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {            /**             * 返回消息             * @param message 消息对象             * @param replyCode 错误码             * @param replyText 交换信息             * @param exchange 交换机             * @param routingKey 路由键             */            @Override            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {                System.out.println("消息对象:" + new String(message.getBody()));                System.out.println("错误码:" + replyCode);                System.out.println("交换信息:" + replyText);                System.out.println("交换机:" + exchange);                System.out.println("路由键:" + routingKey);            }        });        rabbitTemplate.convertAndSend("spring_direct_exchange", "direct_key_3",                "spring_direct_exchange_direct_key_1");        // 防止发送完成后,未完成回调关闭通道        Thread.sleep(5000);    }

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)

  • message 消息对象
  • replyCode 错误码
  • replyText 交换信息
  • exchange 交换机
  • routingKey 路由键

mandatory属性的优先级高于publisher-returns的优先级
mandatory结果为true、false时会忽略掉publisher-returns属性的值
mandatory结果为null(即不配置)时结果由publisher-returns确定

2.Consumer Ack(消费端)

Ack指Acknowledge,确认。表示消费端接收到消息后的确认方式。

有三种确认方式:

  • 自动确认:acknowledge="none"
  • 手动确认:acknowledge="manual"
  • 根据异常情况确认:acknowledge="auto"

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应message 从RabbitMQ的消息缓存中移除。

但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用``channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()`方法,让其自动重新发送消息。

2.1 设置手动签收

(1)创建一个监听器接收消息

设置手动接收时,让监听器实现ChannelAwareMessageListener接口

如果消息成功处理,则调用channel.basicAck()

如果消息处理失败,则调用 channel.basicNack(),broker重新发送consumer

/** * @author zhong * <p> * Consumer Ack机制 * 1.设置手动签收,acknowledge="manual" * 2.让监听器实现ChannelAwareMessageListener接口 * 3.如果消息成功处理,则调用channel.basicAck() * 4.如果消息处理失败,则调用 channel.basicNack(),broker重新发送consumer */@Componentpublic class AckSpringQueueListener implements ChannelAwareMessageListener {    @Override    public void onMessage(Message message, Channel channel) throws Exception {        long deliveryTag = message.getMessageProperties().getDeliveryTag();        // 接收消息        System.out.println("Message:" + new String(message.getBody()));        // 手动签收        /**         * deliveryTag: 标识id         * multiple: 确认所有消息         */        channel.basicAck(deliveryTag, true);        // 手动拒绝        /**         * requeue:如果被拒绝的消息应该被重新排队而不是被丢弃/死信         */        //channel.basicNack(deliveryTag, true, true);    }}

(2)设置手动,加入监听

设置手动签收,acknowledge=“manual”

<context:component-scan base-package="org.example"/><rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" auto-declare="true">    <rabbit:listener ref="ackSpringQueueListener" queue-names="spring_queue"/></rabbit:listener-container>

3.消费端限流

MQ一个作用就是削峰填谷,通过消费端限流实现。

消费端限流包括一下操作:

51自学网自学EXCEL、自学PS、自学CAD、自学C语言、自学css3实例,是一个通过网络自主学习工作技能的自学平台,网友喜欢的软件自学网站。
京ICP备13026421号-1