Hallo Gemeinde,
ich arbeite derzeit an einer für den Anfang recht komplexen Multithreading-Struktur, die sich über insgesamt 8 Threads erstreckt.
Kurz etwas zur Aufteilung:
Alle Threads laufen in der Main-Methode eines Process-Threads ab:
1x DataLoader - Thread (liest Daten aus einer Datenbank
2x PreProcessor - Thread (bereiten Daten auf)
3x Worker - Thread (verarbeitet die Daten)
1x Controller - Thread (kontrolliert die Daten und verteilt sie zurück auf die Worker oder auf den DataWriter)
1x DataWriter - Thread (schreibt die Daten in die Datenbank)
Ich habe hier mehrere Abhängigkeiten.
Durch mehrere Testläufe habe ich festgestellt, dass der DataLoader-Thread und die PreProcessor-Threads wesentlich schneller Daten in ihre Queues pumpen, als die Worker verarbeiten können. Das führt früher oder später zu einer Heap-Exception. Weiterhin habe ich festgestellt, dass den Workern und dem DataWriter in einigen Fällen zu wenig Verarbeitungszeit zugestanden wird, was im Endeffekt ebenfalls zu einer Heap-Exception führt.
Meine Idee ist es jetzt, in der run()-Methode des Controller-Threads nachzuprüfen, ob bestimmte Bedingungen darauf hinweisen, dass die Worker nicht hinterherkommen.
Ist das der Fall, möchte ich, dass der Controller die DataLoader- und PreProcessor-Threads warten (wait()) lässt.
Das mache ich z.B. so:
Das Problem ist, dass der Controller-Thread ebenfalls die Arbeit einstellt.
Statt nach dem Aufruf des synchronized-Blockes weitere Bedingungen für die anderen Threads zu überprüfen, bleibt er einfach stehen.
Der Controller-Thread ist der einzige, der seinerseits die besprochenen Producer wieder aufwecken darf, was wiederum an bestimmte Bedingungen geknüpft ist, die er jetzt nicht mehr überprüfen kann.
Der Controller-Thread erhält Zugriff auf die angesprochenen Producer-Threads, indem in der Main-Methode des Process-Threads folgendes geschieht:
Was mache ich hier grundlegend falsch???
Danke für eure Mühen!
ich arbeite derzeit an einer für den Anfang recht komplexen Multithreading-Struktur, die sich über insgesamt 8 Threads erstreckt.
Kurz etwas zur Aufteilung:
Alle Threads laufen in der Main-Methode eines Process-Threads ab:
1x DataLoader - Thread (liest Daten aus einer Datenbank
2x PreProcessor - Thread (bereiten Daten auf)
3x Worker - Thread (verarbeitet die Daten)
1x Controller - Thread (kontrolliert die Daten und verteilt sie zurück auf die Worker oder auf den DataWriter)
1x DataWriter - Thread (schreibt die Daten in die Datenbank)
Ich habe hier mehrere Abhängigkeiten.
Durch mehrere Testläufe habe ich festgestellt, dass der DataLoader-Thread und die PreProcessor-Threads wesentlich schneller Daten in ihre Queues pumpen, als die Worker verarbeiten können. Das führt früher oder später zu einer Heap-Exception. Weiterhin habe ich festgestellt, dass den Workern und dem DataWriter in einigen Fällen zu wenig Verarbeitungszeit zugestanden wird, was im Endeffekt ebenfalls zu einer Heap-Exception führt.
Meine Idee ist es jetzt, in der run()-Methode des Controller-Threads nachzuprüfen, ob bestimmte Bedingungen darauf hinweisen, dass die Worker nicht hinterherkommen.
Ist das der Fall, möchte ich, dass der Controller die DataLoader- und PreProcessor-Threads warten (wait()) lässt.
Das mache ich z.B. so:
Code:
while(someObject == null)
{
synchronized(this.dataLoader)
{
//Aufruf von this.dataLoader.wait() + try/catch-Block...
}
}
Statt nach dem Aufruf des synchronized-Blockes weitere Bedingungen für die anderen Threads zu überprüfen, bleibt er einfach stehen.
Der Controller-Thread ist der einzige, der seinerseits die besprochenen Producer wieder aufwecken darf, was wiederum an bestimmte Bedingungen geknüpft ist, die er jetzt nicht mehr überprüfen kann.
Der Controller-Thread erhält Zugriff auf die angesprochenen Producer-Threads, indem in der Main-Methode des Process-Threads folgendes geschieht:
Code:
public static void main(String[] args) throws SQLException, ClassNotFoundException, InterruptedException
{
DataLoaderThread dataLoaderThread = new DataLoaderThread(...someDataForTheConstructor...);
DataPreProcessorThread dataPreProcessorThread = new DataPreProcessorThread(...otherDataForTheConstructor...);
ControllerThread controller = new ControllerThread (dataLoaderThread, DataPreProcessorThread);
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(dataLoaderThread);
//...
executorService.execute(dataPreProcessorThread);
executorService.execute(controller);
//...
}
Was mache ich hier grundlegend falsch???
Danke für eure Mühen!