消息队列之常见问题处理

q1871901600 发布于 2024-11-16 29 次阅读


在消息队列使用中常见会遇到问题:消息丢失,消息积压,消息不顺序消费

一,如何保证消息不丢失?

这下面几个地方都容易丢失消息

这里1,2,4 步骤都是跨网络请求,是存在丢失数据的可能的

然后关于3这个环节,通常MQ存盘时都会先写入操作系统的缓存page cache中,然后再由操作系统异步的将消息写入硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来得及写入硬盘的消息就会丢失。

这个是MQ场景都会面对的通用的丢消息问题。

2.1 生产者发送消息不丢失

kafka:消息发送+回调

roketmq:事务消息

事务消息原理:

1,half消息主要作用是探测MQ服务是否正常通信

2,在本地事务报错时不往MQ发送消息但是需要给MQ一个UNKNOWN状态的消息通知MQ服务本次事务消息出错,需要等段时间再查询事务状态。等MQ后面查询事务时重试数据存入mysql,如果成功则发送消息,由此保证了生成端的数据不丢失和消息的可靠性。

3,如果rocketMQ节点突然挂掉了没有将消息发送给下游服务时,等rocketMQ重启后会重新从生成者查询事务状态,因此使用事务消息时我们的数据状态需要考虑一个已发送消息单未成功消费的状态,当rocketMQ重新查询事务是这个状态时就会重新给下游服务重试发消息。以此保证了MQ的可用性。

2.2 Mq主从消息不丢失

通过搭建Mq主从集群从而实现Mq的高可用.

rockerMQ:Dledger主从架构

在使用Dledger技术搭建的RocketMQ集群中,Dledger会通过两阶段提交的方式保证文件在主从之间成功同步。

简单来说,数据同步会通过两个阶段,一个是uncommitted阶段,一个是commited阶段。

Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件。

​ 接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态。

​ 再接下来, Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样,就基于Raft协议完成了两阶段的数据同步。

2.3 Mq消息存盘不丢失

rockerMQ配置同步刷盘,因为异步刷盘有丢失消息的可能性

在RocketMQ的配置文件broker.conf中,找到flushDiskType参数,并将其设置为SYNC_FLUSH以启用同步刷盘。

flushDiskType=SYNC_FLUSH

