RabbitMQ-客户端源码之ChannelManager

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

关于ChannelManager官方注解Manages a set of channels, indexed by channel number (1… _channelMax)。

ChannelManager类的代码量不是很多主要用来管理Channel的channelNumber=0的除外应为channelNumber=0是留给Connection的特殊的channelNumber。

下面是ChannelManager的成员变量

/** Monitor for _channelMap and channelNumberAllocator */
private final Object monitor = new Object();
    /** Mapping from 1.._channelMax to {@link ChannelN} instance */
    private final Map _channelMap = new HashMap();
    private final IntAllocator channelNumberAllocator;

private final ConsumerWorkService workService;

private final Set shutdownSet = new HashSet();

/** Maximum channel number available on this connection. */
private final int _channelMax;
private final ThreadFactory threadFactory;

这上面的成员变量下面会有涉及。


对于ChannelManager的使用是AMQConnection中的成员变量

/** Object that manages a set of channels */
private volatile ChannelManager _channelManager;

AMQConnection中start()的_channelManager中对其初始化

protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
    return new ChannelManager(this._workService, channelMax, threadFactory);
}

再调用其构造函数

public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) {
    if (channelMax == 0) {
        // The framing encoding only allows for unsigned 16-bit integers
        // for the channel number
        channelMax = (1 << 16) - 1;
    }
    _channelMax = channelMax;
    channelNumberAllocator = new IntAllocator(1, channelMax);

    this.workService = workService;
    this.threadFactory = threadFactory;
}

这里的ConsumerWorkService也在AMQConnection的start()方法中初始化——initializeConsumerWorkService()

private void initializeConsumerWorkService() {
    this._workService  = new ConsumerWorkService(executor, threadFactory, shutdownTimeout);
}

再回到构造函数。

channelMax参数是在client接收到broker的Connection.Tune帧中的“Channel-Max”参数之后设置的如果为0则表示没有限制这里就会设置为默认的最大值2的16次方-1。
threadFactory参数是指Executors.defaultThreadFactory();

关于ConsumerWorkService请参考文章末尾处。


使用过RabbitMQ的同学知道要生产或者消费消息之前必须要初始化Channel,如下

Channel channel = connection.createChannel();

这个createChannel()是AMQConnection中的方法

public Channel createChannel(int channelNumber) throws IOException {
    ensureIsOpen();
    ChannelManager cm = _channelManager;
    if (cm == null) return null;
    return cm.createChannel(this, channelNumber);
}
public Channel createChannel() throws IOException {
    ensureIsOpen();
    ChannelManager cm = _channelManager;
    if (cm == null) return null;
    return cm.createChannel(this);
}

这里就是调用了ChannelManager的createChannel方法。

下面是ChannelManager中关于创建Channel的代码

public ChannelN createChannel(AMQConnection connection) throws IOException {
    ChannelN ch;
    synchronized (this.monitor) {
        int channelNumber = channelNumberAllocator.allocate();
        if (channelNumber == -1) {
            return null;
        } else {
            ch = addNewChannel(connection, channelNumber);
        }
    }
    ch.open(); // now that it's been safely added
    return ch;
}

public ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException {
    ChannelN ch;
    synchronized (this.monitor) {
        if (channelNumberAllocator.reserve(channelNumber)) {
            ch = addNewChannel(connection, channelNumber);
        } else {
            return null;
        }
    }
    ch.open(); // now that it's been safely added
    return ch;
}

private ChannelN addNewChannel(AMQConnection connection, int channelNumber) throws IOException {
    if (_channelMap.containsKey(channelNumber)) {
        // That number's already allocated! Can't do it
        // This should never happen unless something has gone
        // badly wrong with our implementation.
        throw new IllegalStateException("We have attempted to "
                + "create a channel with a number that is already in "
                + "use. This should never happen. "
                + "Please report this as a bug.");
    }
    ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);
    _channelMap.put(ch.getChannelNumber(), ch);
    return ch;
}

protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {
    return new ChannelN(connection, channelNumber, workService);
}

上面有两个createChannel方法一个是带了channelNumber的一个是自动分片channelNumber的分别对应AMQConnection中的两个方法。最后都调用addNewChannel方法。

注意两个createChannel方法中都有这样一句代码

ch.open();

这个是什么呢其实是调用ChannelN的open方法

/**
 * Package method: open the channel.
 * This is only called from {@link ChannelManager}.
 * @throws IOException if any problem is encountered
 */
public void open() throws IOException {
    // wait for the Channel.OpenOk response, and ignore it
    exnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND));
}

这样就调用了AMQChannel的rpc方法向broker发送了一个Channel.Open帧。

addNewChannel方法实际上是创建了一个ChannelN对象然后置其于ChannelManager中的_channelMap中方便管理。

channelNumberAllocator是channelNumber的分配器其原理是采用BitSet来实现channelNumber的分配有兴趣的同学可以深究进去看看。

关于ChannelN类会有专门一篇博文来讲述其实整个RabbitMQ-client的代码最关键的就是ChannelN这个类需要着重讲述。

细心的朋友可能会发现关于ConsumerWorkService这个我并没有做什么阐述。这个主要牵涉到Channel层面的处理涉及到的类有AMQConnection, ChannelN, ConsumerDispatcher等。ConsumerWorkService是在AMQConnection中初始化在ChannelManager中引用。至于这里怎么理解在ChannelN中这么解释
service for managing this channel’s consumer callbacks。意思是管理消费回调的服务。
综述ChannelManager主要用来管理Channel, 包括channelNumber与Channel之间的映射关系。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