Riesenringpuffer

  • Themenstarter Gelöschtes Mitglied 9001
  • Beginndatum
Diskutiere Riesenringpuffer im Allgemeine Java-Themen Bereich.
G

Gelöschtes Mitglied 9001

Ganz so trivial ist es dann doch nicht, da der Buffer bei fast jedem Lesevorgang geflusht werden müsste:
Java:
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Random;

public class MyBuffer {
    private static final int RAM_BUFFER_SIZE = 1024 * 1024 * 1024;
    private BufferedOutputStream buffer1 = null;
    private BufferedInputStream buffer2 = null;
    private boolean hasInput = false;

    public MyBuffer() throws IOException {
        buffer1 = new BufferedOutputStream(new FileOutputStream(new File("my_backup_file.txt")), RAM_BUFFER_SIZE);
        buffer2 = new BufferedInputStream(new FileInputStream(new File("my_backup_file.txt")), 1024);
    }

    public synchronized void write(byte b) throws IOException {
        buffer1.write(b);
        hasInput = true;
    }

    public synchronized int read() throws IOException {
        if (hasInput) {
            buffer1.flush();
            hasInput = false;
        }
        return buffer2.read();
    }

    public static void main(String[] args) throws IOException {
    }
}
Solange die Daten insgesamt die RAM_BUFFER_SIZE nicht überschreiten, ist dies eine Lösung. Wenn mehr Daten anfallen, wird der Outputstream jedoch anfangen, die Daten die Platte zu schreiben, und zwar ab dem Zeitpunkt dann alle Daten, unabhängig davon, wie schnell der Client die Daten abholt. Dies ist aber nicht gewünscht.
So lange der Client die Daten schnell genug abholen kann, sollten die Daten im RAM bleiben. Erst wenn der Client die Daten nicht mehr schnell genug abholen kann, aber soviel Eingabedaten anfallen, dass der RAM sie nicht mehr fassen kann, soll ein Teil der Daten auf der Platte landen.
Prinzipiell ist die Datenmenge, die von der Eingabe zur Ausgabe weitergereicht wird, unendlich.
 
J

JustNobody

Es ist doch nicht so, dass das für mich ein großartiges Problem ist. Ich bin nur an Austausch interessiert und es gibt ja mehrere Lösungsansätze. An denen war ich interessiert. Einen Algorithmus schreiben kann ich selbst (siehe oben), aber es ist eben auch interessant zu lesen, wie andere es machen. Es ist hingegen weniger interessant zu lesen, dass jemand in einem Wisch es „sch...“ findet.
Schön, dass Du Dir nur so Teile rauspickst. Sorry, aber Du kommst hier an und fragst nach einem Algorithmus ohne zu schreiben, was Du ggf. schon probiert hast und was nicht. Du schilderst auch nicht, wo genau das Problem ist.
Und bei Hinweisen zu der generellen Problematik kommt in erster Linie abwertende Antworten ...

Und was ist mit der skizzierten Lösung? Oder hast Du die gar nicht versucht zu verstehen? Ich habe da einen möglichen Algorithmus recht gut skizziert und der sollte so recht gut funktionieren.

Ganz so trivial ist es dann doch nicht, da der Buffer bei fast jedem Lesevorgang geflusht werden müsste:
Also dem kann ich nicht wirklich folgen. Kannst Du das noch etwas ausführen? Denn in Deinem Code Beispiel ist es ja prinzipiell egal, ob er ohne flush blockiert, wenn er Daten in den buffered Stream schreibt (weil der interne Buffer voll ist) oder ob er bei dem flush Aufruf blockiert.

Etwas komplexer wird es maximal durch die Themen:
- Es gibt nichts zu schreiben. Dann muss sich der Thread, der die Daten raus gibt, schlafen legen. Und wenn der lesende Thread etwas gelesen hat, muss der schreibende Thread geweckt werden.... Sprich: Die Anfrage nach neuen Daten blockiert ggf. bis Daten vorhanden sind.
- Wenn Du gepufferte Ein-/Ausgabe hast, dann sind das eigenständige Buffer im Stream. Ob da nun Daten enthalten sind oder nicht (also ein flush erfolgreich war) ist für den Ablauf erst einmal egal. Es ist ja nur ein Transfer in eine Richtung. Da spielt es keine Rolle wenn Daten etwas verzögert geschrieben würden ... Es geht ja um den Fall, dass die Daten nicht schnell genug abgenommen werden....
- Was auch noch fehlte ist natürlich die korrekte Behandlung, wenn nichts mehr gelesen wird. Dann muss man halt alle noch vorhandenen Buffer abarbeiten und dann auch schließen. Aber das sollte keine besondere Herausforderung sein. Die einzige Herausforderung, die ich sehe: Man hat Blöcke einer bestimmten Größe, aber der letzte Block kann nur teilweise gefüllt sein. Also die Lösung grob skizziert: Der Lese-Thread setzt bei einem geschlossenen Eingang ein entsprechendes Ende Flag + Wert, wie voll der letzte Buffer ist. Wenn man den letzten Block nicht explizit als gelesen markiert, muss man da nicht einmal groß Prüfungen einbauen. Der Thread für die Ausgabe muss dann also bei nicht vorhandenen Blöcken nur prüfen: Ist er am Ende? Wenn die Übertragung beendet ist, wird der teilweise gefüllte Block noch gesendet und der Thread beendet sich. (Statt sich schlafen zu legen).
 
T

temi

