1000 MQTT Messages die Sekunde - 1000 Threads erstellen ?

Nisbo

Bekanntes Mitglied
Servus,

ich bekomme Daten von einem MQTT Broker,

Java:
String broker       = "tcp://server:1883";
String clientId     = "Java";
MemoryPersistence persistence = new MemoryPersistence();

try {
    mqttClient = new MqttClient(broker, clientId, persistence);
    MqttConnectOptions connOpts = new MqttConnectOptions();
    connOpts.setCleanSession(true);
    connOpts.setAutomaticReconnect(true);
    System.out.println("Connecting to broker: " + broker);
   
    mqttClient.setCallback(new MqttCallback() {
        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println("#####################################");
            System.out.println("Connection lost to broker: "+broker);
            System.out.println(throwable.getCause());
            throwable.printStackTrace();
        }
 
        @Override
        public void messageArrived(String t, MqttMessage m) {
            int iiii = m.toString().getBytes().length;
            setReceivedBytes(iiii);
            try {
                handleMqttAnswer(t, m);
            } catch (JSONException e) {
                //
            }
        }
 
        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {
            // TODO Auto-generated method stub
        }
        });
   
    mqttClient.connect(connOpts);
   
    mqttClient.subscribe("Server/Data", 2);
} catch(MqttException me) {
    //
} catch (Exception e) {
    //
}

das können schon mal bis zu 1000 Datensätze pro Sekunde sein, meistens so aber um die 300 pro Sekunde. Diese Daten werden dann in der Methode

Java:
handleMqttAnswer(t, m);

verarbeitet. Im Normalfall auch kein Problem da der Großtel (99,9%) der empfangenen Nachrichten einfach nur "durchläuft" ohne weitere Aktionen.

Einige Nachrichten allerdings brauchen länger (bis zu 4 Sekunden) ehe sie abgearbeitet sind da sie z.B. weitere Daten aus dem Internet laden oder Daten per MQTT an einen anderen Broker verschicken.

Jetzt habe ich natürlich die Vermutung das mir da Daten verloren gehen können, oder cached der paho mqtt client die Nachrichten und arbeite sie dann danach weiter ab ?

Oder soll ich die Methode
handleMqttAnswer(t, m)
jedesmal in einem neuen Thread aufrufen ?
Da hätte dann evtl ja auch den Nachteil das die Reihenfolge der Nachrichten nicht mehr stimmt.

Wenn sie geached werden wäre soweit eigentlich alles relativ OK ^^
 

DrZoidberg

Top Contributor
Wenn dein Programm im Durchschnitt 1000 Nachrichten pro Sekunde empfängt, dann muss es auch in der Lage sein 1000 Nachrichten pro Sekunde abzuarbeiten, sonst wird der Cache irgendwann voll sein.
Wenn 0,1% der Nachrichten jeweils 4s in Anspruch nehmen, dann heißt das, jede Sekunde kommt eine Nachricht an, die 4 Sekunden braucht um sie abzuarbeiten. Das wird nicht funktionieren. Da darf dann maximal alle 4 Sekunden eine solche Nachricht ankommen.
Eine Lösung wäre die Verwendung von mehreren Worker Threads. Dafür eignet sich z.b. ein ExecutorService. Das erfordert dann aber nicht 1000 Threads sondern nur ein paar wenige.
Muss die Reihenfolge unbedingt erhalten bleiben?
 

Nisbo

Bekanntes Mitglied
OK dann passt meine Prozentangabe nicht ^^

Es kommen evtl mal 4 Nachrichten die Minute welche länger dauern, dann manchmal mehrere Minuten keine welche so lange dauern.

Die Reihenfolge muss eingehalten werden.

Was ich mir überlegt hatte die Nachrichten welche "abgearbeitet" werden müssen in eine Art "Task-Liste" zu schreiben um sie dann der Reihe nach getrennt von der handleMqttAnswer Methode mit einem Worker abzuarbeiten aber das muss ich nochmal gut überdenken ob es dann so hinhaut.

Wenn der paho MqttClient jetzt die Nachrichten welche vom Broker empfangen und hier abgearbeitet werden
Java:
public void messageArrived(String t, MqttMessage m) {
                //
}

cached dann hat sich die ganze Frage eh erübrigt.
Was ich mit chachen meine

Kommt eine Nachricht rein vom Broker dann wird ja
messageArrived(String t, MqttMessage m)
aufgerufen welche dan meine Methode aufruft.

Wenn meine Methode jetzt 4 Sekunden braucht, empfängt der Mqtt Client dann in der Zeit weiter Daten und cached diese ?

Also werden die Nachrichten welche dann in den 4 Sekunden eingetroffen sind trotzdem abgearbeitet oder empfängt der Client diese nicht ?
 

Nisbo

Bekanntes Mitglied
Das nenne ich mal eine coole Doku :)

Habe mir gerade nochmal den Counter der Nachrichten angesehen, wenn ein langer task abgearbeite wird hält der an und danach werden dann erst einmal sehr viele Nachrichten verarbeitet bis er wieder normal ansteigt, wird also wohl einen Cache haben :)
 

Thallius

Top Contributor
Naja ich fände es sauberer die "Zusatzarbeit" in einen eigenen Thread zu machen. Sprich der handleMqttAnswer schaut nur nach ob weitere Arbeiten an der Message nötig ist und wenn ja schickt er diese an einen weiteren Thread der die dann abarbeitet. Ob das dann jeweils ein eigener Thread ist oder ob das auch nur 1 Thread ist der die Messages dann in einer Queue abarbeitet ist Geschmacksache.

Gruß

Claus
 

Tobse

Top Contributor
Naja ich fände es sauberer die "Zusatzarbeit" in einen eigenen Thread zu machen. Sprich der handleMqttAnswer schaut nur nach ob weitere Arbeiten an der Message nötig ist und wenn ja schickt er diese an einen weiteren Thread der die dann abarbeitet. Ob das dann jeweils ein eigener Thread ist oder ob das auch nur 1 Thread ist der die Messages dann in einer Queue abarbeitet ist Geschmacksache.

Der TO hatte das ja auch schon vorgeschlagen und ich sehe das genauso. Scheint mir die sauberste Lösung. Damit sollte man mit 3-4 Threads alles abarbeiten können.

Das wichtige ist ja, dass das Programm mit den anderen Messages weiter machen kann, während die lange Message abgearbeitet wird. Wenn man die langen Messages auslagert entsteht da mMn. auch kein Problem. Und selbst wenn Pro Minute 30 solche Nachrichten kommen kann man die über 1 oder 2 zusätzliche Threads abdecken.
 

JuKu

Top Contributor
Mit einem ExecutorService bekommst du dein Problem wahrscheinlich in den Griff.
Aber bloß nicht für jede Nachricht einen eigenen Thread erstellen!
 

Ähnliche Java Themen

Neue Themen


Oben