Zeitkritische Threads

komplexor

Mitglied
Hallo,

habe folgende Aufgabe. Ich soll ein Programm entwickeln, das einen Stream über ein Socket einliest und einen Teil des Inhalts in die Datenbank sichert. Dieser Stream kommt i.d.R. alle 15 Sek, kann aber auch mal ausfallen oder leicht verzögert ankommen. Soweit erst mal dazu.
Eine weitere Aufgabe des Programms soll es sein, eine Datei zu parsen und in die Datenbank zu schreiben.
Diese Datei wird von einem externen Programm alle 10 min. neu erstellt und innerhalb dieser 10 min. mit Daten gefüllt.Hierbei überprüfe ich alle 60 sek. mit der Methode .lastModified() ob der letzte Dateizugriff länger als 20 sek. beträgt, wenn ja, bearbeite die Datei. Dabei kann der Inhalt der Datei bis zu 50000 Zeilen enthalten, die geparst werden müssen und in die DB geschrieben werden, das kann je nach Zeilenanzahl, zwischen 15-20 sek dauern.
Das alles soll natürlich parallel in einem Programm laufen.

Jetzt das Problem, bzw. eine grundlegende Frage:
Ist es überhaupt möglich mittels Threads zu gewährleisten, das der Stream, der ca. alle 15 sek. kommt, trotzdem empfangen und bearbeitet wird, während noch die Datei mit ihren ca. 50000 Zeilen verarbeitet und in die DB geschrieben wird, was durchaus länger als 15 sek. dauern kann???
Es ist also ein zeitkritisches Problem, bei dem keine Daten verloren gehen dürfen.

Falls es realisierbar ist, würde ich mich über ein ungefähres Schema freuen.

Besten Dank im Vorraus
 

AmunRa

Gesperrter Benutzer
Nun du könntest das Parsen der Daten in einem Eigenen Thread laufen lassen. Das lesen aus dem Stream lässt du auch in einem Thread laufen und das lesen der Datei ebenfalls. Alle Daten die diese Threads lesen speicherst du in einer Queue. Die Queue ist sozusagen die Komunikationssc nittstelle der Threads. Diese Queue musst du natürlich Threadsave machen.

Vl schreibst du die noch eine Eigene Klasse, die die Eingelesenen Daten kapselt und nur diese Objekte speicherst du in der Queue.

Wenn nun die Daten schneller gelesen werden, als sie von dir geparst werden können, ist dies nun kein Problem mehr, da die Daten ja in der Queue gespeichert werden, dann hinkt, halt im Moment der Parser hinterher, wenn aber später dann weniger Daten zu lesen sind und daher der Parser mehr Zeit hat arbeitet er das nach
 

komplexor

Mitglied
@AmunRa

das hört sich schon mal nicht schlecht an. Das Prinzip von Queue und Threadsave war mir noch nicht bekannt, ich werd mich mal diesbezüglich schlau lesen und mich daran versuchen.
Meld mich bei (miss)erfolg.
 

moormaster

Top Contributor
Mal ne kleine Zwischenfrage:

woher weiss Dein Programm denn, wann die Datei, die Du da einliest, zu Ende geschrieben wurde? Es kann doch passieren, dass Du anfängst die Datei zu parsen noch während das andere Programm dabei ist, alle Daten dort reinzuschreiben... geht da nicht unter Umständen was verloren?
 

komplexor

Mitglied
Das ist natürlich eine Sache, die vorher überprüft werden muss. Ich nutze zum einen lastModified() und prüfe damit ob der letzte Dateizugriff länger als 20 sek. her ist. Da dies aber in diesem Fall nicht immer ausreicht, prüfe ich zusätzlich noch mit file.length() ob die Dateigröße sich verändert hat.
 

komplexor

