RabbitMQ-客户端源码之ChannelN

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

当接收到broker端的ACK/NACK回复时一步步的经过处理到达processAsync(Command command)方法然后进而处理Basic.Ack/.Nack帧。

else if (method instanceof Basic.Ack) {
    Basic.Ack ack = (Basic.Ack) method;
    callConfirmListeners(command, ack);
    handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);
    return true;
} else if (method instanceof Basic.Nack) {
    Basic.Nack nack = (Basic.Nack) method;
    callConfirmListeners(command, nack);
    handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
    return true;
} 

首先是将相应的Method做一下转换之后callConfirmListeners()这个方法是调用成员变量confirmListeners这个list里的所有的ConfirmListener

private final Collection confirmListeners = new CopyOnWriteArrayList();

这个ConfirmListener的list就需要在channel.basicPushlish()调用之前先

channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        //TODO
    }
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        //TODO
    }
});

在调用完ConfirmListener之后继续调用handleAckNack方法

private void handleAckNack(long seqNo, boolean multiple, boolean nack) {
    if (multiple) {
        unconfirmedSet.headSet(seqNo + 1).clear();
    } else {
        unconfirmedSet.remove(seqNo);
    }
    synchronized (unconfirmedSet) {
        onlyAcksReceived = onlyAcksReceived && !nack;
        if (unconfirmedSet.isEmpty())
            unconfirmedSet.notifyAll();
    }
}

这个方法本意上是对收到某条消息的ACK或者NACK的处理发送消息时Basic.Publish的nextPublishNo对应于相应的ACK/NACK的deliveryTag将其从unconfirmedSet中删除即可如果有NACK帧则将其相应的标识onlyAcksReceived设置为false判断此时unconfirmedSet是否为空如果条件成立则notifyAll()将waitForConfirm唤起返回onlyAcksReceived的状态。

如果channel.waitForConfirm()返回为false则说明broker没有接受client发送的消息此时需要在业务代码中做进一步处理比如重发。


Basic.Qos

消费者在开启ACK的情况下对接受到的消息可以根据业务的需要异步对消息进行确认。
然而在实际使用过程中由于消费者自身处理能力有限从RabbitMQ获取一定数量的消息好厚希望rabbitmq不再将队列中的消息推送过来当对消息处理完后即对消息进行了ack,并且有能力处理更多的消息再接受来自队列的消息。在这种场景下我们可以设置Basic.Qos中的prefetch_count来达到这个效果。


Basic.Consume

与消费有关的成员变量

private final Map _consumers =
    Collections.synchronizedMap(new HashMap());
private volatile Consumer defaultConsumer = null;
private final ConsumerDispatcher dispatcher;

源码如下

/**
 * Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}
 * method.
 * @param queue the name of the queue
 * @param autoAck true if the server should consider messages
 * acknowledged once delivered; false if the server should expect
 * explicit acknowledgements
 * @param consumerTag a client-generated consumer tag to establish context
 * @param noLocal true if the server should not deliver to this consumer
 * messages published on this channel's connection
 * @param exclusive true if this is an exclusive consumer
 * @param callback an interface to the consumer object
 * @param arguments a set of arguments for the consume
 * @return the consumerTag associated with the new consumer
 * @throws java.io.IOException if an error is encountered
 * @see com.rabbitmq.client.AMQP.Basic.Consume
 * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
 */
public String basicConsume(String queue, boolean autoAck, String consumerTag,
                           boolean noLocal, boolean exclusive, Map arguments,
                           final Consumer callback)
    throws IOException
{
    BlockingRpcContinuation k = new BlockingRpcContinuation() {
        public String transformReply(AMQCommand replyCommand) {
            String actualConsumerTag = ((Basic.ConsumeOk) replyCommand.getMethod()).getConsumerTag();
            _consumers.put(actualConsumerTag, callback);

            dispatcher.handleConsumeOk(callback, actualConsumerTag);
            return actualConsumerTag;
        }
    };

    rpc(new Basic.Consume.Builder()
         .queue(queue)
         .consumerTag(consumerTag)
         .noLocal(noLocal)
         .noAck(autoAck)
         .exclusive(exclusive)
         .arguments(arguments)
        .build(),
        k);

    try {
        return k.getReply();
    } catch(ShutdownSignalException ex) {
        throw wrap(ex);
    }
}

这个方法最精简的只要两个参数即String queue和Consumer callbackpublic String basicConsume(String queue, Consumer callback)。