Ich hab auch mal was ausprobiert, noch nicht vollständig funktionsfähig, aber ich habe jetzt auch keine Lust mehr.
Es handelt sich um eine Queue mit fester Länge (capacity = Anzahl der Datenblocks) und einer maximalen internen Datenmenge (maxSize = Größe in Byte). Generell werden die zu puffernden Arrays so in der Queue gespeichert, wie sie kommen. Wenn die Datenblöcke eine relativ konstante Größe haben, sollte das ausreichend sein, ansonsten müsste man vielleicht eine Art Collector vorschalten, der die gewünschte Blockgröße sammelt. Sobald die maximale interne Datenmenge überschritten ist, wird das zu speichernde Array in eine Datei gespeichert. Der Name der Datei entspricht dem Index in der Queue. Vielleicht hilft es ja, ansonsten hat es mir Spaß gemacht ;)
Java:
public final class ArrayQueue implements Queue {

    private final static String PREFIX = "ix_";
    private final static String SUFFIX = ".queue";
    private final byte[][] queue;
    private final int maxSize;
    private int internalSize;
    private int externalSize;
    private final boolean[] isExternal;
    private long writePosition = 0;
    private long readPosition = 0;
    private boolean writeOverflow = false;
    private boolean readOverflow = false;

    public ArrayQueue(final int capacity, final int maxSize) {
        this.queue = new byte[capacity][];
        this.maxSize = maxSize;
        this.isExternal = new boolean[capacity];
    }

    @Override
    public void put(final byte[] data) {

        byte[] arr = new byte[data.length];
        System.arraycopy(data, 0, arr, 0, data.length);
        int position = scalePosition(writePosition);

        if (data.length > freeSize()) {
            writeToFile(arr);
            externalSize += data.length;
            isExternal[position] = true;
        } else {
            queue[scalePosition(writePosition)] = arr;
            internalSize += data.length;
            isExternal[position] = false;
        }

        incWritePosition();
    }

    @Override
    public byte[] poll() {

        byte[] result = new byte[0];
        int position = scalePosition(readPosition);

        if (isExternal[position]) {
            result = readFromFile(position);
            externalSize -= result.length;
            isExternal[position] = false;
        } else {
            result = queue[position];
            internalSize -= result.length;
        }

        incReadPosition();
        return result;
    }

    @Override
    public int internalSize() {
        return internalSize;
    }

    @Override
    public int externalSize() {
        return externalSize;
    }

    @Override
    public int size() {
        return internalSize + externalSize;
    }

    private int freeSize() {
        return maxSize - internalSize;
    }

    private void incWritePosition() {
        writePosition++;
        if (writePosition == Long.MIN_VALUE) {
            writeOverflow = true;
            writePosition = 0;
        }
    }

    private void incReadPosition() {
        readPosition++;
        if (readPosition == Long.MIN_VALUE) {
            readOverflow = true;
            readPosition = 0;
        }
    }

    private int scalePosition(final long position) {
        return (int)(position % queue.length);
    }

    private void writeToFile(final byte[] data) {
        try (FileChannel channel = new RandomAccessFile(scalePosition(writePosition) + SUFFIX, "rw").getChannel();) {
            channel.write(ByteBuffer.wrap(data));
        } catch (IOException e) {
            System.out.println(e.getStackTrace());
        }
    }

    private byte[] readFromFile(final int position) {
        byte[] result = null;
        try (FileChannel channel = new RandomAccessFile(scalePosition(position) + SUFFIX, "r").getChannel();) {
            ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
            result = new byte[(int)channel.size()];
            buffer.get(result);
        } catch (IOException e) {
            System.out.println(e.getStackTrace());
        }

        return result;
    }
}

Was noch fehlt ist auf jeden Fall das Löschen der Dateien nachdem sie gelesen wurden.
 
Zuletzt bearbeitet:
G

Gelöschtes Mitglied 9001

Schön, dass Du Dir nur so Teile rauspickst. Sorry, aber Du kommst hier an und fragst nach einem Algorithmus ohne zu schreiben, was Du ggf. schon probiert hast und was nicht. Du schilderst auch nicht, wo genau das Problem ist.
Und bei Hinweisen zu der generellen Problematik kommt in erster Linie abwertende Antworten ...
Es kamen hier mehrfach Antworten, die beinhalteten, dass ein Raspi nicht geht, dass man keine unkomprimierten Daten braucht, dass man 96kHz nicht braucht. Diese Antworten werte ich nicht ab, aber ich kann ihnen entgegenhalten, dass der Raspi seine Aufgabe erfüllt: er steht hier neben mir und ich teste ihn ausführlich (das habe ich mehrfach geschrieben), und ich habe z.B. ausführlich erklärt, warum 96kHz notwendig sind. Auch habe ich bereits zu Beginn geschrieben, dass ich bereits Ringpuffer implementiert habe und es mir um eine Art zweistufigen Ringpuffer geht, der ggf. Daten auslagert.
In dem Dschungel von Antworten, die sich lieber mit der Hardwarefrage beschäftigen, gibt es ein paar Lösungsvorschläge, die ich mir angeschaut habe. Und Vorschlag Nummero 2 ist in meine Lösung auch eingeflossen. Ich bin mir nicht sicher, was Du von mir hören willst. „Ja, ihr habt alle recht, ein Raspi ist ungeeignet“ vielleicht? Das wäre aber falsch, da ich ihn ja getestet habe und es funktioniert genau so, wie es soll. Übrigens dank der Hilfe von LimDu's Post.
 
J

JustNobody

Ja, ich muss mir eingebildet haben, dass ich einen Algorithmus wie den folgenden skizziert habe:
Du hast zum einen den eigentlichen Rahmen:
a) Ein Thread der Daten von der Quelle liest und merkt.
b) Ein Thread der ein Ziel beliefert und aus dem gemerkten die Daten holt.

Daten kommen immer in Blöcken so wie ich das verstanden habe. Also musst Du x Blöcke merken können. Dazu hast Du dann ein Array mit x Elementen. Jedes Element ist entweder ein Block mit Daten im Speicher (also z.B. ein byte Array) oder eben eine externe Referenz.

Da Speicher knapp ist, machen wir hier eine Totsünde und wir optimieren:
Das ist ein einfaches Array von Integern.
Du willst x Blöcke im Speicher behalten: Dann bedeutet das, dass Du etwas hast wie
- Ein Array von byte Arrays. Jedes Byte Array hat dann genau einen Block. Und 0 bis (x-1) Elemente sind dann im Speicher.
- Du hast nur ein byte Array - der Index ist dann berechenbar. Der x-te Block ist dann (x-1)*size bis x*size - 1.
Alle Blöcke mit x >= interner Buffer sind dann Dateien: In einem Verzeichnis Deiner Wahl legst Du z.B. buffer<x> an. Also sowas wie buffer000158.
Oder Alternativ eine Datei mit Random access Zugriff. Dann hast Du einmal eine Datei reserviert. Aber bei einer SD Karte weiss ich nicht, ob da eine Optimierung drin ist, dass Du nicht ständig auf die gleichen Speicherstellen schreibst. ==> Das könnte die SD Karte schnell killen! Daher: Analyse (Was Du ja gar nicht willst / magst so wie ich Dich verstanden habe bisher) .... also ggf. ständig neue Dateien erstellen und löschen könnte besser sein....

Nun hast Du also:
a) die eigentliche physikalische Speicherung der Daten.
b) eine Referenzierung über ein großes Array (wo die Integer Zahl gespeichert ist, die den Speicherort kennzeichnet - je nach Anzahl kann man hier Speicher sparen und man speichert nur char Werte, die nur 2 Bytes groß sind. Das reicht evtl. aus?).

Also fehlt nur noch die Verwaltung der freien Elemente. Das ist dann der typische Ringspeicher. Du musst Dir also für interne und externe Werte merken: Wo ist das erste und wo das letzte Element.

Beim merken schaust Du also dann: ist im internen Ringspeicher was frei? -> da speichern und gut ist es, ansonsten speichern wir im externen Ringspeicher.

Dann müssen wir das Speichern sinnvoll gestalten:

Der interne Speicher hat ja schon byte Arrays. Um Daten nicht unnötig kopieren zu müssen wird dann einfach die byte-Array Referenz getauscht. So vermeiden wir unnötiges kopieren und auch der GC muss nicht tätig werden. So Du nicht einzelne Byte-Arrays hast, ist es auch kein Problem. Dann muss das Einlesen der Daten schon an der richtigen Stelle erfolgen. Also ist im internen Speicher was frei, wird direkt in den Speicher dort gelesen. Ist intern nichts frei, wird in den Puffer (der einmal am Anfang deklariert wurde) gelesen um dann die Daten in eine Datei zu schreiben.

Also von der Problematik her ist der Algorithmus aus meiner Sicht trivial und lässt sich so herunter schreiben. Problematisch sind nur die Rahmenbedingungen. Da GC Aktivität gut vermieden werden kann und viel mit native Types (und Arrays von diesen) gearbeitet werden kann, ist Java noch nicht einmal so ein großes Problem. Mit Modulen und Co ist der Speicher-Overhead wohl vertretbar. Wobei ich für mich keinen Grund sehe, das in Java zu machen, oder ist das Lesen oder Schreiben der Daten irgend etwas spezielles, für das es nur Java Libraries geben sollte?
Sorry, aber ich bin raus, wenn Du klare Hinweise zu einem möglichen Algorithmus so ignorierst - wenn Du sowas nicht verstanden haben solltest, dann hättest Du nachfragen können...

Der Algorithmus um die Daten soweit möglich im Speicher zu halten und wo nicht möglich im Dateisystem dürfte recht sauber beschrieben sein. Die Problematik bezüglich dem Flushen ist für mich nicht nachvollziehbar (und leider scheint da auch keine Erläuterung mehr zu kommen).

Die Problematik von temi in #103bezüglich der Blockgröße ist richtig, aber das sollte mit einem Collector zur Not auch kein Thema sein: So man eine read Methode verwendet, die eine Referenz zu dem Buffer, Offset und Länge nimmt, lese ich zur not so lange in einer Schleife, bis die Verbindung geschlossen wurde oder der Buffer voll ist.
Damit hätte man dann die Blöcke jeweils komplett gelesen (so diese noch nicht so vorliegen - etwas das ist als Voraussetzung verstanden hatte: "Daten kommen immer in Blöcken so wie ich das verstanden habe." - aber so das nicht der Fall ist, ist das beschriebene Vorgehen eine Möglichkeit, das einfach aufzubauen.)


Die Idee aus 103 von @temi erzeugt recht viele neue Objekte (z.B. bei jedem put), was ich nicht ganz so gut finde. Denn eine interessante Optimierung ist in meinen Augen, den GC so gut wie möglich zu entlasten.

Aber damit bin ich jetzt hier raus und überlasse euch komplett das Spielfeld.
 
G

Gelöschtes Mitglied 9001

Ja, ich muss mir eingebildet haben, dass ich einen Algorithmus wie den folgenden skizziert habe:

Sorry, aber ich bin raus, wenn Du klare Hinweise zu einem möglichen Algorithmus so ignorierst - wenn Du sowas nicht verstanden haben solltest, dann hättest Du nachfragen können...
Es gab hier mehrere Vorschläge und es tut mir leid, wenn ich nicht auf Deinen Vorschlag eingegangen bin. Wenn Du Anmerkungen zu meinem geposteten Vorschlag hast, bin ich ganz Ohr.
 
J

JustNobody

Ja, wobei das im Normalfall egal wäre. Dann läuft halt der GC. Da aber die Ressourcen gut ausgenutzt werden sollten, würde ich darauf komplett verzichten.

Ich habe einfach einmal meine Idee einfach mal in Java herunter geschrieben. Nichts schönes und ich sehe da noch einige unschöne Dinge. Aber paar kleine Refactorings habe ich gemacht (so z.B. aufgeteilt in mehrere Klassen). Habe aber keine großen Tests geschrieben und auch nicht groß getestet.

Also als erstes habe ich nur für den Memory-Bereich eine Klasse MemoryBuffer geschrieben. Diese initialisiert einmalig alle Buffer. Wenn etwas gespeichert wird, dann ist es ein "Buffer-Tausch". Will halt möglichst wenig Objekte erzeugen, die dann eingesammelt werden müssen. Getter/Setter sieht man nicht - die macht bei mir Lombok aber die Annotations habe ich heraus genommen.

Ach ja: im Code habe ich generell ein paar System.err.println drin umdas etwas nachvollziehbarer zu machen und die Fehlerbehandlung ist sehr rudimentär.

Java:
package de.kneitzel;

public class MemoryBuffer {

    /**
     * All buffers.
     */
    private byte buffers[][];

    /**
     * Number of buffers.
     */
    private int bufferNumber;

    /**
     * Size of a single buffer.
     */
    private int bufferSize;

    /**
     * Last buffer read.
     */
    private volatile int lastRead = -1;

    /**
     * Last buffer written.
     */
    private volatile int lastWritten = -1;

    /**
     * Creates a new instance of MemoryBuffer.
     * @param bufferSize Size of each buffer.
     * @param bufferNumber Number of buffers to store.
     */
    public MemoryBuffer(final int bufferSize, final int bufferNumber) {
        this.bufferNumber = bufferNumber;
        this.bufferSize = bufferSize;

        buffers = new byte[bufferNumber][];

        for (int index=0; index < bufferNumber; index++)
            buffers[index] = new byte[bufferSize];
    }

    /**
     * Gets the next Element for a given pointer.
     * @param pointer Pointer to an element.
     * @return Next pointer to an element.
     */
    protected int getNextElement(final int pointer) {
        int result = pointer+1;
        if (result == bufferNumber - 1) result = 0;
        return result;
    }

    /**
     * Checks if there is any space left.
     * @return true if another buffer can be inserted, else false.
     */
    public boolean hasSpace() {
        // If nothing was read so far: check if we wrote the last element.
        if (lastRead == -1) return lastWritten != bufferNumber - 1;

        return getNextElement(lastWritten) != lastRead;
    }

    /**
     * Puts the given buffer into the buffers list.
     * @param buffer Buffer to store.
     * @return Previously stored buffer to be reused.
     */
    public byte[] put(final byte[] buffer) {
        if (!hasSpace()) throw new IllegalStateException("No space available to add buffer!");
        int nextElement = getNextElement(lastWritten);
        byte[] temp = buffers[nextElement];
        buffers[nextElement] = buffer;
        lastWritten = nextElement;
        return temp;
    }

    /**
     * Checks if we have more elements to read.
     * @return true if elements to read are available.
     */
    public boolean hasElements() {
        return lastRead != lastWritten;
    }

    /**
     * Gets the next buffer if available.
     * @return Next buffer.
     */
    public byte[] get() {
        if (!hasElements()) throw new IllegalStateException("No Elements to get available!");

        lastRead = getNextElement(lastRead);
        return buffers[lastRead];
    }
}

Dann noch einmal für Disk. Dabei habe ich mir aber überlegt: Wozu eine feste Buffergröße? Immer, wenn im MemoryBuffer nichts frei ist, landet es im DiskBuffer. Also hat das keine Grenze. Wenn die Platte voll ist oder so, dann war es das aber.
Unschön: Zum löschen erzeuge ich derzeit eine File Instanz...