Mitglied
also leider komme ich nicht wirklich weiter, da ich nicht weiß wie ich wo ansetzen soll um eine parallelität zu gewährleisten. ergänzend muß ich noch hinzufügen, dass ich ca. 80 Streams entgegen nehmen muß, wobei jeder einzelne Stream ca. alle 15 sek. ankommt und Daten sendet die geparst werden.
Für jeden Stream starte ich nun ein Thread, der auf einen anderen Port lauscht und die eingehenden Daten verarbeitet. Diese Aufgabe wäre der erste Part des Monitoringprogramms.
Wie bereits am Anfang erläutert, besteht die zweite Aufgabe des Programms nun darin, neben den Streams eine Datei zu Parsen die lokal ca. alle 10 min. erstellt wird. In dieser Datei befinden sich ca. 80.000 zeilen, von denen dann ca. 50.000 in die Datenbank geschrieben werden. Dieser Vorgang für eine Datei benötigt um die 20 sek. Bearbeitungszeit.
Aktuell habe ich mein Programm soweit, dass die Threads zwar gestartet werden, aber in der Zeit wo die Datei geparst wird, werden keine Streams verarbeitet.
Die Frage lautet nun, wie die zeitaufwendige Aufgabe kurz unterbrochen werden kann, sobald von einem der 80 Streams etwas ankommt.
Hier mal der Code, der übersichtshalber hab ich es etwas verkürzt (z.B. keine initialisierung der Variablen oder auslassen unwichtiger methoden):

Also, was muß man hier ergänzen bzw. verändern, damit auch alles reibungslos und ohne Datenverlust
paralell abläuft?

Java:
public class EpochAndSatLog {
	
	String host = "192.168.0.135";

	//Standardkonstruktor
	public EpochAndSatLog(){
		while (Thread.currentThread().isAlive()) {
//Start der einzelnen Threads für die unterschiedlichen Streams
			new Thread(new SocketConnection(host, 33333));
			new Thread(new SocketConnection(host, 33334));
			new Thread(new SocketConnection(host, 33335));
//Start des Threads zum verarbeiten der Datei
			Thread delayLog = new Thread(new StationDelayLog());
		}
	}
	
}

//Klasse zum verarbeiten von Streams
public class SocketConnection implements Runnable {
		
	public SocketConnection(String pHost, int pPort) {
		this.host = pHost;
		this.port = pPort;
		connectSocket(host, port);
	}
	
	@Override
	public void run() {
		connectSocket(host, port);			
	}
	
	//Methode stellt Socketverbindung her
	private void connectSocket(String host, int port) {
		try {
			socket = new Socket(host, port);
			readStream(socket);
		} catch (IOException e) {
			System.err.println("Fehler in connectSocket, Host "+host+" oder Port "+port+" nicht gefunden");
			return;
		}
	}
	
        public void readStream(Socket s) {
		try {
			bufferedReader = new BufferedReader(new InputStreamReader(
					s.getInputStream()));
		} catch (IOException e) {
			System.err.println("Fehler beim einlesen des Streams in SocketConnection               
                       (s.getInputStream)");
			return;
		}
		
		while (bufferedReader != null && diff<=20000) {
			startTime = System.currentTimeMillis();
			char[] charBuffer = new char[100000];
			sectionStr = "";
			try {
				bufferedReader.read(charBuffer);
			} catch (IOException e) {
				System.err.println("Fehler bei bufferedReader.read(charBuffer)");
				//readStream(socket);
				return;
			}
			readedBufferStr = new String(charBuffer).trim();
			readedBufferStr = readedBufferStr + "\n";
			uncompleteTokensStr = restStr + readedBufferStr;
			uncomplEpocheTknz = new StringTokenizer(uncompleteTokensStr,
					delimiter, true);

			// setzt unvollständige Sections zusammen
			while (uncomplEpocheTknz.hasMoreTokens()) {
				sectionStr = uncomplEpocheTknz.nextToken();
				if (uncomplEpocheTknz.hasMoreTokens()) {
					sectionStr = sectionStr + uncomplEpocheTknz.nextToken();
				}
				if (sectionStr.contains(delimiter)) {
					complSectionsStr = complSectionsStr + sectionStr;
				} else {
					restStr = sectionStr;
				}
			}// while(uncomplEpocheTknz.hasMoreTokens())
			
			endTime = System.currentTimeMillis();
			diff = endTime - startTime;
						
			complSectionsTknz = new StringTokenizer(complSectionsStr, delimiter);
			

			if (complSectionsTknz.countTokens() > 2) {
				prsThread = new Thread(new Parser(complSectionsTknz, delimiter));
				restStr = "";
				complSectionsStr = "";
			}
		}// while(bufferedReader != null)
	}
}

//Klasse zum Verarbeiten der Datei
public class StationDelayLog implements Runnable {
	// Konstruktor
	public StationDelayLog() {
		try {
			checkFile();
		} catch (IOException e) {
			System.out.println("checkFile aus Konstruktor fehlgeschlagen");
			e.printStackTrace();
		}
	}// Konstruktor

