summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-05-13 10:35:38 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-05-13 10:35:38 +0000
commit72d95cd01b4c6e6d5b26a8baa0a9ed49e0160a97 (patch)
tree0fcb9a74f1b2f60e692b29f900c0c725c7c6e50f
parent5b648b767d546f63a72a5b96d17ef6c8a5ff4a94 (diff)
downloadqpid-python-72d95cd01b4c6e6d5b26a8baa0a9ed49e0160a97.tar.gz
Changes to MessageStore interface
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@655798 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java15
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java6
8 files changed, 25 insertions, 27 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index 03b02707b2..88d5360f3e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -222,7 +222,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
try
{
queue.delete();
- _messageStore.removeQueue(new AMQShortString(queueName));
+ _messageStore.removeQueue(queue);
}
catch (AMQException ex)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 37969de795..dfc36f5b93 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -112,7 +112,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
if (queue.isDurable())
{
- store.removeQueue(queue.getName());
+ store.removeQueue(queue);
}
MethodRegistry methodRegistry = session.getMethodRegistry();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 6a3e7e49ef..dcae821604 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -28,7 +28,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.exchange.Exchange;
@@ -36,7 +35,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.common.ClientProperties;
import org.apache.log4j.Logger;
-import java.util.List;
import java.util.ArrayList;
import java.util.Collection;
@@ -143,7 +141,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
if(q.isDurable())
{
- _messageStore.enqueueMessage(_txnContext.getStoreContext(), q.getName(), _messageId);
+ _messageStore.enqueueMessage(_txnContext.getStoreContext(), q, _messageId);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 25116817d7..9c8b703bb5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -336,7 +336,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
{
- // need to get the enqueue lock
+
incrementQueueCount();
@@ -579,7 +579,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
AMQMessage msg = entry.getMessage();
if(isDurable() && msg.isPersistent())
{
- _virtualHost.getMessageStore().dequeueMessage(storeContext, getName(), msg.getMessageId());
+ _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId());
}
//entry.dispose(storeContext);
@@ -853,7 +853,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if(message.isPersistent() && toQueue.isDurable())
{
- store.enqueueMessage(storeContext, toQueue.getName(), message.getMessageId());
+ store.enqueueMessage(storeContext, toQueue, message.getMessageId());
}
// dequeue does not decrement the refence count
entry.dequeue(storeContext);
@@ -946,7 +946,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if(message.isReferenced() && message.isPersistent() && toQueue.isDurable())
{
- store.enqueueMessage(storeContext, toQueue.getName(), message.getMessageId());
+ store.enqueueMessage(storeContext, toQueue, message.getMessageId());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 59b4e62974..cc22569d77 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -736,9 +736,10 @@ public class DerbyMessageStore implements MessageStore
return DriverManager.getConnection(_connectionURL);
}
- public void removeQueue(AMQShortString name) throws AMQException
+ public void removeQueue(final AMQQueue queue) throws AMQException
{
+ AMQShortString name = queue.getName();
_logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
Connection conn = null;
@@ -782,12 +783,13 @@ public class DerbyMessageStore implements MessageStore
}
- public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+ AMQShortString name = queue.getName();
try
{
@@ -822,9 +824,10 @@ public class DerbyMessageStore implements MessageStore
}
- public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
+ AMQShortString name = queue.getName();
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index f048059b9b..b02eff957e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -126,17 +126,17 @@ public class MemoryMessageStore implements MessageStore
// Not required to do anything
}
- public void removeQueue(AMQShortString name) throws AMQException
+ public void removeQueue(final AMQQueue queue) throws AMQException
{
// Not required to do anything
}
- public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
// Not required to do anything
}
- public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
// Not required to do anything
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index 3a0d865876..e15e69a414 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -138,33 +138,30 @@ public interface MessageStore
/**
* Removes the specified queue from the persistent store.
*
- * @param name The queue to remove.
- *
+ * @param queue The queue to remove.
* @throws AMQException If the operation fails for any reason.
*/
- void removeQueue(AMQShortString name) throws AMQException;
+ void removeQueue(final AMQQueue queue) throws AMQException;
/**
* Places a message onto a specified queue, in a given transactional context.
*
* @param context The transactional context for the operation.
- * @param name The name of the queue to place the message on.
+ * @param queue The queue to place the message on.
* @param messageId The message to enqueue.
- *
* @throws AMQException If the operation fails for any reason.
*/
- void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException;
+ void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException;
/**
* Extracts a message from a specified queue, in a given transactional context.
*
* @param context The transactional context for the operation.
- * @param name The name of the queue to take the message from.
+ * @param queue The queue to place the message on.
* @param messageId The message to dequeue.
- *
* @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
- void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException;
+ void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException;
/**
* Begins a transactional context.
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index dea52fea5a..792744903e 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -129,17 +129,17 @@ public class SkeletonMessageStore implements MessageStore
return null;
}
- public void removeQueue(AMQShortString name) throws AMQException
+ public void removeQueue(final AMQQueue queue) throws AMQException
{
}
- public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
}
- public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
}