RabbitMQ配置教程(非框架整合)
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
1 安装Erlang(spring整合及死信队列将在下一篇讲解)
# wget http://erlang.org/download/otp_src_19.3.tar.gz
# tar zxvf otp_src_19.3.tar.gz
# cd otp_src_19.3
# ./configure --prefix=/opt/erlang
//如果上一步报错,则执行yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
# make && make install
# vim /etc/profile
ERLANG HOME=/opt/erlang
export PATH=$PATH:$ERLANG HOME/bin
export ERLANG_HOME
# source /etc/profile
# erl //检查是否安装成功
# wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-generic-unix-3.6.12.tar.xz
# xz -d rabbitmq-server-generic-unix-3.6.12.tar.xz //转换压缩文件格式
# cd /opt
# mv rabbitmq_server-3.6.12 rabbitmq
# vim /etc/profile
export PATH=$PATH:/opt/rabbitmq/sbin
export RABBITMQ_HOME=/opt/rabbitmq
# source /etc/profile
# rabbitmq-server -detached //启动rabbitmq服务
# rabbitmqctl status //查看rabbitmq状态,也可用rabbitmqctl cluster status查看集群信息
2 使用java程序连接测试其生产者消费者
rabbitmq模型图如下
引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.2.1</version>
</dependency>
rabbitmq默认在本地通过localhost访问时账号密码是guest/guest,但是远程的话就需要设置如下:
# rabbitmqctl add_user root root //新建root用户
# rabbitmqctl set_permissions -p / root ".*" ".*" ".*" //给root授予所有权限
# rabbitmqctl set_user_tags root administrator //设置root为管理员
编写消费者客户端代码如下:
public class RabbitProducer {
private static final String EXCHANGE_NAME = "exchange_demo";
private static final String ROUTING_KEY = "routingkey_demo";
private static final String QUEUE_NAME = "queue_demo";
private static final String IP_ADDRESS = "xx.xxx.xxx.xx";
private static final int PORT = 5672;//RabbitMQ服务端默认端口为5672
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("root");
factory.setPassword("root");
Connection connection = factory.newConnection(); //创建链接
Channel channel = connection.createChannel(); //创建信道
// 创建一个type="direct"、持久的、非自动删除的交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
// 创建一个持久的、排他的、非自动删除的队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 将交换器与队列通过路由健绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 发送一条持久化的消息:hello world
String message = "hello world!";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
// 关闭资源
channel.close();
connection.close();
}
}
编写消费者客户端代码如下:
public class RabbitConsumer {
private static final String QUEUE_NAME = "queue_demo";
private static final String IP_ADDRESS = "134.175.124.102";
private static final int PORT = 5672;//RabbitMQ服务端默认端口为5672
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Address[] addresses = new Address[] {
new Address(IP_ADDRESS, PORT)
};
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("root");
// 这里的连接方式与生产者的demo略有不同,注意辨别区别
Connection connection = factory.newConnection(addresses); //创建连接
final Channel channel = connection.createChannel(); //创建信道
channel.basicQos(64); //设置客户端最多接收未被ack的消息的个数
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("recv message: " + new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, consumer);
// 等待回调函数执行完毕之后,关闭资源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
}
}
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |