package foxnet.network.protocol.transport;
import foxnet.network.protocol.event.FileTransferListener;
import foxnet.network.protocol.event.FileTransferEvent;
import foxnet.network.protocol.requests.FileRequests;
import java.io.File;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.mina.core.session.IoSession;
/**
 * Coordinates file uploads and downloads that are exchanged as
 * {@link FileRequests} messages over MINA {@link IoSession}s.
 *
 * <p>Tasks that are waiting for the remote side (requested uploads) or that
 * are looked up by id while running (downloads) are kept in
 * {@link #waitingQueue}. Tasks that have work to do are placed into
 * {@link #transfers}; a single dispatcher thread takes them from there and
 * hands each one to {@link #threadPool} as a {@link RunnableTransfer}.
 *
 * <p>NOTE(review): {@link #waitingQueue}, {@link #listeners} and
 * {@link #counter} are plain, unsynchronized state that is touched both by
 * callers of the public methods and by pool threads; this class performs no
 * locking of its own.
 *
 * @author Kr0e
 */
public class FileTransfer {

    /** Last id handed out by {@link #inc()}. */
    protected long counter;

    /** Tasks keyed by id: pending uploads (by upload id) and downloads (by file id). */
    protected LinkedHashMap<Long, FileTask> waitingQueue;

    /** Tasks that currently have a step to execute. */
    protected LinkedBlockingQueue<FileTask> transfers;

    /** Registered observers of transfer progress. */
    protected LinkedList<FileTransferListener> listeners;

    /** Pool that executes the individual transfer steps. */
    protected ExecutorService threadPool;

    /** Dispatcher thread that moves tasks from {@link #transfers} into the pool. */
    protected Thread workerThread;

    /** Directory into which downloaded files are written. */
    protected File downloadDir;

    /**
     * Advances the id counter and returns the new value.
     *
     * <p>When the increment overflows past {@code Long.MAX_VALUE} the counter
     * restarts at 0 instead of becoming negative.
     *
     * @return the freshly generated id
     */
    protected Long inc() {
        counter++;
        if (counter == Long.MIN_VALUE) {
            counter = 0;
        }
        return Long.valueOf(counter);
    }

    /**
     * Resets the id counter and creates the empty task and listener collections.
     * Called from the constructor before the pool and the worker thread exist.
     */
    protected void init() {
        counter = 0;
        transfers = new LinkedBlockingQueue<FileTask>();
        waitingQueue = new LinkedHashMap<Long, FileTask>();
        listeners = new LinkedList<FileTransferListener>();
    }

    /**
     * Executes one step of a single {@link FileTask} on a pool thread and
     * notifies the listeners about the outcome of that step.
     */
    protected class RunnableTransfer implements Runnable {

        /** The task this step belongs to. */
        protected FileTask task;

        /**
         * @param task the task to advance by one step
         */
        public RunnableTransfer(FileTask task) {
            this.task = task;
        }

