Thread vorübergehend stilllegen und wieder aufwecken

Sekundentakt

Bekanntes Mitglied
Hallo Gemeinde,

ich arbeite derzeit an einer für den Anfang recht komplexen Multithreading-Struktur, die sich über insgesamt 8 Threads erstreckt.

Kurz etwas zur Aufteilung:
Alle Threads laufen in der Main-Methode eines Process-Threads ab:
1x DataLoader - Thread (liest Daten aus einer Datenbank
2x PreProcessor - Thread (bereiten Daten auf)
3x Worker - Thread (verarbeitet die Daten)
1x Controller - Thread (kontrolliert die Daten und verteilt sie zurück auf die Worker oder auf den DataWriter)
1x DataWriter - Thread (schreibt die Daten in die Datenbank)

Ich habe hier mehrere Abhängigkeiten.
Durch mehrere Testläufe habe ich festgestellt, dass der DataLoader-Thread und die PreProcessor-Threads wesentlich schneller Daten in ihre Queues pumpen, als die Worker verarbeiten können. Das führt früher oder später zu einer Heap-Exception. Weiterhin habe ich festgestellt, dass den Workern und dem DataWriter in einigen Fällen zu wenig Verarbeitungszeit zugestanden wird, was im Endeffekt ebenfalls zu einer Heap-Exception führt.

Meine Idee ist es jetzt, in der run()-Methode des Controller-Threads nachzuprüfen, ob bestimmte Bedingungen darauf hinweisen, dass die Worker nicht hinterherkommen.
Ist das der Fall, möchte ich, dass der Controller die DataLoader- und PreProcessor-Threads warten (wait()) lässt.

Das mache ich z.B. so:
Code:
			while(someObject == null)
			{
				synchronized(this.dataLoader)
				{
					//Aufruf von this.dataLoader.wait() + try/catch-Block...
				}
			}
Das Problem ist, dass der Controller-Thread ebenfalls die Arbeit einstellt.
Statt nach dem Aufruf des synchronized-Blockes weitere Bedingungen für die anderen Threads zu überprüfen, bleibt er einfach stehen.
Der Controller-Thread ist der einzige, der seinerseits die besprochenen Producer wieder aufwecken darf, was wiederum an bestimmte Bedingungen geknüpft ist, die er jetzt nicht mehr überprüfen kann.

Der Controller-Thread erhält Zugriff auf die angesprochenen Producer-Threads, indem in der Main-Methode des Process-Threads folgendes geschieht:

Code:
public static void main(String[] args) throws SQLException, ClassNotFoundException, InterruptedException 
{
DataLoaderThread dataLoaderThread = new DataLoaderThread(...someDataForTheConstructor...);
																	
		DataPreProcessorThread dataPreProcessorThread = new DataPreProcessorThread(...otherDataForTheConstructor...); 

ControllerThread controller = new ControllerThread (dataLoaderThread, DataPreProcessorThread);

ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(dataLoaderThread);
//...
executorService.execute(dataPreProcessorThread);
executorService.execute(controller);
//...
}

Was mache ich hier grundlegend falsch???
Danke für eure Mühen!
 

Marco13

Top Contributor
So grob und oberflächlich betrachtet klingt das nach einem (etwas komplizierteren) Producer-Consumer-Muster. Es gibt in java.util.concurrent z.B. sowas wie die "BlockingQueue". Die bietet im wesentlichen solche Methoden wie
- Lege Daten in die Queue wenn dort noch Platz ist - ansonsten warte
- Nimm Daten aus der Queue wenn dort welche sind - ansonsten warte

Damit kann man sowas oft komplett ohne eigene synchronisation schreiben...
 

Sekundentakt

Bekanntes Mitglied
Hallo Marco,

prinzipiell liegst Du da richtig, ich denke ich schaue mir das mal genauer an.
Was mich bisher aber davon abhielt, war, dass der BlockingQueue eine wesentlich geringere Durchsatzrate nachgesagt wird. Wahrscheinlich habe ich mich aber zu stark auf die LinkedQueue konzentriert.

Das Problem wäre damit zwar gelöst, die Frage aber trotzdem nicht ganz geklärt :).
Wieso hält mein controller-Thread an? Der soll doch gar nicht warten?
Ich kann das derzeit nicht so ganz nachvollziehen. . .
 

andiv

Bekanntes Mitglied
Ich empfehle an dieser Stelle mal das Buch "Java Concurrency in Practice". Jeder der Multithreading in Java "richtig" einsetzen will, sollte das meiner Meinung nach lesen. Die Autoren waren alle maßgeblich an der Entwicklung von java.util.concurrent.* beteiligt.
Die Autoren empfehlen an dieser Stelle auch, wie schon gesagt wurde, die Verwendung einer BlockingQueue.

Um was über den Controllerthread sagen zu können müsstest du mal mehr Code zeigen. Vorallem die Stellen wo blockierende Methoden aufgerufen werden oder syncronisiert wird (syncronized blockiert schließlich auch).
 

