IoT
MQTT & Java

MQTT & Java

Dans ce tutoriel j’ai essayé de simplifier au maximum la mise en œuvre de MQTT en Java. J’ai pour cela utilisé la librairie Paho MQTT pour Java mise à disposition par la Fondation Eclipse. Le code source des exemples ci-dessous ainsi que la version mqttv3-1.2.5 de la librairie sont dispo dans une archive ici.

NB: Avoir la librairie Paho MQTT pour Java (org.eclipse.paho.client.mqttv3-1.2.5.jar) dans le même répertoire que vos classes de test.


Le programme de démo dispo chez Eclipse (PahoDemo.java) me semble un peu compliqué à appréhender pour un débutant. C’est pourquoi j’ai préféré le scinder en 2 programmes distincts: un qui envoie des messages MQTT (PahoTest001Producer.java) et un qui reçoit ces messages MQTT (PahoTest001Consumer.java). Dans la terminologie MQTT on parle de publish et de subscribe.

MQTT
MQTT

Quelques infos supplémentaires sur MQTT:


Envoi d’un message

Voici les principales étapes pour l’envoi d’un message MQTT:

  1. configuration du broker MQTT et génération du ID client unique; à noter qu’il semble nécessaire d’activer la persistence pour que la démo fonctionne (astuce trouvée sur un forum)

    String uri = "tcp://test.mosquitto.org:1883";
    String clientID = UUID.randomUUID().toString();
    MemoryPersistence persistence = new MemoryPersistence();
    
  2. création du connecteur MQTT (client) et connection au broker MQTT

    client = new MqttClient(uri, clientID, persistence);
    client.connect();
    
  3. création d’un message MQTT (payload = chaîne de caractères) puis publication sur le broker

    MqttMessage message = new MqttMessage();
    message.setPayload("A single message from my computer".getBytes());
    client.publish("foo", message);
    
Code source complet de la classe <code>PahoTest001Producer</code>
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.util.UUID;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class PahoTest001Producer implements MqttCallback {

MqttClient client;

public PahoTest001Producer() {
}

public static void main(String[] args) {
    new PahoTest001Producer().doDemo();
}

public void doDemo() {
    try {
        String uri = "tcp://test.mosquitto.org:1883";
        String clientID = UUID.randomUUID().toString();
        MemoryPersistence persistence = new MemoryPersistence();
        System.out.println("*** uri = "+uri);
        System.out.println("*** UUID = "+clientID);
        client = new MqttClient(uri, clientID, persistence);

        client.connect();
        client.setCallback(this);

        MqttMessage message = new MqttMessage();
        message.setPayload("A single message from my computer".getBytes());
        System.out.println("*** msgId = "+message.getId());
        client.publish("foo", message);

        client.disconnect();
    } catch (MqttException e) {
        e.printStackTrace();
    }
}

@Override
public void connectionLost(Throwable cause) {
    cause.printStackTrace();
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    System.out.println("["+topic+"] "+message);   
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
    System.out.println("Delivery complete...");
}

}

Réception d’un message

Voici les principales étapes pour la réception d’un message MQTT

  1. configuration du broker MQTT et génération du ID client unique; à noter qu’il semble nécessaire d’activer la persistence pour que la démo fonctionne (astuce trouvée sur un forum)

    String uri = "tcp://test.mosquitto.org:1883";
    String clientID = UUID.randomUUID().toString();
    
  2. création du connecteur MQTT (client) et connection au broker MQTT

    client = new MqttClient(uri, clientID, persistence);
    client.connect();
    
  3. configuration du client pour définir l’action à exécuter (notion de callback) à réception d’un message; ici on appelera la méthode messageArrived(...) de notre propre classe; une fois que le message aura été traité, ce sera la méthode deliveryComplete(...) qui sera ensuite invoquée

    client.setCallback(this);
    
  4. ces 2 méthodes messageArrived(...) et deliveryComplete(...) sont implémentées comme suit

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("["+topic+"] "+message);   
    }
    
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("Delivery complete...");
    }
    
  5. souscription du client au topic foo sur le broker MQTT; dès qu’un client publiera un message sur ce topic, le broker le diffusera à tous les autres clients qui y auront souscrit

    client.subscribe("foo");
    
Code source complet de la classe <code>PahoTest001Consumer</code>
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.util.UUID;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class PahoTest001Consumer implements MqttCallback {

MqttClient client;

public PahoTest001Consumer() {
}

public static void main(String[] args) {
    new PahoTest001Consumer().doDemo();
}

public void doDemo() {
    try {
        String uri = "tcp://test.mosquitto.org:1883";
        String clientID = UUID.randomUUID().toString();
        MemoryPersistence persistence = new MemoryPersistence();
        System.out.println("*** uri = "+uri);
        System.out.println("*** UUID = "+clientID);
        client = new MqttClient(uri, clientID, persistence);

        client.connect();
        client.setCallback(this);
        client.subscribe("foo");
    } catch (MqttException e) {
        e.printStackTrace();
    }
}

@Override
public void connectionLost(Throwable cause) {
    cause.printStackTrace();
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    System.out.println("["+topic+"] "+message);   
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
    System.out.println("Delivery complete...");
}

}

Exécution de cet exemple

  • Comme indiqué au début de ce tutoriel, il est nécessaire de placer la librairie Paho MQTT pour Java (org.eclipse.paho.client.mqttv3-1.2.5.jar) dans le même répertoire que vos classes de test.

  • Compiler le code:

    javac -cp .:./org.eclipse.paho.client.mqttv3-1.2.5.jar PahoTest001Producer.java
    javac -cp .:./org.eclipse.paho.client.mqttv3-1.2.5.jar PahoTest001Consumer.java
    
  • Exécuter les 2 programmes dans 2 terminaux différents en démarrant le consumer en $1^{er}$ pour qu’il soit bien à l’écoute quand le producer enverra le message:

    java -cp .:./org.eclipse.paho.client.mqttv3-1.2.5.jar PahoTest001Producer
    java -cp .:./org.eclipse.paho.client.mqttv3-1.2.5.jar PahoTest001Consumer