logo资料库

实现Java与MQTT的简单通讯(附带调试方法).pdf

第1页 / 共5页
第2页 / 共5页
第3页 / 共5页
第4页 / 共5页
第5页 / 共5页
资料共5页,全文预览结束
实现实现Java与与MQTT的简单通讯(附带调试方法) 的简单通讯(附带调试方法) MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中, 如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。 MQTT可以由多种语言来实现,下面来记录一下Java语言的简单实现 、首先在idea里面配置所需库 1、首先在 里面配置所需库 https://pan.baidu.com/s/1QW4Pks_nEQW1o1JktgJOmg 提取码:kj7e 获得jar包 2、下面附上客户端类和回调类 、下面附上客户端类和回调类 回调类的作用在于消息的接收处理(下面代码注释会有详细的解释 下面代码注释会有详细的解释) 回调类的作用在于消息的接收处理 1)客户端类 客户端类 package MQTT; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MyMqttClient { public static MqttClient mqttClient = null; private static MemoryPersistence memoryPersistence = null; private static MqttConnectOptions mqttConnectOptions = null; private static String ClientName = ""; //待填 将在服务端出现的名字 private static String IP = ""; //待填 服务器IP public static void main(String[] args) { start(ClientName); } public static void start(String clientId) { //初始化连接设置对象 mqttConnectOptions = new MqttConnectOptions(); //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, //这里设置为true表示每次连接到服务器都以新的身份连接 mqttConnectOptions.setCleanSession(true); //设置连接超时时间,单位是秒 mqttConnectOptions.setConnectionTimeout(10); //设置持久化方式 memoryPersistence = new MemoryPersistence(); if(null != clientId) { try { mqttClient = new MqttClient("tcp://"+IP+":1883", clientId,memoryPersistence); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println("连接状态:"+mqttClient.isConnected()); //设置连接和回调 if(null != mqttClient) { if(!mqttClient.isConnected()) {
//创建回调函数对象 MQTTReceiveCallback MQTTReceiveCallback = new MQTTReceiveCallback(); //客户端添加回调函数 mqttClient.setCallback(MQTTReceiveCallback); //创建连接 try { System.out.println("创建连接"); mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }else { System.out.println("mqttClient为空"); } System.out.println("连接状态"+mqttClient.isConnected()); } // 关闭连接 public void closeConnect() { //关闭存储方式 if(null != memoryPersistence) { try { memoryPersistence.close(); } catch (MqttPersistenceException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("memoryPersistence is null"); } //关闭连接 if(null != mqttClient) { if(mqttClient.isConnected()) { try { mqttClient.disconnect(); mqttClient.close(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("mqttClient is not connect"); } }else { System.out.println("mqttClient is null"); } } // 发布消息 public static void publishMessage(String pubTopic, String message, int qos) { if(null != mqttClient&& mqttClient.isConnected()) { System.out.println("发布消息 "+mqttClient.isConnected()); System.out.println("id:"+mqttClient.getClientId()); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setPayload(message.getBytes()); MqttTopic topic = mqttClient.getTopic(pubTopic); if(null != topic) { try { MqttDeliveryToken publish = topic.publish(mqttMessage); if(!publish.isComplete()) { System.out.println("消息发布成功"); } } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }else { reConnect(); } } // 重新连接 public static void reConnect() { if(null != mqttClient) { if(!mqttClient.isConnected()) { if(null != mqttConnectOptions) { try { mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("mqttConnectOptions is null"); } }else { System.out.println("mqttClient is null or connect"); } }else { start(ClientName); } }
// 订阅主题 public static void subTopic(String topic) { if(null != mqttClient&& mqttClient.isConnected()) { try { mqttClient.subscribe(topic, 1); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("mqttClient is error"); } } // 清空主题 public void cleanTopic(String topic) { if(null != mqttClient&& !mqttClient.isConnected()) { try { mqttClient.unsubscribe(topic); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("mqttClient is error"); } } } 1)回调类回调类 package MQTT; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; /** * 发布消息的回调类 * * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 * 在回调中,将它用来标识已经启动了该回调的哪个实例。 * 必须在回调类中实现三个方法: * * (1):public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。 * * (2):public void connectionLost(Throwable cause)在断开连接时调用。 * * (3):public void deliveryComplete(MqttDeliveryToken token)) * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 * 由 MqttClient.connect 激活此回调。 * */ public class MQTTReceiveCallback implements MqttCallback { public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 System.out.println("连接断开,可以做重连"); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后得到的消息会执行到这里面 System.out.println("接收消息主题 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息内容 : " + new String(message.getPayload())); } } 3、调试与使用 、调试与使用
在程序中调用下面两个函数 subTopic("/World"); //订阅/World这个主题 publishMessage("/World","hello World(客户端)",1);//向主题world发送hello World(客户端) 运行成功后控制台的显示 可以看到此时客户端也收到了自己发送的消息,原因是传给消息发送函数的主题与客户端本身订阅的主题是一样的,服务器接收到消息后会给所有订阅了此主题的客户端转发消息 服务器接收到消息后会给所有订阅了此主题的客户端转发消息 可以看到服务端接收到了消息(右下角) 服务端点击发送消息后
此时客户端的控制台 作者:梦视空
分享到:
收藏