Du verwendest einen veralteten Browser. Es ist möglich, dass diese oder andere Websites nicht korrekt angezeigt werden. Du solltest ein Upgrade durchführen oder ein alternativer Browser verwenden.
ThreadsMain Thread warten auf abgebrochen Task warten lassen
Die Tasks habe ich alle in einer Liste. Einzelne Tasks kann ich so abbrechen:
Java:
myTaskList.get(0).getMyTask().cancel(true);
Der User kann einzelne Tasks abbrechen. Mein Problem ist jetzt das ich im Main Thread warten muss, falls der Task der abgebrochen werden soll gerade ausgeführt wird. Ich dachte erst das geht mit get() aber da krieg ich ne CancellationException() (wie erwartet) und mein Main Thread läuft fröhlich weiter.
Im Prinzip will ich folgendes:
- User will Task abbrechen
- wenn abzubrechender Task derjenige ist der gerade im Thread (also im Scheduler) läuft, Main Thread solange warten lassen bis Task abgebrochen/beendet ist.
Spontaner Gedanke: Kann man nicht bei ThreadPoolExecutor#beforeExecute eine Semaphore holen und sie bei afterExecute wieder loslassen? (Ist ziemlich ins Blaue geraten - wenn's nicht passt muss man nochmal genauer schauen... am besten an einem KSKB...)
Nochmal: Das mit der Sempahore war nur eine spontane Idee, das muss man sich wirklich genauer ansehen. Aber das, was du da jetzt machst, ist im Prinzip "busy waiting", und nicht so schön... Kannst du ein KSKB posten, wo man beispielhaft den Ablauf sieht, und wo gewartet werden soll?
Hmja, ich denke, dass man da eine Sempahore verwenden könnte...
Java:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SchedulerTest {
public SchedulerTest() {
final Semaphore semaphore = new Semaphore(1);
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1)
{
protected void beforeExecute(Thread t, Runnable r)
{
System.out.println("Task trying to acquire semaphore");
try
{
semaphore.acquire();
System.out.println("Task acquired semaphore");
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
return;
}
super.beforeExecute(t, r);
};
@Override
protected void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r, t);
System.out.println("Task releasing semaphore");
semaphore.release();
}
};
// Tasks erstellen
List<MyTask> myTasks = new ArrayList<MyTask>();
for (int i = 0; i < 10; i++) {
MyTask ms = new MyTask(scheduler);
ms.start();
myTasks.add(ms);
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
// alle beenden
// for (ListIterator<MyScheduler> iter = mySchedulers.listIterator();
// iter.hasNext();) {
// MyScheduler ms = iter.next();
// ms.getmyTask().cancel(true);
// }
// erste beenden
myTasks.get(0).getMyTask().cancel(true);
// HIER WARTEN FALLS ZU BEENDENDER TASK GERADE IM SCHEDULER LÄUFT
System.out.println("Main trying to acquire semaphore");
try
{
semaphore.acquire();
System.out.println("Main acquired semaphore");
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
return;
}
scheduler.shutdownNow();
System.out.println("RDY");
}
public static void main(String[] args) {
SchedulerTest test = new SchedulerTest();
}
}
class MyTask {
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> myTask;
public MyTask(ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
}
public void start() {
myTask = scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println("START");
// Working
double erg = 1;
while (erg < 100000000000L) {
erg *= 1.000000009;
}
System.out.println("Schleife Ende");
}
}, 0, 100000, TimeUnit.MILLISECONDS);
}
/**
* @return the as
*/
public ScheduledFuture<?> getMyTask() {
return myTask;
}
}
... aber es wäre vielleicht gut, wenn du genauer beschreiben würdest, welche Funktionalität da umgesetzt werden soll. Speziell macht das so eben keinen Sinn, wenn man einen pool mit mehreren Threads hat - weil jeweils nur einer die Semaphore haben könnte. Das könnte man natürlich auch lösen, aber ... es wäre gut, das eigentliche Ziel (bzw. gewünschte Verhalten) genauer zu kennen.
EDIT: Nur nebnebei: Du solltest das nicht 1:1 übernehmen - das sollte nur ein möglicher Ansatz sein. Insbesondere müßte man aufpassen, dass der main-Thread sich nicht die Semaphore krallt, und damit die Tasks am Arbeiten hindert!
Der Scheduler hat nur einen Thread.
Mein eigentliches Programm besteht aus einer Liste von Aktien (gespeichert in einer Aktientabelle in einer DB). Und jede Aktie hat quasi einen Task im Scheduler um den aktuellen Aktien-Kurs alle xx Minuten abzufragen und in einer Kurstabelle zu speichern. Der User kann Aktien hinzufügen (womit dem Scheduler ein Task hinzugefügt wird) und Aktien löschen (womit auch der Abfrage Task aus dem Scheduler gelöscht wird). Mein Problem tritt auf wenn der User eine Aktie löschen will die gerade im Scheduler-Thread aktiv ist (also den aktuellen Kurs abfragt), da ich die Aktie ja nicht einfach löschen kann während der Thread die Aktie benutzt (Fremdschlüssel der Aktie in der Kurstabelle und bei Löschung der Aktie werden auch alle Kursdaten gelöscht). D.h. ich muss den Task beenden bevor die Aktie gelöscht wird, deshalb das warten im Main-Thread.
Die Lösung macht leider auch nicht ganz das was ich wollte. Da so der Main Thread immer wartet, aber er soll nur warten wenn der Task der abgebrochen werden soll gerade im Thread ist. Der Code mit dem Semaphore wartet aber auch wenn bspw. der 3. Task gerade ausgeführt wird und der 5. beendet werden soll bis der 3. fertig ist.
EDIT: Oh, den letzten Beitrag hatte ich nicht gesehen (Reload vergessen ) aber... vielleicht passt's trotzdem:
OK... es ist schwer (bis unmöglich) sich da jetzt ohne nähere Infos und ganz spontan "die beste" Strategie auszudenken, aber aus dem Bauch heraus klingt es, als könnte man da auch GANZ anders drangehen ... sowas wie
Java:
List<Runnable> thingsThatShouldBeRun = new CopyOnWriteArrayList<Runnable>()
scheduler.scheduleAtFixedRate(createRunnableThatOnlyCallsAll(thingsThatShouldBeRun));
und dann einfach die thingsThatShouldBeRun verändern - aber wenn es sowieso immer nur einen Thread im Executor gibt, weiß ich nicht, warum man nicht einfach ein Runnable (das eine Liste von anderen Runnables ausführt) mit einem normalen Timer durchnudeln läßt ... dann spart man sich das ganze Warten und Canceln und Semaphoren und so....
Aber ... das ist KEINE Empfehlung!!! ... vielleicht passt das auch nicht auf dein Problem... :bahnhof:
Die Aufgabe besteht eigentlich nur darin periodisch den Aktienkurs von bestimmten Aktien von einem Server im Internet abzufragen. Ich habe eine Liste mit Aktien die alle periodisch (bspw. alle 20 Minuten) den aktuellen Kurs abfragen und in eine DB speichern. Da das ja alle xx Minuten immer wieder die selbe Aufgabe ist, dachte ich mir ich nehme mir den Scheduler da der ja dafür gedacht ist periodisch immer wieder den selben Kram auszuführen. Eigentlich ganz simpel, das Problem tritt eben dann auf wenn der Benutzer eine Aktie löschen will (und diese grade im Thread aktiv ist).
Ich hab eigentlich noch nie wirklich was mit Threads gemacht deswegen kann ich da jetzt wenig zu sagen :-(.
Kannst du das vielleicht etwas näher erklären?
Hm... der Scheduler KÖNNTE(!) (auf Basis des bisher beschriebenen) einfach "mehr" sein, als benötigt wird: Man kann auch in einem Timer (Java 2 Platform SE 5.0) tasks periodisch ausführen lassen - mit einem Thread (der Scheduler würde auch mehrere unterstützen, und noch einiges mehr). Nochmal: IMMER auf Basis des bisher beschriebenen würde jetzt aus meiner Sicht nichts dagegen sprechen, dass mit einem Timer zu machen, der einen Task ausführt, der nur eine Liste von Unter-Tasks enthält....
Falls ich das jetzt richtig verstanden habe würde dann der Ober-Task periodisch bspw. alle 20 Minuten die Liste der Untertasks ausführen (bzw. abarbeiten). Wenn das so wäre dann würde aber eine neu erstellte Aktie im Extremfall 20 Minuten warten bis sie das erste mal aktualisiert wird (Weil der Obertask ja nur alle 20 Minuten startet). Nur die viel wichtigere Frage - würde das mein Problem lösen? - Wäre es also einfach einen bestimmten Task aus dieser Taskliste zu killen bzw. zu beenden auch wenn er gerade läuft (und im Main Programm dann zu warten falls er gerade läuft)?
Ahja, das mit der Verzögerung stimmt. Man könnte auch jeden einzelnen "Unter"-Task als eigenständigen TimerTask einfügen, und es gibt eine "purge"-Methode zum Wegwerfen von Tasks, aber das Umzustellen würde sich wohl nicht lohnen.
Auch wenn dir die (wenig hilfreichen) Antworten und die Nachfragerei schon auf die Nerven gehen, was ganz nebenbei: WARUM soll der Main-Thread warten?
Wie auch immer, man könnte sicher sowas ähnliches machen wie das mit der Semapore, wo aber das beschriebene Problem nicht auftrit. Mit einem Lock und passenden Conditions oder so... das müßte man sich näher ansehen, vielleicht schau' ich bei Gelegenheit nochmal, weiß aber nicht, wann ich Zeit habe
Mir geht nichts auf die Nerven, freue mich ja über die Hilfe
Der Main-Thread soll warten, weil es sich wie gesagt um Aktien handelt die alle 20 Minuten aktualisiert werden. Die Aktien sind in einer Tabelle (Aktientabelle) in einer DB gespeichert und die Kurse ebenfalls (Kurstabelle). Wenn der User jetzt eine Aktie entfernen (löschen) will ist es ja möglich das diese Aktie gerade im Thread aktualisiert wird, was ja dann zum "Knall" führt (Aktien ID wird grad als Fremdschlüssel für Kurstabelle genutzt und in Kurstabelle wird gerade der aktuelle Kurs geschrieben zur gleichen Zeit wird aber die Aktie und damit alle Kurse gelöscht). Um das zu verhindern muss ich ja den Task vor dem löschen killen bzw. beenden.
Hm. Eigentlich dachte ich, dass Datenbanken da schon eigene Mechanismen mitbringen, um solche Sachen sauber ausführen zu können. Aber damit kenn' ich mich nicht aus.
Das mit dem "Nerven" war vielleicht eine Art Projektion, mich nervt es nämlich, dass das offenbar schwieriger ist, als es sein sollte. Das 'cancel' ändert nur einen State, irgendwo tief in kompliziert-versteckten concurrency-Klassen. Welche Auswirkungen das tatsächlich hat, ist kaum praktisch nachvollziehbar. Viele Einflußmöglichkeiten hat man nicht, weil man mit der privaten Klasse "ScheduledFutureTask" vom ScheduledThreadPoolExecutor nicht viel anfangen könnte... Hab' jetzt eine Weile rumgekrampft, und die einzige mögliche Lösung, die mir überhaupt eingefallen ist, wäre, irgendeine Synchronisationshilfe (z.B. eben Sempahore) an eine eigene Implementierung von ScheduledFuture weiterzureichen, die man im das eigentliche ScheduledFuture vom ScheduledThreadPoolExecutor drumwickelt, und in der man 'cancel' so überschreibt, dass auf diese Synchronisationshilfe gewartet wird.
Achtung, das ist mit ziemlicher Sicherheit Unfug, auf jeden Fall aber ziemlich "WTF?! :autsch:"
Java:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SchedulerTest {
public SchedulerTest() {
final Semaphore semaphore = new Semaphore(1);
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1)
{
protected void beforeExecute(Thread t, Runnable r)
{
System.out.println("Task trying to acquire semaphore");
try
{
semaphore.acquire();
System.out.println("Task acquired semaphore");
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
return;
}
super.beforeExecute(t, r);
};
@Override
protected void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r, t);
System.out.println("Task releasing semaphore");
semaphore.release();
}
};
// Tasks erstellen
List<MyTask> myTasks = new ArrayList<MyTask>();
for (int i = 0; i < 10; i++) {
MyTask ms = new MyTask(scheduler, semaphore);
ms.start();
myTasks.add(ms);
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
// alle beenden
// for (ListIterator<MyScheduler> iter = mySchedulers.listIterator();
// iter.hasNext();) {
// MyScheduler ms = iter.next();
// ms.getmyTask().cancel(true);
// }
// erste beenden
myTasks.get(0).getMyTask().cancel(true);
// HIER WARTEN FALLS ZU BEENDENDER TASK GERADE IM SCHEDULER LÄUFT
System.out.println("Main läuft weiter");
scheduler.shutdownNow();
System.out.println("RDY");
}
public static void main(String[] args) {
SchedulerTest test = new SchedulerTest();
}
}
class MyTask {
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> myTask;
private Semaphore semaphore;
public MyTask(ScheduledExecutorService scheduler, Semaphore semaphore) {
this.scheduler = scheduler;
this.semaphore = semaphore;
}
public void start() {
ScheduledFuture<?> delegate = scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println("START");
long startMS = System.currentTimeMillis();
while (System.currentTimeMillis() < startMS + 5000)
{
// Do nothing
}
System.out.println("Schleife Ende");
}
}, 0, 10000, TimeUnit.MILLISECONDS);
myTask = new TestScheduledFuture(delegate, semaphore);
}
/**
* @return the as
*/
public ScheduledFuture<?> getMyTask() {
return myTask;
}
}
class TestScheduledFuture implements ScheduledFuture<Object>
{
private ScheduledFuture<?> delegate;
private Semaphore semaphore;
public TestScheduledFuture(ScheduledFuture<?> delegate, Semaphore semaphore)
{
this.delegate = delegate;
this.semaphore = semaphore;
}
@Override
public long getDelay(TimeUnit unit)
{
return delegate.getDelay(unit);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
System.out.println("Calling cancel");
try
{
System.out.println("Canceller trying to acquire semaphore");
semaphore.acquire();
System.out.println("Canceller acquired semaphore");
}
catch (InterruptedException e)
{
e.printStackTrace();
Thread.currentThread().interrupt();
}
System.out.println("Calling cancel on delegate");
boolean result = delegate.cancel(mayInterruptIfRunning);
System.out.println("Canceller releasing semaphore");
semaphore.release();
return result;
}
@Override
public int compareTo(Delayed o)
{
return delegate.compareTo(o);
}
@Override
public boolean isCancelled()
{
return delegate.isCancelled();
}
@Override
public boolean isDone()
{
return delegate.isDone();
}
@Override
public Object get() throws InterruptedException, ExecutionException
{
return delegate.get();
}
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
return delegate.get(timeout, unit);
}
}
Wenn ich vor dem Problem stünde, würde ich jetzt wohl antizipieren, dass der ScheduledThreadPoolExecutor eben einfach nicht für diese Aufgabe gedacht ist, und versuchen, die erforderliche Funktionalität auf einfachst-mögliche Weise per Hand selbst zu bauen... Hab' da mal ein bißchen rumprobiert, ...
Java:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
public class SchedulerTest2
{
public static void main(String[] args)
{
SimpleScheduler simpleScheduler = new SimpleScheduler();
SimpleTask simpleTask0 = simpleScheduler.schedule(createSimpleRunnable(), 5000);
//pause(2000);
simpleScheduler.schedule(createSimpleRunnable(), 7000);
//pause(2000);
simpleScheduler.schedule(createSimpleRunnable(), 9000);
pause(100);
System.out.println("Calling cancel");
simpleTask0.cancel();
System.out.println("Calling cancel DONE");
}
private static void pause(long ms)
{
try
{
Thread.sleep(ms);
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private static Runnable createSimpleRunnable()
{
return new SimpleRunnable();
}
}
class SimpleScheduler
{
private List<SimpleTask> simpleTasks = new ArrayList<SimpleTask>();
private Thread thread;
public SimpleTask schedule(Runnable runnable, long periodMS)
{
if (thread == null)
{
thread = new Thread(new Runnable()
{
@Override
public void run()
{
doRun();
}
});
//thread.setDaemon(true);
thread.start();
}
SimpleTask simpleTask = new SimpleTask(runnable, periodMS);
synchronized (simpleTasks)
{
simpleTasks.add(simpleTask);
Collections.sort(simpleTasks);
simpleTasks.notifyAll();
}
return simpleTask;
}
private void doRun()
{
while (true)
{
synchronized (simpleTasks)
{
// Remove all cancelled tasks
List<SimpleTask> toRemove = new ArrayList<SimpleTask>();
for (int i=0; i<simpleTasks.size(); i++)
{
SimpleTask simpleTask = simpleTasks.get(i);
if (simpleTask.isCancelled())
{
toRemove.add(simpleTask);
}
}
simpleTasks.removeAll(toRemove);
// Wait until there are tasks to work on
while (simpleTasks.size() == 0)
{
try
{
simpleTasks.wait();
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
return;
}
}
// Fetch the first task from the list
SimpleTask simpleTask = simpleTasks.get(0);
if (simpleTask.isCancelled())
{
continue;
}
long waitTime = simpleTask.getNextExecutionMS() - System.currentTimeMillis();
System.out.println("Next task is "+simpleTask);
if (waitTime <= 0)
{
System.out.println("Running "+simpleTask);
simpleTask.run();
Collections.sort(simpleTasks);
}
else
{
try
{
simpleTasks.wait(waitTime);
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
return;
}
}
}
}
}
}
class SimpleTask implements Runnable, Comparable<SimpleTask>
{
private Runnable runnable;
private long periodMS;
private boolean isCancelled = false;
private boolean isRunning = false;
private long nextExecutionMS = 0;
public SimpleTask(Runnable runnable, long periodMS)
{
this.runnable = runnable;
this.periodMS = periodMS;
this.nextExecutionMS = System.currentTimeMillis();
}
long getNextExecutionMS()
{
return nextExecutionMS;
}
public synchronized void cancel()
{
while (isRunning)
{
try
{
wait();
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
return;
}
}
isCancelled = true;
}
public boolean isCancelled()
{
return isCancelled;
}
@Override
public synchronized void run()
{
long startTimeMS = System.currentTimeMillis();
isRunning = true;
runnable.run();
isRunning = false;
nextExecutionMS = startTimeMS + periodMS;
notifyAll();
}
@Override
public int compareTo(SimpleTask other)
{
if (getNextExecutionMS() < other.getNextExecutionMS())
{
return -1;
}
if (getNextExecutionMS() > other.getNextExecutionMS())
{
return 1;
}
return 0;
}
@Override
public String toString()
{
return "SimpleTask for "+runnable+", due in "+(nextExecutionMS-System.currentTimeMillis());
}
}
class SimpleRunnable implements Runnable
{
private static int currentID = 0;
private int id;
public SimpleRunnable()
{
id = currentID++;
}
public void run()
{
System.out.println(this+" starting");
long startMS = System.currentTimeMillis();
while (System.currentTimeMillis() < startMS + 1000)
{
// do nothing
}
System.out.println(this+" finished");
}
@Override
public String toString()
{
return "SimpleRunnable "+id;
}
}
... aber das ist noch ziemlich unausgegoren, und um es RICHTIG zu machen, fehlen mir sowohl Zeit als auch Motiviation.
Vielleicht hat ja noch jemand anderes eine einfachere Idee, oder jemand weiß, wo es die magische
Oh das ist ja doch etwas mehr. Werd ich mir morgen mal genauer ansehen. Wie im ersten Post erwähnt gibt es eigentlich das get:
Java:
V get()
throws InterruptedException,
ExecutionException
Waits if necessary for the computation to complete, and then retrieves its result.
Returns:
the computed result
Throws:
CancellationException - if the computation was cancelled
ExecutionException - if the computation threw an exception
InterruptedException - if the current thread was interrupted while waiting
nur sobald man das cancel macht scheint für ihn die "computation" complete zu sein.
Wenn das mit dem Code auch nicht hinhaut dann werd ichs eben mit dem sleep und der Schleife machen, denn so langsam geh ich an dem Kram zu Grunde :bloed:
Mal was ganz anderes: Muss diese sychronisation wirklich auf Ebene der Tasks und des Executors gemacht werden? Also, kann man das Schreiben in die DB und das löschen/canceln von Tasks nicht an einer gemeinsamen Stelle machen, wo man dann u.U. vergleichsweise "trivial" sicherstellen könnte, dass das nicht gleichzeitig passiert?
Konnte leider nicht mehr editieren. Aber so kanns ja nicht gehen, dann müsst ich die Funktionen ja in die Aktien-Klasse schreiben damit das funzt, aber eine Aktie kann sich ja nicht selber löschen. Wenn ichs in die Model-Klasse schreibe, die den ganzen DB-Kram macht gilt das ja wieder für alle. Also sobald irgendeine Aktie aktualisiert wird kann keine andere gelöscht werden und andersrum.
Es könnte aber die Verwaltung etwas einfacher machen: Man könnte sich z.B. speichern, welche Aktie gerade am aktualisieren ist (schon an diese Information kommt man mit dem Executor schon schlecht ran, nur mit beforeExecute/afterExecute), und das löschen würde kein explizites Canceln erforderlich machen. Aber ich will dich nicht zu irgendwelchem Umstrukturierungsmaßnhamen verleiten, von denen ich nicht wissen kann, ob sie sich nachher als hilfreich rausstellen oder nicht....
Warum sagt eigentlich sonst niemand was hierzu....!?
Ja nur wie gesagt, dann müsste ich das ja beides in die Aktienklasse packen, nur dann hätte die Aktienklasse ja eine Funktion um sich selbst zu löschen :autsch:
Da ich das doch per Aktie machen muss, wenn ich das bspw. in meine Model-Klasse packe die die ganzen Datenbank-Abfragen macht, dann würde das ja wieder für alle gelten. Also sobald eine Aktie aktualisiert kann keine andere gelöscht werden, weil ich ja ein Objekt dieser Klasse habe, die die Abfragen für alle Aktien macht.
Du meintest wegen des 'synchronized'? Ob man dort synchronized oder eigene locks/conditions verwendet, mal außen vor gelassen: Man kann auch auf andere Objekte synchronisieren
synchronized(irgendeineAktie) { ... }
Haaalllooo?! Liest das hier sonst noch irgendjemand? Irgendjemand, der noch Ideen dazu hat? :bahnhof: