Producer/Consumer oder reicht synchronizedList

Fohnbit

Top Contributor
Hallo!

Ich bekomme über eine Methode Werte (OSGi Plugin) in meinen Code, welchen ich in eine InfluxDB übers Web schreibe.
Das schreiben soll alle 30s erfolgen.

Nun habe ich das Problem, wenn ein Internetausfall ist, die Queue an die 26.000 Einträge hat und das Programm dann blockt. Ich vermute das das Hochladen zu lange dauert und die ankommenden neuen Daten einen Fehler verursachen.

Vermutlich ist mein ganzer Ablauf nicht korrekt und Threadsicher (oder wie man das beschreibt)

Erstellen der synchronizedList und die Methode, die die List füllt.
Java:
private List<Point> list = Collections.synchronizedList(new ArrayList<Point>());
...
    public void onData(double value, String measurement, String tags, String field, long timestamp) {
        if (getOutputState() == ENUM_STATE_RUNNING) {
            synchronized (list) {

                Map<String, String> tag = new HashMap<>();
                for (String tagElement : tags.split(",")) {
                    tag.put(tagElement.split(":")[0], tagElement.split(":")[1]);
                }

                Instant time = Instant.ofEpochSecond(timestamp);
                Point point = Point.measurement(measurement).addTags(tag).addField(field, value).time(time,
                        WritePrecision.S);
                list.add(point);
                updateOutputQueuesize(list.size());
            }
        }

Der Aufruf zum hochladen in einem eigenen Thread:
Code:
    public void upload() {
        synchronized (list) {

            logger.debug("Queue size: " + list.size());
            if (list.size() != 0) {
                logger.debug("Start Thread Upload");

                Runnable r = new Upload(client, getPropertyBucket(), getPropertyOrganisation(), list);
                // list.clear();
                new Thread(r).start();
                logger.debug("Start Thread Upload: DONE");
                updateOutputQueuesize(list.size());
            }

        }
    }

Die Uploadklasse selbst
Code:
public class Upload implements Runnable {

    private InfluxDBClient client = null;
    private String bucket;
    private String org;
    private List<Point> points;
    private Logger logger = (Logger) LoggerFactory.getLogger(this.getClass());

    public Upload(InfluxDBClient client, String bucket, String org, List<Point> points) {
        this.bucket = bucket;
        this.org = org;
        this.client = client;
        this.points = points;
        this.logger.setLevel(Level.INFO);
   
    }

    @Override
    public void run() {
        try {
            logger.debug("create InfluxDB Data: " + points.toString());
            WriteApiBlocking writeApi = client.getWriteApiBlocking();
            logger.debug("upload InfluxDB Data");
            writeApi.writePoints(bucket, org, points);
            logger.debug("upload InfluxDB Data: DONE");
            points.clear();
        } catch (Exception e) {
            System.out.println(e.getLocalizedMessage());
        }
    }

    public List<Point> getValue() {
        return points;
    }

}
 

mrBrown

Super-Moderator
Mitarbeiter
Das Problem dürfte hauptsächlich das synchronized sein, wodurch das Schreiben in die Liste während des Uploads blockiert wird.

Der einfachste Weg das zu lösen, ohne viel zu ändern, ist wahrscheinlich, nicht mehr so lange über die Liste zu synchronisieren, zB in dem man beim Upload erst eine Kopie der Liste erstellt und sie danach direkt wieder freigibt. Die Liste ist dann nur für die Dauer des Kopierens blockiert, aber nicht während des ganzen Uploads.
 

Fohnbit

Top Contributor
Ja, aber wenn ich gleich das mit einem Producer/Consumer mache, wäre es noch besser, oder?

Ich habe das soweit eingebaut, aber noch nicht heraus gefunden, wie ich einem laufenden Producer neue Werte zum schreiben übermitteln kann. Ich soll den Producer ja nicht ständig starten/beenden, da die Werte teilweise im Millisekunden Takt rein kommen.
 

mrBrown

Super-Moderator
Mitarbeiter
Ja, aber wenn ich gleich das mit einem Producer/Consumer mache, wäre es noch besser, oder?
Producer-Consumer ist erstmal nur ein Pattern für asynchrone Kommunikation, in dem es einen „Erzeuger“ und einen „Verbraucher“ von Daten gibt - genau das hast du jetzt auch schon.

Man kann das natürlich beliebig umbauen, aber dafür hat man zig Möglichkeiten.
 

Fohnbit

Top Contributor
Danke für deinen Tipp und Hilfe.

Dann mache ich es so .. ich kopiere die aktuelle Liste .. verarbeite diese und bei Fehlern beim Upload schreibe ich die Daten zurück in die originale Liste.
 

Fohnbit

Top Contributor
Ich habe das nun einmal so gebaut:
Main Thread:
Java:
public void upload() {
        synchronized (list) {
            logger.debug("Queue size: " + list.size());
            if (list.size() != 0) {
                logger.debug("Start Thread Upload");
                logger.debug("Copy point list");
                List<Point> listCopy = new ArrayList<Point>(list);
                Collections.copy(listCopy, list);
                list.clear();
                Runnable r = new Upload(client, getPropertyBucket(), getPropertyOrganisation(), list, listCopy);
                new Thread(r).start();
                logger.debug("Start Thread Upload: DONE");
                updateOutputQueuesize(list.size());
            }

        }
    }

Die Liste wird kopiert, der Upload Thread gestartet und geleert. Neue Daten landen in der leeren Liste.
Der Upload:
Code:
public class Upload implements Runnable {

    private InfluxDBClient client = null;
    private String bucket;
    private String org;
    private List<Point> points;
    private List<Point> pointsOriginal;
    private Logger logger = (Logger) LoggerFactory.getLogger(this.getClass());

    public Upload(InfluxDBClient client, String bucket, String org, List<Point> pointsOriginal, List<Point> points) {
        this.bucket = bucket;
        this.org = org;
        this.client = client;
        this.points = points;
        this.pointsOriginal = pointsOriginal;
        this.logger.setLevel(Level.INFO);
    }

    @Override
    public void run() {
        try {
            logger.debug("create InfluxDB Data: " + points.toString());
            ListIterator<Point> pointIterator = points.listIterator();
            while (pointIterator.hasNext()) {
                List<Point> uploadPayload = new ArrayList<Point>();
                for (int i = 0; i <= 10000; i++) {
                    if (pointIterator.hasNext()) {
                        uploadPayload.add(pointIterator.next());
                        pointIterator.remove();
                    }
                }
                WriteApiBlocking writeApi = client.getWriteApiBlocking();
                logger.debug("upload InfluxDB Data");
                writeApi.writePoints(bucket, org, uploadPayload);
                logger.info("upload InfluxDB Data: " + uploadPayload.size() + " elements DONE");

            }
            pointsOriginal.clear();
        } catch (Exception e) {
            System.out.println(e.getLocalizedMessage());
            pointsOriginal.addAll(points);
        }
    }
}

Die List wird in 10.000 Elemente aufgeteilt und hochgeladen ... bei einem Fehler werden die verbleibenden zurück die originale Liste geschrieben.

Aber scheinbar werden nach 10.001 Elemente die anderen nimmer verarbeitet?
 

Fohnbit

Top Contributor
Fehler gefunden:
Java:
public class Upload implements Runnable {

    private InfluxDBClient client = null;
    private String bucket;
    private String org;
    private List<Point> points;
    private List<Point> pointsOriginal;
    private Logger logger = (Logger) LoggerFactory.getLogger(this.getClass());
    private List<Point> uploadPayload = new ArrayList<Point>();

    public Upload(InfluxDBClient client, String bucket, String org, List<Point> pointsOriginal, List<Point> points) {
        this.bucket = bucket;
        this.org = org;
        this.client = client;
        this.points = points;
        this.pointsOriginal = pointsOriginal;
        this.logger.setLevel(Level.INFO);
    }

    @Override
    public void run() {
        try {
            logger.debug("create InfluxDB Data: " + points.toString());
            ListIterator<Point> pointIterator = points.listIterator();
            while (pointIterator.hasNext()) {
                uploadPayload = new ArrayList<Point>();
                for (int i = 0; i < 10000; i++) {
                    if (pointIterator.hasNext()) {
                        uploadPayload.add(pointIterator.next());
                        pointIterator.remove();
                    }
                }
                WriteApiBlocking writeApi = client.getWriteApiBlocking();
                logger.debug("upload InfluxDB Data");
                writeApi.writePoints(bucket, org, uploadPayload);
                logger.debug("upload InfluxDB Data: " + uploadPayload.size() + " elements DONE");

            }
            points.clear();
        } catch (Exception e) {
            logger.error(e.getLocalizedMessage());
            pointsOriginal.addAll(uploadPayload);
            pointsOriginal.addAll(points);
        }
    }
}
 
Zuletzt bearbeitet:

Fohnbit

Top Contributor
Hallo!

Mir scheint das er nun über die Tage mehr Speicher benötigt. Kann sein das, wenn der Thread beendet etwas im Speicher verbleibt?
 

Neumi5694

Top Contributor
Ich würde eher sagen, der Thread wird nicht ordentlich beendet, ansonsten sollte sich der GC darum kümmern. Lass nach dem points.clear() oder am Besten ganz am Schluss nochmal was ausgeben. Vielleicht hängt ja die eine Methode dank synchronized.

Es kann aber auch sein, dass akzeptiert wird, dass etwas mehr Speicher verwendet wird, anstatt jedesmal neuen freizugeben und wieder zu reservieren - innerhalb eines Limits. Die Wege des GC sind unergründlich.
 

Ähnliche Java Themen

Neue Themen


Oben