summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java95
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java134
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java8
4 files changed, 177 insertions, 77 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index ce1db7d26e..c114d2d25f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -41,6 +41,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
/**
* This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like
@@ -157,7 +158,7 @@ public class AMQQueue implements Managable, Comparable
/**
* total messages received by the queue since startup.
*/
- public long _totalMessagesReceived = 0;
+ public AtomicLong _totalMessagesReceived = new AtomicLong();
public int compareTo(Object o)
{
@@ -291,59 +292,77 @@ public class AMQQueue implements Managable, Comparable
}
/**
- * @see ManagedQueue#moveMessages
+ * moves messages from this queue to another queue. to do this the approach is following-
+ * - setup the queue for moving messages (hold the lock and stop the async delivery)
+ * - get all the messages available in the given message id range
+ * - setup the other queue for moving messages (hold the lock and stop the async delivery)
+ * - send these available messages to the other queue (enqueue in other queue)
+ * - Once sending to other Queue is successful, remove messages from this queue
+ * - remove locks from both queues and start async delivery
* @param fromMessageId
* @param toMessageId
* @param queueName
* @param storeContext
- * @throws AMQException
*/
public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
- StoreContext storeContext) throws AMQException
+ StoreContext storeContext)
{
+ // prepare the delivery manager for moving messages by stopping the async delivery and creating a lock
AMQQueue anotherQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- List<AMQMessage> list = getMessagesOnTheQueue();
- List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
- int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1);
- for (AMQMessage message : list)
+ try
{
- long msgId = message.getMessageId();
- if (msgId >= fromMessageId && msgId <= toMessageId)
- {
- foundMessagesList.add(message);
- }
- // break the loop as soon as messages to be removed are found
- if (foundMessagesList.size() == maxMessageCountToBeMoved)
+ startMovingMessages();
+ List<AMQMessage> list = getMessagesOnTheQueue();
+ List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
+ int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1);
+
+ // Run this loop till you find all the messages or the list has no more messages
+ for (AMQMessage message : list)
{
- break;
+ long msgId = message.getMessageId();
+ if (msgId >= fromMessageId && msgId <= toMessageId)
+ {
+ foundMessagesList.add(message);
+ }
+ // break the loop as soon as messages to be removed are found
+ if (foundMessagesList.size() == maxMessageCountToBeMoved)
+ {
+ break;
+ }
}
- }
- // move messages to another queue
- for (AMQMessage message : foundMessagesList)
+ // move messages to another queue
+ anotherQueue.startMovingMessages();
+ anotherQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+
+ // moving is successful, now remove from original queue
+ _deliveryMgr.removeMovedMessages(foundMessagesList);
+ }
+ finally
{
- try
- {
- anotherQueue.process(storeContext, message);
- }
- catch(AMQException ex)
- {
- foundMessagesList.subList(foundMessagesList.indexOf(message), foundMessagesList.size()).clear();
- // Exception occured, so rollback the changes
- anotherQueue.removeMessages(foundMessagesList);
- throw ex;
- }
+ // remove the lock and start the async delivery
+ anotherQueue.stopMovingMessages();
+ stopMovingMessages();
}
+ }
- // moving is successful, now remove from original queue
- removeMessages(foundMessagesList);
+ public void startMovingMessages()
+ {
+ _deliveryMgr.startMovingMessages();
}
- public synchronized void removeMessages(List<AMQMessage> messageList)
+ private void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> messageList)
{
- _deliveryMgr.removeMessages(messageList);
+ _deliveryMgr.enqueueMovedMessages(storeContext, messageList);
+ _totalMessagesReceived.addAndGet(messageList.size());
}
+ public void stopMovingMessages()
+ {
+ _deliveryMgr.stopMovingMessages();
+ _deliveryMgr.processAsync(_asyncDelivery);
+ }
+
/**
* @return MBean object associated with this Queue
*/
@@ -374,7 +393,7 @@ public class AMQQueue implements Managable, Comparable
public long getReceivedMessageCount()
{
- return _totalMessagesReceived;
+ return _totalMessagesReceived.get();
}
public int getMaximumMessageCount()
@@ -407,7 +426,7 @@ public class AMQQueue implements Managable, Comparable
/**
* Removes the AMQMessage from the top of the queue.
*/
- public void deleteMessageFromTop(StoreContext storeContext) throws AMQException
+ public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
{
_deliveryMgr.removeAMessageFromTop(storeContext);
}
@@ -415,7 +434,7 @@ public class AMQQueue implements Managable, Comparable
/**
* removes all the messages from the queue.
*/
- public long clearQueue(StoreContext storeContext) throws AMQException
+ public synchronized long clearQueue(StoreContext storeContext) throws AMQException
{
return _deliveryMgr.clearAllMessages(storeContext);
}
@@ -633,7 +652,7 @@ public class AMQQueue implements Managable, Comparable
protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException
{
- _totalMessagesReceived++;
+ _totalMessagesReceived.incrementAndGet();
try
{
_managedObject.checkForNotification(msg);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index ba0d3b86d2..eb7e87b923 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -395,24 +395,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
{
throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\"");
}
-
- try
- {
- _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
- }
- catch(AMQException amqex)
- {
- throw new JMException("Error moving messages to " + toQueueName + ": " + amqex);
- }
+ _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
}
-//
-// public ObjectName getObjectName() throws MalformedObjectNameException
-// {
-// String objNameString = super.getObjectName().toString();
-//
-// return new ObjectName(objNameString);
-// }
/**
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 6c89101043..858dd5ee09 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -75,7 +75,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
*/
private final AMQQueue _queue;
-
+ /**
+ * Flag used while moving messages from this queue to another. For moving messages the async delivery
+ * should also stop. This flat should be set to true to stop async delivery and set to false to enable
+ * async delivery again.
+ */
+ private AtomicBoolean _movingMessages = new AtomicBoolean();
+
/**
* Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced
* to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered
@@ -167,9 +173,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
- public synchronized List<AMQMessage> getMessages()
+ public List<AMQMessage> getMessages()
{
- return new ArrayList<AMQMessage>(_messages);
+ _lock.lock();
+ ArrayList<AMQMessage> list = new ArrayList<AMQMessage>(_messages);
+ _lock.unlock();
+ return list;
}
public void populatePreDeliveryQueue(Subscription subscription)
@@ -242,8 +251,52 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- public synchronized void removeMessages(List<AMQMessage> messageList)
+ /**
+ * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag,
+ * so that the asyn delivery is also stopped.
+ */
+ public void startMovingMessages()
+ {
+ _lock.lock();
+ _movingMessages.set(true);
+ }
+
+ /**
+ * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag,
+ * so that the async delivery can start again.
+ */
+ public void stopMovingMessages()
+ {
+ _movingMessages.set(false);
+ if (_lock.isHeldByCurrentThread())
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Messages will be removed from this queue and all preDeliveryQueues
+ * @param messageList
+ */
+ public void removeMovedMessages(List<AMQMessage> messageList)
{
+ // Remove from the
+ boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
+ if (hasSubscribers)
+ {
+ for (Subscription sub : _subscriptions.getSubscriptions())
+ {
+ if (!sub.isSuspended() && sub.hasFilters())
+ {
+ Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue();
+ for (AMQMessage msg : messageList)
+ {
+ preDeliveryQueue.remove(msg);
+ }
+ }
+ }
+ }
+
for (AMQMessage msg : messageList)
{
if (_messages.remove(msg))
@@ -253,29 +306,42 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
+ /**
+ * Now with implementation of predelivery queues, this method will mark the message on the top as taken.
+ * @param storeContext
+ * @throws AMQException
+ */
+ public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
- AMQMessage msg = poll();
+ _lock.lock();
+ AMQMessage msg = getNextMessage();
if (msg != null)
{
- msg.dequeue(storeContext, _queue);
- _totalMessageSize.getAndAdd(-msg.getSize());
- }
+ // mark this message as taken and get it removed
+ msg.taken();
+ _queue.dequeue(storeContext, msg);
+ getNextMessage();
+ }
+
+ _lock.unlock();
}
- public synchronized long clearAllMessages(StoreContext storeContext) throws AMQException
+ public long clearAllMessages(StoreContext storeContext) throws AMQException
{
long count = 0;
- AMQMessage msg = poll();
+ _lock.lock();
+
+ AMQMessage msg = getNextMessage();
while (msg != null)
{
- msg.dequeue(storeContext, _queue);
+ //mark this message as taken and get it removed
+ msg.taken();
+ _queue.dequeue(storeContext, msg);
+ msg = getNextMessage();
count++;
- _totalMessageSize.set(0L);
- msg = poll();
-
}
+ _lock.unlock();
return count;
}
@@ -298,6 +364,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
//remove the already taken message
messages.poll();
+ _totalMessageSize.addAndGet(-message.getSize());
// try the next message
message = messages.peek();
}
@@ -335,6 +402,34 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
+ * enqueues the messages in the list on the queue and all required predelivery queues
+ * @param storeContext
+ * @param movedMessageList
+ */
+ public void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> movedMessageList)
+ {
+ _lock.lock();
+ for (AMQMessage msg : movedMessageList)
+ {
+ addMessageToQueue(msg);
+ }
+
+ // enqueue on the pre delivery queues
+ for (Subscription sub : _subscriptions.getSubscriptions())
+ {
+ for (AMQMessage msg : movedMessageList)
+ {
+ // Only give the message to those that want them.
+ if (sub.hasInterest(msg))
+ {
+ sub.enqueueForPreDelivery(msg);
+ }
+ }
+ }
+ _lock.unlock();
+ }
+
+ /**
* Only one thread should ever execute this method concurrently, but
* it can do so while other threads invoke deliver().
*/
@@ -343,7 +438,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// Continue to process delivery while we haveSubscribers and messages
boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
- while (hasSubscribers && hasQueuedMessages())
+ while (hasSubscribers && hasQueuedMessages() && !_movingMessages.get())
{
hasSubscribers = false;
@@ -378,11 +473,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- private AMQMessage poll()
- {
- return _messages.poll();
- }
-
public void deliver(StoreContext context, AMQShortString name, AMQMessage msg) throws AMQException
{
if (_log.isDebugEnabled())
@@ -482,7 +572,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void run()
{
boolean running = true;
- while (running)
+ while (running && !_movingMessages.get())
{
processQueue();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index c6f00bd189..0129c3b895 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -76,7 +76,13 @@ interface DeliveryManager
long clearAllMessages(StoreContext storeContext) throws AMQException;
- void removeMessages(List<AMQMessage> messageListToRemove);
+ void startMovingMessages();
+
+ void enqueueMovedMessages(StoreContext context, List<AMQMessage> messageList);
+
+ void stopMovingMessages();
+
+ void removeMovedMessages(List<AMQMessage> messageListToRemove);
List<AMQMessage> getMessages();