summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-05 23:16:50 +0000
committerRobert Greig <rgreig@apache.org>2007-01-05 23:16:50 +0000
commita0bc8a970759ed6b4b20fea2ce81386e7cff4f7d (patch)
treec2615a9cd9fc1dfb8d09cec795f76e74e373038e
parentfcfaaaf02693331d0079fb001586d61e2ab9da2e (diff)
downloadqpid-python-a0bc8a970759ed6b4b20fea2ce81386e7cff4f7d.tar.gz
Fix to remove reliance on thread local transaction reference and replace it with a StoreContext
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@493231 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/etc/config.xml4
-rw-r--r--java/broker/etc/log4j.xml12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java69
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java21
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java44
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java29
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java11
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java5
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java2
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java2
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java13
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java10
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java15
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java7
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java6
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java18
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java11
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java26
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java24
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java27
40 files changed, 438 insertions, 260 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml
index 40e2f468e0..0862588a0d 100644
--- a/java/broker/etc/config.xml
+++ b/java/broker/etc/config.xml
@@ -82,8 +82,8 @@
<auto_register>true</auto_register>
</queue>
<store>
- <class>org.apache.qpid.server.store.MemoryMessageStore</class>
- <!--<class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>-->
+ <!--<class>org.apache.qpid.server.store.MemoryMessageStore</class>-->
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
</store>
<virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
</broker>
diff --git a/java/broker/etc/log4j.xml b/java/broker/etc/log4j.xml
index 133b5ce259..42f191cc59 100644
--- a/java/broker/etc/log4j.xml
+++ b/java/broker/etc/log4j.xml
@@ -34,9 +34,17 @@
</layout>
</appender>
- <!--<category name="org.apache.qpid.server.queue">
+ <category name="org.apache.qpid.server.store">
<priority value="debug"/>
- </category>-->
+ </category>
+
+ <category name="org.apache.qpid.server.queue">
+ <priority value="debug"/>
+ </category>
+
+ <category name="org.apache.qpid.server.txn">
+ <priority value="debug"/>
+ </category>
<root>
<priority value="info"/>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 24f61b2426..8821830f4d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -26,7 +26,6 @@ import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
@@ -37,14 +36,13 @@ import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.TxnBuffer;
import java.util.*;
-import java.util.Set;
-import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -93,7 +91,7 @@ public class AMQChannel
private final Map<String, AMQQueue> _consumerTag2QueueMap = new TreeMap<String, AMQQueue>();
private final MessageStore _messageStore;
-
+
private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
private final AtomicBoolean _suspended = new AtomicBoolean(false);
@@ -102,6 +100,12 @@ public class AMQChannel
private TransactionalContext _txnContext;
+ /**
+ * A context used by the message store enabling it to track context for a given channel even across
+ * thread boundaries
+ */
+ private final StoreContext _storeContext = new StoreContext();
+
private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
@@ -117,7 +121,7 @@ public class AMQChannel
_messageStore = messageStore;
_exchanges = exchanges;
// by default the session is non-transactional
- _txnContext = new NonTransactionalContext(_messageStore, this, _returnMessages, _browsedAcks);
+ _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
/**
@@ -125,7 +129,7 @@ public class AMQChannel
*/
public void setLocalTransactional()
{
- _txnContext = new LocalTransactionalContext(_messageStore, new TxnBuffer(), _returnMessages);
+ _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, new TxnBuffer(), _returnMessages);
}
public boolean isTransactional()
@@ -191,7 +195,7 @@ public class AMQChannel
{
_currentMessage.setContentHeaderBody(contentHeaderBody);
routeCurrentMessage();
- _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
+ _currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
// check and deliver if header says body length is zero
if (contentHeaderBody.bodySize == 0)
@@ -213,7 +217,7 @@ public class AMQChannel
// received
try
{
- if (_currentMessage.addContentBodyFrame(contentBody))
+ if (_currentMessage.addContentBodyFrame(_storeContext, contentBody))
{
// callback to allow the context to do any post message processing
// primary use is to allow message return processing in the non-tx case
@@ -453,10 +457,10 @@ public class AMQChannel
{
if (message.queue == queue)
{
- message.queue = null;
try
{
- message.message.decrementReference();
+ message.discard(_storeContext);
+ message.queue = null;
}
catch (AMQException e)
{
@@ -536,7 +540,7 @@ public class AMQChannel
_browsedAcks.add(deliveryTag);
addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
}
-
+
private void checkSuspension()
{
boolean suspend;
@@ -608,6 +612,11 @@ public class AMQChannel
return _defaultQueue;
}
+ public StoreContext getStoreContext()
+ {
+ return _storeContext;
+ }
+
public void processReturns(AMQProtocolSession session) throws AMQException
{
for (RequiredDeliveryException bouncedMessage : _returnMessages)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index efc448bab2..c0a631080e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@ package org.apache.qpid.server.ack;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.txn.TxnOp;
+import org.apache.qpid.server.store.StoreContext;
import java.util.LinkedList;
import java.util.List;
@@ -94,12 +95,12 @@ public class TxAck implements TxnOp
return false;
}
- public void prepare() throws AMQException
+ public void prepare(StoreContext storeContext) throws AMQException
{
//make persistent changes, i.e. dequeue and decrementReference
for (UnacknowledgedMessage msg : _unacked)
{
- msg.discard();
+ msg.discard(storeContext);
}
}
@@ -115,13 +116,13 @@ public class TxAck implements TxnOp
}
}
- public void commit()
+ public void commit(StoreContext storeContext)
{
//remove the unacked messages from the channels map
_map.remove(_unacked);
}
- public void rollback()
+ public void rollback(StoreContext storeContext)
{
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
index 0eff5a3cca..26f41e19af 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@ package org.apache.qpid.server.ack;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.StoreContext;
public class UnacknowledgedMessage
{
@@ -30,7 +31,7 @@ public class UnacknowledgedMessage
public final String consumerTag;
public final long deliveryTag;
public AMQQueue queue;
-
+
public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, String consumerTag, long deliveryTag)
{
this.queue = queue;
@@ -39,13 +40,13 @@ public class UnacknowledgedMessage
this.deliveryTag = deliveryTag;
}
- public void discard() throws AMQException
+ public void discard(StoreContext storeContext) throws AMQException
{
if (queue != null)
{
- message.dequeue(queue);
+ message.dequeue(storeContext, queue);
}
- message.decrementReference();
+ message.decrementReference(storeContext);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 5091dded8a..f30667690f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.message.MessageDecorator;
import org.apache.qpid.server.message.jms.JMSMessage;
@@ -71,8 +72,8 @@ public class AMQMessage
private boolean _deliveredToConsumer;
private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
- private AtomicBoolean _taken;
-
+ private AtomicBoolean _taken = new AtomicBoolean(false);
+
private TransientMessageData _transientMessageData = new TransientMessageData();
/**
@@ -92,7 +93,15 @@ public class AMQMessage
public boolean hasNext()
{
- return _index < _messageHandle.getBodyCount() - 1;
+ try
+ {
+ return _index < _messageHandle.getBodyCount(_messageId) - 1;
+ }
+ catch (AMQException e)
+ {
+ _log.error("Unable to get body count: " + e, e);
+ return false;
+ }
}
public AMQDataBlock next()
@@ -123,7 +132,15 @@ public class AMQMessage
public boolean hasNext()
{
- return _index < _messageHandle.getBodyCount() - 1;
+ try
+ {
+ return _index < _messageHandle.getBodyCount(_messageId) - 1;
+ }
+ catch (AMQException e)
+ {
+ _log.error("Error getting body count: " + e, e);
+ return false;
+ }
}
public ContentBody next()
@@ -203,15 +220,15 @@ public class AMQMessage
public AMQMessage(long messageId, BasicPublishBody publishBody,
TransactionalContext txnContext,
ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues,
- List<ContentBody> contentBodies, MessageStore messageStore,
+ List<ContentBody> contentBodies, MessageStore messageStore, StoreContext storeContext,
MessageHandleFactory messageHandleFactory) throws AMQException
{
this(messageId, publishBody, txnContext, contentHeader);
_transientMessageData.setDestinationQueues(destinationQueues);
- routingComplete(messageStore, messageHandleFactory);
+ routingComplete(messageStore, storeContext, messageHandleFactory);
for (ContentBody cb : contentBodies)
{
- addContentBodyFrame(cb);
+ addContentBodyFrame(storeContext, cb);
}
}
@@ -252,7 +269,7 @@ public class AMQMessage
_transientMessageData.setContentHeaderBody(contentHeaderBody);
}
- public void routingComplete(MessageStore store, MessageHandleFactory factory) throws AMQException
+ public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory) throws AMQException
{
final boolean persistent = isPersistent();
_messageHandle = factory.createMessageHandle(_messageId, store, persistent);
@@ -265,22 +282,22 @@ public class AMQMessage
// persistent store
for (AMQQueue q : _transientMessageData.getDestinationQueues())
{
- _messageHandle.enqueue(_messageId, q);
+ _messageHandle.enqueue(storeContext, _messageId, q);
}
if (_transientMessageData.getContentHeaderBody().bodySize == 0)
{
- deliver();
+ deliver(storeContext);
}
}
- public boolean addContentBodyFrame(ContentBody contentBody) throws AMQException
+ public boolean addContentBodyFrame(StoreContext storeContext, ContentBody contentBody) throws AMQException
{
_transientMessageData.addBodyLength(contentBody.getSize());
- _messageHandle.addContentBodyFrame(_messageId, contentBody);
+ _messageHandle.addContentBodyFrame(storeContext, _messageId, contentBody);
if (isAllContentReceived())
{
- deliver();
+ deliver(storeContext);
return true;
}
else
@@ -318,7 +335,7 @@ public class AMQMessage
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
*/
- public void decrementReference() throws MessageCleanupException
+ public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
// note that the operation of decrementing the reference count and then removing the message does not
// have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
@@ -336,7 +353,7 @@ public class AMQMessage
// and the handle has not yet been constructed
if (_messageHandle != null)
{
- _messageHandle.removeMessage(_messageId);
+ _messageHandle.removeMessage(storeContext, _messageId);
}
}
catch (AMQException e)
@@ -422,7 +439,7 @@ public class AMQMessage
{
_taken.set(false);
}
-
+
public boolean checkToken(Object token)
{
if (_tokens.contains(token))
@@ -449,9 +466,9 @@ public class AMQMessage
_transientMessageData.addDestinationQueue(queue);
}
- public void dequeue(AMQQueue queue) throws AMQException
+ public void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException
{
- _messageHandle.dequeue(_messageId, queue);
+ _messageHandle.dequeue(storeContext, _messageId, queue);
}
public boolean isPersistent() throws AMQException
@@ -515,7 +532,7 @@ public class AMQMessage
_deliveredToConsumer = true;
}
- private void deliver() throws AMQException
+ private void deliver(StoreContext storeContext) throws AMQException
{
// we get a reference to the destination queues now so that we can clear the
// transient message data as quickly as possible
@@ -524,7 +541,7 @@ public class AMQMessage
{
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
- _messageHandle.setPublishAndContentHeaderBody(_messageId, _transientMessageData.getPublishBody(),
+ _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getPublishBody(),
_transientMessageData.getContentHeaderBody());
// we then allow the transactional context to do something with the message content
@@ -541,7 +558,7 @@ public class AMQMessage
finally
{
destinationQueues.clear();
- decrementReference();
+ decrementReference(storeContext);
}
}
@@ -572,7 +589,7 @@ public class AMQMessage
}
//
- // Now start writing out the other content bodies
+ // Now start writing out the other content bodies
//
while (bodyFrameIterator.hasNext())
{
@@ -597,7 +614,7 @@ public class AMQMessage
private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, String replyText) throws AMQException
{
AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, (byte) 8, (byte) 0, getPublishBody().exchange,
- replyCode, replyText,
+ replyCode, replyText,
getPublishBody().routingKey);
ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
returnFrame.writePayload(buf);
@@ -641,4 +658,10 @@ public class AMQMessage
protocolSession.writeFrame(bodyFrameIterator.next());
}
}
+
+ public String toString()
+ {
+ return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
+ _taken;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
index 949b245c35..31b1f668fe 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
@@ -9,6 +9,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -18,7 +19,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
* even no caching at all to minimise the broker memory footprint.
*
* The method all take a messageId to avoid having to store it in the instance - the AMQMessage container
- * must already keen the messageId so it is pointless storing it twice.
+ * must already keen the messageId so it is pointless storing it twice.
*/
public interface AMQMessageHandle
{
@@ -27,7 +28,7 @@ public interface AMQMessageHandle
/**
* @return the number of body frames associated with this message
*/
- int getBodyCount();
+ int getBodyCount(long messageId) throws AMQException;
/**
* @return the size of the body
@@ -42,23 +43,23 @@ public interface AMQMessageHandle
*/
ContentBody getContentBody(long messageId, int index) throws IllegalArgumentException, AMQException;
- void addContentBodyFrame(long messageId, ContentBody contentBody) throws AMQException;
+ void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) throws AMQException;
BasicPublishBody getPublishBody(long messageId) throws AMQException;
boolean isRedelivered();
void setRedelivered(boolean redelivered);
-
+
boolean isPersistent(long messageId) throws AMQException;
- void setPublishAndContentHeaderBody(long messageId, BasicPublishBody publishBody,
+ void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody,
ContentHeaderBody contentHeaderBody)
throws AMQException;
- void removeMessage(long messageId) throws AMQException;
+ void removeMessage(StoreContext storeContext, long messageId) throws AMQException;
- void enqueue(long messageId, AMQQueue queue) throws AMQException;
+ void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException;
- void dequeue(long messageId, AMQQueue queue) throws AMQException;
+ void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException;
} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 7ab48598c4..fa1257b5f8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -27,11 +27,10 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.server.store.StoreContext;
import javax.management.JMException;
import java.text.MessageFormat;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
@@ -344,17 +343,17 @@ public class AMQQueue implements Managable, Comparable
/**
* Removes the AMQMessage from the top of the queue.
*/
- public void deleteMessageFromTop() throws AMQException
+ public void deleteMessageFromTop(StoreContext storeContext) throws AMQException
{
- _deliveryMgr.removeAMessageFromTop();
+ _deliveryMgr.removeAMessageFromTop(storeContext);
}
/**
* removes all the messages from the queue.
*/
- public void clearQueue() throws AMQException
+ public void clearQueue(StoreContext storeContext) throws AMQException
{
- _deliveryMgr.clearAllMessages();
+ _deliveryMgr.clearAllMessages(storeContext);
}
public void bind(String routingKey, Exchange exchange)
@@ -378,7 +377,7 @@ public class AMQQueue implements Managable, Comparable
{
if (_deliveryMgr.hasQueuedMessages())
{
- _deliveryMgr.populatePreDeliveryQueue(subscription);
+ _deliveryMgr.populatePreDeliveryQueue(subscription);
}
}
@@ -464,9 +463,9 @@ public class AMQQueue implements Managable, Comparable
msg.incrementReference();
} */
- public void process(AMQMessage msg) throws AMQException
+ public void process(StoreContext storeContext, AMQMessage msg) throws AMQException
{
- _deliveryMgr.deliver(getName(), msg);
+ _deliveryMgr.deliver(storeContext, getName(), msg);
try
{
msg.checkDeliveredToConsumer();
@@ -476,16 +475,16 @@ public class AMQQueue implements Managable, Comparable
{
// as this message will be returned, it should be removed
// from the queue:
- dequeue(msg);
+ dequeue(storeContext, msg);
}
}
- void dequeue(AMQMessage msg) throws FailedDequeueException
+ void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
{
try
{
- msg.dequeue(this);
- msg.decrementReference();
+ msg.dequeue(storeContext, this);
+ msg.decrementReference(storeContext);
}
catch (MessageCleanupException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 169055bad0..77bbdf7b4b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -20,6 +20,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -44,6 +45,12 @@ import java.util.Iterator;
@MBeanDescription("Management Interface for AMQQueue")
public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
{
+ /**
+ * Since the MBean is not associated with a real channel we can safely create our own store context
+ * for use in the few methods that require one.
+ */
+ private StoreContext _storeContext = new StoreContext();
+
private AMQQueue _queue = null;
private String _queueName = null;
// OpenMBean data types for viewMessages method
@@ -241,13 +248,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
}
/**
- * @see org.apache.qpid.server.queue.AMQQueue#deleteMessageFromTop()
+ * @see org.apache.qpid.server.queue.AMQQueue#deleteMessageFromTop
*/
public void deleteMessageFromTop() throws JMException
{
try
{
- _queue.deleteMessageFromTop();
+ _queue.deleteMessageFromTop(_storeContext);
}
catch (AMQException ex)
{
@@ -256,13 +263,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
}
/**
- * @see org.apache.qpid.server.queue.AMQQueue#clearQueue()
+ * @see org.apache.qpid.server.queue.AMQQueue#clearQueue
*/
public void clearQueue() throws JMException
{
try
{
- _queue.clearQueue();
+ _queue.clearQueue(_storeContext);
}
catch (AMQException ex)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
index dda074aca7..a2898ccdce 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import java.util.ArrayList;
@@ -200,21 +201,21 @@ public class ConcurrentDeliveryManager implements DeliveryManager
//no-op . This DM has no PreDeliveryQueues
}
- public synchronized void removeAMessageFromTop() throws AMQException
+ public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
if (msg != null)
{
- msg.dequeue(_queue);
+ msg.dequeue(storeContext, _queue);
}
}
- public synchronized void clearAllMessages() throws AMQException
+ public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
while (msg != null)
{
- msg.dequeue(_queue);
+ msg.dequeue(storeContext, _queue);
msg = poll();
}
}
@@ -293,7 +294,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager
}
}
- public void deliver(String name, AMQMessage msg) throws FailedDequeueException, AMQException
+ public void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException
{
// first check whether we are queueing, and enqueue if we are
if (!enqueue(msg))
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 93baa3fc29..8f0c3a5ec7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -26,6 +26,7 @@ import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.store.StoreContext;
import java.util.ArrayList;
import java.util.Iterator;
@@ -167,21 +168,21 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- public synchronized void removeAMessageFromTop() throws AMQException
+ public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
if (msg != null)
{
- msg.dequeue(_queue);
+ msg.dequeue(storeContext, _queue);
}
}
- public synchronized void clearAllMessages() throws AMQException
+ public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
while (msg != null)
{
- msg.dequeue(_queue);
+ msg.dequeue(storeContext, _queue);
msg = poll();
}
}
@@ -279,11 +280,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return _messages.poll();
}
- public void deliver(String name, AMQMessage msg) throws AMQException
+ public void deliver(StoreContext context, String name, AMQMessage msg) throws AMQException
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "deliver :" + System.identityHashCode(msg));
+ _log.debug(id() + "deliver :" + msg);
}
//Check if we have someone to deliver the message to.
@@ -296,7 +297,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery");
+ _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery");
}
if (!msg.getPublishBody().immediate)
{
@@ -308,7 +309,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//Pre Deliver to all subscriptions
if (_log.isDebugEnabled())
{
- _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
+ _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
" subscribers to give the message to.");
}
for (Subscription sub : _subscriptions.getSubscriptions())
@@ -330,7 +331,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
+ _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
}
sub.enqueueForPreDelivery(msg);
@@ -345,7 +346,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+ _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
System.identityHashCode(s) + ") :" + s);
}
//Deliver the message
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index 6f31616114..82d8f9538f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
import java.util.concurrent.Executor;
import java.util.List;
@@ -66,11 +67,11 @@ interface DeliveryManager
* @param msg the message to deliver
* @throws org.apache.qpid.server.queue.FailedDequeueException if the message could not be dequeued
*/
- void deliver(String name, AMQMessage msg) throws FailedDequeueException, AMQException;
+ void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException;
- void removeAMessageFromTop() throws AMQException;
+ void removeAMessageFromTop(StoreContext storeContext) throws AMQException;
- void clearAllMessages() throws AMQException;
+ void clearAllMessages(StoreContext storeContext) throws AMQException;
List<AMQMessage> getMessages();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
index 4a8616746c..1ddcdd8919 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
@@ -9,6 +9,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
@@ -39,7 +40,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle
return _contentHeaderBody;
}
- public int getBodyCount()
+ public int getBodyCount(long messageId)
{
return _contentBodies.size();
}
@@ -59,7 +60,8 @@ public class InMemoryMessageHandle implements AMQMessageHandle
return _contentBodies.get(index);
}
- public void addContentBodyFrame(long messageId, ContentBody contentBody) throws AMQException
+ public void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody)
+ throws AMQException
{
_contentBodies.add(contentBody);
}
@@ -94,7 +96,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle
* @param contentHeaderBody
* @throws AMQException
*/
- public void setPublishAndContentHeaderBody(long messageId, BasicPublishBody publishBody,
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody,
ContentHeaderBody contentHeaderBody)
throws AMQException
{
@@ -102,15 +104,18 @@ public class InMemoryMessageHandle implements AMQMessageHandle
_contentHeaderBody = contentHeaderBody;
}
- public void removeMessage(long messageId) throws AMQException
+ public void removeMessage(StoreContext storeContext, long messageId) throws AMQException
{
+ // NO OP
}
- public void enqueue(long messageId, AMQQueue queue) throws AMQException
+ public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
{
+ // NO OP
}
- public void dequeue(long messageId, AMQQueue queue) throws AMQException
+ public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
{
+ // NO OP
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index d04b6d3f60..2dab551e07 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,5 +44,5 @@ public interface Subscription
void close();
- boolean isBrowser();
+ boolean isBrowser();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 56f3c0323e..0dc1f3b0c1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,20 +21,20 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
-import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import java.util.Queue;
@@ -211,7 +211,7 @@ public class SubscriptionImpl implements Subscription
}
else
{
- sendToConsumer(msg, queue);
+ sendToConsumer(channel.getStoreContext(), msg, queue);
}
}
else
@@ -239,7 +239,8 @@ public class SubscriptionImpl implements Subscription
}
}
- private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws AMQException
+ private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue)
+ throws AMQException
{
try
{
@@ -254,7 +255,11 @@ public class SubscriptionImpl implements Subscription
// the message is unacked, it will be lost.
if (!_acks)
{
- queue.dequeue(msg);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ }
+ queue.dequeue(storeContext, msg);
}
synchronized(channel)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
index 7332ffbbee..f290452058 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.log4j.Logger;
import java.util.LinkedList;
@@ -129,21 +130,21 @@ class SynchronizedDeliveryManager implements DeliveryManager
//no-op . This DM has no PreDeliveryQueues
}
- public synchronized void removeAMessageFromTop() throws AMQException
+ public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
if (msg != null)
{
- msg.dequeue(_queue);
+ msg.dequeue(storeContext, _queue);
}
}
- public synchronized void clearAllMessages() throws AMQException
+ public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
while (msg != null)
{
- msg.dequeue(_queue);
+ msg.dequeue(storeContext, _queue);
msg = poll();
}
}
@@ -233,7 +234,7 @@ class SynchronizedDeliveryManager implements DeliveryManager
* @throws NoConsumersException if there are no active subscribers to deliver
* the message to
*/
- public void deliver(String name, AMQMessage msg) throws FailedDequeueException, AMQException
+ public void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException
{
// first check whether we are queueing, and enqueue if we are
if (!enqueue(msg))
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
index c6f5c19940..284b34e1bc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
@@ -14,10 +14,12 @@ import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
import java.lang.ref.WeakReference;
-import java.util.LinkedList;
+import java.util.ArrayList;
import java.util.List;
+import java.util.LinkedList;
/**
* @author Robert Greig (robert.j.greig@jpmorgan.com)
@@ -28,7 +30,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
private WeakReference<BasicPublishBody> _publishBody;
- private List<WeakReference<ContentBody>> _contentBodies = new LinkedList<WeakReference<ContentBody>>();
+ private List<WeakReference<ContentBody>> _contentBodies;
private boolean _redelivered;
@@ -52,8 +54,18 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
return chb;
}
- public int getBodyCount()
+ public int getBodyCount(long messageId) throws AMQException
{
+ if (_contentBodies == null)
+ {
+ MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+ int chunkCount = mmd.getContentChunkCount();
+ _contentBodies = new ArrayList<WeakReference<ContentBody>>(chunkCount);
+ for (int i = 0; i < chunkCount; i++)
+ {
+ _contentBodies.add(new WeakReference<ContentBody>(null));
+ }
+ }
return _contentBodies.size();
}
@@ -79,10 +91,14 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
return cb;
}
- public void addContentBodyFrame(long messageId, ContentBody contentBody) throws AMQException
+ public void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) throws AMQException
{
+ if (_contentBodies == null)
+ {
+ _contentBodies = new LinkedList<WeakReference<ContentBody>>();
+ }
_contentBodies.add(new WeakReference<ContentBody>(contentBody));
- _messageStore.storeContentBodyChunk(messageId, _contentBodies.size() - 1, contentBody);
+ _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody);
}
public BasicPublishBody getPublishBody(long messageId) throws AMQException
@@ -122,28 +138,28 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
* @param contentHeaderBody
* @throws AMQException
*/
- public void setPublishAndContentHeaderBody(long messageId, BasicPublishBody publishBody,
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody,
ContentHeaderBody contentHeaderBody)
throws AMQException
{
- _messageStore.storeMessageMetaData(messageId, new MessageMetaData(publishBody, contentHeaderBody,
- _contentBodies.size()));
+ _messageStore.storeMessageMetaData(storeContext, messageId, new MessageMetaData(publishBody, contentHeaderBody,
+ _contentBodies.size()));
_publishBody = new WeakReference<BasicPublishBody>(publishBody);
_contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody);
}
- public void removeMessage(long messageId) throws AMQException
+ public void removeMessage(StoreContext storeContext, long messageId) throws AMQException
{
- _messageStore.removeMessage(messageId);
+ _messageStore.removeMessage(storeContext, messageId);
}
- public void enqueue(long messageId, AMQQueue queue) throws AMQException
+ public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
{
- _messageStore.enqueueMessage(queue.getName(), messageId);
+ _messageStore.enqueueMessage(storeContext, queue.getName(), messageId);
}
- public void dequeue(long messageId, AMQQueue queue) throws AMQException
+ public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
{
- _messageStore.dequeueMessage(queue.getName(), messageId);
+ _messageStore.dequeueMessage(storeContext, queue.getName(), messageId);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 4d6c79d37b..edf2386314 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -85,7 +85,7 @@ public class MemoryMessageStore implements MessageStore
}
}
- public void removeMessage(long messageId)
+ public void removeMessage(StoreContext context, long messageId)
{
if (_log.isDebugEnabled())
{
@@ -105,32 +105,32 @@ public class MemoryMessageStore implements MessageStore
// Not required to do anything
}
- public void enqueueMessage(String name, long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, String name, long messageId) throws AMQException
{
// Not required to do anything
}
- public void dequeueMessage(String name, long messageId) throws AMQException
+ public void dequeueMessage(StoreContext context, String name, long messageId) throws AMQException
{
// Not required to do anything
}
- public void beginTran() throws AMQException
+ public void beginTran(StoreContext context) throws AMQException
{
// Not required to do anything
}
- public void commitTran() throws AMQException
+ public void commitTran(StoreContext context) throws AMQException
{
// Not required to do anything
}
- public void abortTran() throws AMQException
+ public void abortTran(StoreContext context) throws AMQException
{
// Not required to do anything
}
- public boolean inTran()
+ public boolean inTran(StoreContext context)
{
return false;
}
@@ -145,7 +145,8 @@ public class MemoryMessageStore implements MessageStore
return _messageId.getAndIncrement();
}
- public void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException
+ public void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody)
+ throws AMQException
{
List<ContentBody> bodyList = _contentBodyMap.get(messageId);
if (bodyList == null)
@@ -157,7 +158,8 @@ public class MemoryMessageStore implements MessageStore
bodyList.add(index, contentBody);
}
- public void storeMessageMetaData(long messageId, MessageMetaData messageMetaData) throws AMQException
+ public void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData)
+ throws AMQException
{
_metaDataMap.put(messageId, messageMetaData);
}
@@ -169,7 +171,7 @@ public class MemoryMessageStore implements MessageStore
public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException
{
- List<ContentBody> bodyList = _contentBodyMap.get(messageId);
+ List<ContentBody> bodyList = _contentBodyMap.get(messageId);
return bodyList.get(index);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index eff0818afb..973c661c06 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,23 +48,23 @@ public interface MessageStore
*/
void close() throws Exception;
- void removeMessage(long messageId) throws AMQException;
+ void removeMessage(StoreContext storeContext, long messageId) throws AMQException;
void createQueue(AMQQueue queue) throws AMQException;
void removeQueue(String name) throws AMQException;
- void enqueueMessage(String name, long messageId) throws AMQException;
+ void enqueueMessage(StoreContext context, String name, long messageId) throws AMQException;
- void dequeueMessage(String name, long messageId) throws AMQException;
+ void dequeueMessage(StoreContext context, String name, long messageId) throws AMQException;
- void beginTran() throws AMQException;
+ void beginTran(StoreContext context) throws AMQException;
- void commitTran() throws AMQException;
+ void commitTran(StoreContext context) throws AMQException;
- void abortTran() throws AMQException;
+ void abortTran(StoreContext context) throws AMQException;
- boolean inTran();
+ boolean inTran(StoreContext context);
/**
* Recreate all queues that were persisted, including re-enqueuing of existing messages
@@ -79,9 +79,9 @@ public interface MessageStore
*/
long getNewMessageId();
- void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException;
+ void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody) throws AMQException;
- void storeMessageMetaData(long messageId, MessageMetaData messageMetaData) throws AMQException;
+ void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData) throws AMQException;
MessageMetaData getMessageMetaData(long messageId) throws AMQException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
new file mode 100644
index 0000000000..55e5067852
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+/**
+ * A context that the store can use to associate with a transactional context. For example, it could store
+ * some kind of txn id.
+ *
+ * @author Apache Software Foundation
+ */
+public class StoreContext
+{
+ private Object _payload;
+
+ public Object getPayload()
+ {
+ return _payload;
+ }
+
+ public void setPayload(Object payload)
+ {
+ _payload = payload;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
index 7ff609b750..6d5d3c42f3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
@@ -22,6 +22,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.store.StoreContext;
import java.util.List;
@@ -42,7 +43,7 @@ public class CleanupMessageOperation implements TxnOp
_returns = returns;
}
- public void prepare() throws AMQException
+ public void prepare(StoreContext context) throws AMQException
{
}
@@ -53,7 +54,7 @@ public class CleanupMessageOperation implements TxnOp
//or enqueued on any queues and can be discarded
}
- public void commit()
+ public void commit(StoreContext context)
{
//The routers reference can now be released. This is done
//here to ensure that it happens after the queues that
@@ -61,7 +62,7 @@ public class CleanupMessageOperation implements TxnOp
//memory only operation is done in the commit phase).
try
{
- _msg.decrementReference();
+ _msg.decrementReference(context);
}
catch (AMQException e)
{
@@ -83,7 +84,8 @@ public class CleanupMessageOperation implements TxnOp
}
}
- public void rollback()
+ public void rollback(StoreContext context)
{
+ // NO OP
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
index 059c13c687..78887030e4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
@@ -12,6 +12,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.StoreContext;
/**
* @author Robert Greig (robert.j.greig@jpmorgan.com)
@@ -31,22 +32,22 @@ public class DeliverMessageOperation implements TxnOp
_msg.incrementReference();
}
- public void prepare() throws AMQException
- {
+ public void prepare(StoreContext context) throws AMQException
+ {
}
public void undoPrepare()
{
}
- public void commit()
+ public void commit(StoreContext context)
{
//do the memeory part of the record()
_msg.incrementReference();
//then process the message
try
{
- _queue.process(_msg);
+ _queue.process(context, _msg);
}
catch (AMQException e)
{
@@ -55,7 +56,7 @@ public class DeliverMessageOperation implements TxnOp
}
}
- public void rollback()
+ public void rollback(StoreContext storeContext)
{
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 06522740cc..ff6abdd58b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -25,6 +25,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
import java.util.List;
@@ -45,12 +46,15 @@ public class LocalTransactionalContext implements TransactionalContext
private final MessageStore _messageStore;
+ private final StoreContext _storeContext;
+
private boolean _inTran = false;
- public LocalTransactionalContext(MessageStore messageStore,
+ public LocalTransactionalContext(MessageStore messageStore, StoreContext storeContext,
TxnBuffer txnBuffer, List<RequiredDeliveryException> returnMessages)
{
_messageStore = messageStore;
+ _storeContext = storeContext;
_txnBuffer = txnBuffer;
_returnMessages = returnMessages;
_txnBuffer.enlist(new StoreMessageOperation(messageStore));
@@ -58,7 +62,7 @@ public class LocalTransactionalContext implements TransactionalContext
public void rollback() throws AMQException
{
- _txnBuffer.rollback();
+ _txnBuffer.rollback(_storeContext);
}
public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
@@ -125,7 +129,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
if (!_inTran)
{
- _messageStore.beginTran();
+ _messageStore.beginTran(_storeContext);
_inTran = true;
}
}
@@ -134,11 +138,11 @@ public class LocalTransactionalContext implements TransactionalContext
{
if (_ackOp != null)
{
- _ackOp.consolidate();
+ _ackOp.consolidate();
//already enlisted, after commit will reset regardless of outcome
_ackOp = null;
}
- _txnBuffer.commit();
+ _txnBuffer.commit(_storeContext);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 7321854034..f09d811b50 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
import java.util.LinkedList;
import java.util.List;
@@ -54,15 +55,18 @@ public class NonTransactionalContext implements TransactionalContext
private final MessageStore _messageStore;
+ private StoreContext _storeContext;
+
/**
* Whether we are in a transaction
*/
private boolean _inTran;
- public NonTransactionalContext(MessageStore messageStore, AMQChannel channel,
+ public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel,
List<RequiredDeliveryException> returnMessages, Set<Long> browsedAcks)
{
_channel = channel;
+ _storeContext = storeContext;
_returnMessages = returnMessages;
_messageStore = messageStore;
_browsedAcks = browsedAcks;
@@ -72,7 +76,7 @@ public class NonTransactionalContext implements TransactionalContext
{
if (!_inTran)
{
- _messageStore.beginTran();
+ _messageStore.beginTran(_storeContext);
_inTran = true;
}
}
@@ -92,7 +96,7 @@ public class NonTransactionalContext implements TransactionalContext
try
{
message.incrementReference();
- queue.process(message);
+ queue.process(_storeContext, message);
//following check implements the functionality
//required by the 'immediate' flag:
message.checkDeliveredToConsumer();
@@ -122,7 +126,11 @@ public class NonTransactionalContext implements TransactionalContext
{
if (!_browsedAcks.contains(deliveryTag))
{
- message.discard();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Discarding message: " + message.message.getMessageId());
+ }
+ message.discard(_storeContext);
}
else
{
@@ -150,7 +158,11 @@ public class NonTransactionalContext implements TransactionalContext
{
if (!_browsedAcks.contains(deliveryTag))
{
- msg.discard();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Discarding message: " + msg.message.getMessageId());
+ }
+ msg.discard(_storeContext);
}
else
{
@@ -171,10 +183,11 @@ public class NonTransactionalContext implements TransactionalContext
throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
_channel.getChannelId());
}
- msg.discard();
+ msg.discard(_storeContext);
if (_log.isDebugEnabled())
{
- _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
+ _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
+ msg.message.getMessageId());
}
}
}
@@ -183,7 +196,7 @@ public class NonTransactionalContext implements TransactionalContext
{
if (persistent)
{
- _messageStore.commitTran();
+ _messageStore.commitTran(_storeContext);
_inTran = false;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
index 2a3b524060..e8100b49bc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
@@ -10,6 +10,7 @@ package org.apache.qpid.server.txn;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
/**
* A transactional operation to store messages in an underlying persistent store. When this operation
@@ -25,7 +26,7 @@ public class StoreMessageOperation implements TxnOp
_messsageStore = messageStore;
}
- public void prepare() throws AMQException
+ public void prepare(StoreContext context) throws AMQException
{
}
@@ -33,13 +34,13 @@ public class StoreMessageOperation implements TxnOp
{
}
- public void commit() throws AMQException
+ public void commit(StoreContext context) throws AMQException
{
- _messsageStore.commitTran();
+ _messsageStore.commitTran(context);
}
- public void rollback() throws AMQException
+ public void rollback(StoreContext context) throws AMQException
{
- _messsageStore.abortTran();
+ _messsageStore.abortTran(context);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
index 63cb6f738b..069caf0ae1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.txn;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
import java.util.ArrayList;
import java.util.List;
@@ -39,26 +40,26 @@ public class TxnBuffer
{
}
- public void commit() throws AMQException
+ public void commit(StoreContext context) throws AMQException
{
- if (prepare())
+ if (prepare(context))
{
for (TxnOp op : _ops)
{
- op.commit();
+ op.commit(context);
}
}
_ops.clear();
}
- private boolean prepare()
+ private boolean prepare(StoreContext context)
{
for (int i = 0; i < _ops.size(); i++)
{
TxnOp op = _ops.get(i);
try
{
- op.prepare();
+ op.prepare(context);
}
catch (Exception e)
{
@@ -73,11 +74,11 @@ public class TxnBuffer
return true;
}
- public void rollback() throws AMQException
+ public void rollback(StoreContext context) throws AMQException
{
for (TxnOp op : _ops)
{
- op.rollback();
+ op.rollback(context);
}
_ops.clear();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java
index 7cb2515c35..919c078cf0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
/**
* This provides the abstraction of an individual operation within a
@@ -31,12 +32,12 @@ public interface TxnOp
/**
* Do the part of the operation that updates persistent state
*/
- public void prepare() throws AMQException;
+ public void prepare(StoreContext context) throws AMQException;
/**
* Complete the operation started by prepare. Can now update in
* memory state or make netork transfers.
*/
- public void commit() throws AMQException;
+ public void commit(StoreContext context) throws AMQException;
/**
* This is not the same as rollback. Unfortunately the use of an
* in memory reference count as a locking mechanism and a test for
@@ -50,5 +51,5 @@ public interface TxnOp
/**
* Rolls back the operation.
*/
- public void rollback() throws AMQException;
+ public void rollback(StoreContext context) throws AMQException;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
index adae209493..b395579cea 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
@@ -24,6 +24,7 @@ import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.server.cluster.*;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.StoreContext;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -58,10 +59,10 @@ public class ClusteredQueue extends AMQQueue
_subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers();
}
- public void process(AMQMessage msg) throws AMQException
+ public void process(StoreContext storeContext, AMQMessage msg) throws AMQException
{
_logger.info(new LogMessage("{0} delivered to clustered queue {1}", msg, this));
- super.process(msg);
+ super.process(storeContext, msg);
}
protected void autodelete() throws AMQException
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
index ddee643a76..9f41510b5d 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
@@ -109,7 +109,7 @@ public class ServiceProvidingClient
_logger.info("About to create a producer");
_destinationProducer = session.createProducer(responseDest);
_destinationProducer.setDisableMessageTimestamp(true);
- _destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ _destinationProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
_logger.info("After create a producer");
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
index b52d06558a..12c3ac2afe 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
@@ -185,7 +185,7 @@ public class ServiceRequestingClient implements ExceptionListener
AMQQueue destination = new AMQQueue(commandQueueName);
_producer = (MessageProducer) _session.createProducer(destination);
_producer.setDisableMessageTimestamp(true);
- _producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ _producer.setDeliveryMode(DeliveryMode.PERSISTENT);
_tempDestination = new AMQQueue("TempResponse" +
Long.toString(System.currentTimeMillis()), true);
diff --git a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
index 258bcecc41..ac5b9ed14c 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -91,16 +92,20 @@ public class TxAckTest extends TestCase
private final TxAck _op = new TxAck(_map);
private final List<Long> _acked;
private final List<Long> _unacked;
+ private StoreContext _storeContext = new StoreContext();
Scenario(int messageCount, List<Long> acked, List<Long> unacked)
{
- TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(), null,
+ TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(),
+ _storeContext, null,
new LinkedList<RequiredDeliveryException>(),
new HashSet<Long>());
for(int i = 0; i < messageCount; i++)
{
long deliveryTag = i + 1;
- TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody(), txnContext);
+ // TODO: fix hardcoded protocol version data
+ TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8,
+ (byte)0), txnContext);
_map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag));
}
_acked = acked;
@@ -125,7 +130,7 @@ public class TxAckTest extends TestCase
void prepare() throws AMQException
{
_op.consolidate();
- _op.prepare();
+ _op.prepare(_storeContext);
assertCount(_acked, -1);
assertCount(_unacked, 0);
@@ -143,7 +148,7 @@ public class TxAckTest extends TestCase
void commit()
{
_op.consolidate();
- _op.commit();
+ _op.commit(_storeContext);
//check acked messages are removed from map
diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 05c57c2f9f..9a6b6607e1 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.RequiredDeliveryException;
@@ -49,6 +50,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase
*/
private MessageStore _store = new MemoryMessageStore();
+ private StoreContext _storeContext = new StoreContext();
+
private MessageHandleFactory _handleFactory = new MessageHandleFactory();
private int count;
@@ -89,7 +92,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
protected void route(Message m) throws AMQException
{
m.route(exchange);
- m.routingComplete(_store, _handleFactory);
+ m.routingComplete(_store, _storeContext, _handleFactory);
}
protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
@@ -198,7 +201,10 @@ public class AbstractHeadersExchangeTestBase extends TestCase
{
private static MessageStore _messageStore = new SkeletonMessageStore();
- private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, null,
+ private static StoreContext _storeContext = new StoreContext();
+
+ private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext,
+ null,
new LinkedList<RequiredDeliveryException>(),
new HashSet<Long>());
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 1520e5cec2..6008b8924f 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.store.StoreContext;
import javax.management.JMException;
import java.util.LinkedList;
@@ -41,9 +42,11 @@ public class AMQQueueMBeanTest extends TestCase
private AMQQueueMBean _queueMBean;
private QueueRegistry _queueRegistry;
private MessageStore _messageStore = new SkeletonMessageStore();
- private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, null,
+ private StoreContext _storeContext = new StoreContext();
+ private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
+ null,
new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
+ new HashSet<Long>());
private MockProtocolSession _protocolSession;
private AMQChannel _channel;
@@ -140,8 +143,8 @@ public class AMQQueueMBeanTest extends TestCase
AMQMessage msg = message(false);
long id = msg.getMessageId();
- _queue.clearQueue();
- _queue.process(msg);
+ _queue.clearQueue(_storeContext);
+ _queue.process(_storeContext, msg);
_queueMBean.viewMessageContent(id);
try
{
@@ -161,7 +164,7 @@ public class AMQQueueMBeanTest extends TestCase
BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0);
publish.immediate = immediate;
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.bodySize = 1000; // in bytes
+ contentHeaderBody.bodySize = 1000; // in bytes
return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
}
@@ -184,7 +187,7 @@ public class AMQQueueMBeanTest extends TestCase
}
for (int i = 0; i < messageCount; i++)
{
- _queue.process(messages[i]);
+ _queue.process(_storeContext, messages[i]);
}
}
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
index b9b5634109..0180c2d30c 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.TestApplicationRegistry;
@@ -53,6 +54,8 @@ public class AckTest extends TestCase
private TestableMemoryMessageStore _messageStore;
+ private StoreContext _storeContext = new StoreContext();
+
private AMQChannel _channel;
private SubscriptionSet _subscriptionManager;
@@ -82,7 +85,7 @@ public class AckTest extends TestCase
private void publishMessages(int count, boolean persistent) throws AMQException
{
- TransactionalContext txnContext = new NonTransactionalContext(_messageStore, null,
+ TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
new LinkedList<RequiredDeliveryException>(),
new HashSet<Long>());
MessageHandleFactory factory = new MessageHandleFactory();
@@ -111,7 +114,7 @@ public class AckTest extends TestCase
// the reference is normally incremented. The test is easier to construct if we have direct access to the
// subscription
msg.incrementReference();
- msg.routingComplete(_messageStore, factory);
+ msg.routingComplete(_messageStore, _storeContext, factory);
// we manually send the message to the subscription
_subscription.send(msg, _queue);
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
index fe8960c872..8efefaeff5 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -186,7 +186,7 @@ public class ConcurrencyTest extends MessageTestHelper
AMQMessage msg = nextMessage();
if (msg != null)
{
- _deliveryMgr.deliver(toString(), msg);
+ _deliveryMgr.deliver(null, toString(), msg);
}
}
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
index 3631264e5a..fcd2806861 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
import junit.framework.TestSuite;
@@ -29,6 +30,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
{
protected final SubscriptionSet _subscriptions = new SubscriptionSet();
protected DeliveryManager _mgr;
+ protected StoreContext _storeContext = new StoreContext();
public DeliveryManagerTest() throws Exception
{
@@ -45,7 +47,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
for (int i = 0; i < batch; i++)
{
- _mgr.deliver("Me", messages[i]);
+ _mgr.deliver(_storeContext, "Me", messages[i]);
}
SubscriptionTestHelper s1 = new SubscriptionTestHelper("1");
@@ -55,7 +57,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
for (int i = batch; i < messages.length; i++)
{
- _mgr.deliver("Me", messages[i]);
+ _mgr.deliver(_storeContext, "Me", messages[i]);
}
assertTrue(s1.getMessages().isEmpty());
@@ -93,7 +95,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
for (int i = 0; i < batch; i++)
{
- _mgr.deliver("Me", messages[i]);
+ _mgr.deliver(_storeContext, "Me", messages[i]);
}
assertEquals(batch, s1.getMessages().size());
@@ -107,7 +109,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
s1.setSuspended(true);
for (int i = batch; i < messages.length; i++)
{
- _mgr.deliver("Me", messages[i]);
+ _mgr.deliver(_storeContext, "Me", messages[i]);
}
_mgr.processAsync(new OnCurrentThreadExecutor());
@@ -129,7 +131,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
try
{
AMQMessage msg = message(true);
- _mgr.deliver("Me", msg);
+ _mgr.deliver(_storeContext, "Me", msg);
msg.checkDeliveredToConsumer();
fail("expected exception did not occur");
}
@@ -151,7 +153,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
_subscriptions.addSubscriber(s);
s.setSuspended(true);
AMQMessage msg = message(true);
- _mgr.deliver("Me", msg);
+ _mgr.deliver(_storeContext, "Me", msg);
msg.checkDeliveredToConsumer();
fail("expected exception did not occur");
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
index 85c7ff90bc..da4627411d 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@ import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -40,7 +41,9 @@ class MessageTestHelper extends TestCase
{
private final MessageStore _messageStore = new SkeletonMessageStore();
- private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, null,
+ private final StoreContext _storeContext = new StoreContext();
+
+ private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
new LinkedList<RequiredDeliveryException>(),
new HashSet<Long>());
@@ -61,7 +64,7 @@ class MessageTestHelper extends TestCase
BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0);
publish.immediate = immediate;
return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
- new ContentHeaderBody());
+ new ContentHeaderBody());
}
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index 61edba36f8..dd403f4f9b 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,9 +48,9 @@ public class SkeletonMessageStore implements MessageStore
public void close() throws Exception
{
- }
+ }
- public void removeMessage(long messageId)
+ public void removeMessage(StoreContext s, long messageId)
{
}
@@ -62,28 +62,28 @@ public class SkeletonMessageStore implements MessageStore
{
}
- public void enqueueMessage(String name, long messageId) throws AMQException
+ public void enqueueMessage(StoreContext s, String name, long messageId) throws AMQException
{
}
- public void dequeueMessage(String name, long messageId) throws AMQException
+ public void dequeueMessage(StoreContext s, String name, long messageId) throws AMQException
{
}
- public void beginTran() throws AMQException
+ public void beginTran(StoreContext s) throws AMQException
{
}
- public boolean inTran()
+ public boolean inTran(StoreContext sc)
{
return false;
}
-
- public void commitTran() throws AMQException
+
+ public void commitTran(StoreContext storeContext) throws AMQException
{
}
- public void abortTran() throws AMQException
+ public void abortTran(StoreContext storeContext) throws AMQException
{
}
@@ -97,12 +97,12 @@ public class SkeletonMessageStore implements MessageStore
return _messageId.getAndIncrement();
}
- public void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException
+ public void storeContentBodyChunk(StoreContext sc, long messageId, int index, ContentBody contentBody) throws AMQException
{
}
- public void storeMessageMetaData(long messageId, MessageMetaData messageMetaData) throws AMQException
+ public void storeMessageMetaData(StoreContext sc, long messageId, MessageMetaData messageMetaData) throws AMQException
{
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
index a40a9bf12f..b874ca9594 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -22,9 +22,9 @@ package org.apache.qpid.server.store;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -36,6 +36,8 @@ public class TestReferenceCounting extends TestCase
{
private TestableMemoryMessageStore _store;
+ private StoreContext _storeContext = new StoreContext();
+
protected void setUp() throws Exception
{
super.setUp();
@@ -48,14 +50,16 @@ public class TestReferenceCounting extends TestCase
public void testMessageGetsRemoved() throws AMQException
{
createPersistentContentHeader();
- AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody(),
- new NonTransactionalContext(_store, null, null, null),
+ // TODO: fix hardcoded protocol version data
+ AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
+ (byte)0),
+ new NonTransactionalContext(_store, _storeContext, null, null, null),
createPersistentContentHeader());
message.incrementReference();
// we call routing complete to set up the handle
- message.routingComplete(_store, new MessageHandleFactory());
+ message.routingComplete(_store, _storeContext, new MessageHandleFactory());
assertTrue(_store.getMessageMetaDataMap().size() == 1);
- message.decrementReference();
+ message.decrementReference(_storeContext);
assertTrue(_store.getMessageMetaDataMap().size() == 0);
}
@@ -70,15 +74,17 @@ public class TestReferenceCounting extends TestCase
public void testMessageRemains() throws AMQException
{
- AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody(),
- new NonTransactionalContext(_store, null, null, null),
+ // TODO: fix hardcoded protocol version data
+ AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
+ (byte)0),
+ new NonTransactionalContext(_store, _storeContext, null, null, null),
createPersistentContentHeader());
message.incrementReference();
// we call routing complete to set up the handle
- message.routingComplete(_store, new MessageHandleFactory());
+ message.routingComplete(_store, _storeContext, new MessageHandleFactory());
assertTrue(_store.getMessageMetaDataMap().size() == 1);
message.incrementReference();
- message.decrementReference();
+ message.decrementReference(_storeContext);
assertTrue(_store.getMessageMetaDataMap().size() == 1);
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
index 24244d6462..1830a06fd1 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
@@ -24,6 +24,7 @@ import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
import java.util.LinkedList;
@@ -43,7 +44,7 @@ public class TxnBufferTest extends TestCase
buffer.enlist(op);
buffer.enlist(new MockOp().expectPrepare().expectCommit());
- buffer.commit();
+ buffer.commit(null);
validateOps();
store.validate();
@@ -58,7 +59,7 @@ public class TxnBufferTest extends TestCase
buffer.enlist(new MockOp().expectRollback());
buffer.enlist(new MockOp().expectRollback());
- buffer.rollback();
+ buffer.rollback(null);
validateOps();
store.validate();
@@ -77,7 +78,7 @@ public class TxnBufferTest extends TestCase
buffer.enlist(new FailedPrepare());
buffer.enlist(new MockOp());
- buffer.commit();
+ buffer.commit(null);
validateOps();
store.validate();
}
@@ -95,7 +96,7 @@ public class TxnBufferTest extends TestCase
buffer.enlist(new StoreMessageOperation(store));
buffer.enlist(new TxnTester(store));
- buffer.commit();
+ buffer.commit(null);
validateOps();
store.validate();
}
@@ -127,12 +128,12 @@ public class TxnBufferTest extends TestCase
ops.add(this);
}
- public void prepare()
+ public void prepare(StoreContext context)
{
assertEquals(expected.removeLast(), PREPARE);
}
- public void commit()
+ public void commit(StoreContext context)
{
assertEquals(expected.removeLast(), COMMIT);
}
@@ -142,7 +143,7 @@ public class TxnBufferTest extends TestCase
assertEquals(expected.removeLast(), UNDO_PREPARE);
}
- public void rollback()
+ public void rollback(StoreContext context)
{
assertEquals(expected.removeLast(), ROLLBACK);
}
@@ -249,16 +250,16 @@ public class TxnBufferTest extends TestCase
class NullOp implements TxnOp
{
- public void prepare() throws AMQException
+ public void prepare(StoreContext context) throws AMQException
{
}
- public void commit()
+ public void commit(StoreContext context)
{
}
public void undoPrepare()
{
}
- public void rollback()
+ public void rollback(StoreContext context)
{
}
}
@@ -275,6 +276,8 @@ public class TxnBufferTest extends TestCase
{
private final MessageStore store;
+ private final StoreContext context = new StoreContext();
+
TxnTester(MessageStore store)
{
this.store = store;
@@ -282,12 +285,12 @@ public class TxnBufferTest extends TestCase
public void prepare() throws AMQException
{
- assertTrue("Expected prepare to be performed under txn", store.inTran());
+ assertTrue("Expected prepare to be performed under txn", store.inTran(context));
}
public void commit()
{
- assertTrue("Expected commit not to be performed under txn", !store.inTran());
+ assertTrue("Expected commit not to be performed under txn", !store.inTran(context));
}
}