	@Override
	public void run() {
		try {
			checkFile();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	private void checkFile() throws IOException {
		newFile = getNewFile();

			if (newFile.exists()) {
				fileSize_1 = newFile.length();
				currentTimeInMillis = System.currentTimeMillis();
				timeDiffOfLastModified = currentTimeInMillis - newFile.lastModified();
				//kurz warten um Änderung der Dateigröße wahrzunehmen
				try {
					Thread.sleep(200);
				} catch (InterruptedException e) {
					e.printStackTrace();
					return;
				}
				fileSize_2 = newFile.length();
				
//			 	wenn letzter Dateizugriff vor 20 sek. und Dateigröße unverändert 
//				bearbeite Datei
//				oder falls Datei endlos beschrieben wird (Fehler im anderen Programm), 
//                              erzwinge einlesen nach 10 min. (Fehler von Geo++)				
				if ((timeDiffOfLastModified > 20000 && (fileSize_1 == fileSize_2)) || 
                                     timeDiffOfLastModified > 600000) {
						System.out.println("Datei " + newFile.getPath()
							+ " wird bearbeitet...");
						
						long startTime = System.currentTimeMillis(); //Zeitmessung Start
							prsThread = new Thread(new Parser(newFile));	
						long endTime = System.currentTimeMillis();//Zeitmessung Ende 

						// Gibt die benötigte Zeit für den Eintrag in die DB
						System.out.println((endTime - startTime) / 1000 + " sec für 
                                                    Bearbeitung");
						System.out.println("Datei " + newFile.getPath()	+ " bearbeitet!");
						newFile = getNewFile();
				} else {//File not finish
					System.out.println("Datei "+getFolderPath()+" wird noch beschrieben!");
					return;
				}
			} else {// !newFile.exist
				newFile = getNewFile();
			}
	}//END: checkFile()

	private File getNewFile() {
           .....
	}
 

AmunRa

Gesperrter Benutzer
Naja so wie ich es dir vorgeschlagen habe.

Stecke mal alle Daten die du empfängst in eine Queue.

und das Parsen muss dann auch in einem eigenen Thread laufen.

Edit: als Queue könntest du eine

java.util.concurrent.LinkedBlockingDeque da diese bereits Thread-safe ist soweit ich weiß
 
Zuletzt bearbeitet:

komplexor

Mitglied
@AmunRa
und wo füge ich die queue ein, bzw. wie adde ich dann die noch ausstehende aufgabe, bzw. hole sie zur weiteren bearbeitung raus. wie unterbreche ich das zeitaufwändige datei einlesen und verarbeite die datei bei zeit weiter? das verstehe ich noch nicht.
 

AmunRa

Gesperrter Benutzer
Für die Queuu würd ich halt eine eigenen Klasse machen auf die alle Zugreifen können.

wie man daten in eine Queue Steck solltest du wohl über die API finden. aber die Methode heißt put
lesen kannst du die Daten wieder mit der take methode
naja das lesen in der Datei wird wohl ebenfalls ein eigener Thread machen und da ich nicht weiß wie die Daten in dem File aussehen kann ich dir hier keine Tips geben.
 

komplexor

Mitglied
ok, das mit der Queue werd ich wohl irgendwie hinbekommen, allerdings weiß ich immer noch nicht, wie und an welcher stelle ich die zeitaufwendige dateibearbeitung unterbreche sobald ein stream seine daten sendet. Wenn ich dich richtig verstehe, soll an dieser stelle dann das bisherige Ergebniss in der queue zwichengelagert werden. Wenn nun alles Streams empfangen werden, wird im durchschnitt alle 0,1875 Sekunden ein Stream verarbeitet, das heißt, dazwischen kann/soll die große Datei verarbeitet werden.
Sorry wenn ich mich etwas quer anstelle, aber das ist mein erstes großes Projekt und ich bin noch nicht so vertraut mit threads.
btw: den Inhalt der Datei darf ich leider nicht herausgeben.
 
Zuletzt bearbeitet:

AmunRa

Gesperrter Benutzer
ich versteh dich da noch immer nicht. In die Queue sollen die Rohdaten der Streams gepackt werden. und der Parser liest diese Daten aus der Queue und verarbeitet diese. (Was mit dem Ergebnis dieser Verarbeitung passiert ist nicht festgelegt, darüber gibt es noch keine Aussage). In einem anderen Thread liest. du die Daten aus der Datei.

Ich nehme mal an, dass diese Daten auch irgendwie Blockweise gelesnen werden müssen (z.B. Zeile für Zeile oder vl eine andere Trennung). Diese Blöcke steckst du auch in die Queue und irgendwann wenn eben dieser Block an der reihe ist nimmt der Parser den Block aus der Queu heraus und verabeitet diesen,

Du musst nichts unterbrechen oder so, da das Lesen und das Verarbeiten getrennt voneinander funktionieren soll. und nur duruch die Queue eine kommunikation statt findet.

Für genauere Tips musst du uns z.B mal beschreiben wie deine Datei aufgebaut ist.
 

komplexor

Mitglied
Also die Datei hat in etwa diesen Aufbau mit ca. 80.000 zeilen

....
0000 xyz 122999.0 123000.1 1.149 1.149
0000 xyz1 122999.0 123000.2 1.179 1.179
0000 xyz2 123000.0 123000.2 0.199 0.199
0000 xyz3 123000.0 123000.2 0.199 0.199
#
# bla bla
#
# bla bla bla
# bla bla
0000 xyz2 122999.0 123000.2 1.209 1.209
0000 xyz23 123000.0 123000.2 0.209 0.209
....

wobei hier nur die Zeilen ohne # wichtig sind...anschließend wird jede zeile zerlegt und die einzelnen Werte in die db geschrieben.
wäre natürlich super, wenn ich gar nicht erst unterbrechen muß, allerdings läuft das prog aktuell so, dass es einen stream einließt, anschließend zur dateibehandlung wechselt und die aufgabe erst zu ende bringt, bevor er was anderes macht und in dieser Zeit werden keine streams empfangen.
 

AmunRa

Gesperrter Benutzer
wie gesagt stream Lesen und datei lesen jeweils in einem eigenen Thread laufen lassen. Warum soll denn auch der Stream wissen, wieviel noch in der Datei steht. im Datei lesen thread aber trotzdem manchmal ein kurzes Thread.sleep einfügen

naja und das schöne ist du liest einfach zeilen weise ein und steckst jede Zeile einzelnen in die Queue.
 

komplexor

Mitglied
scheinbar reden wir ständig aneinander vorbei, oder ich hab nen denkfehler.
Daher versuche ich das Problem noch ein mal zu beschreiben.
Ich starte bereits mehrere Threads, auch das Parsen läuft als eigenständiger thread sofern er gebraucht wird. Allerdings ist es bei Threads so, das sie ihre Aufgabe erst einmal abarbeiten, sofern sie nicht von außen unterbrochen werden und genau HIER hackt es bei mir.
ich starte das Hauptprogramm, darin werden dann die klassen als seperater thread gestartet, jeder thread arbeitet zunächst seinen Teil ab, bevor der nächste seine aufgabe erledigen kann.
Also das hauptprogramm startet, dabei werden zunächst die threads für die Streams gestartet..dann der thread für die große datei. bearbeitet er dann diese datei, passiert nichts anderes, die threads für die streams werden in dieser zeit nicht verarbeitet und er somit sind die informationen für diese zeit verloren.
 
Ähnliche Java Themen
  Titel Forum Antworten Datum
rode45e Java Threads Allgemeine Java-Themen 4
M Threads Allgemeine Java-Themen 1
L Threads Threads in Chatroom Allgemeine Java-Themen 30
berserkerdq2 run-methode eines Threads so programmieren, dass 30x die Sekunde etwas ausgeführt wird. Allgemeine Java-Themen 44
berserkerdq2 Threads, wie genau läuft das in Java ab? (Ich kann Threads erstellen und nutzen, nur das Verständnis) Allgemeine Java-Themen 6
CptK Backpropagation parallelisieren: Kommunikation zwischen den Threads Allgemeine Java-Themen 7
J Eine Frage zu den Threads und Task Allgemeine Java-Themen 1
W Wieviele Threads sind sinnvoll? Allgemeine Java-Themen 8
W Alternative für Threads Allgemeine Java-Themen 6
V Threads Probleme beim Aufrufen von Methoden einer anderen Klasse (Threads) Allgemeine Java-Themen 14
T Multithreading: Wie viele Threads sollte ich erstellen? Allgemeine Java-Themen 12
G Threads vom Mainprogramm steuern Allgemeine Java-Themen 8
S BlockingQueue mit dynamischer Anpassung der Anzahl von Producer und Consumer Threads Allgemeine Java-Themen 1
x46 Threads Threads anhalten Allgemeine Java-Themen 1
J Threads verbessern die Performance NICHT ? Allgemeine Java-Themen 8
W Threads Problem Allgemeine Java-Themen 15
T Threads Tic Tac Toe mit Threads Allgemeine Java-Themen 1
M Threads über Kommandozeile Allgemeine Java-Themen 5
mrbig2017 Threads Chat Programm mit Threads? Allgemeine Java-Themen 2
J Threads - java.lang.IllegalThreadStateException Allgemeine Java-Themen 6
J Internet Broswer in Threads öffnen Allgemeine Java-Themen 1
B Threads Multithreading Threads sollen warten Allgemeine Java-Themen 12
N 1000 MQTT Messages die Sekunde - 1000 Threads erstellen ? Allgemeine Java-Themen 10
D Threads Parallel laufende Threads Allgemeine Java-Themen 4
J Unvorhersehbares Verhalten - benutze ich die falsche Bedingungsprüfung oder brauche ich Threads? Allgemeine Java-Themen 12
D Eine Forschleife mit Threads abarbeiten um es zu schneller zu machen. Ist das möglich? Allgemeine Java-Themen 20
S Wie kann ich eine kleine Stelle in meinem Code mit multiplen Threads abarbeiten..? Allgemeine Java-Themen 20
P Threads Parallelisierte DB-Abfragen mit variabler Anzahl an Threads Allgemeine Java-Themen 4
J Threads Threads Allgemeine Java-Themen 9
Viktim Threads Liste In unterschiedlichen Threads bearbeiten Allgemeine Java-Themen 23
E Threads Ausführung in Threads ist langsamer als ohne Threads Allgemeine Java-Themen 13
A Anzahl an Threads Systemweit Allgemeine Java-Themen 2
Tausendsassa Input/Output Problem mit der gleichzeitigen Ausgabe zweier Threads Allgemeine Java-Themen 8
S Alle Methodenaufrufe eines Threads notieren..? Allgemeine Java-Themen 7
M Threads JPanel eingeforen mit Threads Allgemeine Java-Themen 2
F Threads Allgemeine Java-Themen 6
F Threads Allgemeine Java-Themen 2
M Sinn von Threads? Allgemeine Java-Themen 1
J Wie erschaffe ich einen sicheren Datenaustausch zwischen Thread und Nicht-Threads Allgemeine Java-Themen 8
L Abfragen ob Threads fertig Allgemeine Java-Themen 3
P Threads Java Zugreifen Allgemeine Java-Themen 6
K Problem: Java-Klasse mit mehreren Threads als eigenen Prozess starten Allgemeine Java-Themen 3
K KeyEvent in Threads Allgemeine Java-Themen 11
V Threads Weshalb funktionieren meine Threads nicht? Allgemeine Java-Themen 2
Thallius Speicherverhalten von Properties und mehreren Threads Allgemeine Java-Themen 5
L Threads beenden Allgemeine Java-Themen 4
P Threads Threads nicht gleichzeitig starten Allgemeine Java-Themen 3
S Threads Threads werden nicht beendet Allgemeine Java-Themen 2
S Start des zweiten Threads erst nach Beenden des ersten Threads Allgemeine Java-Themen 13
N Threads statische Methoden in Threads Allgemeine Java-Themen 5
P 4 Threads in einer Methode Allgemeine Java-Themen 2
M Eclipse Mehrere Threads, mehrere Konsolen Allgemeine Java-Themen 4
OnDemand Threads und synchronized Allgemeine Java-Themen 9
R LinkedList und Threads: Strukturprobleme bez. löschen von Elementen Allgemeine Java-Themen 3
R LinkedList und Threads - welche Methode ist besser? Allgemeine Java-Themen 2
OnDemand Threads und synvhronized Allgemeine Java-Themen 2
S Problem mit Threads Allgemeine Java-Themen 1
W Threads Threads warten lassen Allgemeine Java-Themen 5
H Optimierung durch Threads Allgemeine Java-Themen 31
B Threads halten sich irgendwie auf... Allgemeine Java-Themen 6
M Threads Allgemeine Java-Themen 8
K JNI: Methoden aus unterschiedlichen Threads aufrufen Allgemeine Java-Themen 3
A Applet Alle Threads beim schließen des Applets beenden Allgemeine Java-Themen 8
A Problem mit der Synchronisierung von Threads Allgemeine Java-Themen 15
R SecurityManager für einzelne Klassen/Threads? Allgemeine Java-Themen 38
O Threads und If Befehle Allgemeine Java-Themen 7
P Threads abwechseln lassen mit wait() und notify() Allgemeine Java-Themen 2
H Sehr viele Threads effizient Verwalten Allgemeine Java-Themen 13
C Threads und Exceptions Allgemeine Java-Themen 7
H java.lang.OutOfMemoryError bei der wiederholten Erzeugng von Threads Allgemeine Java-Themen 8
S Threads Abarbeitungsstatus von Threads in Datei schreiben Allgemeine Java-Themen 2
M Zugriff zweier Threads auf diesselbe Methode Allgemeine Java-Themen 16
E Threads Sudoku Threads Allgemeine Java-Themen 8
M Java Threads - Wait Notify - Verständnisproblem Allgemeine Java-Themen 5
Gossi Threads mit unterschiedlichen Aufgaben in einer Klasse? Allgemeine Java-Themen 9
G Threads Ablauf von Threads im Spezialfall Allgemeine Java-Themen 4
V Threads bei quadcore Allgemeine Java-Themen 10
V 1000 Threads oder Iterativ? Allgemeine Java-Themen 11
4 Simple(?) Frage zu Threads Allgemeine Java-Themen 14
B Threads Game of Life - Threads Allgemeine Java-Themen 49
R Threads Exceptions von Threads abfangen im ThreadPool Allgemeine Java-Themen 5
S Threads Ende sämtlicher Threads abwarten Allgemeine Java-Themen 6
S Frage zu Threads (Sichtbarkeit und Verhalten) Allgemeine Java-Themen 11
M Java-Threads und Datentypen-Zugriffe Allgemeine Java-Themen 7
P Threads- Programming Allgemeine Java-Themen 2
G Threads Klasse Sound und Threads bleiben hängen Allgemeine Java-Themen 4
C Threads Zwei Threads greifen auf LinkedList zu. Allgemeine Java-Themen 12
M OutOfMemoryError in nebenläufigen Threads Allgemeine Java-Themen 6
M Threads dauerhafte bewegung mit threads Allgemeine Java-Themen 11
frankred Threads Auf eine Gruppe von Threads warten Allgemeine Java-Themen 11
J Eure Meinung: Threads verwenden, oder nicht? Allgemeine Java-Themen 6
K Warum wartet diese Funktion auf beenden des Threads? Allgemeine Java-Themen 3
F Mehrere Threads - ein Stack Allgemeine Java-Themen 6
O Wie kann ich das Ende eines Threads melden? Allgemeine Java-Themen 7
J Writer und Threads Allgemeine Java-Themen 2
G mehrere Threads starten/stoppen Allgemeine Java-Themen 4
E Verständnisfrage bezüglich Threads Allgemeine Java-Themen 4
K Threads - Swing - Synchronisation nötig? Allgemeine Java-Themen 8
S [THREADS] Thread sinnvoll beenden Allgemeine Java-Themen 2
P Buffer - Consumer Producer - Threads synchronisieren Allgemeine Java-Themen 15

Ähnliche Java Themen

Neue Themen


Oben