callable in ExecutorService killen?

mohrenkopf

Mitglied
Gibt es eine irgend eine Möglichkeit, einzelne oder alle Callables die in einem CachedThreadPool laufen sofort zu killen? Ich brauche sowas Ähnliches wie Thread.stop(), von mir aus auch als dirty hack oder per reflection etc...

Hintergrund ist eine Art automatisches Testtool für fremden Code. Und Dieser fremde Code ist mit großer Wahrscheinlichkeit fehlerhaft und enthält auch mal die ein oder andere Endlosschleife. Und ich habe auf keine Zeile Code Einfluss. Der Fremdcode implementiert ein Interface (Command-Pattern) mit einer Methode public void execute(), die von meinem Code in einem eigenen Thread ausgeführt wird.

PS1: Ja, ich weiß, dass man das normalerweise "nicht macht", ich weiß auch sehr gut warum nicht und ich weiß auch sehr gut, wie man es normalerweise richtig macht, dazu gibt es wirklich genügend Threads. Falls ich eine gute Antwort bekomme: ich sag's auch keinem weiter, wirklich!

PS2: Bitte, bitte sorgt euch auch nicht um meine Sicherheit etc, ich habe selbstverständlich einen eigenen Classloader, einen Securitymanager etc.

Vielen Dank für Eure Antworten
Mohrenkopf
 

Marco13

Top Contributor
Wo du' schon erwähnst: Thread.stop() vielleicht? Es ist deprecated und unsafe, aber ein schneller Hack, der "Normalerweise" einen Thread brutaler stoppt als alles andere....
 
G

Gast2

Gast
Wenn du ein Callable ausführt bekommst du ein Future Objekt zurück. Auf dem kannst du cancel() aufrufen was den Task killt.
 

mohrenkopf

Mitglied
die ExecutorService haben doch ihre stopmethoden - warum nutzt du die einfach nicht ?

zb awaitTermination

Weil die im geschilderten Fall eben einfach nicht funktionieren!

ExecutorService.shutdownNow() bewirkt ja nur, dass isInterrupted() true liefert, d.h. das ganze funktioniert nur dann, wenn die laufenden Threads aktiv isInterrupted() abfragen.

Das ist ja im Normalfall auch richtig, aber ich brauche eine Art kill() ohne aktive Mitarbeit der Threads, ich gehe einfach mal davon aus, dass mein Runnable so aussieht, ohne dass ich etwas dagegen tun kann:

Java:
//Ein böses Runnable mit Endlosschleife
Runnable r = new Runnable()
{
  @Override
  public void run()
  { 
    while(true)
    {
      System.out.println( "Endlosschleife");
      if(Thread.interrupted())
        System.out.println( "Interrupted, ist mir aber egal!" );
    }
  }
};

Ein executorService.shutdownNow() hilft da nicht wirklich weiter...

awaitTermination() blockt auch den aktuellen Thread auch noch solange, bis das Runnable endlich aufhört (was eben nie passiert), hilft auch nicht weiter.

Thread.stop() wäre genau mein Ding aber ich habe eben einen ExecutorService und da gibt's das leider nicht.


Weitere Ideen?

mohrenkopf
 
B

bygones

Gast
awaitTermination() blockt auch den aktuellen Thread auch noch solange, bis das Runnable endlich aufhört (was eben nie passiert), hilft auch nicht weiter.
das stimmt so aber nicht, awaitTermination hat als parameter einen Timeout, also entweder bis das Callable fertig ist/sind oder eben bis der Timeout eintritt
 
B

bygones

Gast
also wenn ich es so starte
Java:
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.invokeAll(Arrays.asList(new Foo()), 3, TimeUnit.SECONDS);
System.out.println("ok");

class Foo implements Callable<String> {
        @Override
        public String call() throws Exception {
            boolean b = true;
            while (b) {
                System.out.println("still running");
                Thread.sleep(500);
            }
            return "this";
        }
    }
wird das Callable nach 3 sekunden korrekt unterbrochen und ein get auf das future objekt wirft dann die exception.
 

FArt

Top Contributor
also wenn ich es so starte
Java:
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.invokeAll(Arrays.asList(new Foo()), 3, TimeUnit.SECONDS);
System.out.println("ok");