Java:
package de.kneitzel;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class DiskBuffer {
    /**
     * Package Size.
     */
    private int packageSize;

    /**
     * Disk folder to store files in.
     */
    private String diskFolder;

    /**
     * Last buffer written.
     */
    private int lastWritten;

    /**
     * Last buffer read.
     */
    private int lastRead;

    /**
     * FileBuffer to use.
     */
    private byte[] fileBuffer;

    /**
     * Creates a new instance of de.kneitzel.DiskBuffer.
     * @param packageSize Size of a package.
     */
    public DiskBuffer(final int packageSize, final String diskFolder) {
        this.packageSize = packageSize;
        this.diskFolder = diskFolder;
        fileBuffer = new byte[packageSize];
        lastRead = -1;
        lastWritten = -1;
    }

    /**
     * Reads the next buffer. The array reference returned is always the same - so use the data before calling this method again!
     * @return The byte Array with the read data.
     */
    public byte[] readBuffer() {
        if (lastRead == lastWritten) throw new IllegalStateException("No package to read!");

        String file = diskFolder + "/" + (lastRead + 1) + ".tmp";
        try (FileInputStream fileStream = new FileInputStream(file)){
            if (fileStream.read(fileBuffer) != packageSize) {
                throw new IllegalStateException("File read has wrong size!");
            }
        } catch (IOException ex) {
            throw new IllegalStateException("Unable to read package!", ex);
        }

        // TODO: Creating new object to delete file also adds work to GC!
        new File(file).delete();

        lastRead++;
        return fileBuffer;
    }

    /**
     * Writes the next buffer
     * @param buffer Buffer to write.
     */
    public void writeBuffer(final byte[] buffer) {
        try (FileOutputStream fileStream = new FileOutputStream(diskFolder + "/" + (lastWritten + 1) + ".tmp")) {
            fileStream.write(buffer);
        } catch (IOException ex) {
            // TODO: Better Error Handling.
            System.err.println("Unable to write data to file: " + diskFolder + "/" + (lastWritten + 1) + ".tmp");
            ex.printStackTrace();
            System.exit(-1);
        }
        lastWritten++;
    }
}

Dann baucht man natürlich noch den ThreadedBuffer oben drüber. Der arbeitet mit zwei Threads - einmal zum lesen und einmal zum schreiben.
Entgegen der beschreibung speichert er aber lediglich, ob der Block aus dem Memory oder aus dem Disk Buffer gelesen werden soll.
Java:
package de.kneitzel;

import java.io.*;
import java.nio.BufferOverflowException;

/**
* A simple class which can be used to buffer a stream with a buffer that can be as big as required.
*/
public class ThreadedBuffer {

    /**
     * Package Size.
     */
    private int packageSize = 1024 * 1024; // 1 KB default package size.

    /**
     * Number of packages to store inside Memory.
     */
    private int numberRamBuffers;

    /**
     * Number of packages to store on disk;
     */
    private int numberDiskBuffers;

    /**
     * Stream to read from.
     */
    private InputStream inputStream;

    /**
     * Stream to write to.
     */
    private OutputStream outputStream;

    /**
     * Folder to store all files.
     */
    private String diskFolder = ".";

    /**
     * Thread that is reading from the inputStream.
     */
    private volatile Thread readThread;

    /**
     * Thread that is writing to the outputStream.
     */
    private volatile Thread writeThread;

    /**
     * Buffer that is used to read data to.
     */
    private byte[] readBuffer;

    /**
     * Stores the number of Bytes read from the thread.
     */
    private int readBufferBytesRead;

    /**
     * Array with references to used buffers. Positive number references a memory buffer, negative number references a disk buffer.
     */
    private boolean[] bufferReferenceInMemory;

    /**
     * Next buffer reference to write to.
     */
    int lastWrittenTo;

    /**
     * Next buffer reference to read.
     */
    int lastReadFrom;

    /**
     * Flag that indicates if the read thread was blocked.
     */
    private volatile boolean writeThreadBlocked = false;

    private MemoryBuffer memoryBuffer;
    private DiskBuffer diskBuffer;

    /**
     * Creates a new instance of ThreadedBuffer.
     * @param ramPackages Number of packages to store inside RAM.
     * @param diskPackages Number of packages to store on DISK.
     */
    public ThreadedBuffer(final int ramPackages, final int diskPackages, final InputStream inputStream, final OutputStream outputStream) {
        this.numberRamBuffers = ramPackages;
        this.numberDiskBuffers = diskPackages;
        this.inputStream = inputStream;
        this.outputStream = outputStream;
    }

    /**
     * Starts the read and write threads.
     */
    public void run() {
        initialize();
        readThread = new Thread(this::readTask);
        readThread.start();
        writeThread = new Thread(this::writeTask);
        writeThread.start();
    }

    /**
     * Waits for read and write thread to end.
     * @throws InterruptedException
     */
    public void waitForEnd() throws InterruptedException {
        if (readThread != null) readThread.join();
        if (writeThread != null) writeThread.join();
    }

    /**
     * Initializes the data structures:
     * <p>
     *     <ul>
     *         <li>buffers inside memory</li>
     *         <li>Array with references to used buffers.</li>
     *         <li>Pointers to used elements.</li>
     *     </ul>
     * </p>
     */
    private void initialize() {
        // Buffer to use for reference blocks.
        bufferReferenceInMemory = new boolean[numberDiskBuffers + numberRamBuffers];
        lastWrittenTo = -1;
        lastReadFrom = -1;

        // Buffer to read data into.
        readBuffer = new byte[packageSize];

        // Buffers
        memoryBuffer = new MemoryBuffer(packageSize, numberRamBuffers);
        diskBuffer = new DiskBuffer(packageSize, diskFolder);
    }

    private void readTask() {
        try {
            while (readFromInputTask()) {
                storeBuffer();

                if (writeThreadBlocked)
                    writeThread.interrupt();
            }
        } catch (IOException ex) {
            // TODO: Better handling missing.
            System.err.println("IOException occured: " + ex.getMessage());
            ex.printStackTrace();
        }
        System.err.println("ReadTask done.");
    }