同步刷盘意味着消息写入到内存后,会等待磁盘刷盘操作完成后才返回写入成功的确认,这样可以保证消息的持久性,但可能会影响性能。

    二,如何保证消息顺序消费?

    应用场景:

    ​ 每一个订单有从下单、锁库存、支付、下物流等几个业务步骤。每个业务步骤都由一个消息生产者通知给下游服务。如何保证对每个订单的业务处理顺序不乱?

    示例代码:

    ​ 生产者核心代码:

    for (int i = 0; i < 10; i++) {
                    int orderId = i;
                    for(int j = 0 ; j <= 5 ; j ++){
                        Message msg =
                                new Message("OrderTopicTest", "order_"+orderId, "KEY" + orderId,
                                        ("order_"+orderId+" step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
                        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                            @Override
                            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                Integer id = (Integer) arg;
                                int index = id % mqs.size();
                                return mqs.get(index);
                            }
                        }, orderId);
                        System.out.printf("%s%n", sendResult);
                    }
                }

    ​ 通过MessageSelector,将orderId相同的消息,都转发到同一个MessageQueue中,分配机制是通过id%size的方式。

    ​ 消费者核心代码:

    consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    context.setAutoCommit(true);
                    for(MessageExt msg:msgs){
                        System.out.println("收到消息内容 "+new String(msg.getBody()));
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });

    ​ 注入一个MessageListenerOrderly实现。

    实现思路:

    ​ 基础思路:只有放到一起的一批消息,才有可能保持消息的顺序。

    ​ 1、生产者只有将一批有顺序要求的消息,放到同一个MesasgeQueue上,Broker才有可能保持这一批消息的顺序。

    ​ 2、消费者只有一次锁定一个MessageQueue,拿到MessageQueue上所有的消息,

    注意点:

    ​ 1、理解局部有序与全局有序。大部分业务场景下,我们需要的其实是局部有序。如果要保持全局有序,那就只保留一个MessageQueue。性能显然非常低。

    ​ 2、生产者端尽可能将有序消息打散到不同的MessageQueue上,避免过于几种导致数据热点竞争。

    ​ 2、消费者端只能用同步的方式处理消息,不要使用异步处理。更不能自行使用批量处理。

    ​ 3、消费者端只进行有限次数的重试。如果一条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进行重试。但是,如果消费者一直处理失败,超过最大重试次数,那么RocketMQ就会跳过这一条消息,处理后面的消息,这会造成消息乱序。

    ​ 4、消费者端如果确实处理逻辑中出现问题,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代。

    三,消息重复消费

    消息幂等的必要性

    在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:

    • 发送时消息重复当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
    • 投递时消息重复消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
    • 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

    处理方式

    ​ 从上面的分析中,我们知道,在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。

    1。RocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。

    ​ 但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。

    2。通过分布式锁解决消息幂等问题,以消息ID为锁键,消费者拿到消息时通过消息id判断是否是加锁,如果已经加锁则不处理,获取锁成功则继续处理逻辑。

    四,消息积压问题处理

    1. 发送端性能优化

    • 批量发送,将多条消息合并成一条大消息进行发送
    • 并发发送,使用多线程进行消息发送

    无论是增加每次发送消息的批量大小,还是增加并发,都能成倍地提升发送性能。至于到底是选择批量发送还是增加并发,主要取决于发送端程序的业务性质。

    比如说,你的消息发送端是一个微服务,主要接受RPC请求处理在线业务。很自然的,微服务在处理每次请 求的时候,就在当前线程直接发送消息就可以了,因为所有RPC框架都是多线程支持多并发的,自然也就实 现了并行发送消息。并且在线业务比较在意的是请求响应时延,选择批量发送必然会影响RPC服务的时延。 这种情况,比较明智的方式就是通过并发来提升发送性能。

    如果你的系统是一个离线分析系统,离线系统在性能上的需求是什么呢?它不关心时延,更注重整个系统的 吞吐量。发送端的数据都是来自于数据库,这种情况就更适合批量发送,你可以批量从数据库读取数据,然 后批量来发送消息,同样用少量的并发就可以获得非常高的吞吐量。

    2,消费端优化

    1)消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。特别需要注意的一点是,在扩容Consumer的实例数量的同时,必须同步扩容主题中的分区(也叫 队列)数量,确保Consumer的实例数和分区数量是相等的。

    如果Consumer的实例数量超过分区数量,这 样的扩容实际上是没有效果的。原因我们之前讲过,因为对于消费者来说,在每个分区上实际上只能支持单线程消费。

    2)很多消费程序,他们是这样来解决消费慢的问题的:

    在收到消息的OnMessage方法中,不处理任何业务逻辑,把这个消息放到一个内存队列里面就返回了。然后它可以启动很多的业务线 程,这些业务线程里面是真正处理消息的业务逻辑,这些线程从内存队列里取消息处理,这样它就解决了单 个Consumer不能并行消费的问题。

    注意:

    会丢消息。如果收消息的节点发生宕机,在内存队列中还没来及处理的这些消息就会丢失。

    3)创建一个新的Topic,配置足够多的MessageQueue。然后把所有消费者节点的目标Topic转向新的Topic,并紧急上线一组新的消费者,只负责消费旧Topic中的消息,并转储到新的Topic中,这个速度是可以很快的。然后在新的Topic上,就可以通过增加消费者个数来提高消费速度了。之后再根据情况恢复成正常情况。

    参考文章:

    消息队列:消息积压如何处理?_消息积压解决方案-CSDN博客

    java - 阿里三面:MQ 消息丢失、重复、积压问题,如何解决? - 个人文章 - SegmentFault 思否

    rocketmq 同步刷盘、异步刷盘和同步复制、异步复制 - 沐之橙 - 博客园

    RocketMQ消息幂等性处理:构建稳健的消息系统-百度开发者中心

    一个会写python的Java工程师
    最后更新于 2024-11-20