如何使用MQ(java代码实现)

<!--指定jdk版本-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!--rabbitmq依赖客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.16.0</version>
        </dependency>
        <!--操作文件流的一个依赖-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.7</version>
        </dependency>
    </dependencies>

生产者 - 消息队列 - 消费者

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/*
生产者,目标是发消息
 */
public class Product {
    //队列名称

    public static  final  String QUEUE_NAME="hello";

    //发消息
    public static void main(String[] args) throws IOException, TimeoutException {
        //建立一个连接方式
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP 连接RabbitMQ的队列
        factory.setHost("www.littlehei.fun");
        //用户名
        factory.setUsername("guest");

        //密码
        factory.setPassword("guest");

        //创建链接
        Connection connection = factory.newConnection();

        //获取信道
        Channel channel = connection.createChannel();

        //生成一个队列
        /*
        生成队列
        参数1,队列名称
        参数2,队列里边的消息是否持久化(磁盘)默认情况消息存储在内存中
        参数3,该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费,false:只能一个消费者消费
        参数4,是否自动删除,最后一个消费者断开链接以后,该队是否自动删除,true,自动删除,false不删除
        参数5,其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发消息
        String message = "hello world";//初次使用

        /*
        发送一个消费
        参数1,发送到哪个交换机
        参数2.路由的key值是哪个?  本次是队列的名称
        参数3,其他参数配置
        参数4,发送消息的消息体
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送成功");
    }

}

rabbitMQ学习-pom配置和生产者消费者_rabbitmq


消费者:用来接受生成者产生的消息。

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
消费者,用来消费生成者产生的代码
 */
public class Consumer {
    //队列的名称:
    private static  final String QUEUE_NAME="hello";

    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("www.littlehei.fun");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection =  factory.newConnection();
        Channel channel =  connection.createChannel();

        //声明
        DeliverCallback deliverCallback = (consumerTag,message)->{
         //  String message = new String("自己手动去创建一个消息,但是不推荐");
            System.out.println(new String(message.getBody()));
        };

        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息消费被中断");
        };

        /*
        消费者接收消息
        参数1,消费哪个队列
        参数2,消费成功之后是否要自动应答,true表示自动应答,false表示不是
        参数3,消费者未成功消费的回调
        参数4,消费者取录消费的回到
         */

      channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

    }
}

工作队列

生成者 —大量发消息-- 队列 – 接到消息–工作线程1或者工作线程2.。。。
轮训处理消息,你一个,我下一个,他下下个。

不同工作线程之间的关系是竞争关系

创建链接工具类:

public class GetConnection {
    //建立一个工具类,每次都直接使用,减少代码重复量
    public  static Channel getChannel() throws IOException, TimeoutException {
        //创建一个链接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("ip地址");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

轮训分发代码:

/*
一个工作线程,可以多个创建,可以多线程创建,具体看你自己如何定于
 */
public class WorkThread1 {
    //首先还是创建一个队列名称
    public  static final   String  QUEUE_NAME = "hello";

    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();

          /*
        消费者接收消息
        参数1,消费哪个队列
        参数2,消费成功之后是否要自动应答,true表示自动应答,false表示不是
        参数3,消费者未成功消费的回调
        参数4,消费者取录消费的回到
         */
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println("接收到的消息为" + new String(message.getBody()));
        };

        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println("消息被取消消费接口回调");
        };
        System.out.println("C1等待接收消息...");

        //消息接收
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

生产者:

/*
生产者,可以发送大量消息
 */
public class Product1 {

    //队列名称
    public  static final  String QUEUE_NAEM="hello";

    //发送大量消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();
        //队列声明
        channel.queueDeclare(QUEUE_NAEM,false,false,false,null);

        //发送消息
        //从控制台中输入
        Scanner sc = new Scanner(System.in);
        //判断是否有下一个消息输入

        while (sc.hasNext()){
            String name = sc.next();
            channel.basicPublish("",QUEUE_NAEM,null,name.getBytes());
            System.out.println("发送完成: "+name);
        }

    }

}
----------结果----------
nihap
发送完成nihap
wp1
发送完成wp1
ci1
发送完成ci1
wooda
发送完成wooda
我喜欢你
发送完成我喜欢你
------------------
C1等待接收消息...
接收到的消息为nihap
接收到的消息为ci1
接收到的消息为我喜欢你
--------------------
C2等待接收消息...
接收到的消息为wp1
接收到的消息为wooda


阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: RabbitMQ