RabbitMQ-客户端源码之ChannelN
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
ChannelN是整个RabbitMQ客户端最核心的一个类了其包含的功能点甚多这里需要分类阐述。
首先来看看ChannelN的成员变量:
private final Map _consumers = Collections.synchronizedMap(new HashMap());
private volatile Consumer defaultConsumer = null;
private final ConsumerDispatcher dispatcher;
private final Collection returnListeners = new CopyOnWriteArrayList();
private final Collection flowListeners = new CopyOnWriteArrayList();
private volatile CountDownLatch finishedShutdownFlag = null;
private final Collection confirmListeners = new CopyOnWriteArrayList();
private long nextPublishSeqNo = 0L;
private final SortedSet unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet());
private volatile boolean onlyAcksReceived = true;
源代码中有关ChannelN的呈现顺序有所不同这里博主为了区分开来重新排了序。
processAsync(Command command)
在AMQChannel这个抽象类中唯一的抽象方法即为此方法这个方法主要用来针对接受到broker的AMQCommand进行进一步的处理至于怎么接受Socket怎么封装成帧怎么确定一个AMQComand已经封装完毕都已在调用此方法前完成。此方法可以处理:Channel.Close, Basic.Deliver, Basic.Return, Channel.Flow, Basic.Ack, Basic.Nack, Basic.RecoverOk, Basic.Cancel, Channel.CloseOk等这些从broker端回传的AMQComand.
这个方法也比较长下面也会涉及到这个方法内的内容。
Confirm.Select & Basic.Publish
在[RabbitMQ之消息确认机制(事务+Confirm)][RabbitMQ_Confirm]这篇文章中博主就讲到RabbitMQ的producer端确认机制分为事务机制和Confirm机制这里就来阐述下Confirm机制的内部实现。
和Confirm机制有关的成员变量有:
private final Collection confirmListeners = new CopyOnWriteArrayList();
private long nextPublishSeqNo = 0L;
private final SortedSet unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet());
private volatile boolean onlyAcksReceived = true;
在使用Confirm机制的时候首先要置Channel为Confirm模式即向broker端发送Confirm.Select。
业务代码(DEMO实例):
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//TODO
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//TODO
}
});
String message = "RabbitMQ Demo Test:" + System.currentTimeMillis();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
channel.waitForConfirms();
在创建完Channel之后调用channel.confirmSelect()方法即可confirmSelect()代码如下:
public Confirm.SelectOk confirmSelect()
throws IOException
{
if (nextPublishSeqNo == 0) nextPublishSeqNo = 1;
return (Confirm.SelectOk)
exnWrappingRpc(new Confirm.Select(false)).getMethod();
}
这里的成员变量nextPublishSeqNo是用来为Confirm机制服务的当Channel开启Confirm模式的时候nextPublishSeqNo=1标记第一条publish的序号当Publish时:
public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException
{
if (nextPublishSeqNo > 0) {
unconfirmedSet.add(getNextPublishSeqNo());
nextPublishSeqNo++;
}
BasicProperties useProps = props;
if (props == null) {
useProps = MessageProperties.MINIMAL_BASIC;
}
transmit(new AMQCommand(new Basic.Publish.Builder()
.exchange(exchange)
.routingKey(routingKey)
.mandatory(mandatory)
.immediate(immediate)
.build(),
useProps, body));
}
client端向broker端Basic.Pubish发送消息并将当前的序号加入到unconfirmedSet中并自加nextPublishSeqNo++等待下一个消息的发送。
有关Confirm.Select的详细用法可以参考:[RabbitMQ之消息确认机制(事务+Confirm)][RabbitMQ_Confirm]
之后等待broker的确认回复(Basic.Ack/.Nack):channel.waitForConfirms()
public boolean waitForConfirms(long timeout)
throws InterruptedException, TimeoutException {
if (nextPublishSeqNo == 0L)
throw new IllegalStateException("Confirms not selected");
long startTime = System.currentTimeMillis();
synchronized (unconfirmedSet) {
while (true) {
if (getCloseReason() != null) {
throw Utility.fixStackTrace(getCloseReason());
}
if (unconfirmedSet.isEmpty()) {
boolean aux = onlyAcksReceived;
onlyAcksReceived = true;
return aux;
}
if (timeout == 0L) {
unconfirmedSet.wait();
} else {
long elapsed = System.currentTimeMillis() - startTime;
if (timeout > elapsed) {
unconfirmedSet.wait(timeout - elapsed);
} else {
throw new TimeoutException();
}
}
}
}
}
可以看到waitForConfirms其实本质上是在等待unconfirmedSet变成empty否则就线程wait()。
当接收到broker端的ACK/NACK回复时一步步的经过处理到达processAsync(Command command)方法然后进而处理Basic.Ack/.Nack帧。
else if (method instanceof Basic.Ack) {
Basic.Ack ack = (Basic.Ack) method;
callConfirmListeners(command, ack);
handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);
return true;
} else if (method instanceof Basic.Nack) {
Basic.Nack nack = (Basic.Nack) method;
callConfirmListeners(command, nack);
handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
return true;
}
首先是将相应的Method做一下转换之后callConfirmListeners()这个方法是调用成员变量confirmListeners这个list里的所有的ConfirmListener:
private final Collection confirmListeners = new CopyOnWriteArrayList();
这个ConfirmListener的list就需要在channel.basicPushlish()调用之前先:
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//TODO
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//TODO
}
});
在调用完ConfirmListener之后继续调用handleAckNack方法:
private void handleAckNack(long seqNo, boolean multiple, boolean nack) {
if (multiple) {
unconfirmedSet.headSet(seqNo + 1).clear();
} else {
unconfirmedSet.remove(seqNo);
}
synchronized (unconfirmedSet) {
onlyAcksReceived = onlyAcksReceived && !nack;
if (unconfirmedSet.isEmpty())
unconfirmedSet.notifyAll();
}
}
这个方法本意上是对收到某条消息的ACK或者NACK的处理发送消息时Basic.Publish的nextPublishNo对应于相应的ACK/NACK的deliveryTag将其从unconfirmedSet中删除即可如果有NACK帧则将其相应的标识onlyAcksReceived设置为false判断此时unconfirmedSet是否为空如果条件成立则notifyAll()将waitForConfirm唤起返回onlyAcksReceived的状态。
如果channel.waitForConfirm()返回为false则说明broker没有接受client发送的消息此时需要在业务代码中做进一步处理比如重发。