nickname

Bekanntes Mitglied
Hi,

bei deinem Controller ist es so, dass er seine Time Slice nicht mehr hat. Es wurde im die Prozessorzeit
für eine anderen Thread entzogen, und wenn kein anderer Thread diese Zeit hat, dann befindet er
sich im "blocked" Modus oder im Zustand "dead". Wenn sich ein Thread z.B. in "sleep()" befindet, dann gibt er den "kritischen Bereich" der durch "synchronized" geschützt
wird nicht für diesen Zeitraum frei, bis dieser Thread fertig ist.
Einen Thread kannst du mit "interrupt()" wecken, dann wird aber von "sleep(), join() oder wait()"
eine "InterruptExc..." geworfen und dieser Thread kommt dann in den Zustand "ready-to-run", wo
er auf die Prozessorzeit wartet (meistens in einer Queue"

Wenn du dir die Methode join() anschaust, dann kannst du sehen, dass ein Thread, der diese für den
aktiven aufruft , solange wartet bis dieser seine Arbeit beendet hat.
Damit kanst du kontrolliert auf das Enden des aktiven Threads warten.
Du kannst dann aber auch die Zeit in join() einstellen, wie lange er warten soll.

Du hast aber auch die Möglichkeit, einem Thread eine höhere Priorität zu geben. Dabei must du aber
beachten, dass dann vllt die anderen Threads seltener oder (was passieren kann) gar nicht drankommen. Am besten vergibst du dann einem intensiven Thread, der viel schreiben muss eine
niedrigerere Prioritätsstufe als einem lesenden Thread.
Es gibt (bei Windoof) 3 Stufen der Priorität 1, 5 und 10. Alle Thread haben als Standard die Stufe 5.
Und bei LINUX gibt es glaube ich keine, will mich aber da nicht festnageln lassen:rtfm:
Wenn du eine Queue mit Threads hast, dann kannst du eigentlich nie wissen, welcher Thread
die nächste Time Slice bekommt. Time Slices sind Zeitscheiben, die durch denn Scheduler verteilt werden und auch wieder entzogen werden (Stichwort: präemtives-Scheduling)
Mit der Methode notify() weckst du einen Thread und bringst ihn damit in den Zustand "ready-to-run"
und mit notifyAll() machst du das mit allen. Auch hierbei kannst du dann nicht wissen, welcher Thread
dann seine Arbeit aufnehmen wird. Schau dir auch mal die Methode "isInterrupted()" und "interrupted()" an.

Leider gibt dein bisschen Code nicht viel von deinem Problem wieder. Etwas mehr wäre schon ganz gut.

gruß, nickname
 

Sekundentakt

Bekanntes Mitglied
Hallo nickname,

ich glaube, du hast meinen Code falsch verstanden.
Der Controller ruft für den DataLoader und PreProcessor die Methode wait() auf.
Das Problem ist, dass er den zweiten Aufruf schon gar nicht mehr durchführt.
Die Ausgabe "Jetzt der PreProcessor" erscheint nie auf dem Bildschirm.

Code:
			while(someObject == null)
			{
				synchronized(this.dataLoader)
				{
					//Aufruf von this.dataLoader.wait() + try/catch-Block...
				}
                                System.out.println("Jetzt der PreProcessor...");
				synchronized(this.PreProcessor)
				{
					//Aufruf von this.PreProcessor.wait() + try/catch-Block...
				}
			}
Der Controller soll die Producer (DataLoader und PreProcessor) lahmlegen, wenn bestimmte Bedingungen wahr sind. Nachdem er das gemacht hat, soll er aber auch noch checken, ob die Ergebnisse der Worker nicht auf einer BlackList stehen. Die BlackList besteht dabei aus Doubletten.
Wenn die Ergebnisse nicht auf der BlackList stehen, pumpt der Controller die Daten in eine Queue für den DataWriter. Nach einem bestimmten Zeitintervall, sollen dann die Producer wieder geweckt werden, damit keine Wartezeiten bei den Workern und somit beim Rest entstehen.
Das alles geschieht aber nicht, weil der Controller einfach nicht mehr weitermacht.
Ich will ja nicht wait() für den Controller aufrufen, sondern den Controller dazu veranlassen, die Producer für eine Zeit zu stoppen.

Wenn du eine Queue mit Threads hast, dann kannst du eigentlich nie wissen, welcher Thread
die nächste Time Slice bekommt.
Ich habe viele Queues. In diesen Queues warten aber keine Threads.
In diese Queues werden die Daten hineingeschrieben, die die Threads planmäßig verarbeitet haben.
Zum Beispiel schreibt der DataLoader-Thread in eine loadedDataQueue, die PreProcessor holen ihrerseits Daten aus dieser Queue und schreiben sie nach ihrer Arbeit in die readyToWorkOnQueue. Einige besondere Werte kommen dann noch in eine needToCheckQueue für den Controller.
Die Worker holen ihre Daten aus der readyToWorkOnQueue und schreiben sie nach ihrer Arbeit in eine workedOnQueue.
Der Controller ruft Daten aus der needToCheckQueue auf, checkt ob sie auf eine BlackList gehören.
Nach einer bestimmten Zeitspanne hört er mit dem Laden aus der needToCheckQueue auf und kümmert sich um die angestauten Ergebnisse aus der workedOnQueue.
Die werden jetzt gegen die BlackList geprüft.
Wenn sie nicht drauf stehen, gehen sie in die readyToWriteQueue. Von dort aus holt der DataWriter seine Daten.

Ich habe also unterm Strich ein ganzes Netz von Queues, in denen teilweise Daten für die Arbeit aber auch zur Kontrolle zwischen den einzelnen Threads ausgetauscht werden. Dieses "Netz" arbeitet auch so wie es soll.

Alternativ zum wait-Aufruf kam mir gestern auch noch die Idee einer volatile-Variable, die abgefragt wird.
Dabei übergebe ich Controller.shouldStop per Referenz dem DataLoader-Konstruktor.
Das Problem ist, wenn ich im Controller das shouldStop-Flag auf TRUE setze, sieht es der DataLoader trotzdem nicht, weswegen mir das auch nicht geholfen hat. Sonst hätte ich dort nämlich sleep() aufrufen können.

Code:
private volatile boolean shouldStop;
Public DataLoader(boolean flag)
{
   this.shouldStop = flag;
}

Danke!
 

andiv

Bekanntes Mitglied
Also wenn du im Controller-Thread [c]this.dataLoader.wait()[/c] aufrufst, dann wartet meines Wissens der ControllerThread und nicht der DataLoaderThread (oder bin ich hier durcheinandergekommen?). Und wenn dann nirgends notify() oder notifyAll() aufgerufen wird, dann wartet dein ControllerThread ewig.

Allgemein gibt es die Möglichkeit mit interrupt() den DataLoaderThread zu unterbrechen (dazu muss dieser aber auch auf die InterruptedException reagieren, bzw. regelmäßig isInterrupted() abfragen). Die andere Möglichkeit ist die volatile boolean-Variable, da muss der DataLoaderThread aber auch regelmäßig die Variable abfragen (z.B. immer am Anfang einer Schleife).

Sonderlich toll fände ich das aber nicht. Was sind das denn für Bedingungen, bei denen der DataLoaderThread unterbrochen werden muss?
Ich könnte mir auch vorstellen alle deine Queues durch BlockingQueues zu ersetzen, wenn der Controller dann keine Daten mehr aus seiner Queue holt werden früher oder später auch die DataLoader blockieren, weil alle deine Queues voll sind.
 

ThreadPool

Bekanntes Mitglied
Sekundentakt hat gesagt.:
Das Problem ist, dass er den zweiten Aufruf schon gar nicht mehr durchführt.
Die Ausgabe "Jetzt der PreProcessor" erscheint nie auf dem Bildschirm.

Was richtig sein sollte, weil

Sekundentakt hat gesagt.:
Der Controller ruft für den DataLoader und PreProcessor die Methode wait() auf.

nicht funktioniert. Und du somit genau das Gegenteil von

Sekundentakt hat gesagt.:
Ich will ja nicht wait() für den Controller aufrufen

tust.
 

Sekundentakt

Bekanntes Mitglied
@ ThreadPool wie wäre es denn richtig???

@ andiv
Also wenn du im Controller-Thread this.dataLoader.wait() aufrufst, dann wartet meines Wissens der ControllerThread und nicht der DataLoaderThread (oder bin ich hier durcheinandergekommen?). Und wenn dann nirgends notify() oder notifyAll() aufgerufen wird, dann wartet dein ControllerThread ewig.
Wenn ich den DataLoader-Thread im Konstruktor des Controller-Threads per Referenz übergebe

Code:
private DataLoaderThread dataLoader;
Public ControllerThread(DataLoaderThread dataLoader)
{
    this.dataLoader = dataLoader;
}

und später im Code...

Code:
			while(someObject == null)
			{
				synchronized(this.dataLoader)
				{
					//Aufruf von this.dataLoader.wait() + try/catch-Block...
				}
			}
aufrufe, dann sollte doch eigentlich der per Referenz übergebene dataLoader gestoppt werden.
Wenn das nicht der Fall ist, hoffe ich, dass ThreadPool mir die Augen öffnet :).