    /**
     * Writes data from buffers to outputStream.
     */
    private void writeTask() {
        boolean running = true;
        while (running) {
            if (lastReadFrom == lastWrittenTo) {
                if (readThread.isAlive()) {
                    System.err.println("Write Thread: Nothing to read, sleeping!");
                    // Wait for new data to read.
                    try {
                        writeThreadBlocked = true;
                        Thread.sleep(1000);
                    } catch (InterruptedException ex) {
                        System.err.println("Write Thread: Sleep interrupted.");
                    }
                    writeThreadBlocked = false;
                } else {
                    try {
                        // Read Thread done so no more data incoming.
                        // We need to write the last block read and we are done.
                        outputStream.write(readBuffer, 0, readBufferBytesRead);
                        System.err.println("Write Thread: last buffer written. Stopping ...");
                        running = false;
                    } catch (IOException ex) {
                        // TODO Better error handling.
                        System.err.println("IOException when writing data ... exiting!");
                        ex.printStackTrace();
                        System.exit(-1);
                    }
                }
            } else {
                int nextBufferToRead = lastReadFrom + 1;
                if (nextBufferToRead == bufferReferenceInMemory.length)
                    nextBufferToRead = 0;

                if (bufferReferenceInMemory[nextBufferToRead]) {
                    System.err.println("Write Task: Getting buffer from Memory.");
                    // RAM Buffer
                    try {
                        outputStream.write(memoryBuffer.get());
                    } catch (Exception ex) {
                        // TODO: better data handling.
                        System.err.println("Unable to write data!");
                        ex.printStackTrace();
                        System.exit(-1);
                    }
                } else {
                    System.err.println("Write Task: Getting buffer from Disk.");
                    // Disk Buffer
                    try {
                        outputStream.write(diskBuffer.readBuffer());
                    } catch (IOException ex) {
                        // TODO: better data handling.
                        System.err.println(ex.getMessage());
                        ex.printStackTrace();
                        System.exit(-1);
                    }
                }
                lastReadFrom = nextBufferToRead;
            }
        }
        System.err.println("WriteTask done.");
    }

    /**
     * Reads a package of the inputStream.
     * @return true if a full packages was read, else false.
     * @throws IOException An IOException was thrown while reading from the input stream.
     */
    private boolean readFromInputTask() throws IOException {
        readBufferBytesRead = 0;
        int bytesLeftToRead = packageSize;
        int lastReadBytes = 0;

        while (bytesLeftToRead > 0 && (lastReadBytes = inputStream.read(readBuffer, readBufferBytesRead, bytesLeftToRead)) > 0) {
            bytesLeftToRead -= lastReadBytes;
            readBufferBytesRead += lastReadBytes;
        }

        System.err.println("Read Bytes from input Stream: " + readBufferBytesRead);
        return lastReadBytes != -1;
    }

    /**
     * Stores the readBuffer.
     * @return
     */
    private void storeBuffer() {
        // First we need to get the next Element to write to.
        int nextElement = lastWrittenTo + 1;
        if (nextElement == bufferReferenceInMemory.length)
            nextElement = 0;

        if (nextElement == lastReadFrom)
            throw new BufferOverflowException();

        if (memoryBuffer.hasSpace()) {
            System.err.println("Read Task: Putting buffer in Memory.");
            readBuffer = memoryBuffer.put(readBuffer);
            bufferReferenceInMemory[nextElement] = true;
        } else {
            System.err.println("Read Task: Putting buffer to disk.");
            diskBuffer.writeBuffer(readBuffer);
            bufferReferenceInMemory[nextElement] = false;
        }

        lastWrittenTo = nextElement;
    }
}

Und "last but not least": ThreadedBufferTest ist einfach eine kleine Testroutine. Nimmt zwei Parameter:
Eingabedatei -> diese wird gelesen
Ausgabedatei -> diese wird geschrieben.
Desweiteren ist ThreadedBufferTest ein OutputStream, der beim Schreiben eines Blocks eine Sekunde wartet. Also sozusagen eine schlechte Verbindung simuliert.

Java:
package de.kneitzel;

import java.io.*;

public class ThreadedBufferTest extends OutputStream {

    private FileOutputStream outStream;

    public static void main(String[] args) throws IOException  {
        if (args.length != 2) {
            System.out.println("Aufruf mit 2 Argumenten - Source Datei und Ziel Datei!)");
            return;
        }

        InputStream inStream = new FileInputStream(args[0]);
        OutputStream outStream = new ThreadedBufferTest(args[1]);

        ThreadedBuffer threadedBuffer = new ThreadedBuffer(3, 10000, inStream, outStream);
        threadedBuffer.run();
    }

    public ThreadedBufferTest (final String filename) throws IOException {
        outStream = new FileOutputStream(filename);
    }

    @Override
    public void write(byte[] buffer) throws IOException {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ex) {}
        outStream.write(buffer);
    }

    @Override
    public void write(byte[] buffer, int offset, int length) throws IOException {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ex) {}
        outStream.write(buffer, offset, length);
    }

    @Override
    public void write(int b) throws IOException {
        outStream.write(b);
    }
}

Das wäre also einfach einmal stur die skizzierte Idee von mir herunter geschrieben. War zuerst alles in einer Klasse, weil ich es wirklich nur simpel runterschreiben wollte, aber so ganz ohne Refactoring war das nichts, was ich bereit wäre zu posten :)

