Servus,
ich weiß der Text ist länger und so etwas wird gerne mal nich tgelesen aber evtl findet sich ja doch der eine oder andere
ich habe ein Programm (Private für den Amaterufunkbereich, also absolut kostenlos) welches sich zu einen MQTT Server verbindet (QOS 0 und 2 je nach Topic) dann die Daten auswertet und ausgewählte bzw aufbereitete Daten an einen anderen MQTT Broker weiter leitet. (QOS 2)
Auf den Broker wo die Daten her kommen habe ich keinen Zugriff, auf den Broker wo die Daten hin gehen habe ich Zugriff.
Das Programm läuft jetzt 48 Stunden und
- hat 56 Millionen Nachrichten empfangen und verarbeitet (10,4 GB)
- 702.000 Nachrichten weitergeleitet
Das Programm startet mit etwa 50 MB im RAM und hatte jetzt 680 MB im RAM (läuft auf einem 2010er Windows vServer mit nur 2 GB RAM und 2 Kernen) und nähert sich extrem der 100% RAM Auslastung des vServers.
Vor 2 Tagen hatte ich den Error bekommen und das Programm ist ausgestiegen
Das Programm läuft noch in Eclipse so das ich Fehler besser auswerten kann.
Der Timer ist von meinem "WatchDog" welcher alle 15 Sekunden guckt ob noch Messages empfangen worden sind bzw weitergeleitet worden sind. Wenn nicht wird der jeweilige MQTT Client beendet und neu gestartet. Hierbei schicke ich mir auch eine Nachricht per Telegram.
Das nur als Info was der Timer macht, der Timer wird nach Programmstart automatisch gestartet aber den Timer selbst mache ich dafür jetzt nicht wirklich verantwortlich.
So schaut der Timer aus:
nicht über das Boolean "watchDogDontSendTelegram" wundern, das hat seinen Sinn. ^^
Als MQTT Client wird Paho mqttv3_1.2.0 verwendet
So schaut der Client aus der die Daten empfängt, ich habe lediglich die URL und die Topics geändert,
der Client der die Daten sendet schaut ähnlich aus nur halt ohne Topic Subsciption, auchhier habe ich nur die Zugangsdaten und die URL geändert
Ein erster Ansatzpunkt vorher war
MqttConnectOptions.setMaxInflight() gewesen welches per default auf 10 steht und ich deswegen öfters mal Hänger hatte, also habe ich es testweise auf setMaxInflight(2500) geändert (in einer Anleitung stand was von 64000 was wohl geht)
Danach ist dann erst mal alles stabil gelaufen
nebenbei bemerkt habe ich 2 ExecuterService
Der 10er Pol ist dafür da Aufgaben abzuarbeiten, also Anfragen welche über MQTT vom Sender kommen. Das sind aber nicht viele, evtl so 5 in der Minute, manchmal auch Minuten lang nichts. Die Aufgaben selbst dauern nur ca 1 Sekunde.
Der 15er Pool ist für das weiter senden der Nachrichten per QOS2 an den anderen Broker, das sind so 2-10 Nachrichten pro Sekunde
Hier bei MQTT liegt eine meiner Vermutungen das dort irgendwie der nennen wir es mal Cache nicht gelöscht wird durch den Garbage Collector oder ich mache was falsch. Irgendwo hatte ich auch mal gelesen das es öfters wohl Probleme mit QOS 2 Messages geben soll.
Da ja wie man sieht doch ganz schon was an Nachrichten rein kommt und einige auch doppelt mache ich einen Duplicatecheck welcher ungefähr 1/3 der Nachrichten raus filtert so das sie nicht weiter bearbeitet werden müssen geschweige denn weiter gesendet werden.
Das Ganze mache ich mit einer Arraylist
Hierbei ist message einfach nur die MqttMessage.getPayload()
ob das jetzt eine tolle Lösung ist weiß ich nicht, pro Sekunde kommen ca 300 Nachrichten rein und nach 3 Sekunden lösche ich die alten Einträge in der Arraylist
Hier noch das DoubleData Objekt
Hier wäre ein weiter Ansatzpunkt von mir das dort der Garbage Collector nicht aufräumt oder ich was falsch gemacht habe.
Im Endeffekt ist ein ganz ganz leichter Anstieg vom RAM erwartet da sobald ein neuer User in Datenstrom auf taucht wird dafür ein Objekt angelegt und dieses dann in eine Arrayliste gepackt und wenn der User wieder aktiv ist wird dieses Objekt aktualisiert,
Aktuell sind das ca 11.000 User, wenn ich die ArrayList serilized speichere ist die Datei ca 1,4 MB groß, das ganze mache ich so mittels stream wobei ich hier jetzt auch nicht weiß ob Stream da gut ist oder nicht, dachte mir zumindest das bei der Anzahl an Einträgen Stream effectiver ist
und
hier sehe ich eigentlich kein Potential für ein Memeory Problem
Also was meint ihr ? Habe ich was falsch gemacht was dazu führen kann ? Oder ist es es evtl ein Problem mit dem Paho Clienten oder mit dem Java Garbage Control ?
ich weiß der Text ist länger und so etwas wird gerne mal nich tgelesen aber evtl findet sich ja doch der eine oder andere
ich habe ein Programm (Private für den Amaterufunkbereich, also absolut kostenlos) welches sich zu einen MQTT Server verbindet (QOS 0 und 2 je nach Topic) dann die Daten auswertet und ausgewählte bzw aufbereitete Daten an einen anderen MQTT Broker weiter leitet. (QOS 2)
Auf den Broker wo die Daten her kommen habe ich keinen Zugriff, auf den Broker wo die Daten hin gehen habe ich Zugriff.
Das Programm läuft jetzt 48 Stunden und
- hat 56 Millionen Nachrichten empfangen und verarbeitet (10,4 GB)
- 702.000 Nachrichten weitergeleitet
Das Programm startet mit etwa 50 MB im RAM und hatte jetzt 680 MB im RAM (läuft auf einem 2010er Windows vServer mit nur 2 GB RAM und 2 Kernen) und nähert sich extrem der 100% RAM Auslastung des vServers.
Vor 2 Tagen hatte ich den Error bekommen und das Programm ist ausgestiegen
Code:
Exception in thread "Timer-0" java.lang.OutOfMemoryError: Java heap space
Das Programm läuft noch in Eclipse so das ich Fehler besser auswerten kann.
Der Timer ist von meinem "WatchDog" welcher alle 15 Sekunden guckt ob noch Messages empfangen worden sind bzw weitergeleitet worden sind. Wenn nicht wird der jeweilige MQTT Client beendet und neu gestartet. Hierbei schicke ich mir auch eine Nachricht per Telegram.
Das nur als Info was der Timer macht, der Timer wird nach Programmstart automatisch gestartet aber den Timer selbst mache ich dafür jetzt nicht wirklich verantwortlich.
So schaut der Timer aus:
Java:
private void startTimerTask(){
this.timerTask = new TimerTask() {
public void run() {
watchDogSeconds--;
labelWatchDogTimer.setText(String.valueOf(watchDogSeconds));
if(watchDogSeconds == 0) {
watchDogSeconds = 15;
labelWatchDogTimer.setText(String.valueOf(watchDogSeconds));
if(watchDogCounterLH == messagesSendLastHeard) {
System.out.println("LH Server - Restart needed");
// Restart RR Server
try {
System.out.println("WatchDog - Try to Restart LH Server - Before Telegram Message");
sendTelegramMessage("WatchDog - Try to Restart LH Server");
System.out.println("WatchDog - Try to Restart LH Server - After Telegram Message");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (JSONException e) {
e.printStackTrace();
}
watchDogDontSendTelegram = true;
System.out.println("WatchDog - Try to Restart LH Server - Before startWebSocketServer");
startRRmqttBroker();
System.out.println("WatchDog - Try to Restart LH Server - Before startWebSocketServer");
watchDogDontSendTelegram = false;
}
if(watchDogCounterMQTT == messagesReceived) {
System.out.println("MQTT Client - Restart needed");
// Restart MQTT
try {
System.out.println("WatchDog - Try to Restart MQTT Client - Before Telegram Message");
sendTelegramMessage("WatchDog - Try to Restart MQTT Client");
System.out.println("WatchDog - Try to Restart MQTT Client - After Telegram Message");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (JSONException e) {
e.printStackTrace();
}
watchDogDontSendTelegram = true;
System.out.println("WatchDog - Try to Restart MQTT Client - Before mqttConnect");
mqttConnect();
System.out.println("WatchDog - Try to Restart MQTT Client - After mqttConnect");
watchDogDontSendTelegram = false;
}
// Set last Status (received messages)
watchDogCounterMQTT = messagesReceived;
watchDogCounterLH = messagesSendLastHeard;
labelWatchDogMQTT.setText(String.valueOf(watchDogCounterMQTT));
labelWatchDogLH.setText(String.valueOf(watchDogCounterLH));
}
}
};
this.timerTS12.scheduleAtFixedRate(this.timerTask, 0, 1000);
}
nicht über das Boolean "watchDogDontSendTelegram" wundern, das hat seinen Sinn. ^^
Als MQTT Client wird Paho mqttv3_1.2.0 verwendet
So schaut der Client aus der die Daten empfängt, ich habe lediglich die URL und die Topics geändert,
Java:
private void mqttConnect() {
mqttDisConnect();
String broker = "tcp://url:1883";
String clientId = "JavaMQTT" + String.valueOf(System.currentTimeMillis()); // .generateClientId ();;
MemoryPersistence persistence = new MemoryPersistence();
try {
mqttClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setAutomaticReconnect(true);
connOpts.setMaxInflight(2500);
System.out.println("Connecting to broker: " + broker);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("# Connection Lost Start #####################################");
System.out.println("# Connection lost to broker: " + broker);
System.out.println(throwable.getCause());
throwable.printStackTrace();
System.out.println("# Connection Lost End ####################################");
lblStatus.setText("Lost");
lblStatus.setForeground(Color.RED);
try {
sendTelegramMessage("MQTT Client - Connection LOST"); // Das ist nur für die Benachrichtigung gedacht
mqttSendMsgToWS("{ \"Event\" : \"LOST\", \"Message\" : \"Connection to Server lost - Reconnect in some seconds\" }", "service"); // Das ist nur für die Benachrichtigung gedacht
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (JSONException e) {
e.printStackTrace();
}
}
@Override
public void messageArrived(String t, MqttMessage m) {
int iiii = m.toString().getBytes().length;
setReceivedBytes(iiii);
try {
handleMqttAnswer(t, m);
} catch (JSONException e) {
e.printStackTrace();
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
// TODO Auto-generated method stub
}
});
mqttClient.connect(connOpts);
mqttClient.subscribe("topic0/#", 0);
mqttClient.subscribe("topic1/#", 2);
mqttClient.subscribe("topic2/#", 2);
this.lblStatus.setText("Started");
this.lblStatus.setForeground(new Color(0, 128, 0));
sendTelegramMessage("MQTT Client started"); // Das ist nur für die Benachrichtigung gedacht
mqttSendMsgToWS("{ \"Event\" : \"START\", \"Message\" : \"Connected to Server\" }", "service"); // Das ist nur für die Benachrichtigung gedacht
} catch(MqttException me) {
System.out.println("# MqttException Start #####################################");
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
System.out.println("# MqttException End #####################################");
} catch (Exception e) {
e.printStackTrace();
}
}
der Client der die Daten sendet schaut ähnlich aus nur halt ohne Topic Subsciption, auchhier habe ich nur die Zugangsdaten und die URL geändert
Java:
private void mqttConnectRR() {
String broker = "tcp://url:1883";
String clientId = "JavaMQTTforRR" + String.valueOf(System.currentTimeMillis()); // .generateClientId ();;
MemoryPersistence persistence = new MemoryPersistence();
try {
mqttClientWS = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setAutomaticReconnect(true);
connOpts.setUserName("meinUsername");
connOpts.setPassword("meinPasswort".toCharArray());
connOpts.setMaxInflight(2500);
System.out.println("Connecting to broker WS: " + broker);
mqttClientWS.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("# WS Connection Lost Start #####################################");
System.out.println("# WS Connection lost to broker: " + broker);
System.out.println(throwable.getCause());
throwable.printStackTrace();
System.out.println("# WS Connection Lost End ####################################");
lblStopped.setText("Lost");
lblStopped.setForeground(Color.RED);
wsAssumedStarted = false;
try {
sendTelegramMessage("Last Heard Server - Connection Lost");
mqttSendMsgToWS("{ \"Event\" : \"LOST\", \"Message\" : \"Connection to RR Server lost - Reconnect in some seconds\" }", "service");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (JSONException e) {
e.printStackTrace();
}
}
@Override
public void messageArrived(String t, MqttMessage m) {
/*
Currently there are no messages send to this system
*/
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
// TODO Auto-generated method stub
}
});
mqttClientWS.connect(connOpts);
sendTelegramMessage("Last Heard Server started");
mqttSendMsgToWS("{ \"Event\" : \"START\", \"Message\" : \"Connected to RR Server\" }", "service");
this.lblStopped.setText("Started");
this.lblStopped.setForeground(new Color(0, 128, 0));
} catch(MqttException me) {
System.out.println("# WS MqttException Start #####################################");
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
System.out.println("# MqttException End #####################################");
} catch (Exception e) {
e.printStackTrace();
}
}
Ein erster Ansatzpunkt vorher war
MqttConnectOptions.setMaxInflight() gewesen welches per default auf 10 steht und ich deswegen öfters mal Hänger hatte, also habe ich es testweise auf setMaxInflight(2500) geändert (in einer Anleitung stand was von 64000 was wohl geht)
Danach ist dann erst mal alles stabil gelaufen
nebenbei bemerkt habe ich 2 ExecuterService
Java:
private ExecutorService executor = Executors.newFixedThreadPool(10);
private ExecutorService executorLH = Executors.newFixedThreadPool(15);
Der 10er Pol ist dafür da Aufgaben abzuarbeiten, also Anfragen welche über MQTT vom Sender kommen. Das sind aber nicht viele, evtl so 5 in der Minute, manchmal auch Minuten lang nichts. Die Aufgaben selbst dauern nur ca 1 Sekunde.
Der 15er Pool ist für das weiter senden der Nachrichten per QOS2 an den anderen Broker, das sind so 2-10 Nachrichten pro Sekunde
Hier bei MQTT liegt eine meiner Vermutungen das dort irgendwie der nennen wir es mal Cache nicht gelöscht wird durch den Garbage Collector oder ich mache was falsch. Irgendwo hatte ich auch mal gelesen das es öfters wohl Probleme mit QOS 2 Messages geben soll.
Da ja wie man sieht doch ganz schon was an Nachrichten rein kommt und einige auch doppelt mache ich einen Duplicatecheck welcher ungefähr 1/3 der Nachrichten raus filtert so das sie nicht weiter bearbeitet werden müssen geschweige denn weiter gesendet werden.
Das Ganze mache ich mit einer Arraylist
Java:
public boolean checkIfDoubleData (ArrayList<DoubleData> doubleDataArrayList, String message) {
for(DoubleData dd : doubleDataArrayList) {
if(dd.getContent().equals(message)) return true;
}
return false;
}
Hierbei ist message einfach nur die MqttMessage.getPayload()
ob das jetzt eine tolle Lösung ist weiß ich nicht, pro Sekunde kommen ca 300 Nachrichten rein und nach 3 Sekunden lösche ich die alten Einträge in der Arraylist
Java:
public ArrayList<DoubleData> removeExpiredDoubleData(ArrayList<DoubleData> doubleDataArrayList) {
//int before = doubleDataArrayList.size();
doubleDataArrayList.removeIf(data -> data.getTimestamp() <= (getUnixTime() - 3));
//int after = doubleDataArrayList.size();
//System.out.println(before + " --> " + after);
return doubleDataArrayList;
}
Hier noch das DoubleData Objekt
Java:
public class DoubleData {
private String content = "";
private int timestamp = 0;
public DoubleData(int timestamp, String content) {
setContent(content);
setTimestamp(timestamp);
}
/**
* @return the content
*/
public String getContent() {
return content;
}
/**
* @param content the content to set
*/
public void setContent(String content) {
this.content = content;
}
/**
* @return the timestamp
*/
public int getTimestamp() {
return timestamp;
}
/**
* @param timestamp the timestamp to set
*/
public void setTimestamp(int timestamp) {
this.timestamp = timestamp;
}
}
Hier wäre ein weiter Ansatzpunkt von mir das dort der Garbage Collector nicht aufräumt oder ich was falsch gemacht habe.
Im Endeffekt ist ein ganz ganz leichter Anstieg vom RAM erwartet da sobald ein neuer User in Datenstrom auf taucht wird dafür ein Objekt angelegt und dieses dann in eine Arrayliste gepackt und wenn der User wieder aktiv ist wird dieses Objekt aktualisiert,
Aktuell sind das ca 11.000 User, wenn ich die ArrayList serilized speichere ist die Datei ca 1,4 MB groß, das ganze mache ich so mittels stream wobei ich hier jetzt auch nicht weiß ob Stream da gut ist oder nicht, dachte mir zumindest das bei der Anzahl an Einträgen Stream effectiver ist
Java:
public ActiveCalls checkIfCallSignIsCachedBase(String id){
List<ActiveCalls> result = this.callDataArrayList.stream()
.filter(line -> line.getId().equals(id))
.distinct()
.sorted(Comparator.comparing(ActiveCalls::getCallSign))
.collect(Collectors.toList());
for (ActiveCalls entry : result){
if(entry.getId().equals(id)){
return entry;
}
}
return null;
}
und
Java:
ActiveCalls activeCall = checkIfCallSignIsCachedBase(sourceID);
if(activeCall == null) {
// new entry
ActiveCalls newActiveCall = new ActiveCalls("unwichtig");
callDataArrayList.add(newActiveCall);
}else {
// update entry
// unwichtig
}
hier sehe ich eigentlich kein Potential für ein Memeory Problem
Also was meint ihr ? Habe ich was falsch gemacht was dazu führen kann ? Oder ist es es evtl ein Problem mit dem Paho Clienten oder mit dem Java Garbage Control ?