1,ActiveMQConnection下的接受方法

public void onCommand(final Object o) { 

         final Command command = (Command)o; 

         if (!closed.get() && command != null) { 

             try { 

                 command.visit(new CommandVisitorAdapter() { 

                     @Override 

                     public Response processMessageDispatch(MessageDispatch md) throws Exception { 

                         waitForTransportInterruptionProcessingToComplete(); 

                         ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId()); 

                         if (dispatcher != null) { 

                             // Copy in case a embedded broker is dispatching via 

                             // vm:// 

                             // md.getMessage() == null to signal end of queue 

                             // browse. 

                             Message msg = md.getMessage(); 

                             if (msg != null) { 

                                 msg = msg.copy(); 

                                 msg.setReadOnlyBody(true); 

                                 msg.setReadOnlyProperties(true); 

                                 msg.setRedeliveryCounter(md.getRedeliveryCounter()); 

                                 msg.setConnection(ActiveMQConnection.this); 

                                 md.setMessage(msg); 

                             } 

                             dispatcher.dispatch(md); 

                         } 

                         return null; 

                     } 

}





2,MessageDispatch 类实例

MessageDispatch {commandId = 4, responseRequired = false, consumerId = ID:shijiangyong-3522-1411886444218-1:1:-1:1, destination = topic://ActiveMQ.Advisory.Queue, message = ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:shijiangyong-1615-1411873155375-0:0:0:0:1, originalDestination = null, originalTransactionId = null, producerId = ID:shijiangyong-1615-1411873155375-0:0:0:0, destination = topic://ActiveMQ.Advisory.Queue, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@9b5027, dataStructure = DestinationInfo {commandId = 0, responseRequired = false, connectionId = null, destination = queue://resendqueue3, operationType = 0, timeout = 0, brokerPath = null}, redeliveryCounter = 0, size = 0, properties = {originBrokerId=NOT_SET}, readOnlyProperties = false, readOnlyBody = false, droppable = false}, redeliveryCounter = 0}



3,ActiveMQConnection类实例

ActiveMQConnection {id=ID:shijiangyong-3522-1411886444218-1:1,clientId=ID:shijiangyong-3522-1411886444218-0:1,started=true}



4,AdvisoryConsumer实现了ActiveMQDispatcher接口
 

public void dispatch(MessageDispatch md) { 



         // Auto ack messages when we reach 75% of the prefetch 

         deliveredCounter++; 

         if (deliveredCounter > (0.75 * info.getPrefetchSize())) { 

             try { 

                 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter); 

                 connection.asyncSendPacket(ack); 

                 deliveredCounter = 0; 

             } catch (JMSException e) { 

                 connection.onClientInternalException(e); 

             } 

         } 



         DataStructure o = md.getMessage().getDataStructure(); 

         if (o != null && o.getClass() == DestinationInfo.class) { 

             processDestinationInfo((DestinationInfo)o); 

         } else { 

             //This can happen across networks 

             if (LOG.isDebugEnabled()) { 

                 LOG.debug("Unexpected message was dispatched to the AdvisoryConsumer: "+md); 

             } 

         } 
    }


附图

activemq消息分发MessageDispatch_ide

activemq消息分发MessageDispatch_sed_02

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6