        /**
         * Runs one upload or download step. Any exception is printed and
         * otherwise ignored so that a failing task cannot kill the pool thread.
         */
        @Override
        public void run() {
            try {
                // Events are fired with the enclosing FileTransfer as source,
                // consistent with the events fired by the public methods.
                if (task instanceof Upload) {
                    ((Upload) task).upload();

                    for (FileTransferListener l : listeners) {
                        l.onUploadData(new FileTransferEvent(FileTransfer.this, task));
                    }

                    if (!task.isReady() && !task.isAbort()) {
                        // More work left: queue the task for its next step.
                        transfers.put(task);
                    } else {
                        for (FileTransferListener l : listeners) {
                            l.onUploadFinished(new FileTransferEvent(FileTransfer.this, task));
                        }
                    }
                } else if (task instanceof Download) {
                    ((Download) task).download();

                    if (task.isReady() || task.isAbort()) {
                        // Finished or aborted: the id no longer needs to be resolvable.
                        waitingQueue.remove(Long.valueOf(task.getId()));
                        for (FileTransferListener l : listeners) {
                            l.onDownloadFinished(new FileTransferEvent(FileTransfer.this, task));
                        }
                    } else {
                        for (FileTransferListener l : listeners) {
                            l.onDownloadData(new FileTransferEvent(FileTransfer.this, task));
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Dispatcher loop: blocks on {@link #transfers} and submits every task it
     * receives to {@link #threadPool}. Terminates when its thread is
     * interrupted and then shuts the pool down.
     */
    protected class WorkerThread implements Runnable {

        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    threadPool.submit(new RunnableTransfer(transfers.take()));
                } catch (InterruptedException e) {
                    // take() clears the interrupt flag when it throws, so the
                    // loop condition would never see the interrupt. Leave the
                    // loop explicitly; otherwise close() could not stop us.
                    break;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            threadPool.shutdown();
        }
    }

    /**
     * Creates the transfer manager, its thread pool and starts the dispatcher
     * thread.
     *
     * @param downloadDir directory that receives downloaded files
     * @param maxThreads  size of the fixed thread pool executing transfer steps
     * @throws NullPointerException if {@code downloadDir} is {@code null}
     */
    public FileTransfer(File downloadDir, int maxThreads) {
        if (downloadDir == null) {
            throw new NullPointerException("downloadDir is null");
        }

        init();

        threadPool = Executors.newFixedThreadPool(maxThreads);
        this.downloadDir = downloadDir;

        workerThread = new Thread(new WorkerThread());
        workerThread.start();
    }

    /**
     * Registers a listener; {@code null} is ignored.
     *
     * @param listener the listener to add
     */
    public void addFileTransferListener(FileTransferListener listener) {
        if (listener != null) {
            listeners.add(listener);
        }
    }

    /**
     * Unregisters a previously added listener.
     *
     * @param listener the listener to remove
     */
    public void removeFileTransferListener(FileTransferListener listener) {
        listeners.remove(listener);
    }

    /**
     * Handles the acceptance of a previously requested upload: the pending
     * upload is removed from the waiting map, receives the file id from the
     * message, is queued for execution, and listeners are notified.
     *
     * <p>Nothing happens if {@code f} is {@code null}, if the upload id is
     * unknown, or if the id refers to a task that is not an {@link Upload}
     * (such an entry is left untouched).
     *
     * @param f the acceptance message
     * @throws Exception if queueing the upload fails
     */
    public void acceptUpload(FileRequests.Accepted f) throws Exception {
        if (f == null) {
            return;
        }

        Long uploadId = Long.valueOf(f.getUploadId());
        FileTask pending = waitingQueue.get(uploadId);
        if (!(pending instanceof Upload)) {
            // Unknown id (null) or a task of another kind: leave the map as it
            // is. In particular, never store a null value for an unknown id.
            return;
        }
        waitingQueue.remove(uploadId);

        Upload upload = (Upload) pending;
        upload.setId(f.getFileId());
        transfers.put(upload);

        for (FileTransferListener l : listeners) {
            l.onUploadAccepted(new FileTransferEvent(this, upload));
        }
    }

    /**
     * Announces an upload: stores it under a fresh upload id, writes a
     * {@link FileRequests.Start} message to the upload's session and notifies
     * the listeners. A {@code null} upload is ignored.
     *
     * @param upload the upload to announce
     * @throws Exception if sending the request fails
     */
    public void requestUpload(Upload upload) throws Exception {
        if (upload == null) {
            return;
        }

        // Use the id returned by inc() rather than re-reading the shared
        // counter, so the id sent always matches the key in the map.
        Long uploadId = inc();
        waitingQueue.put(uploadId, upload);

        upload.getSession().write(new FileRequests.Start(upload, uploadId.longValue()));

        for (FileTransferListener l : listeners) {
            l.onUploadRequested(new FileTransferEvent(this, upload));
        }
    }

    /**
     * Looks up a task in the waiting map.
     *
     * @param id the id the task was stored under
     * @return the task, or {@code null} if no task is stored under {@code id}
     */
    public FileTask getFileTask(long id) {
        return waitingQueue.get(Long.valueOf(id));
    }

    /**
     * Feeds a received data message to the download it belongs to and
     * schedules that download for processing. Ignored if {@code data} is
     * {@code null} or its file id does not refer to a {@link Download}.
     *
     * @param data the received data message
     * @throws Exception if queueing fails
     */
    public void extendDownload(FileRequests.Data data) throws Exception {
        if (data == null) {
            return;
        }

        FileTask ft = getFileTask(data.getFileId());
        if (ft instanceof Download) {
            ((Download) ft).getQueue().put(data);
            transfers.put(ft);
        }
    }

    /**
     * Signals the end of a download by queueing an empty data message for it
     * and scheduling it for processing. Ignored if {@code f} is {@code null}
     * or its file id does not refer to a {@link Download}.
     *
     * @param f the finish message
     * @throws Exception if queueing fails
     */
    public void finishDownload(FileRequests.Finished f) throws Exception {
        if (f == null) {
            return;
        }

        FileTask ft = getFileTask(f.getFileId());
        if (ft instanceof Download) {
            // Presumably Download treats a Data message without payload as
            // the end-of-stream marker -- confirm against Download.
            ((Download) ft).getQueue().put(new FileRequests.Data(null, 0, 0));
            transfers.put(ft);
        }
    }

    /**
     * Accepts an announced upload from a peer: creates a {@link Download}
     * targeting a file inside the download directory, stores it under a fresh
     * file id, answers with {@link FileRequests.Accepted} and notifies the
     * listeners. A {@code null} request is ignored.
     *
     * @param session the session the request arrived on
     * @param s       the start request sent by the peer
     * @throws IllegalArgumentException if the requested file name resolves to
     *                                  a location outside the download directory
     * @throws Exception                if the download cannot be set up or the
     *                                  reply cannot be sent
     */
    public void addDownload(IoSession session, FileRequests.Start s) throws Exception {
        if (s == null) {
            return;
        }

        File target = new File(downloadDir.getPath() + "/" + s.getFileName());

        // The file name comes from the remote peer; refuse names such as
        // "../x" that would place the file outside the download directory.
        String base = downloadDir.getCanonicalPath();
        String basePrefix = base.endsWith(File.separator) ? base : base + File.separator;
        if (!target.getCanonicalPath().startsWith(basePrefix)) {
            throw new IllegalArgumentException(
                    "file name escapes download directory: " + s.getFileName());
        }

        Download download = new Download(target, session, s.getFileSize(),
                s.getFileOffset(), s.getLength());

        // Use the id returned by inc() rather than re-reading the shared
        // counter, so key, task id and reply always agree.
        Long fileId = inc();
        waitingQueue.put(fileId, download);
        download.setId(fileId.longValue());

        download.getSession().write(
                new FileRequests.Accepted(fileId.longValue(), s.getUploadId()));

        for (FileTransferListener l : listeners) {
            l.onDownloadAdded(new FileTransferEvent(this, download));
        }
    }

    /**
     * Stops the dispatcher thread by interrupting it; the dispatcher shuts
     * the thread pool down on its way out.
     */
    public void close() {
        workerThread.interrupt();
    }
}