summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java72
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java28
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java3
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java53
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/BindingURL.java12
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/URLHelper.java12
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java11
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java3
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java3
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java4
25 files changed, 205 insertions, 123 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index fa4219ecd1..8b36576a30 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -43,6 +43,7 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.MessageRouter;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
@@ -112,7 +113,7 @@ public class AMQChannel
* A context used by the message store enabling it to track context for a given channel even across
* thread boundaries
*/
- private final StoreContext _storeContext = new StoreContext();
+ private final StoreContext _storeContext;
private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
@@ -120,12 +121,16 @@ public class AMQChannel
private Set<Long> _browsedAcks = new HashSet<Long>();
+ private final AMQProtocolSession _session;
- public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
+
+ public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
{
+ _session = session;
_channelId = channelId;
+ _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
_prefetch_HighWaterMark = DEFAULT_PREFETCH;
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
@@ -338,7 +343,8 @@ public class AMQChannel
_txnContext.rollback();
unsubscribeAllConsumers(session);
requeue();
- _txnContext.commit();
+ _txnContext.commit();
+
}
private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
@@ -386,8 +392,10 @@ public class AMQChannel
_txnContext.deliver(unacked.message, unacked.queue);
}
}
+
}
+
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*/
@@ -403,7 +411,7 @@ public class AMQChannel
AMQShortString consumerTag = message.consumerTag;
AMQMessage msg = message.message;
msg.setRedelivered(true);
- if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag))
+ if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag) && !isSuspended())
{
msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
}
@@ -417,6 +425,7 @@ public class AMQChannel
msgToRequeue.add(message);
}
}
+
// false means continue processing
return false;
}
@@ -430,6 +439,7 @@ public class AMQChannel
{
_txnContext.deliver(message.message, message.queue);
_unacknowledgedMessageMap.remove(message.deliveryTag);
+ message.message.decrementReference(_storeContext);
}
}
@@ -559,6 +569,8 @@ public class AMQChannel
public void rollback() throws AMQException
{
_txnContext.rollback();
+
+
}
public String toString()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index bbfab8132c..c987c12154 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -100,6 +100,7 @@ public class TxAck implements TxnOp
//make persistent changes, i.e. dequeue and decrementReference
for (UnacknowledgedMessage msg : _unacked)
{
+ msg.restoreTransientMessageData();
msg.discard(storeContext);
}
}
@@ -112,6 +113,7 @@ public class TxAck implements TxnOp
//in memory (persistent changes will be rolled back by store)
for (UnacknowledgedMessage msg : _unacked)
{
+ msg.clearTransientMessageData();
msg.message.incrementReference();
}
}
@@ -120,6 +122,11 @@ public class TxAck implements TxnOp
{
//remove the unacked messages from the channels map
_map.remove(_unacked);
+ for (UnacknowledgedMessage msg : _unacked)
+ {
+ msg.clearTransientMessageData();
+ }
+
}
public void rollback(StoreContext storeContext)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
index ff3c901be5..3f2348b71b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -50,5 +50,15 @@ public class UnacknowledgedMessage
}
message.decrementReference(storeContext);
}
+
+ public void restoreTransientMessageData() throws AMQException
+ {
+ message.restoreTransientMessageData();
+ }
+
+ public void clearTransientMessageData()
+ {
+ message.clearTransientMessageData();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index fb198ef4f7..03fc7a3926 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -49,7 +49,7 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- final AMQChannel channel = new AMQChannel(evt.getChannelId(), virtualHost.getMessageStore(),
+ final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getMessageStore(),
virtualHost.getExchangeRegistry());
session.addChannel(channel);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index f23ec85391..be81734ae4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -20,10 +20,7 @@
*/
package org.apache.qpid.server.queue;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -111,7 +108,7 @@ public class AMQMessage
{
try
{
- return _index < _messageHandle.getBodyCount(_messageId) - 1;
+ return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
}
catch (AMQException e)
{
@@ -124,7 +121,7 @@ public class AMQMessage
{
try
{
- ContentBody cb = _messageHandle.getContentBody(_messageId, ++_index);
+ ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index);
return ContentBody.createAMQFrame(_channel, cb);
}
catch (AMQException e)
@@ -141,6 +138,11 @@ public class AMQMessage
}
}
+ private StoreContext getStoreContext()
+ {
+ return _txnContext.getStoreContext();
+ }
+
private class BodyContentIterator implements Iterator<ContentBody>
{
@@ -150,7 +152,7 @@ public class AMQMessage
{
try
{
- return _index < _messageHandle.getBodyCount(_messageId) - 1;
+ return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
}
catch (AMQException e)
{
@@ -163,7 +165,7 @@ public class AMQMessage
{
try
{
- return _messageHandle.getContentBody(_messageId, ++_index);
+ return _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index);
}
catch (AMQException e)
{
@@ -201,10 +203,11 @@ public class AMQMessage
* @param factory
* @throws AMQException
*/
- public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory) throws AMQException
+ public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException
{
_messageId = messageId;
_messageHandle = factory.createMessageHandle(messageId, store, true);
+ _txnContext = txnConext;
_transientMessageData = null;
}
@@ -276,7 +279,7 @@ public class AMQMessage
}
else
{
- return _messageHandle.getContentHeaderBody(_messageId);
+ return _messageHandle.getContentHeaderBody(getStoreContext(),_messageId);
}
}
@@ -342,14 +345,16 @@ public class AMQMessage
_referenceCount.incrementAndGet();
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount);
+
+ _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+
}
}
/**
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
- *
+ *
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
*/
@@ -365,7 +370,9 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " is zero; removing message");
+ _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+
+
}
// must check if the handle is null since there may be cases where we decide to throw away a message
@@ -386,7 +393,7 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId);
+ _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId+ "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
if (_referenceCount.get() < 0)
{
Thread.dumpStack();
@@ -475,7 +482,7 @@ public class AMQMessage
}
else
{
- return _messageHandle.isPersistent(_messageId);
+ return _messageHandle.isPersistent(getStoreContext(),_messageId);
}
}
@@ -504,7 +511,7 @@ public class AMQMessage
}
else
{
- pb = _messageHandle.getPublishBody(_messageId);
+ pb = _messageHandle.getPublishBody(getStoreContext(),_messageId);
}
return pb;
}
@@ -541,7 +548,7 @@ public class AMQMessage
List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues();
if (_log.isDebugEnabled())
{
- _log.debug("Delivering message " + _messageId);
+ _log.debug("Delivering message " + _messageId + " to " + destinationQueues);
}
try
{
@@ -575,7 +582,7 @@ public class AMQMessage
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- final int bodyCount = _messageHandle.getBodyCount(_messageId);
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -591,7 +598,7 @@ public class AMQMessage
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentBody cb = _messageHandle.getContentBody(_messageId, 0);
+ ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0);
AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -603,7 +610,7 @@ public class AMQMessage
//
for(int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentBody(_messageId, i);
+ cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i);
protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
}
@@ -619,7 +626,7 @@ public class AMQMessage
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- final int bodyCount = _messageHandle.getBodyCount(_messageId);
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -634,7 +641,7 @@ public class AMQMessage
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentBody cb = _messageHandle.getContentBody(_messageId, 0);
+ ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0);
AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -646,7 +653,7 @@ public class AMQMessage
//
for(int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentBody(_messageId, i);
+ cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i);
protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
}
@@ -749,13 +756,30 @@ public class AMQMessage
}
catch (AMQException e)
{
- _log.error(e);
+ _log.error(e.toString(),e);
return 0;
}
}
+
+ public void restoreTransientMessageData() throws AMQException
+ {
+ TransientMessageData transientMessageData = new TransientMessageData();
+ transientMessageData.setPublishBody(getPublishBody());
+ transientMessageData.setContentHeaderBody(getContentHeaderBody());
+ transientMessageData.addBodyLength(getContentHeaderBody().getSize());
+ _transientMessageData = transientMessageData;
+ }
+
+
+ public void clearTransientMessageData()
+ {
+ _transientMessageData = null;
+ }
+
+
public String toString()
{
return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
index 6aa8f98403..210c9f01a8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
@@ -35,17 +35,17 @@ import org.apache.qpid.server.store.StoreContext;
*/
public interface AMQMessageHandle
{
- ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException;
+ ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException;
/**
* @return the number of body frames associated with this message
*/
- int getBodyCount(Long messageId) throws AMQException;
+ int getBodyCount(StoreContext context, Long messageId) throws AMQException;
/**
* @return the size of the body
*/
- long getBodySize(Long messageId) throws AMQException;
+ long getBodySize(StoreContext context, Long messageId) throws AMQException;
/**
* Get a particular content body
@@ -53,17 +53,17 @@ public interface AMQMessageHandle
* @return a content body
* @throws IllegalArgumentException if the index is invalid
*/
- ContentBody getContentBody(Long messageId, int index) throws IllegalArgumentException, AMQException;
+ ContentBody getContentBody(StoreContext context, Long messageId, int index) throws IllegalArgumentException, AMQException;
void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException;
- BasicPublishBody getPublishBody(Long messageId) throws AMQException;
+ BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException;
boolean isRedelivered();
void setRedelivered(boolean redelivered);
- boolean isPersistent(Long messageId) throws AMQException;
+ boolean isPersistent(StoreContext context, Long messageId) throws AMQException;
void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
ContentHeaderBody contentHeaderBody)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
index 5890d7b72c..79f875ce1e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
@@ -49,22 +49,22 @@ public class InMemoryMessageHandle implements AMQMessageHandle
{
}
- public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException
+ public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException
{
return _contentHeaderBody;
}
- public int getBodyCount(Long messageId)
+ public int getBodyCount(StoreContext context, Long messageId)
{
return _contentBodies.size();
}
- public long getBodySize(Long messageId) throws AMQException
+ public long getBodySize(StoreContext context, Long messageId) throws AMQException
{
- return getContentHeaderBody(messageId).bodySize;
+ return getContentHeaderBody(context, messageId).bodySize;
}
- public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException
+ public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
{
if (index > _contentBodies.size() - 1)
{
@@ -80,7 +80,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle
_contentBodies.add(contentBody);
}
- public BasicPublishBody getPublishBody(Long messageId) throws AMQException
+ public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException
{
return _publishBody;
}
@@ -96,10 +96,10 @@ public class InMemoryMessageHandle implements AMQMessageHandle
_redelivered = redelivered;
}
- public boolean isPersistent(Long messageId) throws AMQException
+ public boolean isPersistent(StoreContext context, Long messageId) throws AMQException
{
//todo remove literal values to a constant file such as AMQConstants in common
- ContentHeaderBody chb = getContentHeaderBody(messageId);
+ ContentHeaderBody chb = getContentHeaderBody(context, messageId);
return chb.properties instanceof BasicContentHeaderProperties &&
((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 8e270f9772..05841ccfc0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -267,9 +267,11 @@ public class SubscriptionImpl implements Subscription
if (_acks)
{
channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ msg.decrementReference(storeContext);
}
msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
+
}
}
finally
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
index 161913ef15..670d895950 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
@@ -56,21 +56,21 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
_messageStore = messageStore;
}
- public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException
+ public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException
{
ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null);
if (chb == null)
{
- MessageMetaData mmd = loadMessageMetaData(messageId);
+ MessageMetaData mmd = loadMessageMetaData(context, messageId);
chb = mmd.getContentHeaderBody();
}
return chb;
}
- private MessageMetaData loadMessageMetaData(Long messageId)
+ private MessageMetaData loadMessageMetaData(StoreContext context, Long messageId)
throws AMQException
{
- MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+ MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId);
populateFromMessageMetaData(mmd);
return mmd;
}
@@ -82,11 +82,11 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
_publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
}
- public int getBodyCount(Long messageId) throws AMQException
+ public int getBodyCount(StoreContext context, Long messageId) throws AMQException
{
if (_contentBodies == null)
{
- MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+ MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId);
int chunkCount = mmd.getContentChunkCount();
_contentBodies = new ArrayList<WeakReference<ContentBody>>(chunkCount);
for (int i = 0; i < chunkCount; i++)
@@ -97,12 +97,12 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
return _contentBodies.size();
}
- public long getBodySize(Long messageId) throws AMQException
+ public long getBodySize(StoreContext context, Long messageId) throws AMQException
{
- return getContentHeaderBody(messageId).bodySize;
+ return getContentHeaderBody(context, messageId).bodySize;
}
- public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException
+ public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
{
if (index > _contentBodies.size() - 1)
{
@@ -113,7 +113,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
ContentBody cb = wr.get();
if (cb == null)
{
- cb = _messageStore.getContentBodyChunk(messageId, index);
+ cb = _messageStore.getContentBodyChunk(context, messageId, index);
_contentBodies.set(index, new WeakReference<ContentBody>(cb));
}
return cb;
@@ -145,12 +145,12 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
_messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody, isLastContentBody);
}
- public BasicPublishBody getPublishBody(Long messageId) throws AMQException
+ public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException
{
BasicPublishBody bpb = (_publishBody != null ? _publishBody.get() : null);
if (bpb == null)
{
- MessageMetaData mmd = loadMessageMetaData(messageId);
+ MessageMetaData mmd = loadMessageMetaData(context, messageId);
bpb = mmd.getPublishBody();
}
@@ -167,10 +167,10 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
_redelivered = redelivered;
}
- public boolean isPersistent(Long messageId) throws AMQException
+ public boolean isPersistent(StoreContext context, Long messageId) throws AMQException
{
//todo remove literal values to a constant file such as AMQConstants in common
- ContentHeaderBody chb = getContentHeaderBody(messageId);
+ ContentHeaderBody chb = getContentHeaderBody(context, messageId);
return chb.properties instanceof BasicContentHeaderProperties &&
((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 6c4ad10429..f678cea630 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -174,12 +174,12 @@ public class MemoryMessageStore implements MessageStore
_metaDataMap.put(messageId, messageMetaData);
}
- public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
+ public MessageMetaData getMessageMetaData(StoreContext context,Long messageId) throws AMQException
{
return _metaDataMap.get(messageId);
}
- public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException
+ public ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
{
List<ContentBody> bodyList = _contentBodyMap.get(messageId);
return bodyList.get(index);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index d707ece8da..7fa46eb1ca 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -84,8 +84,8 @@ public interface MessageStore
void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException;
- MessageMetaData getMessageMetaData(Long messageId) throws AMQException;
+ MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
- ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException;
+ ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
index 55e5067852..2e2f2ba7d6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store;
+import org.apache.log4j.Logger;
+
+
/**
* A context that the store can use to associate with a transactional context. For example, it could store
* some kind of txn id.
@@ -28,8 +31,22 @@ package org.apache.qpid.server.store;
*/
public class StoreContext
{
+
+ private static final Logger _logger = Logger.getLogger(StoreContext.class);
+
+ private String _name;
private Object _payload;
+ public StoreContext()
+ {
+ _name = super.toString();
+ }
+
+ public StoreContext(String name)
+ {
+ _name = name;
+ }
+
public Object getPayload()
{
return _payload;
@@ -37,6 +54,7 @@ public class StoreContext
public void setPayload(Object payload)
{
+ _logger.debug("["+_name+"] Setting payload: " + payload);
_payload = payload;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 7481a96ae4..5c915b5c84 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -168,7 +168,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
if (_log.isDebugEnabled())
{
- _log.debug("Starting transaction on message store");
+ _log.debug("Starting transaction on message store: " + this);
}
_messageStore.beginTran(_storeContext);
_inTran = true;
@@ -179,7 +179,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
if (_log.isDebugEnabled())
{
- _log.debug("Committing transactional context");
+ _log.debug("Committing transactional context: " + this);
}
if (_ackOp != null)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
index 17ce6debbd..c04380ba8c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
@@ -75,7 +75,7 @@ public class AMQBrokerDetails implements BrokerDetails
}
else
{
- URLHelper.parseError(0, transport.length(), "Unknown transport", url);
+ throw URLHelper.parseError(0, transport.length(), "Unknown transport", url);
}
}
}
@@ -89,7 +89,7 @@ public class AMQBrokerDetails implements BrokerDetails
if (transport == null)
{
- URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" +
+ throw URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" +
" In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, "");
}
@@ -144,7 +144,7 @@ public class AMQBrokerDetails implements BrokerDetails
}
else
{
- URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1,
+ throw URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1,
"Illegal character in port number", connection.toString());
}
@@ -172,7 +172,7 @@ public class AMQBrokerDetails implements BrokerDetails
throw(URLSyntaxException) uris;
}
- URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+ throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
index 928aa55ea2..fea83d3128 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
@@ -68,7 +68,7 @@ public class AMQConnectionURL implements ConnectionURL
String uid = AMQConnectionFactory.getUniqueClientID();
if (uid == null)
{
- URLHelper.parseError(-1, "Client Name not specified", fullURL);
+ throw URLHelper.parseError(-1, "Client Name not specified", fullURL);
}
else
{
@@ -106,7 +106,7 @@ public class AMQConnectionURL implements ConnectionURL
if (userInfo == null)
{
- URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
+ throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
"User information not found on url", fullURL);
}
else
@@ -126,11 +126,11 @@ public class AMQConnectionURL implements ConnectionURL
int testIndex = start + authLength;
if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?')
{
- URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL);
+ throw URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL);
}
else
{
- URLHelper.parseError(-1, "Virtual host not specified", fullURL);
+ throw URLHelper.parseError(-1, "Virtual host not specified", fullURL);
}
}
@@ -155,17 +155,17 @@ public class AMQConnectionURL implements ConnectionURL
if (slash == -1)
{
- URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+ throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
}
else
{
if (slash != 0 && fullURL.charAt(slash - 1) == ':')
{
- URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
+ throw URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
}
else
{
- URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL);
+ throw URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL);
}
}
@@ -180,7 +180,7 @@ public class AMQConnectionURL implements ConnectionURL
if (colonIndex == -1)
{
- URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
+ throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
"Null password in user information not allowed.", _url);
}
else
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 0698da3eba..a994dbc670 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -76,7 +76,7 @@ public abstract class AMQDestination implements Destination, Referenceable
_isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
_isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
_isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
- _queueName = new AMQShortString(binding.getQueueName());
+ _queueName = binding.getQueueName() == null ? null : new AMQShortString(binding.getQueueName());
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName, AMQShortString queueName)
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
index bf975c426c..c05667902f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
@@ -28,6 +28,7 @@ import javax.jms.JMSException;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -42,7 +43,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text
/**
* This constant represents the name of a property that is set when the message payload is null.
*/
- private static final AMQShortString PAYLOAD_NULL_PROPERTY = new AMQShortString("JMS_QPID_NULL");
+ private static final AMQShortString PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.getShortStringName();
private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
public JMSTextMessage() throws JMSException
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
index fc635cc7ea..eea660c4f0 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
@@ -107,7 +107,7 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession
public OneUseChannel(int channelId, VirtualHost virtualHost)
throws AMQException
{
- super(channelId,
+ super(ClusteredProtocolSession.this,channelId,
virtualHost.getMessageStore(),
virtualHost.getExchangeRegistry());
}
diff --git a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
index 04d152acf5..2ee4ce21cb 100644
--- a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
+++ b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
@@ -26,9 +26,12 @@ import java.util.HashMap;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.log4j.Logger;
public class AMQBindingURL implements BindingURL
{
+ private static final Logger _logger = Logger.getLogger(AMQBindingURL.class);
+
String _url;
AMQShortString _exchangeClass;
AMQShortString _exchangeName;
@@ -41,7 +44,7 @@ public class AMQBindingURL implements BindingURL
{
//format:
// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
-
+ _logger.debug("Parsing URL: " + url);
_url = url;
_options = new HashMap<String, String>();
@@ -73,17 +76,19 @@ public class AMQBindingURL implements BindingURL
if (exchangeName == null)
{
- URLHelper.parseError(-1, "Exchange Name not specified.", _url);
+ throw URLHelper.parseError(-1, "Exchange Name not specified.", _url);
}
else
{
setExchangeName(exchangeName);
}
+ String queueName;
+
if (connection.getPath() == null ||
connection.getPath().equals(""))
{
- URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
+ throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
"Destination or Queue requried", _url);
}
else
@@ -91,7 +96,7 @@ public class AMQBindingURL implements BindingURL
int slash = connection.getPath().indexOf("/", 1);
if (slash == -1)
{
- URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
+ throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
"Destination requried", _url);
}
else
@@ -99,7 +104,10 @@ public class AMQBindingURL implements BindingURL
String path = connection.getPath();
setDestinationName(path.substring(1, slash));
- setQueueName(path.substring(slash + 1));
+ // We don't set queueName yet as the actual value we use depends on options set
+ // when we are dealing with durable subscriptions
+
+ queueName = path.substring(slash + 1);
}
}
@@ -108,14 +116,19 @@ public class AMQBindingURL implements BindingURL
processOptions();
+ // We can now call setQueueName as the URL is full parsed.
+
+ setQueueName(queueName);
+
//Fragment is #string (not used)
//System.out.println(connection.getFragment());
+ _logger.debug("URL Parsed: " + this);
}
catch (URISyntaxException uris)
{
- URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+ throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
}
}
@@ -125,7 +138,7 @@ public class AMQBindingURL implements BindingURL
setExchangeClass(new AMQShortString(exchangeClass));
}
- private void setQueueName(String name)
+ private void setQueueName(String name) throws URLSyntaxException
{
setQueueName(new AMQShortString(name));
}
@@ -155,8 +168,9 @@ public class AMQBindingURL implements BindingURL
return _exchangeClass;
}
- public void setExchangeClass(AMQShortString exchangeClass)
+ private void setExchangeClass(AMQShortString exchangeClass)
{
+
_exchangeClass = exchangeClass;
}
@@ -165,7 +179,7 @@ public class AMQBindingURL implements BindingURL
return _exchangeName;
}
- public void setExchangeName(AMQShortString name)
+ private void setExchangeName(AMQShortString name)
{
_exchangeName = name;
@@ -180,40 +194,43 @@ public class AMQBindingURL implements BindingURL
return _destinationName;
}
- public void setDestinationName(AMQShortString name)
+ private void setDestinationName(AMQShortString name)
{
_destinationName = name;
}
public AMQShortString getQueueName()
{
+ return _queueName;
+ }
+
+ public void setQueueName(AMQShortString name) throws URLSyntaxException
+ {
if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
{
if (Boolean.parseBoolean(getOption(OPTION_DURABLE)))
{
if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION))
{
- return new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION));
+ _queueName = new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION));
}
else
{
- return getDestinationName();
+ throw URLHelper.parseError(-1, "Durable subscription must have values for " + BindingURL.OPTION_CLIENTID + " and " + BindingURL.OPTION_SUBSCRIPTION + ".", _url);
+
}
}
else
{
- return getDestinationName();
+ _queueName = null;
}
}
else
{
- return _queueName;
+ _queueName = name;
}
- }
- public void setQueueName(AMQShortString name)
- {
- _queueName = name;
+
}
public String getOption(String key)
diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
index 86a8420d30..67be2db86f 100644
--- a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
+++ b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
@@ -40,29 +40,17 @@ public interface BindingURL
AMQShortString getExchangeClass();
- void setExchangeClass(AMQShortString name);
-
AMQShortString getExchangeName();
- void setExchangeName(AMQShortString name);
-
AMQShortString getDestinationName();
- void setDestinationName(AMQShortString name);
-
AMQShortString getQueueName();
- void setQueueName(AMQShortString name);
-
String getOption(String key);
- void setOption(String key, String value);
-
boolean containsOption(String key);
AMQShortString getRoutingKey();
- void setRoutingKey(AMQShortString key);
-
String toString();
}
diff --git a/java/common/src/main/java/org/apache/qpid/url/URLHelper.java b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
index 2121346c02..806f879818 100644
--- a/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
+++ b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
@@ -114,11 +114,11 @@ public class URLHelper
if (sepIndex >= options.length() || sepIndex == 0)
{
- parseError(valueIndex, "Unterminated option", options);
+ throw parseError(valueIndex, "Unterminated option", options);
}
else
{
- parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" +
+ throw parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" +
options.charAt(sepIndex) + "'", options);
}
}
@@ -136,14 +136,14 @@ public class URLHelper
}
- public static void parseError(int index, String error, String url) throws URLSyntaxException
+ public static URLSyntaxException parseError(int index, String error, String url)
{
- parseError(index, 1, error, url);
+ return parseError(index, 1, error, url);
}
- public static void parseError(int index, int length, String error, String url) throws URLSyntaxException
+ public static URLSyntaxException parseError(int index, int length, String error, String url)
{
- throw new URLSyntaxException(url, error, index, length);
+ return new URLSyntaxException(url, error, index, length);
}
public static String printOptions(HashMap<String, String> options)
diff --git a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
index dac0f06744..da1455294a 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -58,7 +58,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
assertTrue(channelCount == 1);
AMQQueue queue = new org.apache.qpid.server.queue.AMQQueue(new AMQShortString("testQueue_" + System.currentTimeMillis()),
false, new AMQShortString("test"), true, _virtualHost);
- AMQChannel channel = new AMQChannel(2, _messageStore, null);
+ AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore, null);
channel.setDefaultQueue(queue);
_protocolSession.addChannel(channel);
channelCount = _mbean.channels().size();
@@ -69,7 +69,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L);
// check APIs
- AMQChannel channel3 = new AMQChannel(3, _messageStore, null);
+ AMQChannel channel3 = new AMQChannel(_protocolSession,3, _messageStore, null);
channel3.setLocalTransactional();
_protocolSession.addChannel(channel3);
_mbean.rollbackTransactions(2);
@@ -89,14 +89,14 @@ public class AMQProtocolSessionMBeanTest extends TestCase
}
// check if closing of session works
- _protocolSession.addChannel(new AMQChannel(5, _messageStore, null));
+ _protocolSession.addChannel(new AMQChannel(_protocolSession,5, _messageStore, null));
_mbean.closeConnection();
try
{
channelCount = _mbean.channels().size();
assertTrue(channelCount == 0);
// session is now closed so adding another channel should throw an exception
- _protocolSession.addChannel(new AMQChannel(6, _messageStore, null));
+ _protocolSession.addChannel(new AMQChannel(_protocolSession,6, _messageStore, null));
fail();
}
catch(AMQException ex)
@@ -109,13 +109,14 @@ public class AMQProtocolSessionMBeanTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- _channel = new AMQChannel(1, _messageStore, null);
+
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
_virtualHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
_queueRegistry = _virtualHost.getQueueRegistry();
_exchangeRegistry = _virtualHost.getExchangeRegistry();
_mockIOSession = new MockIoSession();
_protocolSession = new AMQMinaProtocolSession(_mockIOSession, appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true));
+ _channel = new AMQChannel(_protocolSession,1, _messageStore, null);
_protocolSession.addChannel(_channel);
_mbean = (AMQProtocolSessionMBean)_protocolSession.getManagedObject();
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 84dde9dd6f..c35d38e4ab 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -78,8 +78,9 @@ public class AMQQueueMBeanTest extends TestCase
assertFalse(mgr.hasActiveSubscribers());
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
- _channel = new AMQChannel(1, _messageStore, null);
+
_protocolSession = new MockProtocolSession(_messageStore);
+ _channel = new AMQChannel(_protocolSession, 1, _messageStore, null);
_protocolSession.addChannel(_channel);
_queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null,false,false);
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
index ccc7752fd3..93050af2b7 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
@@ -75,8 +75,9 @@ public class AckTest extends TestCase
{
super.setUp();
_messageStore = new TestableMemoryMessageStore();
- _channel = new AMQChannel(5, _messageStore, null/*dont need exchange registry*/);
_protocolSession = new MockProtocolSession(_messageStore);
+ _channel = new AMQChannel(_protocolSession,5, _messageStore, null/*dont need exchange registry*/);
+
_protocolSession.addChannel(_channel);
_subscriptionManager = new SubscriptionSet();
_queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), _subscriptionManager);
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index 42dd1a4b74..89889ca017 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -97,12 +97,12 @@ public class SkeletonMessageStore implements MessageStore
}
- public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
+ public MessageMetaData getMessageMetaData(StoreContext s,Long messageId) throws AMQException
{
return null;
}
- public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException
+ public ContentBody getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException
{
return null;
}