class Foo implements Callable<String> {
        @Override
        public String call() throws Exception {
            boolean b = true;
            while (b) {
                System.out.println("still running");
                Thread.sleep(500);
            }
            return "this";
        }
    }
wird das Callable nach 3 sekunden korrekt unterbrochen und ein get auf das future objekt wirft dann die exception.
Logisch: Thread (Java 2 Platform SE v1.4.2) .
 

mohrenkopf

Mitglied
also wenn ich es so starte
Java:
...
            while (b) {
                System.out.println("still running");
                Thread.sleep(500);
            }
wird das Callable nach 3 sekunden korrekt unterbrochen und ein get auf das future objekt wirft dann die exception.

Danke, aber ohne Thread.sleep(500); funktioniert das leider auch nicht.

mohrenkopf
 
Zuletzt bearbeitet:

FArt

Top Contributor
Annahme:
Thread#stop ist ok und du bist dir voll über die Konsequenzen bewusst, besonsers wenn du keine Kontrolle über den Code hast, der in dem Thread ausgeführt wird.

Umsetzung:
Leite z.B. von ThreadPoolExecutor ab und baue deine eigene Implementierung. Insbesondere kannst du eine eigene Worker-Implementierung definieren, die eine kill-Methode implementiert, die Thread#stop aufruft. Passend dazu benötigst du wohl noch eine eigene Future-Implementierung.


Ich habe den Ansatz nur mal grob im Code durschgeschaut, sollte aber funktionieren. Nimm dir mal eine Stunde Zeit für einen proof of concept.


Insgesamt dürfte das Problem aber tiefschichtiger sein. Gerade die Nummer mit dem eigenen Classloader ist natürlich wichtig, aber wiederum alles andere als trivial (wenn man es richtig machen möchte) ;-)

Eine Alternative wäre evtl., den Aufruf in einer eigenen VM als Prozess zu forken. Process#destroy räumt garantiert alles ab und du musst dir um Classloading, Seiteneffekte und Ressourcen keinen Kopf machen.
 

mohrenkopf

Mitglied
ES GEHT! Danke FArt, so funktioniert es!

Ist vorerst wirklich nur eine Machbarkeitsstudie, habe mal folgendes gemacht:
ThreadPoolExecutor erweitert, da fast alles (z.B. die workers) private final sind, habe ich einfach mal den kompletten Sourcecode von ThreadPoolExecutor per C&P kopiert.

Java:
public class KillThreadPoolExecutor extends ThreadPoolExecutor
{
   //kompletten source von ThreadPoolExecutor reinkopiert
}

Neue Methode:
Java:
public void killAll_YES_I_KNOW_WHAT_I_AM_DOING()
    {
        for(Worker w : this.workers)
        {
            w.killWorker();
        }
    }

Und da die innere Klasse Worker erhält eine neue Methode:

Java:
private final class Worker implements Runnable
{
       ...
        //NEU:
        private void killWorker()
        {
            this.thread.stop();
        }
       ...
}

Eine Endlosschleife vom Typ while(true) wird damit brutal aber effektiv beendet, ist ja im Endeffekt auch wieder nur Thread.stop().

Wenn man nun die Reihenfolge shutdownNow(), awaitTermination(angemessener Timeout), killAll() einhält, dann hat man's ja "erst im Guten" versucht.

Die Copy-Paste-Aktion erscheint mir noch "dezent" unelegant, aber anders sehe ich wenig Möglichkeiten, die interne private final Worker-Klasse zu erweitern, geschweige denn an die Referenzen der Workers überhaupt ranzukommen. Und extends ThreadPoolExecutors muss es trotzdem sein, weil an andere Methoden 'this' übergeben wird und die eben einen ThreadPoolExecutor erwarten.



Vielen Dank an alle, die mitgedacht haben
 

mohrenkopf

Mitglied
So, der Vollständigkeit halber noch der Code. Nochmal Danke an alle, die geantwortet haben und damit mit zur Lösung beigetragen haben.:toll:

[WR]Der folgende Code ist ganz ganz ganz arg Bäh! Bitte nicht nachmachen![/WR]

Die Endlosschleife
Java:
//InfiniteLoop.java
package demokillthreads;
import java.util.concurrent.Callable;

/**
 * Kleine Endlosschleife
 */
