在Java中，要實現MQTT消息的發布和訂閱，你需要使用一個支持MQTT協議的庫。Eclipse Paho是一個流行的MQTT客戶端庫，提供了Java版本。以下是使用Eclipse Paho庫進行MQTT消息發布和訂閱的步驟：
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
/**
 * Minimal example that connects to a local MQTT broker with the Eclipse Paho
 * client and then releases the connection again.
 */
public class MqttExample {

    /**
     * Connects to the broker at {@code tcp://localhost:1883} with a clean
     * session, reports success on standard output, and then disconnects.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        String broker = "tcp://localhost:1883";
        String clientId = "JavaSampleClient";

        // Declared outside the try block so the finally block can release it.
        IMqttClient mqttClient = null;
        try {
            mqttClient = new MqttClient(broker, clientId);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            mqttClient.connect(options);
            System.out.println("Connected to MQTT broker");
        } catch (MqttException e) {
            e.printStackTrace();
        } finally {
            release(mqttClient);
        }
    }

    /**
     * Disconnects the client if it is still connected and then closes it.
     * Failures during cleanup are reported but not rethrown.
     *
     * @param mqttClient the client to release; may be {@code null} when
     *                   construction failed
     */
    private static void release(IMqttClient mqttClient) {
        if (mqttClient == null) {
            return;
        }
        try {
            // Only disconnect when a connection was actually established.
            if (mqttClient.isConnected()) {
                mqttClient.disconnect();
                System.out.println("Disconnected from MQTT broker");
            }
            mqttClient.close();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
// ...
public class MqttExample {
    // ...

    /**
     * Installs a callback on the client and subscribes to {@code my/topic}
     * at QoS 0.
     *
     * <p>The callback is registered with {@code setCallback} rather than being
     * passed to {@code subscribe}: the three-argument {@code subscribe}
     * overload expects an {@code IMqttMessageListener}, so handing it an
     * {@code MqttCallback} does not compile. A callback set this way belongs
     * to the client as a whole, so it is also notified about lost connections
     * and completed deliveries, not only about messages on this topic.
     *
     * <p>Any {@link MqttException} is printed and not rethrown.
     *
     * @param mqttClient the client to subscribe with; expected to be connected
     */
    private static void subscribe(IMqttClient mqttClient) {
        String topic = "my/topic";
        int qos = 0;
        try {
            // Register the callback before subscribing so that no message
            // delivered right after the subscription is missed.
            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    System.out.println("Connection lost");
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    System.out.println("Received message on topic: " + topic);
                    // Decode explicitly as UTF-8 instead of relying on the
                    // platform default charset.
                    String text = new String(
                            message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("Message content: " + text);
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("Message delivered");
                }
            });
            mqttClient.subscribe(topic, qos);
            System.out.println("Subscribed to topic: " + topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Publishes the text {@code "Hello, MQTT!"} to {@code my/topic} with QoS 0
 * and the retained flag cleared.
 *
 * <p>Any {@link MqttException} is printed and not rethrown.
 *
 * @param mqttClient the client to publish with; expected to be connected
 */
private static void publish(IMqttClient mqttClient) {
    String topic = "my/topic";
    String content = "Hello, MQTT!";
    int qos = 0;
    boolean retained = false;
    try {
        // Encode explicitly as UTF-8 so the bytes on the wire do not depend
        // on the platform default charset of the publishing JVM.
        MqttMessage message =
                new MqttMessage(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        message.setQos(qos);
        message.setRetained(retained);
        mqttClient.publish(topic, message);
        System.out.println("Message published to topic: " + topic);
    } catch (MqttException e) {
        e.printStackTrace();
    }
}
/**
 * Connects to the broker, subscribes, publishes one message, waits 60 seconds
 * for incoming messages, and then disconnects.
 *
 * <p>Cleanup runs in a {@code finally} block so the client is disconnected
 * and closed even when connecting fails part-way or the wait is interrupted.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // ...
    // Declared outside the try block so the finally block can release it.
    IMqttClient mqttClient = null;
    boolean interrupted = false;
    try {
        mqttClient = new MqttClient(broker, clientId);
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        mqttClient.connect(options);
        System.out.println("Connected to MQTT broker");
        subscribe(mqttClient);
        publish(mqttClient);
        // Keep the client running for a while to receive messages
        Thread.sleep(60000);
    } catch (InterruptedException e) {
        interrupted = true;
        e.printStackTrace();
    } catch (MqttException e) {
        e.printStackTrace();
    } finally {
        if (mqttClient != null) {
            try {
                // Only disconnect when a connection was actually established.
                if (mqttClient.isConnected()) {
                    mqttClient.disconnect();
                    System.out.println("Disconnected from MQTT broker");
                }
                mqttClient.close();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
        // Restore the interrupt flag only after cleanup, so that the blocking
        // disconnect above is not disturbed by a pending interrupt.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
這個示例展示了如何使用Eclipse Paho庫在Java中實現MQTT消息的發布和訂閱。你可以根據自己的需求修改代碼，例如更改主題、消息內容或質量等級（QoS）。