diff options
Diffstat (limited to 'java')
50 files changed, 1076 insertions, 306 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 0879b77f37..7271bd6e43 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -33,17 +33,16 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; import org.apache.qpid.server.exchange.MessageRouter; import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageHandleFactory; @@ -202,9 +201,11 @@ public class AMQChannel } - public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException + public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException { - _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), publishBody, + + + _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext); // TODO: used in clustering only I think (RG) _currentMessage.setPublisher(publisher); @@ -252,7 +253,7 @@ public class AMQChannel // returns true iff the message was delivered (i.e. if all data was // received - if (_currentMessage.addContentBodyFrame(_storeContext, contentBody)) + if (_currentMessage.addContentBodyFrame(_storeContext, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody))) { // callback to allow the context to do any post message processing // primary use is to allow message return processing in the non-tx case diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index af38a9abe5..a35a46f305 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -198,12 +198,18 @@ public class VirtualHostConfiguration for(Object routingKeyNameObj : routingKeys) { AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj)); - exchange.registerQueue(routingKey, queue, null); + + + queue.bind(routingKey, null, exchange); - queue.bind(routingKey, exchange); _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'"); } + + if(exchange != virtualHost.getExchangeRegistry().getDefaultExchange()) + { + queue.bind(queue.getName(), null, virtualHost.getExchangeRegistry().getDefaultExchange()); + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 9b9765524c..4774383642 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -28,6 +28,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.protocol.ExchangeInitialiser; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.store.MessageStore; public class DefaultExchangeRegistry implements ExchangeRegistry { @@ -39,23 +41,32 @@ public class DefaultExchangeRegistry implements ExchangeRegistry private ConcurrentMap<AMQShortString, Exchange> _exchangeMap = new ConcurrentHashMap<AMQShortString, Exchange>(); private Exchange _defaultExchange; + private VirtualHost _host; - public DefaultExchangeRegistry(ExchangeFactory exchangeFactory) + public DefaultExchangeRegistry(VirtualHost host) { //create 'standard' exchanges: - try - { - new ExchangeInitialiser().initialise(exchangeFactory, this); - } - catch(AMQException e) - { - _log.error("Failed to initialise exchanges: ", e); - } + _host = host; + } - public void registerExchange(Exchange exchange) + public void initialise() throws AMQException + { + new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this); + } + + public MessageStore getMessageStore() + { + return _host.getMessageStore(); + } + + public void registerExchange(Exchange exchange) throws AMQException { _exchangeMap.put(exchange.getName(), exchange); + if(exchange.isDurable()) + { + getMessageStore().createExchange(exchange); + } } public void setDefaultExchange(Exchange exchange) @@ -74,6 +85,10 @@ public class DefaultExchangeRegistry implements ExchangeRegistry Exchange e = _exchangeMap.remove(name); if (e != null) { + if(e.isDurable()) + { + getMessageStore().removeExchange(e); + } e.close(); } else @@ -102,7 +117,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry */ public void routeContent(AMQMessage payload) throws AMQException { - final AMQShortString exchange = payload.getPublishBody().exchange; + final AMQShortString exchange = payload.getMessagePublishInfo().getExchange(); final Exchange exch = getExchange(exchange); // there is a small window of opportunity for the exchange to be deleted in between // the BasicPublish being received (where the exchange is validated) and the final diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index 93e9ff2c5b..4d66e37628 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -43,6 +43,7 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.AMQMessage; @@ -126,8 +127,7 @@ public class DestNameExchange extends AbstractExchange try { - registerQueue(new AMQShortString(binding), queue, null); - queue.bind(new AMQShortString(binding), DestNameExchange.this); + queue.bind(new AMQShortString(binding), null, DestNameExchange.this); } catch (AMQException ex) { @@ -170,7 +170,7 @@ public class DestNameExchange extends AbstractExchange } } - public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException + public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { assert queue != null; assert routingKey != null; @@ -184,13 +184,13 @@ public class DestNameExchange extends AbstractExchange public void route(AMQMessage payload) throws AMQException { - final BasicPublishBody publishBody = payload.getPublishBody(); - final AMQShortString routingKey = publishBody.routingKey; + final MessagePublishInfo info = payload.getMessagePublishInfo(); + final AMQShortString routingKey = info.getRoutingKey(); final List<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey); if (queues == null || queues.isEmpty()) { String msg = "Routing key " + routingKey + " is not known to " + this; - if (publishBody.mandatory) + if (info.isMandatory()) { throw new NoRouteException(msg, payload); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 636b4558c6..8a50e93bf9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -45,6 +45,7 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.AMQMessage; @@ -125,8 +126,7 @@ public class DestWildExchange extends AbstractExchange try { - registerQueue(new AMQShortString(binding), queue, null); - queue.bind(new AMQShortString(binding), DestWildExchange.this); + queue.bind(new AMQShortString(binding), null, DestWildExchange.this); } catch (AMQException ex) { @@ -168,9 +168,9 @@ public class DestWildExchange extends AbstractExchange public void route(AMQMessage payload) throws AMQException { - BasicPublishBody publishBody = payload.getPublishBody(); + MessagePublishInfo info = payload.getMessagePublishInfo(); - final AMQShortString routingKey = publishBody.routingKey; + final AMQShortString routingKey = info.getRoutingKey(); List<AMQQueue> queues = _routingKey2queues.get(routingKey); // if we have no registered queues we have nothing to do // TODO: add support for the immediate flag @@ -221,7 +221,7 @@ public class DestWildExchange extends AbstractExchange return !_routingKey2queues.isEmpty(); } - public synchronized void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException + public synchronized void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { assert queue != null; assert routingKey != null; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 7702e8b315..a5f77cc2a4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -47,7 +47,7 @@ public interface Exchange void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; - void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException; + void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; void route(AMQMessage message) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index a022b86299..d3a466565f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -26,7 +26,7 @@ import org.apache.qpid.framing.AMQShortString; public interface ExchangeRegistry extends MessageRouter { - void registerExchange(Exchange exchange); + void registerExchange(Exchange exchange) throws AMQException; /** * Unregister an exchange @@ -42,4 +42,6 @@ public interface ExchangeRegistry extends MessageRouter void setDefaultExchange(Exchange exchange); Exchange getDefaultExchange(); + + void initialise() throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 01a1b0bbc8..095fd2b7e9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -19,8 +19,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
@@ -98,9 +98,8 @@ public class FanoutExchange extends AbstractExchange }
try
- {
- registerQueue(new AMQShortString(binding), queue, null);
- queue.bind(new AMQShortString(binding), FanoutExchange.this);
+ {
+ queue.bind(new AMQShortString(binding), null, FanoutExchange.this);
}
catch (AMQException ex)
{
@@ -144,10 +143,10 @@ public class FanoutExchange extends AbstractExchange }
}
- public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
- assert routingKey != null;
+
if (!_queues.remove(queue))
{
@@ -158,12 +157,12 @@ public class FanoutExchange extends AbstractExchange public void route(AMQMessage payload) throws AMQException
{
- final BasicPublishBody publishBody = payload.getPublishBody();
- final AMQShortString routingKey = publishBody.routingKey;
+ final MessagePublishInfo publishInfo = payload.getMessagePublishInfo();
+ final AMQShortString routingKey = publishInfo.getRoutingKey();
if (_queues == null || _queues.isEmpty())
{
String msg = "No queues bound to " + this;
- if (publishBody.mandatory)
+ if (publishInfo.isMandatory())
{
throw new NoRouteException(msg, payload);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index f3dc8131b3..204e2f9f93 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -200,10 +200,10 @@ public class HeadersExchange extends AbstractExchange _bindings.add(new Registration(new HeadersBinding(args), queue)); } - public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException + public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { _logger.debug("Exchange " + getName() + ": Unbinding " + queue.getName()); - _bindings.remove(new Registration(null, queue)); + _bindings.remove(new Registration(new HeadersBinding(args), queue)); } public void route(AMQMessage payload) throws AMQException @@ -232,7 +232,7 @@ public class HeadersExchange extends AbstractExchange String msg = "Exchange " + getName() + ": message not routable."; - if (payload.getPublishBody().mandatory) + if (payload.getMessagePublishInfo().isMandatory()) { throw new NoRouteException(msg, payload); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index 3798918428..67ade0a744 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; @@ -85,7 +86,8 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi throw body.getChannelNotFoundException(evt.getChannelId()); } - channel.setPublishFrame(body, session); + MessagePublishInfo info = session.getRegistry().getProtocolVersionMethodConverter().convertToInfo(body); + channel.setPublishFrame(info, session); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index 8c722d33cc..4dc67b1970 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -98,8 +98,8 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist."); } try - { - exch.registerQueue(body.routingKey, queue, body.arguments); + { + queue.bind(body.routingKey, body.arguments, exch); } catch (AMQInvalidRoutingKeyException rke) { @@ -109,7 +109,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString()); } - queue.bind(body.routingKey, exch); + if (_log.isInfoEnabled()) { _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + body.routingKey); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index a35cb9f7d3..8b2467f47d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -108,8 +108,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar if (autoRegister) { Exchange defaultExchange = exchangeRegistry.getDefaultExchange(); - defaultExchange.registerQueue(body.queue, queue, null); - queue.bind(body.queue, defaultExchange); + + queue.bind(body.queue, null, defaultExchange); _log.info("Queue " + body.queue + " bound to default exchange"); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java index fa8f13127a..29d55ce763 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java @@ -41,6 +41,9 @@ public class ExchangeInitialiser private void define(ExchangeRegistry r, ExchangeFactory f, AMQShortString name, AMQShortString type) throws AMQException { - r.registerExchange(f.createExchange(name, type, true, false, 0)); + if(r.getExchange(name)== null) + { + r.registerExchange(f.createExchange(name, type, true, false, 0)); + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index be81734ae4..c60c22c4e4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -27,20 +27,13 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicDeliverBody; -import org.apache.qpid.framing.BasicGetOkBody; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.BasicReturnBody; -import org.apache.qpid.framing.CompositeAMQDataBlock; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.SmallCompositeAMQDataBlock; +import org.apache.qpid.framing.*; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.txn.TransactionalContext; /** @@ -98,10 +91,12 @@ public class AMQMessage private int _channel; private int _index = -1; + private AMQProtocolSession _protocolSession; - private BodyFrameIterator(int channel) + private BodyFrameIterator(AMQProtocolSession protocolSession, int channel) { _channel = channel; + _protocolSession = protocolSession; } public boolean hasNext() @@ -121,8 +116,9 @@ public class AMQMessage { try { - ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index); - return ContentBody.createAMQFrame(_channel, cb); + + AMQBody cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index)); + return new AMQFrame(_channel, cb); } catch (AMQException e) { @@ -132,6 +128,11 @@ public class AMQMessage } + private ProtocolVersionMethodConverter getProtocolVersionMethodConverter() + { + return _protocolSession.getRegistry().getProtocolVersionMethodConverter(); + } + public void remove() { throw new UnsupportedOperationException(); @@ -143,7 +144,7 @@ public class AMQMessage return _txnContext.getStoreContext(); } - private class BodyContentIterator implements Iterator<ContentBody> + private class BodyContentIterator implements Iterator<ContentChunk> { private int _index = -1; @@ -161,11 +162,11 @@ public class AMQMessage } } - public ContentBody next() + public ContentChunk next() { try { - return _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index); + return _messageHandle.getContentChunk(getStoreContext(),_messageId, ++_index); } catch (AMQException e) { @@ -179,13 +180,13 @@ public class AMQMessage } } - public AMQMessage(Long messageId, BasicPublishBody publishBody, + public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext) { _messageId = messageId; _txnContext = txnContext; - _immediate = publishBody.immediate; - _transientMessageData.setPublishBody(publishBody); + _immediate = info.isImmediate(); + _transientMessageData.setMessagePublishInfo(info); _taken = new AtomicBoolean(false); if (_log.isDebugEnabled()) @@ -215,14 +216,14 @@ public class AMQMessage * Used in testing only. This allows the passing of the content header immediately * on construction. * @param messageId - * @param publishBody + * @param info * @param txnContext * @param contentHeader */ - public AMQMessage(Long messageId, BasicPublishBody publishBody, + public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext, ContentHeaderBody contentHeader) throws AMQException { - this(messageId, publishBody, txnContext); + this(messageId, info, txnContext); setContentHeaderBody(contentHeader); } @@ -230,23 +231,23 @@ public class AMQMessage * Used in testing only. This allows the passing of the content header and some body fragments on * construction. * @param messageId - * @param publishBody + * @param info * @param txnContext * @param contentHeader * @param destinationQueues * @param contentBodies * @throws AMQException */ - public AMQMessage(Long messageId, BasicPublishBody publishBody, + public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext, ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, - List<ContentBody> contentBodies, MessageStore messageStore, StoreContext storeContext, + List<ContentChunk> contentBodies, MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException { - this(messageId, publishBody, txnContext, contentHeader); + this(messageId, info, txnContext, contentHeader); _transientMessageData.setDestinationQueues(destinationQueues); routingComplete(messageStore, storeContext, messageHandleFactory); - for (ContentBody cb : contentBodies) + for (ContentChunk cb : contentBodies) { addContentBodyFrame(storeContext, cb); } @@ -261,12 +262,12 @@ public class AMQMessage _transientMessageData = msg._transientMessageData; } - public Iterator<AMQDataBlock> getBodyFrameIterator(int channel) + public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel) { - return new BodyFrameIterator(channel); + return new BodyFrameIterator(protocolSession, channel); } - public Iterator<ContentBody> getContentBodyIterator() + public Iterator<ContentChunk> getContentBodyIterator() { return new BodyContentIterator(); } @@ -311,11 +312,11 @@ public class AMQMessage } } - public boolean addContentBodyFrame(StoreContext storeContext, ContentBody contentBody) throws AMQException + public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk) throws AMQException { - _transientMessageData.addBodyLength(contentBody.getSize()); + _transientMessageData.addBodyLength(contentChunk.getSize()); final boolean allContentReceived = isAllContentReceived(); - _messageHandle.addContentBodyFrame(storeContext, _messageId, contentBody, allContentReceived); + _messageHandle.addContentBodyFrame(storeContext, _messageId, contentChunk, allContentReceived); if (allContentReceived) { deliver(storeContext); @@ -502,16 +503,16 @@ public class AMQMessage } } - public BasicPublishBody getPublishBody() throws AMQException + public MessagePublishInfo getMessagePublishInfo() throws AMQException { - BasicPublishBody pb; + MessagePublishInfo pb; if (_transientMessageData != null) { - pb = _transientMessageData.getPublishBody(); + pb = _transientMessageData.getMessagePublishInfo(); } else { - pb = _messageHandle.getPublishBody(getStoreContext(),_messageId); + pb = _messageHandle.getMessagePublishInfo(getStoreContext(),_messageId); } return pb; } @@ -554,7 +555,7 @@ public class AMQMessage { // first we allow the handle to know that the message has been fully received. This is useful if it is // maintaining any calculated values based on content chunks - _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getPublishBody(), + _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody()); // we then allow the transactional context to do something with the message content @@ -598,9 +599,9 @@ public class AMQMessage // Optimise the case where we have a single content body. In that case we create a composite block // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. // - ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0); + ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0); - AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb); + AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); protocolSession.writeFrame(compositeBlock); @@ -610,8 +611,8 @@ public class AMQMessage // for(int i = 1; i < bodyCount; i++) { - cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i); - protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb)); + cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i); + protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); } @@ -641,9 +642,9 @@ public class AMQMessage // Optimise the case where we have a single content body. In that case we create a composite block // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. // - ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0); + ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0); - AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb); + AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)); AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); protocolSession.writeFrame(compositeBlock); @@ -653,8 +654,8 @@ public class AMQMessage // for(int i = 1; i < bodyCount; i++) { - cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i); - protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb)); + cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i); + protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb))); } @@ -667,10 +668,10 @@ public class AMQMessage private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException { - BasicPublishBody pb = getPublishBody(); + MessagePublishInfo pb = getMessagePublishInfo(); AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag, - deliveryTag, pb.exchange, _messageHandle.isRedelivered(), - pb.routingKey); + deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(), + pb.getRoutingKey()); ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem? deliverFrame.writePayload(buf); buf.flip(); @@ -680,14 +681,14 @@ public class AMQMessage private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException { - BasicPublishBody pb = getPublishBody(); + MessagePublishInfo pb = getMessagePublishInfo(); AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), protocolSession.getProtocolMinorVersion(), - deliveryTag, pb.exchange, + deliveryTag, pb.getExchange(), queueSize, _messageHandle.isRedelivered(), - pb.routingKey); + pb.getRoutingKey()); ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem? getOkFrame.writePayload(buf); buf.flip(); @@ -699,9 +700,9 @@ public class AMQMessage AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), protocolSession.getProtocolMinorVersion(), - getPublishBody().exchange, + getMessagePublishInfo().getExchange(), replyCode, replyText, - getPublishBody().routingKey); + getMessagePublishInfo().getRoutingKey()); ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem? returnFrame.writePayload(buf); buf.flip(); @@ -716,7 +717,7 @@ public class AMQMessage AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, getContentHeaderBody()); - Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(channelId); + Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId); // // Optimise the case where we have a single content body. In that case we create a composite block // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. @@ -767,7 +768,7 @@ public class AMQMessage public void restoreTransientMessageData() throws AMQException { TransientMessageData transientMessageData = new TransientMessageData(); - transientMessageData.setPublishBody(getPublishBody()); + transientMessageData.setMessagePublishInfo(getMessagePublishInfo()); transientMessageData.setContentHeaderBody(getContentHeaderBody()); transientMessageData.addBodyLength(getContentHeaderBody().getSize()); _transientMessageData = transientMessageData; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java index 210c9f01a8..ede55b3bbf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java @@ -21,10 +21,10 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; /** * A pluggable way of getting message data. Implementations can provide intelligent caching for example or @@ -53,11 +53,11 @@ public interface AMQMessageHandle * @return a content body * @throws IllegalArgumentException if the index is invalid */ - ContentBody getContentBody(StoreContext context, Long messageId, int index) throws IllegalArgumentException, AMQException; + ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws IllegalArgumentException, AMQException; - void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException; + void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody) throws AMQException; - BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException; + MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException; boolean isRedelivered(); @@ -65,7 +65,7 @@ public interface AMQMessageHandle boolean isPersistent(StoreContext context, Long messageId) throws AMQException; - void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody, + void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo messagePublishInfo, ContentHeaderBody contentHeaderBody) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 557d82359f..e9ebe6c541 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -441,9 +441,24 @@ public class AMQQueue implements Managable, Comparable return _deliveryMgr.clearAllMessages(storeContext); } - public void bind(AMQShortString routingKey, Exchange exchange) + public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException { - _bindings.addBinding(routingKey, exchange); + exchange.registerQueue(routingKey, this, arguments); + if(isDurable() && exchange.isDurable()) + { + _virtualHost.getMessageStore().bindQueue(exchange,routingKey,this,arguments); + } + _bindings.addBinding(routingKey, arguments, exchange); + } + + public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException + { + exchange.deregisterQueue(routingKey, this, arguments); + if(isDurable() && exchange.isDurable()) + { + _virtualHost.getMessageStore().unbindQueue(exchange,routingKey,this,arguments); + } + _bindings.remove(routingKey, arguments, exchange); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index c0b22b541b..4fd89f39da 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -44,6 +44,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; @@ -322,16 +323,16 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); } // get message content - Iterator<ContentBody> cBodies = msg.getContentBodyIterator(); + Iterator<ContentChunk> cBodies = msg.getContentBodyIterator(); List<Byte> msgContent = new ArrayList<Byte>(); while (cBodies.hasNext()) { - ContentBody body = cBodies.next(); + ContentChunk body = cBodies.next(); if (body.getSize() != 0) { if (body.getSize() != 0) { - ByteBuffer slice = body.payload.slice(); + ByteBuffer slice = body.getData().slice(); for (int j = 0; j < slice.limit(); j++) { msgContent.add(slice.get()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index d8bc19fcea..0fc8753a87 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -33,7 +33,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -114,11 +114,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // Shrink the ContentBodies to their actual size to save memory. if (compressBufferOnQueue) { - Iterator<ContentBody> it = msg.getContentBodyIterator(); + Iterator<ContentChunk> it = msg.getContentBodyIterator(); while (it.hasNext()) { - ContentBody cb = it.next(); - cb.reduceBufferToFit(); + ContentChunk cb = it.next(); + cb.reduceToFit(); } } @@ -493,7 +493,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery"); } - if (!msg.getPublishBody().immediate) + if (!msg.getMessagePublishInfo().isImmediate()) { addMessageToQueue(msg); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java index d15cca72d2..a8247aa2db 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java @@ -26,6 +26,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; /** @@ -35,42 +36,55 @@ import org.apache.qpid.server.exchange.Exchange; */ class ExchangeBindings { + private static final FieldTable EMPTY_ARGUMENTS = new FieldTable(); + static class ExchangeBinding { - private final Exchange exchange; - private final AMQShortString routingKey; + private final Exchange _exchange; + private final AMQShortString _routingKey; + private final FieldTable _arguments; ExchangeBinding(AMQShortString routingKey, Exchange exchange) { - this.routingKey = routingKey; - this.exchange = exchange; + this(routingKey, exchange,EMPTY_ARGUMENTS); + } + + ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments) + { + _routingKey = routingKey; + _exchange = exchange; + _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments; } void unbind(AMQQueue queue) throws AMQException { - exchange.deregisterQueue(routingKey, queue); + _exchange.deregisterQueue(_routingKey, queue, _arguments); } public Exchange getExchange() { - return exchange; + return _exchange; } public AMQShortString getRoutingKey() { - return routingKey; + return _routingKey; } public int hashCode() { - return (exchange == null ? 0 : exchange.hashCode()) + (routingKey == null ? 0 : routingKey.hashCode()); + return (_exchange == null ? 0 : _exchange.hashCode()) + + (_routingKey == null ? 0 : _routingKey.hashCode()) + + (_arguments == null ? 0 : _arguments.hashCode()); } public boolean equals(Object o) { if (!(o instanceof ExchangeBinding)) return false; ExchangeBinding eb = (ExchangeBinding) o; - return exchange.equals(eb.exchange) && routingKey.equals(eb.routingKey); + return _exchange.equals(eb._exchange) + && _routingKey.equals(eb._routingKey) + && _arguments.equals(eb._arguments); } } @@ -88,11 +102,18 @@ class ExchangeBindings * are being tracked by the instance has been bound to the exchange * @param exchange the exchange bound to */ - void addBinding(AMQShortString routingKey, Exchange exchange) + void addBinding(AMQShortString routingKey, FieldTable arguments, Exchange exchange) + { + _bindings.add(new ExchangeBinding(routingKey, exchange, arguments )); + } + + + public void remove(AMQShortString routingKey, FieldTable arguments, Exchange exchange) { - _bindings.add(new ExchangeBinding(routingKey, exchange)); + _bindings.remove(new ExchangeBinding(routingKey, exchange, arguments )); } + /** * Deregisters this queue from any exchange it has been bound to */ diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java index 79f875ce1e..630186991b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java @@ -25,9 +25,10 @@ import java.util.List; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.store.StoreContext; /** @@ -37,9 +38,9 @@ public class InMemoryMessageHandle implements AMQMessageHandle private ContentHeaderBody _contentHeaderBody; - private BasicPublishBody _publishBody; + private MessagePublishInfo _messagePublishInfo; - private List<ContentBody> _contentBodies = new LinkedList<ContentBody>(); + private List<ContentChunk> _contentBodies = new LinkedList<ContentChunk>(); private boolean _redelivered; @@ -64,7 +65,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle return getContentHeaderBody(context, messageId).bodySize; } - public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException + public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException { if (index > _contentBodies.size() - 1) { @@ -74,15 +75,15 @@ public class InMemoryMessageHandle implements AMQMessageHandle return _contentBodies.get(index); } - public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) + public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody) throws AMQException { _contentBodies.add(contentBody); } - public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException + public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException { - return _publishBody; + return _messagePublishInfo; } public boolean isRedelivered() @@ -106,15 +107,15 @@ public class InMemoryMessageHandle implements AMQMessageHandle /** * This is called when all the content has been received. - * @param publishBody + * @param messagePublishInfo * @param contentHeaderBody * @throws AMQException */ - public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody, + public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo messagePublishInfo, ContentHeaderBody contentHeaderBody) throws AMQException { - _publishBody = publishBody; + _messagePublishInfo = messagePublishInfo; _contentHeaderBody = contentHeaderBody; _arrivalTime = System.currentTimeMillis(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java index a66a85e54d..285f05fb20 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java @@ -16,8 +16,8 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; /** * Encapsulates a publish body and a content header. In the context of the message store these are treated as a @@ -25,7 +25,7 @@ import org.apache.qpid.framing.ContentHeaderBody; */ public class MessageMetaData { - private BasicPublishBody _publishBody; + private MessagePublishInfo _messagePublishInfo; private ContentHeaderBody _contentHeaderBody; @@ -33,15 +33,15 @@ public class MessageMetaData private long _arrivalTime; - public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount) + public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount) { this(publishBody,contentHeaderBody, contentChunkCount, System.currentTimeMillis()); } - public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime) + public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime) { _contentHeaderBody = contentHeaderBody; - _publishBody = publishBody; + _messagePublishInfo = publishBody; _contentChunkCount = contentChunkCount; _arrivalTime = arrivalTime; } @@ -66,14 +66,14 @@ public class MessageMetaData _contentHeaderBody = contentHeaderBody; } - public BasicPublishBody getPublishBody() + public MessagePublishInfo getMessagePublishInfo() { - return _publishBody; + return _messagePublishInfo; } - public void setPublishBody(BasicPublishBody publishBody) + public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo) { - _publishBody = publishBody; + _messagePublishInfo = messagePublishInfo; } public long getArrivalTime() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java index 9f3d64f77e..7c8064789e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java @@ -21,8 +21,8 @@ import java.util.LinkedList; import java.util.List; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; /** @@ -40,7 +40,7 @@ public class TransientMessageData * Stored temporarily until the header has been received at which point it is used when * constructing the handle */ - private BasicPublishBody _publishBody; + private MessagePublishInfo _messagePublishInfo; /** * Also stored temporarily. @@ -59,14 +59,14 @@ public class TransientMessageData */ private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>(); - public BasicPublishBody getPublishBody() + public MessagePublishInfo getMessagePublishInfo() { - return _publishBody; + return _messagePublishInfo; } - public void setPublishBody(BasicPublishBody publishBody) + public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo) { - _publishBody = publishBody; + _messagePublishInfo = messagePublishInfo; } public List<AMQQueue> getDestinationQueues() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java index 670d895950..373a64e2eb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java @@ -27,9 +27,9 @@ import java.util.List; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; @@ -40,9 +40,9 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle { private WeakReference<ContentHeaderBody> _contentHeaderBody; - private WeakReference<BasicPublishBody> _publishBody; + private WeakReference<MessagePublishInfo> _messagePublishInfo; - private List<WeakReference<ContentBody>> _contentBodies; + private List<WeakReference<ContentChunk>> _contentBodies; private boolean _redelivered; @@ -79,7 +79,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle { _arrivalTime = mmd.getArrivalTime(); _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody()); - _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody()); + _messagePublishInfo = new WeakReference<MessagePublishInfo>(mmd.getMessagePublishInfo()); } public int getBodyCount(StoreContext context, Long messageId) throws AMQException @@ -88,10 +88,10 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle { MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId); int chunkCount = mmd.getContentChunkCount(); - _contentBodies = new ArrayList<WeakReference<ContentBody>>(chunkCount); + _contentBodies = new ArrayList<WeakReference<ContentChunk>>(chunkCount); for (int i = 0; i < chunkCount; i++) { - _contentBodies.add(new WeakReference<ContentBody>(null)); + _contentBodies.add(new WeakReference<ContentChunk>(null)); } } return _contentBodies.size(); @@ -102,19 +102,19 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle return getContentHeaderBody(context, messageId).bodySize; } - public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException + public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException { if (index > _contentBodies.size() - 1) { throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " + (_contentBodies.size() - 1)); } - WeakReference<ContentBody> wr = _contentBodies.get(index); - ContentBody cb = wr.get(); + WeakReference<ContentChunk> wr = _contentBodies.get(index); + ContentChunk cb = wr.get(); if (cb == null) { cb = _messageStore.getContentBodyChunk(context, messageId, index); - _contentBodies.set(index, new WeakReference<ContentBody>(cb)); + _contentBodies.set(index, new WeakReference<ContentChunk>(cb)); } return cb; } @@ -124,35 +124,36 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle * * @param storeContext * @param messageId - * @param contentBody + * @param contentChunk * @param isLastContentBody * @throws AMQException */ - public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException + public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentChunk, boolean isLastContentBody) throws AMQException { if (_contentBodies == null && isLastContentBody) { - _contentBodies = new ArrayList<WeakReference<ContentBody>>(1); + _contentBodies = new ArrayList<WeakReference<ContentChunk>>(1); } else { if (_contentBodies == null) { - _contentBodies = new LinkedList<WeakReference<ContentBody>>(); + _contentBodies = new LinkedList<WeakReference<ContentChunk>>(); } } - _contentBodies.add(new WeakReference<ContentBody>(contentBody)); - _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody, isLastContentBody); + _contentBodies.add(new WeakReference<ContentChunk>(contentChunk)); + _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, + contentChunk, isLastContentBody); } - public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException + public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException { - BasicPublishBody bpb = (_publishBody != null ? _publishBody.get() : null); + MessagePublishInfo bpb = (_messagePublishInfo != null ? _messagePublishInfo.get() : null); if (bpb == null) { MessageMetaData mmd = loadMessageMetaData(context, messageId); - bpb = mmd.getPublishBody(); + bpb = mmd.getMessagePublishInfo(); } return bpb; } @@ -182,7 +183,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle * @param contentHeaderBody * @throws AMQException */ - public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody, + public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody) throws AMQException { @@ -190,7 +191,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle // create en empty list here if (contentHeaderBody.bodySize == 0) { - _contentBodies = new LinkedList<WeakReference<ContentBody>>(); + _contentBodies = new LinkedList<WeakReference<ContentChunk>>(); } final long arrivalTime = System.currentTimeMillis(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java new file mode 100644 index 0000000000..90aa7bb998 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java @@ -0,0 +1,57 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class ContentChunkAdapter
+{
+ public static ContentBody toConentBody(ContentChunk contentBodyChunk)
+ {
+ return new ContentBody(contentBodyChunk.getData());
+ }
+
+ public static ContentChunk toConentChunk(final ContentBody contentBodyChunk)
+ {
+ return new ContentChunk() {
+
+ public int getSize()
+ {
+ return contentBodyChunk.getSize();
+ }
+
+ public ByteBuffer getData()
+ {
+ return contentBodyChunk.payload;
+ }
+
+ public void reduceToFit()
+ {
+ contentBodyChunk.reduceBufferToFit();
+ }
+ };
+
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index f678cea630..8ccb0be0a8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -31,10 +31,12 @@ import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.Exchange; /** * A simple message store that stores the messages in a threadsafe structure in memory. @@ -49,7 +51,7 @@ public class MemoryMessageStore implements MessageStore protected ConcurrentMap<Long, MessageMetaData> _metaDataMap; - protected ConcurrentMap<Long, List<ContentBody>> _contentBodyMap; + protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap; private final AtomicLong _messageId = new AtomicLong(1); @@ -57,7 +59,7 @@ public class MemoryMessageStore implements MessageStore { _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables"); _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(DEFAULT_HASHTABLE_CAPACITY); + _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(DEFAULT_HASHTABLE_CAPACITY); } public void configure(String base, Configuration config) @@ -65,7 +67,7 @@ public class MemoryMessageStore implements MessageStore int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY); _log.info("Using capacity " + hashtableCapacity + " for hash tables"); _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(hashtableCapacity); + _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity); } public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception @@ -97,6 +99,26 @@ public class MemoryMessageStore implements MessageStore _contentBodyMap.remove(messageId); } + public void createExchange(Exchange exchange) throws AMQException + { + + } + + public void removeExchange(Exchange exchange) throws AMQException + { + + } + + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + + } + + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + + } + public void createQueue(AMQQueue queue) throws AMQException { // Not required to do anything @@ -147,10 +169,10 @@ public class MemoryMessageStore implements MessageStore return _messageId.getAndIncrement(); } - public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) + public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException { - List<ContentBody> bodyList = _contentBodyMap.get(messageId); + List<ContentChunk> bodyList = _contentBodyMap.get(messageId); if(bodyList == null && lastContentBody) { @@ -160,7 +182,7 @@ public class MemoryMessageStore implements MessageStore { if (bodyList == null) { - bodyList = new ArrayList<ContentBody>(); + bodyList = new ArrayList<ContentChunk>(); _contentBodyMap.put(messageId, bodyList); } @@ -179,9 +201,9 @@ public class MemoryMessageStore implements MessageStore return _metaDataMap.get(messageId); } - public ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException + public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException { - List<ContentBody> bodyList = _contentBodyMap.get(messageId); + List<ContentChunk> bodyList = _contentBodyMap.get(messageId); return bodyList.get(index); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java new file mode 100644 index 0000000000..6ee2fa784d --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java @@ -0,0 +1,62 @@ +package org.apache.qpid.server.store;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+
+public class MessagePublishInfoAdapter
+{
+ private final byte _majorVersion;
+ private final byte _minorVersion;
+ private final int _classId;
+ private final int _methodId;
+
+
+ public MessagePublishInfoAdapter(byte majorVersion, byte minorVersion)
+ {
+ _majorVersion = majorVersion;
+ _minorVersion = minorVersion;
+ _classId = BasicPublishBody.getClazz(majorVersion,minorVersion);
+ _methodId = BasicPublishBody.getMethod(majorVersion,minorVersion);
+ }
+
+ public BasicPublishBody toMethodBody(MessagePublishInfo pubInfo)
+ {
+ return new BasicPublishBody(_majorVersion,
+ _minorVersion,
+ _classId,
+ _methodId,
+ pubInfo.getExchange(),
+ pubInfo.isImmediate(),
+ pubInfo.isMandatory(),
+ pubInfo.getRoutingKey(),
+ 0) ; // ticket
+ }
+
+ public MessagePublishInfo toMessagePublishInfo(final BasicPublishBody body)
+ {
+ return new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return body.getExchange();
+ }
+
+ public boolean isImmediate()
+ {
+ return body.getImmediate();
+ }
+
+ public boolean isMandatory()
+ {
+ return body.getMandatory();
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return body.getRoutingKey();
+ }
+ };
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index 7fa46eb1ca..21988d97a8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -20,15 +20,15 @@ */ package org.apache.qpid.server.store; -import java.util.List; - import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.Exchange; public interface MessageStore { @@ -51,6 +51,15 @@ public interface MessageStore void removeMessage(StoreContext storeContext, Long messageId) throws AMQException; + void createExchange(Exchange exchange) throws AMQException; + + void removeExchange(Exchange exchange) throws AMQException; + + void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; + + void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; + + void createQueue(AMQQueue queue) throws AMQException; void removeQueue(AMQShortString name) throws AMQException; @@ -68,24 +77,17 @@ public interface MessageStore boolean inTran(StoreContext context); /** - * Recreate all queues that were persisted, including re-enqueuing of existing messages - * @return - * @throws AMQException - */ - List<AMQQueue> createQueues() throws AMQException; - - /** * Return a valid, currently unused message id. * @return a message id */ Long getNewMessageId(); - void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException; + void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException; void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException; MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException; - ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException; + ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index abbd18eff4..e09ce9326c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -99,9 +99,11 @@ public class VirtualHost _queueRegistry = new DefaultQueueRegistry(this);
_exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ _exchangeRegistry = new DefaultExchangeRegistry(this);
_messageStore = store;
+
+ _exchangeRegistry.initialise();
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
@@ -117,10 +119,12 @@ public class VirtualHost _queueRegistry = new DefaultQueueRegistry(this);
_exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ _exchangeRegistry = new DefaultExchangeRegistry(this);
initialiseMessageStore(hostConfig);
+ _exchangeRegistry.initialise();
+
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b08a97bc84..c1e5c8b555 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1156,6 +1156,41 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } + + public void createQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive) throws AMQException + { + AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) + null, // arguments + autoDelete, // autoDelete + durable, // durable + exclusive, // exclusive + false, // nowait + false, // passive + name, // queue + getTicket()); // ticket + + getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); + + } + + + public void bindQueue(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName) throws AMQException + { + // TODO: Be aware of possible changes to parameter order as versions change. + AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) + arguments, // arguments + exchangeName, // exchange + false, // nowait + queueName, // queue + routingKey, // routingKey + getTicket()); // ticket + + + getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); + } + /** * Declare the queue. * diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java index 51c57efdae..7e5563460f 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java @@ -22,13 +22,19 @@ package org.apache.qpid.server.cluster; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.MethodConverter_8_0; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.server.queue.AMQMessage; import java.util.Iterator; public class SimpleSendable implements Sendable { + + //todo fixme - remove 0-8 hard coding + ProtocolVersionMethodConverter _methodConverter = new MethodConverter_8_0(); + private final AMQMessage _message; public SimpleSendable(AMQMessage message) @@ -38,12 +44,12 @@ public class SimpleSendable implements Sendable public void send(int channel, Member member) throws AMQException { - member.send(new AMQFrame(channel, _message.getPublishBody())); + member.send(new AMQFrame(channel, _methodConverter.convertToBody(_message.getMessagePublishInfo()))); member.send(new AMQFrame(channel, _message.getContentHeaderBody())); - Iterator<ContentBody> it = _message.getContentBodyIterator(); + Iterator<ContentChunk> it = _message.getContentBodyIterator(); while (it.hasNext()) { - member.send(new AMQFrame(channel, it.next())); + member.send(new AMQFrame(channel, _methodConverter.convertToBody(it.next()))); } } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java index d0a64c7d6f..2a83d65ae5 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java @@ -31,8 +31,6 @@ import org.apache.qpid.server.cluster.SimpleSendable; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.concurrent.Executor; - /** * TODO: separate out an abstract base class from AMQQueue from which this inherits. It does * not require all the functionality currently in AMQQueue. @@ -81,8 +79,11 @@ public class RemoteQueueProxy extends AMQQueue void relay(AMQMessage msg) throws AMQException { - BasicPublishBody publish = msg.getPublishBody(); - publish.immediate = false; //can't as yet handle the immediate flag in a cluster + // TODO FIXME - can no longer update the publish body as it is an opaque wrapper object + // if cluster can handle immediate then it should wrap the wrapper... + +// BasicPublishBody publish = msg.getMessagePublishInfo(); +// publish.immediate = false; //can't as yet handle the immediate flag in a cluster // send this on to the broker for which it is acting as proxy: _groupMgr.send(_target, new SimpleSendable(msg)); diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index c35fc0a6c4..be38695384 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -96,6 +96,8 @@ public class ContentBody extends AMQBody } } + + public static AMQFrame createAMQFrame(int channelId, ContentBody body) { final AMQFrame frame = new AMQFrame(channelId, body); diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java index 4e3768e4d4..f94cd4934c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java +++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -641,8 +641,7 @@ public class EncodingUtils public static void writeTimestamp(ByteBuffer buffer, long timestamp) { - writeUnsignedInteger(buffer, 0/*timestamp msb*/); - writeUnsignedInteger(buffer, timestamp); + writeLong(buffer, timestamp); } public static boolean[] readBooleans(ByteBuffer buffer) @@ -765,8 +764,8 @@ public class EncodingUtils public static long readTimestamp(ByteBuffer buffer) { // Discard msb from AMQ timestamp - buffer.getUnsignedInt(); - return buffer.getUnsignedInt(); + //buffer.getUnsignedInt(); + return buffer.getLong(); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java new file mode 100644 index 0000000000..dd93cc97fa --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java @@ -0,0 +1,104 @@ +package org.apache.qpid.framing;
+
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+ private int _basicPublishClassId;
+ private int _basicPublishMethodId;
+
+ public MethodConverter_8_0()
+ {
+ super((byte)8,(byte)0);
+
+
+ }
+
+ public AMQBody convertToBody(ContentChunk contentChunk)
+ {
+ return new ContentBody(contentChunk.getData());
+ }
+
+ public ContentChunk convertToContentChunk(AMQBody body)
+ {
+ final ContentBody contentBodyChunk = (ContentBody) body;
+
+ return new ContentChunk()
+ {
+
+ public int getSize()
+ {
+ return contentBodyChunk.getSize();
+ }
+
+ public ByteBuffer getData()
+ {
+ return contentBodyChunk.payload;
+ }
+
+ public void reduceToFit()
+ {
+ contentBodyChunk.reduceBufferToFit();
+ }
+ };
+
+ }
+
+ public void configure()
+ {
+
+ _basicPublishClassId = BasicPublishBody.getClazz(getProtocolMajorVersion(),getProtocolMinorVersion());
+ _basicPublishMethodId = BasicPublishBody.getMethod(getProtocolMajorVersion(),getProtocolMinorVersion());
+
+ }
+
+ public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
+ {
+ final BasicPublishBody body = (BasicPublishBody) methodBody;
+
+ return new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return body.getExchange();
+ }
+
+ public boolean isImmediate()
+ {
+ return body.getImmediate();
+ }
+
+ public boolean isMandatory()
+ {
+ return body.getMandatory();
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return body.getRoutingKey();
+ }
+ };
+
+ }
+
+ public AMQMethodBody convertToBody(MessagePublishInfo info)
+ {
+
+ return new BasicPublishBody(getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ _basicPublishClassId,
+ _basicPublishMethodId,
+ info.getExchange(),
+ info.isImmediate(),
+ info.isMandatory(),
+ info.getRoutingKey(),
+ 0) ; // ticket
+
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java index 1df62c7b1b..ec371453aa 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java +++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java @@ -20,6 +20,8 @@ */
package org.apache.qpid.framing;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
@@ -36,10 +38,53 @@ public class VersionSpecificRegistry private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][];
+ private ProtocolVersionMethodConverter _protocolVersionConverter;
+
public VersionSpecificRegistry(byte major, byte minor)
{
_protocolMajorVersion = major;
_protocolMinorVersion = minor;
+
+ _protocolVersionConverter = loadProtocolVersionConverters(major, minor);
+ }
+
+ private static ProtocolVersionMethodConverter loadProtocolVersionConverters(byte protocolMajorVersion, byte protocolMinorVersion)
+ {
+ try
+ {
+ Class<ProtocolVersionMethodConverter> versionMethodConverterClass =
+ (Class<ProtocolVersionMethodConverter>) Class.forName("org.apache.qpid.framing.MethodConverter_"+protocolMajorVersion + "_" + protocolMinorVersion);
+ return versionMethodConverterClass.newInstance();
+
+ }
+ catch (ClassNotFoundException e)
+ {
+ _log.warn("Could not find protocol conversion classes for " + protocolMajorVersion + "-" + protocolMinorVersion);
+ if(protocolMinorVersion != 0)
+ {
+ protocolMinorVersion--;
+ return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion);
+ }
+ else if (protocolMajorVersion != 0)
+ {
+ protocolMajorVersion--;
+ return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion);
+ }
+ else
+ {
+ return null;
+ }
+
+
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new IllegalStateException("Unable to load protocol version converter: ", e);
+ }
+ catch (InstantiationException e)
+ {
+ throw new IllegalStateException("Unable to load protocol version converter: ", e);
+ }
}
public byte getProtocolMajorVersion()
@@ -138,4 +183,14 @@ public class VersionSpecificRegistry }
+
+ public ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
+ {
+ return _protocolVersionConverter;
+ }
+
+ public void configure()
+ {
+ _protocolVersionConverter.configure();
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java new file mode 100644 index 0000000000..5490d482a1 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java @@ -0,0 +1,26 @@ +package org.apache.qpid.framing.abstraction;
+
+public abstract class AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+ private final byte _protocolMajorVersion;
+
+
+ private final byte _protocolMinorVersion;
+
+ public AbstractMethodConverter(byte major, byte minor)
+ {
+ _protocolMajorVersion = major;
+ _protocolMinorVersion = minor;
+ }
+
+
+ public final byte getProtocolMajorVersion()
+ {
+ return _protocolMajorVersion;
+ }
+
+ public final byte getProtocolMinorVersion()
+ {
+ return _protocolMinorVersion;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java new file mode 100644 index 0000000000..6312e478a8 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java @@ -0,0 +1,32 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.mina.common.ByteBuffer;
+
+public interface ContentChunk
+{
+ int getSize();
+ ByteBuffer getData();
+
+ void reduceToFit();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java new file mode 100644 index 0000000000..706499c1b0 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java @@ -0,0 +1,36 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQShortString;
+
+public interface MessagePublishInfo
+{
+
+ public AMQShortString getExchange();
+
+ public boolean isImmediate();
+
+ public boolean isMandatory();
+
+ public AMQShortString getRoutingKey();
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java new file mode 100644 index 0000000000..c9e15f18e3 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java @@ -0,0 +1,29 @@ +/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+
+public interface MessagePublishInfoConverter
+{
+ public MessagePublishInfo convertToInfo(AMQMethodBody body);
+ public AMQMethodBody convertToBody(MessagePublishInfo info);
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java new file mode 100644 index 0000000000..52e82cdf07 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java @@ -0,0 +1,29 @@ +/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQBody;
+
+public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
+{
+ AMQBody convertToBody(ContentChunk contentBody);
+ ContentChunk convertToContentChunk(AMQBody body);
+
+ void configure();
+}
diff --git a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java index ffbdf730a9..0f706ac553 100644 --- a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java +++ b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java @@ -154,7 +154,7 @@ public class BasicContentHeaderPropertiesTest extends TestCase public void testSetGetTimestamp() { - long timestamp = 999999999; + long timestamp = System.currentTimeMillis(); _testProperties.setTimestamp(timestamp); assertEquals(timestamp, _testProperties.getTimestamp()); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java index 10f5cd5667..9fcd88b1a8 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.ack; import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; @@ -103,16 +105,32 @@ public class TxAckTest extends TestCase for(int i = 0; i < messageCount; i++) { long deliveryTag = i + 1; - // TODO: fix hardcoded protocol version data - TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null, - false, - false, - null, - 0), txnContext); + + MessagePublishInfo info = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + TestMessage message = new TestMessage(deliveryTag, i, info, txnContext); _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag)); } _acked = acked; @@ -174,7 +192,7 @@ public class TxAckTest extends TestCase private final long _tag; private int _count; - TestMessage(long tag, long messageId, BasicPublishBody publishBody, TransactionalContext txnContext) + TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext) { super(messageId, publishBody, txnContext); _tag = tag; diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index a9d7299bec..6beeb92053 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange; import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageHandleFactory; @@ -149,15 +150,97 @@ public class AbstractHeadersExchangeTestBase extends TestCase return headers; } - static BasicPublishBody getPublishRequest(String id) + + static final class MessagePublishInfoImpl implements MessagePublishInfo { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Establish some way to determine the version for the test. - BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null,false,false,new AMQShortString(id),0); - + private AMQShortString _exchange; + private boolean _immediate; + private boolean _mandatory; + private AMQShortString _routingKey; + + + public MessagePublishInfoImpl(AMQShortString routingKey) + { + _routingKey = routingKey; + } + + public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, AMQShortString routingKey) + { + _exchange = exchange; + _immediate = immediate; + _mandatory = mandatory; + _routingKey = routingKey; + } + + public AMQShortString getExchange() + { + return _exchange; + } + + public boolean isImmediate() + { + return _immediate; + + } + + public boolean isMandatory() + { + return _mandatory; + } + + public AMQShortString getRoutingKey() + { + return _routingKey; + } + + + public void setExchange(AMQShortString exchange) + { + _exchange = exchange; + } + + public void setImmediate(boolean immediate) + { + _immediate = immediate; + } + + public void setMandatory(boolean mandatory) + { + _mandatory = mandatory; + } + + public void setRoutingKey(AMQShortString routingKey) + { + _routingKey = routingKey; + } + } + + static MessagePublishInfo getPublishRequest(final String id) + { + MessagePublishInfo request = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return new AMQShortString(id); + } + }; + return request; } @@ -221,7 +304,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase this(getPublishRequest(id), getContentHeader(headers), null); } - private Message(BasicPublishBody publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException + private Message(MessagePublishInfo publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException { super(_messageStore.getNewMessageId(), publish, _txnContext, header); } @@ -265,7 +348,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase { try { - return getPublishBody().routingKey; + return getMessagePublishInfo().getRoutingKey(); } catch (AMQException e) { diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 70da7d1692..eca642b556 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.server.util.NullApplicationRegistry; import org.apache.qpid.framing.BasicPublishBody; @@ -55,13 +54,13 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase Message m7 = new Message("Message7", "XXXXX"); - BasicPublishBody pb7 = m7.getPublishBody(); - pb7.mandatory = true; + MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo()); + pb7.setMandatory(true); routeAndTest(m7,true); Message m8 = new Message("Message8", "F0000"); - BasicPublishBody pb8 = m8.getPublishBody(); - pb8.mandatory = true; + MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo()); + pb8.setMandatory(true); routeAndTest(m8,false,q1); @@ -88,10 +87,10 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase bindDefault("F0000"); Message m1 = new Message("Message1", "XXXXX"); Message m2 = new Message("Message2", "F0000"); - BasicPublishBody pb1 = m1.getPublishBody(); - pb1.mandatory = true; - BasicPublishBody pb2 = m2.getPublishBody(); - pb2.mandatory = true; + MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo()); + pb1.setMandatory(true); + MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo()); + pb2.setMandatory(true); routeAndTest(m1,true); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index c35d38e4ab..2d0315d7f5 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -22,6 +22,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -164,20 +165,32 @@ public class AMQQueueMBeanTest extends TestCase } } - private AMQMessage message(boolean immediate) throws AMQException + private AMQMessage message(final boolean immediate) throws AMQException { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Establish some way to determine the version for the test. - BasicPublishBody publish = new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null, - immediate, - false, - null, - 0); - + MessagePublishInfo publish = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.bodySize = 1000; // in bytes return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody); diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index 93050af2b7..ae2209c629 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.ack.UnacknowledgedMessage; @@ -98,15 +99,29 @@ public class AckTest extends TestCase { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Establish some way to determine the version for the test. - BasicPublishBody publishBody = new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - new AMQShortString("someExchange"), - false, - false, - new AMQShortString("rk"), - 0); + MessagePublishInfo publishBody = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return new AMQShortString("someExchange"); + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return new AMQShortString("rk"); + } + }; AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext); if (persistent) { diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java index cf5baa77bd..03a56df487 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.server.store.StoreContext; @@ -57,20 +59,32 @@ class MessageTestHelper extends TestCase return message(false); } - AMQMessage message(boolean immediate) throws AMQException + AMQMessage message(final boolean immediate) throws AMQException { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Establish some way to determine the version for the test. - BasicPublishBody publish = new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null, - immediate, - false, - null, - 0); - + MessagePublishInfo publish = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext, new ContentHeaderBody()); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 89889ca017..6ffa3e0e02 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -24,9 +24,12 @@ import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.Exchange; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -56,6 +59,26 @@ public class SkeletonMessageStore implements MessageStore { } + public void createExchange(Exchange exchange) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void removeExchange(Exchange exchange) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + public void createQueue(AMQQueue queue) throws AMQException { } @@ -87,7 +110,7 @@ public class SkeletonMessageStore implements MessageStore return _messageId.getAndIncrement(); } - public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException + public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException { } @@ -102,7 +125,7 @@ public class SkeletonMessageStore implements MessageStore return null; } - public ContentBody getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException + public ContentChunk getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException { return null; } diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java index 6eacd5168f..2f0eaac29a 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -25,6 +25,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.txn.NonTransactionalContext; @@ -50,16 +52,32 @@ public class TestReferenceCounting extends TestCase public void testMessageGetsRemoved() throws AMQException { createPersistentContentHeader(); - // TODO: fix hardcoded protocol version data - AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null, - false, - false, - null, - 0), + + MessagePublishInfo info = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + AMQMessage message = new AMQMessage(_store.getNewMessageId(), info, new NonTransactionalContext(_store, _storeContext, null, null, null), createPersistentContentHeader()); message.incrementReference(); @@ -81,16 +99,33 @@ public class TestReferenceCounting extends TestCase public void testMessageRemains() throws AMQException { - // TODO: fix hardcoded protocol version data - AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8, - (byte)0, - BasicPublishBody.getClazz((byte)8,(byte)0), - BasicPublishBody.getMethod((byte)8,(byte)0), - null, - false, - false, - null, - 0), + + MessagePublishInfo info = new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + }; + + AMQMessage message = new AMQMessage(_store.getNewMessageId(), + info, new NonTransactionalContext(_store, _storeContext, null, null, null), createPersistentContentHeader()); message.incrementReference(); diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index 9a649421dd..79d428fee8 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.abstraction.ContentChunk; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -35,7 +36,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore public TestableMemoryMessageStore() { _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(); + _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(); } public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() @@ -43,7 +44,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore return _metaDataMap; } - public ConcurrentMap<Long, List<ContentBody>> getContentBodyMap() + public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() { return _contentBodyMap; } |