diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-03-05 16:04:16 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-03-05 16:04:16 +0000 |
commit | 6713bfc5ddc1ff6202dad0d950a252273f73f795 (patch) | |
tree | d789ac52d18fdc493f5d7e1731384c43cbfde9f1 | |
parent | 58c93e3b5e6c2227cc0018720a8781b25ec0d288 (diff) | |
download | qpid-python-6713bfc5ddc1ff6202dad0d950a252273f73f795.tar.gz |
QPID-4000 , QPID-5601 : Improve conversion of reply-to between different protocols. Add functionality to the default exchange to understand AMQP 1.0 addresses.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1574551 13f79535-47bb-0310-9956-ffa450edef68
41 files changed, 515 insertions, 115 deletions
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index eb3b665242..bd0411619e 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -585,7 +585,7 @@ public class BDBMessageStoreTest extends MessageStoreTest _messageId = messageId; } - public String getRoutingKey() + public String getInitialRoutingAddress() { return null; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 2a688f497a..e01f4b7db9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -423,11 +423,12 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> final List<? extends BaseQueue> route(final ServerMessage message, + final String routingAddress, final InstanceProperties instanceProperties) { _receivedMessageCount.incrementAndGet(); _receivedMessageSize.addAndGet(message.getSize()); - List<? extends BaseQueue> queues = doRoute(message, instanceProperties); + List<? extends BaseQueue> queues = doRoute(message, routingAddress, instanceProperties); List<? extends BaseQueue> allQueues = queues; boolean deletedQueues = false; @@ -464,18 +465,19 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action<? super MessageInstance> postEnqueueAction) + final String routingAddress, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action<? super MessageInstance> postEnqueueAction) { - List<? extends BaseQueue> queues = route(message, instanceProperties); + List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties); if(queues == null || queues.isEmpty()) { ExchangeImpl altExchange = getAlternateExchange(); if(altExchange != null) { - return altExchange.send(message, instanceProperties, txn, postEnqueueAction); + return altExchange.send(message, routingAddress, instanceProperties, txn, postEnqueueAction); } else { @@ -515,6 +517,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } protected abstract List<? extends BaseQueue> doRoute(final ServerMessage message, + final String routingAddress, final InstanceProperties instanceProperties); @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java index f59049d276..123a4f0a63 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java @@ -50,13 +50,31 @@ public class DefaultDestination implements MessageDestination public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action<? super MessageInstance> postEnqueueAction) + final String routingAddress, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action<? super MessageInstance> postEnqueueAction) { - final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); + final AMQQueue q = _virtualHost.getQueue(routingAddress); if(q == null) { + if(routingAddress.contains("/") && !routingAddress.startsWith("/")) + { + String[] parts = routingAddress.split("/",2); + ExchangeImpl exchange = _virtualHost.getExchange(parts[0]); + if(exchange != null) + { + return exchange.send(message, parts[1], instanceProperties, txn, postEnqueueAction); + } + } + else if(!routingAddress.contains("/")) + { + ExchangeImpl exchange = _virtualHost.getExchange(routingAddress); + if(exchange != null) + { + return exchange.send(message, "", instanceProperties, txn, postEnqueueAction); + } + } return 0; } else diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 0e8cfb72a1..a67cacf821 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -143,11 +143,11 @@ public class DirectExchange extends AbstractExchange<DirectExchange> } @Override - public List<? extends BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties) + public List<? extends BaseQueue> doRoute(ServerMessage payload, + final String routingKey, + final InstanceProperties instanceProperties) { - final String routingKey = payload.getRoutingKey(); - BindingSet bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey); if(bindings != null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index c7f81f1d15..b7810e8112 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -79,7 +79,9 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange> } @Override - public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties) + public ArrayList<BaseQueue> doRoute(ServerMessage payload, + final String routingKey, + final InstanceProperties instanceProperties) { for(BindingImpl b : getBindings()) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 60df38af55..9d3ce0a415 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -93,7 +93,9 @@ public class HeadersExchange extends AbstractExchange<HeadersExchange> } @Override - public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties) + public ArrayList<BaseQueue> doRoute(ServerMessage payload, + final String routingKey, + final InstanceProperties instanceProperties) { if (_logger.isDebugEnabled()) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index e7236bdf3e..db73e842b8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -157,12 +157,14 @@ public class TopicExchange extends AbstractExchange<TopicExchange> } @Override - public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties) + public ArrayList<BaseQueue> doRoute(ServerMessage payload, + final String routingAddress, + final InstanceProperties instanceProperties) { - final String routingKey = payload.getRoutingKey() == null + final String routingKey = routingAddress == null ? "" - : payload.getRoutingKey(); + : routingAddress; final Collection<AMQQueue> matchedQueues = getMatchedQueues(Filterable.Factory.newInstance(payload,instanceProperties), routingKey); @@ -181,7 +183,7 @@ public class TopicExchange extends AbstractExchange<TopicExchange> if(queues == null || queues.isEmpty()) { - _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes."); + _logger.info("Message routing key: " + routingAddress + " No routes."); } return queues; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java index 155f209ffb..1913f11ae1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java @@ -32,14 +32,18 @@ public interface MessageDestination extends MessageNode /** * Routes a message + * + * * @param message the message to be routed + * @param routingAddress * @param instanceProperties the instance properties * @param txn the transaction to enqueue within * @param postEnqueueAction action to perform on the result of every enqueue (may be null) * @return the number of queues in which the message was enqueued performed */ <M extends ServerMessage<? extends StorableMessageMetaData>> int send(M message, - InstanceProperties instanceProperties, - ServerTransaction txn, - Action<? super MessageInstance> postEnqueueAction); + final String routingAddress, + InstanceProperties instanceProperties, + ServerTransaction txn, + Action<? super MessageInstance> postEnqueueAction); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java index 28491edaba..8c35af8be4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java @@ -27,7 +27,7 @@ import java.nio.ByteBuffer; public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueueableMessage, MessageContentSource { - String getRoutingKey(); + String getInitialRoutingAddress(); AMQMessageHeader getMessageHeader(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java index 6375cfb07d..fdc2fa90a5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java @@ -44,6 +44,7 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, private final Object _messageBody; private final int _contentSize; private InternalMessageHeader _header; + private String _initialRoutingAddress; InternalMessage(final StoredMessage<InternalMessageMetaData> handle, @@ -80,9 +81,9 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, } @Override - public String getRoutingKey() + public String getInitialRoutingAddress() { - return null; + return _initialRoutingAddress; } @Override @@ -253,4 +254,8 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, } + public void setInitialRoutingAddress(final String initialRoutingAddress) + { + _initialRoutingAddress = initialRoutingAddress; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 62634970a6..11eb0b8a19 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -70,7 +70,6 @@ import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHost; -import javax.management.NotificationListener; import javax.security.auth.Subject; public abstract class AbstractQueue @@ -2465,9 +2464,10 @@ public abstract class AbstractQueue } public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action<? super MessageInstance> postEnqueueAction) + final String routingAddress, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action<? super MessageInstance> postEnqueueAction) { txn.enqueue(this,message, new ServerTransaction.Action() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 9814431beb..91148b1dc0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -377,6 +377,7 @@ public abstract class QueueEntryImpl implements QueueEntry if (alternateExchange != null) { enqueues = alternateExchange.send(getMessage(), + getMessage().getInitialRoutingAddress(), getInstanceProperties(), txn, action); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java index be0704aeaa..fa75d41810 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java @@ -127,7 +127,7 @@ public class FanoutExchangeTest extends TestCase _exchange.addBinding("key",queue2, null); - List<? extends BaseQueue> result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY); + List<? extends BaseQueue> result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); @@ -136,7 +136,7 @@ public class FanoutExchangeTest extends TestCase _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True")); - result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY); + result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); @@ -144,14 +144,14 @@ public class FanoutExchangeTest extends TestCase _exchange.deleteBinding("key",queue2); - result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY); + result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); assertTrue("Expected queue2 to be routed to", result.contains(queue2)); - result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY); + result = _exchange.route(mockMessage(false), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to queue1 only", 1, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); @@ -160,7 +160,7 @@ public class FanoutExchangeTest extends TestCase _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False")); - result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY); + result = _exchange.route(mockMessage(false), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); assertTrue("Expected queue2 to be routed to", result.contains(queue2)); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index e4e07813c7..76752de5d0 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -73,7 +73,7 @@ public class HeadersExchangeTest extends TestCase protected void routeAndTest(ServerMessage msg, AMQQueue... expected) throws Exception { - List<? extends BaseQueue> results = _exchange.route(msg, InstanceProperties.EMPTY); + List<? extends BaseQueue> results = _exchange.route(msg, "", InstanceProperties.EMPTY); List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results); unexpected.removeAll(Arrays.asList(expected)); assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty()); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index 1c191b7b2e..21aa171551 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -324,8 +324,8 @@ public class TopicExchangeTest extends QpidTestCase private int routeMessage(String routingKey, long messageNumber) { ServerMessage message = mock(ServerMessage.class); - when(message.getRoutingKey()).thenReturn(routingKey); - List<? extends BaseQueue> queues = _exchange.route(message, InstanceProperties.EMPTY); + when(message.getInitialRoutingAddress()).thenReturn(routingKey); + List<? extends BaseQueue> queues = _exchange.route(message, routingKey, InstanceProperties.EMPTY); MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java index bd43100cd2..5622383f3f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java @@ -119,7 +119,7 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM } @Override - public String getRoutingKey() + public String getInitialRoutingAddress() { return null; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java index 3b74110a6e..8992cf62c9 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java @@ -67,7 +67,7 @@ class MockServerMessage implements ServerMessage throw new NotImplementedException(); } - public String getRoutingKey() + public String getInitialRoutingAddress() { throw new NotImplementedException(); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 69c625d41d..120ba2d951 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -417,7 +417,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC { logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), - msg.getRoutingKey())); + msg.getInitialRoutingAddress())); } } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java index 37bbd810b4..7ff3873856 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java @@ -30,17 +30,8 @@ import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.codec.BBDecoder; -import org.apache.qpid.typedmessage.TypedBytesContentReader; -import org.apache.qpid.typedmessage.TypedBytesFormatException; -import java.io.EOFException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; public class MessageConverter_Internal_to_v0_10 implements MessageConverter<InternalMessage, MessageTransferMessage> { @@ -123,7 +114,7 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte }; } - private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg, final String bodyMimeType, final int size) + private MessageMetaData_0_10 convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size) { DeliveryProperties deliveryProps = new DeliveryProperties(); MessageProperties messageProps = new MessageProperties(); @@ -132,7 +123,7 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte deliveryProps.setExpiration(serverMsg.getExpiration()); deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority())); - deliveryProps.setRoutingKey(serverMsg.getRoutingKey()); + deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress()); deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp()); messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding()); @@ -142,7 +133,7 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte { messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes()); } - + messageProps.setApplicationHeaders(serverMsg.getMessageHeader().getHeaderMap()); Header header = new Header(deliveryProps, messageProps, null); return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime()); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java index df4c398115..9ebf4570d0 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java @@ -33,7 +33,6 @@ import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; @@ -127,7 +126,7 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M deliveryProps.setExpiration(serverMsg.getExpiration()); deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority())); - deliveryProps.setRoutingKey(serverMsg.getRoutingKey()); + deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress()); deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp()); messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding()); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java index 487862bcba..869ac01c4e 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java @@ -41,7 +41,7 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTra return getStoredMessage().getMetaData(); } - public String getRoutingKey() + public String getInitialRoutingAddress() { return getMetaData().getRoutingKey(); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 236a955ea9..5627b2eabe 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -47,7 +47,6 @@ import org.apache.qpid.server.store.StoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -58,7 +57,6 @@ import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.queue.AMQQueue; @@ -199,7 +197,10 @@ public class ServerSession extends Session _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD); invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD)); } - int enqueues = exchange.send(message, instanceProperties, _transaction, _checkCapacityAction); + int enqueues = exchange.send(message, + message.getInitialRoutingAddress(), + instanceProperties, _transaction, _checkCapacityAction + ); getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); incrementOutstandingTxnsIfNecessary(); return enqueues; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 9e0c5b6be6..99068a9d6c 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -378,8 +378,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } }; - int enqueues = _currentMessage.getDestination().send(amqMessage, instanceProperties, _transaction, - immediate ? _immediateAction : _capacityCheckAction); + int enqueues = _currentMessage.getDestination().send(amqMessage, + amqMessage.getInitialRoutingAddress(), + instanceProperties, _transaction, + immediate ? _immediateAction : _capacityCheckAction + ); if(enqueues == 0) { handleUnroutableMessage(amqMessage); @@ -1574,7 +1577,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> if (altExchange == null) { _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); - _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); + _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getInitialRoutingAddress())); } else diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java index 833f5fb06f..0ed63daf7c 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java @@ -71,7 +71,7 @@ public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMet return getMessageMetaData().getContentHeaderBody(); } - public String getRoutingKey() + public String getInitialRoutingAddress() { MessageMetaData messageMetaData = getMessageMetaData(); if (messageMetaData != null) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java index f2bb95c8d5..9ba212d1ed 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java @@ -125,7 +125,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter }; } - private MessageMetaData convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size) + private MessageMetaData convertMetaData(final InternalMessage serverMsg, final String bodyMimeType, final int size) { MessagePublishInfo publishInfo = new MessagePublishInfo() @@ -133,7 +133,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter @Override public AMQShortString getExchange() { - return null; + return AMQShortString.EMPTY_STRING; } @Override @@ -157,7 +157,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter @Override public AMQShortString getRoutingKey() { - return null; + return AMQShortString.valueOf(serverMsg.getInitialRoutingAddress()); } }; @@ -174,6 +174,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter props.setTimestamp(serverMsg.getMessageHeader().getTimestamp()); props.setUserId(serverMsg.getMessageHeader().getUserId()); + Map<String,Object> headerProps = new LinkedHashMap<String, Object>(); for(String headerName : serverMsg.getMessageHeader().getHeaderNames()) @@ -184,6 +185,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter props.setHeaders(FieldTable.convertToFieldTable(headerProps)); final ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID); + chb.setBodySize(size); return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime()); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java index f35d37ecbd..b244e7626f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java @@ -20,20 +20,27 @@ */ package org.apache.qpid.server.protocol.v0_8; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.ReplyTo; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.typedmessage.TypedBytesContentReader; import org.apache.qpid.typedmessage.TypedBytesFormatException; +import org.apache.qpid.url.AMQBindingURL; import java.io.EOFException; +import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMessage, InternalMessage> { @@ -58,9 +65,210 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe Object body = convertMessageBody(mimeType, data); - return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), body); + return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), + new DelegatingMessageHeader(serverMessage.getMessageHeader()), body); } + private static class ReplyToComponents + { + private String _exchange; + private String _queue; + private String _routingKey; + + public void setExchange(final String exchange) + { + _exchange = exchange; + } + + public void setQueue(final String queue) + { + _queue = queue; + } + + public void setRoutingKey(final String routingKey) + { + _routingKey = routingKey; + } + + public String getExchange() + { + return _exchange; + } + + public String getQueue() + { + return _queue; + } + + public String getRoutingKey() + { + return _routingKey; + } + + public boolean hasExchange() + { + return _exchange != null; + } + + public boolean hasQueue() + { + return _queue != null; + } + + public boolean hasRoutingKey() + { + return _routingKey != null; + } + } + + private static class DelegatingMessageHeader implements AMQMessageHeader + { + private final AMQMessageHeader _delegate; + + private DelegatingMessageHeader(final AMQMessageHeader delegate) + { + _delegate = delegate; + } + + @Override + public String getCorrelationId() + { + return _delegate.getCorrelationId(); + } + + @Override + public long getExpiration() + { + return _delegate.getExpiration(); + } + + @Override + public String getUserId() + { + return _delegate.getUserId(); + } + + @Override + public String getAppId() + { + return _delegate.getAppId(); + } + + @Override + public String getMessageId() + { + return _delegate.getMessageId(); + } + + @Override + public String getMimeType() + { + return _delegate.getMimeType(); + } + + @Override + public String getEncoding() + { + return _delegate.getEncoding(); + } + + @Override + public byte getPriority() + { + return _delegate.getPriority(); + } + + @Override + public long getTimestamp() + { + return _delegate.getTimestamp(); + } + + @Override + public String getType() + { + return _delegate.getType(); + } + + @Override + public String getReplyTo() + { + String originalReplyTo = _delegate.getReplyTo(); + ReplyToComponents replyTo = convertReplyTo(originalReplyTo); + if(replyTo != null) + { + if(replyTo.hasExchange()) + { + return replyTo.getExchange() + (replyTo.hasRoutingKey() ? "/" + replyTo.getRoutingKey() : ""); + } + else + { + return replyTo.hasQueue() ? replyTo.getQueue() : replyTo.getRoutingKey(); + } + } + else + { + return originalReplyTo; + } + } + + private ReplyToComponents convertReplyTo(final String origReplyToString) + { + try + { + AMQBindingURL burl = new AMQBindingURL(origReplyToString); + ReplyToComponents replyTo = new ReplyToComponents(); + AMQShortString routingKey = burl.getRoutingKey(); + if(routingKey != null) + { + replyTo.setRoutingKey(routingKey.asString()); + } + + AMQShortString exchangeName = burl.getExchangeName(); + if(exchangeName != null) + { + replyTo.setExchange(exchangeName.asString()); + } + + AMQShortString queueName = burl.getQueueName(); + if(queueName != null) + { + replyTo.setQueue(queueName.asString()); + } + return replyTo; + } + catch (URISyntaxException e) + { + return null; + } + } + + @Override + public Object getHeader(final String name) + { + return _delegate.getHeader(name); + } + + @Override + public boolean containsHeaders(final Set<String> names) + { + return _delegate.containsHeaders(names); + } + + @Override + public boolean containsHeader(final String name) + { + return _delegate.containsHeader(name); + } + + @Override + public Collection<String> getHeaderNames() + { + return _delegate.getHeaderNames(); + } + } + + private static Object convertMessageBody(String mimeType, byte[] data) { if("text/plain".equals(mimeType) || "text/xml".equals(mimeType)) diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index d83665ad39..fc2c0d93d0 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -76,7 +76,7 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina return null; }}; - int enqueues = _exchange.send(message, instanceProperties, txn, null); + int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null); return enqueues == 0 ? REJECTED : ACCEPTED; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java index d5a349304c..4540308f61 100755 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java @@ -563,6 +563,11 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData { return _properties == null ? null : _properties.getTo(); } + + public Map<String, Object> getHeadersAsMap() + { + return new HashMap<String, Object>(_appProperties); + } } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java index 66094f52f0..36796851e0 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java @@ -69,7 +69,7 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM _arrivalTime = System.currentTimeMillis(); } - public String getRoutingKey() + public String getInitialRoutingAddress() { Object routingKey = getMessageHeader().getHeader("routing-key"); if(routingKey != null) @@ -78,7 +78,7 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM } else { - return getMessageHeader().getSubject(); + return getMessageHeader().getTo(); } } @@ -92,12 +92,6 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM return getMessageMetaData().getMessageHeader(); } - public boolean isRedelivered() - { - // TODO - return false; - } - public long getSize() { long size = 0l; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java index f7f049831e..dedb8a3dc0 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java @@ -76,7 +76,7 @@ public class NodeReceivingDestination implements ReceivingDestination return null; }}; - int enqueues = _exchange.send(message, instanceProperties, txn, null); + int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null); return enqueues == 0 ? REJECTED : ACCEPTED; diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java index a70bd4b243..26dd8e5f37 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java @@ -115,7 +115,7 @@ public class MessageConverter_0_10_to_1_0 extends MessageConverter_to_1_0<Messa } } - props.setSubject(serverMessage.getRoutingKey()); + props.setSubject(serverMessage.getInitialRoutingAddress()); if(msgProps.hasUserId()) { diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java index 0f0197cb63..a3c7ea31e0 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.converter.v0_10_v1_0; +import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; @@ -33,6 +34,7 @@ import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.ReplyTo; import java.nio.ByteBuffer; @@ -53,16 +55,18 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1 @Override public MessageTransferMessage convert(Message_1_0 serverMsg, VirtualHost vhost) { - return new MessageTransferMessage(convertToStoredMessage(serverMsg), null); + return new MessageTransferMessage(convertToStoredMessage(serverMsg, vhost), null); } - private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final Message_1_0 serverMsg) + private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final Message_1_0 serverMsg, + final VirtualHost vhost) { Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMsg); final byte[] messageContent = MessageConverter_from_1_0.convertToBody(bodyObject); final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(serverMsg, + vhost, MessageConverter_from_1_0.getBodyMimeType(bodyObject), messageContent.length); @@ -119,25 +123,54 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1 }; } - private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg, final String bodyMimeType, final int size) + private MessageMetaData_0_10 convertMetaData(Message_1_0 serverMsg, + final VirtualHost vhost, + final String bodyMimeType, + final int size) { DeliveryProperties deliveryProps = new DeliveryProperties(); MessageProperties messageProps = new MessageProperties(); + final AMQMessageHeader origHeader = serverMsg.getMessageHeader(); deliveryProps.setExpiration(serverMsg.getExpiration()); - deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority())); - deliveryProps.setRoutingKey(serverMsg.getRoutingKey()); - deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp()); + deliveryProps.setPriority(MessageDeliveryPriority.get(origHeader.getPriority())); + deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress()); + deliveryProps.setTimestamp(origHeader.getTimestamp()); - messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding()); + messageProps.setContentEncoding(origHeader.getEncoding()); messageProps.setContentLength(size); messageProps.setContentType(bodyMimeType); - if(serverMsg.getMessageHeader().getCorrelationId() != null) + if(origHeader.getCorrelationId() != null) { - messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes()); + messageProps.setCorrelationId(origHeader.getCorrelationId().getBytes()); } + final String origReplyTo = origHeader.getReplyTo(); + if(origReplyTo != null && !origReplyTo.equals("")) + { + ReplyTo replyTo; + if(origReplyTo.startsWith("/")) + { + replyTo = new ReplyTo("",origReplyTo); + } + else if(origReplyTo.contains("/")) + { + String[] parts = origReplyTo.split("/",2); + replyTo = new ReplyTo(parts[0],parts[1]); + } + else if(vhost.getExchange(origReplyTo) != null) + { + replyTo = new ReplyTo(origReplyTo,""); + } + else + { + replyTo = new ReplyTo("",origReplyTo); + } + messageProps.setReplyTo(replyTo); + } + + messageProps.setApplicationHeaders(serverMsg.getMessageHeader().getHeadersAsMap()); Header header = new Header(deliveryProps, messageProps, null); return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime()); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java index 544099f1f2..dd371acc3d 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java @@ -132,7 +132,7 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag deliveryProps.setExpiration(message_0_8.getExpiration()); deliveryProps.setImmediate(message_0_8.isImmediate()); deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority())); - deliveryProps.setRoutingKey(message_0_8.getRoutingKey()); + deliveryProps.setRoutingKey(message_0_8.getInitialRoutingAddress()); deliveryProps.setTimestamp(properties.getTimestamp()); messageProps.setContentEncoding(properties.getEncodingAsString()); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java index bbea177260..a0026ccd8f 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.converter.v0_8_v1_0; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; @@ -37,6 +38,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0; import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0; +import org.apache.qpid.url.AMQBindingURL; public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMessage> { @@ -102,9 +104,45 @@ public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMess { props.setMessageId(new Binary(messageId.getBytes())); } - props.setReplyTo(String.valueOf(contentHeader.getReplyTo())); + final String originalReplyTo = String.valueOf(contentHeader.getReplyTo()); + try + { + AMQBindingURL burl = new AMQBindingURL(originalReplyTo); + String replyTo; + + if(burl.getExchangeName() != null && !burl.getExchangeName().equals(AMQShortString.EMPTY_STRING)) + { + replyTo = burl.getExchangeName().asString(); + + if(burl.getRoutingKey() != null) + { + replyTo += "/" + burl.getRoutingKey().asString(); + } + + } + else if(burl.getQueueName() != null && !burl.getQueueName().equals(AMQShortString.EMPTY_STRING)) + { + replyTo = burl.getQueueName().asString(); + } + else if(burl.getRoutingKey() != null) + { + replyTo = burl.getRoutingKey().asString(); + } + else + { + replyTo = originalReplyTo; + } + + props.setReplyTo(replyTo); + } + catch (URISyntaxException e) + { + props.setReplyTo(originalReplyTo); + } + + - props.setSubject(serverMessage.getRoutingKey()); + props.setSubject(serverMessage.getInitialRoutingAddress()); if(contentHeader.getUserId() != null) { props.setUserId(new Binary(contentHeader.getUserId().getBytes())); diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index 6029b09466..a6c5131222 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -44,6 +44,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; @@ -261,9 +262,10 @@ class ManagementNode implements MessageSource, MessageDestination @Override public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action<? super MessageInstance> postEnqueueAction) + final String routingAddress, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action<? super MessageInstance> postEnqueueAction) { @SuppressWarnings("unchecked") @@ -361,11 +363,19 @@ class ManagementNode implements MessageSource, MessageDestination ManagementNodeConsumer consumer = _consumers.get(message.getMessageHeader().getReplyTo()); + response.setInitialRoutingAddress(message.getMessageHeader().getReplyTo()); if(consumer != null) { // TODO - check same owner consumer.send(response); } + else + { + _virtualHost.getDefaultDestination().send(response, + message.getMessageHeader().getReplyTo(), InstanceProperties.EMPTY, + new AutoCommitTransaction(_virtualHost.getMessageStore()), + null); + } // TODO - route to a queue } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 91c23ff384..6c421a9610 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -112,6 +112,11 @@ public abstract class AMQDestination implements Destination, Referenceable _name = name; } + public boolean neverDeclare() + { + return false; + } + // ----- Fields required to support new address syntax ------- public enum DestSyntax { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8224c77ba9..29c2a3b279 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2864,16 +2864,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else { - if (_declareExchanges) + if (_declareExchanges && !amqd.neverDeclare()) { declareExchange(amqd, nowait); } - if (_delareQueues || amqd.isNameRequired()) + if ((_delareQueues || amqd.isNameRequired()) && !amqd.neverDeclare()) { declareQueue(amqd, consumer.isNoLocal(), nowait); } - if (_bindQueues) + if (_bindQueues && !amqd.neverDeclare()) { if(!isBound(amqd.getExchangeName(), amqd.getAMQQueueName(), amqd.getRoutingKey())) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java index fa2afb3ee4..ec9c595f99 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java @@ -37,4 +37,10 @@ public class AMQUndefinedDestination extends AMQDestination { return getAMQQueueName() == null; } + + @Override + public boolean neverDeclare() + { + return true; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 5acaa5c543..1981d134af 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -131,7 +131,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac _channelId = channelId; _session = session; _producerId = producerId; - if (destination != null && !(destination instanceof AMQUndefinedDestination)) + if (destination != null && !(destination.neverDeclare())) { declareDestination(destination); } @@ -177,7 +177,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac void resubscribe() throws AMQException { - if (_destination != null && !(_destination instanceof AMQUndefinedDestination)) + if (_destination != null && !_destination.neverDeclare()) { declareDestination(_destination); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java index dbfbb743ec..fef9769d06 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java @@ -36,6 +36,7 @@ import org.apache.qpid.url.BindingURL; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageNotWriteableException; +import javax.jms.Queue; import java.net.URISyntaxException; import java.util.Collections; import java.util.Enumeration; @@ -258,7 +259,29 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate } catch (URISyntaxException e) { - throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e); + if(replyToEncoding.startsWith("/")) + { + dest = new DefaultRouterDestination(replyToEncoding); + } + else if(replyToEncoding.contains("/")) + { + String[] parts = replyToEncoding.split("/",2); + dest = new NonBURLReplyToDestination(parts[0], parts[1]); + + + } + else + { + if(getAMQSession().isQueueBound(AMQShortString.valueOf(replyToEncoding), null, null)) + { + dest = new NonBURLReplyToDestination(replyToEncoding, ""); + } + else + { + dest = new DefaultRouterDestination(replyToEncoding); + } + } + } _destinationCache.put(replyToEncoding, dest); @@ -371,7 +394,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } return getJmsHeaders().getBoolean(propertyName); @@ -381,7 +404,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } return getJmsHeaders().getByte(propertyName); @@ -391,7 +414,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } return getJmsHeaders().getShort(propertyName); @@ -401,7 +424,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } return getJmsHeaders().getInteger(propertyName); @@ -411,7 +434,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } return getJmsHeaders().getLong(propertyName); @@ -421,7 +444,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } return getJmsHeaders().getFloat(propertyName); @@ -431,7 +454,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } return getJmsHeaders().getDouble(propertyName); @@ -448,7 +471,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } return getJmsHeaders().getString(propertyName); @@ -469,7 +492,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } checkWritableProperties(); @@ -480,7 +503,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } checkWritableProperties(); @@ -491,7 +514,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } checkWritableProperties(); @@ -508,7 +531,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } checkWritableProperties(); @@ -519,7 +542,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } checkWritableProperties(); @@ -530,7 +553,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { if (STRICT_AMQP_COMPLIANCE) { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); + throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); } checkWritableProperties(); @@ -585,4 +608,50 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate _readableProperties = false; } + + private static class DefaultRouterDestination extends AMQDestination implements Queue + { + public DefaultRouterDestination(final String replyToEncoding) + { + super(AMQShortString.EMPTY_STRING, + AMQShortString.valueOf("direct"), + AMQShortString.valueOf(replyToEncoding), + AMQShortString.valueOf(replyToEncoding)); + } + + @Override + public boolean isNameRequired() + { + return false; + } + + @Override + public boolean neverDeclare() + { + return true; + } + } + + private static class NonBURLReplyToDestination extends AMQDestination implements Queue + { + public NonBURLReplyToDestination(final String exchange, final String routingKey) + { + super(AMQShortString.valueOf(exchange), + null, + AMQShortString.valueOf(routingKey), + AMQShortString.valueOf(routingKey)); + } + + @Override + public boolean isNameRequired() + { + return false; + } + + @Override + public boolean neverDeclare() + { + return true; + } + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java index bb57750426..f92e361483 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -62,7 +62,6 @@ import org.apache.qpid.util.FileUtils; import java.io.File; import java.util.HashMap; -import java.util.List; import java.util.Map; import static org.mockito.Matchers.eq; @@ -627,7 +626,7 @@ public class MessageStoreTest extends QpidTestCase ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore()); - exchange.send(currentMessage, InstanceProperties.EMPTY, trans, null); + exchange.send(currentMessage, routingKey, InstanceProperties.EMPTY, trans, null); } |