Hi, I have been trying to parallelize my backpropagation algorithm. The class
BackpropagationRunnable
implements the actual algorithm: it works on a copy of the network and a subset of the examples, always iterating over all of those examples and adjusting the weights for each one.
Java:
public class BackpropagationRunnable implements Runnable {

    private Matrix samples;
    private Matrix expOut;
    private NeuralNetwork net;
    private double alpha;

    public BackpropagationRunnable(NeuralNetwork net, Matrix in, Matrix out, double alpha) {
        this.net = net;
        this.samples = in;
        this.expOut = out;
        this.alpha = alpha;
    }

    @Override
    public void run() {
        for (int i = 0; i < samples.numberOfRows(); i++) {
            // the current sample
            Vector sample = samples.getRow(i);
            // the expected output for the current sample
            Vector expectedOutput = expOut.getRow(i);
            // the actual output for the current sample
            Vector output = net.predict(sample);
            // the error of the current sample
            Vector error = expectedOutput.sub(output);
            // start at the output layer
            Layer layer = net.getOutputLayer();
            // update the error vector of the output layer: delta = error x g'(input)
            layer.setError(error.elementProduct(layer.getInput().map(x -> net.fct.applyDerivation(x))));
            // calculate new weights and biases for the output layer
            calculateWeightsInOutputLayer(layer, alpha, sample);
            // move backwards through the network
            layer = layer.getPreviousLayer();
            // iterate over all hidden layers
            while (layer != null) {
                // calculate new weights and biases for the hidden layer
                calculateWeightsInHiddenLayer(layer, alpha, sample);
                layer = layer.getPreviousLayer();
            }
            // update weights and biases in the output layer
            layer = net.getOutputLayer();
            layer.updateWeightsAndBiases();
            layer = layer.getPreviousLayer();
            // iterate over all hidden layers and update weights and biases
            while (layer != null) {
                layer.updateWeightsAndBiases();
                layer = layer.getPreviousLayer();
            }
        }
    }
    /**
     * Calculates the new weights and biases in the output layer, for a single
     * weight:<br>
     * w_{j,k} <- w_{j,k} + alpha x a_j x error_k<br>
     * For all weights:<br>
     * W' = W + alpha x error x a^T<br>
     * W is the {@link Matrix} of weights, a is the vector of outputs of the
     * previous layer, error is the vector of errors of the current layer.<br>
     * The biases are updated the same way, except that a does not appear in the
     * formula.
     *
     * @param l      the {@link Layer} the updates are calculated for
     * @param alpha  learning rate
     * @param sample the example that is currently seen
     */
    protected void calculateWeightsInOutputLayer(Layer l, double alpha, Vector sample) {
        // error of the layer
        Matrix delta = l.getError().toMatrix();
        // outputs of the previous layer, or the input to the net if there is no previous layer
        Matrix inputs = (l.getPreviousLayer() == null ? sample : l.getPreviousLayer().getOutput()).toMatrix();
        // calculate new weights
        Matrix newWeights = l.getInputWeights().add(delta.mul(inputs.transpose()).mul(alpha));
        // calculate new biases
        Vector newBiases = l.getBiases().add(l.getError().mul(alpha));
        // store the new weights
        l.setNewInpWeights(newWeights);
        // store the new biases
        l.setNewBiases(newBiases);
    }
    /**
     * Calculates the error for the current hidden layer; for a single neuron j it
     * is:<br>
     * error_j = g'(input_j) x sum_k{w_{j,k} * delta_k}<br>
     * g' is the derivative of the activation function, delta_k is the error of the
     * next layer.
     * <p>
     * Then calculates the new weights and biases, for a single weight:<br>
     * w_{i,j} <- w_{i,j} + alpha x a_i x error_j<br>
     * For all weights:<br>
     * W' = W + alpha x error x a^T<br>
     * W is the {@link Matrix} of weights, a is the vector of outputs of the
     * previous layer, error is the vector of errors of the current layer.<br>
     * The biases are updated the same way, except that a does not appear in the
     * formula.
     *
     * @param l      the {@link Layer} the updates are calculated for
     * @param alpha  learning rate
     * @param sample the example that is currently seen
     */
    protected void calculateWeightsInHiddenLayer(Layer l, double alpha, Vector sample) {
        ActivationFunction fct = l.getActivationFunction();
        Layer next = l.getNextLayer();
        // update the error vector: error = g'(input) x (W_next^T x delta_next)
        l.setError(l.getInput().map(x -> fct.applyDerivation(x))
                .elementProduct(next.getInputWeights().transpose().mul(next.getError())));
        // error of the layer
        Matrix delta = l.getError().toMatrix();
        // outputs of the previous layer, or the input to the net if there is no previous layer
        Matrix inputs = (l.getPreviousLayer() == null ? sample : l.getPreviousLayer().getOutput()).toMatrix();
        // calculate new weights
        Matrix newWeights = l.getInputWeights().add(delta.mul(inputs.transpose()).mul(alpha));
        // calculate new biases
        Vector newBiases = l.getBiases().add(l.getError().mul(alpha));
        // store the new weights
        l.setNewInpWeights(newWeights);
        // store the new biases
        l.setNewBiases(newBiases);
    }
    public NeuralNetwork getNet() {
        return net;
    }

    public Matrix[] getWeights() {
        return net.getWeights();
    }

    public void setWeights(Matrix[] weights) {
        net.setWeights(weights);
    }

    public Vector[] getBiases() {
        return net.getBiases();
    }

    public void setBiases(Vector[] biases) {
        net.setBiases(biases);
    }
}
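For reference, the update rule both methods implement, W' = W + alpha x error x a^T, written out with plain double arrays (this is only a stand-alone sketch to check the dimensions of the outer product; the class and method names are made up and not part of my Matrix/Vector classes):

```java
public class WeightUpdateSketch {

    // delta: errors of the current layer (k entries),
    // a: outputs of the previous layer (j entries),
    // w: k x j weight matrix feeding into the current layer
    static double[][] update(double[][] w, double[] delta, double[] a, double alpha) {
        double[][] result = new double[w.length][w[0].length];
        for (int k = 0; k < w.length; k++)
            for (int j = 0; j < w[0].length; j++)
                // W'[k][j] = W[k][j] + alpha * delta[k] * a[j]
                result[k][j] = w[k][j] + alpha * delta[k] * a[j];
        return result;
    }

    public static void main(String[] args) {
        double[][] w = {{0.0, 0.0}, {0.0, 0.0}};
        double[][] w2 = update(w, new double[]{1.0, 2.0}, new double[]{0.5, 0.25}, 0.1);
        System.out.println(w2[0][0] + " " + w2[1][0]);
    }
}
```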
The class
BackpropagationThread
receives the network and a number of iterations, epochs
, specifying how many times the thread should execute its Runnable. Before run() is executed, however, the weights are loaded from the network shared by all threads, and after run() the new weights are stored back:
Java:
class BackpropagationThread extends Thread {

    private final BackpropagationRunnable backprop;
    private final NeuralNetwork sharedNet;
    private final int epochs;

    public BackpropagationThread(NeuralNetwork net, Matrix in, Matrix out, double alpha, int epochs) {
        backprop = new BackpropagationRunnable((NeuralNetwork) net.clone(), in, out, alpha);
        this.sharedNet = net;
        this.epochs = epochs;
    }

    @Override
    public void run() {
        for (int a = 0; a < epochs; a++) {
            // load the current weights and biases from the shared network
            synchronized (sharedNet) {
                backprop.setWeights(sharedNet.getWeights());
                backprop.setBiases(sharedNet.getBiases());
            }
            backprop.run();
            // store the updated weights and biases back into the shared network
            synchronized (sharedNet) {
                Matrix[] weights = backprop.getWeights();
                Vector[] biases = backprop.getBiases();
                Matrix[] newWeights = sharedNet.getWeights();
                Vector[] newBiases = sharedNet.getBiases();
                for (int i = 0; i < newWeights.length; i++) {
                    newWeights[i] = weights[i];
                    newBiases[i] = biases[i];
                }
                sharedNet.setWeights(newWeights);
                sharedNet.setBiases(newBiases);
                backprop.setWeights(newWeights);
                backprop.setBiases(newBiases);
            }
        }
    }

    public NeuralNetwork getNet() {
        return backprop.getNet();
    }

    public Matrix[] getWeights() {
        return backprop.getWeights();
    }

    public void setWeights(Matrix[] weights) {
        backprop.setWeights(weights);
    }

    public Vector[] getBiases() {
        return backprop.getBiases();
    }

    public void setBiases(Vector[] biases) {
        backprop.setBiases(biases);
    }
}
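One thing I noticed while debugging: getWeights()/setWeights() pass the Matrix references around, so the "copy" inside the synchronized block may not isolate the threads at all. A stand-alone sketch (names are made up, with double[][] standing in for Matrix[]) shows that copying the outer array still shares the inner arrays:

```java
import java.util.Arrays;

public class SharedReferenceSketch {

    // shallow copy: a new outer array, but the same inner arrays
    static double[][] shallowCopy(double[][] weights) {
        return Arrays.copyOf(weights, weights.length);
    }

    public static void main(String[] args) {
        double[][] sharedWeights = {{1.0, 2.0}, {3.0, 4.0}};
        double[][] localWeights = shallowCopy(sharedWeights);
        // a "local" update ...
        localWeights[0][0] = 99.0;
        // ... is visible through the shared reference as well
        System.out.println(sharedWeights[0][0]); // prints 99.0
    }
}
```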
The class
BackpropagationParallel
splits the example set and creates the threads:
Java:
public class BackpropagationParallel extends Backpropagation {

    @Override
    public NeuralNetwork backprop(NeuralNetwork net, Matrix in, Matrix out, double alpha, int epochs) {
        Objects.requireNonNull(net, "The specified NeuralNetwork may not be null");
        Objects.requireNonNull(in, "The specified input Matrix may not be null");
        Objects.requireNonNull(out, "The specified output Matrix may not be null");
        if (in.numberOfRows() != out.numberOfRows())
            throw new IllegalArgumentException(
                    "The number of rows of the input matrix must equal the number of rows of the output matrix");
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("available cores: " + cores);
        BackpropagationThread[] threads = new BackpropagationThread[cores];
        // split the examples into one chunk per core
        List<Matrix> splittedIn = in.split(cores);
        List<Matrix> splittedOut = out.split(cores);
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new BackpropagationThread(net, splittedIn.get(i), splittedOut.get(i), alpha, epochs);
        }
        for (BackpropagationThread t : threads)
            t.start();
        try {
            for (BackpropagationThread t : threads)
                t.join();
        } catch (InterruptedException e) {
            throw new UnsupportedOperationException(e.getMessage());
        }
        return net;
    }
}
The problem: the communication between the threads, i.e. the exchange of the weights, does not work at all this way, and I have no idea how to improve it.
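To make it concrete what I think goes wrong, here is a reduced stand-alone sketch (names are made up) of the load/train/store pattern from BackpropagationThread.run(): both threads read the shared weight first, then both write their result back, so one update is simply lost:

```java
import java.util.concurrent.CountDownLatch;

public class LostUpdateSketch {

    static double runOnce() throws InterruptedException {
        double[] sharedWeight = {0.0};
        // barrier forcing both threads to read before either writes,
        // i.e. an unlucky but perfectly legal interleaving
        CountDownLatch bothHaveRead = new CountDownLatch(2);
        Runnable worker = () -> {
            double local = sharedWeight[0];              // load the shared weight
            bothHaveRead.countDown();
            try {
                bothHaveRead.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            local += 1.0;                                // local "training" step
            sharedWeight[0] = local;                     // store: overwrites the other update
        };
        Thread a = new Thread(worker);
        Thread b = new Thread(worker);
        a.start();
        b.start();
        a.join();
        b.join();
        return sharedWeight[0];
    }

    public static void main(String[] args) throws InterruptedException {
        // two updates were computed, but only one survives
        System.out.println(runOnce()); // prints 1.0, not 2.0
    }
}
```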