Der Algorithmus selbst ist - wie schon in meinem ersten Post in dem ich ihn beschrieben habe geschrieben - recht trivial und mit den Erläuterungen sollte er einfach zu verstehen sein. Die exakten Anforderungen müsste man durchgehen - aber diesbezüglich bin ich wirklich raus, denn ich sehe da nicht wirklich ein Entgegenkommen vom TE. Der Algorithmus selbst ist geliefert und kann angepasst werden:
- Ggf. benötigte Fehlerbehandlung
- ggf. überdenken der Dateistruktur und dem Löschen. Hier wäre denkbar, Dateien nicht zu löschen bzw. nur bei Bedarf. Denn ich sehe das Problem, dass je nach Filesystem ggf. die SD Karte schnell gekillt wird durch zu viele Schreibvorgänge auf einzelnen Speicherzellen. Aber ich muss gestehen, dass ich da nicht in den technischen Details stecke. Aber es ist die gleiche Problematik wie bei SSDs (und so manch einer hat am Anfang SSDs schnell gekillt weil nicht gleichmäßig alle Speicherzellen beschrieben wurden ...)

Edit: MemoryBuffer hatte zuerst auch ThreadedBuffer enthalten...
Edit2: Die diskPackages in ThreadedBuffer können natürlich auch bei der Implementation weg. Es gibt ja diesbezüglich kein Limit.
 
G

Gelöschtes Mitglied 9001

- ggf. überdenken der Dateistruktur und dem Löschen. Hier wäre denkbar, Dateien nicht zu löschen bzw. nur bei Bedarf. Denn ich sehe das Problem, dass je nach Filesystem ggf. die SD Karte schnell gekillt wird durch zu viele Schreibvorgänge auf einzelnen Speicherzellen. Aber ich muss gestehen, dass ich da nicht in den technischen Details stecke.
Das Löschen einer Datei bedeutet auf Dateisystemebene lediglich, einen Bereich als frei zu markieren (nicht, ihn zu überschreiben), d.h., der Schreibzugriff, der durch das Löschen entsteht, dürfte verhältnismäßig vernachlässigbar sein. Aber ich gebe dir recht, dass man das Löschen auch nur im Bedarfsfall machen könnte, das würde Zeit sparen.
 
mihe7

mihe7

Das Löschen einer Datei bedeutet auf Dateisystemebene lediglich, einen Bereich als frei zu markieren (nicht, ihn zu überschreiben),
LOL, ich glaube nicht, dass Du das @JustNobody erklären musst :)

Problem an der Geschichte ist etwas völlig anderes, wenn es um Flashspeicher geht (ich habe mir das mal vor Jahren angesehen). Soweit ich das noch in Erinnerung habe, müssen Blöcke gelöscht werden, bevor sie mit bestimmten Daten beschrieben werden können (je nach Speicher kann ein gesetztes/gelöschtes Bit nicht einfach gelöscht/gesetzt werden). Diese Löschzyklen sind begrenzt. D. h. es spielt ggf. gar keine Rolle, ob Du nur ein Dateiattribut setzt, wenn ein relativ großer Block gelöscht werden muss. Linderung verschafft Wear-Leveling.

Allerdings weiß ich nicht, wie heute der Stand der Dinge bei den Flashdingern ist.
 
I

insert2020

Nächste Runde, hier eine Version, die zu 100% funktioniert, die aber den Nachteil hat, dass die Datei auf der Festplatte immer größer wird:
Java:
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;

public class MyBuffer {
	private static final int RAM_BUFFER_SIZE = 8; // choose any size
	private ByteBuffer buffer1 = null;
	private FileOutputStream out1 = null;
	private BufferedInputStream in1 = null;
	private long out1written = 0;
	private int buffer1pos = 0;

	public MyBuffer() throws IOException {
		buffer1 = ByteBuffer.allocate(RAM_BUFFER_SIZE);
		out1 = new FileOutputStream(new File("my_backup_file.txt"));
		in1 = new BufferedInputStream(new FileInputStream(new File("my_backup_file.txt")), RAM_BUFFER_SIZE);
	}

	public synchronized void write(byte b) throws IOException {
		if (!buffer1.hasRemaining()) {
			if (buffer1pos > 0) {
				buffer1.position(buffer1.position() - buffer1pos);
				for (int i = 0; buffer1pos + i < buffer1.limit(); i++) {
					buffer1.put(i, buffer1.get(buffer1pos + i));
				}
				buffer1pos = 0;
			} else {
				out1.write(buffer1.array());
				out1.flush();
				out1written += buffer1.limit();
				buffer1.clear();
				buffer1pos = 0;
			}
		}
		buffer1.put(b);
	}

	public synchronized int read() throws IOException {
		if (out1written > 0) {
			out1written--;
			return in1.read();
		}
		if (buffer1pos == buffer1.position()) {
			return -1;
		}
		return buffer1.get(buffer1pos++);
	}

	public synchronized byte[] bulkRead() throws IOException {
		ByteArrayOutputStream list = new ByteArrayOutputStream();
		int b;
		while ((b = read()) != -1) {
			list.write(b);
		}
		return list.toByteArray();
	}

	public static void main(String[] args) throws IOException {
		// TEST
		Random r = new Random(0);
		MyBuffer buffer = new MyBuffer();
		buffer.write((byte) 3);
		buffer.write((byte) 2);
		buffer.write((byte) 1);
		System.out.println(Arrays.toString(buffer.bulkRead()));
		for (int i = 0; i < 100; i++) {
			int s = r.nextInt(RAM_BUFFER_SIZE);
			ArrayList<Byte> l1 = new ArrayList<>();
			ArrayList<Byte> l2 = new ArrayList<>();
			for (int j = 0; j < s * 2; j++) {
				System.out.println("w: " + (byte) j);
				buffer.write((byte) j);
				l1.add((byte) j);
			}
			for (int j = 0; j < s; j++) {
				byte b = (byte) buffer.read();
				System.out.println("r: " + b);
				l2.add(b);
			}
			for (int j = 0; j < s; j++) {
				System.out.println("w: " + (byte) j);
				buffer.write((byte) j);
				l1.add((byte) j);
			}
			for (int j = 0; j < s; j++) {
				byte b = (byte) buffer.read();
				System.out.println("r: " + b);
				l2.add(b);
			}
			for (int j = 0; j < s; j++) {
				byte b = (byte) buffer.read();
				System.out.println("r: " + b);
				l2.add(b);
			}
			System.out.println("r: " + buffer.read());
			System.out.println(l1.equals(l2));
			System.out.println();
			if (!l1.equals(l2)) {
				return;
			}
		}
	}
}
Ich wäre dankbar wenn nicht danach gefragt wird, wie es funktioniert. :D
 
