summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFraser Adams <fadams@apache.org>2014-08-30 08:48:02 +0000
committerFraser Adams <fadams@apache.org>2014-08-30 08:48:02 +0000
commitb85515ffc6ee8884d58ff72f759d00b02472350f (patch)
treea19be47d48df7fb1d39f4c0f1b2781b3eb1468fc
parent0363349e3bb06c650d9a039f891996d7eebfbe7a (diff)
downloadqpid-python-b85515ffc6ee8884d58ff72f759d00b02472350f.tar.gz
QPID-6059: Java QMF2 Agent code made some now invalid assumptions about replyTo
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1621430 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Agent.java79
1 files changed, 41 insertions, 38 deletions
diff --git a/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Agent.java b/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Agent.java
index a5e1a46710..80acf93e55 100644
--- a/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Agent.java
+++ b/tools/src/java/qpid-qmf2/src/main/java/org/apache/qpid/qmf2/agent/Agent.java
@@ -199,8 +199,8 @@ public class Agent extends QmfData implements MessageListener, SubscribableAgent
// Send heartbeat messages with a Time To Live (in msecs) set to two times the _heartbeatInterval
// to prevent stale heartbeats from getting to the consoles.
- _broadcaster.send(response, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY,
- _heartbeatInterval*2000);
+ _producer.send(_topicAddress, response, Message.DEFAULT_DELIVERY_MODE,
+ Message.DEFAULT_PRIORITY, _heartbeatInterval*2000);
}
catch (JMSException jmse)
{
@@ -311,46 +311,48 @@ public class Agent extends QmfData implements MessageListener, SubscribableAgent
private MessageConsumer _mainConsumer;
// _aliasConsumer is used for the alias address if the Agent is a broker Agent (used in Java Broker QMF plugin)
private MessageConsumer _aliasConsumer;
- private MessageProducer _responder;
- private MessageProducer _broadcaster;
- /**
- * This contains a String representation of the broadcastAddress used to decide whether to use _responder or
- * _broadcaster as the MessageProducer for Message responses. See the comments for the sendResponse() method below.
- */
- private String _broadcastAddress;
+ private MessageProducer _producer;
+
+ private String _quotedDirectBase;
+ private Destination _directAddress;
+
+ private String _quotedTopicBase;
+ private Destination _topicAddress;
// private implementation methods
// ********************************************************************************************************
/**
- * There's some slight "hackery" below. The Agent clearly needs to respond to requests and quite possibly using
- * the JMS replyTo is the correct thing to do, however in older versions of Qpid invoking send() on the replyTo
- * causes spurious exchangeDeclares to occur and the caching of replyTo wasn't as good as it might be.
- * To get around this the Agent actually uses the relevant exchange name as the core address and sets the Message
- * "qpid.subject" with an appropriate Routing Key. The problem occurs if Console clients decide to use the
- * qmf.default.topic as a replyTo instead of qmf.default.direct so we check the start of the replyTo using
- * _broadcastAddress. It's slightly hacky because the Destination.toString() could change as it's implemenation
- * specific. That shouldn't be too much of a pain as most clients should use qmf.default.direct.
+ * There's some slight "hackery" below. The Agent clearly needs to respond
+ * to requests and quite possibly using the JMS replyTo is the correct thing
+ * to do, however in older versions of Qpid invoking send() on the replyTo
+ * causes spurious exchangeDeclares to occur and the caching of replyTo wasn't
+ * as good as it might be. To get around this the Agent uses exchange name
+ * as the core address and sets the Message "qpid.subject" property with an
+ * appropriate Routing Key.
* @param handle the reply handle that contains the replyTo Address.
* @param message the JMS Message to be sent.
*/
private final void sendResponse(final Handle handle, final Message message) throws JMSException
{
- // A replyTo looks a bit like 'qmf.default.direct'/'direct.95dab79b-0d3e-4214-9f55-f9efb146c101'; None
- // so we check if it starts with 'qmf.default.topic' and if so use the _broadcaster MessageProducer
- // otherwise use the _responder. N.B. if the Destination.toString() format changes this will fail and
- // always sent to _responder, though *most* clients (hear me qpid-config!!!) use qmf.default.direct.
+ // Just in case the replyTo issues still exist check if the replyTo starts
+ // with qmf.default.topic or qmf.default.direct and if so send to the
+ // main topic or direct Destinations, if not fall back to using the real
+ // replyTo Destination. TODO check if original replyTo issue still exists.
String replyTo = handle.getReplyTo().toString();
-
- if (replyTo.startsWith(_broadcastAddress))
+ if (replyTo.startsWith(_quotedTopicBase))
+ {
+ _producer.send(_topicAddress, message);
+ }
+ else if (replyTo.startsWith(_quotedDirectBase))
{
- _broadcaster.send(message);
+ _producer.send(_directAddress, message);
}
else
{
- _responder.send(message);
+ _producer.send(handle.getReplyTo(), message);
}
}
@@ -452,7 +454,7 @@ public class Agent extends QmfData implements MessageListener, SubscribableAgent
// taken by the C++ broker ManagementAgent, so if it's a problem here........
// N.B. the results list declared here is a generic List of Objects. We *must* only pass a List of
- // Map to queryResponse(), but conversely if the response items are sortable we need tp sort them
+ // Map to queryResponse(), but conversely if the response items are sortable we need to sort them
// before doing mapEncode(). Unfortunately we don't know if the items are sortable a priori so
// we either add a Map or we add a QmfAgentData, then sort then mapEncode() each item. I'm not
// sure of a more elegant way to do this without creating two lists, which might not be so bad
@@ -1020,21 +1022,21 @@ public class Agent extends QmfData implements MessageListener, SubscribableAgent
try
{
- String directBase = "qmf." + _domain + ".direct";
- String topicBase = "qmf." + _domain + ".topic";
- String address = directBase + "/" + _name + addressOptions;
-
_asyncSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
_syncSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- // Create a MessageProducer for the QMF topic address used to broadcast Events & Heartbeats.
- Destination topicAddress = _syncSession.createQueue(topicBase);
- _broadcaster = _syncSession.createProducer(topicAddress);
- _broadcastAddress = "'" + topicBase + "'";
+ // Create a Destination for the QMF direct address, mainly used for request/response
+ String directBase = "qmf." + _domain + ".direct";
+ _quotedDirectBase = "'" + directBase + "'";
+ _directAddress = _syncSession.createQueue(directBase);
- // Create a MessageProducer for the QMF direct address, mainly used for request/response
- Destination directAddress = _syncSession.createQueue(directBase);
- _responder = _syncSession.createProducer(directAddress);
+ // Create a Destination for the QMF topic address used to broadcast Events & Heartbeats.
+ String topicBase = "qmf." + _domain + ".topic";
+ _quotedTopicBase = "'" + topicBase + "'";
+ _topicAddress = _syncSession.createQueue(topicBase);
+
+ // Create an unidentified MessageProducer for sending to various destinations.
+ _producer = _syncSession.createProducer(null);
// TODO it should be possible to bind _locateConsumer, _mainConsumer and _aliasConsumer to the
// same queue if I can figure out the correct AddressString to use, probably not a big deal though.
@@ -1045,6 +1047,7 @@ public class Agent extends QmfData implements MessageListener, SubscribableAgent
_locateConsumer.setMessageListener(this);
// Set up MessageListener on the Agent address
+ String address = directBase + "/" + _name + addressOptions;
Destination agentAddress = _asyncSession.createQueue(address);
_mainConsumer = _asyncSession.createConsumer(agentAddress);
_mainConsumer.setMessageListener(this);
@@ -1158,7 +1161,7 @@ public class Agent extends QmfData implements MessageListener, SubscribableAgent
List<Map> results = new ArrayList<Map>();
results.add(event.mapEncode());
AMQPMessage.setList(response, results);
- _broadcaster.send(response);
+ _producer.send(_topicAddress, response);
}
catch (JMSException jmse)
{