Threadpool

N

NeedThreadPool

Gast
Hallo liebe Leute,

ich hab hier gerade ein Thema bei dem ich eure Hilfe gebrauchen könnte.
Ich möchte ein Programm "Multi-Threaded" gestalten da ich extrem viele
gleichartige Aufgaben mit mehreren Objekten ausführen muss.

Jetzt habe ich eine Klasse Worker die Runnable implementiert.
Dieser Worker übernimmt ein Element aus einer Liste und arbeitet mit dem
Element einige Arbeitsschritte ab.
Die Elemente aus der Liste könnte ich natürlich also teilweise gleichzeitig abarbeiten.
Ich sage extra teilweise, denn in der Liste könnten theoretisch auch 10.000 Elemente enthalten sein.

Jetzt habe ich gelesen, dass es ein Konstrukt gibt, welches sich ThreadPool nennt.
Ich kann wohl dem ThreadPool ein Runnable übergeben, und der ThreadPool managed
dann wie viele Threads es geben darf, wann ein Runnable abgearbeitet ist und der Thread
ein neues übernehmen kann usw.

Ich konnte leider bisher kein Beispiel finden, dass ich so wirklich verstanden habe
(ja ich habe einiges gelesen aber der Groschen will nicht fallen).

Vielleicht ist es jemandem möglich ein einfaches Beispiel zu konstruieren, damit
ich langsam lernender Anfänger eine Chance habe das zu verstehen ;)

Also nochmal kurz zusammengefasst:
- Ich habe eine Liste mit theoretisch unbegrenzter Anzahl an Elementen
- Jedes Element wird einem Worker übergeben, welcher Runnable implementiert
- Jedes Element aus der Liste MUSS abgearbeitet werden, bevor das Programm fertig ist

Hoffe ihr könnt mir helfen bzw. habt Lust dazu ;)
 

timbeau

Gesperrter Benutzer
Wenn du alles berechnen willst macht Multithreading keinen Sinn.

Ob 1 Thread 100% CPU bekommt oder 2 Threads 50% macht keinen Unterschied. Nur bei I/O macht mE Multithreading Sinn.

Ansonsten hier ein simples Beispiel

http://www.java-forum.org/hausaufgaben/127027-consumer-producer-zufallszahlen.html