Das Problem beim interrupt ist, dass ich nicht direkt von Thread ableite und meine per Referenz übergebenen volatile-Variablen nicht so sichtbar werden, wie ich es mir gehofft habe. ThreadPool scheint aber irgendwo meinen Fehler entdeckt zu haben, da warte ich erst mal auf seine Antwort.

Alternativ könnte ich auch flagQueues zwischen dem Controller und den Producern legen. Wenn in diesen flagQueues der Wert false auftaucht, stoppen sie für eine Weile (z.B. via sleep oder so). Das fände ich aber sehr schön und wäre eine - m.E. - sehr dreckige Implementationsvariante einer interrupt()-Methode, die nur nötig wird, weil meine volatile-Variablen falsch arbeiten.

Ich könnte mir auch vorstellen alle deine Queues durch BlockingQueues zu ersetzen, wenn der Controller dann keine Daten mehr aus seiner Queue holt werden früher oder später auch die DataLoader blockieren, weil alle deine Queues voll sind.
Bei den BlockingQueues suche ich im Moment noch nach einen Hinweis darüber, ob die size-Methode upToDate arbeitet oder auch nur einen Richtwert liefert. Bei einem unzuverlässigen Richtwert kann ich mich nicht darauf verlassen, dass ich Wartezeiten bei den Workern vermeide.

