import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * A message destination, identified by its {@code ip} string.
 *
 * <p>Instances are used as keys of the host-to-queue map, so equality is
 * defined by value: two hosts are equal when their {@code ip} values are
 * equal (two {@code null} values count as equal). Do not reassign
 * {@code ip} while an instance is in use as a key of a hash-based map,
 * because that changes its hash code.
 */
class Host {
	/** Identifier of this host; may be {@code null}. */
	String ip;

	/**
	 * @param ip identifier of the host; {@code null} is accepted
	 */
	public Host(String ip) {
		this.ip = ip;
	}

	/**
	 * Compares by {@code ip}; only objects of exactly the same class can be equal.
	 */
	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (obj == null || getClass() != obj.getClass()) {
			return false;
		}
		return Objects.equals(ip, ((Host) obj).ip);
	}

	/** Hash code derived from {@code ip}, consistent with {@link #equals(Object)}. */
	@Override
	public int hashCode() {
		return Objects.hashCode(ip);
	}

	/**
	 * @return {@code "Host [ip=<ip>]"}, or {@code "Host []"} when {@code ip} is {@code null}
	 */
	@Override
	public String toString() {
		return "Host [" + (ip != null ? "ip=" + ip : "") + "]";
	}
}
/**
 * A single message, carrying nothing but its text.
 */
class Message {
	/** Text of the message; may be {@code null}. */
	String message;

	/**
	 * @param message text of the message; {@code null} is accepted
	 */
	public Message(String message) {
		this.message = message;
	}

	/**
	 * @return {@code "Message [message=<text>]"}, or {@code "Message []"}
	 *         when the text is {@code null}
	 */
	@Override
	public String toString() {
		if (message == null) {
			return "Message []";
		}
		return "Message [message=" + message + "]";
	}
}
/**
 * Callback for the appearance of a host that has no queue yet.
 *
 * @param <T> element type of the queue handed to the listener
 */
interface HostListener<T> {

	/**
	 * Announces a host together with the queue that will carry its elements.
	 *
	 * @param host  the host the queue belongs to
	 * @param queue the queue created for {@code host}
	 */
	void onNewHost(Host host, BlockingQueue<T> queue);
}
/**
 * Callback that receives a batch of messages collected for one host.
 */
interface MessageListener {

	/**
	 * Delivers a batch of messages.
	 *
	 * @param host     the host the messages are addressed to
	 * @param messages the messages of this batch
	 */
	void onMessages(Host host, List<Message> messages);
}
/**
 * Task that repeatedly empties one host's queue and forwards everything it
 * found as a single batch to a {@link MessageListener}.
 *
 * <p>The queue is checked, then the task sleeps for
 * {@link #POLL_INTERVAL_MILLIS} before checking again; empty batches are not
 * forwarded.
 */
class MessageConsumer implements Callable<Void> {

	private static final Logger logger = Logger.getLogger(MessageConsumer.class.getName());

	/** Initial capacity of the list each batch is collected into. */
	private static final int DEFAULT_MESSAGE_ARRAYSIZE = 100;

	/** Pause between two drain attempts, in milliseconds. */
	private static final long POLL_INTERVAL_MILLIS = 100;

	private final Host host;
	private final BlockingQueue<Message> queue;
	private final MessageListener messageListener;

	/**
	 * @param host            host whose messages this task forwards
	 * @param queue           queue to drain
	 * @param messageListener receiver of the drained batches
	 */
	public MessageConsumer(Host host, BlockingQueue<Message> queue,
			MessageListener messageListener) {
		this.host = host;
		this.queue = queue;
		this.messageListener = messageListener;
	}

	/**
	 * Runs the drain/forward/sleep cycle until the thread is interrupted.
	 *
	 * @return always {@code null}, once the interrupt flag is seen at the
	 *         start of a cycle
	 * @throws InterruptedException if the interrupt arrives during the sleep
	 */
	@Override
	public Void call() throws Exception {
		for (;;) {
			// Thread.interrupted() also clears the flag; the task simply ends.
			if (Thread.interrupted()) {
				return null;
			}

			final List<Message> batch = drainPending();
			if (batch.size() > 0) {
				logger.log(Level.FINER, "Dispatching {0} messages to host {1}", new Object [] {batch.size(), host});
				messageListener.onMessages(host, batch);
			}

			Thread.sleep(POLL_INTERVAL_MILLIS);
		}
	}

	/** Moves everything currently queued into a fresh list and returns it. */
	private List<Message> drainPending() {
		// A new list per cycle: the listener receives the list itself.
		final List<Message> batch = new ArrayList<>(DEFAULT_MESSAGE_ARRAYSIZE);
		queue.drainTo(batch);
		return batch;
	}
}
/**
 * Task that generates random messages for a fixed set of demo hosts and puts
 * each one into the queue of its host.
 *
 * <p>Queues are created on demand: the first message for a host creates the
 * queue, stores it in the shared map and announces it through the
 * {@link HostListener}.
 */
class MessageProducer implements Callable<Void> {
	
	private static final Logger logger = Logger.getLogger(MessageProducer.class.getName());
	
	/** Largest number of messages created in one burst (bursts hold 1 to this many). */
	private static final int NUM_MAXMESSAGES = 10000;
	
	private final Map<Host, BlockingQueue<Message>> hostQueueMap;
	private final HostListener<Message> hostListener;
	