Bzw
Java:
// init executor
		execService = Executors.newFixedThreadPool(threadNumbers);

		// init threads
		for (int i = 0; i < threadNumbers; i++) {
			execService.execute(new Thread(...);
		}
 
S

SlaterB

Gast
wenn du dich für fertige Konstrukte wie vielleicht ExecutorService interessierst, dann suche danach,
dazu sollte es auch Beispiele/ Tutorials geben,

interessanter finde ich persönlich die einfache Eigenprogrammierung von Threads,
da könntest du dich nach den vielen 'Producer Consumer'-Beispielen richtigen, etwa
Producer Consumer Test : Producer ConsumerThreadsJava

Producer brauchst du vielleicht nicht, nur initial viele Elemente in den gemeinsamen Daten, hier CubbyHole,
eine Liste sicherlich,
sowie mehrere Consumer, die darauf zugreifen, Daten holen, verarbeiten usw.

Ende ist wenn alle Threads fertig sind, mit join() darauf warten

@timbeau
mehrere CPUs sind ja schon denkbar
 
N

NeedThreadPool

Gast
Bzw
Java:
// init executor
		execService = Executors.newFixedThreadPool(threadNumbers);

		// init threads
		for (int i = 0; i < threadNumbers; i++) {
			execService.execute(new Thread(...);
		}


Wird das hier nicht sequentiell abgearbeitet?
Wenn ich nacheinander dem Executor einen Thread übergebe?

Und müssten dann die Elemente aus der Liste in einer Art Warteschlange
realisiert sein? In dem Beispiel das du gepostet hast wird ja wieder
etwas verwendet von dem ich keine Ahnung habe, galube BlockingQueue nannte
sich das oder ähnlich.
 
S

SlaterB

Gast
> In dem Beispiel das du gepostet hast wird ja wieder etwas verwendet von dem ich keine Ahnung habe,
> galube BlockingQueue nannte sich das oder ähnlich.

keine Zeit, noch einmal 3 sec nachzuschauen?
in meinem Link geht es übrigens ohne das, dafür wait/ notify, irgendwas muss man schon kennen/ nachschlagen

sequentiell ist nur die Initialisierung, egal auf welche Weise,
du könntest noch vermuten dass jeder übergebene Thread einzeln läuft, aber die sollen schon vom Pool bei Gelegenheit bearbeitet werden,
deshalb auch besser keine Threads übergeben sondern Runnable wie es in der Anleitung zu diesem Konstrukt steht
 
N

NeedThreadPool

Gast
Jo, das mit wait(), notify() ist bekannt und auch das ich Runnables übergebe.

Aber noch mal zu meiner Liste, ich muss das Element aus meiner Liste welches Runnable implementiert an den executor übergeben =>

Code:
executor.execute(MeineListe.getElement());

Also muss ich zwangsweise meine Liste als Warteschlange realisieren ja?
 

timbeau

Gesperrter Benutzer
Ich habe auch keinen Thread übergeben, mein Fehler. :oops: Ich habe eine Klasse die Runnable implementiert übergeben und hier fürs Forum das abgeändert.

Bei meinem Beispiel wird eine BlockingQueue gefüllt und von den Executor-Threads abgearbeitet.
 
S

SlaterB

Gast
an einen Executor Runnable zu übergeben, die wer weiß wie lange laufen und verschiedene Aufgaben erledigen
ist meiner Ansicht nach auch wieder nicht ideal,
das vermischt die Konzepte, wirklich nicht verwunderlich wenn jetzt Verwirrung herrscht,


an einem ExecutorService könnten 1000 einzelne Mini-Aufträge als Objekte übergeben werden,
eine Liste, ob blocking, golden oder umweltfreundlich, braucht man dann überhaupt nicht mehr,
der ExecutorService verwaltet intern alle Aufgaben und reicht sie an seine x arbeitenden Threads weiter,
je nachdem wann diese Zeit haben

oder als Gegenentwurf erfindet man sich selber x Threads die jeweils beliebig viele Aufträge abarbeiten,
dann braucht man auch noch selber implementiert eine Verwaltung der Daten,
quasi Nachbau eines ExecutorService, 'Producer Consumer' sind dazu die Beispiele,

hoffentlich bisschen klarer und nicht noch ne Stufe verwirrender jetzt ;)
 

timbeau

Gesperrter Benutzer
Wie übergibt man dann aber Daten an Runnables die im Executor laufen? Daten die vll. noch garnicht feststehen?
 
S

SlaterB

Gast
so abstrakt weiß ich kaum was du meinst, wenn etwas noch nicht feststeht wieso dann überhaupt schon etwas übergeben,
vielleicht gibt es gar später gar nichts zu tun,

oder man stattet das Runnable allein mit der Auftragsnummer 225 aus, was immer das dann später bedeutet

so wie es hier aussieht mit 10.000 initial bekannten Elementen kann man diese glaube ich auf 10.000 Runnable verteilen,
auch wenn das nicht mein Vorschlag ist, ich bin ja für vorerst ganz ohne ExecutorService

wenn man jedenfalls 8 Threads die ganze Zeit laufen lassen will, dann kann man doch einfach 8 Threads statten,
statt 8 langlebige Runnable an einem ExcceutorService der Größe 8 zu übergeben,
das ist hier glaube ich der Punkt auf den es hinausläuft?
 

timbeau

Gesperrter Benutzer
Sorry für das Abstarkte. Mein Programm hier läuft gerade darauf hinaus, dass 1 Thread eine Datei ausliest und diese in eine BlockingQueue schreibt, gesplittet in Strings ("\n"). Aus dieser Queue holen sich 4 Runnables die Strings und verarbeiten die mit Batch-Processen und schreiben ihr Ergebnis (Process-InputStream) in eine weitere Queue.

Also GZip -> 1 Thread -> BlockingQueue -> 4 Runnables mit Executor -> BlockingQueue -> 1 Thread -> neues GZip

Die Runnables beende ich wenn alles bearbeitet ist. Nicht gut? ;)
 