Sonderlich toll fände ich das aber nicht. Was sind das denn für Bedingungen, bei denen der DataLoaderThread unterbrochen werden muss?
Ich renne - bei 512 MB zugesichertem Arbeitsspeicher - in eine Heap-Exception.
Zum Zeitpunkt, an dem diese Exception geworfen wird, wurden insgesamt 18.000 Datensätze vom DataWriter in die DB geschrieben. 1.000 Datensätze sind soweit, dass sie in die DB geschrieben werden KÖNNTEN, und über 300.000 von den Producern produzierten warten noch darauf verarbeitet werden zu können - und es werden von Sekunde zu Sekunde mehr. Ich möchte, dass die Producer unterbrochen werden, sobald sie einen gewissen Vorrat angeschaufelt haben und dieser nicht bis auf ein bestimmtes Niveau reduziert wurde.

Eigentlich war die ganze Aufgabe mit den Zufallsdaten etc. nur eine Übung für den Einstieg ins Multithreading gedacht, jetzt hab ich mich aber ehrgeizig dran verbissen und will, dass das Ganze auch zufriedenstellend läuft :).
 

andiv

Bekanntes Mitglied
ch renne - bei 512 MB zugesichertem Arbeitsspeicher - in eine Heap-Exception.
Zum Zeitpunkt, an dem diese Exception geworfen wird, wurden insgesamt 18.000 Datensätze vom DataWriter in die DB geschrieben. 1.000 Datensätze sind soweit, dass sie in die DB geschrieben werden KÖNNTEN, und über 300.000 von den Producern produzierten warten noch darauf verarbeitet werden zu können - und es werden von Sekunde zu Sekunde mehr. Ich möchte, dass die Producer unterbrochen werden, sobald sie einen gewissen Vorrat angeschaufelt haben und dieser nicht bis auf ein bestimmtes Niveau reduziert wurde.

Na dann passt mein Vorschlag doch perfekt, ersetze alle Queues durch BlockingQueues mit sinnvollen Grenzen. Das sorgt dafür, dass nicht mehr Vorrat gesammelt wird, als verarbeitet werden kann.
 

ThreadPool

Bekanntes Mitglied
Wenn du wait() an irgendeinem Objekt aufrufst, passieren ungefähr folgende Sachen

1. bevor wait() aufgerufen werden kann muss sich das Lock auf dieses Objekt geholt werden

2. durch den Aufruf von Objekt.wait() sagst du das der Aufruferthread in die "Bedingungs-Warteschlange" des Sychronisations-Objekts eingehängt werden soll

3. der Aufruferthread wird in den "Waiting"-Zustand versetzt

4. der Aufruferthread gibt das Lock auf das Objekt frei

In deinem Fall heisst das, der Controller Thread geht in die while-Schleife, holt sich das Lock auf dataLoader und ruft an dataLoader wait() auf. Damit sagt der Controllerthread: Ich möchte am Objekt dataLoader warten bis irgendjmd am dataLoader-Objekt "notify" bzw. "notifyAll" aufruft und ich hoffentlich wieder in den Zustand "Runnable" versetzt werde.

Anschließend wird der Controller-Thread in seiner Ausführung in den "Warten"-Zustand versetzt, in die "Bedingungswarteschlange" von dataLoader geschrieben sowie sein Lock auf dataLoader aufgehoben.
 

Sekundentakt

Bekanntes Mitglied
Na dann passt mein Vorschlag doch perfekt, ersetze alle Queues durch BlockingQueues mit sinnvollen Grenzen. Das sorgt dafür, dass nicht mehr Vorrat gesammelt wird, als verarbeitet werden kann.
Weißt Du zufällig, ob dort aktiv gewartet wird?
Mit aktiv meine ich, dass der Thread zum Beispiel (übertrieben gesagt) 2 Sekunden Arbeitszeit vom Scheduler erhält und dann diese 2 Sekunden lang versucht was in die Queue zu schreiben.
Oder checkt der Scheduler einaml "die Queue is noch voll, ich teile die Arbeitszeit jetzt nem andern Thread zu und frage gleich noch mal nach"?

