RabbitMQ消息的链路跟踪

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

背景

TraceId能标记一次请求的调用链路在我们排查问题的时候十分重要。系统引入MQ后MQ消息默认不带TraceId所以消息发送和处理的链路就断了。下面分享如何对业务逻辑无感的方式将TraceId带到消费端。

难点

RabbitMQ的Message对象可以在属性上设置头信息所以携带TraceId的位置有了问题是怎么无感的方式设置和获取TraceId

Spring RabbitMQ拦截器

在Spring里使用RabbitMQ本身没有拦截器但是有一个消息处理器可以在发送和接收消息之前对消息进行处理。里面有3个重载的方法对原始消息进行转换。我们可以借助这个处理器在Message对象里加上TraceId。

public interface MessagePostProcessor {

	/**
	 * Change (or replace) the message.
	 * @param message the message.
	 * @return the message.
	 * @throws AmqpException an exception.
	 */
	Message postProcessMessage(Message message) throws AmqpException;

	/**
	 * Change (or replace) the message and/or change its correlation data. Only applies to
	 * outbound messages.
	 * @param message the message.
	 * @param correlation the correlation data.
	 * @return the message.
	 * @since 1.6.7
	 */
	default Message postProcessMessage(Message message, Correlation correlation) {
		return postProcessMessage(message);
	}

	/**
	 * Change (or replace) the message and/or change its correlation data. Only applies to
	 * outbound messages.
	 * @param message the message.
	 * @param correlation the correlation data.
	 * @param exchange the exchange to which the message is to be sent.
	 * @param routingKey the routing key.
	 * @return the message.
	 * @since 2.3.4
	 */
	default Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
		return postProcessMessage(message, correlation);
	}

}

发送消息时携带TraceId

Spring默认使用RabbitTemplate来发送消息RabbitTemplate的send方法在发送消息之前会调用beforePublishPostProcessors来处理Message对象。beforePublishPostProcessors集合里存的就是MessagePostProcessor对象它会在发消息之前执行

public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,
			boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {
	// ...
	// 核心代码
	if (this.beforePublishPostProcessors != null) {
		for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
			messageToUse = processor.postProcessMessage(messageToUse, correlationData, exch, rKey);
		}
	}
	// ...

	sendToRabbit(channel, exch, rKey, mandatory, messageToUse);

	// ...
}

我们要做的是在注册RabbitTemplate的时候加上处理TraceId的逻辑

@Bean
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate();
    configurer.configure(template, connectionFactory);

    template.setBeforePublishPostProcessors(new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
			// 这个方法没调用调用的是下面那个方法
            return message;
        }

        @Override
        public Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
            String requestId = MDC.get(MdcConstants.REQUESTID);
            if (StringUtils.isEmpty(requestId)) {
                requestId = StringUtils.random();
            }
            message.getMessageProperties().setHeader(RabbitMQConstants.HEADER_REQUESTID, requestId);
            return message;
        }
    });
    return template;
}

消费消息时获取TraceId

当我们通过@RabbitListener注册consumer时Spring会通过org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#mainLoop方法不断从consumer队列里拿到消息。在把消息交给@RabbitListener标注的对象前也会对Message对象进行处理。这里的处理器是存在org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#afterReceivePostProcessors属性上。

private void doExecuteListener(Channel channel, Object data) {
	if (data instanceof Message) {
		Message message = (Message) data;
		if (this.afterReceivePostProcessors != null) {
			for (MessagePostProcessor processor : this.afterReceivePostProcessors) {
				message = processor.postProcessMessage(message);
				if (message == null) {
					throw new ImmediateAcknowledgeAmqpException(
							"Message Post Processor returned 'null', discarding message");
				}
			}
		}
		// ...

		invokeListener(channel, message);
	}
	else {
		invokeListener(channel, data);
	}
}

怎么设置SimpleMessageListenerContainer#afterReceivePostProcessors的值

SimpleMessageListenerContainer对象是由SimpleRabbitListenerContainerFactory工厂对象创建在创建SimpleMessageListenerContainer对象时会把工厂里的属性拷贝过来afterReceivePostProcessors就是通过工厂拷过来。所以我们直接设置SimpleRabbitListenerContainerFactory的afterReceivePostProcessors值就可以。

@Bean(name = {"rabbitListenerContainerFactory"})
@ConditionalOnProperty(
        prefix = "spring.rabbitmq.listener",
        name = {"type"},
        havingValue = "simple",
        matchIfMissing = true
)
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory, ObjectProvider<ContainerCustomizer<SimpleMessageListenerContainer>> simpleContainerCustomizer) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    simpleContainerCustomizer.ifUnique(factory::setContainerCustomizer);

    factory.setAfterReceivePostProcessors(message -> {
        Object requestId = message.getMessageProperties().getHeader(RabbitMQConstants.HEADER_REQUESTID);
        if (StringUtils.isEmpty(requestId)) {
            requestId = StringUtils.random();
        }
        MDC.put(MdcConstants.REQUESTID, String.valueOf(requestId));
        return message;
    });
    return factory;
}

这样我们就能在@RabbitListener的消费逻辑里拿到TraceId。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