import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
class Host {
String ip;
public Host(String ip) {
super();
this.ip = ip;
}
@Override
public String toString() {
return "Host [" + (ip != null ? "ip=" + ip : "") + "]";
}
}
class Message {
String message;
public Message(String message) {
super();
this.message = message;
}
@Override
public String toString() {
return "Message [" + (message != null ? "message=" + message : "")
+ "]";
}
}
interface HostListener<T>{
void onNewHost(final Host host, final BlockingQueue<T> queue);
}
interface MessageListener {
void onMessages(final Host host, final List<Message> messages);
}
class MessageConsumer implements Callable<Void> {
private static final Logger logger = Logger.getLogger(MessageConsumer.class.getName());
private static final int DEFAULT_MESSAGE_ARRAYSIZE = 100;
private Host host;
private BlockingQueue<Message> queue;
private MessageListener messageListener;
public MessageConsumer(Host host, BlockingQueue<Message> queue,
MessageListener messageListener) {
super();
this.host = host;
this.queue = queue;
this.messageListener = messageListener;
}
@Override
public Void call() throws Exception {
while(!Thread.interrupted()) {
final List<Message> messages = new ArrayList<>(DEFAULT_MESSAGE_ARRAYSIZE);
queue.drainTo(messages);
if(!messages.isEmpty()) {
logger.log(Level.FINER, "Dispatching {0} messages to host {1}", new Object [] {messages.size(), host});
messageListener.onMessages(host, messages);
}
Thread.sleep(100);
}
return null;
}
}
class MessageProducer implements Callable<Void> {
private static final Logger logger = Logger.getLogger(MessageProducer.class.getName());
private static final int NUM_MAXMESSAGES = 10000;
private final Map<Host, BlockingQueue<Message>> hostQueueMap;
private final HostListener<Message> hostListener;
final Host [] hosts = {new Host("google"), new Host("amazon"), new Host("java-forum")};
final String [] messages = {"start", "stop", "burn"};
final Random random = new Random();
public MessageProducer(Map<Host, BlockingQueue<Message>> hostQueueMap,
HostListener<Message> hostListener) {
super();
this.hostQueueMap = hostQueueMap;
this.hostListener = hostListener;
}
private void dispatch(final Host host, final Message message) throws InterruptedException {
BlockingQueue<Message> queue = hostQueueMap.get(host);
if(queue == null) {
queue = new LinkedBlockingQueue<>();
hostQueueMap.put(host, queue);
hostListener.onNewHost(host, queue);
}
queue.put(message);
}
@Override
public Void call() throws Exception {
while(!Thread.interrupted()) {
final int numMessages = random.nextInt(NUM_MAXMESSAGES) + 1;
for(int i=0; i < numMessages; i++) {
final Host host = hosts[random.nextInt(hosts.length)];
final Message msg = new Message(messages[random.nextInt(messages.length)]);
dispatch(host, msg);
logger.log(Level.FINEST, "Created message {0} for host {1}", new Object [] {host, msg});
}
Thread.sleep(10);
}
return null;
}
};
public class DispatchDemo implements HostListener<Message>, MessageListener {
private static final Logger logger = Logger.getLogger(DispatchDemo.class.getName());
private Map<Host, BlockingQueue<Message>> hostQueueMap = Collections.synchronizedMap(new HashMap<Host, BlockingQueue<Message>>());
private ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
void start() {
newCachedThreadPool.submit(new MessageProducer(hostQueueMap, this));
logger.info("started.");
}
@Override
public void onNewHost(Host host, BlockingQueue<Message> queue) {
logger.log(Level.INFO, "adding queue for new host: {0}", host);
newCachedThreadPool.submit(new MessageConsumer(host, queue, this));
}
@Override
public void onMessages(Host host, List<Message> messages) {
// logger.log(Level.INFO, "Sending {0} messages to {2}: ({1}) .", new Object[]{messages.size(), messages, host});
logger.log(Level.INFO, "Sending {0} messages to {2}.", new Object[]{messages.size(), messages, host});
}
public static void main(String[] args) throws InterruptedException {
System.setProperty("java.util.logging.SimpleFormatter.format", "[%1$tH:%1$tM:%1$tS:%1$tL] %4$s: %5$s%n");
DispatchDemo dispatchDemo = new DispatchDemo();
dispatchDemo.start();
Thread.sleep(1000000);
}
}