summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-05-27 12:43:04 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-05-27 12:43:04 +0000
commit980643b9364f2ec7e75f9e4a391754f5db4bc24a (patch)
treea7f43779191f41bc9d8413460c302ff25a67ab76
parent4b1cc6b00ded3584ed2f11431845de09f195ed14 (diff)
downloadqpid-python-980643b9364f2ec7e75f9e4a391754f5db4bc24a.tar.gz
Refactoring updates (job queue changes, enqueue collections..)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@660490 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java43
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java56
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java351
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java34
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java3
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java40
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java30
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java27
-rw-r--r--java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java4
-rw-r--r--java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java8
-rw-r--r--java/systests/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeIdleStatusChecker.java125
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java3
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java5
20 files changed, 478 insertions, 304 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 441f88b9b6..d1bea3410b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -50,6 +50,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@@ -248,8 +249,10 @@ public class HeadersExchange extends AbstractExchange
_logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
}
boolean routed = false;
+ Collection<AMQQueue> queues = new ArrayList<AMQQueue>();
for (Registration e : _bindings)
{
+
if (e.binding.matches(headers))
{
if (_logger.isDebugEnabled())
@@ -257,10 +260,12 @@ public class HeadersExchange extends AbstractExchange
_logger.debug("Exchange " + getName() + ": delivering message with headers " +
headers + " to " + e.queue.getName());
}
- payload.enqueue(e.queue);
+ queues.add(e.queue);
+
routed = true;
}
}
+ payload.enqueue(queues);
}
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index 8d3110ef18..d07501a188 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -262,8 +262,24 @@ public class TopicExchange extends AbstractExchange
_filteredQueues.put(queue,newFilters);
}
- public Set<AMQQueue> processMessage(IncomingMessage msg, Set<AMQQueue> queues)
+ public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues)
{
+ if(queues == null)
+ {
+ if(_filteredQueues.isEmpty())
+ {
+ return new ArrayList<AMQQueue>(_unfilteredQueues.keySet());
+ }
+ else
+ {
+ queues = new HashSet<AMQQueue>();
+ }
+ }
+ else if(!(queues instanceof Set))
+ {
+ queues = new HashSet<AMQQueue>(queues);
+ }
+
queues.addAll(_unfilteredQueues.keySet());
if(!_filteredQueues.isEmpty())
{
@@ -621,11 +637,11 @@ public class TopicExchange extends AbstractExchange
}
else
{
- Set<AMQQueue> queues = new HashSet<AMQQueue>();
+ Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>();
for(TopicMatcherResult result : results)
{
- ((TopicExchangeResult)result).processMessage(message, queues);
+ queues = ((TopicExchangeResult)result).processMessage(message, queues);
}
return queues;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 0a6bfb15e6..bdb16d0fcb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -444,7 +444,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public boolean channelAwaitingClosure(int channelId)
{
- return _closingChannelsList.contains(channelId);
+ return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId);
}
public void addChannel(AMQChannel channel) throws AMQException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 05533e0d2d..0e5e7aa68c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -136,6 +136,11 @@ public class AMQMessage implements Filterable<AMQException>
}
}
+ public void clearStoreContext()
+ {
+ _storeContext = new StoreContext();
+ }
+
public StoreContext getStoreContext()
{
return _storeContext;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 780cd49834..570bd97a28 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -58,15 +58,13 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void unregisterSubscription(final Subscription subscription) throws AMQException;
+
int getConsumerCount();
int getActiveConsumerCount();
boolean isUnused();
-
-
-
boolean isEmpty();
int getMessageCount();
@@ -80,10 +78,27 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
long getOldestMessageArrivalTime();
-
boolean isDeleted();
+ int delete() throws AMQException;
+
+
+ QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
+
+ void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
+
+ void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
+
+
+
+ boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
+
+
+
+ void addQueueDeleteTask(final Task task);
+
+
List<QueueEntry> getMessagesOnTheQueue();
List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId);
@@ -91,7 +106,6 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
QueueEntry getMessageOnTheQueue(long messageId);
-
void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
StoreContext storeContext);
@@ -99,9 +113,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext);
- void quiesce();
- void start();
long getMaximumMessageSize();
@@ -132,27 +144,14 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
- int delete() throws AMQException;
-
-
- QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
-
- void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
-
- void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
-
- void deliverAsync();
-
- void addQueueDeleteTask(final Task task);
-
- boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
-
void removeExpiredIfNoSubscribers() throws AMQException;
Set<NotificationCheck> getNotificationChecks();
void flushSubscription(final Subscription sub) throws AMQException;
+ void deliverAsync(final Subscription sub);
+
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 68b429efc6..81c8c04d6d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -32,10 +32,8 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.AMQException;
-import org.apache.qpid.common.ClientProperties;
import org.apache.log4j.Logger;
-import java.util.ArrayList;
import java.util.Collection;
public class IncomingMessage implements Filterable<RuntimeException>
@@ -198,19 +196,53 @@ public class IncomingMessage implements Filterable<RuntimeException>
}
else
{
+ int offset;
+ final int queueCount = destinationQueues.size();
+ if(queueCount == 1)
+ {
+ offset = 0;
+ }
+ else
+ {
+ offset = ((int)(message.getMessageId().longValue())) % queueCount;
+ if(offset < 0)
+ {
+ offset = -offset;
+ }
+ }
+
+ int i = 0;
for (AMQQueue q : destinationQueues)
{
- // Increment the references to this message for each queue delivery.
- message.incrementReference();
- // normal deliver so add this message at the end.
- _txnContext.deliver(q, message);
+ if(++i > offset)
+ {
+ // Increment the references to this message for each queue delivery.
+ message.incrementReference();
+ // normal deliver so add this message at the end.
+ _txnContext.deliver(q, message);
+ }
}
+ i = 0;
+ if(offset != 0)
+ {
+ for (AMQQueue q : destinationQueues)
+ {
+ if(i++ < offset)
+ {
+ // Increment the references to this message for each queue delivery.
+ message.incrementReference();
+ // normal deliver so add this message at the end.
+ _txnContext.deliver(q, message);
+ }
+ }
+ }
+
}
// we then allow the transactional context to do something with the message content
// now that it has all been received, before we attempt delivery
_txnContext.messageFullyReceived(isPersistent());
-
+ message.clearStoreContext();
return message;
}
finally
@@ -257,16 +289,6 @@ public class IncomingMessage implements Filterable<RuntimeException>
return _messagePublishInfo.isImmediate();
}
-
- public void enqueue(final AMQQueue q) throws AMQException
- {
- if(_destinationQueues == null)
- {
- _destinationQueues = new ArrayList<AMQQueue>();
- }
- _destinationQueues.add(q);
- }
-
public ContentHeaderBody getContentHeaderBody()
{
return _contentHeaderBody;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index fea1db97b3..d26d6af7b2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -42,7 +42,7 @@ public class QueueEntryImpl implements QueueEntry
private final SimpleQueueEntryList _queueEntryList;
- private final AMQMessage _message;
+ private AMQMessage _message;
private Set<Subscription> _rejectedBy = null;
@@ -376,7 +376,7 @@ public class QueueEntryImpl implements QueueEntry
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
- _queueEntryList.advanceHead();
+ _queueEntryList.advanceHead();
return true;
}
else
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 16d24e74ee..5baf48245c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -11,6 +11,8 @@ import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.AMQException;
+import org.apache.qpid.pool.ReadWriteRunnable;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.configuration.Configured;
import org.apache.log4j.Logger;
@@ -21,8 +23,6 @@ import java.util.ArrayList;
import java.util.EnumSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -84,7 +84,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
- private boolean _exclusiveSubscriber;
+ private volatile Subscription _exclusiveSubscriber;
private final QueueEntryList _entries;
@@ -116,9 +116,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
@Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
public long _minimumAlertRepeatGap;
+
+
+ private static final int MAX_ASYNC_DELIVERIES = 10;
+
+
private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
+
private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
private AtomicReference _asynchronousRunner = new AtomicReference(null);
private AtomicInteger _deliveredMessages = new AtomicInteger();
@@ -155,7 +161,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_virtualHost = virtualHost;
_entries = entryListFactory.createQueueEntryList(this);
- _asyncDelivery = AsyncDeliveryConfig.getAsyncDeliveryExecutor();
+ _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
+
+ AsyncDeliveryConfig.getAsyncDeliveryExecutor();
try
{
@@ -235,11 +243,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// ------ Manage Subscriptions
- public void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
+ public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
{
- if(_exclusiveSubscriber)
+ if(isExclusiveSubscriber())
{
throw new ExistingExclusiveSubscription();
}
@@ -249,7 +257,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throw new ExistingSubscriptionPreventsExclusive();
}
- _exclusiveSubscriber = exclusive;
+ setExclusiveSubscriber(subscription);
_activeSubscriberCount.incrementAndGet();
subscription.setStateListener(this);
@@ -274,7 +282,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- public void unregisterSubscription(final Subscription subscription) throws AMQException
+ public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException
{
if(subscription == null)
{
@@ -289,9 +297,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
subscription.close();
// No longer can the queue have an exclusive consumer
- _exclusiveSubscriber = false;
+ setExclusiveSubscriber(null);
+ QueueEntry lastSeen;
+
+ while((lastSeen = subscription.getLastSeenEntry()) != null)
+ {
+ subscription.setLastSeenEntry(lastSeen, null);
+ }
+
@@ -329,83 +344,84 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_totalMessagesReceived.incrementAndGet();
- QueueEntry entry = _entries.add(message);
+ QueueEntry entry;
+ Subscription exclusiveSub = _exclusiveSubscriber;
+ if(exclusiveSub != null)
+ {
+ exclusiveSub.getSendLock();
- /*
+ try
+ {
+ entry = _entries.add(message);
+ deliverToSubscription(exclusiveSub, entry);
- iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
- */
- SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
- SubscriptionList.SubscriptionNode nextNode = node.getNext();
- if(nextNode == null)
- {
- nextNode = _subscriptionList.getHead().getNext();
- }
- while(nextNode != null)
- {
- if(_lastSubscriptionNode.compareAndSet(node, nextNode))
- {
- break;
- }
- else
- {
- node = _lastSubscriptionNode.get();
- nextNode = node.getNext();
- if(nextNode == null)
+ // where there is more than one producer there's a reasonable chance that even though there is
+ // no "queueing" we do not deliver because we get an interleving of _entries.add and
+ // deliverToSubscription between threads. Therefore have one more try.
+ if(!(entry.isAcquired() || entry.isDeleted()))
{
- nextNode = _subscriptionList.getHead().getNext();
+ deliverToSubscription(exclusiveSub, entry);
}
}
+ finally
+ {
+ exclusiveSub.releaseSendLock();
+ }
}
+ else
+ {
+ entry = _entries.add(message);
+ /*
+ iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
- // always do one extra loop after we believe we've finished
- // this catches the case where we *just* miss an update
- int loops = 2;
-
- while(!entry.isAcquired() && loops != 0)
- {
+ */
+ SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
+ SubscriptionList.SubscriptionNode nextNode = node.getNext();
if(nextNode == null)
{
- loops--;
- nextNode = _subscriptionList.getHead();
+ nextNode = _subscriptionList.getHead().getNext();
}
- else
+ while(nextNode != null)
{
- // if subscription at end, and active, offer
- Subscription sub = nextNode.getSubscription();
- synchronized(sub.getSendLock())
+ if(_lastSubscriptionNode.compareAndSet(node, nextNode))
{
- if(subscriptionReadyAndHasInterest(sub, entry)
- && !sub.isSuspended()
- && sub.isActive())
+ break;
+ }
+ else
+ {
+ node = _lastSubscriptionNode.get();
+ nextNode = node.getNext();
+ if(nextNode == null)
{
- if( !sub.wouldSuspend(entry))
- {
- if(!sub.isBrowser() && !entry.acquire(sub))
- {
- sub.restoreCredit(entry);
- }
- else
- {
- QueueEntry queueEntryNode = sub.getLastSeenEntry();
- if(_entries.next(queueEntryNode) == entry)
- {
- sub.setLastSeenEntry(queueEntryNode,entry);
- }
-
- deliverMessage(sub, entry);
-
- }
- }
+ nextNode = _subscriptionList.getHead().getNext();
}
}
}
- nextNode = nextNode.getNext();
- }
+ // always do one extra loop after we believe we've finished
+ // this catches the case where we *just* miss an update
+ int loops = 2;
+
+ while(!entry.isAcquired() && loops != 0)
+ {
+ if(nextNode == null)
+ {
+ loops--;
+ nextNode = _subscriptionList.getHead();
+ }
+ else
+ {
+ // if subscription at end, and active, offer
+ Subscription sub = nextNode.getSubscription();
+ deliverToSubscription(sub, entry);
+ }
+ nextNode = nextNode.getNext();
+
+ }
+ }
if(entry.immediateAndNotDelivered())
@@ -413,7 +429,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
dequeue(storeContext, entry);
entry.dispose(storeContext);
}
- else if(!entry.isAcquired())
+ else if(!(entry.isAcquired() || entry.isDeleted()))
{
checkSubscriptionsNotAheadOfDelivery(entry);
@@ -435,6 +451,42 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
+ throws AMQException
+ {
+ sub.getSendLock();
+ try
+ {
+ if(subscriptionReadyAndHasInterest(sub, entry)
+ && !sub.isSuspended()
+ && sub.isActive())
+ {
+ if( !sub.wouldSuspend(entry))
+ {
+ if(!sub.isBrowser() && !entry.acquire(sub))
+ {
+ sub.restoreCredit(entry);
+ }
+ else
+ {
+ QueueEntry queueEntryNode = sub.getLastSeenEntry();
+ if(_entries.next(queueEntryNode) == entry)
+ {
+ sub.setLastSeenEntry(queueEntryNode,entry);
+ }
+
+ deliverMessage(sub, entry);
+
+ }
+ }
+ }
+ }
+ finally
+ {
+ sub.releaseSendLock();
+ }
+ }
+
protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
{
// This method is only required for queues which mess with ordering
@@ -588,7 +640,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
/* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
entry to resend and move back the subscription pointer. */
- synchronized(subscription.getSendLock())
+ subscription.getSendLock();
+ try
{
if(!subscription.isClosed())
{
@@ -600,6 +653,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return false;
}
}
+ finally
+ {
+ subscription.releaseSendLock();
+ }
}
@@ -703,7 +760,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_activeSubscriberCount.incrementAndGet();
}
- deliverAsync();
+ deliverAsync(sub);
}
}
@@ -722,6 +779,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return _atomicQueueSize;
}
+ private boolean isExclusiveSubscriber()
+ {
+ return _exclusiveSubscriber != null;
+ }
+
+ private void setExclusiveSubscriber(Subscription exclusiveSubscriber)
+ {
+ _exclusiveSubscriber = exclusiveSubscriber;
+ }
+
public static interface QueueEntryFilter
{
public boolean accept(QueueEntry entry);
@@ -999,22 +1066,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- public void quiesce()
- {
- _quiesced.set(true);
- }
-
- public void start()
- {
- if(_quiesced.compareAndSet(true,false))
- {
- deliverAsync();
- }
- }
-
-
-
-
// ------ Management functions
@@ -1088,6 +1139,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
_deleteTaskList.clear();
+ ReferenceCountingExecutorService.getInstance().releaseExecutorService();
}
return getMessageCount();
@@ -1098,13 +1150,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
_stateChangeCount.incrementAndGet();
- if(_asynchronousRunner.get() == null)
- {
- _asyncDelivery.execute(new Runner());
+ Runner runner = new Runner();
+
+ if(_asynchronousRunner.compareAndSet(null,runner))
+ {
+ _asyncDelivery.execute(runner);
}
}
- private class Runner implements Runnable
+ public void deliverAsync(Subscription sub)
+ {
+ _asyncDelivery.execute(new SubFlushRunner(sub));
+ }
+
+ private class Runner implements ReadWriteRunnable
{
public void run()
{
@@ -1118,21 +1177,77 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+
+ public boolean isRead()
+ {
+ return false;
+ }
+
+ public boolean isWrite()
+ {
+ return true;
+ }
+ }
+
+
+ private class SubFlushRunner implements ReadWriteRunnable
+ {
+ private final Subscription _sub;
+
+
+ public SubFlushRunner(Subscription sub)
+ {
+ _sub = sub;
+ }
+
+ public void run()
+ {
+ boolean complete = false;
+ try
+ {
+ complete = flushSubscription(_sub, MAX_ASYNC_DELIVERIES);
+
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ if(!complete && !_sub.isSuspended())
+ {
+ _asyncDelivery.execute(this);
+ }
+
+ }
+
+ public boolean isRead()
+ {
+ return false;
+ }
+
+ public boolean isWrite()
+ {
+ return true;
+ }
}
public void flushSubscription(Subscription sub) throws AMQException
{
+ flushSubscription(sub, Long.MAX_VALUE);
+ }
+
+ public boolean flushSubscription(Subscription sub, long deliveries) throws AMQException
+ {
boolean atTail = false;
- while(sub.isActive() && !atTail)
+
+ while(!sub.isSuspended() && !atTail && deliveries != 0)
{
- synchronized(sub.getSendLock())
+ sub.getSendLock();
+ try
{
if(sub.isActive())
{
-
QueueEntry node = moveSubscriptionToNextNode(sub);
-
if(!(node.isAcquired() || node.isDeleted()))
{
if(!sub.isSuspended())
@@ -1148,6 +1263,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
else
{
+ deliveries--;
deliverMessage(sub, node);
if(sub.isBrowser())
@@ -1159,8 +1275,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
sub.setLastSeenEntry(node, newNode);
node = sub.getLastSeenEntry();
}
-
-
}
}
@@ -1180,13 +1294,36 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
}
+
}
atTail = (_entries.next(node) == null);
}
}
+ finally
+ {
+ sub.releaseSendLock();
+ }
}
+
+ if(!isExclusiveSubscriber())
+ {
+ advanceAllSubscriptions();
+ }
+
+ return atTail;
+ }
+
+ protected void advanceAllSubscriptions() throws AMQException
+ {
+ SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ while(subscriberIter.advance())
+ {
+ SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
+ Subscription sub = subNode.getSubscription();
+ moveSubscriptionToNextNode(sub);
+ }
}
private QueueEntry moveSubscriptionToNextNode(final Subscription sub)
@@ -1227,8 +1364,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
boolean deliveryIncomplete = true;
int extraLoops = 1;
+ int deliveries = MAX_ASYNC_DELIVERIES;
+
+ _asynchronousRunner.compareAndSet(runner,null);
- while(((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner))
+ while(deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner))
{
// we want to have one extra loop after the every subscription has reached the point where it cannot move
// further, just in case the advance of one subscription in the last loop allows a different subscription to
@@ -1251,17 +1391,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
Subscription sub = subscriptionIter.getNode().getSubscription();
if(sub != null)
{
- synchronized(sub.getSendLock())
+ sub.getSendLock();
+ try
{
+ QueueEntry node = moveSubscriptionToNextNode(sub);
+
if(sub.isActive())
{
boolean advanced = false;
+ boolean subActive = false;
- QueueEntry node = moveSubscriptionToNextNode(sub);
if(!(node.isAcquired() || node.isDeleted()))
{
if(!sub.isSuspended())
{
+ subActive = true;
if(sub.hasInterest(node))
{
if(!sub.wouldSuspend(node))
@@ -1274,6 +1418,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
else
{
deliverMessage(sub, node);
+ deliveries--;
if(sub.isBrowser())
{
@@ -1309,7 +1454,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
final boolean atTail = (_entries.next(node) == null);
- done = done && atTail;
+ done = done && (!subActive || atTail);
if(atTail && !advanced && sub.isAutoClose())
{
@@ -1322,6 +1467,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+ finally
+ {
+ sub.releaseSendLock();
+ }
}
if(done)
{
@@ -1346,6 +1495,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ if(deliveries == 0 && _asynchronousRunner.compareAndSet(null,runner))
+ {
+ _asyncDelivery.execute(runner);
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index acbeae0f40..537966e3aa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -28,7 +28,6 @@ import org.apache.qpid.server.queue.QueueEntry;
public interface Subscription
{
- boolean isActive();
public static enum State
@@ -75,7 +74,7 @@ public interface Subscription
boolean wouldSuspend(QueueEntry msg);
Object getSendLock();
-
+ void releaseSendLock();
void resend(final QueueEntry entry) throws AMQException;
@@ -87,4 +86,9 @@ public interface Subscription
boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue);
+
+ boolean isActive();
+
+
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 653c7de514..8e124c8b0c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -22,6 +22,9 @@ package org.apache.qpid.server.subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.Lock;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -62,6 +65,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private final RecordDeliveryMethod _recordMethod;
private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+ private final Lock _stateChangeLock;
+ private final Lock _stateChangeExclusiveLock;
+
static final class BrowserSubscription extends SubscriptionImpl
@@ -254,7 +260,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
private AMQQueue _queue;
- private final AtomicBoolean _sendLock = new AtomicBoolean(false);
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
@@ -280,7 +287,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
_deliveryMethod = deliveryMethod;
_recordMethod = recordMethod;
-
+ ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ _stateChangeLock = readWriteLock.readLock();
+ _stateChangeExclusiveLock = readWriteLock.writeLock();
if (arguments != null)
{
@@ -334,7 +343,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public boolean isSuspended()
{
- return !isActive() || _channel.isSuspended() || _sendLock.get();
+ return !isActive() || _channel.isSuspended() || _deleted.get();
}
/**
@@ -344,7 +353,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
*/
public void queueDeleted(AMQQueue queue)
{
- _sendLock.set(true);
+ _deleted.set(true);
// _channel.queueDeleted(queue);
}
@@ -435,7 +444,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
{
boolean closed = false;
State state = getState();
- synchronized (_sendLock)
+
+ _stateChangeExclusiveLock.lock();
+ try
{
while(!closed && state != State.CLOSED)
{
@@ -451,6 +462,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
_creditManager.removeListener(this);
}
+ finally
+ {
+ _stateChangeExclusiveLock.unlock();
+ }
+
if (closed)
{
@@ -481,7 +497,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public Object getSendLock()
{
- return _sendLock;
+ _stateChangeLock.lock();
+ return _deleted;
+ }
+
+ public void releaseSendLock()
+ {
+ _stateChangeLock.unlock();
}
public void resend(final QueueEntry entry) throws AMQException
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 9924d1c770..ca614e053a 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -44,6 +44,7 @@ import org.apache.mina.common.ByteBuffer;
import javax.management.Notification;
import java.util.LinkedList;
+import java.util.Collections;
/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
public class AMQQueueAlertTest extends TestCase
@@ -303,7 +304,7 @@ public class AMQQueueAlertTest extends TestCase
for (int i = 0; i < messages.length; i++)
{
messages[i] = message(false, size);
- messages[i].enqueue(_queue);
+ messages[i].enqueue(Collections.singleton(_queue));
messages[i].routingComplete(_messageStore, new MessageHandleFactory());
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 98f78e3d69..bf0a8a6d90 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -48,6 +48,7 @@ import org.apache.mina.common.ByteBuffer;
import javax.management.JMException;
import java.util.LinkedList;
+import java.util.Collections;
/**
* Test class to test AMQQueueMBean attribtues and operations
@@ -216,7 +217,7 @@ public class AMQQueueMBeanTest extends TestCase
long id = msg.getMessageId();
_queue.clearQueue(_storeContext);
- msg.enqueue(_queue);
+ msg.enqueue(Collections.singleton(_queue));
msg.routingComplete(_messageStore, new MessageHandleFactory());
msg.addContentBodyFrame(new ContentChunk()
@@ -318,7 +319,7 @@ public class AMQQueueMBeanTest extends TestCase
for (int i = 0; i < messageCount; i++)
{
IncomingMessage currentMessage = message(false, persistent);
- currentMessage.enqueue(_queue);
+ currentMessage.enqueue(Collections.singleton(_queue));
// route header
currentMessage.routingComplete(_messageStore, new MessageHandleFactory());
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
index f1a7d4970a..c60b9ee0cb 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -464,15 +464,49 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
return false;
}
- if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+ final int hashCode = _hashCode;
+
+ final int otherHashCode = otherString._hashCode;
+
+ if ((hashCode != 0) && (otherHashCode != 0) && (hashCode != otherHashCode))
{
return false;
}
+ final int length = _length;
+
+ if(length != otherString._length)
+ {
+ return false;
+ }
- return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data))
- || Arrays.equals(getBytes(),otherString.getBytes());
+ final byte[] data = _data;
+
+ final byte[] otherData = otherString._data;
+
+ final int offset = _offset;
+
+ final int otherOffset = otherString._offset;
+
+ if(offset == 0 && otherOffset == 0 && length == data.length && length == otherData.length)
+ {
+ return Arrays.equals(data, otherData);
+ }
+ else
+ {
+ int thisIdx = offset;
+ int otherIdx = otherOffset;
+ for(int i = length; i-- != 0; )
+ {
+ if(!(data[thisIdx++] == otherData[otherIdx++]))
+ {
+ return false;
+ }
+ }
+ }
+
+ return true;
}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
index 05141aea7b..8de0f93ce9 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java
@@ -41,16 +41,16 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki
private final ReentrantLock _putLock = new ReentrantLock();
- private final ConcurrentLinkedQueue<Job> _readJobQueue = new ConcurrentLinkedQueue<Job>();
+ private final ConcurrentLinkedQueue<ReadWriteRunnable> _readJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
- private final ConcurrentLinkedQueue<Job> _writeJobQueue = new ConcurrentLinkedQueue<Job>();
+ private final ConcurrentLinkedQueue<ReadWriteRunnable> _writeJobQueue = new ConcurrentLinkedQueue<ReadWriteRunnable>();
private class ReadWriteJobIterator implements Iterator<Runnable>
{
private boolean _onReads;
- private Iterator<Job> _iter = _writeJobQueue.iterator();
+ private Iterator<ReadWriteRunnable> _iter = _writeJobQueue.iterator();
public boolean hasNext()
{
@@ -112,12 +112,12 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki
public boolean offer(final Runnable runnable)
{
- final Job job = (Job) runnable;
+ final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
final ReentrantLock putLock = _putLock;
putLock.lock();
try
{
- if(job.isReadJob())
+ if(job.isRead())
{
_readJobQueue.offer(job);
}
@@ -147,13 +147,13 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki
public void put(final Runnable runnable) throws InterruptedException
{
- final Job job = (Job) runnable;
+ final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
final ReentrantLock putLock = _putLock;
putLock.lock();
try
{
- if(job.isReadJob())
+ if(job.isRead())
{
_readJobQueue.offer(job);
}
@@ -185,13 +185,13 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki
public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException
{
- final Job job = (Job) runnable;
+ final ReadWriteRunnable job = (ReadWriteRunnable) runnable;
final ReentrantLock putLock = _putLock;
putLock.lock();
try
{
- if(job.isReadJob())
+ if(job.isRead())
{
_readJobQueue.offer(job);
}
@@ -240,7 +240,7 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki
throw ie;
}
- Job job = _writeJobQueue.poll();
+ ReadWriteRunnable job = _writeJobQueue.poll();
if(job == null)
{
job = _readJobQueue.poll();
@@ -266,7 +266,7 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki
final AtomicInteger count = _count;
long nanos = unit.toNanos(timeout);
takeLock.lockInterruptibly();
- Job job = null;
+ ReadWriteRunnable job = null;
try
{
@@ -322,7 +322,7 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki
_takeLock.lock();
try
{
- Job job;
+ ReadWriteRunnable job;
while((job = _writeJobQueue.peek())!= null)
{
c.add(job);
@@ -356,7 +356,7 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki
_takeLock.lock();
try
{
- Job job;
+ ReadWriteRunnable job;
while(total<=maxElements && (job = _writeJobQueue.peek())!= null)
{
c.add(job);
@@ -391,7 +391,7 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki
{
if(_count.get() > 0)
{
- Job job = _writeJobQueue.poll();
+ ReadWriteRunnable job = _writeJobQueue.poll();
if(job == null)
{
job = _readJobQueue.poll();
@@ -417,7 +417,7 @@ public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements Blocki
takeLock.lock();
try
{
- Job job = _writeJobQueue.peek();
+ ReadWriteRunnable job = _writeJobQueue.peek();
if(job == null)
{
job = _readJobQueue.peek();
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
new file mode 100644
index 0000000000..ad04a923e1
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
@@ -0,0 +1,27 @@
+package org.apache.qpid.pool;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public interface ReadWriteRunnable extends Runnable
+{
+ boolean isRead();
+ boolean isWrite();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
index d99973cffb..ce9c6ae4cb 100644
--- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
+++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
@@ -110,7 +110,7 @@ public class ReferenceCountingExecutorService
*
* @return An executor service.
*/
- ExecutorService acquireExecutorService()
+ public ExecutorService acquireExecutorService()
{
synchronized (_lock)
{
@@ -140,7 +140,7 @@ public class ReferenceCountingExecutorService
* Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls
* to zero, the executor service is shut down.
*/
- void releaseExecutorService()
+ public void releaseExecutorService()
{
synchronized (_lock)
{
diff --git a/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java b/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
index 779d47be1c..0b7e300cec 100644
--- a/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
+++ b/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
@@ -22,6 +22,8 @@ package org.apache.qpid.extras.exchanges.diagnostic;
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
import javax.management.JMException;
import javax.management.openmbean.OpenDataException;
@@ -201,8 +203,10 @@ public class DiagnosticExchange extends AbstractExchange
headers.put(key, value);
((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue"));
-
- payload.enqueue(q);
+
+ Collection<AMQQueue> queues = new ArrayList<AMQQueue>();
+ queues.add(q);
+ payload.enqueue(queues);
}
diff --git a/java/systests/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeIdleStatusChecker.java b/java/systests/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeIdleStatusChecker.java
deleted file mode 100644
index 5323ad28bf..0000000000
--- a/java/systests/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeIdleStatusChecker.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.mina.transport.vmpipe.support;
-
-import org.apache.mina.common.IdleStatus;
-
-import java.util.HashMap;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * This file is a patch to override MINA, because of the IdentityHashMap bug. Workaround to be supplied in MINA 1.0.7.
- * This patched file will be removed once upgraded onto a newer MINA.
- *
- * Dectects idle sessions and fires <tt>sessionIdle</tt> events to them.
- *
- * @author The Apache Directory Project (mina-dev@directory.apache.org)
- */
-public class VmPipeIdleStatusChecker
-{
- private static final VmPipeIdleStatusChecker INSTANCE = new VmPipeIdleStatusChecker();
-
- public static VmPipeIdleStatusChecker getInstance()
- {
- return INSTANCE;
- }
-
- private final Map sessions = new HashMap(); // will use as a set
-
- private final Worker worker = new Worker();
-
- private VmPipeIdleStatusChecker()
- {
- worker.start();
- }
-
- public void addSession(VmPipeSessionImpl session)
- {
- synchronized (sessions)
- {
- sessions.put(session, session);
- }
- }
-
- private class Worker extends Thread
- {
- private Worker()
- {
- super("VmPipeIdleStatusChecker");
- setDaemon(true);
- }
-
- public void run()
- {
- for (;;)
- {
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e)
- { }
-
- long currentTime = System.currentTimeMillis();
-
- synchronized (sessions)
- {
- Iterator it = sessions.keySet().iterator();
- while (it.hasNext())
- {
- VmPipeSessionImpl session = (VmPipeSessionImpl) it.next();
- if (!session.isConnected())
- {
- it.remove();
- }
- else
- {
- notifyIdleSession(session, currentTime);
- }
- }
- }
- }
- }
- }
-
- private void notifyIdleSession(VmPipeSessionImpl session, long currentTime)
- {
- notifyIdleSession0(session, currentTime, session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), IdleStatus.BOTH_IDLE,
- Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
- notifyIdleSession0(session, currentTime, session.getIdleTimeInMillis(IdleStatus.READER_IDLE), IdleStatus.READER_IDLE,
- Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
- notifyIdleSession0(session, currentTime, session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), IdleStatus.WRITER_IDLE,
- Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
- }
-
- private void notifyIdleSession0(VmPipeSessionImpl session, long currentTime, long idleTime, IdleStatus status,
- long lastIoTime)
- {
- if ((idleTime > 0) && (lastIoTime != 0) && ((currentTime - lastIoTime) >= idleTime))
- {
- session.increaseIdleCount(status);
- session.getFilterChain().fireSessionIdle(session, status);
- }
- }
-
-}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
index dae81a875d..bbd6deffd3 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
@@ -42,6 +42,7 @@ import org.apache.qpid.server.util.NullApplicationRegistry;
import java.util.LinkedList;
import java.util.Set;
+import java.util.Collections;
/**
* Tests that acknowledgements are handled correctly.
@@ -145,7 +146,7 @@ public class AckTest extends TestCase
// we increment the reference here since we are not delivering the messaging to any queues, which is where
// the reference is normally incremented. The test is easier to construct if we have direct access to the
// subscription
- msg.enqueue(_queue);
+ msg.enqueue(Collections.singleton(_queue));
msg.routingComplete(_messageStore, factory);
if(msg.allContentReceived())
{
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 7fd46474ab..4dbb550c7c 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -91,6 +91,11 @@ public class SubscriptionTestHelper implements Subscription
return new Object();
}
+ public void releaseSendLock()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void resend(final QueueEntry entry)
{
//To change body of implemented methods use File | Settings | File Templates.