diff options
Diffstat (limited to 'java')
5 files changed, 54 insertions, 2 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 855af2cfd0..a5c2688511 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -423,6 +423,15 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess }, _connection).execute(); } + + public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException + { + if( consumer.getQueuename() != null) + { + bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName()); + } + } + public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, final AMQShortString exchangeName) throws AMQException, FailoverException; @@ -2156,6 +2165,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess AMQShortString queueName = declareQueue(amqd, protocolHandler); + // store the consumer queue name + consumer.setQueuename(queueName); + // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName()); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java new file mode 100644 index 0000000000..97e2898de4 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java @@ -0,0 +1,11 @@ +package org.apache.qpid.client; + +import org.apache.qpid.AMQException; + +import javax.jms.Topic; + +public interface AMQTopicSubscriber +{ + + void addBindingKey(Topic topic, String bindingKey) throws AMQException; +} diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index c008e30855..ba31a6102f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -41,6 +41,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; +import org.apache.qpid.AMQException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -143,6 +144,13 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me */ private Thread _receivingThread; + + /** + * Used to store this consumer queue name + * Usefull when more than binding key should be used + */ + private AMQShortString _queuename; + /** * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive * on the queue. This is used for queue browsing. @@ -970,4 +978,19 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me { // do nothing as this is a 0_10 feature } + + public AMQShortString getQueuename() + { + return _queuename; + } + + public void setQueuename(AMQShortString queuename) + { + this._queuename = queuename; + } + + public void addBindingKey(AMQDestination amqd, String routingKey) throws AMQException + { + _session.addBindingKey(this,amqd,routingKey); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java index d4bec5d906..507490d6fb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java +++ b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import org.apache.qpid.AMQException; + import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; @@ -30,7 +32,7 @@ import javax.jms.TopicSubscriber; * Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract * */ -class TopicSubscriberAdaptor implements TopicSubscriber +class TopicSubscriberAdaptor implements TopicSubscriber, AMQTopicSubscriber { private final Topic _topic; private final BasicMessageConsumer _consumer; @@ -123,4 +125,8 @@ class TopicSubscriberAdaptor implements TopicSubscriber return _consumer; } + public void addBindingKey(Topic topic, String bindingKey) throws AMQException + { + _consumer.addBindingKey((AMQDestination) topic, bindingKey); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 2d8f325d8a..465ae5763c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -53,7 +53,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach protected ByteBuffer _data; private boolean _readableProperties = false; protected boolean _readableMessage = false; - protected boolean _changedData; + protected boolean _changedData = true; private Destination _destination; private JMSHeaderAdapter _headerAdapter; private BasicMessageConsumer _consumer; |