Nun ist mein Problem aber, dass ich ebenfalls EJB verwende.
Das heißt:
Ich möchte auch innerhalb der oben genannten Klasse (der Link) auf eines meiner anderen EJB Klassen machen, was aber so dann nicht geht....
Wie kann man das genau machen?
Wenn ich das richtig sehe habe ich:
MyEmailIdleService
a) Ruft alle Emailadresse ab, bei denen eine Prüfung erfolgen soll
b) Macht dann einen Thread per Emailadresse auf und überwacht diesen
Denke mal nicht - ich hatte die Anforderung bisher nicht, in einer Java EE-Anwendung Postfächer überwachen zu müssen. Da JavaMail sowieso zum Einsatz kommt, frage ich mich halt, warum man nicht verwendet, was die Lib bietet. Sieht auf den ersten Blick jedenfalls recht einfach aus.
In der Doku steht: "The IdleManager is created with a Session, which it uses only to control debug output. A single IdleManager instance can watch multiple Folders from multiple Stores and multiple Sessions."
Aber ich möchte die Möglichkeit haben einzelne Emailadresse auch nicht mehr zu überwachen...
Das brauche ich doch pro Emailadresse einen IdleManager?
Damit ich die stop() - Methode anwenden kann?
Wie kann ich das machen?
Ich könnte ja zB alle IdleManager Objekte in einer Liste speichern. Leider hat IdleManager keine ID, sonst könnte ich sowas machen wie
public void stop(String id){
// Suche ID aus der Liste heraus und wenn auf dieses Objekt dann stop() an...
Du brauchst die stop()-Methode dafür nicht. Die Überwachung durch den IdleManager funktioniert andersrum. Auch das steht in der Doku (den wichtigen Part habe ich mal hervorgehoben): "Note that, after processing new messages in your listener, or doing any other operations on the folder in any other thread, you need to tell the IdleManager to watch for more new messages. Unless, of course, you close the folder."
Heißt: sobald Du irgendwas mit einem überwachten Folder machst, musst Du dem IdleManager erneut sagen, dass er den Folder überwachen soll.
ja, aber wenn ich dann von "außen" eingreifen möchte und die Überwachung eines einzelnen Postfachs stoppen möchte, dann brauche ich doch eine Methode wie stop() ?
Also so ganz komme ich damit leider noch nicht klar.... Ich habe jetzt doch mal den Beispielcode aus meinem ersten Post genommen, da das Grundprinzip hier schon passt - und auch Dinge wie alle 9 Minuten der Server connected wird um einen Timeout vermeiden etc.
Das Überwachen der Email funktioniert, aber mein Problem ist:
a) Aufruf einer anderen EJB dann
Vielleicht mal vom Grundprinzip, was ich machen will:
Ich will mehrere Emailadressen überwachen, ob neue Email vorhanden sind.
Wenn ja (und hier kommt dann der Aufruf einer anderen EJB zum Einsatz), möchte ich in meinem Backend prüfen, ob es zu dem Betreff und dem Sender in meiner Datenbank bereits einen Eintrag gibt.
Wenn ja, dann werden weitere Dinge geschehen (Speicherung in DB etc. - soll jetzt aber nicht das Thema sein)...
Im Moment sieht es nun so aus:
1) TicketEmailWatcherService...
-> Ich denke, diese muss ich später noch mit @Startup deklarieren, dass diese beim Starten der Applikation gestartet wird und die startListener() - Methode mit @PostConstruct ?
Hier hole ich mir die Emailadressen aus der Datenbank, die überprüft werden sollen.
Ebenfalls erstelle ich das Store - Objekt, um mich später zu connecten....
Grundproblem:
Am liebsten wäre mir in der "TicketEmailWatcherService", dass ich dann eine Liste der Emails zurückbekomme, die neu sind.
Und dann kann ich mit den "Message" - Objekten machen, was ich will. Nachricht in der DB speichern usw.
Aber wie kann ich das machen??
Die EJB Spec "verbietet", dass Du selbst Threads erstellst. Natürlich kann Dich die Spec nicht daran hindern, aber Du solltest keinesfalls aus einem solchen Thread Dinge verwenden, die unter der Fuchtel des Application Servers stehen. Außerdem ist es keine gute Idee, für jede Verbindung einen eigenen Thread zu verbraten.
Wenn Du trotzdem daran festhalten willst, würde mir spontan einfallen, eine LinkedBlockingQueue aus diesen Threads heraus zu füllen und diese Queue aus einem managed Thread (ManagedExecutorService) heraus zu lesen. Die EJB und Deine Threads teilen sich also eine Queue.
Nur als kleine Info am Rande: achte doch bitte darauf, deine Beiträge in einem halbwegs passenden Unterforum zu erstellen, dies hier ist ebenso wie deine anderen Beiträge alles andere als ein Anfängerthema
a) Was benötige ich dann noch von dem obigen Code? Muss ich das mit den Threads komplett löschen?
b) Was macht: Object obj = messages.take();
c) Wie kann ich einzelne Postfächer stoppen? (Watch stoppen) ?
d) Was ist mit: queue.offer() gemeint?
a) alles - es ging ja nur darum, eine Verbindung zwischen den von Dir und den vom Application Server verwalteten Threads herzustellen, um auf die Ressourcen (wie EJBs) zugreifen zu können. Diese Verbindung erfolgt über eine Queue. Daher:
Das Prinzip ist ganz einfach: Du hast einen Postfach-Thread. Der erhält irgendwann neue Nachrichten und schiebt diese in einen FIFO-Puffer (Queue). Auf der anderen Seite hast Du einen vom Application Server verwalteten Worker-Thread, der einfach eine Nachricht nach der anderen aus der Queue liest und damit irgendwas macht.
OK - also habe ich immernoch Threads, die dann aber vom Application Server bearbeitet werden...
Und wie sieht der Aufbau dann der Klassen aus? Welche Modifikation brauche ich in den oberen Klassen?
a) Wie starte ich denn den Listener, der prüft, ob neue Email da sind oder nicht?
b) Im Idealfall rufe ich dann in der public void onMailReceived(Object[] messages) eine Methode via EJB auf, die dann weiteres macht (Ticket erstellen aus den Infos der Message)
Demnach bräuchte ich noch eine Instanzierung in public class MailPushEventHandler extends MailPushEmpfaenger ?
c) Wie muss doSomethingWith(obj); abgeändert werden?
a) Das passiert bei der Erzeugung Deines Handlers doch automatisch
b) Nein, im Idealfall rufst Du in onMailReceived die offer-Methode der Queue auf.
c) doSomethingWith(obj) ist lediglich ein Platzhalter, den Du halt mit den für Dich passenden Anweisungen ersetzen musst.
Das habe ich in #16 bereits beschrieben. Der Code aus #16 muss in Deine EJB (TicketEmailWatcherService). Bei der Erzeugung des MailPushEventHandlers musst Du die messages-Queue mitgeben (-> Anpassung des Konstruktors von MailPushEventHandler). Und in MailPushEventHandler#onMessageReceived legst Du das zu verarbeitende Objekte in die Queue mit der offer-Methode.
welche Klassen stecken denn hinter: private Queue<Object> messages; ?
Bekomme in Eclipse einen Fehler, dass er "take" nicht findet...
"The method take() is undefined for the type Queue<Object>"
Fast. Nimm den try-catch-Block (EDIT: der auf Ebene von initStartTicketListener) nach unten, dann ist messages auch schon initialisiert, wenn Du den Konstruktor von MailPushEventHandler aufrufst
Aktuell entnimmst Du nur das Objekt aus der Queue, machst aber damit noch nichts. Du könntest an der Stelle die Nachricht in der DB speichern, oder was auch immer Du damit anfangen willst.
Nein, das obj aus der Queue ist ja bereits eine neue Message (oder was auch immer Du in die Queue schiebst):
Java:
executor.submit(()->{while(running){try{Object obj = messages.take();// Rufe andere EJB Klasse auf und speichere obj in DB}catch(MessagingException e){
e.printStackTrace();}catch(IOException e){
e.printStackTrace();}}}catch(InterruptedException e){}}});}
Dann bleibt jetzt aber noch die Frage wie ich die Überwachung von einzelnen Postfächer stoppe?
In der TicketEmailWatcherService schwebt mir sowas vor wie:
public void stop(String emailAddress){
???
}
Was ich brauche ist eine eindeutige ID / Emailadresse, in der ich das "watching" dann eines MailPushEventHandler stoppe?
Läuft nun super. Vielen vielen Dank.
Muss ich den Handler noch initialisieren? Wenn ja, wie? Map<String, MailPushEventHandler> handlers
Ich habe noch ein Problem bei folgendem Code, welcher allerdings nur ein paar Mal auftritt.
Ich schätze, wenn ich die Email schon gelesen habe oder ähnliches.
D. h. dass Du nicht je Verbindung einen Thread hast, sondern ein Thread für zig Verbindungen gleichzeitig zuständig ist. Der IdleManager aus #2 arbeitet mit non-blocking I/O.
Wenn Du den IdleManager (s. #2) verwendest und keine Threads, bekommst Du non-blocking I/O gratis. Vielleicht habe ich Dich auch missverstanden und Du willst wissen, wie man selbst mit non-blocking I/O verwendet. Unter http://tutorials.jenkov.com/java-nio/index.html scheint es ein Tutorial dazu zu geben.
ja, ich verwende den IdleManager.
Dann passt es doch schon bereits? Oder verstehe ich dich jetzt falsch?
Die Frage, die ich mir eben stelle ist, wenn ich bspw. 1000 Emailadressen überwache, ob ich Probleme bekomme im Server.
Dass es dann vllt zu einer Verzögerung bei der Überwachung kommt, ist dann verständlich.
Die Frage ist dann eben, ob mehr Server (mehr Kapazität, mehr Threads) dies dann ausgleichen?
Die Frage, die ich mir eben stelle ist, wenn ich bspw. 1000 Emailadressen überwache, ob ich Probleme bekomme im Server.
Dass es dann vllt zu einer Verzögerung bei der Überwachung kommt, ist dann verständlich.
Die Frage ist dann eben, ob mehr Server (mehr Kapazität, mehr Threads) dies dann ausgleichen?
Habe hierzu nochmal eine allgemeine Frage.
Ich würde gerne in meiner Applikation einen "Scheduler" haben. Also um konkret zu sein:
- Ich habe eine DB - Tabelle "MySchedules". Hier steht dann das "plannedExecutionDate".
Im Moment habe ich eine Methode mit @Schedule annotiert, die dann jede Minute läuft.
Hier wird geprüft, ob es in der DB Einträge in der "MySchedules" gibt, die noch nicht losgelaufen sind und das "plannedExecutionDate" kleiner als die jetztige Uhrzeit ist.
Wenn das zutrifft, dann wird das Objekt von "MySchedules" in eine Liste von MySchedules hinzugefügt und anschließend eine entsprechende Methode aufgerufen (Kunde erstellen whatever):
Java:
@Timeout@AccessTimeout(value =20, unit =TimeUnit.MINUTES)@Schedule(minute ="*/1", hour ="*", persistent =false)publicvoidexecute()throwsException{
LOGGER.info("START SCHEDULER");try{if(currentlyRunning){
LOGGER.warn("Scheduler is currently running...");return;}if(schedulingService ==null)
schedulingService =newSchedulingService();
currentlyRunning =true;/**
* Alle noch nicht ausgeführten Schedules bekommen
*/List<ScheduleExecution> scheduleExecutionList = scheduleExecutionService
.findAllScheduleExecutionOutstanding();if(scheduleExecutionList ==null|| scheduleExecutionList.size()==0){
LOGGER.info("NO PLANNED SCHEDULES FOUND");
currentlyRunning =false;return;}
LOGGER.info("FOUND SCHEDULES: "+ scheduleExecutionList.size());for(ScheduleExecution scheduleExecution : scheduleExecutionList){synchronized(this){
schedulingService.addTaskToExecutor(taskId,newRunnable(){@Overridepublicvoidrun(){try{// executeAction(scheduleExecution);}catch(Execption e){
LOGGER.error(ExceptionUtils.getFullStackTrace(e));}}},0);}}
currentlyRunning =false;}catch(Exception e){
LOGGER.error(ExceptionUtils.getFullStackTrace(e));
currentlyRunning =false;}
LOGGER.info("END SCHEDULER");}
Meine Klasse "SchedulingService" sieht so aus:
Java:
publicclassSchedulingService{privatestaticMap<String,ScheduledFuture> scheduleFutures =newHashMap<>();privateScheduledExecutorService scheduledExecutorService =null;privateint executorPoolSize =5;/**
*
* Will create ScheduledService Object
*
*/publicvoidcreateScheduledExecutor(int executorPoolSize){
scheduledExecutorService =Executors.newScheduledThreadPool(executorPoolSize);addRecurringTaskToExecutor("removeStopedScheduledFuturesFromMap",newRunnable(){@Overridepublicvoidrun(){List<String> tasksToBeRemoved =newArrayList<>();synchronized(scheduleFutures){
scheduleFutures.keySet().stream().filter(task ->{return scheduleFutures.get(task).isDone();}).forEachOrdered(task ->{
tasksToBeRemoved.add(task);});
tasksToBeRemoved.stream().forEach(key ->{
scheduleFutures.remove(key);});}}},0,10);}/**
*
* Will Stop the ScheduledService
*
*/publicvoidstopScheduledExecutor(){
scheduledExecutorService.shutdown();}/**
*
* return ScheduledService Object if exists otherwise first create the object
* then return
*
*/publicScheduledExecutorServicegetExecutorServiceObject(){if(scheduledExecutorService ==null){createScheduledExecutor(this.executorPoolSize);}return scheduledExecutorService;}/**
*
* Add new tasks to executors
*
*/publicvoidaddRecurringTaskToExecutor(String taskID,Runnable event,int initialDelay,int periodicDelay){synchronized(scheduleFutures){System.out.println("Scheduling new task : "+ taskID);
scheduleFutures.put(taskID,getExecutorServiceObject().scheduleAtFixedRate(event, initialDelay,
periodicDelay,TimeUnit.SECONDS));}}publicvoidaddTaskToExecutor(String taskID,Runnable event,int initialDelay){synchronized(scheduleFutures){System.out.println("Scheduling new task : "+ taskID);
scheduleFutures.put(taskID,getExecutorServiceObject().schedule(event, initialDelay,TimeUnit.SECONDS));}}publicvoidaddTaskToExecutor(String taskID,Runnable event){addTaskToExecutor(taskID, event,0);}/**
*
* Will remove tasks from scheduler
*
*/publicvoidremoveTaskFromExecutor(String taskID,boolean forceStop){synchronized(scheduleFutures){System.out.println("Stopping "+ taskID);if(scheduleFutures.containsKey(taskID))
scheduleFutures.get(taskID).cancel(forceStop);elseSystem.out.println(taskID +" Already Stopped");}}}
Nun meine Frage, macht das Sinn?
Weil du vorher was geschrieben hast, dass man nicht selbst Threads anlegen soll?