S

SlaterB

Gast
ich schaue immer wieder in das verlinkte Thema, aber von Executor steht dort eh nix und von dir auch kein Posting ;)
mag für dich gerne funktionieren,
ich denke aber hier mit einfach nur Verarbeitung von 10.000 festen Daten liegt etwas anderer Fall vor
 
N

NeedThreadPool

Gast
Jup anderer Fall, die Daten stehen vorher fest.
Ich nehme mir nur ein Element aus der Liste, überprüfe das Element und entscheide dann
ob ich es in der einen oder der anderen Liste speichere.

Danke für eure Hilfen, werde mal ein wenig damit experimentieren, da es tatsächlich
immer noch nicht so klar ist wie die Liste strukturiert sein muss aus der meine Elemente
kommen^^
 
N

NeedThreadPool

Gast
Hi, ich wieder ;)

Ich habe mal versucht mir ein Beispiel-Programm zu basteln weil ich das echt
kapieren möchte.

Ich hätte hier eine Liste in Form eines Java-Vectors:

Java:
import java.util.Vector;

public class MyElements {
	private Vector<Integer> vecs = new Vector<Integer>();
	
	public MyElements() {
		for(int i = 0; i < 100; i++) {
			vecs.add(i);
		}
	}
	
	public Vector<Integer> getVecs() {
		return vecs;
	}
}

Über die Sinnigkeit einer solchen Klasse sei mal nichts gesagt, wie gesagt
nur zum Testen ;)

Ich habe meinen "Worker" der Runnable implementiert:

Java:
public class Test implements Runnable {
	private String name;
	private int workNumber;
	
	public Test(String name, int workNumber) {
		this.name = name;
		this.workNumber = workNumber;
	}

	public void run() {
		System.out.println("Ich bin Test " + name + " und arbeite mit " + workNumber);
	}
}

Und hier noch die Main, die den Executor enthält:

Java:
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class Main {
	public static void main(String[] args) {
		MyElements elements = new MyElements();
		Executor execService = Executors.newFixedThreadPool(2);
		
		for (int i = 0; i < elements.getVecs().size(); i++) {
			execService.execute(new Test("" + i, elements.getVecs().get(i)));
			elements.getVecs().remove(i);
		}
	}
}

Als Ausgabe bekomme ich jetzt das hier:

Java:
Ich bin Test 0 und arbeite mit 0
Ich bin Test 1 und arbeite mit 2
Ich bin Test 2 und arbeite mit 4
Ich bin Test 3 und arbeite mit 6
Ich bin Test 4 und arbeite mit 8
Ich bin Test 5 und arbeite mit 10
Ich bin Test 6 und arbeite mit 12
Ich bin Test 7 und arbeite mit 14
Ich bin Test 8 und arbeite mit 16
Ich bin Test 10 und arbeite mit 20
Ich bin Test 11 und arbeite mit 22
Ich bin Test 9 und arbeite mit 18
Ich bin Test 12 und arbeite mit 24
Ich bin Test 13 und arbeite mit 26
Ich bin Test 14 und arbeite mit 28
Ich bin Test 15 und arbeite mit 30
Ich bin Test 16 und arbeite mit 32
Ich bin Test 17 und arbeite mit 34
Ich bin Test 19 und arbeite mit 38
Ich bin Test 18 und arbeite mit 36
Ich bin Test 20 und arbeite mit 40
Ich bin Test 21 und arbeite mit 42
Ich bin Test 22 und arbeite mit 44
Ich bin Test 23 und arbeite mit 46
Ich bin Test 25 und arbeite mit 50
Ich bin Test 24 und arbeite mit 48
Ich bin Test 27 und arbeite mit 54
Ich bin Test 28 und arbeite mit 56
Ich bin Test 26 und arbeite mit 52
Ich bin Test 29 und arbeite mit 58
Ich bin Test 30 und arbeite mit 60
Ich bin Test 31 und arbeite mit 62
Ich bin Test 33 und arbeite mit 66
Ich bin Test 34 und arbeite mit 68
Ich bin Test 35 und arbeite mit 70
Ich bin Test 36 und arbeite mit 72
Ich bin Test 37 und arbeite mit 74
Ich bin Test 32 und arbeite mit 64
Ich bin Test 38 und arbeite mit 76
Ich bin Test 39 und arbeite mit 78
Ich bin Test 40 und arbeite mit 80
Ich bin Test 41 und arbeite mit 82
Ich bin Test 42 und arbeite mit 84
Ich bin Test 43 und arbeite mit 86
Ich bin Test 44 und arbeite mit 88
Ich bin Test 45 und arbeite mit 90
Ich bin Test 46 und arbeite mit 92
Ich bin Test 47 und arbeite mit 94
Ich bin Test 48 und arbeite mit 96
Ich bin Test 49 und arbeite mit 98

