实现实现Java与与MQTT的简单通讯(附带调试方法)
的简单通讯(附带调试方法)
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,
如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
MQTT可以由多种语言来实现,下面来记录一下Java语言的简单实现
、首先在idea里面配置所需库
1、首先在
里面配置所需库
https://pan.baidu.com/s/1QW4Pks_nEQW1o1JktgJOmg 提取码:kj7e 获得jar包
2、下面附上客户端类和回调类
、下面附上客户端类和回调类
回调类的作用在于消息的接收处理(下面代码注释会有详细的解释
下面代码注释会有详细的解释)
回调类的作用在于消息的接收处理
1)客户端类
客户端类
package MQTT;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MyMqttClient {
public static MqttClient mqttClient = null;
private static MemoryPersistence memoryPersistence = null;
private static MqttConnectOptions mqttConnectOptions = null;
private static String ClientName = ""; //待填 将在服务端出现的名字
private static String IP = ""; //待填 服务器IP
public static void main(String[] args) {
start(ClientName);
}
public static void start(String clientId) {
//初始化连接设置对象
mqttConnectOptions = new MqttConnectOptions();
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
//这里设置为true表示每次连接到服务器都以新的身份连接
mqttConnectOptions.setCleanSession(true);
//设置连接超时时间,单位是秒
mqttConnectOptions.setConnectionTimeout(10);
//设置持久化方式
memoryPersistence = new MemoryPersistence();
if(null != clientId) {
try {
mqttClient = new MqttClient("tcp://"+IP+":1883", clientId,memoryPersistence);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("连接状态:"+mqttClient.isConnected());
//设置连接和回调
if(null != mqttClient) {
if(!mqttClient.isConnected()) {
//创建回调函数对象
MQTTReceiveCallback MQTTReceiveCallback = new MQTTReceiveCallback();
//客户端添加回调函数
mqttClient.setCallback(MQTTReceiveCallback);
//创建连接
try {
System.out.println("创建连接");
mqttClient.connect(mqttConnectOptions);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}else {
System.out.println("mqttClient为空");
}
System.out.println("连接状态"+mqttClient.isConnected());
}
// 关闭连接
public void closeConnect() {
//关闭存储方式
if(null != memoryPersistence) {
try {
memoryPersistence.close();
} catch (MqttPersistenceException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {
System.out.println("memoryPersistence is null");
}
//关闭连接
if(null != mqttClient) {
if(mqttClient.isConnected()) {
try {
mqttClient.disconnect();
mqttClient.close();
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {
System.out.println("mqttClient is not connect");
}
}else {
System.out.println("mqttClient is null");
}
}
// 发布消息
public static void publishMessage(String pubTopic, String message, int qos) {
if(null != mqttClient&& mqttClient.isConnected()) {
System.out.println("发布消息 "+mqttClient.isConnected());
System.out.println("id:"+mqttClient.getClientId());
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setPayload(message.getBytes());
MqttTopic topic = mqttClient.getTopic(pubTopic);
if(null != topic) {
try {
MqttDeliveryToken publish = topic.publish(mqttMessage);
if(!publish.isComplete()) {
System.out.println("消息发布成功");
}
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}else {
reConnect();
}
}
// 重新连接
public static void reConnect() {
if(null != mqttClient) {
if(!mqttClient.isConnected()) {
if(null != mqttConnectOptions) {
try {
mqttClient.connect(mqttConnectOptions);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {
System.out.println("mqttConnectOptions is null");
}
}else {
System.out.println("mqttClient is null or connect");
}
}else {
start(ClientName);
}
}
// 订阅主题
public static void subTopic(String topic) {
if(null != mqttClient&& mqttClient.isConnected()) {
try {
mqttClient.subscribe(topic, 1);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {
System.out.println("mqttClient is error");
}
}
// 清空主题
public void cleanTopic(String topic) {
if(null != mqttClient&& !mqttClient.isConnected()) {
try {
mqttClient.unsubscribe(topic);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else {
System.out.println("mqttClient is error");
}
}
}
1)回调类回调类
package MQTT;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* 发布消息的回调类
*
* 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
* 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
* 在回调中,将它用来标识已经启动了该回调的哪个实例。
* 必须在回调类中实现三个方法:
*
* (1):public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
*
* (2):public void connectionLost(Throwable cause)在断开连接时调用。
*
* (3):public void deliveryComplete(MqttDeliveryToken token))
* 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
* 由 MqttClient.connect 激活此回调。
*
*/
public class MQTTReceiveCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
}
}
3、调试与使用
、调试与使用
在程序中调用下面两个函数
subTopic("/World"); //订阅/World这个主题
publishMessage("/World","hello World(客户端)",1);//向主题world发送hello World(客户端)
运行成功后控制台的显示
可以看到此时客户端也收到了自己发送的消息,原因是传给消息发送函数的主题与客户端本身订阅的主题是一样的,服务器接收到消息后会给所有订阅了此主题的客户端转发消息
服务器接收到消息后会给所有订阅了此主题的客户端转发消息
可以看到服务端接收到了消息(右下角)
服务端点击发送消息后
此时客户端的控制台
作者:梦视空