RabbitMQ配置及其使用
文档编号:
当前版本:
1.0
作
评
审
批
者:
审:
核:
准:
文档状态:
草稿/发布版
编写日期:
评审日期:
审核日期:
批准日期:
变更次数:
I
文档修订记录
章节编号
1
版本号
修订内容简述
修订日期
作者
1.0
初始建立。
2018-05-25 王正顺
I
目录
1 引言 .................................................................................................................................................1
1.1 Mq 特点 ................................................................................................................................1
1.2 使用场景 ............................................................................................................................. 1
2 RabbitMQ 来源 ................................................................................................................................ 1
3 概念 .................................................................................................................................................2
4 消息持久 .........................................................................................................................................2
5 RabbitMQ 实例(Java)................................................................................................................ 2
5.1 环境配置 ............................................................................................................................. 2
5.2 集群配置 ............................................................................................................................. 3
5.3 例子一代码和效果 ............................................................................................................. 4
5.4 例子二代码和效果 ............................................................................................................. 6
II
1 引言
消息队列一般是在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处
理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程
中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你
独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。消息队列可以解决
这样一个问题,也就是其解耦性。解耦伴随的好处就是降低冗余,灵活,易于扩展。
峰值处理能力:当你的应用上了 Hacker News 的首页,你将发现访问流量攀升到一个不
同寻常的水平。在访问量剧增的情况下,你的应用仍然需要继续发挥作用,但是这样的突发
流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪
费。使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全
崩溃。
1.1
Mq特点
MQ 是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端
则可以读取或者订阅队列中的消息。MQ 和 JMS 类似,但不同的是 JMS 是 SUN JAVA 消息中间
件服务的一个标准和 API 定义,而 MQ 则是遵循了 AMQP 协议的具体实现和产品。
1.2 使用场景
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步
处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
2
RabbitMQ来源
RabbitMQ 是用 Erlang 实现的一个高并发高可靠 AMQP 消息队列服务器。
显然,RabbitMQ 跟 Erlang 和 AMQP 有关。下面简单介绍一下 Erlang 和 AMQP。
Erlang 是一门动态类型的函数式编程语言,它也是一门解释型语言,由 Erlang 虚拟机
解释执行。从语言模型上说,Erlang 是基于 Actor 模型的实现。在 Actor 模型里面,万物
皆 Actor,每个 Actor 都封装着内部状态,Actor 相互之间只能通过消息传递这一种方式来
进行通信。对应到 Erlang 里,每个 Actor 对应着一个 Erlang 进程,进程之间通过消息传递
进行通信。相比共享内存,进程间通过消息传递来通信带来的直接好处就是消除了直接的锁
开销(不考虑 Erlang 虚拟机底层实现中的锁应用)。
AMQP(Advanced Message Queue Protocol)定义了一种消息系统规范。这个规范描述了
在一个分布式的系统中各个子系统如何通过消息交互。而 RabbitMQ 则是 AMQP 的一种基于
1
erlang 的实现。AMQP 将分布式系统中各个子系统隔离开来,子系统之间不再有依赖。子系
统仅依赖于消息。子系统不关心消息的发送者,也不关心消息的接受者。
这里不必要对 Erlang 和 AMQP 作过于深入介绍,毕竟本文 RabbitMQ 才是主角。下面直
接看主角表演(实例)啦,至于主角的一些不得不深入介绍的点我们放到最后面。
3 概念
Exchange:交换机,决定了消息路由规则;
Queue:消息队列;
Channel:进行消息读写的通道;
Bind:绑定了 Queue 和 Exchange,意即为符合什么样路由规则的消息,将会放置入哪一
个消息队列;
4 消息持久
1) 将交换机置为可持久
2) 将通道置为可持久
3) 消息发送时设置可持久。
当我们“生产”了一条可持久化的消息,尝试中断 MQ 服务,启动消费者获取消息,消
息依然能够恢复。相反,则抛出异常
5
RabbitMQ实例(Java)
5.1 环境配置
RabbitMQ 的运行需要 erlang 的支持,因此我们先安装 erlang。
64 位下载地址:http://www.erlang.org/download/otp_win64_18.2.1.exe
32 位下载地址:http://www.erlang.org/download/otp_win32_18.2.1.exe
双击选择默认安装就好。
前面我们也讲到 RabbitMQ 就是一个服务器,下面我们就安装对应服务器。
下载地址:
http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.4/rabbitmq-server-3.3.4.e
xe
双击选择默认安装就好,安装好之后需要启动服务,cmd,进入到安装目录的 sbin 文件夹下,
命令如下:
2
cd C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.3.4\sbin
rabbitmq-server start
5.2 集群配置
单机环境下的集群配置:
首先启动两个实例,rab 和 rab2,端口分别为 9991 和 9992
RABBITMQ_NODE_PORT=9991 RABBITMQ_NODENAME=rab rabbitmq-server -detached
RABBITMQ_NODE_PORT=9992 RABBITMQ_NODENAME=rab2 rabbitmq-server -detached
停止 rab2,并将其加入 cluster 集群中
rabbitmqctl -n rab2 stop_app
rabbitmqctl -n rab2 join_cluster rab@rab(@rab,这里的 rab 表示主机名)
重新启动 rab2
RABBITMQ_NODE_PORT=9992 RABBITMQ_NODENAME=rab2 rabbitmq-server -detached
查看集群的状态
rabbitmqctl cluster_status -n rab
显示如下信息表示集群配置正常:
Cluster status of node rab@rab ...
[{nodes,[{disc,[rab2@rab,rab@rab]}]},
{running_nodes,[rab@rab]},
3
{partitions,[]}]
...done.
接下来自然是 jar 包依赖,本文工程采用 eclipse + maven,maven 依赖如下:
com.rabbitmq
amqp-client
3.0.4
commons-lang
commons-lang
2.6
5.3 例子一代码和效果
新建发送者 Send.java,代码如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
//队列名称
private final static String QUEUE_NAME = "queue";
public static void main(String[] args) throws java.io.IOException
{
/**
* 创建连接连接到 MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
//设置 MabbitMQ 所在主机 ip 或者主机名
factory.setHost("127.0.0.1");
//创建一个连接
Connection connection = factory.newConnection();
4
//创建一个频道
Channel channel = connection.createChannel();
//指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送的消息
String message = "hello world!";
//往队列中发出一条消息
channel.basicPublish("", QUEUE_NAME, null,
message.getBytes());
System.out.println("Sent '" + message + "'");
//关闭频道和连接
channel.close();
connection.close();
}
}
新建接收者 Recv.java,代码如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Recv {
//队列名称
private final static String QUEUE_NAME = "queue";
public static void main(String[] args) throws java.io.IOException,
java.lang.InterruptedException
{
//打开连接和创建频道,与发送端一样
ConnectionFactory factory = new ConnectionFactory();
//设置 MabbitMQ 所在主机 ip 或者主机名
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("Waiting for messages. To exit press
CTRL+C");
//创建队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消费队列
channel.basicConsume(QUEUE_NAME, true, consumer);
5