summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java23
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java2
5 files changed, 54 insertions, 2 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 855af2cfd0..a5c2688511 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -423,6 +423,15 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}, _connection).execute();
}
+
+ public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException
+ {
+ if( consumer.getQueuename() != null)
+ {
+ bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName());
+ }
+ }
+
public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
final AMQShortString exchangeName) throws AMQException, FailoverException;
@@ -2156,6 +2165,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
AMQShortString queueName = declareQueue(amqd, protocolHandler);
+ // store the consumer queue name
+ consumer.setQueuename(queueName);
+
// bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName());
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java
new file mode 100644
index 0000000000..97e2898de4
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java
@@ -0,0 +1,11 @@
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+
+import javax.jms.Topic;
+
+public interface AMQTopicSubscriber
+{
+
+ void addBindingKey(Topic topic, String bindingKey) throws AMQException;
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index c008e30855..ba31a6102f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -41,6 +41,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.AMQException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -143,6 +144,13 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
*/
private Thread _receivingThread;
+
+ /**
+ * Used to store this consumer queue name
+ * Usefull when more than binding key should be used
+ */
+ private AMQShortString _queuename;
+
/**
* autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
* on the queue. This is used for queue browsing.
@@ -970,4 +978,19 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
{
// do nothing as this is a 0_10 feature
}
+
+ public AMQShortString getQueuename()
+ {
+ return _queuename;
+ }
+
+ public void setQueuename(AMQShortString queuename)
+ {
+ this._queuename = queuename;
+ }
+
+ public void addBindingKey(AMQDestination amqd, String routingKey) throws AMQException
+ {
+ _session.addBindingKey(this,amqd,routingKey);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
index d4bec5d906..507490d6fb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
+++ b/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.AMQException;
+
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -30,7 +32,7 @@ import javax.jms.TopicSubscriber;
* Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
*
*/
-class TopicSubscriberAdaptor implements TopicSubscriber
+class TopicSubscriberAdaptor implements TopicSubscriber, AMQTopicSubscriber
{
private final Topic _topic;
private final BasicMessageConsumer _consumer;
@@ -123,4 +125,8 @@ class TopicSubscriberAdaptor implements TopicSubscriber
return _consumer;
}
+ public void addBindingKey(Topic topic, String bindingKey) throws AMQException
+ {
+ _consumer.addBindingKey((AMQDestination) topic, bindingKey);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 2d8f325d8a..465ae5763c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -53,7 +53,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
protected ByteBuffer _data;
private boolean _readableProperties = false;
protected boolean _readableMessage = false;
- protected boolean _changedData;
+ protected boolean _changedData = true;
private Destination _destination;
private JMSHeaderAdapter _headerAdapter;
private BasicMessageConsumer _consumer;