RabbitMQ简单使用

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

       这篇文章通过一个最简单的例子让初学者能了解RabbitMQ如何完成生产消息和消息的。

        所有的程序员在学习一门新技术的时候都是从 Hello World 进入到Colorful World的本节也将按照惯例从HelloWorld开始演示RabbitMQ的Produce和Consumer的简单使用。本RabbitMQ系列的演示代码默认都是使用Java语言。

设置账号

        在开始HelloWorld之前需要注意的是RabbitMQ默认的账号是guest / guest这个账号有限制默认只能通过本地网络localhost访问远程访问受限制所以在实际发送和消费消息之前需要设置新的账号和设置权限。具体账号和权限的内容敬请关注后面的更新。

添加账号

我们为HelloWorld创建一个新的用户为root并设置密码为root后续Java客户端代码中使用这个root账号发送和消费消息。

[root@hidden -]# rabbitmqct1 add user root root
Creating user "root"

设置权限

在创建好账号之后就要为这个账号创建权限了。

[root@hidden - ]# rabbitmqct1 set_permissions -p / root ".*" ".*" ".*"
Setting permissions for user "root" in vhost "/"

设置角色

最后需要为这个账号添加角色这里我们添加管理员角色

[root@hidden - ]# rabbitmqct1 set user_tags root administrator
Setting tags for user "root" to [administrator]

经过上面的步骤root账号就已经创建成功也可以通过客户端链接rabbitmq的broker如果遇到下面的问题说明是账号出现的问题参考上面的步骤设置或者检查账号是否正确。

Exception in thread "main" com.rabbitmq.c1ient.AuthenticationFai1ureException :
ACCESS REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker 1ogfi1e.

添加Maven依赖

RabbitMQ的java版客户端的maven以来如下可以根据自己的环境选择具体的版本即可。

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupld>com.rabbitmq</groupld>
    <artifactld>amqp-client</artifactld>
    <version>${rabbitmq.version}</version>
</dependency>

Producer案例

Producer就是用来向MQ发送消息下面就是一个Producer的HelloWorld。

import com.rabbitmq.client.*;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ProducerTest {

    @Test
    public void helloWorld() throws IOException, TimeoutException {
        // 交换器名字
        String exchangeName = "helloworld_exchange";
        // 路由键
        String routingKey = "helloworld_routing_key";
        // 队列名字
        String queueName = "helloworld_queue";

        // 创建连接工厂用来创建具体的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        // 根据上面的设置信息创建具体的连接
        Connection connection = connectionFactory.newConnection();
        // 在创建的连接上创建一个通道
        Channel channel = connection.createChannel();

        // 在通道上声明交换器
        channel.exchangeDeclare(exchangeName, "direct", true, false, null);
        // 在通道上声明队列
        channel.queueDeclare(queueName, true, false, false, null);
        // 声明交换器和队列的绑定关系
        channel.queueBind(queueName, exchangeName, routingKey);

        String message = "hello world";
        // 往通道上发送消息消息通过绑定键发送到指定的队列也就是上面申明的绑定关系的队列
        channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        
        // 关闭通过和连接
        channel.close();
        connection.close();

    }

}

Consumer案例

消息发送到MQConsumer就可以订阅队列并开始消费消息下面就是一个Consumer的HelloWorld。

import com.rabbitmq.client.*;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    @Test
    public void helloWorld() throws IOException, TimeoutException, InterruptedException {
        // 订阅的队列
        String queueName = "helloworld_queue";
        // 订阅队列所在的broker的地址信息这里演示另一种创建连接的方式
        Address[] addresses = {new Address("localhost", 5672)};

        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("root");
        factory.setPassword("root");
        Connection connection = factory.newConnection(addresses);

        Channel channel = connection.createChannel();
        // consumer端一般需要设置的值表示一次消费的消息数量的最大值
        channel.basicQos(64);

        // 创建默认的Consumer并实现回调函数在回调中确认消息发送ack
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(Thread.currentThread().getName() + " -> consumer : " + consumerTag + " , receive message : " + new String(body));

                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // 消费消息异步执行
        String consumeTag = channel.basicConsume(queueName, consumer);
        System.out.println(consumeTag);

        // 等到消费消息完成并返回确认给broker
        TimeUnit.SECONDS.sleep(10);

        // 关闭资源
        channel.close();
        connection.close();
    }
}

以上就是通过一个HelloWorld了解下RabbitMQ的简单使用后续会不定期更新RabbitMQ的内容感兴趣的小伙伴敬请关注哦。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