Parallelisierung

aknayirp

Mitglied
Hey, ich habe ein kleines Problem mit meinem Programm.

Ich möchte das Tasks von verschiedenen Threads bearbeitet werden. Es gibt verschiedene Berechnungsschritte und wenn die Threads mit den Tasks des ersten Berechnungsschritts fertig sind, werden die des Tasks des zweiten Berechnungsschritts in den Pool geladen.
ThreadPool erstellen tue ich mit ExecutorService.
Habe auch eine Klasse Tasks, die immer vom Hauptthread erstellt und in den Pool geladen wird.
Der Konstruktur der Klasse Task bekommt immer das gleiche Matrix-Objekt, weil damit gearbeitet werden soll. Wie es in Java üblich ist passiert dies per Referenz also einfache Übergabe.

Wenn ich jetzt nur einen Thread benutze und immer nur ein Task hochgeladen wird und nach dem bearbeiten des Tasks erst der nächste Task hochgeladen wird, dann funktioniert alles. Aber wenn ich alles Tasks des ersten Schrittes hochlade und dann warte, kommt nicht das richtige Ergebnis raus.

In der Klasse Matrix selbst habe ich eine Memberfunktion, mit der ich Punkte setzte. Dies benutze ich dann in den Tasks.
Weiß leider nicht genau, woher das kommt.

Hätte noch ne zweite Frage: Wie erstelle ich ein Future-Array um damit gewisse Rückgabewerte nach dem ersten Berechnungsschrittes zu erhalten und den Hauptthread zu stoppen bis die Tasks fertig sind?

MfG
 
Zuletzt bearbeitet:

Bug Fisher

Bekanntes Mitglied
Der Konstruktur der Klasse Task bekommt immer das gleiche Matrix-Objekt, weil damit gearbeitet werden soll. Wie es in Java üblich ist passiert dies per Referenz also einfache Übergabe.

Was wäre denn, wenn sagen wir einmal 2 Threads aus deinem Pool (verschränkt) auf dem gleichen Matrix-Object Änderungen vornehmen ? Könnte das Probleme machen oder ist das prinzipiell ausgeschlossen ? ;)
 

aknayirp

Mitglied
Also die Threads greifen zwar gleichzeitig drauf zu und schreiben auch was rein, aber immer an verschiedenen Stellen. Es darf nach dem Algorithmus nichtt passieren, dass die auf zwei gleiche Punkte schreiben.

P.S: Nochmal zur weiteren Erklärung. Ich habe zwei For schleifen. Die äußere zeigt an, welcher Bearbeitungsschritt und die innere die Tasks für den ersten zweiten usw Bearbeitungsschritt. Wenn ich nur einen Thread habe aber in der inneren Schleife nach dem submit future.get anwede und der Hauptthread wartet und lädt erst dann den nächsten Task hoch, dann klappt es.
Aber wenn ich das future.get außerhalb der inneren Schleife anwende, also alle Task des jeweiligen Bearbeitungsschrittes hochgeladen werden, dann nicht.

Das müsste doch heißen das am Task selbst alles in Ordnung ist und irgendwas beim hochladen schief geht oder?
Ich meine ich hab auch nur ein Thread, also kann es nicht an paralleliserung scheitern.
 
Zuletzt bearbeitet:

Bug Fisher

Bekanntes Mitglied
Aber wenn ich das future.get außerhalb der inneren Schleife anwende, also alle Task des jeweiligen Bearbeitungsschrittes hochgeladen werden, dann nicht.

Jedes submit() kehrt mit einem Future zurück, dessen Aufruf Future#get blockierend ist.
Also du brauchst für jedes submit auch das richtige Future; bei dir klingt das so, als ob du zwar mehrere submits machst, aber hinter der Schleife NUR EIN Future zum synchronisieren auf den Plan schickst.
 

aknayirp

Mitglied
Ja ich glaube hier liegt genau das Problem, denn ich verstehe das nicht ganz. Also ich habe einfach nur um den Algorithmus zu testen das so geschrieben: future = pool.submit(Task)
Also du hast schon recht ich habe nur ein future, was jedes mal überschrieben wird, aber ist das nicht egal, weil ich teste es ja nur mit einem Thread.

P.S: Ich meine das Future benutze ich nicht um ein return zu bekommen bzw. schon aber das hat nichts mit der Matrix zu tun.
 
Zuletzt bearbeitet:

Bug Fisher