public class InfiniteLoop implements Callable<String>
{
    @Override
    public String call() throws InterruptedException
    {
        int i = 0;
        boolean b = true;
        while ( b )
        {
            System.out.println( "Endlosschleife " + ( i++ ) );
        }
        return "Endlosschleife ist fertig!";
    }
}

Das Demoprogramm dazu
Java:
//DemoKillThreads.java

/*
 * Demo: Endlosschleifen-Thread in KillThreadPoolExecutor brutal killen
 * @author mohrenkopf
 */
package demokillthreads;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;


public class DemoKillThreads
{
    public static void main( String[] args ) throws InterruptedException, ExecutionException
    {
        KillThreadPoolExecutor pool = new KillThreadPoolExecutor();

        //Starte Thread:
        System.out.println( "Start..." );
        pool.submit( new InfiniteLoop() );

        //Pause
        Thread.sleep( 5 );

        //Shutdown-Versuch:
        System.out.println( "Shutdown..." );
        pool.shutdownNow();
        
        //Zeit geben zum Beenden:
        pool.awaitTermination( 50, TimeUnit.MILLISECONDS );

        //Und kill:
        System.out.println( "Kill all" );
        pool.killAll_YES_I_KNOW_WHAT_I_AM_DOING();
        
        System.out.println( "Ende" );
    }
}

Und der modifizierte Teil des ThreadPoolExecutors (Rest ist nur java-source kopiert und hat >30.000 Zeichen).
Java:
//KillThreadPoolExecutor.java

/*
 * Original ThreadPoolExecutor (c) Oracle
 * 
 * Modifikationen by mohrenkopf
 * Alle Originalkommentare sind entfernt, nur meine Veränderungen sind kommentiert
 * 
 */
package demokillthreads;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class KillThreadPoolExecutor extends ThreadPoolExecutor
{

    private static final RuntimePermission shutdownPerm = new RuntimePermission( "modifyThread" );
    volatile int runState;
    static final int RUNNING = 0;
    static final int SHUTDOWN = 1;
    static final int STOP = 2;
    static final int TERMINATED = 3;
    private final BlockingQueue<Runnable> workQueue;
    private final ReentrantLock mainLock = new ReentrantLock();
    private final Condition termination = mainLock.newCondition();
    
   /* mohrenkopf: überschriebene Klasse Worker nehmen */
    private final HashSet<KillThreadPoolExecutor.Worker> workers = new HashSet<KillThreadPoolExecutor.Worker>();
    
    private volatile long keepAliveTime;
    private volatile boolean allowCoreThreadTimeOut;
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
    private volatile int poolSize;
    private volatile RejectedExecutionHandler handler;
    private volatile ThreadFactory threadFactory;
    private int largestPoolSize;
    private long completedTaskCount;
    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

    /**
     * mohrenkopf:
     * Konstruktor 
     * wie in static factory Executors.newCachedThreadPool()
     * 
     */
    public KillThreadPoolExecutor()
    {
        this( 0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>() );
    }
    
    
    /**
     * mohrenkopf
     * Alle Threads abschießen
     */
    public void killAll_YES_I_KNOW_WHAT_I_AM_DOING()
    {
        for ( Worker w : this.workers )
        {
            w.thread.stop();
            //w.killWorker();
        }
    }
    
   
    
    public KillThreadPoolExecutor( int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue )
    {
        this( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), defaultHandler );
    }

    public KillThreadPoolExecutor( int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory )
    {
        this( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory, defaultHandler );
    }

    public KillThreadPoolExecutor( int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            RejectedExecutionHandler handler )
    {
        this( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), handler );
    }

    public KillThreadPoolExecutor( int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory,
            RejectedExecutionHandler handler )
    {
        super( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler );


        if ( corePoolSize < 0
                || maximumPoolSize <= 0
                || maximumPoolSize < corePoolSize
                || keepAliveTime < 0 )
        {
            throw new IllegalArgumentException();
        }
        if ( workQueue == null || threadFactory == null || handler == null )
        {
            throw new NullPointerException();
        }
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos( keepAliveTime );
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    
     /* mohrenkopf:
     * ab hier nur noch copy&paste von java.util.concurrent.ThreadPoolExecutor (und evtl. ein bisschen refactoring)
     * 
     */
 

Ähnliche Java Themen

Neue Themen


Oben