summaryrefslogtreecommitdiff
path: root/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java
diff options
context:
space:
mode:
Diffstat (limited to 'broker/src/org/apache/qpid/server/queue/SubscriptionSet.java')
-rw-r--r--broker/src/org/apache/qpid/server/queue/SubscriptionSet.java183
1 files changed, 0 insertions, 183 deletions
diff --git a/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java b/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java
deleted file mode 100644
index 7cc3f5f719..0000000000
--- a/broker/src/org/apache/qpid/server/queue/SubscriptionSet.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-/**
- * Holds a set of subscriptions for a queue and manages the round
- * robin-ing of deliver etc.
- */
-class SubscriptionSet implements WeightedSubscriptionManager
-{
- private static final Logger _log = Logger.getLogger(SubscriptionSet.class);
-
- /**
- * List of registered subscribers
- */
- private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>();
-
- /**
- * Used to control the round robin delivery of content
- */
- private int _currentSubscriber;
-
- /**
- * Accessor for unit tests.
- */
- int getCurrentSubscriber()
- {
- return _currentSubscriber;
- }
-
- public void addSubscriber(Subscription subscription)
- {
- _subscriptions.add(subscription);
- }
-
- /**
- * Remove the subscription, returning it if it was found
- * @param subscription
- * @return null if no match was found
- */
- public Subscription removeSubscriber(Subscription subscription)
- {
- boolean isRemoved = _subscriptions.remove(subscription); // TODO: possibly need O(1) operation here.
- if (isRemoved)
- {
- return subscription;
- }
- else
- {
- debugDumpSubscription(subscription);
- return null;
- }
- }
-
- private void debugDumpSubscription(Subscription subscription)
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Subscription " + subscription + " not found. Dumping subscriptions:");
- for (Subscription s : _subscriptions)
- {
- _log.debug("Subscription: " + s);
- }
- _log.debug("Subscription dump complete");
- }
- }
-
- /**
- * Return the next unsuspended subscription or null if not found.
- *
- * Performance note:
- * This method can scan all items twice when looking for a subscription that is not
- * suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
- * without synchronisation and subscriptions may be added and removed concurrently. Also note that because of
- * race conditions and when subscriptions are removed between calls to nextSubscriber, the
- * IndexOutOfBoundsException also causes the scan to start at the beginning.
- */
- public Subscription nextSubscriber(AMQMessage msg)
- {
- if (_subscriptions.isEmpty())
- {
- return null;
- }
-
- try {
- final Subscription result = nextSubscriber();
- if (result == null) {
- _currentSubscriber = 0;
- return nextSubscriber();
- } else {
- return result;
- }
- } catch (IndexOutOfBoundsException e) {
- _currentSubscriber = 0;
- return nextSubscriber();
- }
- }
-
- private Subscription nextSubscriber()
- {
- final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
- while (iterator.hasNext()) {
- Subscription subscription = iterator.next();
- ++_currentSubscriber;
- subscriberScanned();
- if (!subscription.isSuspended()) {
- return subscription;
- }
- }
- return null;
- }
-
- /**
- * Overridden in test classes.
- */
- protected void subscriberScanned()
- {
- }
-
- public boolean isEmpty()
- {
- return _subscriptions.isEmpty();
- }
-
- public boolean hasActiveSubscribers()
- {
- for (Subscription s : _subscriptions)
- {
- if (!s.isSuspended()) return true;
- }
- return false;
- }
-
- public int getWeight()
- {
- int count = 0;
- for (Subscription s : _subscriptions)
- {
- if (!s.isSuspended()) count++;
- }
- return count;
- }
-
- /**
- * Notification that a queue has been deleted. This is called so that the subscription can inform the
- * channel, which in turn can update its list of unacknowledged messages.
- * @param queue
- */
- public void queueDeleted(AMQQueue queue)
- {
- for (Subscription s : _subscriptions)
- {
- s.queueDeleted(queue);
- }
- }
-
- int size() {
- return _subscriptions.size();
- }
-}