@ThreadPool: Das erklärt natürlich so einiges!
Dann müsste ich also den Producer-Threads jeweils per Referenz den Controller-Thread übergeben, damit der Controller-Thread die Producer wieder aufwecken kann?
Weißt Du vielleicht, wieso das mit meinem volatile-Flag nicht funktioniert?

Sagen wir, ich habe im Controller-Thread (der per Referenz an die Producer übergeben wird) das volatile-boolean Flag shouldWait.
shouldWait ist standardmäßig auf false.
Wie erreiche ich es jetzt, dass, wenn der Controller-Thread shouldWait = true setzt, auch die Producer-Threads dieses Flag korrekt abfragen können?

Gruß und Danke für die Hilfe!
 

andiv

Bekanntes Mitglied
Weißt Du zufällig, ob dort aktiv gewartet wird?
Mit aktiv meine ich, dass der Thread zum Beispiel (übertrieben gesagt) 2 Sekunden Arbeitszeit vom Scheduler erhält und dann diese 2 Sekunden lang versucht was in die Queue zu schreiben.
Oder checkt der Scheduler einaml "die Queue is noch voll, ich teile die Arbeitszeit jetzt nem andern Thread zu und frage gleich noch mal nach"?

Also in der Dokumentation BlockingQueue (Java Platform SE 6) hab ichs jetzt nicht gefunden. Wenn ich mich recht erinnere ist das ein Implementationsdetail. Die JVM kann entweder den Thread für ein Weilchen schlafen legen und dann wieder abfragen oder auch aktiv warten, d.h. die Abfrage ständig wiederholen. Ich meine die JVM würde das sogar optimieren, d.h. wenn es in der Vergangenheit länger gedauert hat, dann wird der Thread schlafen gelegt, wenn es dagegen meistens kurz danach wieder weitergeht, dann wird aktiv gewartet. Spielt das bei dir denn eine entscheidende Rolle?
 

Sekundentakt

Bekanntes Mitglied
Spielt das bei dir denn eine entscheidende Rolle?
Ja, tut es.
Ich habe bei meinen Workern ein TimeOut eingebaut - warten sie länger als 6 Sekunden, schalten sie sich ab. Das ist dazu gedacht, zu erkennen, wenn zuviele Worker Daten aus der Queue holen, sodass sie die meiste Zeit "null" aus der Queue holen. Dadurch spare ich Leerlaufzeiten ein.
Wenn jetzt der DataLoader sinnlos an der Queue warten würde, um reinzuschreiben, ist das für mein Zeitverschwendung. Die Rechenzeit wäre in dem Falle besser bei den PreProcessoren aufgehoben, damit diese die Queue für die Worker befüllen.

Das, was das Ganze so schwierig macht, ist eigentlich viel eher ein nicht weiter optimierbares SQL-Statement, dass alleine zum Berechnen des Ergebnisses schon 4-5 Sekunden braucht.

Das Umstellen auf BlockingQueues war erfolgreich, der Datendurchsatz war enorm. Allerdings lag das daran, dass ich permanent die selben Daten bearbeitet habe ;).
Nachdem der Bug gefixt wurde, muss ich momentan feststellen, dass meine PreProcessoren zu langsam sind, die Worker deswegen warten müssen und alle nacheinander in ein TimeOut rennen.

Ich melde mich noch mal, wenn ich neue Erkenntnisse zum Thema Queues & Co habe. :)

EDIT: So, ich habe jetzt mein Bottleneck:
Die PreProcessoren warten ab einem bestimmten Zeitpunkt auf Ergebnisse vom DataLoader. Die Queue zwischen den beiden ist nämlich leer.
Kann ich jetzt irgendwie den Scheduler dazu zwingen, dem DataLoader Rechenzeit zu geben?
Im Moment gehe ich so vor: Stellt ein PreProcessor fest, dass in der Queue nichts drin ist, rufe ich TimeUnit.SECONDS.sleep(3); auf. Davon erhoffe ich mir, dass der Scheduler Rechenzeit von den ohnehin arbeitsunfähigen PreProcessoren abzieht.
Ich müsste dem Scheduler jetzt nur noch mitteilen, dass diese Rechenzeit zwingend beim DataLoader landen müsste.
 
Zuletzt bearbeitet:

Marco13

Top Contributor
Welchen "Scheduler" meinst du denn da jetzt genau? Wenn nirgendwo mehr Daten vorhanden sind, alle warten, und niemand mehr etwas tun kann, außer dem "Anfang der Kette" (d.h. dem preprocessor) sollte der doch automatisch der einzige sein, der noch was tut.... ???:L
 

andiv

