【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析

DefaultMQPushConsumer

使用系统控制读取操作的DefaultMQPushConsumer可以自动调用传入的处理方法来处理收到的消息。通过设置各种参数和传入处理消息的函数使用DefaultMQPushConsumer的主要目的是方便配置和处理消息。在收到消息后系统会自动保存Offset并且如果加入了新的DefaultMQPushConsumer系统会自动做负载均衡。

RocketMQ的消息模式

RocketMQ提供Clustering和Broadcasting两种消息模式。

  • Clustering模式下ConsumerGroup内每个Consumer只消费所订阅消息的一部分而所有Consumer消费内容合在一起构成Topic内容实现负载均衡。

  • Broadcasting模式下同一ConsumerGroup内每个Consumer都接收所订阅Topic的全部消息每个消息分发给多个Consumer消费。

推模式的的案例代码

使用 DefaultMQPushConsumer 可以自动控制读取操作收到消息后会自动调用传入的处理方法进行处理并且自动保存 Offset。主要需要设置好各种参数以及传入处理消息的函数。当加入新的 DefaultMQPushConsumer 后系统会自动进行负载均衡。

public class DefaultMQPushConsumerSample {

    public static void main(String[] args) throws MQClientException {
        // Consumer 的 GroupName 用于把多个 Consumer 组织到一起提高并发处理能力GroupName 需要和消息模式 MessageModel 配合使用。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // NameServer 的地址和端口号可以填写多个用分号隔开达到消除单点故障的目的比如“ip1portip2portip3port”。
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /* Specify where to start in case the specified Consumer group is a brand new one. */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setMessageModel(MessageModel.BROADCASTING);
        /* Subscribe one more more Topics to consume. */
        // Topic 名称用来标识消息类型需要提前创建。
        // 如果不需要消费某个 Topic 下的所有消息可以通过指定消息的 Tag 进行消息过滤
        // 比如Consumer.subscribe"TopicTest""tag1||tag2||tag3"表示这个 Consumer 要消费“TopicTest”下带有tag1或tag2或tag3的消息
        //  Tag 是在发送消息时设置的标签。在填写 Tag 参数的位置 用 null 或者“*” 表示要消费这个 Topic 的所有消息。
        consumer.subscribe("TopicTest", "*");
        /* * Register callback to execute on arrival of Messages fetched from brokers. */
        consumer.registerMessageListener(
                (MessageListenerConcurrently) (msgs, context) -> {
                    System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                    System.out.println();
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                });
        /* * Launch the Consumer instance.*/
        consumer.start();
    }
}

DefaultMQPushConsumer 的处理流程

DefaultMQPushConsumer 的主要功能是由 DefaultMQPushConsumerImpl 类实现的。消息的处理逻辑在 pullMessage 函数中的 PullCallBack 中完成。PullCallBack 函数里有一个 switch 语句根据从 Broker 返回的消息类型进行相应的处理。

com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

PullCallback pullCallback = new PullCallback() {
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
            switch(pullResult.getPullStatus()) {
            case FOUND:
            	// 省略代码
                break;
            case NO_NEW_MSG:
            	// 省略代码
                break;
            case NO_MATCHED_MSG:
            	// 省略代码
                break;
            case OFFSET_ILLEGAL:
                // 省略代码
            }
        }
    }
    public void onException(Throwable e) {
        if (!pullRequest.getMessageQueue().getTopic().startsWith("%RETRY%")) {
            DefaultMQPushConsumerImpl.this.log.warn("execute the pull request exception", e);
        }
        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, 3000L);
    }
};

长轮询

DefaultMQPushConsumer 中通过使用“PullRequest”以“长轮询”(long polling)方式实现了 Push 效果。长轮询方式既保留了 Pull 的好处又具有 Push 方式的实时性。在长轮询中客户端的请求权力仍然掌握在 Consumer 手中即使 Broker 有大量消息积压也不会主动推送给 Consumer。长轮询方式的局限性是需要占用资源来维护客户端的请求因此适合在消息队列等客户端连接数可控的场景中使用。

Push 方式是 Server 端接收到消息后主动将消息推送给 Client 端这种方式实时性强。然而对于提供队列服务的 Server 来说用 Push 方式主动推送会增加 Server 端的工作量从而影响 Server 的性能而且 Client 的处理能力不同Client 的状态也不受 Server 控制如果 Client 不能及时处理 Server 推送过来的消息就会出现潜在问题。

