summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-03-05 16:04:16 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-03-05 16:04:16 +0000
commit6713bfc5ddc1ff6202dad0d950a252273f73f795 (patch)
treed789ac52d18fdc493f5d7e1731384c43cbfde9f1
parent58c93e3b5e6c2227cc0018720a8781b25ec0d288 (diff)
downloadqpid-python-6713bfc5ddc1ff6202dad0d950a252273f73f795.tar.gz
QPID-4000 , QPID-5601 : Improve conversion of reply-to between different protocols. Add functionality to the default exchange to understand AMQP 1.0 addresses.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1574551 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java15
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java26
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java1
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java10
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java15
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java3
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java7
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java9
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java210
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java2
-rwxr-xr-xqpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java5
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java10
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java2
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java2
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java51
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java2
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java42
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java99
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java3
41 files changed, 515 insertions, 115 deletions
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index eb3b665242..bd0411619e 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -585,7 +585,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
_messageId = messageId;
}
- public String getRoutingKey()
+ public String getInitialRoutingAddress()
{
return null;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 2a688f497a..e01f4b7db9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -423,11 +423,12 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
final List<? extends BaseQueue> route(final ServerMessage message,
+ final String routingAddress,
final InstanceProperties instanceProperties)
{
_receivedMessageCount.incrementAndGet();
_receivedMessageSize.addAndGet(message.getSize());
- List<? extends BaseQueue> queues = doRoute(message, instanceProperties);
+ List<? extends BaseQueue> queues = doRoute(message, routingAddress, instanceProperties);
List<? extends BaseQueue> allQueues = queues;
boolean deletedQueues = false;
@@ -464,18 +465,19 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
- final InstanceProperties instanceProperties,
- final ServerTransaction txn,
- final Action<? super MessageInstance> postEnqueueAction)
+ final String routingAddress,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action<? super MessageInstance> postEnqueueAction)
{
- List<? extends BaseQueue> queues = route(message, instanceProperties);
+ List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
if(queues == null || queues.isEmpty())
{
ExchangeImpl altExchange = getAlternateExchange();
if(altExchange != null)
{
- return altExchange.send(message, instanceProperties, txn, postEnqueueAction);
+ return altExchange.send(message, routingAddress, instanceProperties, txn, postEnqueueAction);
}
else
{
@@ -515,6 +517,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
protected abstract List<? extends BaseQueue> doRoute(final ServerMessage message,
+ final String routingAddress,
final InstanceProperties instanceProperties);
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
index f59049d276..123a4f0a63 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
@@ -50,13 +50,31 @@ public class DefaultDestination implements MessageDestination
public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
- final InstanceProperties instanceProperties,
- final ServerTransaction txn,
- final Action<? super MessageInstance> postEnqueueAction)
+ final String routingAddress,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action<? super MessageInstance> postEnqueueAction)
{
- final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
+ final AMQQueue q = _virtualHost.getQueue(routingAddress);
if(q == null)
{
+ if(routingAddress.contains("/") && !routingAddress.startsWith("/"))
+ {
+ String[] parts = routingAddress.split("/",2);
+ ExchangeImpl exchange = _virtualHost.getExchange(parts[0]);
+ if(exchange != null)
+ {
+ return exchange.send(message, parts[1], instanceProperties, txn, postEnqueueAction);
+ }
+ }
+ else if(!routingAddress.contains("/"))
+ {
+ ExchangeImpl exchange = _virtualHost.getExchange(routingAddress);
+ if(exchange != null)
+ {
+ return exchange.send(message, "", instanceProperties, txn, postEnqueueAction);
+ }
+ }
return 0;
}
else
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index 0e8cfb72a1..a67cacf821 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -143,11 +143,11 @@ public class DirectExchange extends AbstractExchange<DirectExchange>
}
@Override
- public List<? extends BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
+ public List<? extends BaseQueue> doRoute(ServerMessage payload,
+ final String routingKey,
+ final InstanceProperties instanceProperties)
{
- final String routingKey = payload.getRoutingKey();
-
BindingSet bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey);
if(bindings != null)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index c7f81f1d15..b7810e8112 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -79,7 +79,9 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange>
}
@Override
- public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
+ public ArrayList<BaseQueue> doRoute(ServerMessage payload,
+ final String routingKey,
+ final InstanceProperties instanceProperties)
{
for(BindingImpl b : getBindings())
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 60df38af55..9d3ce0a415 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -93,7 +93,9 @@ public class HeadersExchange extends AbstractExchange<HeadersExchange>
}
@Override
- public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
+ public ArrayList<BaseQueue> doRoute(ServerMessage payload,
+ final String routingKey,
+ final InstanceProperties instanceProperties)
{
if (_logger.isDebugEnabled())
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index e7236bdf3e..db73e842b8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -157,12 +157,14 @@ public class TopicExchange extends AbstractExchange<TopicExchange>
}
@Override
- public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
+ public ArrayList<BaseQueue> doRoute(ServerMessage payload,
+ final String routingAddress,
+ final InstanceProperties instanceProperties)
{
- final String routingKey = payload.getRoutingKey() == null
+ final String routingKey = routingAddress == null
? ""
- : payload.getRoutingKey();
+ : routingAddress;
final Collection<AMQQueue> matchedQueues =
getMatchedQueues(Filterable.Factory.newInstance(payload,instanceProperties), routingKey);
@@ -181,7 +183,7 @@ public class TopicExchange extends AbstractExchange<TopicExchange>
if(queues == null || queues.isEmpty())
{
- _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes.");
+ _logger.info("Message routing key: " + routingAddress + " No routes.");
}
return queues;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
index 155f209ffb..1913f11ae1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
@@ -32,14 +32,18 @@ public interface MessageDestination extends MessageNode
/**
* Routes a message
+ *
+ *
* @param message the message to be routed
+ * @param routingAddress
* @param instanceProperties the instance properties
* @param txn the transaction to enqueue within
* @param postEnqueueAction action to perform on the result of every enqueue (may be null)
* @return the number of queues in which the message was enqueued performed
*/
<M extends ServerMessage<? extends StorableMessageMetaData>> int send(M message,
- InstanceProperties instanceProperties,
- ServerTransaction txn,
- Action<? super MessageInstance> postEnqueueAction);
+ final String routingAddress,
+ InstanceProperties instanceProperties,
+ ServerTransaction txn,
+ Action<? super MessageInstance> postEnqueueAction);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index 28491edaba..8c35af8be4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -27,7 +27,7 @@ import java.nio.ByteBuffer;
public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueueableMessage, MessageContentSource
{
- String getRoutingKey();
+ String getInitialRoutingAddress();
AMQMessageHeader getMessageHeader();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index 6375cfb07d..fdc2fa90a5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -44,6 +44,7 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
private final Object _messageBody;
private final int _contentSize;
private InternalMessageHeader _header;
+ private String _initialRoutingAddress;
InternalMessage(final StoredMessage<InternalMessageMetaData> handle,
@@ -80,9 +81,9 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
}
@Override
- public String getRoutingKey()
+ public String getInitialRoutingAddress()
{
- return null;
+ return _initialRoutingAddress;
}
@Override
@@ -253,4 +254,8 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
}
+ public void setInitialRoutingAddress(final String initialRoutingAddress)
+ {
+ _initialRoutingAddress = initialRoutingAddress;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 62634970a6..11eb0b8a19 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -70,7 +70,6 @@ import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import javax.management.NotificationListener;
import javax.security.auth.Subject;
public abstract class AbstractQueue
@@ -2465,9 +2464,10 @@ public abstract class AbstractQueue
}
public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
- final InstanceProperties instanceProperties,
- final ServerTransaction txn,
- final Action<? super MessageInstance> postEnqueueAction)
+ final String routingAddress,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action<? super MessageInstance> postEnqueueAction)
{
txn.enqueue(this,message, new ServerTransaction.Action()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 9814431beb..91148b1dc0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -377,6 +377,7 @@ public abstract class QueueEntryImpl implements QueueEntry
if (alternateExchange != null)
{
enqueues = alternateExchange.send(getMessage(),
+ getMessage().getInitialRoutingAddress(),
getInstanceProperties(),
txn,
action);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
index be0704aeaa..fa75d41810 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -127,7 +127,7 @@ public class FanoutExchangeTest extends TestCase
_exchange.addBinding("key",queue2, null);
- List<? extends BaseQueue> result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
+ List<? extends BaseQueue> result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2, result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -136,7 +136,7 @@ public class FanoutExchangeTest extends TestCase
_exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True"));
- result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
+ result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2, result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -144,14 +144,14 @@ public class FanoutExchangeTest extends TestCase
_exchange.deleteBinding("key",queue2);
- result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
+ result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2, result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
assertTrue("Expected queue2 to be routed to", result.contains(queue2));
- result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY);
+ result = _exchange.route(mockMessage(false), "", InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to queue1 only", 1, result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -160,7 +160,7 @@ public class FanoutExchangeTest extends TestCase
_exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False"));
- result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY);
+ result = _exchange.route(mockMessage(false), "", InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2, result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
assertTrue("Expected queue2 to be routed to", result.contains(queue2));
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index e4e07813c7..76752de5d0 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -73,7 +73,7 @@ public class HeadersExchangeTest extends TestCase
protected void routeAndTest(ServerMessage msg, AMQQueue... expected) throws Exception
{
- List<? extends BaseQueue> results = _exchange.route(msg, InstanceProperties.EMPTY);
+ List<? extends BaseQueue> results = _exchange.route(msg, "", InstanceProperties.EMPTY);
List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results);
unexpected.removeAll(Arrays.asList(expected));
assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty());
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index 1c191b7b2e..21aa171551 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -324,8 +324,8 @@ public class TopicExchangeTest extends QpidTestCase
private int routeMessage(String routingKey, long messageNumber)
{
ServerMessage message = mock(ServerMessage.class);
- when(message.getRoutingKey()).thenReturn(routingKey);
- List<? extends BaseQueue> queues = _exchange.route(message, InstanceProperties.EMPTY);
+ when(message.getInitialRoutingAddress()).thenReturn(routingKey);
+ List<? extends BaseQueue> queues = _exchange.route(message, routingKey, InstanceProperties.EMPTY);
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index bd43100cd2..5622383f3f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -119,7 +119,7 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
}
@Override
- public String getRoutingKey()
+ public String getInitialRoutingAddress()
{
return null;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index 3b74110a6e..8992cf62c9 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -67,7 +67,7 @@ class MockServerMessage implements ServerMessage
throw new NotImplementedException();
}
- public String getRoutingKey()
+ public String getInitialRoutingAddress()
{
throw new NotImplementedException();
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 69c625d41d..120ba2d951 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -417,7 +417,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
{
logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
queue.getName(),
- msg.getRoutingKey()));
+ msg.getInitialRoutingAddress()));
}
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
index 37bbd810b4..7ff3873856 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
@@ -30,17 +30,8 @@ import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.codec.BBDecoder;
-import org.apache.qpid.typedmessage.TypedBytesContentReader;
-import org.apache.qpid.typedmessage.TypedBytesFormatException;
-import java.io.EOFException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
public class MessageConverter_Internal_to_v0_10 implements MessageConverter<InternalMessage, MessageTransferMessage>
{
@@ -123,7 +114,7 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
};
}
- private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg, final String bodyMimeType, final int size)
+ private MessageMetaData_0_10 convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size)
{
DeliveryProperties deliveryProps = new DeliveryProperties();
MessageProperties messageProps = new MessageProperties();
@@ -132,7 +123,7 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
deliveryProps.setExpiration(serverMsg.getExpiration());
deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
- deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
+ deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress());
deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
@@ -142,7 +133,7 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
{
messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
}
-
+ messageProps.setApplicationHeaders(serverMsg.getMessageHeader().getHeaderMap());
Header header = new Header(deliveryProps, messageProps, null);
return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime());
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
index df4c398115..9ebf4570d0 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -33,7 +33,6 @@ import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
@@ -127,7 +126,7 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M
deliveryProps.setExpiration(serverMsg.getExpiration());
deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
- deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
+ deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress());
deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
index 487862bcba..869ac01c4e 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
@@ -41,7 +41,7 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTra
return getStoredMessage().getMetaData();
}
- public String getRoutingKey()
+ public String getInitialRoutingAddress()
{
return getMetaData().getRoutingKey();
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 236a955ea9..5627b2eabe 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -47,7 +47,6 @@ import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -58,7 +57,6 @@ import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.queue.AMQQueue;
@@ -199,7 +197,10 @@ public class ServerSession extends Session
_outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
}
- int enqueues = exchange.send(message, instanceProperties, _transaction, _checkCapacityAction);
+ int enqueues = exchange.send(message,
+ message.getInitialRoutingAddress(),
+ instanceProperties, _transaction, _checkCapacityAction
+ );
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
incrementOutstandingTxnsIfNecessary();
return enqueues;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 9e0c5b6be6..99068a9d6c 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -378,8 +378,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
}
};
- int enqueues = _currentMessage.getDestination().send(amqMessage, instanceProperties, _transaction,
- immediate ? _immediateAction : _capacityCheckAction);
+ int enqueues = _currentMessage.getDestination().send(amqMessage,
+ amqMessage.getInitialRoutingAddress(),
+ instanceProperties, _transaction,
+ immediate ? _immediateAction : _capacityCheckAction
+ );
if(enqueues == 0)
{
handleUnroutableMessage(amqMessage);
@@ -1574,7 +1577,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
if (altExchange == null)
{
_logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
- _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+ _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getInitialRoutingAddress()));
}
else
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
index 833f5fb06f..0ed63daf7c 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
@@ -71,7 +71,7 @@ public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMet
return getMessageMetaData().getContentHeaderBody();
}
- public String getRoutingKey()
+ public String getInitialRoutingAddress()
{
MessageMetaData messageMetaData = getMessageMetaData();
if (messageMetaData != null)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index f2bb95c8d5..9ba212d1ed 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -125,7 +125,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
};
}
- private MessageMetaData convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size)
+ private MessageMetaData convertMetaData(final InternalMessage serverMsg, final String bodyMimeType, final int size)
{
MessagePublishInfo publishInfo = new MessagePublishInfo()
@@ -133,7 +133,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
@Override
public AMQShortString getExchange()
{
- return null;
+ return AMQShortString.EMPTY_STRING;
}
@Override
@@ -157,7 +157,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
@Override
public AMQShortString getRoutingKey()
{
- return null;
+ return AMQShortString.valueOf(serverMsg.getInitialRoutingAddress());
}
};
@@ -174,6 +174,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
props.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
props.setUserId(serverMsg.getMessageHeader().getUserId());
+
Map<String,Object> headerProps = new LinkedHashMap<String, Object>();
for(String headerName : serverMsg.getMessageHeader().getHeaderNames())
@@ -184,6 +185,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
props.setHeaders(FieldTable.convertToFieldTable(headerProps));
final ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID);
+ chb.setBodySize(size);
return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime());
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
index f35d37ecbd..b244e7626f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
@@ -20,20 +20,27 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.ReplyTo;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.typedmessage.TypedBytesContentReader;
import org.apache.qpid.typedmessage.TypedBytesFormatException;
+import org.apache.qpid.url.AMQBindingURL;
import java.io.EOFException;
+import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMessage, InternalMessage>
{
@@ -58,9 +65,210 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe
Object body = convertMessageBody(mimeType, data);
- return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), body);
+ return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(),
+ new DelegatingMessageHeader(serverMessage.getMessageHeader()), body);
}
+ private static class ReplyToComponents
+ {
+ private String _exchange;
+ private String _queue;
+ private String _routingKey;
+
+ public void setExchange(final String exchange)
+ {
+ _exchange = exchange;
+ }
+
+ public void setQueue(final String queue)
+ {
+ _queue = queue;
+ }
+
+ public void setRoutingKey(final String routingKey)
+ {
+ _routingKey = routingKey;
+ }
+
+ public String getExchange()
+ {
+ return _exchange;
+ }
+
+ public String getQueue()
+ {
+ return _queue;
+ }
+
+ public String getRoutingKey()
+ {
+ return _routingKey;
+ }
+
+ public boolean hasExchange()
+ {
+ return _exchange != null;
+ }
+
+ public boolean hasQueue()
+ {
+ return _queue != null;
+ }
+
+ public boolean hasRoutingKey()
+ {
+ return _routingKey != null;
+ }
+ }
+
+ private static class DelegatingMessageHeader implements AMQMessageHeader
+ {
+ private final AMQMessageHeader _delegate;
+
+ private DelegatingMessageHeader(final AMQMessageHeader delegate)
+ {
+ _delegate = delegate;
+ }
+
+ @Override
+ public String getCorrelationId()
+ {
+ return _delegate.getCorrelationId();
+ }
+
+ @Override
+ public long getExpiration()
+ {
+ return _delegate.getExpiration();
+ }
+
+ @Override
+ public String getUserId()
+ {
+ return _delegate.getUserId();
+ }
+
+ @Override
+ public String getAppId()
+ {
+ return _delegate.getAppId();
+ }
+
+ @Override
+ public String getMessageId()
+ {
+ return _delegate.getMessageId();
+ }
+
+ @Override
+ public String getMimeType()
+ {
+ return _delegate.getMimeType();
+ }
+
+ @Override
+ public String getEncoding()
+ {
+ return _delegate.getEncoding();
+ }
+
+ @Override
+ public byte getPriority()
+ {
+ return _delegate.getPriority();
+ }
+
+ @Override
+ public long getTimestamp()
+ {
+ return _delegate.getTimestamp();
+ }
+
+ @Override
+ public String getType()
+ {
+ return _delegate.getType();
+ }
+
+ @Override
+ public String getReplyTo()
+ {
+ String originalReplyTo = _delegate.getReplyTo();
+ ReplyToComponents replyTo = convertReplyTo(originalReplyTo);
+ if(replyTo != null)
+ {
+ if(replyTo.hasExchange())
+ {
+ return replyTo.getExchange() + (replyTo.hasRoutingKey() ? "/" + replyTo.getRoutingKey() : "");
+ }
+ else
+ {
+ return replyTo.hasQueue() ? replyTo.getQueue() : replyTo.getRoutingKey();
+ }
+ }
+ else
+ {
+ return originalReplyTo;
+ }
+ }
+
+ private ReplyToComponents convertReplyTo(final String origReplyToString)
+ {
+ try
+ {
+ AMQBindingURL burl = new AMQBindingURL(origReplyToString);
+ ReplyToComponents replyTo = new ReplyToComponents();
+ AMQShortString routingKey = burl.getRoutingKey();
+ if(routingKey != null)
+ {
+ replyTo.setRoutingKey(routingKey.asString());
+ }
+
+ AMQShortString exchangeName = burl.getExchangeName();
+ if(exchangeName != null)
+ {
+ replyTo.setExchange(exchangeName.asString());
+ }
+
+ AMQShortString queueName = burl.getQueueName();
+ if(queueName != null)
+ {
+ replyTo.setQueue(queueName.asString());
+ }
+ return replyTo;
+ }
+ catch (URISyntaxException e)
+ {
+ return null;
+ }
+ }
+
+ @Override
+ public Object getHeader(final String name)
+ {
+ return _delegate.getHeader(name);
+ }
+
+ @Override
+ public boolean containsHeaders(final Set<String> names)
+ {
+ return _delegate.containsHeaders(names);
+ }
+
+ @Override
+ public boolean containsHeader(final String name)
+ {
+ return _delegate.containsHeader(name);
+ }
+
+ @Override
+ public Collection<String> getHeaderNames()
+ {
+ return _delegate.getHeaderNames();
+ }
+ }
+
+
private static Object convertMessageBody(String mimeType, byte[] data)
{
if("text/plain".equals(mimeType) || "text/xml".equals(mimeType))
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index d83665ad39..fc2c0d93d0 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -76,7 +76,7 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina
return null;
}};
- int enqueues = _exchange.send(message, instanceProperties, txn, null);
+ int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null);
return enqueues == 0 ? REJECTED : ACCEPTED;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index d5a349304c..4540308f61 100755
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -563,6 +563,11 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
{
return _properties == null ? null : _properties.getTo();
}
+
+ public Map<String, Object> getHeadersAsMap()
+ {
+ return new HashMap<String, Object>(_appProperties);
+ }
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index 66094f52f0..36796851e0 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -69,7 +69,7 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM
_arrivalTime = System.currentTimeMillis();
}
- public String getRoutingKey()
+ public String getInitialRoutingAddress()
{
Object routingKey = getMessageHeader().getHeader("routing-key");
if(routingKey != null)
@@ -78,7 +78,7 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM
}
else
{
- return getMessageHeader().getSubject();
+ return getMessageHeader().getTo();
}
}
@@ -92,12 +92,6 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM
return getMessageMetaData().getMessageHeader();
}
- public boolean isRedelivered()
- {
- // TODO
- return false;
- }
-
public long getSize()
{
long size = 0l;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index f7f049831e..dedb8a3dc0 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -76,7 +76,7 @@ public class NodeReceivingDestination implements ReceivingDestination
return null;
}};
- int enqueues = _exchange.send(message, instanceProperties, txn, null);
+ int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null);
return enqueues == 0 ? REJECTED : ACCEPTED;
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
index a70bd4b243..26dd8e5f37 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
@@ -115,7 +115,7 @@ public class MessageConverter_0_10_to_1_0 extends MessageConverter_to_1_0<Messa
}
}
- props.setSubject(serverMessage.getRoutingKey());
+ props.setSubject(serverMessage.getInitialRoutingAddress());
if(msgProps.hasUserId())
{
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
index 0f0197cb63..a3c7ea31e0 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.converter.v0_10_v1_0;
+import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
@@ -33,6 +34,7 @@ import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.ReplyTo;
import java.nio.ByteBuffer;
@@ -53,16 +55,18 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1
@Override
public MessageTransferMessage convert(Message_1_0 serverMsg, VirtualHost vhost)
{
- return new MessageTransferMessage(convertToStoredMessage(serverMsg), null);
+ return new MessageTransferMessage(convertToStoredMessage(serverMsg, vhost), null);
}
- private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final Message_1_0 serverMsg)
+ private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final Message_1_0 serverMsg,
+ final VirtualHost vhost)
{
Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMsg);
final byte[] messageContent = MessageConverter_from_1_0.convertToBody(bodyObject);
final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(serverMsg,
+ vhost,
MessageConverter_from_1_0.getBodyMimeType(bodyObject),
messageContent.length);
@@ -119,25 +123,54 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1
};
}
- private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg, final String bodyMimeType, final int size)
+ private MessageMetaData_0_10 convertMetaData(Message_1_0 serverMsg,
+ final VirtualHost vhost,
+ final String bodyMimeType,
+ final int size)
{
DeliveryProperties deliveryProps = new DeliveryProperties();
MessageProperties messageProps = new MessageProperties();
+ final AMQMessageHeader origHeader = serverMsg.getMessageHeader();
deliveryProps.setExpiration(serverMsg.getExpiration());
- deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
- deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
- deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
+ deliveryProps.setPriority(MessageDeliveryPriority.get(origHeader.getPriority()));
+ deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress());
+ deliveryProps.setTimestamp(origHeader.getTimestamp());
- messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
+ messageProps.setContentEncoding(origHeader.getEncoding());
messageProps.setContentLength(size);
messageProps.setContentType(bodyMimeType);
- if(serverMsg.getMessageHeader().getCorrelationId() != null)
+ if(origHeader.getCorrelationId() != null)
{
- messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
+ messageProps.setCorrelationId(origHeader.getCorrelationId().getBytes());
}
+ final String origReplyTo = origHeader.getReplyTo();
+ if(origReplyTo != null && !origReplyTo.equals(""))
+ {
+ ReplyTo replyTo;
+ if(origReplyTo.startsWith("/"))
+ {
+ replyTo = new ReplyTo("",origReplyTo);
+ }
+ else if(origReplyTo.contains("/"))
+ {
+ String[] parts = origReplyTo.split("/",2);
+ replyTo = new ReplyTo(parts[0],parts[1]);
+ }
+ else if(vhost.getExchange(origReplyTo) != null)
+ {
+ replyTo = new ReplyTo(origReplyTo,"");
+ }
+ else
+ {
+ replyTo = new ReplyTo("",origReplyTo);
+ }
+ messageProps.setReplyTo(replyTo);
+ }
+
+ messageProps.setApplicationHeaders(serverMsg.getMessageHeader().getHeadersAsMap());
Header header = new Header(deliveryProps, messageProps, null);
return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime());
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
index 544099f1f2..dd371acc3d 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
@@ -132,7 +132,7 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag
deliveryProps.setExpiration(message_0_8.getExpiration());
deliveryProps.setImmediate(message_0_8.isImmediate());
deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority()));
- deliveryProps.setRoutingKey(message_0_8.getRoutingKey());
+ deliveryProps.setRoutingKey(message_0_8.getInitialRoutingAddress());
deliveryProps.setTimestamp(properties.getTimestamp());
messageProps.setContentEncoding(properties.getEncodingAsString());
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
index bbea177260..a0026ccd8f 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
@@ -37,6 +38,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0;
import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
+import org.apache.qpid.url.AMQBindingURL;
public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMessage>
{
@@ -102,9 +104,45 @@ public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMess
{
props.setMessageId(new Binary(messageId.getBytes()));
}
- props.setReplyTo(String.valueOf(contentHeader.getReplyTo()));
+ final String originalReplyTo = String.valueOf(contentHeader.getReplyTo());
+ try
+ {
+ AMQBindingURL burl = new AMQBindingURL(originalReplyTo);
+ String replyTo;
+
+ if(burl.getExchangeName() != null && !burl.getExchangeName().equals(AMQShortString.EMPTY_STRING))
+ {
+ replyTo = burl.getExchangeName().asString();
+
+ if(burl.getRoutingKey() != null)
+ {
+ replyTo += "/" + burl.getRoutingKey().asString();
+ }
+
+ }
+ else if(burl.getQueueName() != null && !burl.getQueueName().equals(AMQShortString.EMPTY_STRING))
+ {
+ replyTo = burl.getQueueName().asString();
+ }
+ else if(burl.getRoutingKey() != null)
+ {
+ replyTo = burl.getRoutingKey().asString();
+ }
+ else
+ {
+ replyTo = originalReplyTo;
+ }
+
+ props.setReplyTo(replyTo);
+ }
+ catch (URISyntaxException e)
+ {
+ props.setReplyTo(originalReplyTo);
+ }
+
+
- props.setSubject(serverMessage.getRoutingKey());
+ props.setSubject(serverMessage.getInitialRoutingAddress());
if(contentHeader.getUserId() != null)
{
props.setUserId(new Binary(contentHeader.getUserId().getBytes()));
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 6029b09466..a6c5131222 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -44,6 +44,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
@@ -261,9 +262,10 @@ class ManagementNode implements MessageSource, MessageDestination
@Override
public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
- final InstanceProperties instanceProperties,
- final ServerTransaction txn,
- final Action<? super MessageInstance> postEnqueueAction)
+ final String routingAddress,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action<? super MessageInstance> postEnqueueAction)
{
@SuppressWarnings("unchecked")
@@ -361,11 +363,19 @@ class ManagementNode implements MessageSource, MessageDestination
ManagementNodeConsumer consumer = _consumers.get(message.getMessageHeader().getReplyTo());
+ response.setInitialRoutingAddress(message.getMessageHeader().getReplyTo());
if(consumer != null)
{
// TODO - check same owner
consumer.send(response);
}
+ else
+ {
+ _virtualHost.getDefaultDestination().send(response,
+ message.getMessageHeader().getReplyTo(), InstanceProperties.EMPTY,
+ new AutoCommitTransaction(_virtualHost.getMessageStore()),
+ null);
+ }
// TODO - route to a queue
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 91c23ff384..6c421a9610 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -112,6 +112,11 @@ public abstract class AMQDestination implements Destination, Referenceable
_name = name;
}
+ public boolean neverDeclare()
+ {
+ return false;
+ }
+
// ----- Fields required to support new address syntax -------
public enum DestSyntax {
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 8224c77ba9..29c2a3b279 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -2864,16 +2864,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- if (_declareExchanges)
+ if (_declareExchanges && !amqd.neverDeclare())
{
declareExchange(amqd, nowait);
}
- if (_delareQueues || amqd.isNameRequired())
+ if ((_delareQueues || amqd.isNameRequired()) && !amqd.neverDeclare())
{
declareQueue(amqd, consumer.isNoLocal(), nowait);
}
- if (_bindQueues)
+ if (_bindQueues && !amqd.neverDeclare())
{
if(!isBound(amqd.getExchangeName(), amqd.getAMQQueueName(), amqd.getRoutingKey()))
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
index fa2afb3ee4..ec9c595f99 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
@@ -37,4 +37,10 @@ public class AMQUndefinedDestination extends AMQDestination
{
return getAMQQueueName() == null;
}
+
+ @Override
+ public boolean neverDeclare()
+ {
+ return true;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 5acaa5c543..1981d134af 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -131,7 +131,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
_channelId = channelId;
_session = session;
_producerId = producerId;
- if (destination != null && !(destination instanceof AMQUndefinedDestination))
+ if (destination != null && !(destination.neverDeclare()))
{
declareDestination(destination);
}
@@ -177,7 +177,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
void resubscribe() throws AMQException
{
- if (_destination != null && !(_destination instanceof AMQUndefinedDestination))
+ if (_destination != null && !_destination.neverDeclare())
{
declareDestination(_destination);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
index dbfbb743ec..fef9769d06 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -36,6 +36,7 @@ import org.apache.qpid.url.BindingURL;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageNotWriteableException;
+import javax.jms.Queue;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Enumeration;
@@ -258,7 +259,29 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
}
catch (URISyntaxException e)
{
- throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
+ if(replyToEncoding.startsWith("/"))
+ {
+ dest = new DefaultRouterDestination(replyToEncoding);
+ }
+ else if(replyToEncoding.contains("/"))
+ {
+ String[] parts = replyToEncoding.split("/",2);
+ dest = new NonBURLReplyToDestination(parts[0], parts[1]);
+
+
+ }
+ else
+ {
+ if(getAMQSession().isQueueBound(AMQShortString.valueOf(replyToEncoding), null, null))
+ {
+ dest = new NonBURLReplyToDestination(replyToEncoding, "");
+ }
+ else
+ {
+ dest = new DefaultRouterDestination(replyToEncoding);
+ }
+ }
+
}
_destinationCache.put(replyToEncoding, dest);
@@ -371,7 +394,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
return getJmsHeaders().getBoolean(propertyName);
@@ -381,7 +404,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
return getJmsHeaders().getByte(propertyName);
@@ -391,7 +414,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
return getJmsHeaders().getShort(propertyName);
@@ -401,7 +424,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
return getJmsHeaders().getInteger(propertyName);
@@ -411,7 +434,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
return getJmsHeaders().getLong(propertyName);
@@ -421,7 +444,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
return getJmsHeaders().getFloat(propertyName);
@@ -431,7 +454,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
return getJmsHeaders().getDouble(propertyName);
@@ -448,7 +471,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
return getJmsHeaders().getString(propertyName);
@@ -469,7 +492,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
checkWritableProperties();
@@ -480,7 +503,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
checkWritableProperties();
@@ -491,7 +514,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
checkWritableProperties();
@@ -508,7 +531,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
checkWritableProperties();
@@ -519,7 +542,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
checkWritableProperties();
@@ -530,7 +553,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
if (STRICT_AMQP_COMPLIANCE)
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ throw new UnsupportedOperationException("JMS Properties not supported in AMQP");
}
checkWritableProperties();
@@ -585,4 +608,50 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
_readableProperties = false;
}
+
+ private static class DefaultRouterDestination extends AMQDestination implements Queue
+ {
+ public DefaultRouterDestination(final String replyToEncoding)
+ {
+ super(AMQShortString.EMPTY_STRING,
+ AMQShortString.valueOf("direct"),
+ AMQShortString.valueOf(replyToEncoding),
+ AMQShortString.valueOf(replyToEncoding));
+ }
+
+ @Override
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean neverDeclare()
+ {
+ return true;
+ }
+ }
+
+ private static class NonBURLReplyToDestination extends AMQDestination implements Queue
+ {
+ public NonBURLReplyToDestination(final String exchange, final String routingKey)
+ {
+ super(AMQShortString.valueOf(exchange),
+ null,
+ AMQShortString.valueOf(routingKey),
+ AMQShortString.valueOf(routingKey));
+ }
+
+ @Override
+ public boolean isNameRequired()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean neverDeclare()
+ {
+ return true;
+ }
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
index bb57750426..f92e361483 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -62,7 +62,6 @@ import org.apache.qpid.util.FileUtils;
import java.io.File;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import static org.mockito.Matchers.eq;
@@ -627,7 +626,7 @@ public class MessageStoreTest extends QpidTestCase
ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore());
- exchange.send(currentMessage, InstanceProperties.EMPTY, trans, null);
+ exchange.send(currentMessage, routingKey, InstanceProperties.EMPTY, trans, null);
}