diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-04 20:30:32 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-04 20:30:32 +0000 |
commit | f9c2d2e6cd6ea383c2c1c5d7c9bc96f929396e51 (patch) | |
tree | 045b413e34b91d603442a92e29a5a80f36e395a7 | |
parent | 6d07d75135c13d5a1c77d9ca108eb0d245e7ad6e (diff) | |
download | qpid-python-f9c2d2e6cd6ea383c2c1c5d7c9bc96f929396e51.tar.gz |
flattened AbstractSubscription down into QueueSubscription, removed the setting of context from the subscription public api
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1564470 13f79535-47bb-0310-9956-ffa450edef68
13 files changed, 227 insertions, 398 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index ff27c8159e..20bde5f613 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -727,4 +727,6 @@ public abstract class AbstractExchange implements Exchange } + + } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index ea106b0c2c..8cd8a3edee 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -70,11 +70,6 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa long getTotalEnqueueCount(); - public interface Context - { - QueueEntry getLastSeenEntry(); - } - void setNoLocal(boolean b); boolean isAutoDelete(); @@ -92,8 +87,6 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa final Class<? extends ServerMessage> messageClass, final String consumerName, EnumSet<Subscription.Option> options) throws AMQException; - void unregisterSubscription(final Subscription subscription) throws AMQException; - Collection<Subscription> getConsumers(); interface SubscriptionRegistrationListener diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java index ff7c8ae95e..4f87e18e63 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java @@ -44,10 +44,10 @@ public abstract class OutOfOrderQueue extends SimpleAMQQueue SubscriptionList.SubscriptionNodeIterator subIter = getSubscriptionList().iterator(); while(subIter.advance() && !entry.isAcquired()) { - final Subscription subscription = subIter.getNode().getSubscription(); + final QueueSubscription subscription = subIter.getNode().getSubscription(); if(!subscription.isClosed()) { - QueueContext context = (QueueContext) subscription.getQueueContext(); + QueueContext context = subscription.getQueueContext(); if(context != null) { QueueEntry released = context.getReleasedEntry(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java index 79279b44c7..861bd3dea1 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java @@ -23,7 +23,7 @@ package org.apache.qpid.server.queue; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -final class QueueContext implements AMQQueue.Context +final class QueueContext { private volatile QueueEntry _lastSeenEntry; private volatile QueueEntry _releasedEntry; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java index 6a10549b16..8bab8e7b4b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSubscription.java @@ -30,32 +30,43 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.subscription.AbstractSubscription; +import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionTarget; import org.apache.qpid.server.util.StateChangeListener; import java.text.MessageFormat; import java.util.EnumMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; -class QueueSubscription<T extends SubscriptionTarget> extends AbstractSubscription +class QueueSubscription<T extends SubscriptionTarget> implements Subscription { private static final Logger _logger = Logger.getLogger(QueueSubscription.class); private final AtomicBoolean _targetClosed = new AtomicBoolean(false); private final AtomicBoolean _closed = new AtomicBoolean(false); + private final long _subscriptionID; + private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); + private final Lock _stateChangeLock = new ReentrantLock(); + private final long _createTime = System.currentTimeMillis(); + private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); + private final boolean _acquires; + private final boolean _seesRequeues; + private final String _consumerName; + private final boolean _isTransient; + private final AtomicLong _deliveredCount = new AtomicLong(0); + private final AtomicLong _deliveredBytes = new AtomicLong(0); + private final FilterManager _filters; + private final Class<? extends ServerMessage> _messageClass; + private final Object _sessionReference; private SimpleAMQQueue _queue; - private String _traceExclude; - private String _trace; private GenericActor _logActor; - private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); - static final EnumMap<SubscriptionTarget.State, State> STATE_MAP = new EnumMap<SubscriptionTarget.State, State>(SubscriptionTarget.State.class); @@ -69,6 +80,15 @@ class QueueSubscription<T extends SubscriptionTarget> extends AbstractSubscripti private final T _target; private final SubFlushRunner _runner = new SubFlushRunner(this); + private volatile QueueContext _queueContext; + private StateChangeListener<? extends Subscription, State> _stateListener = new StateChangeListener<Subscription, State>() + { + public void stateChanged(Subscription sub, State oldState, State newState) + { + CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString())); + } + }; + private boolean _noLocal; QueueSubscription(final FilterManager filters, final Class<? extends ServerMessage> messageClass, @@ -78,8 +98,14 @@ class QueueSubscription<T extends SubscriptionTarget> extends AbstractSubscripti final boolean isTransient, T target) { - super(filters, messageClass, target.getSessionModel().getConnectionReference(), - acquires, seesRequeues, consumerName, isTransient); + _messageClass = messageClass; + _sessionReference = target.getSessionModel().getConnectionReference(); + _subscriptionID = SUB_ID_GENERATOR.getAndIncrement(); + _filters = filters; + _acquires = acquires; + _seesRequeues = seesRequeues; + _consumerName = consumerName; + _isTransient = isTransient; _target = target; _target.setStateListener( new StateChangeListener<SubscriptionTarget, SubscriptionTarget.State>() @@ -187,12 +213,6 @@ class QueueSubscription<T extends SubscriptionTarget> extends AbstractSubscripti } @Override - protected void doSend(final QueueEntry entry, final boolean batch) throws AMQException - { - _target.send(entry, batch); - } - - @Override public void flushBatched() { _target.flushBatched(); @@ -241,9 +261,6 @@ class QueueSubscription<T extends SubscriptionTarget> extends AbstractSubscripti } _queue = queue; - _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES); - _trace = (String) queue.getAttribute(Queue.FEDERATION_ID); - String queueString = new QueueLogSubject(_queue).toLogString(); _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) @@ -263,18 +280,6 @@ class QueueSubscription<T extends SubscriptionTarget> extends AbstractSubscripti } } - - protected final String getTraceExclude() - { - return _traceExclude; - } - - protected final String getTrace() - { - return _trace; - } - - protected final LogSubject getLogSubject() { return _logActor.getLogSubject(); @@ -303,4 +308,166 @@ class QueueSubscription<T extends SubscriptionTarget> extends AbstractSubscripti return _runner; } + public final long getSubscriptionID() + { + return _subscriptionID; + } + + public final StateChangeListener<? extends Subscription, State> getStateListener() + { + return _stateListener; + } + + public final void setStateListener(StateChangeListener<? extends Subscription, State> listener) + { + _stateListener = listener; + } + + final QueueContext getQueueContext() + { + return _queueContext; + } + + final void setQueueContext(QueueContext queueContext) + { + _queueContext = queueContext; + } + + protected boolean updateState(State from, State to) + { + return _state.compareAndSet(from, to); + } + + public final boolean isActive() + { + return getState() == State.ACTIVE; + } + + public final boolean isClosed() + { + return getState() == State.CLOSED; + } + + public final void setNoLocal(boolean noLocal) + { + _noLocal = noLocal; + } + + public final boolean hasInterest(QueueEntry entry) + { + //check that the message hasn't been rejected + if (entry.isRejectedBy(getSubscriptionID())) + { + + return false; + } + + if (entry.getMessage().getClass() == _messageClass) + { + if(_noLocal) + { + Object connectionRef = entry.getMessage().getConnectionReference(); + if (connectionRef != null && connectionRef == _sessionReference) + { + return false; + } + } + } + else + { + // no interest in messages we can't convert + if(_messageClass != null && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), + _messageClass)==null) + { + return false; + } + } + return (_filters == null) || _filters.allAllow(entry.asFilterable()); + } + + protected String getFilterLogString() + { + StringBuilder filterLogString = new StringBuilder(); + String delimiter = ", "; + boolean hasEntries = false; + if (_filters != null && _filters.hasFilters()) + { + filterLogString.append(_filters.toString()); + hasEntries = true; + } + + if (!acquires()) + { + if (hasEntries) + { + filterLogString.append(delimiter); + } + filterLogString.append("Browser"); + hasEntries = true; + } + + return filterLogString.toString(); + } + + public final boolean trySendLock() + { + return _stateChangeLock.tryLock(); + } + + public final void getSendLock() + { + _stateChangeLock.lock(); + } + + public final void releaseSendLock() + { + _stateChangeLock.unlock(); + } + + public final long getCreateTime() + { + return _createTime; + } + + public final QueueEntry.SubscriptionAcquiredState getOwningState() + { + return _owningState; + } + + public final boolean acquires() + { + return _acquires; + } + + public final boolean seesRequeues() + { + return _seesRequeues; + } + + public final String getName() + { + return _consumerName; + } + + public final boolean isTransient() + { + return _isTransient; + } + + public final long getBytesOut() + { + return _deliveredBytes.longValue(); + } + + public final long getMessagesOut() + { + return _deliveredCount.longValue(); + } + + public final void send(final QueueEntry entry, final boolean batch) throws AMQException + { + _deliveredCount.incrementAndGet(); + _deliveredBytes.addAndGet(entry.getMessage().getSize()); + _target.send(entry, batch); + } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index fa0db61591..a1df2bca1b 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -382,7 +382,7 @@ public class SimpleAMQQueue implements AMQQueue, @Override - public Subscription registerSubscription(final SubscriptionTarget target, + public QueueSubscription registerSubscription(final SubscriptionTarget target, final FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, @@ -471,7 +471,7 @@ public class SimpleAMQQueue implements AMQQueue, } - public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException + synchronized void unregisterSubscription(final QueueSubscription subscription) throws AMQException { if (subscription == null) { @@ -828,7 +828,7 @@ public class SimpleAMQQueue implements AMQQueue, private void setLastSeenEntry(final QueueSubscription sub, final QueueEntry entry) { - QueueContext subContext = (QueueContext) sub.getQueueContext(); + QueueContext subContext = sub.getQueueContext(); if (subContext != null) { QueueEntry releasedEntry = subContext.getReleasedEntry(); @@ -844,7 +844,7 @@ public class SimpleAMQQueue implements AMQQueue, private void updateSubRequeueEntry(final QueueSubscription sub, final QueueEntry entry) { - QueueContext subContext = (QueueContext) sub.getQueueContext(); + QueueContext subContext = sub.getQueueContext(); if(subContext != null) { QueueEntry oldEntry; @@ -1611,7 +1611,7 @@ public class SimpleAMQQueue implements AMQQueue, private QueueEntry getNextAvailableEntry(final QueueSubscription sub) throws AMQException { - QueueContext context = (QueueContext) sub.getQueueContext(); + QueueContext context = sub.getQueueContext(); if(context != null) { QueueEntry lastSeen = context.getLastSeenEntry(); @@ -1651,7 +1651,7 @@ public class SimpleAMQQueue implements AMQQueue, public boolean isEntryAheadOfSubscription(QueueEntry entry, QueueSubscription sub) { - QueueContext context = (QueueContext) sub.getQueueContext(); + QueueContext context = sub.getQueueContext(); if(context != null) { QueueEntry releasedNode = context.getReleasedEntry(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java deleted file mode 100644 index f3c8fe4843..0000000000 --- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.subscription; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.SubscriptionMessages; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.util.StateChangeListener; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -public abstract class AbstractSubscription implements Subscription -{ - private final long _subscriptionID; - private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); - private final Lock _stateChangeLock = new ReentrantLock(); - private final long _createTime = System.currentTimeMillis(); - private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); - private final boolean _acquires; - private final boolean _seesRequeues; - private final String _consumerName; - private final boolean _isTransient; - - - private final AtomicLong _deliveredCount = new AtomicLong(0); - private final AtomicLong _deliveredBytes = new AtomicLong(0); - - private final FilterManager _filters; - - private volatile AMQQueue.Context _queueContext; - - - private StateChangeListener<? extends Subscription, State> _stateListener = new StateChangeListener<Subscription, State>() - { - public void stateChanged(Subscription sub, State oldState, State newState) - { - CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString())); - } - }; - - private final Class<? extends ServerMessage> _messageClass; - private final Object _sessionReference; - private boolean _noLocal; - - protected AbstractSubscription(FilterManager filters, - final Class<? extends ServerMessage> messageClass, - final Object sessionReference, - final boolean acquires, - final boolean seesRequeues, - final String consumerName, final boolean isTransient) - { - _messageClass = messageClass; - _sessionReference = sessionReference; - _subscriptionID = SUB_ID_GENERATOR.getAndIncrement(); - _filters = filters; - _acquires = acquires; - _seesRequeues = seesRequeues; - _consumerName = consumerName; - _isTransient = isTransient; - } - - public final long getSubscriptionID() - { - return _subscriptionID; - } - - - public final StateChangeListener<? extends Subscription, State> getStateListener() - { - return _stateListener; - } - - public final void setStateListener(StateChangeListener<? extends Subscription, State> listener) - { - _stateListener = listener; - } - - - public final AMQQueue.Context getQueueContext() - { - return _queueContext; - } - - public final void setQueueContext(AMQQueue.Context queueContext) - { - _queueContext = queueContext; - } - - - public State getState() - { - return _state.get(); - } - - protected boolean updateState(State from, State to) - { - return _state.compareAndSet(from, to); - } - - public final boolean isActive() - { - return getState() == State.ACTIVE; - } - - public final boolean isClosed() - { - return getState() == State.CLOSED; - } - - - public final void setNoLocal(boolean noLocal) - { - _noLocal = noLocal; - } - - - public final boolean hasInterest(QueueEntry entry) - { - //check that the message hasn't been rejected - if (entry.isRejectedBy(getSubscriptionID())) - { - - return false; - } - - if (entry.getMessage().getClass() == _messageClass) - { - if(_noLocal) - { - Object connectionRef = entry.getMessage().getConnectionReference(); - if (connectionRef != null && connectionRef == _sessionReference) - { - return false; - } - } - } - else - { - // no interest in messages we can't convert - if(_messageClass != null && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), _messageClass)==null) - { - return false; - } - } - return (_filters == null) || _filters.allAllow(entry.asFilterable()); - } - - - protected String getFilterLogString() - { - StringBuilder filterLogString = new StringBuilder(); - String delimiter = ", "; - boolean hasEntries = false; - if (_filters != null && _filters.hasFilters()) - { - filterLogString.append(_filters.toString()); - hasEntries = true; - } - - if (!acquires()) - { - if (hasEntries) - { - filterLogString.append(delimiter); - } - filterLogString.append("Browser"); - hasEntries = true; - } - - return filterLogString.toString(); - } - - - public final boolean trySendLock() - { - return _stateChangeLock.tryLock(); - } - - public final void getSendLock() - { - _stateChangeLock.lock(); - } - - public final void releaseSendLock() - { - _stateChangeLock.unlock(); - } - - - public final long getCreateTime() - { - return _createTime; - } - - - public final QueueEntry.SubscriptionAcquiredState getOwningState() - { - return _owningState; - } - - - public final boolean acquires() - { - return _acquires; - } - - public final boolean seesRequeues() - { - return _seesRequeues; - } - - public final String getName() - { - return _consumerName; - } - - public final boolean isTransient() - { - return _isTransient; - } - - - public final long getBytesOut() - { - return _deliveredBytes.longValue(); - } - - public final long getMessagesOut() - { - return _deliveredCount.longValue(); - } - - public final void send(final QueueEntry entry, final boolean batch) throws AMQException - { - _deliveredCount.incrementAndGet(); - _deliveredBytes.addAndGet(entry.getMessage().getSize()); - doSend(entry, batch); - } - - protected abstract void doSend(final QueueEntry entry, final boolean batch) throws AMQException; - -} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 2a1b767815..e90502f023 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -106,11 +106,6 @@ public interface Subscription public State getState(); - AMQQueue.Context getQueueContext(); - - void setQueueContext(AMQQueue.Context queueContext); - - boolean isActive(); void queueEmpty() throws AMQException; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java index 35f2887562..d48b09d912 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java @@ -109,7 +109,6 @@ public class DistributedTransaction implements ServerTransaction { _branch.enqueue(queue, message); _branch.addPostTransactionAction(postTransactionAction); - enqueue(Collections.singletonList(queue), message, postTransactionAction); } else { diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index bd80daec61..db39eb80a9 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -69,7 +69,7 @@ public class SimpleAMQQueueTest extends QpidTestCase private String _routingKey = "routing key"; private DirectExchange _exchange; private MockSubscription _subscriptionTarget = new MockSubscription(); - private Subscription _subscription; + private QueueSubscription _subscription; private Map<String,Object> _arguments = null; @Override @@ -183,7 +183,7 @@ public class SimpleAMQQueueTest extends QpidTestCase { } assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - assertNull(((QueueContext) _subscription.getQueueContext()).getReleasedEntry()); + assertNull(_subscription.getQueueContext().getReleasedEntry()); // Check removing the subscription removes it's information from the queue _subscription.close(); @@ -206,7 +206,7 @@ public class SimpleAMQQueueTest extends QpidTestCase EnumSet.noneOf(Subscription.Option.class)); Thread.sleep(150); assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - assertNull("There should be no releasedEntry after an enqueue", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry()); + assertNull("There should be no releasedEntry after an enqueue", _subscription.getQueueContext().getReleasedEntry()); } /** @@ -222,7 +222,7 @@ public class SimpleAMQQueueTest extends QpidTestCase EnumSet.noneOf(Subscription.Option.class)); Thread.sleep(150); assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage()); - assertNull("There should be no releasedEntry after enqueues", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry()); + assertNull("There should be no releasedEntry after enqueues", _subscription.getQueueContext().getReleasedEntry()); } /** @@ -273,7 +273,7 @@ public class SimpleAMQQueueTest extends QpidTestCase assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", _subscription.getQueueContext().getReleasedEntry()); } /** @@ -321,7 +321,7 @@ public class SimpleAMQQueueTest extends QpidTestCase assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired()); assertEquals("Total number of messages sent should not have changed", 1, _subscriptionTarget.getMessages().size()); assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", _subscription.getQueueContext().getReleasedEntry()); } @@ -375,7 +375,7 @@ public class SimpleAMQQueueTest extends QpidTestCase assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", _subscription.getQueueContext().getReleasedEntry()); } @@ -392,11 +392,11 @@ public class SimpleAMQQueueTest extends QpidTestCase MockSubscription target2 = new MockSubscription(); - Subscription subscription1 = _queue.registerSubscription(target1, null, messageA.getClass(), "test", + QueueSubscription subscription1 = _queue.registerSubscription(target1, null, messageA.getClass(), "test", EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES)); - Subscription subscription2 = _queue.registerSubscription(target2, null, messageA.getClass(), "test", + QueueSubscription subscription2 = _queue.registerSubscription(target2, null, messageA.getClass(), "test", EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES)); @@ -430,8 +430,8 @@ public class SimpleAMQQueueTest extends QpidTestCase assertEquals("Unexpected total number of messages sent to both subscriptions after release", 3, target1.getMessages().size() + target2.getMessages().size()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext()).getReleasedEntry()); - assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext()).getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", subscription1.getQueueContext().getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", subscription2.getQueueContext().getReleasedEntry()); } public void testExclusiveConsumer() throws AMQException diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 9fae6d6c09..3baa2f95f0 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -1216,68 +1216,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F return getId().compareTo(o.getId()); } - private class MessageDeliveryAction implements ServerTransaction.Action - { - private final MessageReference<AMQMessage> _reference; - private List<? extends BaseQueue> _destinationQueues; - - public MessageDeliveryAction(AMQMessage currentMessage, - List<? extends BaseQueue> destinationQueues) - { - _reference = currentMessage.newReference(); - _destinationQueues = destinationQueues; - } - - public void postCommit() - { - try - { - AMQMessage message = _reference.getMessage(); - final boolean immediate = message.isImmediate(); - - for(int i = 0; i < _destinationQueues.size(); i++) - { - BaseQueue queue = _destinationQueues.get(i); - - Action<QueueEntry> action; - - if(immediate) - { - action = new ImmediateAction(); - } - else - { - action = null; - } - - queue.enqueue(message, action); - if(queue instanceof AMQQueue) - { - ((AMQQueue)queue).checkCapacity(AMQChannel.this); - } - - } - - message.getStoredMessage().flushToStore(); - _reference.release(); - } - catch (AMQException e) - { - // TODO - throw new RuntimeException(e); - } - } - - public void onRollback() - { - // Maybe keep track of entries that were created and then delete them here in case of failure - // to in memory enqueue - _reference.release(); - } - - - } private class ImmediateAction implements Action<QueueEntry> { diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java index 55c8407ea5..b4f59315df 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java @@ -38,13 +38,11 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; -import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.subscription.AbstractSubscription; import org.apache.qpid.server.subscription.AbstractSubscriptionTarget; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.ServerTransaction; diff --git a/java/ivy.nexus.xml b/java/ivy.nexus.xml index e301bcb0cf..55b2d2d729 100644 --- a/java/ivy.nexus.xml +++ b/java/ivy.nexus.xml @@ -39,6 +39,12 @@ <artifact name="qpid-broker" type="jar.asc" ext="jar.asc"/> <artifact name="qpid-broker" type="source" ext="jar" e:classifier="sources"/> <artifact name="qpid-broker" type="source.asc" ext="jar.asc" e:classifier="sources"/> + <artifact name="qpid-broker-core" type="pom" ext="pom"/> + <artifact name="qpid-broker-core" type="pom.asc" ext="pom.asc"/> + <artifact name="qpid-broker-core" type="jar" ext="jar"/> + <artifact name="qpid-broker-core" type="jar.asc" ext="jar.asc"/> + <artifact name="qpid-broker-core" type="source" ext="jar" e:classifier="sources"/> + <artifact name="qpid-broker-core" type="source.asc" ext="jar.asc" e:classifier="sources"/> <artifact name="qpid-broker-plugins-access-control" type="pom" ext="pom"/> <artifact name="qpid-broker-plugins-access-control" type="pom.asc" ext="pom.asc"/> <artifact name="qpid-broker-plugins-access-control" type="jar" ext="jar"/> |