Pull 方式是 Client 端循环地从 Server 端拉取消息主动权在 Client 手中自己拉取到一定数量的消息后再进行处理。Pull 方式的问题在于循环拉取消息的间隔不好设定间隔太短会导致一种“忙等”状态浪费资源而每个 Pull 的时间间隔太长会导致 Server 端有更多的消息到来而没有被及时处理。

长轮询方式通过 Client 端和 Server 端的合作既保留了 Pull 的优点又在保证实时性方面达到了目的。

com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl

PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
    brokerAddr = this.computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().
	pullMessage(brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback);

requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis) 的作用是设置 Broker 的最长阻塞时间其默认设置是 15 秒但需注意仅当 Broker 没有新消息时才会被阻塞如果有新消息则会立即返回。

“长轮询” 服务端代码

从 Broker 的源码中可以看出服务端在接收到新的消息请求后并不会急于返回而是通过一个循环状态不断地查看队列中是否有新消息。每次查看状态时会暂停一段时间默认为 5 秒然后再次进行检查。在默认情况下当 Broker 没有新的消息时第三次检查时若等待时间超过 Request 中设定的 Broker-SuspendMaxTimeMillis会返回一个空结果。

if (this.brokerController.getBrokerConfig().isLongPollingEnable()){
    this.waitForRunning( 5 * 1000); 
} else {
  this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); 
}
long beginLockTimestamp = this.systemClock.now(); 
this.checkHoldRequest(); 
long costTime = this.systemClock.now() - beginLockTimestamp; 
if (costTime > 5 * 1000) { 
    Log. info("[ NOTIFYME] check hold request cost {} ms.", costTime); 
}

在等待的过程中一旦 Broker 收到新的消息就会立即调用 notifyMessageArriving 函数并返回请求结果。"长轮询"的核心是Broker 会暂时地保留客户端请求在这段时间内如果有新的消息到达则可以不用创建新的连接而是利用现有的连接立刻返回消息给 Consumer。

messageQueue和processQueue

PullRequest中定义了messageQueue和processQueue。

processQueue

processQueue是一个快照类在PushConsumer运行时每个MessageQueue都会有一个对应的ProcessQueue对象用于保存该MessageQueue消息处理状态的快照。

ProcessQueue对象主要包含一个TreeMap和一个读写锁。TreeMap以Message Queue的Offset作为Key以消息内容的引用为Value保存了所有从MessageQueue获取到但还未被处理的消息读写锁控制着多个线程对TreeMap对象的并发访问。

在pull逻辑中PushConsumer会判断获取但还未处理的消息个数、消息总大小、Offset的跨度如果有任何一个值超过了设置的大小则会隔一段时间再拉取消息以达到流量控制的目的。此外ProcessQueue还可以辅助实现顺序消费的逻辑。相应代码如下

com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

PushConsumer会判断获取但还未处理的消息个数
long size = processQueue.getMsgCount().get();
if (size > (long)this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, 50L);
    if (this.flowControlTimes1++ % 1000L == 0L) {
        this.log.warn("the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}", new Object[]{processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, this.flowControlTimes1});
    }
}
消息总大小、Offset的跨度
if (!this.consumeOrderly) {
        if (processQueue.getMaxSpan() > (long)this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, 50L);
            if (this.flowControlTimes2++ % 1000L == 0L) {
                this.log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", new Object[]{processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, this.flowControlTimes2});
            }

            return;
        }
    }
ProcessQueue对象主要包含一个TreeMap和一个读写锁
 else {
        if (!processQueue.isLocked()) {
            this.executePullRequestLater(pullRequest, 3000L);
            this.log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }

        if (!pullRequest.isLockedFirst()) {
            long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
            boolean brokerBusy = offset < pullRequest.getNextOffset();
            this.log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", new Object[]{pullRequest, offset, brokerBusy});
            if (brokerBusy) {
                this.log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", pullRequest, offset);
            }

            pullRequest.setLockedFirst(true);
            pullRequest.setNextOffset(offset);
        }
    }

内容总结

DefaultMQPushConsumer使用长轮训技术让consumer不断向broker端请求消息如果没有可消费的消息则阻塞一段时间等待broker推送消息给consumer。具体实现过程为先每隔一段时间从broker获取消息进行消费如果没有需要消费的消息则调用poll函数向远程broker获取最新的消息最长等待时间为Consumer的maxTimeConsumeConitnusly属性如果超时时间到达还没有新的消息则返回null。这种方式实现实时更新消息且对于broker的开销较小但会导致consumer不断发起请求增加网络负载和调用次数。因此需要合理设置长轮询的超时时间。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6