diff options
author | Fraser Adams <fadams@apache.org> | 2014-08-30 08:48:02 +0000 |
---|---|---|
committer | Fraser Adams <fadams@apache.org> | 2014-08-30 08:48:02 +0000 |
commit | b85515ffc6ee8884d58ff72f759d00b02472350f (patch) | |
tree | a19be47d48df7fb1d39f4c0f1b2781b3eb1468fc | |
parent | 0363349e3bb06c650d9a039f891996d7eebfbe7a (diff) | |
download | qpid-python-b85515ffc6ee8884d58ff72f759d00b02472350f.tar.gz |
QPID-6059: Java QMF2 Agent code made some now invalid assumptions about replyTo
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1621430 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Agent.java | 79 |
1 files changed, 41 insertions, 38 deletions
diff --git a/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Agent.java b/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Agent.java index a5e1a46710..80acf93e55 100644 --- a/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Agent.java +++ b/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Agent.java @@ -199,8 +199,8 @@ public class Agent extends QmfData implements MessageListener, SubscribableAgent // Send heartbeat messages with a Time To Live (in msecs) set to two times the _heartbeatInterval // to prevent stale heartbeats from getting to the consoles. - _broadcaster.send(response, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, - _heartbeatInterval*2000); + _producer.send(_topicAddress, response, Message.DEFAULT_DELIVERY_MODE, + Message.DEFAULT_PRIORITY, _heartbeatInterval*2000); } catch (JMSException jmse) { @@ -311,46 +311,48 @@ public class Agent extends QmfData implements MessageListener, SubscribableAgent private MessageConsumer _mainConsumer; // _aliasConsumer is used for the alias address if the Agent is a broker Agent (used in Java Broker QMF plugin) private MessageConsumer _aliasConsumer; - private MessageProducer _responder; - private MessageProducer _broadcaster; - /** - * This contains a String representation of the broadcastAddress used to decide whether to use _responder or - * _broadcaster as the MessageProducer for Message responses. See the comments for the sendResponse() method below. - */ - private String _broadcastAddress; + private MessageProducer _producer; + + private String _quotedDirectBase; + private Destination _directAddress; + + private String _quotedTopicBase; + private Destination _topicAddress; // private implementation methods // ******************************************************************************************************** /** - * There's some slight "hackery" below. The Agent clearly needs to respond to requests and quite possibly using - * the JMS replyTo is the correct thing to do, however in older versions of Qpid invoking send() on the replyTo - * causes spurious exchangeDeclares to occur and the caching of replyTo wasn't as good as it might be. - * To get around this the Agent actually uses the relevant exchange name as the core address and sets the Message - * "qpid.subject" with an appropriate Routing Key. The problem occurs if Console clients decide to use the - * qmf.default.topic as a replyTo instead of qmf.default.direct so we check the start of the replyTo using - * _broadcastAddress. It's slightly hacky because the Destination.toString() could change as it's implemenation - * specific. That shouldn't be too much of a pain as most clients should use qmf.default.direct. + * There's some slight "hackery" below. The Agent clearly needs to respond + * to requests and quite possibly using the JMS replyTo is the correct thing + * to do, however in older versions of Qpid invoking send() on the replyTo + * causes spurious exchangeDeclares to occur and the caching of replyTo wasn't + * as good as it might be. To get around this the Agent uses exchange name + * as the core address and sets the Message "qpid.subject" property with an + * appropriate Routing Key. * @param handle the reply handle that contains the replyTo Address. * @param message the JMS Message to be sent. */ private final void sendResponse(final Handle handle, final Message message) throws JMSException { - // A replyTo looks a bit like 'qmf.default.direct'/'direct.95dab79b-0d3e-4214-9f55-f9efb146c101'; None - // so we check if it starts with 'qmf.default.topic' and if so use the _broadcaster MessageProducer - // otherwise use the _responder. N.B. if the Destination.toString() format changes this will fail and - // always sent to _responder, though *most* clients (hear me qpid-config!!!) use qmf.default.direct. + // Just in case the replyTo issues still exist check if the replyTo starts + // with qmf.default.topic or qmf.default.direct and if so send to the + // main topic or direct Destinations, if not fall back to using the real + // replyTo Destination. TODO check if original replyTo issue still exists. String replyTo = handle.getReplyTo().toString(); - - if (replyTo.startsWith(_broadcastAddress)) + if (replyTo.startsWith(_quotedTopicBase)) + { + _producer.send(_topicAddress, message); + } + else if (replyTo.startsWith(_quotedDirectBase)) { - _broadcaster.send(message); + _producer.send(_directAddress, message); } else { - _responder.send(message); + _producer.send(handle.getReplyTo(), message); } } @@ -452,7 +454,7 @@ public class Agent extends QmfData implements MessageListener, SubscribableAgent // taken by the C++ broker ManagementAgent, so if it's a problem here........ // N.B. the results list declared here is a generic List of Objects. We *must* only pass a List of - // Map to queryResponse(), but conversely if the response items are sortable we need tp sort them + // Map to queryResponse(), but conversely if the response items are sortable we need to sort them // before doing mapEncode(). Unfortunately we don't know if the items are sortable a priori so // we either add a Map or we add a QmfAgentData, then sort then mapEncode() each item. I'm not // sure of a more elegant way to do this without creating two lists, which might not be so bad @@ -1020,21 +1022,21 @@ public class Agent extends QmfData implements MessageListener, SubscribableAgent try { - String directBase = "qmf." + _domain + ".direct"; - String topicBase = "qmf." + _domain + ".topic"; - String address = directBase + "/" + _name + addressOptions; - _asyncSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); _syncSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - // Create a MessageProducer for the QMF topic address used to broadcast Events & Heartbeats. - Destination topicAddress = _syncSession.createQueue(topicBase); - _broadcaster = _syncSession.createProducer(topicAddress); - _broadcastAddress = "'" + topicBase + "'"; + // Create a Destination for the QMF direct address, mainly used for request/response + String directBase = "qmf." + _domain + ".direct"; + _quotedDirectBase = "'" + directBase + "'"; + _directAddress = _syncSession.createQueue(directBase); - // Create a MessageProducer for the QMF direct address, mainly used for request/response - Destination directAddress = _syncSession.createQueue(directBase); - _responder = _syncSession.createProducer(directAddress); + // Create a Destination for the QMF topic address used to broadcast Events & Heartbeats. + String topicBase = "qmf." + _domain + ".topic"; + _quotedTopicBase = "'" + topicBase + "'"; + _topicAddress = _syncSession.createQueue(topicBase); + + // Create an unidentified MessageProducer for sending to various destinations. + _producer = _syncSession.createProducer(null); // TODO it should be possible to bind _locateConsumer, _mainConsumer and _aliasConsumer to the // same queue if I can figure out the correct AddressString to use, probably not a big deal though. @@ -1045,6 +1047,7 @@ public class Agent extends QmfData implements MessageListener, SubscribableAgent _locateConsumer.setMessageListener(this); // Set up MessageListener on the Agent address + String address = directBase + "/" + _name + addressOptions; Destination agentAddress = _asyncSession.createQueue(address); _mainConsumer = _asyncSession.createConsumer(agentAddress); _mainConsumer.setMessageListener(this); @@ -1158,7 +1161,7 @@ public class Agent extends QmfData implements MessageListener, SubscribableAgent List<Map> results = new ArrayList<Map>(); results.add(event.mapEncode()); AMQPMessage.setList(response, results); - _broadcaster.send(response); + _producer.send(_topicAddress, response); } catch (JMSException jmse) { |