Daraus ergeben sich zwei Fragen:

1. Hab ich das so richtig gemacht?
2. Wo sind die anderen 50 Ausgaben abgeblieben?
 
S

SlaterB

Gast
> Merke gerade, das Programm terminiert auch nicht....

das könnten so Nebenschauplätze von Executor sein, selber um Programmende kümmern,

---

hättest du nur eine Schleife die bis zum Ende durchläuft kommen alle Elemente sicherlich 1x dran,
wenn du aber zwischendurch noch die Liste veränderst, unten den Boden abschlägst und mit Index zugreift der auf solche Veränderungen reagiert, was stellst du dir dann vor?

abgesehen davon mag es funktionieren, dass nicht alles exakt in Reihenfolge passiert werte ich als gutes Zeichen für Nebenläufigkeit
 

timbeau

Gesperrter Benutzer
Ich hatte das gleiche Problem, anscheinend werden bei einer zu häufigen "execution" nicht alle Aufgaben zwischengespeichert. Daher nehme ich dafür eine Blockingqueue aus der sich die Threads dann bedienen.
 
N

NeedThreadPool

Gast
Na aber ich entferne doch genau das Element, dass ich betrachtet habe.
Ich starte den "Test 0" und entferne Object "0" aus dem Vector.
Verstehe da das Problem nicht warum er mir nicht alle Elemente ausgibt
sondern genau jedes zweite.
 

JavaProfi

Aktives Mitglied
Nachtrag:

Merke gerade, das Programm terminiert auch nicht....

Ein Threadpool existiert nach seiner Geburt grundsätzlich für immer !!!!!
Du musst ihn schon explizit "herunterfahren", nachdem du ihm den letzten Job übergeben hast.
Herunterfahren bedeutet, dass er nach dem shutdown() erst dann alle Threads (in deinem Fall 2) beendet, wenn keine neuen jobs mehr in seiner Queue stehen .

Siehe dazu auch mein Posting hier:

http://www.java-forum.org/allgemeine-java-themen/135469-multi-threading-join.html#post892483

Gruß
JP
 
Zuletzt bearbeitet:
S

SlaterB

Gast
@NeedThreadPool
wenn man das 0-te Element in einer Liste entfernt und die weiteren aufrücken, was steht danach an Index 0 oder gibt es dort dann ein Vakuum?
was steht an Index 1 wo du fortfahren willst?

------

Ich hatte das gleiche Problem, anscheinend werden bei einer zu häufigen "execution" nicht alle Aufgaben zwischengespeichert.
das wäre ja was..
 
N

NeedThreadPool

Gast
Ich hatte das gleiche Problem, anscheinend werden bei einer zu häufigen "execution" nicht alle Aufgaben zwischengespeichert. Daher nehme ich dafür eine Blockingqueue aus der sich die Threads dann bedienen.

Ich kann keine BlockingQueues verwenden, weil ich vorher garnicht weiß,
wie viele Elemente ich haben werden.

Das muss doch irgendwie möglich sein..... verstehe langsam garnichts mehr...
Alles was ich möchte ist doch nur:

- Eine Liste abarbeiten die mit fixen Elementen vorgegeben ist (z.B. 10 Elemente)
- Der Liste können allerdings Elemente hinzugefügt werden (unbegrenzt)
- Ein Worker holt sich ein Element aus der Liste (das dann von der Liste entfernt werden kann)
- Mehrere Worker parallel wären toll
- Es sind die Worker, die Elemente zu der Liste hinzufügen können!!!

