parallele / Multithreaded Iteration über Map

beginner99

Aktives Mitglied
Hallo,

ich habe eine Map und jedes der Objekte in values wird ein Wert berechnet, der gewünscht ist.
Das ganze soll auch bei grossen Maps (über 100'000) in erträglicher Zeit ablaufen.
Ich möchte jetzt die Map <Anzahl Prozessor> Teile zerlegen und dann für jeden Teil einen eigenen Thread laufen lassen.
Der key ist wichtig, der entspricht nämlich dem Primärschlüssel der Datenherkunft.

Momentan sehe ich nicht mal, wie ich die Map sinnvoll teilen kann und dabei den Rest berücksichtige.

Ideen?
 
S

SlaterB

Gast
das muss ja mit der Map nichts zu tun haben,
kopiere die Keys in eine Liste, bilde Sublisten

interessant wirds, wenn während der Bearbeitung die Map verändert wird und das auch in der laufenden Bearbeitung berücksichtigt werden soll,
aber lieber nicht, besser neue Keys separat sammeln, am Ende noch verarbeiten,
gelöschte Keys bemerkt jeder der Prozessoren, wenn er zum Key das Objekt in der Map nachschaut und nix mehr da ist
 
B

bygones

Gast
lass mich raten... du hast noch keinen beweis, dass das Szenario überhaupt performance probleme bringt, vielleicht noch nicht mal dass es wirklich so "viele" sind.

Lieg ich falsch - such dir eine lösung
Lieg ich richtig - lass die finger von irgendwelchen "performanceoptimierungen"
 

beginner99

Aktives Mitglied
Die Map ist konstant, es wird nur gelesen. Frage ist, ob sämtliche Map Implementierungen gleichzeitiges lesen unterstützen?
Antwort: scheint so zumindest mit einem naiven test mit kleinen Maps.


Hier mal code, aber der ist langsamer als das gleiche single-threaded zumindest für eine kleine Map von size 2000.
Klar, je grösser die Map desto grösser der Vorteil, aber das erstellen der subListen dauert auch länger je grösser die Map.

Java:
        Map<Integer, IMolecule> molecules = getDataAccessLayer().getAllMolecules();
        profiler.start("Create Fingerprints");
        ArrayList<Integer> molIds = new ArrayList<Integer>(molecules.keySet());
        int mod = molIds.size() % cpus;
        int subListSize = molIds.size()/cpus;
        getLogger().debug("Nr. of CPUs: {}", cpus);
        getLogger().debug("Size of Sublists: {}", subListSize);
        getLogger().debug("Modulo: {}", mod);
        ArrayList<FingerprintRunner> runners = new ArrayList<FingerprintRunner>();
        for (int i = 0; i < cpus; i++) {

            ArrayList<Integer> subList = new ArrayList<Integer>();
            for (int j = i * subListSize; j < i * subListSize + subListSize; j++) {
                subList.add(molIds.get(j));
                getLogger().debug("Id at position {} added to subList {}.", j, i+1);
            }
            // add the rest (%) of entries to last list
            if (i == cpus) {
                for (int c = i * subListSize + subListSize; c < i * subListSize + subListSize + mod; c++) {
                    subList.add(molIds.get(c));
                }
            }

            FingerprintRunner runner = new FingerprintRunner(subList, molecules);
            runners.add(runner);
            runner.start();
        }

        for (FingerprintRunner runner : runners) {

            try {                
                runner.join();
                fingerprints.putAll(runner.subMap);
                getLogger().debug("Added a submap to Main map.");
            }
            catch(InterruptedException ex){
                logger.catching(ex);
            }
        }

Jetzt beim schreiben in den Sinn gekommen: Das erstellen der sublist kann ich ja in den threads selbst machen, muss einfach jedem den Index Bereich mitgeben.
Bleibt noch die Frage, ob das erstellen der Sub-Listen nicht zuviel overhead ist bzw. ob man das besser lösen kann.
(und was mache ich bei einer InterruptedException? sollte aber nie eintreten, oder?)
 

fastjack

Top Contributor
Auf den ersten Blick sieht das mit den Threads kompiliziert aus. Ich weis gerade nicht, was Du mit CPU's meinst. Du kannst pro CPU mehrere Threads zünden.

Ich nehme mal an, Du liest aus der Datenbank den ganzen Schwung an Molekülen auf einmal und speicherst Ihn dann in der Map, die Du dann wiederum für irgendwelche Berechnung benutzt. Wenn es geht, würde ich die Moleküle sowieso häppchenweise (immer 1000, 10000 oder so) lesen und verarbeiten. Das bringt meiner Erfahrung nach bei großen Datenmengen fast immer einen Geschwindigkeitsvorteil. Du ballerst Dir dann nicht auf einmal den Speicher zu. Wenn möglich würde ich verarbeitete Moleküle aus dem Speicher entfernen.

Vielleicht bringt das etwas: n-Berechnungs-Threads, die auf eine threadsafe-Queue zugreifen. Diese Queue wird im Schleifendurchlauf der Map gefüttert. Ein Thread zieht ein Element aus der Queue, verarbeitet es und nimmt das nächste. Die anderen Threads machen dasselbe. Die Threads kannst Du bereits vor dem eigentlichen Schleifendurchlauf starten. Am Ende des Schleifendurchlaufs wird gewartet, bis die Queue leer ist (falls sie das nicht schon ist) und dann werden alle Threads beendet.
 

beginner99

Aktives Mitglied
Auf den ersten Blick sieht das mit den Threads kompiliziert aus. Ich weis gerade nicht, was Du mit CPU's meinst. Du kannst pro CPU mehrere Threads zünden.
ok, blöde variablen-Benennung. Sollte ich in nrOfCores umtaufen oder so.
Java:
int cpus = Runtime.getRuntime().availableProcessors();

Ich nehme mal an, Du liest aus der Datenbank den ganzen Schwung an Molekülen auf einmal und speicherst Ihn dann in der Map, die Du dann wiederum für irgendwelche Berechnung benutzt. Wenn es geht, würde ich die Moleküle sowieso häppchenweise (immer 1000, 10000 oder so) lesen und verarbeiten. Das bringt meiner Erfahrung nach bei großen Datenmengen fast immer einen Geschwindigkeitsvorteil. Du ballerst Dir dann nicht auf einmal den Speicher zu. Wenn möglich würde ich verarbeitete Moleküle aus dem Speicher entfernen.

Da hast du recht. Die Frage ist nur, wie ich das erreiche. Aber denke forum oder google wird hilfreich sein.
Idealerweise wäre auch der lese-schritt multithreaded.

Vielleicht bringt das etwas: n-Berechnungs-Threads, die auf eine threadsafe-Queue zugreifen. Diese Queue wird im Schleifendurchlauf der Map gefüttert. Ein Thread zieht ein Element aus der Queue, verarbeitet es und nimmt das nächste. Die anderen Threads machen dasselbe. Die Threads kannst Du bereits vor dem eigentlichen Schleifendurchlauf starten. Am Ende des Schleifendurchlaufs wird gewartet, bis die Queue leer ist (falls sie das nicht schon ist) und dann werden alle Threads beendet.

Das muss ich mir wohl noch paar mal durchlesen, bis ich es kapiert habe bzw. mal bisschen rumspielen.
 

fastjack

Top Contributor
So ähnlich wie das hier zum Beispiel. Das ist aber nur so eine Art Pseudo-Codeersatz. Keine Ahnung obs funktioniert. Es soll nur den Ansatz demonstrieren, den ich meinte. :rtfm:

Code:
    public void foo() {
        Map<Integer, IMolecule> molecules = ... // die Map
        Queue<Integer> d = new LinkedBlockingQueue<Integer>(); // ... eine Blocking-Queue
        Thread t1 = new Thread(new Runner(d, 50));
        Thread t2 = new Thread(new Runner(d, 100));
        // lets go
        t1.start();
        t2.start();
        // Durchlaufen der map
        for (Iterator<IMolecule> i = d.iterator(); i.hasNext();) {
            int id = i.next().getId();
            d.offer(id);
            i.remove(); // Element aus Map entfernen.
        }
        for(;;) {
            if (d.size() > 0) {
                // noch Elemente vorhanden, also warten
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                break;
            }
        }
        t1.interrupt();
        t2.interrupt();
    }

    class Runner implements Runnable {
        Queue<Integer> q = null;
        long ms = 0;
        public Runner(Queue<Integer> q, long ms) { // referenz auf queue
            super();
            this.q = q;
            this.ms = ms;
        }

        @Override
        public void run() {
            for(;;) {
                Object o = q.poll();
                if (o != null) {
                    // Berechnungen mit Molekül o
                    // ...
                }
                try {
                    Thread.sleep(this.ms); // pausieren
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
 

Marco13

Top Contributor
Hm. Das sieht IMHO aber AUCH (unnötig?) kompliziert aus.

Als erstes stellt sich IMHO die Frage, ob die Moleküle wirklich als Map<Integer, Molecule> geliefert werden müssen. Eine List wäre praktischer. Oder werden die Integers wirklich irgendwo gebraucht? Sind die Integers forlaufend, oder bunt gemischt?

Auf jeden fall sollte aber (selbst wenn man eine Map verwendet, und nicht die Moleküle sondern nur die relevanten IDs als Liste übergeben werden) die Methode "List#subList" verwenden, und nicht einzelne neue Listen erstellen.

Mal ein bißchen rumprobiert - nicht zuuu ernst nehmen...
Java:
import java.util.*;

class MultithreadTest
{
    public static void main(String args[])
    {

        for (int elements = 1000001; elements <= 2000001; elements += 100000)
        {
            System.out.println("Preparing "+elements+" elements");
            List<Integer> list = new ArrayList<Integer>(elements);
            for (int i=0; i<elements; i++)
            {
                list.add(i);
            }

            System.out.println("Running");
            for (int threads = 1; threads <= 8; threads *= 2)
            {
                long before = System.nanoTime();
                long result = process(list, threads);
                long after = System.nanoTime();
                double duration = (after-before)/1e6;
                System.out.println("result with "+threads+" threads : "+result+" time "+duration);
            }
        }
    }

    private static long process(List<Integer> list, int threads)
    {
        int subListSize = (int)Math.ceil((float)list.size() / threads);

        //System.out.println("subListSize "+subListSize);
        List<Thread> threadList = new ArrayList<Thread>();
        List<Runner> runnerList = new ArrayList<Runner>();
        for (int i=0; i<threads; i++)
        {
            int min = i * subListSize;
            int max = (i+1) * subListSize;
            max = Math.min(max, list.size());

            //System.out.println("Runner "+i+" processing "+min+" to "+max);

            List<Integer> subList = list.subList(min, max);
            Runner runner = new Runner(subList);
            Thread thread = new Thread(runner);
            runnerList.add(runner);
            threadList.add(thread);
            thread.start();
        }

        long result = 0;
        for (int i=0; i<threads; i++)
        {
            Thread thread = threadList.get(i);
            try
            {
                thread.join();
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
            Runner runner = runnerList.get(i);

            //System.out.println("result "+i+": "+runner.result);

            result += runner.result;
        }
        result /= threads;

        return result;
    }

    private static class Runner implements Runnable
    {
        private long result = 0;
        private List<Integer> list;
        public Runner(List<Integer> list)
        {
            this.list = list;
        }

        public void run()
        {
            //System.out.println("Runner started");
            for (int i=0; i<list.size(); i++)
            {
                double d = list.get(i);
                result += Math.round(Math.cos(d)*Math.cos(d)+Math.sin(d)*Math.sin(d));
            }
            result /= list.size();
            //System.out.println("Runner finished");
        }
    }


}
 

beginner99

Aktives Mitglied
lass mich raten... du hast noch keinen beweis, dass das Szenario überhaupt performance probleme bringt, vielleicht noch nicht mal dass es wirklich so "viele" sind.

Lieg ich falsch - such dir eine lösung
Lieg ich richtig - lass die finger von irgendwelchen "performanceoptimierungen"

ja und nein. Ich könnte jetzt 5 Seiten schreiben, dass zu erklären, aber mal ne Kurzfassung:

Das ganze ist ein "Hobby/Übungs Projekt" und nicht kommerziell und ich mache momentan eine Weiterbildung in Informatik (man verzeihe mir deshalb gewisse Bildungslücken) und das würde sich dann auch noch gut als Abschlussarbeit machen, wenn ich dann soweit bin. Schlussendlich ist es dem Anwender überlassen, wie gross sein Datensatz ist und wenn es zu langsam ist, wird er es einfach nicht benutzen, was schlussendlich egal ist. Als Lerneffekt von Multithreading wäre es aber eine gute Möglichkeit.

Aber du hast Recht. Obige Berechnung wird nicht dauernd gemacht, das Resultat wandert dann zurück in die Datenbank. Es ist also eher eine einmalige Aktion (Daten müssen aber gepflegt werden und ab und zu das ganze neu erstellen wird wohl auch nötig sein), deshalb kann das auch dauern und ist nicht performance kritisch (Bei 100'000 records dürfte es in den Stundenbereich kommen, aber natürlich extrem infrastruktur abhängig).

Beim weiteren überlegen gibt es einen anderen Bereich, der performance kritischer wäre und die eigentliche Hauptfunktionalität darstellt. Aber auch da wird über eine Map Iteriert, die geteilt werden müsste und am Ende sollte ich eine Liste haben. Also ja, den Hebel habe ich hier wohl falsch angesetzt.


Hm. Das sieht IMHO aber AUCH (unnötig?) kompliziert aus.

Als erstes stellt sich IMHO die Frage, ob die Moleküle wirklich als Map<Integer, Molecule> geliefert werden müssen. Eine List wäre praktischer. Oder werden die Integers wirklich irgendwo gebraucht? Sind die Integers forlaufend, oder bunt gemischt?

Integer ist der Primärschlüssel, also ja der ist nötig. Die logische Folge-Frage ist, wieso ich den nicht ins Molekül stecke. Die Antwort ist, das IMolecule ein Interface ist und zwar aus einer anderen Bibliothek, auf dem das ganze basiert:
SourceForge.net: cdk

Mein Projekt soll auch keine Applikation liefern sondern auch mehr eine Bibliothek. Somit soll jeder seine eigene Implementation von IMolecule verwenden können.

Das beantwortet auch gleich deine zweite Frage, in der Regel dürften die Integers fortlaufend sein, ist aber nicht wirklich sicher und lücken könnten auch auftreten durch löschen oder Sequence cache.

Genug geschrieben, wird jetzt schon schwer sein Leser zu finden für den Text. ;)
 
G

Guest2

Gast
Moin,

wenn ich ne Map/List/Array, oder was auch immer, auf mehrere Threads verteilen will, nutze ich meistens folgendes Schema:

Java:
import java.util.HashMap;
import java.util.Map;
import java.util.Random;


public class Test {

    private static final int THREADS = Runtime.getRuntime().availableProcessors();

    private static final class IMolecule {

        private static final Random random = new Random(System.currentTimeMillis());

        double                      value  = random.nextDouble();
        double                      result;

        @Override
        public String toString() {

            return value + ":" + result;

        }

    }


    private final void process(final Map<Integer, IMolecule> molecules) {

        final Thread[] threads = new Thread[THREADS];
        final Integer[] set = molecules.keySet().toArray(new Integer[0]);

        for (int i = 0; i < THREADS; i++) {

            final int thread = i;

            threads[i] = new Thread(new Runnable() {


                @Override
                public void run() {

                    for (int j = thread; j < set.length; j += THREADS)
                        process(j, molecules.get(set[j]));

                }
            });

            threads[i].start();

        }

        try {

            for (int i = 0; i < THREADS; i++)

                threads[i].join();

        } catch (final InterruptedException e) {

            Thread.currentThread().interrupt();

        }

    }


    private final void process(final int index, final IMolecule iMolecule) {

        iMolecule.result = Math.cos(iMolecule.value) * Math.tan(iMolecule.value) + Math.sin(iMolecule.value) * Math.tan(iMolecule.value) + index;

    }


    public static void main(final String[] args) {

        final int size = Integer.MAX_VALUE / 50;
        System.out.println("size: " + size);

        final Map<Integer, IMolecule> molecules = new HashMap<Integer, IMolecule>(size);

        System.out.print("prepare ");
        final long prepareTime = System.currentTimeMillis();
        for (int i = 0; i < size; i++)
            molecules.put(i, new IMolecule());
        System.out.println("in: " + ((System.currentTimeMillis() - prepareTime) / 1000) + "s");

        System.out.print("process ");
        final Test test = new Test();
        final long processTime = System.currentTimeMillis();
        test.process(molecules);
        System.out.println("in: " + ((System.currentTimeMillis() - processTime) / 1000) + "s");

        System.out.println("result");
        // System.out.println(molecules);

    }

}

Ergibt:

Code:
>java -Xms8096m -Xmx8096m -XX:-UseParallelGC Test
size: 42949672
prepare in: 14s
process in: 4s

Der Ansatz ist also ähnlich dem von Marco, nur dass ich das KeySet gnadenlos in ein Array kopiere und dann einfach auf jedem "thread'em" Element rumrechne.

Gruß,
Fancy
 

LoR

Bekanntes Mitglied
Auf Basis des schon gesagten könnte man das auch so machen:

Java:
public interface IMolecule {

    void process();

    double getResult();
}

Java:
public class H2O implements IMolecule {

    private final double value;
    private double result;

    public H2O(double value) {
        this.value = value;
    }

    public double getValue() {
        return value;
    }

    @Override
    public void process() {
        result = Math.cos(value) * Math.tan(value) + Math.sin(value) * Math.tan(value);
    }

    @Override
    public double getResult() {
        return result;
    }
}

Java:
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MoleculeProcessor {

    private final Map<Integer, IMolecule> moleculeMap;
    private final ExecutorService execService;
    private final int nThreads;

    public MoleculeProcessor(Map<Integer, IMolecule> moleculeMap, int nThreads) {
        this.execService = Executors.newFixedThreadPool(nThreads);
        this.moleculeMap = moleculeMap;
        this.nThreads = nThreads;
    }

    public void excecute() throws InterruptedException {
        try {
            for (int index = 0; index < nThreads; index++) {
                execService.submit(new MoleculeWorker(moleculeMap, nThreads, index));
            }
        } finally {
            execService.shutdown();
            execService.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    private final class MoleculeWorker implements Runnable {

        private final int index;
        private final Map<Integer, IMolecule> moleculeMap;
        private final int nThreads;

        public MoleculeWorker( Map<Integer, IMolecule> moleculeMap, int nThreads, int index) {
            this.moleculeMap = moleculeMap;
            this.nThreads = nThreads;
            this.index = index;
        }

        @Override
        public void run() {
            for (int i = index; i < moleculeMap.size(); i += nThreads) {
                IMolecule molecule = moleculeMap.get(i);
                molecule.process();
            }
        }
    }
}


Java:
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class Main {

    private static final int SIZE = (Integer.MAX_VALUE / 100);
    private static final Random random = new Random(System.currentTimeMillis());

    public static void main(String[] args) throws InterruptedException {
        System.out.print("prepare ");
        final long prepareTime = System.currentTimeMillis();
        Map<Integer, IMolecule> map = new HashMap<Integer, IMolecule>();
        for (int i = 0; i < SIZE; i++) {
            map.put(i, new H2O(random.nextDouble()));
        }
        System.out.println("in: " + ((System.currentTimeMillis() - prepareTime) / 1000) + "s");

        System.out.print("process ");
        final MoleculeProcessor processor = new MoleculeProcessor(map, 8);
        final long processTime = System.currentTimeMillis();
        processor.excecute();
        System.out.println("in: " + ((System.currentTimeMillis() - processTime) / 1000) + "s");
    }
}

@Guest2
Hast du eigentlich das ganze mal mit random.nextInt() oder irgeneinem anderen ganzzahligen Wert ausprobiert :D? Daran sieht man dann mal wieder was so eine scheinbar harmlose Änderung für Unterschiede macht.
 
Ähnliche Java Themen
  Titel Forum Antworten Datum
M Wie funktionieren parallele Java Streams? Allgemeine Java-Themen 1
S Threads Kann mir jemand helfen eine parallele Hilfsklasse zu implementieren..? Allgemeine Java-Themen 3
M Parallele Konzepte in Java Allgemeine Java-Themen 4
M Parallele Programmierung: volatile Variable nimmt ungewöhnlichen Wert an Allgemeine Java-Themen 3
C Open Soure Projekte für parallele Programmierung Allgemeine Java-Themen 6
6 Java - Threads - parallele Programmierung - Tutorial Allgemeine Java-Themen 6
I parallele Programmierung mit Java Allgemeine Java-Themen 3
P Java 3D parallele Ebenen Allgemeine Java-Themen 4
S Programm für parallele bearbeitung Allgemeine Java-Themen 11
V Parallele Ausführung Allgemeine Java-Themen 8
A Parallele Threads oder verschachtelte? Allgemeine Java-Themen 7
T parallele Schnittstelle auslesen Allgemeine Java-Themen 5
H javax.comm und Parallele Schnittstelle (PIN für PIN) Allgemeine Java-Themen 7
H Zugriff auf parallele Schnittstelle Allgemeine Java-Themen 4
alderwaran Hoher Sys-Load bei Multithreaded Anwendung Allgemeine Java-Themen 8
M Multithreaded Linear Search Allgemeine Java-Themen 9
B Liste ändern während Iteration über Diese? Allgemeine Java-Themen 16
AmsananKING String Iteration Allgemeine Java-Themen 5
H Input/Output Iteration in Feldern Allgemeine Java-Themen 8
M Normalized Iteration count funktioniert nicht. Wo ist mien Denkfehler? Allgemeine Java-Themen 6
J Rekursion oder Iteration - verkettete Listen Allgemeine Java-Themen 8
R Controlling Programm (Objekt-Iteration Problem) Allgemeine Java-Themen 9
O Verschachtelte Iteration: Innere Iteration abbrechen Allgemeine Java-Themen 3
W sortierte Iteration über Set oder Map, bzw. Collections Allgemeine Java-Themen 5
G Performante Array iteration Allgemeine Java-Themen 27

Ähnliche Java Themen

Neue Themen


Oben