import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.Semaphore;
/**
 * Benchmark and round-trip check for Java 7 asynchronous file I/O
 * ({@link AsynchronousFileChannel}).
 *
 * <p>Constructing an instance runs the whole scenario: a map of {@code count}
 * entries (key {@code i} mapped to value {@code i}) is filled, written to a
 * file, read back into a second map and verified. Timings are printed to
 * standard output.
 *
 * <p>File layout: an 8-byte {@code long} holding the number of entries,
 * followed by one 12-byte record per entry (4-byte {@code int} key, 8-byte
 * {@code long} value), all in the default big-endian order of
 * {@link ByteBuffer}.
 *
 * <p>Only one asynchronous operation is in flight at any time; each completion
 * handler issues the next one. The constructing thread waits on a semaphore
 * that is released exactly once per phase, either when the phase is done or
 * when it failed.
 *
 * @author achr
 */
public class Java7FileAioTest {

    /** Bytes per serialized entry: 4-byte int key followed by 8-byte long value. */
    private static final int SET_SIZE = 12;
    /** Bytes of the file header that stores the number of entries as a long. */
    private static final int HEADER_SIZE = 8;
    /** Number of entries used by the no-argument constructor (10 million). */
    private static final int DEFAULT_COUNT = 10000000;
    /** Entries per I/O block used by the no-argument constructor (1 million). */
    private static final int DEFAULT_BLOCK_FACTOR = 1000000;

    private final int count;
    private final Path path;
    /** Shared block buffer; holds {@code blockFactor} entries of {@link #SET_SIZE} bytes. */
    private final ByteBuffer buffer;
    private final HashMap<Integer, Long> map1;
    private final HashMap<Integer, Long> map2;
    private Iterator<Integer> iterator;
    private volatile AsynchronousFileChannel afc;
    /** Completion signal between the completion handlers and the constructing thread. */
    private final Semaphore sema = new Semaphore(1);
    /** First error of the current phase, or {@code null} if none occurred. */
    private volatile Throwable failure;
    /** File position of the next byte to write. */
    private volatile long writePos;
    private boolean verified;
    volatile long readMapSize = -1;
    volatile long readPos = 0;
    volatile long fileSize = -1;
    private long start;
    private long stop;

