diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-05-13 10:35:38 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-05-13 10:35:38 +0000 |
commit | 72d95cd01b4c6e6d5b26a8baa0a9ed49e0160a97 (patch) | |
tree | 0fcb9a74f1b2f60e692b29f900c0c725c7c6e50f | |
parent | 5b648b767d546f63a72a5b96d17ef6c8a5ff4a94 (diff) | |
download | qpid-python-72d95cd01b4c6e6d5b26a8baa0a9ed49e0160a97.tar.gz |
Changes to MessageStore interface
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@655798 13f79535-47bb-0310-9956-ffa450edef68
8 files changed, 25 insertions, 27 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 03b02707b2..88d5360f3e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -222,7 +222,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr try { queue.delete(); - _messageStore.removeQueue(new AMQShortString(queueName)); + _messageStore.removeQueue(queue); } catch (AMQException ex) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 37969de795..dfc36f5b93 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -112,7 +112,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB if (queue.isDurable()) { - store.removeQueue(queue.getName()); + store.removeQueue(queue); } MethodRegistry methodRegistry = session.getMethodRegistry(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 6a3e7e49ef..dcae821604 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -28,7 +28,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.exchange.Exchange; @@ -36,7 +35,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.common.ClientProperties; import org.apache.log4j.Logger; -import java.util.List; import java.util.ArrayList; import java.util.Collection; @@ -143,7 +141,7 @@ public class IncomingMessage implements Filterable<RuntimeException> if(q.isDurable()) { - _messageStore.enqueueMessage(_txnContext.getStoreContext(), q.getName(), _messageId); + _messageStore.enqueueMessage(_txnContext.getStoreContext(), q, _messageId); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 25116817d7..9c8b703bb5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -336,7 +336,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException { - // need to get the enqueue lock + incrementQueueCount(); @@ -579,7 +579,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener AMQMessage msg = entry.getMessage(); if(isDurable() && msg.isPersistent()) { - _virtualHost.getMessageStore().dequeueMessage(storeContext, getName(), msg.getMessageId()); + _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId()); } //entry.dispose(storeContext); @@ -853,7 +853,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if(message.isPersistent() && toQueue.isDurable()) { - store.enqueueMessage(storeContext, toQueue.getName(), message.getMessageId()); + store.enqueueMessage(storeContext, toQueue, message.getMessageId()); } // dequeue does not decrement the refence count entry.dequeue(storeContext); @@ -946,7 +946,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if(message.isReferenced() && message.isPersistent() && toQueue.isDurable()) { - store.enqueueMessage(storeContext, toQueue.getName(), message.getMessageId()); + store.enqueueMessage(storeContext, toQueue, message.getMessageId()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 59b4e62974..cc22569d77 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -736,9 +736,10 @@ public class DerbyMessageStore implements MessageStore return DriverManager.getConnection(_connectionURL); } - public void removeQueue(AMQShortString name) throws AMQException + public void removeQueue(final AMQQueue queue) throws AMQException { + AMQShortString name = queue.getName(); _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called"); Connection conn = null; @@ -782,12 +783,13 @@ public class DerbyMessageStore implements MessageStore } - public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException + public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { boolean localTx = getOrCreateTransaction(context); Connection conn = getConnection(context); ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); + AMQShortString name = queue.getName(); try { @@ -822,9 +824,10 @@ public class DerbyMessageStore implements MessageStore } - public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException + public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { + AMQShortString name = queue.getName(); boolean localTx = getOrCreateTransaction(context); Connection conn = getConnection(context); ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index f048059b9b..b02eff957e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -126,17 +126,17 @@ public class MemoryMessageStore implements MessageStore // Not required to do anything } - public void removeQueue(AMQShortString name) throws AMQException + public void removeQueue(final AMQQueue queue) throws AMQException { // Not required to do anything } - public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException + public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { // Not required to do anything } - public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException + public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { // Not required to do anything } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index 3a0d865876..e15e69a414 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -138,33 +138,30 @@ public interface MessageStore /** * Removes the specified queue from the persistent store. * - * @param name The queue to remove. - * + * @param queue The queue to remove. * @throws AMQException If the operation fails for any reason. */ - void removeQueue(AMQShortString name) throws AMQException; + void removeQueue(final AMQQueue queue) throws AMQException; /** * Places a message onto a specified queue, in a given transactional context. * * @param context The transactional context for the operation. - * @param name The name of the queue to place the message on. + * @param queue The queue to place the message on. * @param messageId The message to enqueue. - * * @throws AMQException If the operation fails for any reason. */ - void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException; + void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException; /** * Extracts a message from a specified queue, in a given transactional context. * * @param context The transactional context for the operation. - * @param name The name of the queue to take the message from. + * @param queue The queue to place the message on. * @param messageId The message to dequeue. - * * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. */ - void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException; + void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException; /** * Begins a transactional context. diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java index dea52fea5a..792744903e 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -129,17 +129,17 @@ public class SkeletonMessageStore implements MessageStore return null; } - public void removeQueue(AMQShortString name) throws AMQException + public void removeQueue(final AMQQueue queue) throws AMQException { } - public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException + public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { } - public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException + public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { } |