Mehrere Streams durch einen Stream senden

Status
Nicht offen für weitere Antworten.

VdA

Bekanntes Mitglied
Hi!
Mein Problem:
Ich habe einen Server und einen Client.
Zwischen ihnen sollen verschiedene Daten versendet werden und zwar:
1. gleichzeitig
2. an verschiedene Threads

erster Lösungsansatz:
Für jeden Thread eine neue Verbindung aufbauen -> schwachsinn bei der Kommunikation übers Internet, denn was ist wenn sich zwei clients, die hinter dem selben Router hängen, gleichzeitig sich mit dem Server verbinden? - Totales Stromwirrwarrr

zweiter Lösungsansatz:
Alles durch eine Verbindung(Object(In/Out)putStream) mit Datenpacket-Objekten senden.
Die Datenpakete enthalten einen Namen, an dem der andere Feststellen kann wofür die Daten sind, und eine byte Array mit den Daten.

Das ganze wird dann mit einer Klasse "TrichterOutput" zusammen versendet:
Code:
public class TrichterOutputStream extends OutputStream 
{
	final ObjectOutputStream oout;
	public TrichterOutputStream(ObjectOutputStream out) 
	{
		this.oout=out;
	}

	@Override
	@Deprecated
	/**
	 * Does nothing!
	 * Please use getOutputStream(String name) instead.
	 * */
	public void write(int arg0) throws IOException 
	{

	}
	public OutputStream getOutputStream(final String name) throws IOException
	{
		//creating Pipes and connect them 
		PipedOutputStream out=new PipedOutputStream();
		final PipedInputStream in=new PipedInputStream(out);
		
		Thread lesen=new Thread(new Runnable()
		{
			public void run()
			{
				while(true)
				{
					try {
						byte[] buffer=new byte[1024];
						in.read(buffer);
						oout.writeObject(new DataPacket(name, buffer));
					} catch (IOException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}
		});
		lesen.setName("TrichterOutputStream$lesen");
		lesen.start();
		return out;
	}
}


Und auf der anderen Seite wieder getrennt:
Code:
public class TrichterInputStream extends InputStream 
{
	final ObjectInputStream in;
	HashMap<String, PipedOutputStream> outputstreams=new HashMap<String, PipedOutputStream>();
	HashMap<String, PipedInputStream> inputstreams=new HashMap<String, PipedInputStream>();
	public TrichterInputStream(ObjectInputStream in1)
	{
		this.in=in1;
		Thread lesen=new Thread(new Runnable()
		{
			public void run()
			{
				while(true)
				{
					try {
						DataPacket datapacket = (DataPacket) in.readObject();
						if(outputstreams.get(datapacket.getName()) != null)
						{
							outputstreams.get(datapacket.getName()).write(datapacket.getData());
						}
						else
						{
							//creating Pipes and connect them 
							PipedOutputStream out=new PipedOutputStream();
							PipedInputStream in=new PipedInputStream(out);
							
							//adding them to the list
							outputstreams.put(datapacket.getName(), out);
							inputstreams.put(datapacket.getName(), in);
						}
					} catch (IOException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} catch (ClassNotFoundException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}
		});
		lesen.setName("TrichterInputStream$lesen");
		lesen.start();
	}
	
	@Override
	@Deprecated
	/**
	 * Will always return -1.
	 * Please use getInputStream(String name).
	 * */
	public int read() throws IOException 
	{	
		return -1;
	}
	
	public InputStream getInputStream(String name) throws IOException
	{
		if(outputstreams.get(name) == null)
		{
			//creating Pipes and connect them 
			PipedOutputStream out=new PipedOutputStream();
			PipedInputStream in=new PipedInputStream(out);
			
			//adding them to the list
			outputstreams.put(name, out);
			inputstreams.put(name, in);
		}
		return inputstreams.get(name);
	}
}

Wie ihr sehen könnt ist das sozusagen ein Stream den man mit mehreren Streams gleichzeitig verketten kann.

Mein Problem besteht nun, dass der Thread auf der lesenden Seite an folgender Stelle nicht weiter läuft:
Code:
TrichterInputStream tin;
//....
//....
//....
ObjectInputStream oin=new ObjectInputStream(tin.getInputStream(type));
Ein Blick in den Stack sagt mir das der ObjectInputStream auf den Header des Streams wartet und der einfach nicht ankommt.

Was kann ich machen damit der Thread da weiterläuft?
Ich mein tin.getInputStream(type) liefert ja ein PipedInputStream da is doch schon ein Header drin oder?

THX for help :D
 
T

tuxedo

Gast
Moin Moin,

wieso macht du's so kompliziert?

Was hälst du von einer "Sende-Queue"?

Deine einzelnen Programmthreads welche senden wollen rufen in einem zentralen SendeThread eine Methode auf und übergeben das zu sendende Objekt. In diesem Sendethread landet erstmal alles in einer Queue. Un über eine while() Schleife in der run() Methode sendest du alles was im Puffer/Queue vorhanden ist.

Das geht natürlich nur, solange du auf Streams in deinen einzelnen Programmthreads verzichten kannst.

Ach ja: Der "Rückwärtsweg" fehlt ja noch. Okay. Da hab ich auch was:

Du brauchst noch einen Empfangsthread. Deine einzelnen Programmthreads welche auch Daten empfangen können sollen, registrieren sich beim Empfangsthread mit Ihrer ID oder ihrem Namen. Der Empfangsthread holt zyklisch Objekte aus dem ObjectInputstream und liest aus den Objekten den Empfänger aus. Dann wird geschaut ob sich dafür ein Programmthread registriert hat. Wenn ja, wird das Objekt an den entsprechenden Programmthread übergeben.

Das hat sogar den Vorteil dass du das Objekt an mehrere, und nicht nur einen Programmthread schicken kannst.

- Alex

P.S. Was für Daten schickst du denn? Wenns nicht gerade Binärdaten sind (Files): Wäre RMI oder SIMON (siehe links in meiner Signatur) nicht ne "einfachere" Lösung??
 

VdA

Bekanntes Mitglied
Mein Problem, und auch der Grund warum ich es so kompliziert mache, ist dass ich über diese Leitung Binärdaten (Bilder, Dateien) und Textnachrichten (mit Reader und Writer) versenden will.
Dabei kann es vorkommen dass zwei Threads Bilder empfangen wollen aber halt nicht das gleiche sondern unterschiedlicheBilder.
Von daher kann ich nich einfach nach Datentyp sortieren.
Die Idee mit der EventQueue halt ich für ganz sinnvoll ich werd mir mal überlegen ob ich die weiter verfolge wenn ich das so wie es jetzt ist nicht verwirklicht kriege.
Ich fand einfach die Idee praktisch, dass sich im Prinzip nichts an dem eigentlichen Programm ändert, wenn ich anstatt von mehreren Verbindungen einfach einen TrichterStream zwischenschalte.
Mal sehn aber trotzdem wär ich dankbar wenn ihr mir nochmal ein Tip gebt wie ich das obere zum laufen kriege da es für mich weniger Arbeit ist da noch ein paar Zeilen code einzufügen, damit es läuft als das ganze jetzt auf eine EventQueue mit listeners umzubauen.
 
T

tuxedo

Gast
Okay, RMI und SIMON fallen nun raus.

Dein obiger Ansatz:

Naja. Ich habs nicht ausprobiert. Aber ich tippe da mal auf noch mehr "Fehlerquellen". Was mit zum Beispiel auffällt:

Das Schreiben in den OOS beim TOS ist nicht "synchronisiert". Weiß nicht ob der OOS da selbst synchronisiert. Glaub's aber fast nicht.

Du musst das TIS und TOS Prinzip ja nicht gleich ganz verwerfen. Du kannst mittels den Pipes auch das Queue Prinzip damit umsetzen:

Du lässt das nach wie vor so, dass man vom TOS und TIS sich eien Stream holen kann. Nur schreibt der Stream nicht gleich auf den OOS, bzw ließt davon, sondern die Daten wandern erstmal in eine Queue wo "EIN" zentraler Thread empfängt, bzw. ließt. Die Daten werden beim TIS im Empfangsthread einfach wieder gelesen, ausgewertet und dann in die entsprechende Pipe geschickt.

Die einzigste "Schwierigkeit" liegt darin, beim TOS die einzelnen Pipe-Streams zu "überwachen" und die Daten in die Queue zu schieben.

- Alex
 

VdA

Bekanntes Mitglied
danke das könnts sein.
Das hab ich ganz übersehen, dass ich ja aus mehreren Threads in den Stream schreibe :autsch:
ich guck mir das jetzt mal an.
 
T

tuxedo

Gast
Hab mir gerade mal die "Mühe" gemacht deinen Code zu testen.

Also bei mir geht's prima via Localhost.

Serveranwendung sieht so aus:

Code:
erverSocket s = new ServerSocket(3001);
		Socket accept = s.accept();
		
		TrichterInputStream tis = new TrichterInputStream(new ObjectInputStream(accept.getInputStream()));
		
		InputStream inputStream = tis.getInputStream("test");
		
		while (true){
			byte[] b = new byte[3];
			inputStream.read(b);
			System.out.println(new String(b));
		}


Client so:

Code:
Socket s = new Socket("localhost",3001);
		
		TrichterOutputStream tos = new TrichterOutputStream(new ObjectOutputStream(s.getOutputStream()));
		
		OutputStream outputStream = tos.getOutputStream("test");
		
		while(true){
			byte[] b = new String("abc").getBytes();
			outputStream.write(b);
		}

Die Data-Klasse hab ich ganz "primitiv" nachgebaut.
 

SebiB90

Top Contributor
@alex
das problem war, das wenn man getInputStream() einem ObjectInputStream übergibt, es nicht funktioniert und nicht wenn der Trichter ein ObjektInputStream ist.
 

VdA

Bekanntes Mitglied
ich hab mir das mal angesehen und festegestellt, dass das Problem nur auftritt wenn ich ans Ende einen ObjectIn/OutputStream ranhänge. :(
Bei normalen (Buffered)In/OutputStreams funktioniert alles.
Warum nur ???:L
 
T

tuxedo

Gast
Das liegt wohlmöglich an den Stream-Typen.

Wenn du "außen" einen Object*Stream verwendest, wird davon ausgegangen, dass an der gegenüberliegenden Seite des Streams auch ein solcher Object*Stream hängt.

In deiner Konstellation geht das aber nicht, da das ganze durch Pipes und das Transport-Objekt unterbrochen ist.

Und warum geht das nicht?

Ganz einfach: Object*Streams kommunizieren miteinander und tauschen direkt Daten aus. Du _musst_ auf beiden Seiten einen korrespondierenden Object*Stream haben. Sonst geht's nicht.

Noch was:
Object*Streams machen ein "übles" caching. Wenn du nicht ab und zu mal reset() am Stream aufrufst läuft die irgendwann der Speicher inder JVM voll.

Würde das ganz ohne Object*Streams machen. Du kannst dir dann auch das mit dem Transportobjekt sparen. Du definierst einfach, dass die ersten x Bytes die durch den Trichter im inneren gehen, die ID des Streams ist. Kannst ja 4 Bytes nehmen und damit ein Integer auf Byte-Ebene abbilden.

Dann hast du das "langsame" serialisieren weg. Und wenn du Files schickst, solltest du diese Ebenfalls nicht mit Object*Streams serialisieren. Denn da wird viel Reflection-Aufwand betrieben um die Objekte in Bytes zu wandeln und umgekehrt.

Schicke deine Bilder lieber mit "nackigen" Input/OutputStreams. Das geht schneller und erspart dir dann auch den lästigen Cache der Object*Streams.

- Alex
 

VdA

Bekanntes Mitglied
also wenn ich mir das ganze Prog so angucke könnte man das auch ohne ObjectStreams machen...... :###
nur wär das Schade, wenn man über meine TrichterStreams keine Objecte versenden könnte, da ich die bestimmt noch öfter in anderen Programmen gebrauchen könnte.

Aber der Tip mit den Bildern ist gut.Ich hab mich schon gefragt, warum das immer so lange dauert bis so ein Bild rüber ist :)
 
T

tuxedo

Gast
Du kannst dennoch Objekte schicken. Allerdings "solltest" du den Umweg dann über einen ByteBuffer gehen. Du kannst da Objekte mit dem OOS reinschreiben und dann als byte[] wieder rausholen, welche du dann über deinen Trichter senden und auf der anderen Seite wieder genau so zurückbauen kannst.

"Seltsamerweise" geht das ohne direkte Verbindung zwischen OOS und OIS ...

Aber wie gesagt: mit OOS serialisieren ist "aufwendig" und wird mit steigender Objekttiefe (viel Vererbung, viele Interfaces, ...) immer schlimmer.

- Alex
 

VdA

Bekanntes Mitglied
mach ich das nicht schon so in TrichterInputStream:
Code:
byte[] buffer=new byte[1024];
in.read(buffer);
pakete.add(new DataPacket(name, buffer));

und TrichterOutputStream?:
Code:
outputstreams.get(datapacket.getName()).write(datapacket.getData());
ich mein der Stream wird in ein byte[] geschrieben und das wird später wieder in einen Stream geschrieben ???:L

Sollte dann der header nicht auch mit reinkopiert werden?
 
T

tuxedo

Gast
Ich weiß es nicht. Ich weiß nur dass das serialisieren in einen ByteBufferStream funktioniert. Vermutlich wird da irgendwas erhalten (stream-informationen?) was bei normalen byte[]'s wegfällt.

Probier doch mal die Sache mit dem ByteBuffer ...


- Alex
 

VdA

Bekanntes Mitglied
ich weiß jetzt woran es lag.
Code:
DataPacket datapacket = (DataPacket) in.readObject();
						if(outputstreams.get(datapacket.getName()) != null)
						{
							if(datapacket.getData() instanceof byte[])
								outputstreams.get(datapacket.getName()).write((byte[])datapacket.getData());
							else
								objectlisteners.get(datapacket.getName()).objectRecieved(datapacket.getData());
						}
						else
						{
							//creating Pipes and connect them 
							PipedOutputStream out=new PipedOutputStream();
							PipedInputStream in=new PipedInputStream(out);
							
							//adding them to the list
							outputstreams.put(datapacket.getName(), out);
							inputstreams.put(datapacket.getName(), in);
							
//Die zeile hier hatte ich vergessen in TrichterInputStream
							outputstreams.get(datapacket.getName()).write((byte[])datapacket.getData());
						}

*in die Ecke leg und schäm* :oops:

EDIT: *wieder aus der Ecke rauskomm, weils immer noch nicht geht* :(
 
Status
Nicht offen für weitere Antworten.
Ähnliche Java Themen
  Titel Forum Antworten Datum
L mehrere Streams über einen Socket? Netzwerkprogrammierung 8
izoards Mehrere TCP Verbindungen auf einen Server [alles Local] Netzwerkprogrammierung 2
Aruetiise Socket Mehrere Clients Netzwerkprogrammierung 4
E Mehrere Sockets Netzwerkprogrammierung 2
C Mehrere Spielewelten im Multiplayer Netzwerkprogrammierung 2
J Framework mehrere Clients/ Server-Broadcast/oracle XE/ XML Netzwerkprogrammierung 1
V Server / mehrere Clients / MySQL / Konzept Netzwerkprogrammierung 2
S Server - Mehrere Klassen sollen Daten senden und empfangen Netzwerkprogrammierung 25
Creylon Socket Mehrere gleichzeitig eingehende Nachrichten Netzwerkprogrammierung 8
N SOCKET mehrere Requests, keep Alive serverspezifisch? Netzwerkprogrammierung 3
7 Mehrere Verbindungen gleichzeitig in einem Thread mit ApacheHTTP Netzwerkprogrammierung 7
Z Socket [Chatprogramm] Mehrere Clients an einen Server Netzwerkprogrammierung 10
V Authentifikation über mehrere Server? Netzwerkprogrammierung 3
G Socket Mehrere Clientanmeldungen am Server Netzwerkprogrammierung 13
D Mehrere Clients über Java-Sockets Netzwerkprogrammierung 13
cedi Socket Mehrere Clients an einem Server Netzwerkprogrammierung 4
B SSH mit Jsch, mehrere Befehle senden Netzwerkprogrammierung 4
F Socket Verbindungen über mehrere Server Netzwerkprogrammierung 4
M Jetty Konfiguration mehrere Handler? Netzwerkprogrammierung 2
F UDP Server - mehrere Pakete auf einmal Netzwerkprogrammierung 12
A Mehrere gleich Packete behandeln Netzwerkprogrammierung 4
L Socket Chat Server für mehrere Clients Netzwerkprogrammierung 7
R mehrere MySQL-Zugriffe Netzwerkprogrammierung 3
B Paralleler Dateitransfer: Ein Client - Mehrere Sockets? Wie connecten? Netzwerkprogrammierung 16
S Mehrere Attachments mit JavaMail API auslesen Netzwerkprogrammierung 3
O Mehrere Datei per DataInput/OutputStream über Socket Netzwerkprogrammierung 12
A Datenverteilung: Mehrere Threads verwenden? Netzwerkprogrammierung 4
T Netzwerkchat Problem: Mehrere Nachrichten ~ Anfängerproblem Netzwerkprogrammierung 3
W Bestimmt IP Adresse verwenden wenn man mehrere hat Netzwerkprogrammierung 5
D Clients sollen mehrere Sessions starten Netzwerkprogrammierung 11
A Jakarta Commons HTTPClient: Mehrere Requests gleichzeitig Netzwerkprogrammierung 2
R Mehrere Dateien über einen Socket senden Netzwerkprogrammierung 2
G Nachricht an mehrere Clients schicken Netzwerkprogrammierung 10
E Mehrere / bestimmte Netzwerkkarten ansteuern Netzwerkprogrammierung 10
F Mehrere Attachments mit JavaMail API Netzwerkprogrammierung 2
G Proxy und mehrere Verbindungen Netzwerkprogrammierung 4
G Mehrere SSL Verbindungen Netzwerkprogrammierung 2
A Mit Client auf mehrere Server zugreifen Netzwerkprogrammierung 5
M Mehrere Ports gleichzeitig abhören Netzwerkprogrammierung 5
TRunKX Ein Port mehrere Verbindungen? Netzwerkprogrammierung 7
G Server an mehrere Clients Netzwerkprogrammierung 15
L JavaMail: Automatisches Email Versand (mehrere Empfänger) Netzwerkprogrammierung 4
U Einen HandlerThread, der mehrere Verbindungen verwaltet? Netzwerkprogrammierung 2
X mehrere Request über eine HttpURLConnection Netzwerkprogrammierung 2
M Verständnisfrage zu den Streams Netzwerkprogrammierung 7
J Threads & Streams Netzwerkprogrammierung 9
N Paket-Analysieren Byte-Streams Netzwerkprogrammierung 12
C Socket Cipher Streams Netzwerkprogrammierung 6
E Verfügbarkeit von Daten in Streams Netzwerkprogrammierung 4
V HTTP Streams setzen Netzwerkprogrammierung 10
N Socket Fehler bei Streams Netzwerkprogrammierung 2
1 SSH-Kommunikation - Ende eines Streams nicht erkenntlich Netzwerkprogrammierung 2
D Socket Streams schliessen .. Exception gewollt? Netzwerkprogrammierung 4
B Server mit meheren Streams/Aufgaben? Netzwerkprogrammierung 9
H RMI RPC "not suitable for streams and.." Netzwerkprogrammierung 2
T HTTP Encoding von Http-Streams Netzwerkprogrammierung 2
M Streams verwenden Netzwerkprogrammierung 3
A Streams per RMI übergeben Netzwerkprogrammierung 6
P problem beim schließen eines Streams Netzwerkprogrammierung 6
K Selbe Streams mehrfach nutzen (zusätl. Thread) Netzwerkprogrammierung 6
J while-Schleife / Abbruchbed. beim Einlesen eines Streams Netzwerkprogrammierung 4
J Länge eines Streams Netzwerkprogrammierung 4
M Streams Bündeln Netzwerkprogrammierung 10
P Probleme mit Input- / Output-Streams Netzwerkprogrammierung 2
M Ende des Streams ohne Schließen/Checksumme mitsenden Netzwerkprogrammierung 2
M Probleme beim Abfangen von Streams Netzwerkprogrammierung 5
8 Socket Streams nur mit Byte? Netzwerkprogrammierung 2
E frage zu streams Netzwerkprogrammierung 2
F ResultSet in Streams Netzwerkprogrammierung 8
C IRC CHAT auslesen -> Sockets/input und output Streams Netzwerkprogrammierung 9
W Wifi verbinden durch SharensPreferens Android Studio Netzwerkprogrammierung 0
JavaWolf165 Mini-Datenbank durch Link Netzwerkprogrammierung 6
gamebreiti Socket Server / Client Anwendung Manipulation von Objekten durch Server Netzwerkprogrammierung 9
E Eingabe (auf Internetseite) durch ein Programm machen lassen Netzwerkprogrammierung 19
S Bilder durch's Netzwerk usw ... Netzwerkprogrammierung 10
C I/O - Synchronisation durch Threads in einem ChatClient Netzwerkprogrammierung 4
Dit_ UDP Verbindung durch Proxy Netzwerkprogrammierung 4
Ollek Download einer Datei durch SFTP mit Java Netzwerkprogrammierung 12
B Server-Programm wird durch "read" beendet Netzwerkprogrammierung 8
Kr0e Ggf. Performanceproblem durch Senden von vielen kleinen Buffern Netzwerkprogrammierung 17
S RMI RMI Aufrufe kommen nicht mehr durch Netzwerkprogrammierung 4
H String Array durch einen Stream schicken. Netzwerkprogrammierung 4

Ähnliche Java Themen

Neue Themen


Oben