    /**
     * Continues a write until the attached buffer is drained, then asks
     * {@link #writeSet} for the next block.
     */
    private final CompletionHandler<Integer, ByteBuffer> writeSetCompletion =
            new CompletionHandler<Integer, ByteBuffer>() {
        @Override
        public void completed(Integer result, ByteBuffer pending) {
            try {
                writePos += result;
                if (pending.hasRemaining()) {
                    // Partial write: push out the rest before touching the buffer again.
                    afc.write(pending, writePos, pending, this);
                } else {
                    writeSet(afc);
                }
            } catch (RuntimeException ex) {
                finish(ex);
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer pending) {
            finish(exc);
        }
    };

    /** Accounts for the bytes just read and hands the block buffer to {@link #readSet}. */
    private final CompletionHandler<Integer, AsynchronousFileChannel> readSetCompletion =
            new CompletionHandler<Integer, AsynchronousFileChannel>() {
        @Override
        public void completed(Integer result, AsynchronousFileChannel attachment) {
            try {
                if (result < 0) {
                    // End of file reached earlier than the size seen when opening.
                    finishRead();
                } else {
                    readPos += result;
                    readSet(attachment);
                }
            } catch (RuntimeException ex) {
                finish(ex);
            }
        }

        @Override
        public void failed(Throwable exc, AsynchronousFileChannel attachment) {
            finish(exc);
        }
    };

    /**
     * Runs the benchmark with 10 million entries, blocks of 1 million entries
     * and the file {@code /home/achr/HashMap.dat}.
     */
    public Java7FileAioTest() {
        this(DEFAULT_COUNT, DEFAULT_BLOCK_FACTOR,
                FileSystems.getDefault().getPath("/home/achr", "HashMap.dat"));
    }

    /**
     * Runs the benchmark with explicit sizes and target file.
     *
     * @param count       number of entries to write and read back; must not be negative
     * @param blockFactor number of entries per I/O block; must be at least 1 and small
     *                    enough that {@code blockFactor * 12} fits into an {@code int}
     * @param path        file to write to; it is created if missing and truncated otherwise
     * @throws IllegalArgumentException if {@code count} or {@code blockFactor} is out of range
     * @throws NullPointerException     if {@code path} is {@code null}
     */
    Java7FileAioTest(int count, int blockFactor, Path path) {
        if (count < 0) {
            throw new IllegalArgumentException("count must not be negative: " + count);
        }
        if (blockFactor < 1 || blockFactor > Integer.MAX_VALUE / SET_SIZE) {
            throw new IllegalArgumentException("blockFactor out of range: " + blockFactor);
        }
        if (path == null) {
            throw new NullPointerException("path");
        }
        this.count = count;
        this.path = path;
        this.buffer = ByteBuffer.allocate(blockFactor * SET_SIZE);

        start = System.currentTimeMillis();
        // Create both maps with their full size up front so that filling them is faster.
        map1 = new HashMap<>(count);
        map2 = new HashMap<>(count);
        stop = System.currentTimeMillis();
        System.out.println("Creating map: " + ((stop - start) / 2) + "ms");

        long fillDuration = 0;
        long value = 0;
        for (int key = 0; key < count; key++) {
            if (key % 1000000 == 0) {
                System.out.println("Adding key: " + key);
            }
            start = System.nanoTime();
            map1.put(key, value);
            stop = System.nanoTime();
            fillDuration += (stop - start);
            value++;
        }
        System.out.println("Filling map takes " + (fillDuration / 1000000) + "ms");

        System.out.println("Start writing...");
        start = System.currentTimeMillis();
        writeToFile();
        stop = System.currentTimeMillis();
        System.out.println("Writing takes " + (stop - start) + " ms");
        System.out.println("Write-Speed: " + ((double) fileSize / (double) ((stop - start) / 1000d)) / 1000000d + " mb/sec");
        map1.clear();

        System.out.println("Start reading + filling map...");
        start = System.currentTimeMillis();
        readFile();
        stop = System.currentTimeMillis();
        System.out.println("Reading+Filling takes " + (stop - start) + " ms");

        System.out.println("Start verifying...");
        start = System.currentTimeMillis();
        verify();
        stop = System.currentTimeMillis();
        System.out.println("Verifying takes " + (stop - start) + " ms");
    }

    /**
     * Returns whether the map read back from the file matched the expected content.
     *
     * @return {@code true} if the verification step succeeded
     */
    boolean isVerified() {
        return verified;
    }

    /**
     * Writes the header and all entries of {@code map1} to the file and blocks
     * until the last write has completed or an error occurred. The file is
     * created if necessary and truncated so that data of earlier runs cannot
     * survive. Afterwards {@code fileSize} holds the resulting file size.
     */
    private void writeToFile() {
        failure = null;
        writePos = 0;
        sema.acquireUninterruptibly();
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(path,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            afc = channel;
            // Header: number of entries that follow.
            ByteBuffer bufferMapSize = ByteBuffer.allocate(HEADER_SIZE);
            bufferMapSize.putLong(map1.size());
            bufferMapSize.flip();
            // Iterator consumed block by block in writeSet().
            iterator = map1.keySet().iterator();
            channel.write(bufferMapSize, writePos, bufferMapSize, writeSetCompletion);
            // Wait until finish() signals the end of the write phase.
            sema.acquireUninterruptibly();
            fileSize = channel.size();
            System.out.println("File has size: " + fileSize);
        } catch (IOException ex) {
            if (failure == null) {
                failure = ex;
            }
        } finally {
            // Restore the single permit for the next phase.
            sema.release();
        }
        reportFailure("Writing");
    }

    /**
     * Fills the block buffer with the next entries of {@code map1} and starts
     * writing it. When no entries are left, the end of the write phase is
     * signalled instead; this happens only after every previous write has
     * completed, because this method is only called from completion handlers.
     *
     * @param channel channel to write to
     */
    private void writeSet(AsynchronousFileChannel channel) {
        buffer.clear();
        while (buffer.remaining() >= SET_SIZE && iterator.hasNext()) {
            Integer key = iterator.next();
            Long value = map1.get(key);
            buffer.putInt(key);
            buffer.putLong(value);
        }
        buffer.flip();
        if (buffer.hasRemaining()) {
            channel.write(buffer, writePos, buffer, writeSetCompletion);
        } else {
            System.out.println("done writing.");
            finish(null);
        }
    }

    /**
     * Reads the file back into {@code map2} and blocks until the end of the
     * file has been processed or an error occurred. The file size is taken
     * from the opened file, so reading does not depend on a preceding write.
     */
    private void readFile() {
        failure = null;
        readPos = 0;
        readMapSize = -1;
        sema.acquireUninterruptibly();
        try (AsynchronousFileChannel channel =
                AsynchronousFileChannel.open(path, StandardOpenOption.READ)) {
            afc = channel;
            fileSize = channel.size();
            final ByteBuffer mapSizeBuffer = ByteBuffer.allocate(HEADER_SIZE);
            channel.read(mapSizeBuffer, readPos, channel,
                    new CompletionHandler<Integer, AsynchronousFileChannel>() {
                @Override
                public void completed(Integer result, AsynchronousFileChannel attachment) {
                    try {
                        if (result < 0) {
                            finish(new IOException("Not able to read map size"));
                            return;
                        }
                        readPos += result;
                        if (mapSizeBuffer.hasRemaining()) {
                            // Header arrived only partially: fetch the rest.
                            attachment.read(mapSizeBuffer, readPos, attachment, this);
                            return;
                        }
                        mapSizeBuffer.flip();
                        readMapSize = mapSizeBuffer.getLong();
                        System.out.println("map has size: " + readMapSize);
                        // Trigger reading of the entries.
                        buffer.clear();
                        continueReading(attachment);
                    } catch (RuntimeException ex) {
                        finish(ex);
                    }
                }

                @Override
                public void failed(Throwable exc, AsynchronousFileChannel attachment) {
                    finish(exc);
                }
            });
            // Wait until finish() signals the end of the read phase.
            sema.acquireUninterruptibly();
            if (failure == null && map2.size() != readMapSize) {
                failure = new IOException("Header announces " + readMapSize
                        + " entries but " + map2.size() + " were read");
            }
        } catch (IOException ex) {
            if (failure == null) {
                failure = ex;
            }
        } finally {
            sema.release();
        }
        reportFailure("Reading");
    }

    /**
     * Decodes every complete entry currently in the block buffer into
     * {@code map2}, keeps the bytes of an incomplete trailing entry for the
     * next read, and then continues reading or signals the end.
     *
     * @param channel channel to read from
     */
    private void readSet(AsynchronousFileChannel channel) {
        buffer.flip();
        while (buffer.remaining() >= SET_SIZE) {
            int key = buffer.getInt();
            long val = buffer.getLong();
            map2.put(key, val);
        }
        // Move an incomplete entry to the front; the next read appends to it.
        buffer.compact();
        continueReading(channel);
    }

    /** Starts the next block read, or ends the read phase once the whole file was consumed. */
    private void continueReading(AsynchronousFileChannel channel) {
        if (readPos >= fileSize) {
            finishRead();
        } else {
            channel.read(buffer, readPos, channel, readSetCompletion);
        }
    }

    /** Ends the read phase; leftover bytes mean the file stops in the middle of an entry. */
    private void finishRead() {
        int leftover = buffer.position();
        if (leftover == 0) {
            finish(null);
        } else {
            finish(new IOException("File ends with an incomplete entry ("
                    + leftover + " trailing bytes)"));
        }
    }

    /**
     * Records the outcome of the current phase and wakes the waiting thread.
     * Must be called exactly once per phase.
     *
     * @param error the failure, or {@code null} on success
     */
    private void finish(Throwable error) {
        if (error != null) {
            failure = error;
        }
        sema.release();
    }

    /** Prints the failure of the phase that just ended, if there was one. */
    private void reportFailure(String phase) {
        Throwable error = failure;
        if (error != null) {
            System.out.println(phase + " failed: " + error);
            error.printStackTrace();
        }
    }

    /**
     * Checks that {@code map2} contains exactly the keys {@code 0..count-1},
     * each mapped to a value equal to its key. Stops at the first problem and
     * prints the overall result.
     */
    private void verify() {
        boolean allOk = true;
        for (int i = 0; i < count; i++) {
            Long value = map2.get(i);
            if (value == null) {
                allOk = false;
                System.out.println("Key '" + i + "' is missing.");
                break;
            }
            // Compare as long: narrowing to int could hide corrupted high bits.
            if (value.longValue() != i) {
                allOk = false;
                System.out.println("Value for key '" + i + "' is wrong: " + value);
                break;
            }
        }
        if (allOk && map2.size() != count) {
            allOk = false;
            System.out.println("Expected " + count + " entries but found " + map2.size() + ".");
        }
        verified = allOk;
        if (!allOk) {
            System.out.println("Verification failed!");
        } else {
            System.out.println("Verification succeded!");
        }
    }

    /**
     * Runs the benchmark with the default settings.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new Java7FileAioTest();
    }
}