summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java37
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java115
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java43
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java21
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java45
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java57
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java40
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java62
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java35
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java14
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java9
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentBody.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java104
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java55
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java26
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java32
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java36
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java29
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java29
-rw-r--r--java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java40
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java103
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java17
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java39
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java33
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java40
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java27
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java75
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java5
50 files changed, 1076 insertions, 306 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 0879b77f37..7271bd6e43 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -33,17 +33,16 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.MessageRouter;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
@@ -202,9 +201,11 @@ public class AMQChannel
}
- public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException
+ public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException
{
- _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), publishBody,
+
+
+ _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info,
_txnContext);
// TODO: used in clustering only I think (RG)
_currentMessage.setPublisher(publisher);
@@ -252,7 +253,7 @@ public class AMQChannel
// returns true iff the message was delivered (i.e. if all data was
// received
- if (_currentMessage.addContentBodyFrame(_storeContext, contentBody))
+ if (_currentMessage.addContentBodyFrame(_storeContext, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody)))
{
// callback to allow the context to do any post message processing
// primary use is to allow message return processing in the non-tx case
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index af38a9abe5..a35a46f305 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -198,12 +198,18 @@ public class VirtualHostConfiguration
for(Object routingKeyNameObj : routingKeys)
{
AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
- exchange.registerQueue(routingKey, queue, null);
+
+
+ queue.bind(routingKey, null, exchange);
- queue.bind(routingKey, exchange);
_logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
}
+
+ if(exchange != virtualHost.getExchangeRegistry().getDefaultExchange())
+ {
+ queue.bind(queue.getName(), null, virtualHost.getExchangeRegistry().getDefaultExchange());
+ }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 9b9765524c..4774383642 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -28,6 +28,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.protocol.ExchangeInitialiser;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
public class DefaultExchangeRegistry implements ExchangeRegistry
{
@@ -39,23 +41,32 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
private ConcurrentMap<AMQShortString, Exchange> _exchangeMap = new ConcurrentHashMap<AMQShortString, Exchange>();
private Exchange _defaultExchange;
+ private VirtualHost _host;
- public DefaultExchangeRegistry(ExchangeFactory exchangeFactory)
+ public DefaultExchangeRegistry(VirtualHost host)
{
//create 'standard' exchanges:
- try
- {
- new ExchangeInitialiser().initialise(exchangeFactory, this);
- }
- catch(AMQException e)
- {
- _log.error("Failed to initialise exchanges: ", e);
- }
+ _host = host;
+
}
- public void registerExchange(Exchange exchange)
+ public void initialise() throws AMQException
+ {
+ new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this);
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _host.getMessageStore();
+ }
+
+ public void registerExchange(Exchange exchange) throws AMQException
{
_exchangeMap.put(exchange.getName(), exchange);
+ if(exchange.isDurable())
+ {
+ getMessageStore().createExchange(exchange);
+ }
}
public void setDefaultExchange(Exchange exchange)
@@ -74,6 +85,10 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
Exchange e = _exchangeMap.remove(name);
if (e != null)
{
+ if(e.isDurable())
+ {
+ getMessageStore().removeExchange(e);
+ }
e.close();
}
else
@@ -102,7 +117,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
*/
public void routeContent(AMQMessage payload) throws AMQException
{
- final AMQShortString exchange = payload.getPublishBody().exchange;
+ final AMQShortString exchange = payload.getMessagePublishInfo().getExchange();
final Exchange exch = getExchange(exchange);
// there is a small window of opportunity for the exchange to be deleted in between
// the BasicPublish being received (where the exchange is validated) and the final
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index 93e9ff2c5b..4d66e37628 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -43,6 +43,7 @@ import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
@@ -126,8 +127,7 @@ public class DestNameExchange extends AbstractExchange
try
{
- registerQueue(new AMQShortString(binding), queue, null);
- queue.bind(new AMQShortString(binding), DestNameExchange.this);
+ queue.bind(new AMQShortString(binding), null, DestNameExchange.this);
}
catch (AMQException ex)
{
@@ -170,7 +170,7 @@ public class DestNameExchange extends AbstractExchange
}
}
- public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
assert routingKey != null;
@@ -184,13 +184,13 @@ public class DestNameExchange extends AbstractExchange
public void route(AMQMessage payload) throws AMQException
{
- final BasicPublishBody publishBody = payload.getPublishBody();
- final AMQShortString routingKey = publishBody.routingKey;
+ final MessagePublishInfo info = payload.getMessagePublishInfo();
+ final AMQShortString routingKey = info.getRoutingKey();
final List<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
if (queues == null || queues.isEmpty())
{
String msg = "Routing key " + routingKey + " is not known to " + this;
- if (publishBody.mandatory)
+ if (info.isMandatory())
{
throw new NoRouteException(msg, payload);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index 636b4558c6..8a50e93bf9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -45,6 +45,7 @@ import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
@@ -125,8 +126,7 @@ public class DestWildExchange extends AbstractExchange
try
{
- registerQueue(new AMQShortString(binding), queue, null);
- queue.bind(new AMQShortString(binding), DestWildExchange.this);
+ queue.bind(new AMQShortString(binding), null, DestWildExchange.this);
}
catch (AMQException ex)
{
@@ -168,9 +168,9 @@ public class DestWildExchange extends AbstractExchange
public void route(AMQMessage payload) throws AMQException
{
- BasicPublishBody publishBody = payload.getPublishBody();
+ MessagePublishInfo info = payload.getMessagePublishInfo();
- final AMQShortString routingKey = publishBody.routingKey;
+ final AMQShortString routingKey = info.getRoutingKey();
List<AMQQueue> queues = _routingKey2queues.get(routingKey);
// if we have no registered queues we have nothing to do
// TODO: add support for the immediate flag
@@ -221,7 +221,7 @@ public class DestWildExchange extends AbstractExchange
return !_routingKey2queues.isEmpty();
}
- public synchronized void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public synchronized void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
assert routingKey != null;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 7702e8b315..a5f77cc2a4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -47,7 +47,7 @@ public interface Exchange
void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
- void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException;
+ void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
void route(AMQMessage message) throws AMQException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
index a022b86299..d3a466565f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
@@ -26,7 +26,7 @@ import org.apache.qpid.framing.AMQShortString;
public interface ExchangeRegistry extends MessageRouter
{
- void registerExchange(Exchange exchange);
+ void registerExchange(Exchange exchange) throws AMQException;
/**
* Unregister an exchange
@@ -42,4 +42,6 @@ public interface ExchangeRegistry extends MessageRouter
void setDefaultExchange(Exchange exchange);
Exchange getDefaultExchange();
+
+ void initialise() throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index 01a1b0bbc8..095fd2b7e9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -19,8 +19,8 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
@@ -98,9 +98,8 @@ public class FanoutExchange extends AbstractExchange
}
try
- {
- registerQueue(new AMQShortString(binding), queue, null);
- queue.bind(new AMQShortString(binding), FanoutExchange.this);
+ {
+ queue.bind(new AMQShortString(binding), null, FanoutExchange.this);
}
catch (AMQException ex)
{
@@ -144,10 +143,10 @@ public class FanoutExchange extends AbstractExchange
}
}
- public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
- assert routingKey != null;
+
if (!_queues.remove(queue))
{
@@ -158,12 +157,12 @@ public class FanoutExchange extends AbstractExchange
public void route(AMQMessage payload) throws AMQException
{
- final BasicPublishBody publishBody = payload.getPublishBody();
- final AMQShortString routingKey = publishBody.routingKey;
+ final MessagePublishInfo publishInfo = payload.getMessagePublishInfo();
+ final AMQShortString routingKey = publishInfo.getRoutingKey();
if (_queues == null || _queues.isEmpty())
{
String msg = "No queues bound to " + this;
- if (publishBody.mandatory)
+ if (publishInfo.isMandatory())
{
throw new NoRouteException(msg, payload);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index f3dc8131b3..204e2f9f93 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -200,10 +200,10 @@ public class HeadersExchange extends AbstractExchange
_bindings.add(new Registration(new HeadersBinding(args), queue));
}
- public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
_logger.debug("Exchange " + getName() + ": Unbinding " + queue.getName());
- _bindings.remove(new Registration(null, queue));
+ _bindings.remove(new Registration(new HeadersBinding(args), queue));
}
public void route(AMQMessage payload) throws AMQException
@@ -232,7 +232,7 @@ public class HeadersExchange extends AbstractExchange
String msg = "Exchange " + getName() + ": message not routable.";
- if (payload.getPublishBody().mandatory)
+ if (payload.getMessagePublishInfo().isMandatory())
{
throw new NoRouteException(msg, payload);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
index 3798918428..67ade0a744 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
@@ -85,7 +86,8 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
throw body.getChannelNotFoundException(evt.getChannelId());
}
- channel.setPublishFrame(body, session);
+ MessagePublishInfo info = session.getRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
+ channel.setPublishFrame(info, session);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index 8c722d33cc..4dc67b1970 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -98,8 +98,8 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
}
try
- {
- exch.registerQueue(body.routingKey, queue, body.arguments);
+ {
+ queue.bind(body.routingKey, body.arguments, exch);
}
catch (AMQInvalidRoutingKeyException rke)
{
@@ -109,7 +109,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
}
- queue.bind(body.routingKey, exch);
+
if (_log.isInfoEnabled())
{
_log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + body.routingKey);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index a35cb9f7d3..8b2467f47d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -108,8 +108,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
if (autoRegister)
{
Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
- defaultExchange.registerQueue(body.queue, queue, null);
- queue.bind(body.queue, defaultExchange);
+
+ queue.bind(body.queue, null, defaultExchange);
_log.info("Queue " + body.queue + " bound to default exchange");
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
index fa8f13127a..29d55ce763 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
@@ -41,6 +41,9 @@ public class ExchangeInitialiser
private void define(ExchangeRegistry r, ExchangeFactory f,
AMQShortString name, AMQShortString type) throws AMQException
{
- r.registerExchange(f.createExchange(name, type, true, false, 0));
+ if(r.getExchange(name)== null)
+ {
+ r.registerExchange(f.createExchange(name, type, true, false, 0));
+ }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index be81734ae4..c60c22c4e4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -27,20 +27,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicGetOkBody;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.CompositeAMQDataBlock;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.SmallCompositeAMQDataBlock;
+import org.apache.qpid.framing.*;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.txn.TransactionalContext;
/**
@@ -98,10 +91,12 @@ public class AMQMessage
private int _channel;
private int _index = -1;
+ private AMQProtocolSession _protocolSession;
- private BodyFrameIterator(int channel)
+ private BodyFrameIterator(AMQProtocolSession protocolSession, int channel)
{
_channel = channel;
+ _protocolSession = protocolSession;
}
public boolean hasNext()
@@ -121,8 +116,9 @@ public class AMQMessage
{
try
{
- ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index);
- return ContentBody.createAMQFrame(_channel, cb);
+
+ AMQBody cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index));
+ return new AMQFrame(_channel, cb);
}
catch (AMQException e)
{
@@ -132,6 +128,11 @@ public class AMQMessage
}
+ private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
+ {
+ return _protocolSession.getRegistry().getProtocolVersionMethodConverter();
+ }
+
public void remove()
{
throw new UnsupportedOperationException();
@@ -143,7 +144,7 @@ public class AMQMessage
return _txnContext.getStoreContext();
}
- private class BodyContentIterator implements Iterator<ContentBody>
+ private class BodyContentIterator implements Iterator<ContentChunk>
{
private int _index = -1;
@@ -161,11 +162,11 @@ public class AMQMessage
}
}
- public ContentBody next()
+ public ContentChunk next()
{
try
{
- return _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index);
+ return _messageHandle.getContentChunk(getStoreContext(),_messageId, ++_index);
}
catch (AMQException e)
{
@@ -179,13 +180,13 @@ public class AMQMessage
}
}
- public AMQMessage(Long messageId, BasicPublishBody publishBody,
+ public AMQMessage(Long messageId, MessagePublishInfo info,
TransactionalContext txnContext)
{
_messageId = messageId;
_txnContext = txnContext;
- _immediate = publishBody.immediate;
- _transientMessageData.setPublishBody(publishBody);
+ _immediate = info.isImmediate();
+ _transientMessageData.setMessagePublishInfo(info);
_taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
@@ -215,14 +216,14 @@ public class AMQMessage
* Used in testing only. This allows the passing of the content header immediately
* on construction.
* @param messageId
- * @param publishBody
+ * @param info
* @param txnContext
* @param contentHeader
*/
- public AMQMessage(Long messageId, BasicPublishBody publishBody,
+ public AMQMessage(Long messageId, MessagePublishInfo info,
TransactionalContext txnContext, ContentHeaderBody contentHeader) throws AMQException
{
- this(messageId, publishBody, txnContext);
+ this(messageId, info, txnContext);
setContentHeaderBody(contentHeader);
}
@@ -230,23 +231,23 @@ public class AMQMessage
* Used in testing only. This allows the passing of the content header and some body fragments on
* construction.
* @param messageId
- * @param publishBody
+ * @param info
* @param txnContext
* @param contentHeader
* @param destinationQueues
* @param contentBodies
* @throws AMQException
*/
- public AMQMessage(Long messageId, BasicPublishBody publishBody,
+ public AMQMessage(Long messageId, MessagePublishInfo info,
TransactionalContext txnContext,
ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues,
- List<ContentBody> contentBodies, MessageStore messageStore, StoreContext storeContext,
+ List<ContentChunk> contentBodies, MessageStore messageStore, StoreContext storeContext,
MessageHandleFactory messageHandleFactory) throws AMQException
{
- this(messageId, publishBody, txnContext, contentHeader);
+ this(messageId, info, txnContext, contentHeader);
_transientMessageData.setDestinationQueues(destinationQueues);
routingComplete(messageStore, storeContext, messageHandleFactory);
- for (ContentBody cb : contentBodies)
+ for (ContentChunk cb : contentBodies)
{
addContentBodyFrame(storeContext, cb);
}
@@ -261,12 +262,12 @@ public class AMQMessage
_transientMessageData = msg._transientMessageData;
}
- public Iterator<AMQDataBlock> getBodyFrameIterator(int channel)
+ public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
{
- return new BodyFrameIterator(channel);
+ return new BodyFrameIterator(protocolSession, channel);
}
- public Iterator<ContentBody> getContentBodyIterator()
+ public Iterator<ContentChunk> getContentBodyIterator()
{
return new BodyContentIterator();
}
@@ -311,11 +312,11 @@ public class AMQMessage
}
}
- public boolean addContentBodyFrame(StoreContext storeContext, ContentBody contentBody) throws AMQException
+ public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk) throws AMQException
{
- _transientMessageData.addBodyLength(contentBody.getSize());
+ _transientMessageData.addBodyLength(contentChunk.getSize());
final boolean allContentReceived = isAllContentReceived();
- _messageHandle.addContentBodyFrame(storeContext, _messageId, contentBody, allContentReceived);
+ _messageHandle.addContentBodyFrame(storeContext, _messageId, contentChunk, allContentReceived);
if (allContentReceived)
{
deliver(storeContext);
@@ -502,16 +503,16 @@ public class AMQMessage
}
}
- public BasicPublishBody getPublishBody() throws AMQException
+ public MessagePublishInfo getMessagePublishInfo() throws AMQException
{
- BasicPublishBody pb;
+ MessagePublishInfo pb;
if (_transientMessageData != null)
{
- pb = _transientMessageData.getPublishBody();
+ pb = _transientMessageData.getMessagePublishInfo();
}
else
{
- pb = _messageHandle.getPublishBody(getStoreContext(),_messageId);
+ pb = _messageHandle.getMessagePublishInfo(getStoreContext(),_messageId);
}
return pb;
}
@@ -554,7 +555,7 @@ public class AMQMessage
{
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
- _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getPublishBody(),
+ _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getMessagePublishInfo(),
_transientMessageData.getContentHeaderBody());
// we then allow the transactional context to do something with the message content
@@ -598,9 +599,9 @@ public class AMQMessage
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0);
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0);
- AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
protocolSession.writeFrame(compositeBlock);
@@ -610,8 +611,8 @@ public class AMQMessage
//
for(int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i);
- protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
+ cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i);
+ protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -641,9 +642,9 @@ public class AMQMessage
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0);
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0);
- AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
+ AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
protocolSession.writeFrame(compositeBlock);
@@ -653,8 +654,8 @@ public class AMQMessage
//
for(int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i);
- protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
+ cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i);
+ protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -667,10 +668,10 @@ public class AMQMessage
private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- BasicPublishBody pb = getPublishBody();
+ MessagePublishInfo pb = getMessagePublishInfo();
AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
- deliveryTag, pb.exchange, _messageHandle.isRedelivered(),
- pb.routingKey);
+ deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
+ pb.getRoutingKey());
ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
deliverFrame.writePayload(buf);
buf.flip();
@@ -680,14 +681,14 @@ public class AMQMessage
private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
throws AMQException
{
- BasicPublishBody pb = getPublishBody();
+ MessagePublishInfo pb = getMessagePublishInfo();
AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
protocolSession.getProtocolMajorVersion(),
protocolSession.getProtocolMinorVersion(),
- deliveryTag, pb.exchange,
+ deliveryTag, pb.getExchange(),
queueSize,
_messageHandle.isRedelivered(),
- pb.routingKey);
+ pb.getRoutingKey());
ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
getOkFrame.writePayload(buf);
buf.flip();
@@ -699,9 +700,9 @@ public class AMQMessage
AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
protocolSession.getProtocolMajorVersion(),
protocolSession.getProtocolMinorVersion(),
- getPublishBody().exchange,
+ getMessagePublishInfo().getExchange(),
replyCode, replyText,
- getPublishBody().routingKey);
+ getMessagePublishInfo().getRoutingKey());
ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
returnFrame.writePayload(buf);
buf.flip();
@@ -716,7 +717,7 @@ public class AMQMessage
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(channelId);
+ Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
@@ -767,7 +768,7 @@ public class AMQMessage
public void restoreTransientMessageData() throws AMQException
{
TransientMessageData transientMessageData = new TransientMessageData();
- transientMessageData.setPublishBody(getPublishBody());
+ transientMessageData.setMessagePublishInfo(getMessagePublishInfo());
transientMessageData.setContentHeaderBody(getContentHeaderBody());
transientMessageData.addBodyLength(getContentHeaderBody().getSize());
_transientMessageData = transientMessageData;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
index 210c9f01a8..ede55b3bbf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
@@ -21,10 +21,10 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
/**
* A pluggable way of getting message data. Implementations can provide intelligent caching for example or
@@ -53,11 +53,11 @@ public interface AMQMessageHandle
* @return a content body
* @throws IllegalArgumentException if the index is invalid
*/
- ContentBody getContentBody(StoreContext context, Long messageId, int index) throws IllegalArgumentException, AMQException;
+ ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws IllegalArgumentException, AMQException;
- void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException;
+ void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody) throws AMQException;
- BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException;
+ MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException;
boolean isRedelivered();
@@ -65,7 +65,7 @@ public interface AMQMessageHandle
boolean isPersistent(StoreContext context, Long messageId) throws AMQException;
- void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
+ void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo messagePublishInfo,
ContentHeaderBody contentHeaderBody)
throws AMQException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 557d82359f..e9ebe6c541 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -441,9 +441,24 @@ public class AMQQueue implements Managable, Comparable
return _deliveryMgr.clearAllMessages(storeContext);
}
- public void bind(AMQShortString routingKey, Exchange exchange)
+ public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
{
- _bindings.addBinding(routingKey, exchange);
+ exchange.registerQueue(routingKey, this, arguments);
+ if(isDurable() && exchange.isDurable())
+ {
+ _virtualHost.getMessageStore().bindQueue(exchange,routingKey,this,arguments);
+ }
+ _bindings.addBinding(routingKey, arguments, exchange);
+ }
+
+ public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
+ {
+ exchange.deregisterQueue(routingKey, this, arguments);
+ if(isDurable() && exchange.isDurable())
+ {
+ _virtualHost.getMessageStore().unbindQueue(exchange,routingKey,this,arguments);
+ }
+ _bindings.remove(routingKey, arguments, exchange);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index c0b22b541b..4fd89f39da 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -44,6 +44,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
@@ -322,16 +323,16 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
// get message content
- Iterator<ContentBody> cBodies = msg.getContentBodyIterator();
+ Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
List<Byte> msgContent = new ArrayList<Byte>();
while (cBodies.hasNext())
{
- ContentBody body = cBodies.next();
+ ContentChunk body = cBodies.next();
if (body.getSize() != 0)
{
if (body.getSize() != 0)
{
- ByteBuffer slice = body.payload.slice();
+ ByteBuffer slice = body.getData().slice();
for (int j = 0; j < slice.limit(); j++)
{
msgContent.add(slice.get());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index d8bc19fcea..0fc8753a87 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -33,7 +33,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -114,11 +114,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// Shrink the ContentBodies to their actual size to save memory.
if (compressBufferOnQueue)
{
- Iterator<ContentBody> it = msg.getContentBodyIterator();
+ Iterator<ContentChunk> it = msg.getContentBodyIterator();
while (it.hasNext())
{
- ContentBody cb = it.next();
- cb.reduceBufferToFit();
+ ContentChunk cb = it.next();
+ cb.reduceToFit();
}
}
@@ -493,7 +493,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
_log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery");
}
- if (!msg.getPublishBody().immediate)
+ if (!msg.getMessagePublishInfo().isImmediate())
{
addMessageToQueue(msg);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
index d15cca72d2..a8247aa2db 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
/**
@@ -35,42 +36,55 @@ import org.apache.qpid.server.exchange.Exchange;
*/
class ExchangeBindings
{
+ private static final FieldTable EMPTY_ARGUMENTS = new FieldTable();
+
static class ExchangeBinding
{
- private final Exchange exchange;
- private final AMQShortString routingKey;
+ private final Exchange _exchange;
+ private final AMQShortString _routingKey;
+ private final FieldTable _arguments;
ExchangeBinding(AMQShortString routingKey, Exchange exchange)
{
- this.routingKey = routingKey;
- this.exchange = exchange;
+ this(routingKey, exchange,EMPTY_ARGUMENTS);
+ }
+
+ ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments)
+ {
+ _routingKey = routingKey;
+ _exchange = exchange;
+ _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments;
}
void unbind(AMQQueue queue) throws AMQException
{
- exchange.deregisterQueue(routingKey, queue);
+ _exchange.deregisterQueue(_routingKey, queue, _arguments);
}
public Exchange getExchange()
{
- return exchange;
+ return _exchange;
}
public AMQShortString getRoutingKey()
{
- return routingKey;
+ return _routingKey;
}
public int hashCode()
{
- return (exchange == null ? 0 : exchange.hashCode()) + (routingKey == null ? 0 : routingKey.hashCode());
+ return (_exchange == null ? 0 : _exchange.hashCode())
+ + (_routingKey == null ? 0 : _routingKey.hashCode())
+ + (_arguments == null ? 0 : _arguments.hashCode());
}
public boolean equals(Object o)
{
if (!(o instanceof ExchangeBinding)) return false;
ExchangeBinding eb = (ExchangeBinding) o;
- return exchange.equals(eb.exchange) && routingKey.equals(eb.routingKey);
+ return _exchange.equals(eb._exchange)
+ && _routingKey.equals(eb._routingKey)
+ && _arguments.equals(eb._arguments);
}
}
@@ -88,11 +102,18 @@ class ExchangeBindings
* are being tracked by the instance has been bound to the exchange
* @param exchange the exchange bound to
*/
- void addBinding(AMQShortString routingKey, Exchange exchange)
+ void addBinding(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
+ {
+ _bindings.add(new ExchangeBinding(routingKey, exchange, arguments ));
+ }
+
+
+ public void remove(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
{
- _bindings.add(new ExchangeBinding(routingKey, exchange));
+ _bindings.remove(new ExchangeBinding(routingKey, exchange, arguments ));
}
+
/**
* Deregisters this queue from any exchange it has been bound to
*/
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
index 79f875ce1e..630186991b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
@@ -25,9 +25,10 @@ import java.util.List;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.store.StoreContext;
/**
@@ -37,9 +38,9 @@ public class InMemoryMessageHandle implements AMQMessageHandle
private ContentHeaderBody _contentHeaderBody;
- private BasicPublishBody _publishBody;
+ private MessagePublishInfo _messagePublishInfo;
- private List<ContentBody> _contentBodies = new LinkedList<ContentBody>();
+ private List<ContentChunk> _contentBodies = new LinkedList<ContentChunk>();
private boolean _redelivered;
@@ -64,7 +65,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle
return getContentHeaderBody(context, messageId).bodySize;
}
- public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
+ public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
{
if (index > _contentBodies.size() - 1)
{
@@ -74,15 +75,15 @@ public class InMemoryMessageHandle implements AMQMessageHandle
return _contentBodies.get(index);
}
- public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody)
+ public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody)
throws AMQException
{
_contentBodies.add(contentBody);
}
- public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException
+ public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException
{
- return _publishBody;
+ return _messagePublishInfo;
}
public boolean isRedelivered()
@@ -106,15 +107,15 @@ public class InMemoryMessageHandle implements AMQMessageHandle
/**
* This is called when all the content has been received.
- * @param publishBody
+ * @param messagePublishInfo
* @param contentHeaderBody
* @throws AMQException
*/
- public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo messagePublishInfo,
ContentHeaderBody contentHeaderBody)
throws AMQException
{
- _publishBody = publishBody;
+ _messagePublishInfo = messagePublishInfo;
_contentHeaderBody = contentHeaderBody;
_arrivalTime = System.currentTimeMillis();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
index a66a85e54d..285f05fb20 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
@@ -16,8 +16,8 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
/**
* Encapsulates a publish body and a content header. In the context of the message store these are treated as a
@@ -25,7 +25,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
*/
public class MessageMetaData
{
- private BasicPublishBody _publishBody;
+ private MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
@@ -33,15 +33,15 @@ public class MessageMetaData
private long _arrivalTime;
- public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
+ public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
{
this(publishBody,contentHeaderBody, contentChunkCount, System.currentTimeMillis());
}
- public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime)
+ public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime)
{
_contentHeaderBody = contentHeaderBody;
- _publishBody = publishBody;
+ _messagePublishInfo = publishBody;
_contentChunkCount = contentChunkCount;
_arrivalTime = arrivalTime;
}
@@ -66,14 +66,14 @@ public class MessageMetaData
_contentHeaderBody = contentHeaderBody;
}
- public BasicPublishBody getPublishBody()
+ public MessagePublishInfo getMessagePublishInfo()
{
- return _publishBody;
+ return _messagePublishInfo;
}
- public void setPublishBody(BasicPublishBody publishBody)
+ public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo)
{
- _publishBody = publishBody;
+ _messagePublishInfo = messagePublishInfo;
}
public long getArrivalTime()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
index 9f3d64f77e..7c8064789e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
@@ -21,8 +21,8 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
/**
@@ -40,7 +40,7 @@ public class TransientMessageData
* Stored temporarily until the header has been received at which point it is used when
* constructing the handle
*/
- private BasicPublishBody _publishBody;
+ private MessagePublishInfo _messagePublishInfo;
/**
* Also stored temporarily.
@@ -59,14 +59,14 @@ public class TransientMessageData
*/
private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
- public BasicPublishBody getPublishBody()
+ public MessagePublishInfo getMessagePublishInfo()
{
- return _publishBody;
+ return _messagePublishInfo;
}
- public void setPublishBody(BasicPublishBody publishBody)
+ public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo)
{
- _publishBody = publishBody;
+ _messagePublishInfo = messagePublishInfo;
}
public List<AMQQueue> getDestinationQueues()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
index 670d895950..373a64e2eb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
@@ -27,9 +27,9 @@ import java.util.List;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
@@ -40,9 +40,9 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
{
private WeakReference<ContentHeaderBody> _contentHeaderBody;
- private WeakReference<BasicPublishBody> _publishBody;
+ private WeakReference<MessagePublishInfo> _messagePublishInfo;
- private List<WeakReference<ContentBody>> _contentBodies;
+ private List<WeakReference<ContentChunk>> _contentBodies;
private boolean _redelivered;
@@ -79,7 +79,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
{
_arrivalTime = mmd.getArrivalTime();
_contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
- _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
+ _messagePublishInfo = new WeakReference<MessagePublishInfo>(mmd.getMessagePublishInfo());
}
public int getBodyCount(StoreContext context, Long messageId) throws AMQException
@@ -88,10 +88,10 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
{
MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId);
int chunkCount = mmd.getContentChunkCount();
- _contentBodies = new ArrayList<WeakReference<ContentBody>>(chunkCount);
+ _contentBodies = new ArrayList<WeakReference<ContentChunk>>(chunkCount);
for (int i = 0; i < chunkCount; i++)
{
- _contentBodies.add(new WeakReference<ContentBody>(null));
+ _contentBodies.add(new WeakReference<ContentChunk>(null));
}
}
return _contentBodies.size();
@@ -102,19 +102,19 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
return getContentHeaderBody(context, messageId).bodySize;
}
- public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
+ public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
{
if (index > _contentBodies.size() - 1)
{
throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
(_contentBodies.size() - 1));
}
- WeakReference<ContentBody> wr = _contentBodies.get(index);
- ContentBody cb = wr.get();
+ WeakReference<ContentChunk> wr = _contentBodies.get(index);
+ ContentChunk cb = wr.get();
if (cb == null)
{
cb = _messageStore.getContentBodyChunk(context, messageId, index);
- _contentBodies.set(index, new WeakReference<ContentBody>(cb));
+ _contentBodies.set(index, new WeakReference<ContentChunk>(cb));
}
return cb;
}
@@ -124,35 +124,36 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
*
* @param storeContext
* @param messageId
- * @param contentBody
+ * @param contentChunk
* @param isLastContentBody
* @throws AMQException
*/
- public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException
+ public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
{
if (_contentBodies == null && isLastContentBody)
{
- _contentBodies = new ArrayList<WeakReference<ContentBody>>(1);
+ _contentBodies = new ArrayList<WeakReference<ContentChunk>>(1);
}
else
{
if (_contentBodies == null)
{
- _contentBodies = new LinkedList<WeakReference<ContentBody>>();
+ _contentBodies = new LinkedList<WeakReference<ContentChunk>>();
}
}
- _contentBodies.add(new WeakReference<ContentBody>(contentBody));
- _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody, isLastContentBody);
+ _contentBodies.add(new WeakReference<ContentChunk>(contentChunk));
+ _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1,
+ contentChunk, isLastContentBody);
}
- public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException
+ public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException
{
- BasicPublishBody bpb = (_publishBody != null ? _publishBody.get() : null);
+ MessagePublishInfo bpb = (_messagePublishInfo != null ? _messagePublishInfo.get() : null);
if (bpb == null)
{
MessageMetaData mmd = loadMessageMetaData(context, messageId);
- bpb = mmd.getPublishBody();
+ bpb = mmd.getMessagePublishInfo();
}
return bpb;
}
@@ -182,7 +183,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
* @param contentHeaderBody
* @throws AMQException
*/
- public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo publishBody,
ContentHeaderBody contentHeaderBody)
throws AMQException
{
@@ -190,7 +191,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
// create en empty list here
if (contentHeaderBody.bodySize == 0)
{
- _contentBodies = new LinkedList<WeakReference<ContentBody>>();
+ _contentBodies = new LinkedList<WeakReference<ContentChunk>>();
}
final long arrivalTime = System.currentTimeMillis();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java
new file mode 100644
index 0000000000..90aa7bb998
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/ContentChunkAdapter.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class ContentChunkAdapter
+{
+ public static ContentBody toConentBody(ContentChunk contentBodyChunk)
+ {
+ return new ContentBody(contentBodyChunk.getData());
+ }
+
+ public static ContentChunk toConentChunk(final ContentBody contentBodyChunk)
+ {
+ return new ContentChunk() {
+
+ public int getSize()
+ {
+ return contentBodyChunk.getSize();
+ }
+
+ public ByteBuffer getData()
+ {
+ return contentBodyChunk.payload;
+ }
+
+ public void reduceToFit()
+ {
+ contentBodyChunk.reduceBufferToFit();
+ }
+ };
+
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index f678cea630..8ccb0be0a8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -31,10 +31,12 @@ import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
/**
* A simple message store that stores the messages in a threadsafe structure in memory.
@@ -49,7 +51,7 @@ public class MemoryMessageStore implements MessageStore
protected ConcurrentMap<Long, MessageMetaData> _metaDataMap;
- protected ConcurrentMap<Long, List<ContentBody>> _contentBodyMap;
+ protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap;
private final AtomicLong _messageId = new AtomicLong(1);
@@ -57,7 +59,7 @@ public class MemoryMessageStore implements MessageStore
{
_log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables");
_metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY);
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(DEFAULT_HASHTABLE_CAPACITY);
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(DEFAULT_HASHTABLE_CAPACITY);
}
public void configure(String base, Configuration config)
@@ -65,7 +67,7 @@ public class MemoryMessageStore implements MessageStore
int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
_log.info("Using capacity " + hashtableCapacity + " for hash tables");
_metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(hashtableCapacity);
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity);
}
public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
@@ -97,6 +99,26 @@ public class MemoryMessageStore implements MessageStore
_contentBodyMap.remove(messageId);
}
+ public void createExchange(Exchange exchange) throws AMQException
+ {
+
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQException
+ {
+
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+
+ }
+
public void createQueue(AMQQueue queue) throws AMQException
{
// Not required to do anything
@@ -147,10 +169,10 @@ public class MemoryMessageStore implements MessageStore
return _messageId.getAndIncrement();
}
- public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentBody contentBody, boolean lastContentBody)
+ public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody)
throws AMQException
{
- List<ContentBody> bodyList = _contentBodyMap.get(messageId);
+ List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
if(bodyList == null && lastContentBody)
{
@@ -160,7 +182,7 @@ public class MemoryMessageStore implements MessageStore
{
if (bodyList == null)
{
- bodyList = new ArrayList<ContentBody>();
+ bodyList = new ArrayList<ContentChunk>();
_contentBodyMap.put(messageId, bodyList);
}
@@ -179,9 +201,9 @@ public class MemoryMessageStore implements MessageStore
return _metaDataMap.get(messageId);
}
- public ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+ public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
{
- List<ContentBody> bodyList = _contentBodyMap.get(messageId);
+ List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
return bodyList.get(index);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java
new file mode 100644
index 0000000000..6ee2fa784d
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessagePublishInfoAdapter.java
@@ -0,0 +1,62 @@
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+
+public class MessagePublishInfoAdapter
+{
+ private final byte _majorVersion;
+ private final byte _minorVersion;
+ private final int _classId;
+ private final int _methodId;
+
+
+ public MessagePublishInfoAdapter(byte majorVersion, byte minorVersion)
+ {
+ _majorVersion = majorVersion;
+ _minorVersion = minorVersion;
+ _classId = BasicPublishBody.getClazz(majorVersion,minorVersion);
+ _methodId = BasicPublishBody.getMethod(majorVersion,minorVersion);
+ }
+
+ public BasicPublishBody toMethodBody(MessagePublishInfo pubInfo)
+ {
+ return new BasicPublishBody(_majorVersion,
+ _minorVersion,
+ _classId,
+ _methodId,
+ pubInfo.getExchange(),
+ pubInfo.isImmediate(),
+ pubInfo.isMandatory(),
+ pubInfo.getRoutingKey(),
+ 0) ; // ticket
+ }
+
+ public MessagePublishInfo toMessagePublishInfo(final BasicPublishBody body)
+ {
+ return new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return body.getExchange();
+ }
+
+ public boolean isImmediate()
+ {
+ return body.getImmediate();
+ }
+
+ public boolean isMandatory()
+ {
+ return body.getMandatory();
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return body.getRoutingKey();
+ }
+ };
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index 7fa46eb1ca..21988d97a8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -20,15 +20,15 @@
*/
package org.apache.qpid.server.store;
-import java.util.List;
-
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
public interface MessageStore
{
@@ -51,6 +51,15 @@ public interface MessageStore
void removeMessage(StoreContext storeContext, Long messageId) throws AMQException;
+ void createExchange(Exchange exchange) throws AMQException;
+
+ void removeExchange(Exchange exchange) throws AMQException;
+
+ void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
+
+ void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
+
+
void createQueue(AMQQueue queue) throws AMQException;
void removeQueue(AMQShortString name) throws AMQException;
@@ -68,24 +77,17 @@ public interface MessageStore
boolean inTran(StoreContext context);
/**
- * Recreate all queues that were persisted, including re-enqueuing of existing messages
- * @return
- * @throws AMQException
- */
- List<AMQQueue> createQueues() throws AMQException;
-
- /**
* Return a valid, currently unused message id.
* @return a message id
*/
Long getNewMessageId();
- void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException;
+ void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException;
void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException;
MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
- ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
+ ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index abbd18eff4..e09ce9326c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -99,9 +99,11 @@ public class VirtualHost
_queueRegistry = new DefaultQueueRegistry(this);
_exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ _exchangeRegistry = new DefaultExchangeRegistry(this);
_messageStore = store;
+
+ _exchangeRegistry.initialise();
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
@@ -117,10 +119,12 @@ public class VirtualHost
_queueRegistry = new DefaultQueueRegistry(this);
_exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory);
+ _exchangeRegistry = new DefaultExchangeRegistry(this);
initialiseMessageStore(hostConfig);
+ _exchangeRegistry.initialise();
+
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index b08a97bc84..c1e5c8b555 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1156,6 +1156,41 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
+
+ public void createQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive) throws AMQException
+ {
+ AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
+ null, // arguments
+ autoDelete, // autoDelete
+ durable, // durable
+ exclusive, // exclusive
+ false, // nowait
+ false, // passive
+ name, // queue
+ getTicket()); // ticket
+
+ getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+
+ }
+
+
+ public void bindQueue(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName) throws AMQException
+ {
+ // TODO: Be aware of possible changes to parameter order as versions change.
+ AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
+ arguments, // arguments
+ exchangeName, // exchange
+ false, // nowait
+ queueName, // queue
+ routingKey, // routingKey
+ getTicket()); // ticket
+
+
+ getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
+ }
+
/**
* Declare the queue.
*
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java
index 51c57efdae..7e5563460f 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java
@@ -22,13 +22,19 @@ package org.apache.qpid.server.cluster;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.MethodConverter_8_0;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.server.queue.AMQMessage;
import java.util.Iterator;
public class SimpleSendable implements Sendable
{
+
+ //todo fixme - remove 0-8 hard coding
+ ProtocolVersionMethodConverter _methodConverter = new MethodConverter_8_0();
+
private final AMQMessage _message;
public SimpleSendable(AMQMessage message)
@@ -38,12 +44,12 @@ public class SimpleSendable implements Sendable
public void send(int channel, Member member) throws AMQException
{
- member.send(new AMQFrame(channel, _message.getPublishBody()));
+ member.send(new AMQFrame(channel, _methodConverter.convertToBody(_message.getMessagePublishInfo())));
member.send(new AMQFrame(channel, _message.getContentHeaderBody()));
- Iterator<ContentBody> it = _message.getContentBodyIterator();
+ Iterator<ContentChunk> it = _message.getContentBodyIterator();
while (it.hasNext())
{
- member.send(new AMQFrame(channel, it.next()));
+ member.send(new AMQFrame(channel, _methodConverter.convertToBody(it.next())));
}
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
index d0a64c7d6f..2a83d65ae5 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
@@ -31,8 +31,6 @@ import org.apache.qpid.server.cluster.SimpleSendable;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.concurrent.Executor;
-
/**
* TODO: separate out an abstract base class from AMQQueue from which this inherits. It does
* not require all the functionality currently in AMQQueue.
@@ -81,8 +79,11 @@ public class RemoteQueueProxy extends AMQQueue
void relay(AMQMessage msg) throws AMQException
{
- BasicPublishBody publish = msg.getPublishBody();
- publish.immediate = false; //can't as yet handle the immediate flag in a cluster
+ // TODO FIXME - can no longer update the publish body as it is an opaque wrapper object
+ // if cluster can handle immediate then it should wrap the wrapper...
+
+// BasicPublishBody publish = msg.getMessagePublishInfo();
+// publish.immediate = false; //can't as yet handle the immediate flag in a cluster
// send this on to the broker for which it is acting as proxy:
_groupMgr.send(_target, new SimpleSendable(msg));
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
index c35fc0a6c4..be38695384 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -96,6 +96,8 @@ public class ContentBody extends AMQBody
}
}
+
+
public static AMQFrame createAMQFrame(int channelId, ContentBody body)
{
final AMQFrame frame = new AMQFrame(channelId, body);
diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
index 4e3768e4d4..f94cd4934c 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
@@ -641,8 +641,7 @@ public class EncodingUtils
public static void writeTimestamp(ByteBuffer buffer, long timestamp)
{
- writeUnsignedInteger(buffer, 0/*timestamp msb*/);
- writeUnsignedInteger(buffer, timestamp);
+ writeLong(buffer, timestamp);
}
public static boolean[] readBooleans(ByteBuffer buffer)
@@ -765,8 +764,8 @@ public class EncodingUtils
public static long readTimestamp(ByteBuffer buffer)
{
// Discard msb from AMQ timestamp
- buffer.getUnsignedInt();
- return buffer.getUnsignedInt();
+ //buffer.getUnsignedInt();
+ return buffer.getLong();
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java
new file mode 100644
index 0000000000..dd93cc97fa
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java
@@ -0,0 +1,104 @@
+package org.apache.qpid.framing;
+
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+ private int _basicPublishClassId;
+ private int _basicPublishMethodId;
+
+ public MethodConverter_8_0()
+ {
+ super((byte)8,(byte)0);
+
+
+ }
+
+ public AMQBody convertToBody(ContentChunk contentChunk)
+ {
+ return new ContentBody(contentChunk.getData());
+ }
+
+ public ContentChunk convertToContentChunk(AMQBody body)
+ {
+ final ContentBody contentBodyChunk = (ContentBody) body;
+
+ return new ContentChunk()
+ {
+
+ public int getSize()
+ {
+ return contentBodyChunk.getSize();
+ }
+
+ public ByteBuffer getData()
+ {
+ return contentBodyChunk.payload;
+ }
+
+ public void reduceToFit()
+ {
+ contentBodyChunk.reduceBufferToFit();
+ }
+ };
+
+ }
+
+ public void configure()
+ {
+
+ _basicPublishClassId = BasicPublishBody.getClazz(getProtocolMajorVersion(),getProtocolMinorVersion());
+ _basicPublishMethodId = BasicPublishBody.getMethod(getProtocolMajorVersion(),getProtocolMinorVersion());
+
+ }
+
+ public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
+ {
+ final BasicPublishBody body = (BasicPublishBody) methodBody;
+
+ return new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return body.getExchange();
+ }
+
+ public boolean isImmediate()
+ {
+ return body.getImmediate();
+ }
+
+ public boolean isMandatory()
+ {
+ return body.getMandatory();
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return body.getRoutingKey();
+ }
+ };
+
+ }
+
+ public AMQMethodBody convertToBody(MessagePublishInfo info)
+ {
+
+ return new BasicPublishBody(getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ _basicPublishClassId,
+ _basicPublishMethodId,
+ info.getExchange(),
+ info.isImmediate(),
+ info.isMandatory(),
+ info.getRoutingKey(),
+ 0) ; // ticket
+
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
index 1df62c7b1b..ec371453aa 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.framing;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
@@ -36,10 +38,53 @@ public class VersionSpecificRegistry
private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][];
+ private ProtocolVersionMethodConverter _protocolVersionConverter;
+
public VersionSpecificRegistry(byte major, byte minor)
{
_protocolMajorVersion = major;
_protocolMinorVersion = minor;
+
+ _protocolVersionConverter = loadProtocolVersionConverters(major, minor);
+ }
+
+ private static ProtocolVersionMethodConverter loadProtocolVersionConverters(byte protocolMajorVersion, byte protocolMinorVersion)
+ {
+ try
+ {
+ Class<ProtocolVersionMethodConverter> versionMethodConverterClass =
+ (Class<ProtocolVersionMethodConverter>) Class.forName("org.apache.qpid.framing.MethodConverter_"+protocolMajorVersion + "_" + protocolMinorVersion);
+ return versionMethodConverterClass.newInstance();
+
+ }
+ catch (ClassNotFoundException e)
+ {
+ _log.warn("Could not find protocol conversion classes for " + protocolMajorVersion + "-" + protocolMinorVersion);
+ if(protocolMinorVersion != 0)
+ {
+ protocolMinorVersion--;
+ return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion);
+ }
+ else if (protocolMajorVersion != 0)
+ {
+ protocolMajorVersion--;
+ return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion);
+ }
+ else
+ {
+ return null;
+ }
+
+
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new IllegalStateException("Unable to load protocol version converter: ", e);
+ }
+ catch (InstantiationException e)
+ {
+ throw new IllegalStateException("Unable to load protocol version converter: ", e);
+ }
}
public byte getProtocolMajorVersion()
@@ -138,4 +183,14 @@ public class VersionSpecificRegistry
}
+
+ public ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
+ {
+ return _protocolVersionConverter;
+ }
+
+ public void configure()
+ {
+ _protocolVersionConverter.configure();
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java
new file mode 100644
index 0000000000..5490d482a1
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java
@@ -0,0 +1,26 @@
+package org.apache.qpid.framing.abstraction;
+
+public abstract class AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+ private final byte _protocolMajorVersion;
+
+
+ private final byte _protocolMinorVersion;
+
+ public AbstractMethodConverter(byte major, byte minor)
+ {
+ _protocolMajorVersion = major;
+ _protocolMinorVersion = minor;
+ }
+
+
+ public final byte getProtocolMajorVersion()
+ {
+ return _protocolMajorVersion;
+ }
+
+ public final byte getProtocolMinorVersion()
+ {
+ return _protocolMinorVersion;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java
new file mode 100644
index 0000000000..6312e478a8
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.mina.common.ByteBuffer;
+
+public interface ContentChunk
+{
+ int getSize();
+ ByteBuffer getData();
+
+ void reduceToFit();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java
new file mode 100644
index 0000000000..706499c1b0
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQShortString;
+
+public interface MessagePublishInfo
+{
+
+ public AMQShortString getExchange();
+
+ public boolean isImmediate();
+
+ public boolean isMandatory();
+
+ public AMQShortString getRoutingKey();
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java
new file mode 100644
index 0000000000..c9e15f18e3
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+
+public interface MessagePublishInfoConverter
+{
+ public MessagePublishInfo convertToInfo(AMQMethodBody body);
+ public AMQMethodBody convertToBody(MessagePublishInfo info);
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
new file mode 100644
index 0000000000..52e82cdf07
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQBody;
+
+public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
+{
+ AMQBody convertToBody(ContentChunk contentBody);
+ ContentChunk convertToContentChunk(AMQBody body);
+
+ void configure();
+}
diff --git a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
index ffbdf730a9..0f706ac553 100644
--- a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
+++ b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
@@ -154,7 +154,7 @@ public class BasicContentHeaderPropertiesTest extends TestCase
public void testSetGetTimestamp()
{
- long timestamp = 999999999;
+ long timestamp = System.currentTimeMillis();
_testProperties.setTimestamp(timestamp);
assertEquals(timestamp, _testProperties.getTimestamp());
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
index 10f5cd5667..9fcd88b1a8 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.ack;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
@@ -103,16 +105,32 @@ public class TxAckTest extends TestCase
for(int i = 0; i < messageCount; i++)
{
long deliveryTag = i + 1;
- // TODO: fix hardcoded protocol version data
- TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,
- false,
- false,
- null,
- 0), txnContext);
+
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ TestMessage message = new TestMessage(deliveryTag, i, info, txnContext);
_map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag));
}
_acked = acked;
@@ -174,7 +192,7 @@ public class TxAckTest extends TestCase
private final long _tag;
private int _count;
- TestMessage(long tag, long messageId, BasicPublishBody publishBody, TransactionalContext txnContext)
+ TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext)
{
super(messageId, publishBody, txnContext);
_tag = tag;
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index a9d7299bec..6beeb92053 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.exchange;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
@@ -149,15 +150,97 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return headers;
}
- static BasicPublishBody getPublishRequest(String id)
+
+ static final class MessagePublishInfoImpl implements MessagePublishInfo
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Establish some way to determine the version for the test.
- BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,false,false,new AMQShortString(id),0);
-
+ private AMQShortString _exchange;
+ private boolean _immediate;
+ private boolean _mandatory;
+ private AMQShortString _routingKey;
+
+
+ public MessagePublishInfoImpl(AMQShortString routingKey)
+ {
+ _routingKey = routingKey;
+ }
+
+ public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, AMQShortString routingKey)
+ {
+ _exchange = exchange;
+ _immediate = immediate;
+ _mandatory = mandatory;
+ _routingKey = routingKey;
+ }
+
+ public AMQShortString getExchange()
+ {
+ return _exchange;
+ }
+
+ public boolean isImmediate()
+ {
+ return _immediate;
+
+ }
+
+ public boolean isMandatory()
+ {
+ return _mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
+
+
+ public void setExchange(AMQShortString exchange)
+ {
+ _exchange = exchange;
+ }
+
+ public void setImmediate(boolean immediate)
+ {
+ _immediate = immediate;
+ }
+
+ public void setMandatory(boolean mandatory)
+ {
+ _mandatory = mandatory;
+ }
+
+ public void setRoutingKey(AMQShortString routingKey)
+ {
+ _routingKey = routingKey;
+ }
+ }
+
+ static MessagePublishInfo getPublishRequest(final String id)
+ {
+ MessagePublishInfo request = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return new AMQShortString(id);
+ }
+ };
+
return request;
}
@@ -221,7 +304,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
this(getPublishRequest(id), getContentHeader(headers), null);
}
- private Message(BasicPublishBody publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException
+ private Message(MessagePublishInfo publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException
{
super(_messageStore.getNewMessageId(), publish, _txnContext, header);
}
@@ -265,7 +348,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
{
try
{
- return getPublishBody().routingKey;
+ return getMessagePublishInfo().getRoutingKey();
}
catch (AMQException e)
{
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 70da7d1692..eca642b556 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.framing.BasicPublishBody;
@@ -55,13 +54,13 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
Message m7 = new Message("Message7", "XXXXX");
- BasicPublishBody pb7 = m7.getPublishBody();
- pb7.mandatory = true;
+ MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo());
+ pb7.setMandatory(true);
routeAndTest(m7,true);
Message m8 = new Message("Message8", "F0000");
- BasicPublishBody pb8 = m8.getPublishBody();
- pb8.mandatory = true;
+ MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo());
+ pb8.setMandatory(true);
routeAndTest(m8,false,q1);
@@ -88,10 +87,10 @@ public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
bindDefault("F0000");
Message m1 = new Message("Message1", "XXXXX");
Message m2 = new Message("Message2", "F0000");
- BasicPublishBody pb1 = m1.getPublishBody();
- pb1.mandatory = true;
- BasicPublishBody pb2 = m2.getPublishBody();
- pb2.mandatory = true;
+ MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo());
+ pb1.setMandatory(true);
+ MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo());
+ pb2.setMandatory(true);
routeAndTest(m1,true);
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index c35d38e4ab..2d0315d7f5 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -22,6 +22,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -164,20 +165,32 @@ public class AMQQueueMBeanTest extends TestCase
}
}
- private AMQMessage message(boolean immediate) throws AMQException
+ private AMQMessage message(final boolean immediate) throws AMQException
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Establish some way to determine the version for the test.
- BasicPublishBody publish = new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,
- immediate,
- false,
- null,
- 0);
-
+ MessagePublishInfo publish = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = 1000; // in bytes
return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
index 93050af2b7..ae2209c629 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
@@ -98,15 +99,29 @@ public class AckTest extends TestCase
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Establish some way to determine the version for the test.
- BasicPublishBody publishBody = new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- new AMQShortString("someExchange"),
- false,
- false,
- new AMQShortString("rk"),
- 0);
+ MessagePublishInfo publishBody = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return new AMQShortString("someExchange");
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return new AMQShortString("rk");
+ }
+ };
AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext);
if (persistent)
{
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
index cf5baa77bd..03a56df487 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.store.StoreContext;
@@ -57,20 +59,32 @@ class MessageTestHelper extends TestCase
return message(false);
}
- AMQMessage message(boolean immediate) throws AMQException
+ AMQMessage message(final boolean immediate) throws AMQException
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Establish some way to determine the version for the test.
- BasicPublishBody publish = new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,
- immediate,
- false,
- null,
- 0);
-
+ MessagePublishInfo publish = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
new ContentHeaderBody());
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index 89889ca017..6ffa3e0e02 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -24,9 +24,12 @@ import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -56,6 +59,26 @@ public class SkeletonMessageStore implements MessageStore
{
}
+ public void createExchange(Exchange exchange) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void createQueue(AMQQueue queue) throws AMQException
{
}
@@ -87,7 +110,7 @@ public class SkeletonMessageStore implements MessageStore
return _messageId.getAndIncrement();
}
- public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException
+ public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
{
}
@@ -102,7 +125,7 @@ public class SkeletonMessageStore implements MessageStore
return null;
}
- public ContentBody getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException
+ public ContentChunk getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException
{
return null;
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
index 6eacd5168f..2f0eaac29a 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -25,6 +25,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -50,16 +52,32 @@ public class TestReferenceCounting extends TestCase
public void testMessageGetsRemoved() throws AMQException
{
createPersistentContentHeader();
- // TODO: fix hardcoded protocol version data
- AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,
- false,
- false,
- null,
- 0),
+
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ AMQMessage message = new AMQMessage(_store.getNewMessageId(), info,
new NonTransactionalContext(_store, _storeContext, null, null, null),
createPersistentContentHeader());
message.incrementReference();
@@ -81,16 +99,33 @@ public class TestReferenceCounting extends TestCase
public void testMessageRemains() throws AMQException
{
- // TODO: fix hardcoded protocol version data
- AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,
- false,
- false,
- null,
- 0),
+
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ AMQMessage message = new AMQMessage(_store.getNewMessageId(),
+ info,
new NonTransactionalContext(_store, _storeContext, null, null, null),
createPersistentContentHeader());
message.incrementReference();
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index 9a649421dd..79d428fee8 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -35,7 +36,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
public TestableMemoryMessageStore()
{
_metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>();
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
}
public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
@@ -43,7 +44,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
return _metaDataMap;
}
- public ConcurrentMap<Long, List<ContentBody>> getContentBodyMap()
+ public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
{
return _contentBodyMap;
}