Activemq-cpp 开发手册
丁靖
2008-05-06
1 引言
1.1 编写目的
快速学习 CMS,提高 CMS 开发效率,提供一个 CMS 开发参考手册
详细 API 手册请参考 http://activemq.apache.org/cms/api_docs/activemqcpp-2.1/
1.2 功能介绍
Activemq-cpp 是一个与 ActiveMQ 交互通讯的 C++ API 开发库,为 C++开发者提供了一个访
问 ActiveMQ 的接口。
Winkeemq-cpp 是一个在 Activemq-cpp 基础上封装的 API 库,对一些重复机械的初始化及销
毁清除及一些不关心的细节进行了封装,从而简化了编程。
1.3 术语解析
ActiveMQ :开源的消息队列服务器
Broker :消息中介,每个消息队列服务器中至少有一个 broker,是消息队列的载体
Destination :消息在 broker 上的目的地
Queue :消息队列
Topic :主题
Message :消息
Producer :消息产生者
Consumer :消息消费者
Client :客户端,生产者和消费者都在客户端上
Server :Activemq 服务器
BrokerUri :客户端访问服务器上 broker 时的 Uri
其它资料请参考 http://activemq.apache.org/
2 开发前准备
在开发前必须先安装 activemq-cpp 及 winkeemq-cpp 库,具体步骤参考《activemq-cpp 安装
及使用文档.doc》
3 CMS
3.1 概述
CMS(stands for C++ Messaging Service)是一组 C++应用程序接口(C++ API),它提供创建、
发送、接收、读取消息的服务。定义了一组和 Sun 公司和它的合作伙伴设计的 CMS API 相
同的公共应用程序接口和相应语法,使得 C++程序能够和其他消息组件进行通信。
CMS 是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC (Java Database
Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 CMS 则提供
同样与厂商无关的访问方法,以访问消息收发服务。CMS 使您能够通过消息收发服务(有
时称为消息中介程序或路由器)从一个 CMS 客户机向另一个客户机发送消息。消息是
CMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消
息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来
划分,可以将消息分为几种类型,它们分别携带:简单文本 (TextMessage)、可序列化的对
象 (ObjectMessage) 、 属 性 集 合 (MapMessage) 、 字 节 流 (BytesMessage) 、 原 始 值 流
(StreamMessage),还有无有效负载的消息 (Message)。
消息收发系统是异步的,也就是说,CMS 客户机可以发送消息而不必等待回应。比较
可知,这完全不同于基于 RPC 的(基于远程过程的)系统,如 EJB 1.1、CORBA 和 Java
RMI 的引用实现。在 RPC 中,客户机调用服务器上某个分布式对象的一个方法。在方法
调用返回之前,该客户机被阻塞;该客户机在可以执行下一条指令之前,必须等待方法调用
结束。在 CMS 中,客户机将消息发送给一个虚拟通道(主题或队列),而其它 CMS 客户
机则预订或监听这个虚拟通道。当 CMS 客户机发送消息时,它并不等待回应。它执行发
送操作,然后继续执行下一条指令。消息可能最终转发到一个或许多个客户机,这些客户机
都不需要作出回应。
CMS 的通用接口集合以异步方式发送或接收消息。异步方式接收消息显然是使用间断
网络连接的客户 机,诸如移动电话和 PDA 的最好的选择。另外, CMS 采用一种宽松结合
方式整合企业系统的方法,其主要的目的就是创建能够使用跨平台数据信息的、可移植的企
业级应用程序,而把开发人力解放出来。
CMS 消息服务支持两种消息模型:Point-to-Point 消息(P2P)和发布订阅消息(Publish
Subscribe messaging,简称 Pub/Sub)。CMS 规范并不要求供应商同时支持这两种消息模型,
但开发者应该熟悉这两种消息模型的优势与缺点。
P2P 消息模型是在点对点之间传递消息时使用。如果应用程序开发者希望每一条消息都
能够被处理,那么应该使用 P2P 消息模型。与 Pub/Sub 消息模型不同,P2P 消息总是能够被
传送到指定的位置。
Pub/Sub 模型在一到多的消息广播时使用。如果一定程度的消息传递的不可靠性可以被
接受的话,那么应用程序开发者也可以使用 Pub/Sub 消息模型。换句话说,它适用于所有的
消息消费程序并不要求能够收到所有的信息或者消息消费程序并不想接收到任何消息的情
况。
CMS 通过允许创建持久订阅来简化时间相关性,即使消息预订者未激活也可以接收到
消息。此外,使用 持久订阅还可通过队列提供灵活性和可靠性,而仍然允许消息被发给许
多的接收者。 Topic Subscriber topic Subscriber = topicSession.createDurableSubscriber(topic,
subscriptionName); Connection 对象表示了到两种消息模型中的任一种的消息系统的连
接。服务器端和客户机端对象要求管理创建的 CMS 连接的状态。连接是由 Connection
Factory 创建的并且通过 JNDI 查寻定位。
//取得用于 P2P 的 QueueConnectionFactory
QueueConnectionFactory = queueConnectionFactory();
Context messaging = new InitialContext();
QueueConnectionFactory
= (QueueConnectionFactory) Messaging.lookup(“QueueConnectionFactory”);
//取得用于 pub/sub 的 TopicConnectionFactory
TopicConnectonFactory topicConnectionFactory;
Context messaging = new InitialContext();
topicConnectionFactory
= (TopicConnectionFactory)messaging.lookup(“TopicConnectionFactory”);
注意:用于 P2P 的代码和用于 PublishSubscribe 的代码非常相似。
如果 session 被标记为 transactional 的话,确认消息就通过确认和校正来自动地处理。如
果 session 没有标记为 transactional,你有三个用于消息确认的选项。
· AUTO_ACKNOWLEDGE session 将自动地确认收到一则消息。
· CLIENT_ACKNOWLEDGE 客户端程 序将确认收到一则消息,调用这则消息的确认
方法。 · DUPS_OK_ACKNOWLEDGE 这个选项命令 session“懒散的”确认消息传递,
可以想到,这将导致消息提供者传递的一些复制消息可能会出错。这种确认的方式只应当用
于消息消费程序 可以容忍潜在的副本消息存在的情况。
queueSession
=queueConnection.createQueueSession(false,session.AUTO_ACKNOWLEDGE);//P2P
topicSession
= topicConnection.createTopicSession(false, session.AUTO_ACKNOWLEDGE); //Pub-Sub
注意:在本例中,一个 session 目的从连结中创建,非值指出 session 是 non-transactional
的,并且 session 将自动地确认收到一则消息。
CMS 现在有两种传递消息的方式。标记为 NON_PERSISTENT 的消息最多投递一次,
而标记 为 PERSISTENT 的消息将使用暂存后再转送的机理投递。如果一个 CMS 服务离线,
那么持久性消息不会丢失但是得等到这个服务恢复联机时才会被传递。 所以默认的消息传
递方式是非持久性的。即使使用非持久性消息可能降低内务和需要的存储器,并且这种传递
方式只有当你不需要接收所有的消息时才使用。
虽然 CMS 规范并不需要 CMS 供应商实现消息的优先级路线,但是它需要递送加快的
消息优先于普通级别的消息。CMS 定义了从 0 到 9 的优先级路线级别,0 是最低 的优先级
而 9 则是最高的。更特殊的是 0 到 4 是正常优先级的变化幅度,而 5 到 9 是加快的优先级的
变化幅度。举例来说: topicPublisher.publish (message, DeliveryMode.PERSISTENT, 8, 10000);
//Pub-Sub 或 queueSender.send(message, DeliveryMode.PERSISTENT, 8, 10000);//P2P 这
个代码片断,有两种消息模型,映射递送方式是持久的,优先级为加快型,生存周期是 10000
(以毫秒度量 )。如果生存周期设置为零,这则消息将永远不会过期。当消息需要时间限制
否则将使其无效时,设置生存周期是有用的。
CMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不
同形式的数据,提供现有消息格式的一些级别的兼容性。
· StreamMessage -- Java 原始值的数据流
· MapMessage--一套名称-值对
· TextMessage--一个字符串对象
· ObjectMessage--一个序列化的 Java 对象
· BytesMessage--一个未解释字节的数据流
CMS 应用程序接口提供用于创建每种类型消息和设置荷载的方法例如,为了在一个队
列创建并发送一个 TextMessage 实例,你可以使用下列语句:
TextMessage message = queueSession.createTextMessage();
message.setText(textMsg);
以 异 步 方 式 接 收 消 息 , 需 要 创 建 一 个 消 息 监 听 器 然 后 注 册 一 个 或 多 个 使 用
MessageConsumer 的 CMS MessageListener 接口。会话(主题或队列)负责产生某些消息,这
些消息被传送到使用 onMessage 方法的监听者那里。
Using namespace cms;
class ExampleListener : public MessageListener {
//把消息强制转化为 TextMessage 格式
public void onMessage(Message message) {
TextMessage textMsg = null;
// 打开并处理这段消息
}
}
当我们创建 QueueReceiver 和 TopicSubscriber 时,我们传递消息选择器字符串:
//P2P QueueReceiver
QueueReceiver receiver;
receiver = session.createReceiver(queue, selector);
//Pub-Sub TopicSubscriber
TopicSubscriber subscriber;
subscriber = session.createSubscriber(topic, selector);
为 了 启 动 消 息 的 交 付 , 不 论 是 Pub/Sub 还 是 P2P , 都 需 要 调 用 start 方 法 。
TopicConnection.start(); //pub-sub
QueueConnection.start(); //P2P
当一条消息被捕捉时,这条消息做为一条必须被强制转化为适当消息类型的普通
Message 对象到达。如 TextMessage
void onMessage(const Message* message){
TextMessage txtMsg=dynamic_cast(message);
…
//对 txtMsg 做一些处理
}
停止消息的传递,无论是 Pub/Sub 还是 P2P,都调用 stop 方法。
TopicConnection. stop (); //pub-sub
QueueConnection. stop (); //P2P
3.2 接口描述
CMS 支持两种消息类型 P2P 和 Pub/Sub,分别称作:P2P Domain 和 Pub/Sub Domain,这
两种接口都继承统一的 CMS Parent 接口,CMS 主要接口如下所示:
CMS Parent
ConnectionFactory
Connection
Destination
Session
MessageProducer
MessageConsumer
以下是对这些接口的简单描述:
ConnectionFactory :连接工厂,CMS 用它创建连接
Connection :CMS 客户端到 CMS Provider 的连接
Destination :消息的目的地
Session: 一个发送或接收消息的线程
MessageProducer: 由 Session 对象创建的用来发送消息的对象
MessageConsumer: 由 Session 对象创建的用来接收消息的对象
3.3 CMS 消息模型
CMS 消息由以下几部分组成:消息头,属性,消息体。
消息头(Header) - 消息头包含消息的识别信息和路由信息,消息头包含一些标准的属性
如:CMSDestination,CMSMessageID 等。
消息头
由谁设置
CMSDestination
send 或 publish 方法
CMSDeliveryMode
send 或 publish 方法
CMSExpiration
send 或 publish 方法
CMSPriority
send 或 publish 方法
CMSMessageID
send 或 publish 方法
CMSTimestamp
send 或 publish 方法
CMSCorrelationID
CMSReplyTo
CMSType
客户
客户
客户
CMSRedelivered
CMS Provider
属性(Properties) - 除了消息头中定义好的标准属性外,CMS 提供一种机制增加新属性
到消息头中,这种新属性包含以下几种:
1. 应用需要用到的属性;
2. 消息头中原有的一些可选属性;
3. CMS Provider 需要用到的属性。
标准的 CMS 消息头包含以下属性:
CMSDestination --消息发送的目的地
CMSDeliveryMode --传递模式, 有两种模式: PERSISTENT 和 NON_PERSISTENT,
PERSISTENT 表示该消息一定要被送到目的地,否则会导致应用错误。NON_PERSISTENT
表示偶然丢失该消息是被允许的,这两种模式使开发者可以在消息传递的可靠性和吞吐量之
间找到平衡点。
CMSMessageID 唯一识别每个消息的标识,由 CMS Provider 产生。
CMSTimestamp 一个消息被提交给 CMS Provider 到消息被发出的时间。
CMSCorrelationID 用来连接到另外一个消息,典型的应用是在回复消息中连接到原消息。
CMSReplyTo 提供本消息回复消息的目的地址。
CMSRedelivered 如果一个客户端收到一个设置了 CMSRedelivered 属性的消息,则表示
可能该客户端曾经在早些时候收到过该消息,但并没有签收(acknowledged)。
CMSType 消息类型的识别符。
CMSExpiration 消息过期时间,等于 QueueSender 的 send 方法中的 timeToLive 值或
TopicPublisher 的 publish 方法中的 timeToLive 值加上发送时刻的 GMT 时间值。如果
timeToLive 值等于零,则 CMSExpiration 被设为零,表示该消息永不过期。如果发送后,在
消息过期时间之后消息还没有被发送到目的地,则该消息被清除。
CMSPriority 消息优先级,从 0-9 十个级别,0-4 是普通消息,5-9 是加急消息。CMS 不
要求 CMS Provider 严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息
到达。
消息体(Body) - CMS API 定义了 4 种消息体格式,也叫消息类型,你可以使用不同形式
发送接收数据并可以兼容现有的消息格式,下面描述这 4 种类型:
消息类型
消息体
TextMessage
string 对象,如 xml 文件内容
MapMessage
名/值对的集合,名是 string 对象,值类型可以是 c++任何基本类型
BytesMessage
ObjectMessage
字节流
对象类型
Message
没有消息体,只有消息头和属性。
下例演示创建并发送一个 TextMessage 到一个队列:
TextMessage message = queueSession.createTextMessage();
message.setText(msg_text); // msg_text is a String
message.setCMSType(“text”);
queueSender.send(message);
下例演示接收消息并转换为合适的消息类型:
Message* m = queueReceiver.receive();