Bekanntes Mitglied
Warum musst du explizit sleep() verwenden? Verwendest du nicht die blockiernden Methoden put() und take()? Das tolle an denen ist ja, dass wenn die Queue leer ist, der Thread automatisch schlafen gelegt wird bis wieder was in der Queue ist. Der Scheduler wird dann automatisch die Rechenzeit den Threads zuteilen, die noch arbeiten können, eben weil ihre Queues noch nicht leer/nicht voll sind.
 

FArt

Top Contributor
Ich gehe noch mal darauf ein, was Marco13 am Anfang angemerkt hat:
diesen Krempel muss man nicht mehr mit notify() und wait() herunterklöppeln. Im Concurrency-Package gibt es alle nötigen Helferlein (BlockingQueue, Semaphore, Barrier, Executor, usw.).
Diese Dinger lassen sich nach den eigenen Bedürfnissen anpassen (Threadpools usw.)
Ein einfacher Weg damit umzugehen, wenn die Consumer viel langsamer sind als die Producer: die Producer künstlich ausbremsen (könnte man auch mit einer Barriere machen) oder noch mehr Consumer laufen lassen (das sollte grundsätzlich einigermaßen ausgewogen sein). Bei einem Speicherproblem ist eine einfache Lösung auch oft, die Queue persistent zu realisieren.

Wer mit Mutlithreading keine Erfahrung hat (und die Einschlägige Litereatur, die andiv bereits erwähnt hat, vielleicht noch nicht ausreichend kennt) sollte von "Handarbeit" die Finger lassen, außer er hat ein gutes Logging eingebaut und liebt es, Logfiles zu lesen...
 

Sekundentakt

Bekanntes Mitglied
Mein Problem habe ich folgendermaßen gelöst:

Statt die im Eingangsposting genannten Threads in der Main-Methode der Process-Klasse aufzurufen, habe ich das Ganze jetzt verzahnt. Dadurch gibt es quasi nur noch Leerlaufzeiten in der Process-Klasse, welche jetzt auch die Rolle des Controller-Threads übernimmt. Nämlich dann, wenn die Kontroll-Methode nichts zu tun hat. Und das geschieht, wenn ich meinem Logging glauben darf, nur ein einziges Mal, maximal.

Jedesmal, wenn der DataLoader ein Ergebnis in die (LinkedBlocking-)Queue schreibt, wird ein Aufruf eines .execute()-Befehls eines ExecutorServices gestartet. Dieser ExecutorService hat eine gefixte Anzahl an Threads. Die PreProcessoren werden dann nur noch als Jobs geladen.
Dieser Execute-Befehl startet einen PreProcessor. Der läuft so lange, bis er aus der DataLoader-Queue ein "null" herausholt (ich arbeite hier ohne TimeOut).
Jeder PreProcessor schreibt seinerseits Daten in eine dataForWorkerQueue und startet ebenfalls via ExecutorService - diesmal mit sequenziell abarbeitenden Threads - einen Worker-Thread, der seinerseits so lange arbeitet, bis er aus der dataForWorkerQueue ein "null" herausholt.

Der Controller wurde entfernt. Stattdessen übernimmt - wie erwähnt - eine Methode der Process-Klasse die Kontrolle der Queues, d.h. es wird nur noch gecheckt, ob die Ergebnisse valide sind.

Die Process-Klasse schreibt dann jedes valide Ergebnis in eine DataWriter-Queue und ruft seinerseits jedesmal, wenn er in die Queue geschrieben hat, einen DataWriter-Job auf.

Alle Threads, die hier gestartet werden, basieren auf Klassen, die ein einziges mal Instanziiert werden z.B.:
Code:
PreProcessor preProcessor = new PreProcessor(...some data to construct the PreProcessor...);
Die Konstruktor-Daten sind jeweils Queues, mit denen der PreProcessor "verbunden" ist.
Dadurch habe ich eine relativ begrenzte Zahl von Threads, die parallel arbeiten.

Ich bin mir sicher, dass ich das Ganze noch verbessern kann. Zum Beispiel, indem ich den PreProcessor Flags in eine ControllerQueue schreiben lasse, die angeben, ob sich Daten in der WorkerQueue befinden. Wenn das der Fall ist, starte ich über die Process-Klasse einfach einen neuen Worker-Thread.
Wichtig wäre hier zu messen, wie viele Worker-Threads in der Regel benötigt werden, damit das Ganze fließend abläuft. Denkbar wäre es hier, z.B. mit FutureTasks zu arbeiten, die via isDone() gefragt werden, ob sie noch zu tun haben. Stellt man dann fest, dass die dataForWorkerQueue trotz arbeitender Worker stetig anwächst, wird noch ein Worker instanziiert.

Soweit aber erst mal zu meiner Lösung.
Danke euch, für eure Hilfe!

