summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-12-19 16:07:12 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-12-19 16:07:12 +0000
commit3085b02021e10aefd23de281f1ae71189a3c2ac8 (patch)
tree69f9c2cd527d3dadb907ff9374d448d1d53269e9 /java
parentee05152f188034e8b3cdc9e67c9e615febbf1d42 (diff)
downloadqpid-python-3085b02021e10aefd23de281f1ae71189a3c2ac8.tar.gz
QPID-216
BasicConsumeMethodHandler.java - Pulled the nolocal param from the method body and passed down channel to subscription. SubscriptionFactory.java / AMQQueue.java/AMQChannel.java - passed the nolocal parameter through to the Subscription ConnectionStartOkMethodHandler.java - Saved the client properties so the client identifier can be used in comparison with the publisher id to implement no_local AMQMinaProtocolSession.java - added _clientProperties to store the sent client properties. AMQProtocolSession.java - interface changes to get/set ClientProperties ConcurrentSelectorDeliveryManager.java - only need to do hasInterset as this will take care of the hasFilters optimisation check. SubscriptionImpl.java - Added code to do comparison of client ids to determin insterest in a given message. SubscriptionSet.java - tidied up code to use hasInterest as this is where the nolocal is implemented. ConnectionStartMethodHandler.java - Moved literal values to a ClientProperties.java enumeration and a QpidProperties.java values. QpidConnectionMetaData.java - updated to get values from QpidProperties.java MockProtocolSession.java - null implementation of new get/set methods git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488712 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java66
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java39
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java22
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java11
-rw-r--r--java/common/src/main/java/org/apache/qpid/common/ClientProperties.java29
-rw-r--r--java/common/src/main/java/org/apache/qpid/common/QpidProperties.java46
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java10
15 files changed, 210 insertions, 78 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index d8485ef0f2..117231b36e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -286,12 +286,14 @@ public class AMQChannel
* @param tag the tag chosen by the client (if null, server will generate one)
* @param queue the queue to subscribe to
* @param session the protocol session of the subscriber
+ * @param noLocal
* @return the consumer tag. This is returned to the subscriber and used in
* subsequent unsubscribe requests
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
- public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, FieldTable filters) throws AMQException, ConsumerTagNotUniqueException
+ public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
+ FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -302,7 +304,7 @@ public class AMQChannel
throw new ConsumerTagNotUniqueException();
}
- queue.registerProtocolSession(session, _channelId, tag, acks, filters);
+ queue.registerProtocolSession(session, _channelId, tag, acks, filters,noLocal);
_consumerTag2QueueMap.put(tag, queue);
return tag;
}
@@ -499,7 +501,7 @@ public class AMQChannel
if (_log.isDebugEnabled())
{
_log.debug("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag +
- " and multiple " + multiple);
+ " and multiple " + multiple);
}
if (multiple)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index bf282020ee..1e57c714ff 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -77,7 +77,8 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
try
{
- String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, body.arguments);
+ String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
+ body.arguments, body.noLocal);
if (!body.nowait)
{
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
@@ -90,8 +91,8 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
_log.info("Closing connection due to invalid selector");
session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
- ise.getMessage(), BasicConsumeBody.CLASS_ID,
- BasicConsumeBody.METHOD_ID));
+ ise.getMessage(), BasicConsumeBody.CLASS_ID,
+ BasicConsumeBody.METHOD_ID));
}
catch (ConsumerTagNotUniqueException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
index 00ae547683..79b2e11bca 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
@@ -78,12 +78,19 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
+ //save clientProperties
+ if (protocolSession.getClientProperties() == null)
+ {
+ protocolSession.setClientProperties(body.clientProperties);
+ }
+
switch (authResult.status)
{
case ERROR:
throw new AMQException("Authentication failed");
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
+
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(),
HeartbeatConfig.getInstance().getDelay());
@@ -122,7 +129,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
static int getConfiguredFrameSize()
{
final Configuration config = ApplicationRegistry.getInstance().getConfiguration();
- final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE);
+ final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE);
_logger.info("Framesize set to " + framesize);
return framesize;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index e623d23a79..6ba78ba722 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -26,17 +26,19 @@ import org.apache.mina.common.IoSession;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.codec.AMQDecoder;
+
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -89,10 +91,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private boolean _closed;
// maximum number of channels this session should have
private long _maxNoOfChannels = 1000;
-
+
/* AMQP Version for this session */
private byte _major;
private byte _minor;
+ private FieldTable _clientProperties;
public ManagedObject getManagedObject()
{
@@ -128,7 +131,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
return new AMQProtocolSessionMBean(this);
}
- catch(JMException ex)
+ catch (JMException ex)
{
_logger.error("AMQProtocolSession MBean creation has failed ", ex);
throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
@@ -153,18 +156,21 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
ProtocolInitiation pi = (ProtocolInitiation) message;
// this ensures the codec never checks for a PI message again
- ((AMQDecoder)_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
- try {
+ ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+ try
+ {
pi.checkVersion(this); // Fails if not correct
// This sets the protocol version (and hence framing classes) for this session.
_major = pi.protocolMajor;
_minor = pi.protocolMinor;
String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
String locales = "en_US";
- AMQFrame response = ConnectionStartBody.createAMQFrame((short)0, pi.protocolMajor, pi.protocolMinor, null,
+ AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null,
mechanisms.getBytes(), locales.getBytes());
_minaProtocolSession.write(response);
- } catch (AMQException e) {
+ }
+ catch (AMQException e)
+ {
_logger.error("Received incorrect protocol initiation", e);
/* Find last protocol version in protocol version list. Make sure last protocol version
listed in the build file (build-module.xml) is the latest version which will be used
@@ -211,7 +217,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_logger.debug("Method frame received: " + frame);
}
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel,
- (AMQMethodBody)frame.bodyFrame);
+ (AMQMethodBody) frame.bodyFrame);
try
{
boolean wasAnyoneInterested = false;
@@ -266,7 +272,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content header frame received: " + frame);
}
- getChannel(frame.channel).publishContentHeader((ContentHeaderBody)frame.bodyFrame);
+ getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame);
}
private void contentBodyReceived(AMQFrame frame) throws AMQException
@@ -275,7 +281,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content body frame received: " + frame);
}
- getChannel(frame.channel).publishContentBody((ContentBody)frame.bodyFrame);
+ getChannel(frame.channel).publishContentBody((ContentBody) frame.bodyFrame);
}
/**
@@ -355,6 +361,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
* Close a specific channel. This will remove any resources used by the channel, including:
* <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
* </ul>
+ *
* @param channelId id of the channel to close
* @throws AMQException if an error occurs closing the channel
* @throws IllegalArgumentException if the channel id is not valid
@@ -381,6 +388,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
/**
* In our current implementation this is used by the clustering code.
+ *
* @param channelId
*/
public void removeChannel(int channelId)
@@ -390,11 +398,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
/**
* Initialise heartbeats on the session.
+ *
* @param delay delay in seconds (not ms)
*/
public void initHeartbeats(int delay)
{
- if(delay > 0)
+ if (delay > 0)
{
_minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
_minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.getInstance().getTimeout(delay));
@@ -404,6 +413,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
/**
* Closes all channels that were opened by this protocol session. This frees up all resources
* used by the channel.
+ *
* @throws AMQException if an error occurs while closing any channel
*/
private void closeAllChannels() throws AMQException
@@ -421,7 +431,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
*/
public void closeSession() throws AMQException
{
- if(!_closed)
+ if (!_closed)
{
_closed = true;
closeAllChannels();
@@ -463,11 +473,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
// information is used by SASL primary.
if (address instanceof InetSocketAddress)
{
- return ((InetSocketAddress)address).getHostName();
+ return ((InetSocketAddress) address).getHostName();
}
else if (address instanceof VmPipeAddress)
{
- return "vmpipe:" + ((VmPipeAddress)address).getPort();
+ return "vmpipe:" + ((VmPipeAddress) address).getPort();
}
else
{
@@ -484,22 +494,32 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_saslServer = saslServer;
}
-
+
+ public FieldTable getClientProperties()
+ {
+ return _clientProperties;
+ }
+
+ public void setClientProperties(FieldTable clientProperties)
+ {
+ _clientProperties = clientProperties;
+ }
+
/**
* Convenience methods for managing AMQP version.
* NOTE: Both major and minor will be set to 0 prior to protocol initiation.
*/
-
+
public byte getAmqpMajor()
{
return _major;
}
-
+
public byte getAmqpMinor()
{
return _minor;
}
-
+
public boolean amqpVersionEquals(byte major, byte minor)
{
return _major == major && _minor == minor;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index acaf6b0d9b..03d0c50dac 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.AMQException;
@@ -122,4 +123,9 @@ public interface AMQProtocolSession
* @param saslServer
*/
void setSaslServer(SaslServer saslServer);
+
+
+ FieldTable getClientProperties();
+
+ void setClientProperties(FieldTable clientProperties);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index e64daef690..561b719b2e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -96,7 +96,7 @@ public class AMQQueue implements Managable, Comparable
* max allowed number of messages on a queue.
*/
private Integer _maxMessageCount = 10000;
-
+
/**
* max queue depth(KB) for the queue
*/
@@ -362,12 +362,17 @@ public class AMQQueue implements Managable, Comparable
_bindings.addBinding(routingKey, exchange);
}
- public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters)
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) throws AMQException
+ {
+ registerProtocolSession(ps, channel, consumerTag, acks, filters, false);
+ }
+
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
throws AMQException
{
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
- Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters);
+ Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
_subscribers.addSubscriber(subscription);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index d8bb6e1948..8bdadcb493 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -281,7 +281,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
// Only give the message to those that want them.
- if (sub.hasFilters() && sub.hasInterest(msg))
+ if (sub.hasInterest(msg))
{
sub.enqueueForPreDelivery(msg);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
index f464384562..2bb77dc649 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
@@ -33,12 +33,10 @@ import org.apache.qpid.framing.FieldTable;
*/
public interface SubscriptionFactory
{
- Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters)
- throws AMQException;
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks,
+ FieldTable filters, boolean noLocal) throws AMQException;
- Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
- throws AMQException;
- Subscription createSubscription(int channel, AMQProtocolSession protocolSession,String consumerTag)
- throws AMQException;
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
+ throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 79b0593f69..fc00754cda 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
@@ -56,6 +57,7 @@ public class SubscriptionImpl implements Subscription
private Queue<AMQMessage> _messages;
+ private final boolean _noLocal;
/**
* True if messages need to be acknowledged
@@ -65,21 +67,15 @@ public class SubscriptionImpl implements Subscription
public static class Factory implements SubscriptionFactory
{
- public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) throws AMQException
+ public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters);
- }
-
- public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
- throws AMQException
- {
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, null);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal);
}
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false);
}
}
@@ -87,11 +83,11 @@ public class SubscriptionImpl implements Subscription
String consumerTag, boolean acks)
throws AMQException
{
- this(channelId, protocolSession, consumerTag, acks, null);
+ this(channelId, protocolSession, consumerTag, acks, null, false);
}
public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
- String consumerTag, boolean acks, FieldTable filters)
+ String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
throws AMQException
{
AMQChannel channel = protocolSession.getChannel(channelId);
@@ -105,6 +101,8 @@ public class SubscriptionImpl implements Subscription
this.consumerTag = consumerTag;
sessionKey = protocolSession.getKey();
_acks = acks;
+ _noLocal = noLocal;
+
_filters = FilterManagerFactory.createManager(filters);
if (_filters != null)
@@ -218,7 +216,22 @@ public class SubscriptionImpl implements Subscription
public boolean hasInterest(AMQMessage msg)
{
- return _filters.allAllow(msg);
+ if (_noLocal)
+ {
+ return !(protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
+ msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())));
+ }
+ else
+ {
+ if (_filters != null)
+ {
+ return _filters.allAllow(msg);
+ }
+ else
+ {
+ return true;
+ }
+ }
}
public Queue<AMQMessage> getPreDeliveryQueue()
@@ -235,8 +248,6 @@ public class SubscriptionImpl implements Subscription
}
-
-
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{
AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index a4afe18e4d..91e720ea54 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -139,22 +139,15 @@ class SubscriptionSet implements WeightedSubscriptionManager
if (!subscription.isSuspended())
{
- if (!subscription.hasFilters())
+ if (subscription.hasInterest(msg))
{
- return subscription;
- }
- else
- {
- if (subscription.hasInterest(msg))
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty())
{
- // if the queue is not empty then this client is ready to receive a message.
- //FIXME the queue could be full of sent messages.
- // Either need to clean all PDQs after sending a message
- // OR have a clean up thread that runs the PDQs expunging the messages.
- if (subscription.getPreDeliveryQueue().isEmpty())
- {
- return subscription;
- }
+ return subscription;
}
}
}
@@ -208,6 +201,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
/**
* Notification that a queue has been deleted. This is called so that the subscription can inform the
* channel, which in turn can update its list of unacknowledged messages.
+ *
* @param queue
*/
public void queueDeleted(AMQQueue queue)
diff --git a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
index 6ab7808110..d9e946c397 100644
--- a/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
+++ b/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.common.QpidProperties;
+
import java.util.Enumeration;
import javax.jms.ConnectionMetaData;
@@ -29,7 +31,6 @@ public class QpidConnectionMetaData implements ConnectionMetaData
{
-
QpidConnectionMetaData(AMQConnection conn)
{
}
@@ -46,7 +47,7 @@ public class QpidConnectionMetaData implements ConnectionMetaData
public String getJMSProviderName() throws JMSException
{
- return "Apache Qpid";
+ return "Apache " + QpidProperties.getProductName();
}
public String getJMSVersion() throws JMSException
@@ -71,8 +72,8 @@ public class QpidConnectionMetaData implements ConnectionMetaData
public String getProviderVersion() throws JMSException
{
- return "QPID (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ "
- + getProtocolVersion() + "] )";
+ return QpidProperties.getProductName() + " (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ "
+ + getProtocolVersion() + "] )";
}
private String getProtocolVersion()
@@ -89,8 +90,7 @@ public class QpidConnectionMetaData implements ConnectionMetaData
public String getClientVersion()
{
- // TODO - get client build version from properties file or similar
- return "<unknown>";
+ return QpidProperties.getBuildVerision();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index 9333df3fe4..f7b0cb5331 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -22,6 +22,8 @@ package org.apache.qpid.client.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.security.AMQCallbackHandler;
@@ -119,10 +121,11 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
FieldTable clientProperties = FieldTableFactory.newFieldTable();
- clientProperties.put("instance", ps.getClientID());
- clientProperties.put("product", "Qpid");
- clientProperties.put("version", "1.0");
- clientProperties.put("platform", getFullSystemInfo());
+
+ clientProperties.put(ClientProperties.instance.toString(), ps.getClientID());
+ clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
+ clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVerision());
+ clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism,
saslResponse, selectedLocale));
}
diff --git a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
new file mode 100644
index 0000000000..07371b5182
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.common;
+
+public enum ClientProperties
+{
+ instance,
+ product,
+ version,
+ platform
+}
diff --git a/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java b/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
new file mode 100644
index 0000000000..3a96821e93
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.common;
+
+public class QpidProperties
+{
+
+ static
+ {
+ //load values from property file.
+ }
+
+ public static String getProductName()
+ {
+ return "Qpid";
+ }
+
+ public static String getReleaseVerision()
+ {
+ return "1.0";
+ }
+
+
+ public static String getBuildVerision()
+ {
+ return "1";
+ }
+}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
index bb8fd5bc19..87e5c43932 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
@@ -121,4 +122,13 @@ public class MockProtocolSession implements AMQProtocolSession
public void setSaslServer(SaslServer saslServer)
{
}
+
+ public FieldTable getClientProperties()
+ {
+ return null;
+ }
+
+ public void setClientProperties(FieldTable clientProperties)
+ {
+ }
}