	/** Hosts that messages are randomly addressed to. */
	final Host [] hosts = {new Host("google"), new Host("amazon"), new Host("java-forum")};		
	/** Texts that message contents are randomly picked from. */
	final String [] messages = {"start", "stop", "burn"};
	final Random random = new Random();		

	/**
	 * @param hostQueueMap map from host to its queue; queues created by this
	 *                     producer are stored here
	 * @param hostListener notified whenever this producer creates a queue
	 */
	public MessageProducer(Map<Host, BlockingQueue<Message>> hostQueueMap,
			HostListener<Message> hostListener) {
		this.hostQueueMap = hostQueueMap;
		this.hostListener = hostListener;
	}

	/**
	 * Puts {@code message} into the queue of {@code host}, creating and
	 * announcing the queue first if the host has none yet.
	 *
	 * @throws InterruptedException if interrupted while putting the message
	 */
	private void dispatch(final Host host, final Message message) throws InterruptedException {
		BlockingQueue<Message> queue = hostQueueMap.get(host);
		if(queue == null) {
			// NOTE: this get-then-put sequence is not atomic. That is fine
			// while a single producer writes to the map; several producers
			// sharing one map would need to make it atomic.
			queue = new LinkedBlockingQueue<>();
			hostQueueMap.put(host, queue);
			hostListener.onNewHost(host, queue);				
		}
		
		queue.put(message);
	}

	/**
	 * Produces bursts of 1 to {@link #NUM_MAXMESSAGES} random messages with a
	 * 10 ms pause after each burst, until the thread is interrupted.
	 *
	 * @return always {@code null}, once the interrupt flag is seen at the
	 *         start of a burst
	 * @throws InterruptedException if the interrupt arrives while queueing a
	 *                              message or during the pause
	 */
	@Override
	public Void call() throws Exception {
		while(!Thread.interrupted()) {	
			final int numMessages = random.nextInt(NUM_MAXMESSAGES) + 1;
			for(int i=0; i < numMessages; i++) {
				
				final Host host = hosts[random.nextInt(hosts.length)];
				final Message msg = new Message(messages[random.nextInt(messages.length)]);
				dispatch(host, msg);
				
				// Guarded so the parameter array is not allocated for every
				// message while FINEST logging is switched off.
				if(logger.isLoggable(Level.FINEST)) {
					// {0} is the message and {1} the host, matching the text.
					logger.log(Level.FINEST, "Created message {0} for host {1}", new Object [] {msg, host});
				}
			}
			Thread.sleep(10);
		}
		return null;
	}		
}
/**
 * Demo wiring: runs one {@link MessageProducer} and, for every host the
 * producer announces, one {@link MessageConsumer}, all on a shared cached
 * thread pool. Batches delivered by the consumers are only logged.
 */
public class DispatchDemo implements HostListener<Message>, MessageListener {
	
	private static final Logger logger = Logger.getLogger(DispatchDemo.class.getName());
	
	/** How long {@link #main(String[])} lets the demo run, in milliseconds. */
	private static final long DEMO_RUNTIME_MILLIS = 1000000L;
	
	/** How long {@link #stop()} waits for the worker tasks to end, in seconds. */
	private static final long SHUTDOWN_TIMEOUT_SECONDS = 5L;
	
	private final Map<Host, BlockingQueue<Message>> hostQueueMap = Collections.synchronizedMap(new HashMap<Host, BlockingQueue<Message>>());
	
	private final ExecutorService executor = Executors.newCachedThreadPool();
	
	/** Submits the producer task; consumers follow as hosts are announced. */
	void start() {
		executor.submit(new MessageProducer(hostQueueMap, this));
		logger.info("started.");
	}
	
	/**
	 * Interrupts all worker tasks and waits briefly for them to finish.
	 * Without this the pool's non-daemon threads keep the JVM alive after
	 * {@code main} has returned.
	 *
	 * @throws InterruptedException if interrupted while waiting
	 */
	void stop() throws InterruptedException {
		executor.shutdownNow();
		if(!executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
			logger.warning("worker tasks did not terminate in time.");
		}
		logger.info("stopped.");
	}
	
	/**
	 * Starts a consumer task for the queue of a newly announced host.
	 */
	@Override
	public void onNewHost(Host host, BlockingQueue<Message> queue) {
		logger.log(Level.INFO, "adding queue for new host: {0}", host);
		executor.submit(new MessageConsumer(host, queue, this));		
	}
	
	/**
	 * Logs how many messages were delivered for which host.
	 */
	@Override
	public void onMessages(Host host, List<Message> messages) {
		// The list is passed as parameter {1} but the pattern does not
		// reference it; add "({1})" to the pattern to print the contents too.
		logger.log(Level.INFO, "Sending {0} messages to {2}.", new Object[]{messages.size(), messages, host});
	}
	
	/**
	 * Runs the demo for {@link #DEMO_RUNTIME_MILLIS} and then shuts it down.
	 *
	 * @param args ignored
	 * @throws InterruptedException if the main thread is interrupted
	 */
	public static void main(String[] args) throws InterruptedException {
		
		System.setProperty("java.util.logging.SimpleFormatter.format", "[%1$tH:%1$tM:%1$tS:%1$tL] %4$s: %5$s%n");
		DispatchDemo dispatchDemo = new DispatchDemo();
		dispatchDemo.start();
		
		try {
			Thread.sleep(DEMO_RUNTIME_MILLIS);
		} finally {
			// Always release the pool so the process can terminate.
			dispatchDemo.stop();
		}
	}
}