Hallo zusammen,
ich versuche 2 Applikationen über ActiveMQ miteinander kommunizieren zu lassen. Dazu gibt es eine Applikation die den Server darstellt (hier läuft der ActiveMQ Broker auf Port 61616) und eine Applikation die den Client darstellt. Der Server broadcastet in regelmäßigen Abständen einige Messages an ein Topic, welches die Clients abonieren können, um dieses Messages zu erhalten. Zusätzlich verfügt die Server Applikation über einen Consumer, welcher Requests (über eine Queue) entgegen nehmen soll und dort den Clients antwortet. Dies funktioniert auch alles soweit so gut, solange beide Programme auf dem selben Rechner laufen.
Läuft die Client Applikation dann tatsächlich auf einem anderen Rechner, wie es vorgesehen ist, kann diese zwar noch das Topic subscriben und erhält die Nachrichten darin. Allerdings kann der Client keine Requests senden, bzw. diese kommen gar nicht in der Queue oder beim Server/Broker an.
Alles was ich bisher gecheckt und versucht habe:
Da das Szenario perfekt funktioniert wenn es auf dem selben Rechner läuft, vermute ich dass der Code soweit passt. Auch das der Client, wenn er auf einem anderen Rechner läuft die Nachrichten vom Server noch subscriben kann und erhält deutet für mich darauf hin dass die Connection soweit passen muss.
Hat jmd eine Idee ob ich irgendwelche Konfigurationen, Freigaben, etc. übersehen habe oder warum die Messages vom Client nicht in die Queue raus gehen?
Hier noch der Code für die Requests (Auf das wichtigstes gekürzt):
Server:
Client:
ich versuche 2 Applikationen über ActiveMQ miteinander kommunizieren zu lassen. Dazu gibt es eine Applikation die den Server darstellt (hier läuft der ActiveMQ Broker auf Port 61616) und eine Applikation die den Client darstellt. Der Server broadcastet in regelmäßigen Abständen einige Messages an ein Topic, welches die Clients abonieren können, um dieses Messages zu erhalten. Zusätzlich verfügt die Server Applikation über einen Consumer, welcher Requests (über eine Queue) entgegen nehmen soll und dort den Clients antwortet. Dies funktioniert auch alles soweit so gut, solange beide Programme auf dem selben Rechner laufen.
Läuft die Client Applikation dann tatsächlich auf einem anderen Rechner, wie es vorgesehen ist, kann diese zwar noch das Topic subscriben und erhält die Nachrichten darin. Allerdings kann der Client keine Requests senden, bzw. diese kommen gar nicht in der Queue oder beim Server/Broker an.
Alles was ich bisher gecheckt und versucht habe:
- TCP Port 61616 ist auf beiden Rechnern freigegeben (eingehend + ausgehend)
- Die Rechner lassen sich gegenseitig anpingen
- Broker kommuniziert auch tatsächlich auf 61616
- Message kommt nicht in der Queue des Broker an (gesehen über Web-Tool vom Broker)
- Konfiguration (IP, Port) in beiden Applikationen ist korrekt
- activemq.xml des Brokers:
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
Da das Szenario perfekt funktioniert wenn es auf dem selben Rechner läuft, vermute ich dass der Code soweit passt. Auch das der Client, wenn er auf einem anderen Rechner läuft die Nachrichten vom Server noch subscriben kann und erhält deutet für mich darauf hin dass die Connection soweit passen muss.
Hat jmd eine Idee ob ich irgendwelche Konfigurationen, Freigaben, etc. übersehen habe oder warum die Messages vom Client nicht in die Queue raus gehen?
Hier noch der Code für die Requests (Auf das wichtigstes gekürzt):
Server:
Java:
package xxx;
import xxx;
public class ActiveMQRequestHandler {
private static String BROKER_URL = null;
private static final String REQUEST_QUEUE = "LR_REQUEST_QUEUE";
private Connection connection;
private Session session;
private Destination requestQueue;
private ExecutorService executorService = Executors.newCachedThreadPool();
public ActiveMQRequestHandler() {
BROKER_URL = "tcp://" + Main.getMasterSettings().ACTIVEMQIP + ":" + Main.getMasterSettings().PORT; // tcp://localhost:61616
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
connectionFactory.setTrustAllPackages(true);
connection = connectionFactory.createConnection();
connection.setClientID("LR_Request_Consumer");
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //non transactional
requestQueue = session.createQueue(REQUEST_QUEUE);
connection.start();
} catch (JMSException e) {
Main.errorLogger.safeExepToTxt(e, "Setting up ActiveMQ Factory for LR Consumer");
e.printStackTrace();
}
}
public void start() throws JMSException {
//This Consumer only listens to the RequestQueue
MessageConsumer requestConsumer = session.createConsumer(requestQueue);
requestConsumer.setMessageListener(new RequestHandler());
}
private class RequestHandler implements MessageListener {
@Override
public void onMessage(Message request) {
executorService.submit(new Worker(request));
}
}
private class Worker implements Runnable {
private Message requestMsg;
public Worker(Message request) {
this.requestMsg = request;
}
//Run when new Request is recieved on RequestQueue
@Override
public void run() {
try {
//Set Up to Reply the Request
Destination replyDestoForClient = requestMsg.getJMSReplyTo();
MessageProducer replyProducer = session.createProducer(replyDestoForClient);
replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
replyProducer.setTimeToLive(5000);
//Reply Objects/Messages
TextMessage replyMsg = session.createTextMessage();
replyMsg.setJMSCorrelationID("Leitrechner");
ObjectMessage replyObj = session.createObjectMessage();
//#################################################################################################################
//############ Commands ###########################################################################################
//#################################################################################################################
if (requestMsg instanceof ObjectMessage){
ObjectMessage recObj = (ObjectMessage) requestMsg;
RequestMessage requestMessage = (RequestMessage) recObj.getObject();
String requestText = requestMessage.getRequestTxt();
Object requestObject = requestMessage.getRequestObj();
Object requestObject2 = requestMessage.getRequestObj2();
replyDestoForClient = requestMsg.getJMSReplyTo();
String[] recTxtWords = requestText.split("\\.");
String clientID = requestMsg.getJMSCorrelationID();
//String clientID = recTxtWords[0];
//String target = recTxtWords[1]; // Always = leitrechner
String msgType = recTxtWords[2];
String subject = recTxtWords[3];
Boolean success;
if(msgType.equals("command")){
switch (subject){
//############ Aufträge ###########################################################################################
case "createAuftrag":{
Auftrag recAuftrag = (Auftrag) requestObject;
//Auftrag erstellen
success = Auftrag.createNewAuftrag(recAuftrag);
//Antwort mit Success
replyMsg.setText(String.valueOf(success));
replyProducer.send(replyDestoForClient, replyMsg);
System.out.println(requestText + "\t-> Success Feedback: " + success);
Main.commLogger.safeToTxt(requestText + "\t-> Success Feedback: " + success);
break;
}
}
//Unkown Subject
else {
System.out.println(requestText + "\n-> Unknown Subject for a TextMessage (should be a Request Type)!");
Main.errorLogger.safeDescripToTxt(requestText + "\n-> Unknown Subject for a TextMessage (should be a Request Type)!");
}
}
else {
System.out.println("Recieved a Message which is not a TextMessage or a ObjectMessage");
Main.errorLogger.safeDescripToTxt("Recieved a Message which is not a TextMessage or a ObjectMessage");
}
} catch (JMSException e) {
e.printStackTrace();
Main.errorLogger.safeExepToTxt(e, "Error in ActiveMQ LR Consumer for incoming Client Messages");
}
}
}
public void stop() throws JMSException {
connection.close();
executorService.shutdown();
}
}
Client:
Java:
package xxx;
import xxx;
public class ActiveMQRequester {
private String clientID;
private ActiveMQConnectionFactory factory;
private Exception excep = null;
private final int TIMEOUT = 1500;
private String command;
private String commandParam;
private static String BROKER_URL = null;
private static final String REQUEST_QUEUE = "LR_REQUEST_QUEUE";
private String request;
private String requestParam;
public ActiveMQRequester(){
BROKER_URL = "tcp://" + Main.getClientSettings().ACTIVEMQIP + ":" + Main.getClientSettings().PORT; // tcp://192.168.0.2:61616
try {
clientID = InetAddress.getLocalHost().getHostName() + "-" + System.getProperty("user.name");
// Create a connection factory
factory = new ActiveMQConnectionFactory(BROKER_URL);
factory.setTrustAllPackages(true); //Damit Objekte gesendet werden dürfen
}catch (Exception exClass){
Main.errorLogger.safeExepToTxt(exClass, "Init ActiveMQ Producer");
}
}
private void sendMessage(MessageType msgType) throws Exception {
Connection connection = null;
Session session = null;
MessageProducer producer = null;
Destination replyDestForClient = null;
try {
//Create connection + session
connection = factory.createConnection();
connection.setClientID(clientID);
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create Destination Queue for Command + Producer to Send Command
Destination lrRequestQueue = session.createQueue(REQUEST_QUEUE);
producer = session.createProducer(lrRequestQueue);
producer.setTimeToLive(2500);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Create Temp Response Destination + Consumer for Reply
replyDestForClient = session.createTemporaryQueue();
MessageConsumer replyConsumer = session.createConsumer(replyDestForClient);
Message replyMsg;
//############################################################
//######## SEND A COMMAND ####################################
//############################################################
if(msgType == MessageType.COMMAND){
//Create Message for Request
RequestMessage requestMessage;
ObjectMessage requestObjMessageToLR = session.createObjectMessage();
requestObjMessageToLR.setJMSReplyTo(replyDestForClient);
requestObjMessageToLR.setJMSCorrelationID(clientID);
switch (command) {
case "createAuftrag":{
//Send Request to LR
requestMessage = new RequestMessage(commandParam, auftragToSend, null);
requestObjMessageToLR.setObject(requestMessage);
producer.send(lrRequestQueue, requestObjMessageToLR);
//Reply from LR
replyMsg = replyConsumer.receive(TIMEOUT);
if (replyMsg instanceof TextMessage) {
if (((TextMessage) replyMsg).getText().equals("true")) {
//Successful
} else
excep = new Exception("Leitrechner Antwort: Auftrag konnte nicht erstellt werden!");
} else
excep = new Exception("Keine Leitrechner Antwort über den Erfolg!");
break;
}
}
}
}
catch (Exception ex) {
throw ex;
}finally {
if(producer!=null)
producer.close();
if(session!=null)
session.close();
if(connection!=null)
connection.close();
}
//Throw Exception if Command was not successful at LR
if(excep != null){
Exception tmpEx = excep;
excep = null; // wieder zurück setzen
throw tmpEx;
}
}
//##########################################################################
//##################### COMMANDS ###########################################
//##########################################################################
public void doAuftragAnlegen(Auftrag auftrag) throws Exception {
command = "createAuftrag";
commandParam = clientID + ".leitrechner.command." + "createAuftrag";
auftragToSend = auftrag;
sendMessage(MessageType.COMMAND);
}
}