summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main')
-rwxr-xr-xqpid/java/client/src/main/java/client.bnd29
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java40
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java3
10 files changed, 126 insertions, 25 deletions
diff --git a/qpid/java/client/src/main/java/client.bnd b/qpid/java/client/src/main/java/client.bnd
index 8f0f936583..0ddd163d4f 100755
--- a/qpid/java/client/src/main/java/client.bnd
+++ b/qpid/java/client/src/main/java/client.bnd
@@ -1,7 +1,26 @@
-ver: 0.7.0
-
-Bundle-SymbolicName: qpid-client
-Bundle-Version: ${ver}
-Export-Package: *;version=${ver}
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+ver: 0.9.0
+
+Bundle-SymbolicName: qpid-client
+Bundle-Version: ${ver}
+Export-Package: *;version=${ver}
Bundle-RequiredExecutionEnvironment: J2SE-1.5
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index dbd742070e..ee3e0767d4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1452,16 +1452,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.info("Not a hard-error connection not closing: " + cause);
}
-
- // deliver the exception if there is a listener
- if (_exceptionListener != null)
- {
- _exceptionListener.onException(je);
- }
- else
- {
- _logger.error("Throwable Received but no listener set: " + cause);
- }
// if we are closing the connection, close sessions first
if (closer)
@@ -1475,6 +1465,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_logger.error("Error closing all sessions: " + e, e);
}
}
+
+ // deliver the exception if there is a listener
+ if (_exceptionListener != null)
+ {
+ _exceptionListener.onException(je);
+ }
+ else
+ {
+ _logger.error("Throwable Received but no listener set: " + cause);
+ }
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 24e5253cc8..75f71a99c0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -577,7 +577,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
try
{
boolean isTopic;
-
+ Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+
if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL)
{
isTopic = consumer.getDestination() instanceof AMQTopic ||
@@ -593,9 +594,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
preAcquire = !consumer.isNoConsume() &&
(isTopic || consumer.getMessageSelector() == null ||
consumer.getMessageSelector().equals(""));
+
+ arguments.putAll(
+ (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs());
}
- Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+
getQpidSession().messageSubscribe
(queueName.toString(), String.valueOf(tag),
getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 905bf5e111..4bac54b3e4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -107,7 +107,7 @@ public abstract class BasicMessageConsumer<U extends UnprocessedMessage & AMQSes
/**
* We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
*/
- private final boolean _exclusive;
+ protected boolean _exclusive;
/**
* The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
@@ -182,7 +182,7 @@ public abstract class BasicMessageConsumer<U extends UnprocessedMessage & AMQSes
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
-
+
_synchronousQueue = new LinkedBlockingQueue();
_autoClose = autoClose;
_noConsume = noConsume;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index d0f1f79631..699b52a6b1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -490,4 +490,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
clearReceiveQueue();
}
}
+
+ public boolean isExclusive()
+ {
+ AMQDestination dest = this.getDestination();
+ if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
+ {
+ return true;
+ }
+ else
+ {
+ return dest.getLink().getSubscription().isExclusive();
+ }
+ }
+ else
+ {
+ return _exclusive;
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index cae11e3962..32c7ef29de 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -223,8 +223,10 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
String exchange = replyTo.getExchange();
String routingKey = replyTo.getRoutingKey();
- dest = generateDestination(exchange == null ? null : new AMQShortString(exchange),
- routingKey == null ? null : new AMQShortString(routingKey));
+ dest = generateDestination(exchange == null ? new AMQShortString("") :
+ new AMQShortString(exchange),
+ routingKey == null ? new AMQShortString(""):
+ new AMQShortString(routingKey));
_destinationCache.put(replyTo, new SoftReference<Destination>(dest));
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
index 64d5b16db0..00503cc650 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQDestination.Binding;
+import org.apache.qpid.client.messaging.address.Link.Subscription;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.messaging.address.Node.UnknownNodeType;
@@ -264,6 +265,7 @@ public class AddressHelper
public Link getLink()
{
Link link = new Link();
+ link.setSubscription(new Subscription());
if (linkProps != null)
{
link.setDurable(linkProps.getBoolean(DURABLE) == null ? false
@@ -283,7 +285,8 @@ public class AddressHelper
.setProducerCapacity(capacityProps
.getInt(CAPACITY_TARGET) == null ? 0
: capacityProps.getInt(CAPACITY_TARGET));
- } else
+ }
+ else
{
int cap = linkProps.getInt(CAPACITY) == null ? 0 : linkProps
.getInt(CAPACITY);
@@ -292,6 +295,21 @@ public class AddressHelper
}
link.setFilter(linkProps.getString(FILTER));
// so far filter type not used
+
+ if (((Map) address.getOptions().get(LINK)).containsKey(X_SUBSCRIBE))
+ {
+ Map x_subscribe = (Map)((Map) address.getOptions().get(LINK)).get(X_SUBSCRIBE);
+
+ if (x_subscribe.containsKey(ARGUMENTS))
+ {
+ link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS));
+ }
+
+ boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ?
+ Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false;
+
+ link.getSubscription().setExclusive(exclusive);
+ }
}
return link;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
index 0ebcaf548b..a7d19d1bd5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.client.messaging.address;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.qpid.client.messaging.address.Node.QueueNode;
public class Link
@@ -34,6 +37,7 @@ public class Link
protected int _consumerCapacity = 0;
protected int _producerCapacity = 0;
protected Node node;
+ protected Subscription subscription;
public Node getNode()
{
@@ -114,4 +118,40 @@ public class Link
{
this.name = name;
}
+
+ public Subscription getSubscription()
+ {
+ return this.subscription;
+ }
+
+ public void setSubscription(Subscription subscription)
+ {
+ this.subscription = subscription;
+ }
+
+ public static class Subscription
+ {
+ private Map<String,Object> args = new HashMap<String,Object>();
+ private boolean exclusive = false;
+
+ public Map<String, Object> getArgs()
+ {
+ return args;
+ }
+
+ public void setArgs(Map<String, Object> args)
+ {
+ this.args = args;
+ }
+
+ public boolean isExclusive()
+ {
+ return exclusive;
+ }
+
+ public void setExclusive(boolean exclusive)
+ {
+ this.exclusive = exclusive;
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index edfb4bb16b..10250a1ac0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -66,7 +66,6 @@ import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.NetworkTransport;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
import org.apache.qpid.transport.network.Transport;
import org.slf4j.Logger;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 4236f20301..44376331ee 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -112,8 +112,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_protocolHandler = protocolHandler;
_protocolVersion = connection.getProtocolVersion();
_logger.info("Using ProtocolVersion for Session:" + _protocolVersion);
- _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
- this);
+ _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), this);
_connection = connection;
}