I

insert2020

wie funktioniert das?
Es gibt einen ByteBuffer == Speicher im RAM, einen FileOutputStream == Speicher auf der Festplatte und einen BufferedInputStream == sowohl Speicher im RAM wie auch Speicher auf der Festplatte...
Nur wenn der Speicher im RAM voll ist, wird auf die Festplatte geschrieben und gelesen.
Der Rest ist Index schubsen...
 
J

JustNobody

Das Löschen einer Datei bedeutet auf Dateisystemebene lediglich, einen Bereich als frei zu markieren (nicht, ihn zu überschreiben), d.h., der Schreibzugriff, der durch das Löschen entsteht, dürfte verhältnismäßig vernachlässigbar sein. Aber ich gebe dir recht, dass man das Löschen auch nur im Bedarfsfall machen könnte, das würde Zeit sparen.
Es geht hier nicht um irgendwelches Zeit sparen. Es geht um die Lebenszeit der SD-Karte. Speicherzellen lassen sich nur begrenzt oft beschreiben, weshalb man ggf. verhindern möchte, dass der gleiche Speicherbereich wiederholt geschrieben wird.
Aber auch bei einer SD Karte kommen ähnliche mapping Mechanismen wie bei einer SSD zum tragen wenn ich das richtig verstanden habe. Aber ich bin kein Hardware Spezialist, daher kann ich dazu nicht viel sagen. Ich habe aber halt die Befürchtung, dass bei SD Karten das weniger optimiert wurde als bei SSDs.
 
I

insert2020

Speicherzellen lassen sich nur begrenzt oft beschreiben, weshalb man ggf. verhindern möchte, dass der gleiche Speicherbereich wiederholt geschrieben wird.
Das stimmt. Es wäre ggf. sogar sinnvoll, das File während der Programmlaufzeit zu "akkumulieren", anstatt immer wieder die gleiche/selbe Stelle zu beschreiben. Insofern wäre mein "MyBuffer"-Vorschlag gar nicht so abwegig.

Aber mir fehlt da leider auch das Hintergrundwissen über den Anwendungsfall und über Speicherkarten...
 
G

Gelöschtes Mitglied 9001

Es geht hier nicht um irgendwelches Zeit sparen.
Das kommt drauf an. Ein Audiointerface liefert die Daten unerbittlich und wartet nicht, bis jemand sie abgeholt hat. Wenn der Abholthread nun plötzlich einen Datenblock auf die Karte schreiben muss oder einen Block löschen, braucht er etwas mehr Zeit. Er muss das in der Zeit schaffen, wie der Buffer des Audiointerface lang ist. Diese Zeit (in Sekunden) errechnet sich aus Buffergröße in Bytes / (Kanäle x Bits/8 x Samplerate). Ist das Schreiben in dem Zeitraum nicht geschafft, hat das Audiointerface die Daten in seinem Buffer schon mit neuen Daten überschrieben und es kommt zu Aussetzern.
Man kann diese Gefahr abmildern, in dem man einen Ringpuffer vorlagert, der tatsächlich nur im RAM liegt.
 
Zuletzt bearbeitet von einem Moderator:
J

JustNobody

Ähm: Du hattest Du einen Punkt bei mir aufgegriffen. Und da ging es um den DiskBuffer, den ich Dir sogar geschrieben habe. Und da ging es mir wie schon einmal geschrieben explizit nicht um Performance. (Was auch relativ wenig Sinn machen würde, denn wir haben auf dem hohen Level, auf dem wir in der Java VM nun einmal agieren, kaum Optionen. In einem Low Level Bereich könnte man da einiges mehr machen. Da könnte man sich dann mit dem Verhalten von SD Karten auseinander setzen und ggf. auf Low Level Bereichen was optimieren. Da gäbe es dann so MMC_ERASE Befehle und so ...

Das was Du jetzt aber schreibst umfasst ja wieder die ganze Lösung. Und das ist auch etwas, das Du doch auch nicht bereden willst und alles, was wir da vorab gebracht hatten, wurde von Dir abgelehnt. Das verwirrt mich jetzt etwas ...

Aber ich wollte mich hier ja verabschieden aus dem Thread, daher ist es nicht wild. Du hast nach möglichen Algorithmen gefragt und ich habe einen Algorithmus beschrieben, der aus meiner Sicht einfach umsetzbar sein sollte. Und Ich habe sogar einmal eine schnelle Implementation nachgereicht. Damit ist die Sache für mich an der Stelle gegessen - so keine Nachfragen zu meinen Beiträgen kommen.
 
Thema: 

Riesenringpuffer

Passende Stellenanzeigen aus deiner Region:
Anzeige

Neue Themen

Anzeige

Anzeige
Oben