Bekanntes Mitglied
Es wäre viel einfacher, wenn du etwas beispielhaften Code hättest.

Aber grundsätzlich: Jeder Task, egal ob Runnable oder Callable<> liefert genau ein Future. Future#get bringt den aufrufenden Thread dazu, auf den ausführenden zu warten. Man nennt das join oder rendez-vouz.

Der Thread, der die submits auf deinem Executor Objekt ausführt muss danach also auf ALLE Future's warten, nicht auf ein beliebiges.

Erzeuge eine Menge von Futures, die genau so viele Elemente enthält, wie du verschiedene Tasks hast.
 

aknayirp

Mitglied
Java:
for (int i = 0; i < maxIterations; i++) {

			Future<?> future = null;
			for (int j = 0; j < numberofRays; j += rcount) {
				future = pool.submit(new Task(eingabeparameter));

				try {
					future.get();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (ExecutionException e) {
					e.printStackTrace();
				}
			}

		}

Also so siehts ungefähr aus. Habe die unwichtigen Sachen hier mal rausgelassen. So würde das halt funktionieren aber wenn ich das future.get() rausnehmen, dann klappt es nicht.
Also weil ich das future immer überschreibe, kann der am Ende nicht synchronisieren?
Ich weiß leider nicht genau, wie ich eine Menge von Futures erstellen soll. Kann z.B. kein Array davon erstellen.
 

aknayirp

Mitglied
Hier habe ich das jetzt so geändert, aber es funktioniert trotzdem nicht. Habe diesmal ein bisschen mehr Code drin gelassen, vllcht hat es mit den anderen Schleifen zutun die auch innen sind.
Wenn ich in der inneren Schleife, in der die Tasks submitted werden, Thread.sleep(1) anwende, dann sieht das Ergebnis besser aus. Wenn ich dann noch länger pausiere, dann verbessert sich das Ergebnis immer mehr. Also muss es ja mit dem hochladen der Tasks zu tun haben.
Wenn das immer noch nicht ausreicht, dann kann ich auch den Task hochladen.

Java:
for (int i = 0; i < maxIterations; i++) {

			int numberofRays = art.getProjection(i).length - 2;
			if (numberofRays > rcountset) {
				rcount = rcountset;
				if (numberofRays % rcountset == 0) {
					dim = numberofRays / rcountset;
				} else {
					dim = numberofRays - (numberofRays % rcountset);
					dim = (dim / rcountset);
				}
				future = new Future[dim];
			} else {
				rcount = numberofRays;
				future = new Future[1];
			}

			allCoordinates = new Coordinate[numberofRays];
			allCoordinates = art.spCoordinate(i);
			coordinate = new Coordinate[rcount];
			for (int j = 0; j < numberofRays; j += rcount) {
				// Copy needed SPs of Rays for the next Task
				for (int k = j, l = 0; l < rcount; k++, l++) {
					coordinate[l] = allCoordinates[k];
				}

				future[counter] = pool.submit(new Task(art.getProjection(i), allcoordinate, pic, rcount, j));

				if (j + rcount > numberofRays - rcount) {
					rcount = numberofRays - j;
				}
				counter++;
				/*
				 * try { Thread.sleep(1); } catch (InterruptedException e) { // TODO Auto-generated catch block
				 * e.printStackTrace(); }
				 */
			}
			counter = 0;
			for (int j = 0; j < numberofRays; j += rcount) {
				try {
					subtotal = future[counter].get();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (ExecutionException e) {
					e.printStackTrace();
				}
				counter++;
			}
			counter = 0;
 
Zuletzt bearbeitet:

Bug Fisher

Bekanntes Mitglied
Ich kenne deine Variable 'dim' nicht; aber nochmal; ich wiederhole mich;
wenn du dim submits machst, erhälst du dim Futures auf denen du get aufrufen solltest.

Deine Problembeschreibung passt exakt dazu, dass du eben genau das nicht tust.

Hier:

Java:
counter = 0;
for (int j = 0; j < numberofRays; j += rcount) {
try {
subtotal = future[counter].get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
counter++;
}

Offensichtlich wird der Schleifenkörper nicht genau so oft durchlaufen wie count.

Anders ausgedrückt:
Ist die Anzahl der Iterationen von
Java:
for (int j = 0; j < numberofRays; j += rcount)
gleich count bzw. future.length ?
 

aknayirp

Mitglied
Ne ist es leider nicht, deshalb musste ich eine Variable counter einführen. Aber ich habe gesehen, dass es mit
ArrayList<Future<Double>> futures = new ArrayList<Future<Double>>();
viel einfacher geht.
Hier nochmal einfacher, aber klappen tut es trotzdem nicht. Das Ding ist auch, dass ich alles die ganze Zeit mit nur einem Thread ausprobieren, also dürfte es ja nicht an multithreading liegen oder?
Ich meine das was du erklärt hast, habe ich ja jetzt gemacht: ich warte nach einer iteration bis alle Ergebnisse da sind um dann beim nächsten submit auch die aktualisierte Matrix mitzugeben.

Java:
ArrayList<Future<Double>> futures = new ArrayList<Future<Double>>();
		int counter = 0;
		int test = 0;
		long startTime = System.currentTimeMillis();
		for (int i = 0; i < maxIterations; i++) {

			int numberofRays = art.getProjection(i).length - 2;
			// System.out.println(numberofRays);
			if (numberofRays > rcountset) {
				rcount = rcountset;
			} else {
				rcount = numberofRays;
			}

			// -2 da in den ersten 2 stellen die Steigung steht
			allCoordinates = new Coordinate[numberofRays];
			allCoordinates = art.spCoordinate(i);
			coordinate = new Coordinate[rcount];

			for (int j = 0; j < numberofRays; j += rcount) {
				// Copy needed SPs of Rays for the next Task
				for (int k = j, l = 0; l < rcount; k++, l++) {
					coordinate[l] = allCoordinates[k];
				}

				Future<Double> future = pool.submit(new Task(art.getProjection(i), coordinate, pic, rcount, j));
				futures.add(future);
				/*
				 * try { Thread.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
				 */
				if (j + rcount > numberofRays - rcount) {
					rcount = numberofRays - j;
				}
				counter++;
			}

			for (int m = test; m < futures.size(); m++) {
				try {
					System.out.println(futures.get(m).get());
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (ExecutionException e) {
					e.printStackTrace();
				}
			}
			test = counter;

		}

Die Liste wird ja nach einer Iteration immer größer also habe ich Veriable test eingeführt.
Ich habe auch mal die Zeit in den Task getestet. Wenn ein Task immer nur im Pool ist und erst nach Beendigung der nächste Task in den Pool geladen wird, dann habe ich knapp eine Zeit von 500-600 ms für alle Iteration. Das wäre wenns funktioniert und wenn alle Task einer Iteration oben sind, dann dauert es irgendwie nur 120ms oder noch weniger. Da stimmt doch irgendwas nicht...
Vllcht reden wir auch einander vorbei; ich dachte eigentlich so würde ich auf die Task warten jetzt.

P.S: Könnte es damit zusammenhängen, dass nicht nach jeder Iteration die Matrix aktualisert werden soll, um dann die nächste zu starten sondern nach jedem Task die Matrix aktualiesiert werden soll in einer Iteration? Das würde erklären wieso es funktionert, wenn ich ein Thread.sleep anwende.
Ich lasse auch als Rückgabewert jetzt j ausgeben und bekomme auch die Werte auch in einer Reihenfolge, dass bedeutet ja das mein get() eigentlich alles richtig macht oder?
 
Zuletzt bearbeitet:

arilou

Bekanntes Mitglied
Auf jeden Fall riecht's nach Race Condition und/oder falscher Synchronizierung. Vielleicht solltest du statt Thread-Frameworks mit Pools und ähnlichem das Ganze mal von Hand aufbauen (eine eigene Thread-Klasse, Zugriff auf die Daten mit synchronized-Methoden, ...) , mit eigenem Nachdenken, wann wer die Daten manipulieren darf/soll.
 

aknayirp

Mitglied
Auf jeden Fall riecht's nach Race Condition und/oder falscher Synchronizierung. Vielleicht solltest du statt Thread-Frameworks mit Pools und ähnlichem das Ganze mal von Hand aufbauen (eine eigene Thread-Klasse, Zugriff auf die Daten mit synchronized-Methoden, ...) , mit eigenem Nachdenken, wann wer die Daten manipulieren darf/soll.

Hat sich erledigt.

Code:
coordinate = new Coordinate[rcount];

Diese Zeile musste nochmal in die innere For-Schleife. Da das Array dem Task immer übergeben wurde, ist man mit den Referenzen durcheinander gekommen.
Also ziemlich blöder Fehler :oops:
MfG
 
Zuletzt bearbeitet:

Neue Themen


Oben