RabbitMQ
a complete and highly reliable
enterprise messaging system based
on the emerging AMQP standard
What is RabbitMQ?
• Robust messaging for applications
• Easy to use
• Runs on all major operating systems
• Supports a huge number of developer
platforms
• Open source and commercially supported
Install & Use
RabbitMQ安装过程:
yum install rabbitmq-server(用yum list rabbitmq-server可以看到我安装的版本是2.6.1)
sudo /etc/init.d/rabbitmq-server start
或者这样启动:rabbitmq-server -detached
rabbitmqctl --help
rabbitmqctl status
没有插件管理工具rabbitmq-plugins?http://stackoverflow.com/questions/8548983/how-to-install-rabbitmq-
management-plugin-rabbitmq-plugins
• 版本太低,插件都要手动去官网下载,太不方便了,改用官网释放的最新版本吧
•
•
•
http://www.rabbitmq.com/install-generic-unix.html 下载解压缩即可
整体配置:cat /home/dongsong/rabbitmq_server-3.0.0/sbin/rabbitmq-defaults
环境配置是CONF_ENV_FILE所指示的文件(http://www.rabbitmq.com/configure.html#customise-general-
unix-environment)
组件配置是CONFIG_FILE所指示的文件(http://www.rabbitmq.com/configure.html#configuration-file)
启动:[dongsong@localhost sbin]$ /home/dongsong/rabbitmq_server-3.0.0/sbin/rabbitmq-server -detached
./rabbitmq-server: line 85: erl: command not found
安装Erlang:http://www.erlang.org/download.html 下载 解压 ./configure --prefix=xx; make; make install
•
•
•
•
•
•
•
•
•
•
停止:/home/dongsong/rabbitmq_server-3.0.0/sbin/rabbitmqctl stop
状态:/home/dongsong/rabbitmq_server-3.0.0/sbin/rabbitmqctl status
启用管理插件: /home/dongsong/rabbitmq_server-3.0.0/sbin/rabbitmq-plugins enable
rabbitmq_management
http://server-name:15672/
概念
• channel 我理解就是连接上的一个操作标识(A
Channel is the primary communication method for
interacting with RabbitMQ. It is recommended that you do
not directly invoke the creation of a channel object in your
application code but rather construct the a channel by
calling the active connection’s channel()method.)
exchange 交换区,一个用于分发数据的虚拟的概念(将
queue与exchange绑定后,往exchange发送的消息会根据
路由规则进入相应的已绑定的queue)
queue 数据队列
当exchange为空字符串时,basic_publish发出的消息会
发往以routing_key为名字的queue里面去。(此时,
routing_key不再是普通路由过滤条件,而是queue的名
字)
Exchange
• fanout 简单分发,message直接输出到已
经绑定的queue
direct 定向分发,根据routing_key选择性
输出到已绑定的(指定了相同routing_key的
所有)queue
topic 多重选择输出,和direct类似,只是
routing_key由.连接单词组成、并可用*和#
作匹配
可由topic实现前两种类型
数据持久化
• queue持久化 channel.queue_declare(queue='task_queue',
durable=True)
message持久化 channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
如上操作还是可能会丢数据:服务器在rabbitmq收到数据和写
盘结束的时间区间内挂掉时
(http://www.rabbitmq.com/tutorials/tutorial-two-python.html)
官网说的解决办法是对于发数据的程序(生产者)需要做事务处
理(channel使用transaction模式),或者使用确认机制(confirm
mode)。且,两者不可同时使用
(http://www.rabbitmq.com/confirms.html)
使用心得
• 当多个消费者连接到同一个queue时,queue里面同一个消息只会被一个消费者拿到
如需要一个消息被多个消费者拿到,可用exchange绑定多个queue,消费者跟这些queue一一
对应,那么生产者往这个exchange发送的消息会被路由到多个queue里面、接着被多个消费者
都拿到(模式3)。
消费者获取消息时可指定是否需要
ack(channel.basic_consume(callback,queue=queue_name,no_ack=True)):
是,当消费者ack以后消息才会从queue里面删除,如果在ack之前消费者挂掉了,该消
息会被其他消费者拿到;
否,当消费者一拿到消息,消息就会从queue里面删除
当使用六种模式中的第二种时,可能会出现负载不均衡的情况:默认分发策略是一个消费
者发一个任务,不管它有多少没处理完;当消费者拿到比较耗时的任务时就不合理了。
解决办法:channel.basic_qos(prefetch_count=1) This tells RabbitMQ not to give more than one
message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it
has processed and acknowledged the previous one.
去年在互动做相关性分析时就有这个问题,一直以为无解呢!当时用的是activemq,当一个
消费者程序启动比较快,另几个启动晚一些的时候,queue里面的几个任务就都被先报到的
消费者预订了。结果是先到的消费者累死累活,后到的消费者闲着没事干。
对于模式6,不明白correlation_id和reply_to有什么必要性,返回结果的queue和message的
标识由业务逻辑控制打入message内部完全没问题嘛...这两个参数反正是由服务端(姑且这样
称呼吧,其实也是一个从queue中取message做计算并往另一个queue塞结果message的程序)
的业务逻辑解析的,又不是RabbitMQ内部针对这两个属性会做什么处理!相比而言,我觉得
delivery_mode才有意义,RabbitMQ会把指定该属性为2的message持久化到硬盘上。
•
Thank You !