方法主要是发送Basic.Consume帧然后等待Basic.ConsumeOk帧。待收到broker端的Basic.ConsumeOk帧之后触发BlockingRpcContinuation中的transformReply()方法。有关BlockingRpcContinuation在[[五]RabbitMQ-客户端源码之AMQChannel][RabbitMQ-_AMQChannel]中有陈述。transformReply()方法先是提取consumerTag这个consumerTag是在channel.basicConsume()方法中设置的是其中的一个参数如果设置了此参数那么consumerTag就是这个参数的值如果没有设置这个consumerTagBroker会返回一个consumerTag类似amq.ctag-Mg0eSv2GgfG6UzfncD8E9g。然后作为key和Consumer这个回调函数一起放置到_consumer这个回调函数中以备后面检索调用。这个consumerTag还作为transformReply()方法的返回值存入到BlockingRpcContinuation对象中既而在basicConsume这个方法最后调用k.getReply()方法是获取其值也就是说basicConsume方法的返回值就是consumerTag。

当发送Basic.Consume帧之后由broker返回的是Basic.ConsumeOk帧+Basic.Deliver帧Basic.ConsumerOk帧由上面方法处理Basic.Deliver帧由processAsync处理。

说到basicConsume方法还有一个重要的就是设置Consumer这个回调函数。一般为了方便直接使用RabbitMQ客户端自带的QueueingConsumer来处理当然也可以实现一个自定义的Consumer当然了需要实现Consumer这个接口可以参考QueueingConsumer的父类DefaultConsumer, 有关Consumer相关的更多细节可以参考[[九]RabbitMQ-客户端源码之Consumer][RabbitMQ-_Consumer]。

dispatcher.handleConsumeOk(callback, actualConsumerTag);这段代码实际上就是callback.handleConsumeOk(actualConsumerTag)这个还是调用到Consumer的方法处理。


Basic.Get

上面的Basic.Consume是基于push模式的而Basic.Get是基于pull模式的。相关的代码如下

public GetResponse basicGet(String queue, boolean autoAck)
    throws IOException
{
    AMQCommand replyCommand = exnWrappingRpc(new Basic.Get.Builder()
                                              .queue(queue)
                                              .noAck(autoAck)
                                             .build());
    Method method = replyCommand.getMethod();

    if (method instanceof Basic.GetOk) {
        Basic.GetOk getOk = (Basic.GetOk)method;
        Envelope envelope = new Envelope(getOk.getDeliveryTag(),
                                         getOk.getRedelivered(),
                                         getOk.getExchange(),
                                         getOk.getRoutingKey());
        BasicProperties props = (BasicProperties)replyCommand.getContentHeader();
        byte[] body = replyCommand.getContentBody();
        int messageCount = getOk.getMessageCount();
        return new GetResponse(envelope, props, body, messageCount);
    } else if (method instanceof Basic.GetEmpty) {
        return null;
    } else {
        throw new UnexpectedMethodError(method);
    }
}

基本上就是客户端发送Basic.Get至BrokerBroker返回Basic.GetOK并携带数据。注意方法最后返回GetResponse对象这个对象就是包装了一下数据。


事务

和事务有关的代码

/** Public API - {@inheritDoc} */
public Tx.SelectOk txSelect()
    throws IOException
{
    return (Tx.SelectOk) exnWrappingRpc(new Tx.Select()).getMethod();
}

/** Public API - {@inheritDoc} */
public Tx.CommitOk txCommit()
    throws IOException
{
    return (Tx.CommitOk) exnWrappingRpc(new Tx.Commit()).getMethod();
}

/** Public API - {@inheritDoc} */
public Tx.RollbackOk txRollback()
    throws IOException
{
    return (Tx.RollbackOk) exnWrappingRpc(new Tx.Rollback()).getMethod();
}

这里可以看到基本对于事务的处理是采用rpc的方法一对一的进行交互有关RabbitMQ的事务机制可以参考[RabbitMQ之消息确认机制事务+Confirm][RabbitMQ_Confirm]。


其余

ChannelN还有

  • 关于ExchangeQueue的申明创建删除绑定解绑
  • 关闭处理
  • Basic.Return
  • Basic.Flow
  • Basic.Recover
  • Basic.Cancel
  • Basic.Ack/.Nack/.Reject

这些就不做详细介绍了。有兴趣的同学可以继续翻阅源码这些都比较简单。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