EDIT:
Gibt's eigentlich eine Möglichkeit zu messen, wie viele Threads gerade laufen?
Ich habe ja ne recht tiefe Hierachie von Threads.
Process ruft DataLoader auf.
DataLoader ruft mehrere PreProcessoren (falls nötig) auf.
PreProcessor ruft ggf. ebenso mehrere Worker auf.
DataWriter wird jedesmal von Process instanziiert, wenn es für ihn etwas zu tun gibt.

Ich würde gerne mal in den Raum werfen, wie viele Threads diese Lösung insgesamt parallel aufruft und laufen lässt (mit parallel meine ich: wie viele werden denn vom Scheduler überhaupt gemanaged? Zwischenzeitlich "sterben" einige Threads, wenn sie auf "null" - Werte treffen).
Der Masterthread (Process-Klasse) instanziiert schließlich ein Child, dessen Children mehrere eigene Children aufrufen. :rtfm:
 
Zuletzt bearbeitet:
Ähnliche Java Themen
  Titel Forum Antworten Datum
R 11 GB File lesen ohne zu extrahieren Filedaten Bereich für Bereich adressieren dann mit Multi-Thread id die DB importieren Allgemeine Java-Themen 3
urmelausdemeis Exception in thread "main" java.lang.Error: Unresolved compilation problem: Allgemeine Java-Themen 7
smarterToby Wie stoppe ich diesen Thread Allgemeine Java-Themen 4
A Thread.sleep Problem Allgemeine Java-Themen 2
J Thread started nur einmal Allgemeine Java-Themen 19
W Server-Thread schreibt nicht alle Dateien Allgemeine Java-Themen 6
OnDemand Logfile pro User / Thread Allgemeine Java-Themen 7
OnDemand Thread / Service abbrechen Allgemeine Java-Themen 3
Thallius Ist meine static Helper Class Thread save? Allgemeine Java-Themen 9
P Swing Exception in thread "AWT-EventQueue-0" java.lang.IndexOutOfBoundsException: npoints > xpoints.length || npoints > ypoints.length Allgemeine Java-Themen 5
B Thread.sleep() in EJB Container wie lösen? Allgemeine Java-Themen 11
S Ist das Neuzuweisen von Feldern atomic und damit Thread-Safe? Allgemeine Java-Themen 2
S Exception in thread "main" java.lang.NullPointerException at FamilienApp.main(FamilienApp.java:15) Allgemeine Java-Themen 1
J Einen Thread in einer Schleife Allgemeine Java-Themen 2
E HILFE !! Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/commons/io/FileUtils Allgemeine Java-Themen 4
Flynn Thread-Problem... Allgemeine Java-Themen 2
G Thread-Programmierung Allgemeine Java-Themen 5
S Datei wird nicht gefunden Thread.currentThread().getContextClassLoader().getResourceAsStream() Allgemeine Java-Themen 1
G Beendet sich der Thread selbst?! Allgemeine Java-Themen 3
mrbig2017 Sleep wird ignoriert und der Thread wartet nicht Allgemeine Java-Themen 1
S Thread beenden Allgemeine Java-Themen 9
M Array aus Thread Objekten erstellen Allgemeine Java-Themen 2
Aruetiise Swing JOptionPane ohne denn Thread zu pausieren Allgemeine Java-Themen 1
M Nanosekunden-Pause innerhalb einen Thread-Loops Allgemeine Java-Themen 3
E Thread Exception Allgemeine Java-Themen 6
javaerd Binomialkoeffizient ausrechnen, Exception in thread "main" java.lang.StackOverflowError Allgemeine Java-Themen 6
T Merkwürdiges Thread-Verhalten Allgemeine Java-Themen 6
K Thread Problem Allgemeine Java-Themen 6
W Thread sleep 30 sekunden - wenn keine Antwort bis dahin neu senden Allgemeine Java-Themen 2
H Thread bleibt stehen bei jar in jar Allgemeine Java-Themen 1
J Threads HTTP Request (Thread) dauert lange - in Android Allgemeine Java-Themen 3
F CPU Last eines Thread ausfindig machen Allgemeine Java-Themen 0
V Compiler-Fehler Exception in thread "AWT-EventQueue-0" java.lang.IndexOutOfBoundsException: Index: 125, Size: 125 Allgemeine Java-Themen 11
Tausendsassa Threads Einen Thread sich selbst schließen lassen Allgemeine Java-Themen 17
P Threads BufferedImage, Thread Concurrency Allgemeine Java-Themen 1
M Klasse in separaten Thread ausführen.Wie genau? Allgemeine Java-Themen 2
llabusch Thread blockiert Dialog Allgemeine Java-Themen 1
J Thread wait() Allgemeine Java-Themen 2
V Thread.sleep und InterruptedException? Allgemeine Java-Themen 1
G Thread nicht von GC zerstört Allgemeine Java-Themen 6
J Wie erschaffe ich einen sicheren Datenaustausch zwischen Thread und Nicht-Threads Allgemeine Java-Themen 8
Sogomn Thread blocken bis Taste gedrückt Allgemeine Java-Themen 5
T Starten vom Thread Allgemeine Java-Themen 3
T Wait/Notify() bei Thread Allgemeine Java-Themen 6
J Exception in thread "main" java.lang.NoClassDefFoundError Allgemeine Java-Themen 4
M Exception in thread "AWT-EventQueue-0" Allgemeine Java-Themen 6
Q Thread wacht nicht auf Allgemeine Java-Themen 7
T Fragen zum Thread-Thema Allgemeine Java-Themen 4
T Threads Input/Output im Thread - Datei ohne Inhalt Allgemeine Java-Themen 1
T Fragen zum Thread-Thema Allgemeine Java-Themen 9
C Threads Variablen in einem Thread Aktualisieren Allgemeine Java-Themen 17
U Thread beenden Allgemeine Java-Themen 3
W Threads Mit Thread und Runtime externe Programme öffnen Allgemeine Java-Themen 0
N Thread interrupt Status debuggen Allgemeine Java-Themen 6
A Thread: Code paralell ausführen in mehreren Instanzen Allgemeine Java-Themen 1
E Threads linkedlist/multi-thread problem Allgemeine Java-Themen 3
B Erkennen, wann Prozess beendet ist, dann Thread beenden. Allgemeine Java-Themen 6
A Thread Fehler absichtlich provozieren Allgemeine Java-Themen 3
B Threads Java Thread kommunizieren Allgemeine Java-Themen 12
N Thread Sicherheit im komplexen Datenmodell Allgemeine Java-Themen 7
K Thread richtig benutzen Allgemeine Java-Themen 3
K Exception in thread "AWT-EventQueue-1" Allgemeine Java-Themen 2
vandread Problem bei kleiner Thread-Übung Allgemeine Java-Themen 2
G Thread erzeugt nicht plausible NullPointerException Allgemeine Java-Themen 7
H Netbeans Warning bei Thread.sleep in Schleife Allgemeine Java-Themen 4
P [Thread] Scheint nicht Sequenziell zu Arbeiten Allgemeine Java-Themen 9
A eine test thread.join() frage Allgemeine Java-Themen 2
tuttle64 Verständnisprobleme mit Thread Locks Allgemeine Java-Themen 4
G Threads Thread bei Datenabfrage Allgemeine Java-Themen 3
S Thread anhalten per Button ? Allgemeine Java-Themen 3
E Thread Programmierung Allgemeine Java-Themen 2
S Threads ServerSocket-Thread soll schlafen, bis er gebraucht wird Allgemeine Java-Themen 2
V Thread schneller stoppen Allgemeine Java-Themen 2
V anstatt thread.join() einfach while schleife? Allgemeine Java-Themen 8
B Mausbewegung im Thread erkennen (hoch/runter) Allgemeine Java-Themen 6
G Linux/C++/Pthreads auf JVM zugreifen, thread safe? Allgemeine Java-Themen 10
K Threads Probleme mit Thread Allgemeine Java-Themen 13
K Threads Thread überprüfen Allgemeine Java-Themen 3
Z Threads Thread für einen Client Allgemeine Java-Themen 9
M Thread JavaFish Allgemeine Java-Themen 10
G Thread.sleep Allgemeine Java-Themen 12
M Threads Viele Aufrufe aus Thread, komisches Verhalten Allgemeine Java-Themen 8
B Threads Main Thread warten auf abgebrochen Task warten lassen Allgemeine Java-Themen 25
K Timer Thread Allgemeine Java-Themen 8
M Methoden Static Methoden und Thread??? Allgemeine Java-Themen 4
N java.lang.IllegalMonitorStateException: object not locked by thread before notify() Allgemeine Java-Themen 2
C Mehothode in anderenm Thread aufrufen Allgemeine Java-Themen 10
R Thread läuft nicht?! Allgemeine Java-Themen 7
R ThreadPool - vorhandene thread liste überprüfen bzw. aufräumen Allgemeine Java-Themen 3
J Anderem Thread Variable mitgeben Allgemeine Java-Themen 2
C Argument an einen Thread übergeben Allgemeine Java-Themen 4
S java.util.ConcurrentModificationException - aber nur ein Thread Allgemeine Java-Themen 3
G JUnit Test Methoden in anderen Thread verlagern Allgemeine Java-Themen 4
P Java Probleme - java.lang.Thread.run(Unkown Source) Allgemeine Java-Themen 10
L Im Thread auf Eingaben warten Allgemeine Java-Themen 3
P aus Thread auf Form zugreifen Allgemeine Java-Themen 9
C Threads Thread blockieren Allgemeine Java-Themen 4
K Threads Thread für Sleep Allgemeine Java-Themen 6
H Threads Thread stirbt aber Objekte in ihm leben weiter?! Allgemeine Java-Themen 9
K Threads Thread aktualisiert Progressbar nicht Allgemeine Java-Themen 4

Ähnliche Java Themen

Neue Themen


Oben