diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-05 23:16:50 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-05 23:16:50 +0000 |
commit | a0bc8a970759ed6b4b20fea2ce81386e7cff4f7d (patch) | |
tree | c2615a9cd9fc1dfb8d09cec795f76e74e373038e | |
parent | fcfaaaf02693331d0079fb001586d61e2ab9da2e (diff) | |
download | qpid-python-a0bc8a970759ed6b4b20fea2ce81386e7cff4f7d.tar.gz |
Fix to remove reliance on thread local transaction reference and replace it with a StoreContext
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@493231 13f79535-47bb-0310-9956-ffa450edef68
40 files changed, 438 insertions, 260 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml index 40e2f468e0..0862588a0d 100644 --- a/java/broker/etc/config.xml +++ b/java/broker/etc/config.xml @@ -82,8 +82,8 @@ <auto_register>true</auto_register> </queue> <store> - <class>org.apache.qpid.server.store.MemoryMessageStore</class> - <!--<class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>--> + <!--<class>org.apache.qpid.server.store.MemoryMessageStore</class>--> + <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class> </store> <virtualhosts>${conf}/virtualhosts.xml</virtualhosts> </broker> diff --git a/java/broker/etc/log4j.xml b/java/broker/etc/log4j.xml index 133b5ce259..42f191cc59 100644 --- a/java/broker/etc/log4j.xml +++ b/java/broker/etc/log4j.xml @@ -34,9 +34,17 @@ </layout> </appender> - <!--<category name="org.apache.qpid.server.queue"> + <category name="org.apache.qpid.server.store"> <priority value="debug"/> - </category>--> + </category> + + <category name="org.apache.qpid.server.queue"> + <priority value="debug"/> + </category> + + <category name="org.apache.qpid.server.txn"> + <priority value="debug"/> + </category> <root> <priority value="info"/> diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 24f61b2426..8821830f4d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -26,7 +26,6 @@ import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.ack.TxAck; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; @@ -37,14 +36,13 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.TxnBuffer; import java.util.*; -import java.util.Set; -import java.util.HashSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -93,7 +91,7 @@ public class AMQChannel private final Map<String, AMQQueue> _consumerTag2QueueMap = new TreeMap<String, AMQQueue>(); private final MessageStore _messageStore; - + private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH); private final AtomicBoolean _suspended = new AtomicBoolean(false); @@ -102,6 +100,12 @@ public class AMQChannel private TransactionalContext _txnContext; + /** + * A context used by the message store enabling it to track context for a given channel even across + * thread boundaries + */ + private final StoreContext _storeContext = new StoreContext(); + private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>(); private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory(); @@ -117,7 +121,7 @@ public class AMQChannel _messageStore = messageStore; _exchanges = exchanges; // by default the session is non-transactional - _txnContext = new NonTransactionalContext(_messageStore, this, _returnMessages, _browsedAcks); + _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } /** @@ -125,7 +129,7 @@ public class AMQChannel */ public void setLocalTransactional() { - _txnContext = new LocalTransactionalContext(_messageStore, new TxnBuffer(), _returnMessages); + _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, new TxnBuffer(), _returnMessages); } public boolean isTransactional() @@ -191,7 +195,7 @@ public class AMQChannel { _currentMessage.setContentHeaderBody(contentHeaderBody); routeCurrentMessage(); - _currentMessage.routingComplete(_messageStore, _messageHandleFactory); + _currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory); // check and deliver if header says body length is zero if (contentHeaderBody.bodySize == 0) @@ -213,7 +217,7 @@ public class AMQChannel // received try { - if (_currentMessage.addContentBodyFrame(contentBody)) + if (_currentMessage.addContentBodyFrame(_storeContext, contentBody)) { // callback to allow the context to do any post message processing // primary use is to allow message return processing in the non-tx case @@ -453,10 +457,10 @@ public class AMQChannel { if (message.queue == queue) { - message.queue = null; try { - message.message.decrementReference(); + message.discard(_storeContext); + message.queue = null; } catch (AMQException e) { @@ -536,7 +540,7 @@ public class AMQChannel _browsedAcks.add(deliveryTag); addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); } - + private void checkSuspension() { boolean suspend; @@ -608,6 +612,11 @@ public class AMQChannel return _defaultQueue; } + public StoreContext getStoreContext() + { + return _storeContext; + } + public void processReturns(AMQProtocolSession session) throws AMQException { for (RequiredDeliveryException bouncedMessage : _returnMessages) diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index efc448bab2..c0a631080e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,6 +22,7 @@ package org.apache.qpid.server.ack; import org.apache.qpid.AMQException; import org.apache.qpid.server.txn.TxnOp; +import org.apache.qpid.server.store.StoreContext; import java.util.LinkedList; import java.util.List; @@ -94,12 +95,12 @@ public class TxAck implements TxnOp return false; } - public void prepare() throws AMQException + public void prepare(StoreContext storeContext) throws AMQException { //make persistent changes, i.e. dequeue and decrementReference for (UnacknowledgedMessage msg : _unacked) { - msg.discard(); + msg.discard(storeContext); } } @@ -115,13 +116,13 @@ public class TxAck implements TxnOp } } - public void commit() + public void commit(StoreContext storeContext) { //remove the unacked messages from the channels map _map.remove(_unacked); } - public void rollback() + public void rollback(StoreContext storeContext) { } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index 0eff5a3cca..26f41e19af 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,6 +23,7 @@ package org.apache.qpid.server.ack; import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.StoreContext; public class UnacknowledgedMessage { @@ -30,7 +31,7 @@ public class UnacknowledgedMessage public final String consumerTag; public final long deliveryTag; public AMQQueue queue; - + public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, String consumerTag, long deliveryTag) { this.queue = queue; @@ -39,13 +40,13 @@ public class UnacknowledgedMessage this.deliveryTag = deliveryTag; } - public void discard() throws AMQException + public void discard(StoreContext storeContext) throws AMQException { if (queue != null) { - message.dequeue(queue); + message.dequeue(storeContext, queue); } - message.decrementReference(); + message.decrementReference(storeContext); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 5091dded8a..f30667690f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -25,6 +25,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.message.MessageDecorator; import org.apache.qpid.server.message.jms.JMSMessage; @@ -71,8 +72,8 @@ public class AMQMessage private boolean _deliveredToConsumer; private ConcurrentHashMap<String, MessageDecorator> _decodedMessages; - private AtomicBoolean _taken; - + private AtomicBoolean _taken = new AtomicBoolean(false); + private TransientMessageData _transientMessageData = new TransientMessageData(); /** @@ -92,7 +93,15 @@ public class AMQMessage public boolean hasNext() { - return _index < _messageHandle.getBodyCount() - 1; + try + { + return _index < _messageHandle.getBodyCount(_messageId) - 1; + } + catch (AMQException e) + { + _log.error("Unable to get body count: " + e, e); + return false; + } } public AMQDataBlock next() @@ -123,7 +132,15 @@ public class AMQMessage public boolean hasNext() { - return _index < _messageHandle.getBodyCount() - 1; + try + { + return _index < _messageHandle.getBodyCount(_messageId) - 1; + } + catch (AMQException e) + { + _log.error("Error getting body count: " + e, e); + return false; + } } public ContentBody next() @@ -203,15 +220,15 @@ public class AMQMessage public AMQMessage(long messageId, BasicPublishBody publishBody, TransactionalContext txnContext, ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, - List<ContentBody> contentBodies, MessageStore messageStore, + List<ContentBody> contentBodies, MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException { this(messageId, publishBody, txnContext, contentHeader); _transientMessageData.setDestinationQueues(destinationQueues); - routingComplete(messageStore, messageHandleFactory); + routingComplete(messageStore, storeContext, messageHandleFactory); for (ContentBody cb : contentBodies) { - addContentBodyFrame(cb); + addContentBodyFrame(storeContext, cb); } } @@ -252,7 +269,7 @@ public class AMQMessage _transientMessageData.setContentHeaderBody(contentHeaderBody); } - public void routingComplete(MessageStore store, MessageHandleFactory factory) throws AMQException + public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory) throws AMQException { final boolean persistent = isPersistent(); _messageHandle = factory.createMessageHandle(_messageId, store, persistent); @@ -265,22 +282,22 @@ public class AMQMessage // persistent store for (AMQQueue q : _transientMessageData.getDestinationQueues()) { - _messageHandle.enqueue(_messageId, q); + _messageHandle.enqueue(storeContext, _messageId, q); } if (_transientMessageData.getContentHeaderBody().bodySize == 0) { - deliver(); + deliver(storeContext); } } - public boolean addContentBodyFrame(ContentBody contentBody) throws AMQException + public boolean addContentBodyFrame(StoreContext storeContext, ContentBody contentBody) throws AMQException { _transientMessageData.addBodyLength(contentBody.getSize()); - _messageHandle.addContentBodyFrame(_messageId, contentBody); + _messageHandle.addContentBodyFrame(storeContext, _messageId, contentBody); if (isAllContentReceived()) { - deliver(); + deliver(storeContext); return true; } else @@ -318,7 +335,7 @@ public class AMQMessage * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that * failed */ - public void decrementReference() throws MessageCleanupException + public void decrementReference(StoreContext storeContext) throws MessageCleanupException { // note that the operation of decrementing the reference count and then removing the message does not // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after @@ -336,7 +353,7 @@ public class AMQMessage // and the handle has not yet been constructed if (_messageHandle != null) { - _messageHandle.removeMessage(_messageId); + _messageHandle.removeMessage(storeContext, _messageId); } } catch (AMQException e) @@ -422,7 +439,7 @@ public class AMQMessage { _taken.set(false); } - + public boolean checkToken(Object token) { if (_tokens.contains(token)) @@ -449,9 +466,9 @@ public class AMQMessage _transientMessageData.addDestinationQueue(queue); } - public void dequeue(AMQQueue queue) throws AMQException + public void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException { - _messageHandle.dequeue(_messageId, queue); + _messageHandle.dequeue(storeContext, _messageId, queue); } public boolean isPersistent() throws AMQException @@ -515,7 +532,7 @@ public class AMQMessage _deliveredToConsumer = true; } - private void deliver() throws AMQException + private void deliver(StoreContext storeContext) throws AMQException { // we get a reference to the destination queues now so that we can clear the // transient message data as quickly as possible @@ -524,7 +541,7 @@ public class AMQMessage { // first we allow the handle to know that the message has been fully received. This is useful if it is // maintaining any calculated values based on content chunks - _messageHandle.setPublishAndContentHeaderBody(_messageId, _transientMessageData.getPublishBody(), + _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getPublishBody(), _transientMessageData.getContentHeaderBody()); // we then allow the transactional context to do something with the message content @@ -541,7 +558,7 @@ public class AMQMessage finally { destinationQueues.clear(); - decrementReference(); + decrementReference(storeContext); } } @@ -572,7 +589,7 @@ public class AMQMessage } // - // Now start writing out the other content bodies + // Now start writing out the other content bodies // while (bodyFrameIterator.hasNext()) { @@ -597,7 +614,7 @@ public class AMQMessage private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, String replyText) throws AMQException { AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, (byte) 8, (byte) 0, getPublishBody().exchange, - replyCode, replyText, + replyCode, replyText, getPublishBody().routingKey); ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem? returnFrame.writePayload(buf); @@ -641,4 +658,10 @@ public class AMQMessage protocolSession.writeFrame(bodyFrameIterator.next()); } } + + public String toString() + { + return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " + + _taken; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java index 949b245c35..31b1f668fe 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java @@ -9,6 +9,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; @@ -18,7 +19,7 @@ import org.apache.qpid.framing.ContentHeaderBody; * even no caching at all to minimise the broker memory footprint. * * The method all take a messageId to avoid having to store it in the instance - the AMQMessage container - * must already keen the messageId so it is pointless storing it twice. + * must already keen the messageId so it is pointless storing it twice. */ public interface AMQMessageHandle { @@ -27,7 +28,7 @@ public interface AMQMessageHandle /** * @return the number of body frames associated with this message */ - int getBodyCount(); + int getBodyCount(long messageId) throws AMQException; /** * @return the size of the body @@ -42,23 +43,23 @@ public interface AMQMessageHandle */ ContentBody getContentBody(long messageId, int index) throws IllegalArgumentException, AMQException; - void addContentBodyFrame(long messageId, ContentBody contentBody) throws AMQException; + void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) throws AMQException; BasicPublishBody getPublishBody(long messageId) throws AMQException; boolean isRedelivered(); void setRedelivered(boolean redelivered); - + boolean isPersistent(long messageId) throws AMQException; - void setPublishAndContentHeaderBody(long messageId, BasicPublishBody publishBody, + void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody) throws AMQException; - void removeMessage(long messageId) throws AMQException; + void removeMessage(StoreContext storeContext, long messageId) throws AMQException; - void enqueue(long messageId, AMQQueue queue) throws AMQException; + void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException; - void dequeue(long messageId, AMQQueue queue) throws AMQException; + void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException; }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 7ab48598c4..fa1257b5f8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -27,11 +27,10 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.server.store.StoreContext; import javax.management.JMException; import java.text.MessageFormat; -import java.util.Iterator; import java.util.List; import java.util.concurrent.Executor; @@ -344,17 +343,17 @@ public class AMQQueue implements Managable, Comparable /** * Removes the AMQMessage from the top of the queue. */ - public void deleteMessageFromTop() throws AMQException + public void deleteMessageFromTop(StoreContext storeContext) throws AMQException { - _deliveryMgr.removeAMessageFromTop(); + _deliveryMgr.removeAMessageFromTop(storeContext); } /** * removes all the messages from the queue. */ - public void clearQueue() throws AMQException + public void clearQueue(StoreContext storeContext) throws AMQException { - _deliveryMgr.clearAllMessages(); + _deliveryMgr.clearAllMessages(storeContext); } public void bind(String routingKey, Exchange exchange) @@ -378,7 +377,7 @@ public class AMQQueue implements Managable, Comparable { if (_deliveryMgr.hasQueuedMessages()) { - _deliveryMgr.populatePreDeliveryQueue(subscription); + _deliveryMgr.populatePreDeliveryQueue(subscription); } } @@ -464,9 +463,9 @@ public class AMQQueue implements Managable, Comparable msg.incrementReference(); } */ - public void process(AMQMessage msg) throws AMQException + public void process(StoreContext storeContext, AMQMessage msg) throws AMQException { - _deliveryMgr.deliver(getName(), msg); + _deliveryMgr.deliver(storeContext, getName(), msg); try { msg.checkDeliveredToConsumer(); @@ -476,16 +475,16 @@ public class AMQQueue implements Managable, Comparable { // as this message will be returned, it should be removed // from the queue: - dequeue(msg); + dequeue(storeContext, msg); } } - void dequeue(AMQMessage msg) throws FailedDequeueException + void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException { try { - msg.dequeue(this); - msg.decrementReference(); + msg.dequeue(storeContext, this); + msg.decrementReference(storeContext); } catch (MessageCleanupException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 169055bad0..77bbdf7b4b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -20,6 +20,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -44,6 +45,12 @@ import java.util.Iterator; @MBeanDescription("Management Interface for AMQQueue") public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue { + /** + * Since the MBean is not associated with a real channel we can safely create our own store context + * for use in the few methods that require one. + */ + private StoreContext _storeContext = new StoreContext(); + private AMQQueue _queue = null; private String _queueName = null; // OpenMBean data types for viewMessages method @@ -241,13 +248,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue } /** - * @see org.apache.qpid.server.queue.AMQQueue#deleteMessageFromTop() + * @see org.apache.qpid.server.queue.AMQQueue#deleteMessageFromTop */ public void deleteMessageFromTop() throws JMException { try { - _queue.deleteMessageFromTop(); + _queue.deleteMessageFromTop(_storeContext); } catch (AMQException ex) { @@ -256,13 +263,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue } /** - * @see org.apache.qpid.server.queue.AMQQueue#clearQueue() + * @see org.apache.qpid.server.queue.AMQQueue#clearQueue */ public void clearQueue() throws JMException { try { - _queue.clearQueue(); + _queue.clearQueue(_storeContext); } catch (AMQException ex) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java index dda074aca7..a2898ccdce 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.configuration.Configured; import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import java.util.ArrayList; @@ -200,21 +201,21 @@ public class ConcurrentDeliveryManager implements DeliveryManager //no-op . This DM has no PreDeliveryQueues } - public synchronized void removeAMessageFromTop() throws AMQException + public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException { AMQMessage msg = poll(); if (msg != null) { - msg.dequeue(_queue); + msg.dequeue(storeContext, _queue); } } - public synchronized void clearAllMessages() throws AMQException + public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException { AMQMessage msg = poll(); while (msg != null) { - msg.dequeue(_queue); + msg.dequeue(storeContext, _queue); msg = poll(); } } @@ -293,7 +294,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager } } - public void deliver(String name, AMQMessage msg) throws FailedDequeueException, AMQException + public void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException { // first check whether we are queueing, and enqueue if we are if (!enqueue(msg)) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 93baa3fc29..8f0c3a5ec7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -26,6 +26,7 @@ import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.store.StoreContext; import java.util.ArrayList; import java.util.Iterator; @@ -167,21 +168,21 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - public synchronized void removeAMessageFromTop() throws AMQException + public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException { AMQMessage msg = poll(); if (msg != null) { - msg.dequeue(_queue); + msg.dequeue(storeContext, _queue); } } - public synchronized void clearAllMessages() throws AMQException + public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException { AMQMessage msg = poll(); while (msg != null) { - msg.dequeue(_queue); + msg.dequeue(storeContext, _queue); msg = poll(); } } @@ -279,11 +280,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return _messages.poll(); } - public void deliver(String name, AMQMessage msg) throws AMQException + public void deliver(StoreContext context, String name, AMQMessage msg) throws AMQException { if (_log.isDebugEnabled()) { - _log.debug(id() + "deliver :" + System.identityHashCode(msg)); + _log.debug(id() + "deliver :" + msg); } //Check if we have someone to deliver the message to. @@ -296,7 +297,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery"); + _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery"); } if (!msg.getPublishBody().immediate) { @@ -308,7 +309,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //Pre Deliver to all subscriptions if (_log.isDebugEnabled()) { - _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + + _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to."); } for (Subscription sub : _subscriptions.getSubscriptions()) @@ -330,7 +331,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + + _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); } sub.enqueueForPreDelivery(msg); @@ -345,7 +346,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (_log.isDebugEnabled()) { - _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + + _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + System.identityHashCode(s) + ") :" + s); } //Deliver the message diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index 6f31616114..82d8f9538f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.StoreContext; import java.util.concurrent.Executor; import java.util.List; @@ -66,11 +67,11 @@ interface DeliveryManager * @param msg the message to deliver * @throws org.apache.qpid.server.queue.FailedDequeueException if the message could not be dequeued */ - void deliver(String name, AMQMessage msg) throws FailedDequeueException, AMQException; + void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException; - void removeAMessageFromTop() throws AMQException; + void removeAMessageFromTop(StoreContext storeContext) throws AMQException; - void clearAllMessages() throws AMQException; + void clearAllMessages(StoreContext storeContext) throws AMQException; List<AMQMessage> getMessages(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java index 4a8616746c..1ddcdd8919 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java @@ -9,6 +9,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; @@ -39,7 +40,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle return _contentHeaderBody; } - public int getBodyCount() + public int getBodyCount(long messageId) { return _contentBodies.size(); } @@ -59,7 +60,8 @@ public class InMemoryMessageHandle implements AMQMessageHandle return _contentBodies.get(index); } - public void addContentBodyFrame(long messageId, ContentBody contentBody) throws AMQException + public void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) + throws AMQException { _contentBodies.add(contentBody); } @@ -94,7 +96,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle * @param contentHeaderBody * @throws AMQException */ - public void setPublishAndContentHeaderBody(long messageId, BasicPublishBody publishBody, + public void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody) throws AMQException { @@ -102,15 +104,18 @@ public class InMemoryMessageHandle implements AMQMessageHandle _contentHeaderBody = contentHeaderBody; } - public void removeMessage(long messageId) throws AMQException + public void removeMessage(StoreContext storeContext, long messageId) throws AMQException { + // NO OP } - public void enqueue(long messageId, AMQQueue queue) throws AMQException + public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException { + // NO OP } - public void dequeue(long messageId, AMQQueue queue) throws AMQException + public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException { + // NO OP } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index d04b6d3f60..2dab551e07 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -44,5 +44,5 @@ public interface Subscription void close(); - boolean isBrowser(); + boolean isBrowser(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 56f3c0323e..0dc1f3b0c1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,20 +21,20 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.common.ClientProperties; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; -import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.BasicDeliverBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import java.util.Queue; @@ -211,7 +211,7 @@ public class SubscriptionImpl implements Subscription } else { - sendToConsumer(msg, queue); + sendToConsumer(channel.getStoreContext(), msg, queue); } } else @@ -239,7 +239,8 @@ public class SubscriptionImpl implements Subscription } } - private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws AMQException + private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue) + throws AMQException { try { @@ -254,7 +255,11 @@ public class SubscriptionImpl implements Subscription // the message is unacked, it will be lost. if (!_acks) { - queue.dequeue(msg); + if (_logger.isDebugEnabled()) + { + _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); + } + queue.dequeue(storeContext, msg); } synchronized(channel) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java index 7332ffbbee..f290452058 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.StoreContext; import org.apache.log4j.Logger; import java.util.LinkedList; @@ -129,21 +130,21 @@ class SynchronizedDeliveryManager implements DeliveryManager //no-op . This DM has no PreDeliveryQueues } - public synchronized void removeAMessageFromTop() throws AMQException + public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException { AMQMessage msg = poll(); if (msg != null) { - msg.dequeue(_queue); + msg.dequeue(storeContext, _queue); } } - public synchronized void clearAllMessages() throws AMQException + public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException { AMQMessage msg = poll(); while (msg != null) { - msg.dequeue(_queue); + msg.dequeue(storeContext, _queue); msg = poll(); } } @@ -233,7 +234,7 @@ class SynchronizedDeliveryManager implements DeliveryManager * @throws NoConsumersException if there are no active subscribers to deliver * the message to */ - public void deliver(String name, AMQMessage msg) throws FailedDequeueException, AMQException + public void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException { // first check whether we are queueing, and enqueue if we are if (!enqueue(msg)) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java index c6f5c19940..284b34e1bc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java @@ -14,10 +14,12 @@ import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; import java.lang.ref.WeakReference; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; +import java.util.LinkedList; /** * @author Robert Greig (robert.j.greig@jpmorgan.com) @@ -28,7 +30,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle private WeakReference<BasicPublishBody> _publishBody; - private List<WeakReference<ContentBody>> _contentBodies = new LinkedList<WeakReference<ContentBody>>(); + private List<WeakReference<ContentBody>> _contentBodies; private boolean _redelivered; @@ -52,8 +54,18 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle return chb; } - public int getBodyCount() + public int getBodyCount(long messageId) throws AMQException { + if (_contentBodies == null) + { + MessageMetaData mmd = _messageStore.getMessageMetaData(messageId); + int chunkCount = mmd.getContentChunkCount(); + _contentBodies = new ArrayList<WeakReference<ContentBody>>(chunkCount); + for (int i = 0; i < chunkCount; i++) + { + _contentBodies.add(new WeakReference<ContentBody>(null)); + } + } return _contentBodies.size(); } @@ -79,10 +91,14 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle return cb; } - public void addContentBodyFrame(long messageId, ContentBody contentBody) throws AMQException + public void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) throws AMQException { + if (_contentBodies == null) + { + _contentBodies = new LinkedList<WeakReference<ContentBody>>(); + } _contentBodies.add(new WeakReference<ContentBody>(contentBody)); - _messageStore.storeContentBodyChunk(messageId, _contentBodies.size() - 1, contentBody); + _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody); } public BasicPublishBody getPublishBody(long messageId) throws AMQException @@ -122,28 +138,28 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle * @param contentHeaderBody * @throws AMQException */ - public void setPublishAndContentHeaderBody(long messageId, BasicPublishBody publishBody, + public void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody) throws AMQException { - _messageStore.storeMessageMetaData(messageId, new MessageMetaData(publishBody, contentHeaderBody, - _contentBodies.size())); + _messageStore.storeMessageMetaData(storeContext, messageId, new MessageMetaData(publishBody, contentHeaderBody, + _contentBodies.size())); _publishBody = new WeakReference<BasicPublishBody>(publishBody); _contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody); } - public void removeMessage(long messageId) throws AMQException + public void removeMessage(StoreContext storeContext, long messageId) throws AMQException { - _messageStore.removeMessage(messageId); + _messageStore.removeMessage(storeContext, messageId); } - public void enqueue(long messageId, AMQQueue queue) throws AMQException + public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException { - _messageStore.enqueueMessage(queue.getName(), messageId); + _messageStore.enqueueMessage(storeContext, queue.getName(), messageId); } - public void dequeue(long messageId, AMQQueue queue) throws AMQException + public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException { - _messageStore.dequeueMessage(queue.getName(), messageId); + _messageStore.dequeueMessage(storeContext, queue.getName(), messageId); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 4d6c79d37b..edf2386314 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -85,7 +85,7 @@ public class MemoryMessageStore implements MessageStore } } - public void removeMessage(long messageId) + public void removeMessage(StoreContext context, long messageId) { if (_log.isDebugEnabled()) { @@ -105,32 +105,32 @@ public class MemoryMessageStore implements MessageStore // Not required to do anything } - public void enqueueMessage(String name, long messageId) throws AMQException + public void enqueueMessage(StoreContext context, String name, long messageId) throws AMQException { // Not required to do anything } - public void dequeueMessage(String name, long messageId) throws AMQException + public void dequeueMessage(StoreContext context, String name, long messageId) throws AMQException { // Not required to do anything } - public void beginTran() throws AMQException + public void beginTran(StoreContext context) throws AMQException { // Not required to do anything } - public void commitTran() throws AMQException + public void commitTran(StoreContext context) throws AMQException { // Not required to do anything } - public void abortTran() throws AMQException + public void abortTran(StoreContext context) throws AMQException { // Not required to do anything } - public boolean inTran() + public boolean inTran(StoreContext context) { return false; } @@ -145,7 +145,8 @@ public class MemoryMessageStore implements MessageStore return _messageId.getAndIncrement(); } - public void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException + public void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody) + throws AMQException { List<ContentBody> bodyList = _contentBodyMap.get(messageId); if (bodyList == null) @@ -157,7 +158,8 @@ public class MemoryMessageStore implements MessageStore bodyList.add(index, contentBody); } - public void storeMessageMetaData(long messageId, MessageMetaData messageMetaData) throws AMQException + public void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData) + throws AMQException { _metaDataMap.put(messageId, messageMetaData); } @@ -169,7 +171,7 @@ public class MemoryMessageStore implements MessageStore public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException { - List<ContentBody> bodyList = _contentBodyMap.get(messageId); + List<ContentBody> bodyList = _contentBodyMap.get(messageId); return bodyList.get(index); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index eff0818afb..973c661c06 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -48,23 +48,23 @@ public interface MessageStore */ void close() throws Exception; - void removeMessage(long messageId) throws AMQException; + void removeMessage(StoreContext storeContext, long messageId) throws AMQException; void createQueue(AMQQueue queue) throws AMQException; void removeQueue(String name) throws AMQException; - void enqueueMessage(String name, long messageId) throws AMQException; + void enqueueMessage(StoreContext context, String name, long messageId) throws AMQException; - void dequeueMessage(String name, long messageId) throws AMQException; + void dequeueMessage(StoreContext context, String name, long messageId) throws AMQException; - void beginTran() throws AMQException; + void beginTran(StoreContext context) throws AMQException; - void commitTran() throws AMQException; + void commitTran(StoreContext context) throws AMQException; - void abortTran() throws AMQException; + void abortTran(StoreContext context) throws AMQException; - boolean inTran(); + boolean inTran(StoreContext context); /** * Recreate all queues that were persisted, including re-enqueuing of existing messages @@ -79,9 +79,9 @@ public interface MessageStore */ long getNewMessageId(); - void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException; + void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody) throws AMQException; - void storeMessageMetaData(long messageId, MessageMetaData messageMetaData) throws AMQException; + void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData) throws AMQException; MessageMetaData getMessageMetaData(long messageId) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java new file mode 100644 index 0000000000..55e5067852 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +/** + * A context that the store can use to associate with a transactional context. For example, it could store + * some kind of txn id. + * + * @author Apache Software Foundation + */ +public class StoreContext +{ + private Object _payload; + + public Object getPayload() + { + return _payload; + } + + public void setPayload(Object payload) + { + _payload = payload; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java index 7ff609b750..6d5d3c42f3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java @@ -22,6 +22,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.NoConsumersException; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.store.StoreContext; import java.util.List; @@ -42,7 +43,7 @@ public class CleanupMessageOperation implements TxnOp _returns = returns; } - public void prepare() throws AMQException + public void prepare(StoreContext context) throws AMQException { } @@ -53,7 +54,7 @@ public class CleanupMessageOperation implements TxnOp //or enqueued on any queues and can be discarded } - public void commit() + public void commit(StoreContext context) { //The routers reference can now be released. This is done //here to ensure that it happens after the queues that @@ -61,7 +62,7 @@ public class CleanupMessageOperation implements TxnOp //memory only operation is done in the commit phase). try { - _msg.decrementReference(); + _msg.decrementReference(context); } catch (AMQException e) { @@ -83,7 +84,8 @@ public class CleanupMessageOperation implements TxnOp } } - public void rollback() + public void rollback(StoreContext context) { + // NO OP } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java index 059c13c687..78887030e4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java @@ -12,6 +12,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.StoreContext; /** * @author Robert Greig (robert.j.greig@jpmorgan.com) @@ -31,22 +32,22 @@ public class DeliverMessageOperation implements TxnOp _msg.incrementReference(); } - public void prepare() throws AMQException - { + public void prepare(StoreContext context) throws AMQException + { } public void undoPrepare() { } - public void commit() + public void commit(StoreContext context) { //do the memeory part of the record() _msg.incrementReference(); //then process the message try { - _queue.process(_msg); + _queue.process(context, _msg); } catch (AMQException e) { @@ -55,7 +56,7 @@ public class DeliverMessageOperation implements TxnOp } } - public void rollback() + public void rollback(StoreContext storeContext) { } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 06522740cc..ff6abdd58b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -25,6 +25,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; import java.util.List; @@ -45,12 +46,15 @@ public class LocalTransactionalContext implements TransactionalContext private final MessageStore _messageStore; + private final StoreContext _storeContext; + private boolean _inTran = false; - public LocalTransactionalContext(MessageStore messageStore, + public LocalTransactionalContext(MessageStore messageStore, StoreContext storeContext, TxnBuffer txnBuffer, List<RequiredDeliveryException> returnMessages) { _messageStore = messageStore; + _storeContext = storeContext; _txnBuffer = txnBuffer; _returnMessages = returnMessages; _txnBuffer.enlist(new StoreMessageOperation(messageStore)); @@ -58,7 +62,7 @@ public class LocalTransactionalContext implements TransactionalContext public void rollback() throws AMQException { - _txnBuffer.rollback(); + _txnBuffer.rollback(_storeContext); } public void deliver(AMQMessage message, AMQQueue queue) throws AMQException @@ -125,7 +129,7 @@ public class LocalTransactionalContext implements TransactionalContext { if (!_inTran) { - _messageStore.beginTran(); + _messageStore.beginTran(_storeContext); _inTran = true; } } @@ -134,11 +138,11 @@ public class LocalTransactionalContext implements TransactionalContext { if (_ackOp != null) { - _ackOp.consolidate(); + _ackOp.consolidate(); //already enlisted, after commit will reset regardless of outcome _ackOp = null; } - _txnBuffer.commit(); + _txnBuffer.commit(_storeContext); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 7321854034..f09d811b50 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.NoConsumersException; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; import java.util.LinkedList; import java.util.List; @@ -54,15 +55,18 @@ public class NonTransactionalContext implements TransactionalContext private final MessageStore _messageStore; + private StoreContext _storeContext; + /** * Whether we are in a transaction */ private boolean _inTran; - public NonTransactionalContext(MessageStore messageStore, AMQChannel channel, + public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel, List<RequiredDeliveryException> returnMessages, Set<Long> browsedAcks) { _channel = channel; + _storeContext = storeContext; _returnMessages = returnMessages; _messageStore = messageStore; _browsedAcks = browsedAcks; @@ -72,7 +76,7 @@ public class NonTransactionalContext implements TransactionalContext { if (!_inTran) { - _messageStore.beginTran(); + _messageStore.beginTran(_storeContext); _inTran = true; } } @@ -92,7 +96,7 @@ public class NonTransactionalContext implements TransactionalContext try { message.incrementReference(); - queue.process(message); + queue.process(_storeContext, message); //following check implements the functionality //required by the 'immediate' flag: message.checkDeliveredToConsumer(); @@ -122,7 +126,11 @@ public class NonTransactionalContext implements TransactionalContext { if (!_browsedAcks.contains(deliveryTag)) { - message.discard(); + if (_log.isDebugEnabled()) + { + _log.debug("Discarding message: " + message.message.getMessageId()); + } + message.discard(_storeContext); } else { @@ -150,7 +158,11 @@ public class NonTransactionalContext implements TransactionalContext { if (!_browsedAcks.contains(deliveryTag)) { - msg.discard(); + if (_log.isDebugEnabled()) + { + _log.debug("Discarding message: " + msg.message.getMessageId()); + } + msg.discard(_storeContext); } else { @@ -171,10 +183,11 @@ public class NonTransactionalContext implements TransactionalContext throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channel.getChannelId()); } - msg.discard(); + msg.discard(_storeContext); if (_log.isDebugEnabled()) { - _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag); + _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + + msg.message.getMessageId()); } } } @@ -183,7 +196,7 @@ public class NonTransactionalContext implements TransactionalContext { if (persistent) { - _messageStore.commitTran(); + _messageStore.commitTran(_storeContext); _inTran = false; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java index 2a3b524060..e8100b49bc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java @@ -10,6 +10,7 @@ package org.apache.qpid.server.txn; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; /** * A transactional operation to store messages in an underlying persistent store. When this operation @@ -25,7 +26,7 @@ public class StoreMessageOperation implements TxnOp _messsageStore = messageStore; } - public void prepare() throws AMQException + public void prepare(StoreContext context) throws AMQException { } @@ -33,13 +34,13 @@ public class StoreMessageOperation implements TxnOp { } - public void commit() throws AMQException + public void commit(StoreContext context) throws AMQException { - _messsageStore.commitTran(); + _messsageStore.commitTran(context); } - public void rollback() throws AMQException + public void rollback(StoreContext context) throws AMQException { - _messsageStore.abortTran(); + _messsageStore.abortTran(context); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java index 63cb6f738b..069caf0ae1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.txn; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.StoreContext; import java.util.ArrayList; import java.util.List; @@ -39,26 +40,26 @@ public class TxnBuffer { } - public void commit() throws AMQException + public void commit(StoreContext context) throws AMQException { - if (prepare()) + if (prepare(context)) { for (TxnOp op : _ops) { - op.commit(); + op.commit(context); } } _ops.clear(); } - private boolean prepare() + private boolean prepare(StoreContext context) { for (int i = 0; i < _ops.size(); i++) { TxnOp op = _ops.get(i); try { - op.prepare(); + op.prepare(context); } catch (Exception e) { @@ -73,11 +74,11 @@ public class TxnBuffer return true; } - public void rollback() throws AMQException + public void rollback(StoreContext context) throws AMQException { for (TxnOp op : _ops) { - op.rollback(); + op.rollback(context); } _ops.clear(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java index 7cb2515c35..919c078cf0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,6 +21,7 @@ package org.apache.qpid.server.txn; import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.StoreContext; /** * This provides the abstraction of an individual operation within a @@ -31,12 +32,12 @@ public interface TxnOp /** * Do the part of the operation that updates persistent state */ - public void prepare() throws AMQException; + public void prepare(StoreContext context) throws AMQException; /** * Complete the operation started by prepare. Can now update in * memory state or make netork transfers. */ - public void commit() throws AMQException; + public void commit(StoreContext context) throws AMQException; /** * This is not the same as rollback. Unfortunately the use of an * in memory reference count as a locking mechanism and a test for @@ -50,5 +51,5 @@ public interface TxnOp /** * Rolls back the operation. */ - public void rollback() throws AMQException; + public void rollback(StoreContext context) throws AMQException; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java index adae209493..b395579cea 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java @@ -24,6 +24,7 @@ import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.server.cluster.*; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.store.StoreContext; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -58,10 +59,10 @@ public class ClusteredQueue extends AMQQueue _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers(); } - public void process(AMQMessage msg) throws AMQException + public void process(StoreContext storeContext, AMQMessage msg) throws AMQException { _logger.info(new LogMessage("{0} delivered to clustered queue {1}", msg, this)); - super.process(msg); + super.process(storeContext, msg); } protected void autodelete() throws AMQException diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java index ddee643a76..9f41510b5d 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java @@ -109,7 +109,7 @@ public class ServiceProvidingClient _logger.info("About to create a producer"); _destinationProducer = session.createProducer(responseDest); _destinationProducer.setDisableMessageTimestamp(true); - _destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + _destinationProducer.setDeliveryMode(DeliveryMode.PERSISTENT); _logger.info("After create a producer"); } } diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java index b52d06558a..12c3ac2afe 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java @@ -185,7 +185,7 @@ public class ServiceRequestingClient implements ExceptionListener AMQQueue destination = new AMQQueue(commandQueueName); _producer = (MessageProducer) _session.createProducer(destination); _producer.setDisableMessageTimestamp(true); - _producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + _producer.setDeliveryMode(DeliveryMode.PERSISTENT); _tempDestination = new AMQQueue("TempResponse" + Long.toString(System.currentTimeMillis()), true); diff --git a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java index 258bcecc41..ac5b9ed14c 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -26,6 +26,7 @@ import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; @@ -91,16 +92,20 @@ public class TxAckTest extends TestCase private final TxAck _op = new TxAck(_map); private final List<Long> _acked; private final List<Long> _unacked; + private StoreContext _storeContext = new StoreContext(); Scenario(int messageCount, List<Long> acked, List<Long> unacked) { - TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(), null, + TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(), + _storeContext, null, new LinkedList<RequiredDeliveryException>(), new HashSet<Long>()); for(int i = 0; i < messageCount; i++) { long deliveryTag = i + 1; - TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody(), txnContext); + // TODO: fix hardcoded protocol version data + TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8, + (byte)0), txnContext); _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag)); } _acked = acked; @@ -125,7 +130,7 @@ public class TxAckTest extends TestCase void prepare() throws AMQException { _op.consolidate(); - _op.prepare(); + _op.prepare(_storeContext); assertCount(_acked, -1); assertCount(_unacked, 0); @@ -143,7 +148,7 @@ public class TxAckTest extends TestCase void commit() { _op.consolidate(); - _op.commit(); + _op.commit(_storeContext); //check acked messages are removed from map diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 05c57c2f9f..9a6b6607e1 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.RequiredDeliveryException; @@ -49,6 +50,8 @@ public class AbstractHeadersExchangeTestBase extends TestCase */ private MessageStore _store = new MemoryMessageStore(); + private StoreContext _storeContext = new StoreContext(); + private MessageHandleFactory _handleFactory = new MessageHandleFactory(); private int count; @@ -89,7 +92,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase protected void route(Message m) throws AMQException { m.route(exchange); - m.routingComplete(_store, _handleFactory); + m.routingComplete(_store, _storeContext, _handleFactory); } protected void routeAndTest(Message m, TestQueue... expected) throws AMQException @@ -198,7 +201,10 @@ public class AbstractHeadersExchangeTestBase extends TestCase { private static MessageStore _messageStore = new SkeletonMessageStore(); - private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, null, + private static StoreContext _storeContext = new StoreContext(); + + private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, + null, new LinkedList<RequiredDeliveryException>(), new HashSet<Long>()); diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 1520e5cec2..6008b8924f 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.server.store.StoreContext; import javax.management.JMException; import java.util.LinkedList; @@ -41,9 +42,11 @@ public class AMQQueueMBeanTest extends TestCase private AMQQueueMBean _queueMBean; private QueueRegistry _queueRegistry; private MessageStore _messageStore = new SkeletonMessageStore(); - private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, null, + private StoreContext _storeContext = new StoreContext(); + private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, + null, new LinkedList<RequiredDeliveryException>(), - new HashSet<Long>()); + new HashSet<Long>()); private MockProtocolSession _protocolSession; private AMQChannel _channel; @@ -140,8 +143,8 @@ public class AMQQueueMBeanTest extends TestCase AMQMessage msg = message(false); long id = msg.getMessageId(); - _queue.clearQueue(); - _queue.process(msg); + _queue.clearQueue(_storeContext); + _queue.process(_storeContext, msg); _queueMBean.viewMessageContent(id); try { @@ -161,7 +164,7 @@ public class AMQQueueMBeanTest extends TestCase BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0); publish.immediate = immediate; ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); - contentHeaderBody.bodySize = 1000; // in bytes + contentHeaderBody.bodySize = 1000; // in bytes return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody); } @@ -184,7 +187,7 @@ public class AMQQueueMBeanTest extends TestCase } for (int i = 0; i < messageCount; i++) { - _queue.process(messages[i]); + _queue.process(_storeContext, messages[i]); } } } diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java index b9b5634109..0180c2d30c 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.util.TestApplicationRegistry; @@ -53,6 +54,8 @@ public class AckTest extends TestCase private TestableMemoryMessageStore _messageStore; + private StoreContext _storeContext = new StoreContext(); + private AMQChannel _channel; private SubscriptionSet _subscriptionManager; @@ -82,7 +85,7 @@ public class AckTest extends TestCase private void publishMessages(int count, boolean persistent) throws AMQException { - TransactionalContext txnContext = new NonTransactionalContext(_messageStore, null, + TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, new LinkedList<RequiredDeliveryException>(), new HashSet<Long>()); MessageHandleFactory factory = new MessageHandleFactory(); @@ -111,7 +114,7 @@ public class AckTest extends TestCase // the reference is normally incremented. The test is easier to construct if we have direct access to the // subscription msg.incrementReference(); - msg.routingComplete(_messageStore, factory); + msg.routingComplete(_messageStore, _storeContext, factory); // we manually send the message to the subscription _subscription.send(msg, _queue); } diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java index fe8960c872..8efefaeff5 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -186,7 +186,7 @@ public class ConcurrencyTest extends MessageTestHelper AMQMessage msg = nextMessage(); if (msg != null) { - _deliveryMgr.deliver(toString(), msg); + _deliveryMgr.deliver(null, toString(), msg); } } } diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java index 3631264e5a..fcd2806861 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.handler.OnCurrentThreadExecutor; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.AMQException; import junit.framework.TestSuite; @@ -29,6 +30,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper { protected final SubscriptionSet _subscriptions = new SubscriptionSet(); protected DeliveryManager _mgr; + protected StoreContext _storeContext = new StoreContext(); public DeliveryManagerTest() throws Exception { @@ -45,7 +47,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper for (int i = 0; i < batch; i++) { - _mgr.deliver("Me", messages[i]); + _mgr.deliver(_storeContext, "Me", messages[i]); } SubscriptionTestHelper s1 = new SubscriptionTestHelper("1"); @@ -55,7 +57,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper for (int i = batch; i < messages.length; i++) { - _mgr.deliver("Me", messages[i]); + _mgr.deliver(_storeContext, "Me", messages[i]); } assertTrue(s1.getMessages().isEmpty()); @@ -93,7 +95,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper for (int i = 0; i < batch; i++) { - _mgr.deliver("Me", messages[i]); + _mgr.deliver(_storeContext, "Me", messages[i]); } assertEquals(batch, s1.getMessages().size()); @@ -107,7 +109,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper s1.setSuspended(true); for (int i = batch; i < messages.length; i++) { - _mgr.deliver("Me", messages[i]); + _mgr.deliver(_storeContext, "Me", messages[i]); } _mgr.processAsync(new OnCurrentThreadExecutor()); @@ -129,7 +131,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper try { AMQMessage msg = message(true); - _mgr.deliver("Me", msg); + _mgr.deliver(_storeContext, "Me", msg); msg.checkDeliveredToConsumer(); fail("expected exception did not occur"); } @@ -151,7 +153,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper _subscriptions.addSubscriber(s); s.setSuspended(true); AMQMessage msg = message(true); - _mgr.deliver("Me", msg); + _mgr.deliver(_storeContext, "Me", msg); msg.checkDeliveredToConsumer(); fail("expected exception did not occur"); } diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java index 85c7ff90bc..da4627411d 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,6 +24,7 @@ import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.server.txn.TransactionalContext; @@ -40,7 +41,9 @@ class MessageTestHelper extends TestCase { private final MessageStore _messageStore = new SkeletonMessageStore(); - private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, null, + private final StoreContext _storeContext = new StoreContext(); + + private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, new LinkedList<RequiredDeliveryException>(), new HashSet<Long>()); @@ -61,7 +64,7 @@ class MessageTestHelper extends TestCase BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0); publish.immediate = immediate; return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext, - new ContentHeaderBody()); + new ContentHeaderBody()); } } diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 61edba36f8..dd403f4f9b 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -48,9 +48,9 @@ public class SkeletonMessageStore implements MessageStore public void close() throws Exception { - } + } - public void removeMessage(long messageId) + public void removeMessage(StoreContext s, long messageId) { } @@ -62,28 +62,28 @@ public class SkeletonMessageStore implements MessageStore { } - public void enqueueMessage(String name, long messageId) throws AMQException + public void enqueueMessage(StoreContext s, String name, long messageId) throws AMQException { } - public void dequeueMessage(String name, long messageId) throws AMQException + public void dequeueMessage(StoreContext s, String name, long messageId) throws AMQException { } - public void beginTran() throws AMQException + public void beginTran(StoreContext s) throws AMQException { } - public boolean inTran() + public boolean inTran(StoreContext sc) { return false; } - - public void commitTran() throws AMQException + + public void commitTran(StoreContext storeContext) throws AMQException { } - public void abortTran() throws AMQException + public void abortTran(StoreContext storeContext) throws AMQException { } @@ -97,12 +97,12 @@ public class SkeletonMessageStore implements MessageStore return _messageId.getAndIncrement(); } - public void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException + public void storeContentBodyChunk(StoreContext sc, long messageId, int index, ContentBody contentBody) throws AMQException { } - public void storeMessageMetaData(long messageId, MessageMetaData messageMetaData) throws AMQException + public void storeMessageMetaData(StoreContext sc, long messageId, MessageMetaData messageMetaData) throws AMQException { } diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index a40a9bf12f..b874ca9594 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -22,9 +22,9 @@ package org.apache.qpid.server.store; import junit.framework.TestCase; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.txn.NonTransactionalContext; @@ -36,6 +36,8 @@ public class TestReferenceCounting extends TestCase { private TestableMemoryMessageStore _store; + private StoreContext _storeContext = new StoreContext(); + protected void setUp() throws Exception { super.setUp(); @@ -48,14 +50,16 @@ public class TestReferenceCounting extends TestCase public void testMessageGetsRemoved() throws AMQException { createPersistentContentHeader(); - AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody(), - new NonTransactionalContext(_store, null, null, null), + // TODO: fix hardcoded protocol version data + AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8, + (byte)0), + new NonTransactionalContext(_store, _storeContext, null, null, null), createPersistentContentHeader()); message.incrementReference(); // we call routing complete to set up the handle - message.routingComplete(_store, new MessageHandleFactory()); + message.routingComplete(_store, _storeContext, new MessageHandleFactory()); assertTrue(_store.getMessageMetaDataMap().size() == 1); - message.decrementReference(); + message.decrementReference(_storeContext); assertTrue(_store.getMessageMetaDataMap().size() == 0); } @@ -70,15 +74,17 @@ public class TestReferenceCounting extends TestCase public void testMessageRemains() throws AMQException { - AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody(), - new NonTransactionalContext(_store, null, null, null), + // TODO: fix hardcoded protocol version data + AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8, + (byte)0), + new NonTransactionalContext(_store, _storeContext, null, null, null), createPersistentContentHeader()); message.incrementReference(); // we call routing complete to set up the handle - message.routingComplete(_store, new MessageHandleFactory()); + message.routingComplete(_store, _storeContext, new MessageHandleFactory()); assertTrue(_store.getMessageMetaDataMap().size() == 1); message.incrementReference(); - message.decrementReference(); + message.decrementReference(_storeContext); assertTrue(_store.getMessageMetaDataMap().size() == 1); } diff --git a/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java index 24244d6462..1830a06fd1 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java @@ -24,6 +24,7 @@ import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; import java.util.LinkedList; @@ -43,7 +44,7 @@ public class TxnBufferTest extends TestCase buffer.enlist(op); buffer.enlist(new MockOp().expectPrepare().expectCommit()); - buffer.commit(); + buffer.commit(null); validateOps(); store.validate(); @@ -58,7 +59,7 @@ public class TxnBufferTest extends TestCase buffer.enlist(new MockOp().expectRollback()); buffer.enlist(new MockOp().expectRollback()); - buffer.rollback(); + buffer.rollback(null); validateOps(); store.validate(); @@ -77,7 +78,7 @@ public class TxnBufferTest extends TestCase buffer.enlist(new FailedPrepare()); buffer.enlist(new MockOp()); - buffer.commit(); + buffer.commit(null); validateOps(); store.validate(); } @@ -95,7 +96,7 @@ public class TxnBufferTest extends TestCase buffer.enlist(new StoreMessageOperation(store)); buffer.enlist(new TxnTester(store)); - buffer.commit(); + buffer.commit(null); validateOps(); store.validate(); } @@ -127,12 +128,12 @@ public class TxnBufferTest extends TestCase ops.add(this); } - public void prepare() + public void prepare(StoreContext context) { assertEquals(expected.removeLast(), PREPARE); } - public void commit() + public void commit(StoreContext context) { assertEquals(expected.removeLast(), COMMIT); } @@ -142,7 +143,7 @@ public class TxnBufferTest extends TestCase assertEquals(expected.removeLast(), UNDO_PREPARE); } - public void rollback() + public void rollback(StoreContext context) { assertEquals(expected.removeLast(), ROLLBACK); } @@ -249,16 +250,16 @@ public class TxnBufferTest extends TestCase class NullOp implements TxnOp { - public void prepare() throws AMQException + public void prepare(StoreContext context) throws AMQException { } - public void commit() + public void commit(StoreContext context) { } public void undoPrepare() { } - public void rollback() + public void rollback(StoreContext context) { } } @@ -275,6 +276,8 @@ public class TxnBufferTest extends TestCase { private final MessageStore store; + private final StoreContext context = new StoreContext(); + TxnTester(MessageStore store) { this.store = store; @@ -282,12 +285,12 @@ public class TxnBufferTest extends TestCase public void prepare() throws AMQException { - assertTrue("Expected prepare to be performed under txn", store.inTran()); + assertTrue("Expected prepare to be performed under txn", store.inTran(context)); } public void commit() { - assertTrue("Expected commit not to be performed under txn", !store.inTran()); + assertTrue("Expected commit not to be performed under txn", !store.inTran(context)); } } |