Hier mal ein Bildchen was ich machen möchte:

neu0n.jpg


Der Allocator kann beliebig viele Element aus der MyList ziehen und an Consumer übergeben,
die alle Runnable implementieren. Die Consumer können beliebig viele Elemente zur MyList
hinzufügen. (Keine Sorge, um eine Terminierung wird sich gekümmert)

Und nach dem hin und her bin ich jetzt total verwirrt, was funktioniert und was nicht
funktioniert.
 
N

NeedThreadPool

Gast
SlaterB hat da zu schon alles gesagt !!
Nicht in der Schleife mit remove() den Vektor verkürzen !!

Gruß
JP

In meinem Fall (sehen Sie Post über Ihrem) muss ich irgendwie
Elemente aus meiner Liste entfernen und hinzufügen können.

Der Allocator ruft so lange Consumer auf und übergibt ihnen Elemente,
bis die Liste leer ist.
 

JavaProfi

Aktives Mitglied
In meinem Fall (sehen Sie Post über Ihrem) muss ich irgendwie
Elemente aus meiner Liste entfernen und hinzufügen können.

Der Allocator ruft so lange Consumer auf und übergibt ihnen Elemente,
bis die Liste leer ist.

Eine FIFO-Warteschlage (z.B. mit einer LinkedList) programmieren und nicht mit einem Index arbeiten,
sondern mit addLast (hinzufügen) und removeFirst (entnehmen) !!

Gruß
JP
 
Zuletzt bearbeitet:
N

NeedThreadPool

Gast
First In First Out? Okay!

Ich kenne mich mit dem Sprachkonstrukt synchronized nicht aus.
Wenn ich push() und pop() als synchronized definiere,
reicht das dann aus das das ganze Threadsafe ist?
 
N

NeedThreadPool

Gast
Okay, alles klar.

Das Größenproblem muss ich halt dann so löschen, dass wenn die Queue
voll ist, ich die größe der Queue erhöhen muss (neue Queue erzeugen und
Elemente umkopieren). Denke das bekomme ich hin ;)

Danke!
 

JavaProfi

Aktives Mitglied
Okay, alles klar.

Das Größenproblem muss ich halt dann so löschen, dass wenn die Queue
voll ist, ich die größe der Queue erhöhen muss (neue Queue erzeugen und
Elemente umkopieren). Denke das bekomme ich hin ;)

Danke!

WHAT TO HELL!!!
Warum Größenproblem?
Warum "wenn die QUEUE voll ist" ??? Hä ??
Eine Queue (z.B. LinkedList) ändert ihre Größe dynamisch!
Da musst du nichts umkopieren.

Gruß
JP
 
Zuletzt bearbeitet:
N

NeedThreadPool

Gast
Ja aber "dynamisch vergrößern" heißt ja im Prinzip das.
Ist die Datenstruktur zu klein, muss ich sie größer machen.
Macht doch auch jede Datenstruktur von Java so (ArrayList wächst
z.B. immer um 50%, Vector um das doppelte, ...)
 
N

NeedThreadPool

Gast
Gut Okay, ich könnte natürlich die Java LinkedList verwenden
also:

Java:
public Queue queue = new LinkedList<T>();

Dann wüsste ich aber wieder nicht wie ich die Synchroniziere....

So?

Java:
synchronized(queue) {
    queue.offer(...);
    ....
    queue.poll();
    ....
}
 

Empire Phoenix

Top Contributor
Also ich würde nen Threadpool nehmen, (processorcount wird in meinen fall über die System properties rausgelesen)

Java:
	threadpool = new ThreadPoolExecutor(processorcount, processorcount, 1000, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>());

//add tasks here
threadpool.execute(task); *10000 oder wie auch immer, solange shutdown nicht aufgerufen wurde, können auch bestehende runnables weitere runnables erstellen.


	MainThreadPool.threadpool.shutdown();
	MainThreadPool.threadpool.awaitTermination(1, TimeUnit.DAYS); //lange warten falls tasks offen, damit programm nicht terminiert wird, solange noch tasks unausgeführt sind
 

Ähnliche Java Themen

Neue Themen


Oben