RabbitMQ消息队列实战(2)—— Java调用RabbitMQ的三种方式
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
本文主要介绍Java中调用RabbitMQ的三种方式。三种方式实际上对应了三种不同的抽象级别
首先通过Java原生代码来访问RabbitMQ。在这种方式下需要手动创建Connection创建Channel然后通过Channel对象可以显式的创建Exchange、Queue和Bind等等。这种方式的好处就是使得我们能够很显式地了解到整个RabbitMQ操作的生命周期建议新手可以通过这种方式学习RabiitMQ的入门。
spring-boot-starter-amqp对RabbitMQ的使用进行了进一步的封装通过这种方式使用集成到spring boot中的RabbitMQ时我们不再关心Connect和Channel的创建spring boot会替我们创建好。我们索要做的只是通过注解的方式创建Exchange、Queue和Bind对象并把他们交给spring ioc进行管理然后spring boot又会自动生成这些对象对应的交换机、队列和绑定。
Java中操作RabbitMQ的最后一种方法是通过EDAEvent Driven Achitecture事件驱动架构框架的spring cloud stream。spring cloud stream对RabitMQ准确的说应该是消息队列封装的更加彻底我们甚至不用关心使用的消息队列是RabbitMQ还是Kafkaspring cloud stream可以配置RabbitMQ和Kafak两种消息队列并进行无缝切换。在使用时spring cloud stream时只需一个标签就能自动创建RabitMQ的Connection、Chanel甚至你都不用关心Exchange、Queue和Bind这些在spring-boot-starter-amqp中还需要手动创建的对象他们就被创建好了。spring cloud stream的强大之处就在于它的封装但是不足之处也在于它的封装封装的太强必然增加了学习成本和调试难度而且类似RabbitMQ和Kafka这种中间件的使用一般在系统创建之处就一定确定进行无缝切换就显得有些鸡肋了。
下面我们就以代码的方式演示这三种调用RabbitMQ的方式
一、Java原生代码调用RabbitMQ
1.1 交换机和队列的创建
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.rabbitMqHost);
factory.setPort(this.rabbitMqPort);
factory.setConnectionTimeout(this.rabbitMqTimeOut);
factory.setUsername(this.rabbitMqUsername);
factory.setPassword(this.rabbitMqPassword);
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct-exchange", "direct", true, false, null);
channel.queueDeclare("test-queue", true, false, false, null);
com.rabbitmq.client.AMQP.Queue.BindOk ok = channel.queueBind("test-queue", "direct-exchange", "test-queue");
channel.basicPublish("direct-exchange", "test-queue", null, msg.getBytes("UTF-8"));
上述的代码创建了一个直连交换机、一个队列并进行绑定最后向交换机中发送了一个"Hello World"的字符串。
1~7行创建了一个ConnectionFactory 对象并进行配置配置的参数包括RabbitMQ的ip地址host端口号port、超时connectionTimeout等等。
第8行通过ConnectionFactory 对象创建了一个Connection 对象此时已经完成了对RabbitMQ服务器的连接。如果我们通过RabitMQ Magement Web查看可以看到这个链接。
第9行创建用来通信的信道Channel。
第10行声明和创建交换机。这里exchangeDeclare有五个参数。第1个参数指定了交换机的名称第2个参数指定了交换机的类型direct、topic或者fanout第3个参数指定交换机是否要持久化如果设置为true那么交换机的元数据要持久化到内存中第4个参数指定交换机在没有队列与其绑定时是否删除设置为false表示不删除最后一个参数是Map<String, Object>类型用来指定交换机其它一些结构化的参数我在这里直接设置为null。
第11行声明了一个名为test-queue的队列。queueDeclare有5个参数第1个参数指定了队列的名称第2个参数表示队列是否要持久化但是需要注意这里的持久化只是队列名称等这些队列元数据的持久化不是队列中消息的持久化。第3个参数表示队列是不是私有的如果是私有的只有创建它的应用程序才能从队列消费消息第4个参数表示队列在没有消费者订阅时是否自动删除第5个参数是队列的一些结构化信息比如声明死信队列、磁盘队列会用到。
第12行创建了一个bind对象将交换机和队列进行绑定queueBind的三个参数中第1个参数指定了队列名称第2个参数指定了交换机名称第3个参数是路由键在直连模式下为队列名称。
第13行发送消息在直连模式下需要指定直连交换机名称参数1路由键参数2也就是目标队列名称参数3类型为BasicProperties可以为消息附带一些额外的附件比如在使用RabbitMQ远程RPC调用模式发送消息时可以用到这里直接设置为null。参数4就是要发送的消息转换成的二进制数组。
上面就是一个创建direct exchange和queue并发送消息的例子如果要使用topic exchange或者fanout exchange只需要一些小小的改动即可。
比如创建topic exchange要明确指明交换机的类型为topic
channel.exchangeDeclare("topic-exchange", "topic", true, false, null);
绑定时指定主题为路由键
channel.queueBind("test-queue", "topic-exchange", "fruit");
发送消息时指定主题为路由键
channel.basicPublish("topic-exchange", "fruit", null, msg.getBytes("UTF-8"));
再比如创建fanout exchange要明确指明交换机的类型为fanout
channel.exchangeDeclare("fanout-exchange", "fanout", true, false, null);
绑定时指定路由键为空
channel.queueBind("test-queue", "fanout-exchange", "");
发布消息时指定路由键为空
channel.basicPublish("fanout-exchange", "", null, msg.getBytes("UTF-8"));
1.2 消费者订阅队列的消息
在上面的例子中我们演示了创建direct、topic和fanout三种类型的exchange以及关联好了队列现在我们创建一个消费者来订阅队列里面的消息。首先实现Consumer接口
public class TestConsumer implements Consumer {
@Override
public void handleConsumeOk(String s) {
System.out.println(s);
}
@Override
public void handleCancelOk(String s) {}
@Override
public void handleCancel(String s) throws IOException {}
@Override
public void handleShutdownSignal(String s, ShutdownSignalException e) {}
@Override
public void handleRecoverOk(String s) {}
@Override
public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
String str = new String(bytes);
System.out.println("接受到的字符串是" + str);
}
}
Consumer接口的方法有6个在这里我们只用到了2个handleConsumeOk在消费者获取到消息后调用而handleDelivery是在调用handleConsumeOk后调用。我们业务的主要逻辑在handleDelivery中因为在这个方法之中我们可以获取到消息并进行相应的处理。实现了自己的消费者接下来需要用该消费者订阅队列
// 创建连接和信道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.rabbitMqHost);
factory.setPort(this.rabbitMqPort);
factory.setConnectionTimeout(this.rabbitMqTimeOut);
factory.setUsername(this.rabbitMqUsername);
factory.setPassword(this.rabbitMqPassword);
factory.setVirtualHost("/");
Connection connection = null;
try {
connection = factory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new TestConsumer();
channel.basicConsume("test-queue", true, consumer);
while (true) {
Thread.sleep(3600000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null && connection.isOpen()) {
connection.close();
}
}
第1~12行和生产者创建Connection的过程一致。
第13行创建了一个TestConsumer对象。
第14行订阅队列test-queue中的消息。basicConsume方法有三个参数第1个参数是指明要订阅的通道的名称第2个参数指明是否自动ack如果是true这个方法结束后会自动进行ack如果是false需要额外手动的ack第3个参数就是装配的消费者。
第15~17行没有业务上的功能只是单纯不让程序结束。
当运行了消费者以后就可以看到消费者消费了队列中的消息。
消费者打印消息
队列中的消息已经清空
以上就是通过Java原生的代码调用RabbitMQ的例子接下来我们学习下另外一种调用RabbitMQ的方式通过 spring-boot-starter-amqp调用。
二、 spring-boot-starter-amqp调用RabbitMQ
2.1 生产者
首先先要创建spring boot代码工程并且pom文件中引入spring-boot-starter-amqp的依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
创建RabbitMQ的配置
@Configuration
public class RabbitConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
//队列 起名TestDirectQueue
@Bean
public Queue TestDirectQueue() {
// durable:是否持久化,默认是false,持久化队列会被存储在磁盘上当消息代理重启时仍然存在暂存队列当前连接有效
// exclusive:默认也是false只能被当前创建的连接使用而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除当没有生产者或者消费者使用此队列该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("TestDirectQueue", true, false, false);
}
//Direct交换机 起名TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("TestDirectExchange", true, false);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键TestDirectQueue
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectQueue");
}
}
可以看到在上面的代码中我们创建了connectionFactory、rabbitTemplate、TestDirectQueue、TestDirectExchange和bindingDirect我想从bean name大家也都能猜到这是在创建RabbitMq的连接工厂、交换机、队列和绑定等等。其中rabbitTemplate我们在上文中没有接触过实际上这就是spring boot对RabbitMQ根据bean创建信道、交换机等基本组件的封装利用模板方法模式将创建过程进行了隐藏也对消息的发布和订阅过程进行了隐藏。
完成了上述配置之后运行程序是不是就自动创建了这些交换机、通道等等呢也许你会回答是但是笔者在运行程序后查看RabbitMQ的web发现还是一片空白
是我们没有创建成功其实不然是因为rabbitTemplate在创建这些组件时是采用的懒加载模式只有在发送消息之前才会去真正创建这些交换机、通道等等。所以接下来我们创建一个工具类并在单元测试中通过该工具类发送消息
@Service
public class RbmqServiceImpl implements RbmqService {
@Autowired
RabbitTemplate rabbitTemplate;
@Override
public void sendMsg(String str) {
Message message=new Message(str.getBytes());
rabbitTemplate.send("TestDirectExchange","TestDirectQueue",message);
}
}
在单元测试中测试消息的发送
@Test
public void sendMsgTest() {
rbmqService.sendMsg("Hello World");
}
此时再去看RabbitMQ的管理Web发现交换机和队列都创建完成而且队列中也缓存了消息
2.2 消费者
前面介绍了使用spring boot集成的RabbitMQ组件创建生产者的方法下面我们介绍下消费者的创建。
配置类RabbitConfig的定义基本和生产者一致这里不再赘述我们重点介绍下使用@RabbitListener标签监听队列的方法。首先还是创建一个工具类
@Service
public class RbmqServiceImpl implements RbmqService {
@Autowired
RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "TestDirectQueue")
@Override
public void consumer(String msg) {
System.out.println("接受到的消息是" + msg);
}
}
可以看到通过RabbitListener标签我们直接实现了订阅TestDirectQueue队列。此时运行程序会接受并打印我们用生产者发送的消息
@RabbitListener标签除了指定监听的队列之外还可以创建交换机和队列并进行绑定然后再开启监听
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "TestTopicQueue", durable = "true"),
exchange = @Exchange(value = "orderTopicExchange", type = ExchangeTypes.TOPIC),
key = "TestTopicRouting")
)
以上就是使用spring-boot-starter-amqp集成RabbitMQ的方法最后我们学习下使用spring cloud stream来操作RabbitMQ。
三、spring cloud stream调用RabbitMQ
3.1 生产者
新建一个spring-boot工程pom文件中引入以下依赖
<dependencyManagement>
<dependencies>
<!-- spring-cloud-dependencies start-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Finchley.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- spring-cloud-dependencies end-->
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
同样在配置文件application.yml中进行RabbitMQ链接信息的配置
server:
port: 8022
spring:
#配置rabbitMq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#虚拟host 可以不设置,使用server默认host
#virtual-host: JCcccHost
使用spring cloud stream方式调用RabbitMQ首先要创建绑定接口
public interface OutputMessageBinding {
/** Topic 名称*/
String OUTPUT = "message-center";
@Output(OUTPUT)
MessageChannel output();
}
这里需要说明一下
第5行通过Output标签指定了消息的输出exchange这里会创建一个名称为message-center-out的exchange而且类型为topic。通过Spring cloud stream创建的exchange默认的类型都是topic。
接下来我们需要再次创建一个工具类
@Service
@EnableBinding(OutputMessageBinding.class)
public class RbmqServiceImpl implements RbmqService {
@Resource
private OutputMessageBinding outputMessageBinding;
@Override
public void sendMsg(String msg) {
outputMessageBinding.output().send(MessageBuilder.withPayload(msg).build());
}
}
第2行中的EnableBinding就是为了激活绑定类OutputMessageBinding。OutputMessageBinding被激活之后会产生一个名称为outputMessageBinding的bean托管到IOC中然后在第6行获取到了这个bean。在第10行中获取outputMessageBinding的output对象进行消息的发送。
在单元测试中对发送信息的接口进行调用
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class RbmqTest {
@Autowired
RbmqService rbmqService;
@Test
public void sendMsgTest() {
rbmqService.sendMsg("Hello World");
}
}
然后在web上可以看到新建的topic类型的exchange
3.1 消费者
消费者的创建和使用过程其实和生产者比较类似首先也是需要创建一个绑定接口
public interface InputMessageBinding {
String INPUT = "message-center";
@Input(INPUT)
SubscribableChannel input();
}
在消费者的绑定接口中使用@Input标签用来表明该对象为消费者对象。接下来同样也需要创建一个工具类并注入到IOC容器中
@Service
@EnableBinding({InputMessageBinding.class})
public class RbmqServiceImpl implements RbmqService {
@StreamListener(InputMessageBinding.INPUT)
@Override
public void consume(String msg) {
System.out.println("接受到的消息是" + msg);
}
}
可以看到在第8行我们使用了StreamListener标签监听了创建的InputMessageBinding的INPUT字段StreamListener会在内部进行处理实际上监听的是名名为message-center+随机字符串的队列而队列和message-center也自动进行了绑定
看下上面红色圈出部分的队列和交换机的绑定你会发现绑定的路由键为'#'表示路由匹配任意规则也就是说从名为message-center的exchange发出的消息都会路由到该队列上。
至此Java中三种操作RabbitMQ的方式都已经介绍完毕。