summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-07 16:57:49 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-07 16:57:49 +0000
commit65b1a1ddfe95b9e273d4cdaf23067a0aaff9b1d1 (patch)
tree427d5ce851b9336fea70eb8fc6f87135ad239065 /qpid/java/broker-plugins
parent3ab4f9bdc9bbc8375534f45022a02257eb6e030d (diff)
downloadqpid-python-65b1a1ddfe95b9e273d4cdaf23067a0aaff9b1d1.tar.gz
QPID-5504 : Refactoring to allow for nodes other than queues to be subscribed from, and nodes other than exchanges to be sent to (merged from separate branch)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1565726 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java580
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java31
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java26
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java12
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java95
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java104
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java944
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java397
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java7
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java1
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java32
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java552
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java76
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java15
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java29
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java68
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java109
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java858
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java13
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java37
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java14
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java37
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java3
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java69
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java9
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java94
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java171
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java30
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java7
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java96
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java25
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (renamed from qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java)288
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java3
-rwxr-xr-xqpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java4
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java59
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java106
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java14
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java114
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java25
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java7
47 files changed, 2109 insertions, 3070 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
new file mode 100644
index 0000000000..6ad9de22cb
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -0,0 +1,580 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.*;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
+{
+
+ private static final Option[] BATCHED = new Option[] { Option.BATCH };
+
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+ private final String _name;
+
+
+ private FlowCreditManager_0_10 _creditManager;
+
+ private final MessageAcceptMode _acceptMode;
+ private final MessageAcquireMode _acquireMode;
+ private MessageFlowMode _flowMode;
+ private final ServerSession _session;
+ private final AtomicBoolean _stopped = new AtomicBoolean(true);
+
+ private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+ private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
+
+ private final Map<String, Object> _arguments;
+ private int _deferredMessageCredit;
+ private long _deferredSizeCredit;
+ private Consumer _consumer;
+
+
+ public ConsumerTarget_0_10(ServerSession session,
+ String name,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ MessageFlowMode flowMode,
+ FlowCreditManager_0_10 creditManager,
+ Map<String, Object> arguments)
+ {
+ super(State.SUSPENDED);
+ _session = session;
+ _postIdSettingAction = new AddMessageDispositionListenerAction(session);
+ _acceptMode = acceptMode;
+ _acquireMode = acquireMode;
+ _creditManager = creditManager;
+ _flowMode = flowMode;
+ _creditManager.addStateListener(this);
+ _arguments = arguments == null ? Collections.<String, Object> emptyMap() :
+ Collections.<String, Object> unmodifiableMap(arguments);
+ _name = name;
+ }
+
+ public Consumer getConsumer()
+ {
+ return _consumer;
+ }
+
+ public boolean isSuspended()
+ {
+ return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
+ }
+
+ public boolean close()
+ {
+ boolean closed = false;
+ State state = getState();
+
+ getConsumer().getSendLock();
+ try
+ {
+ while(!closed && state != State.CLOSED)
+ {
+ closed = updateState(state, State.CLOSED);
+ if(!closed)
+ {
+ state = getState();
+ }
+ }
+ _creditManager.removeListener(this);
+ }
+ finally
+ {
+ getConsumer().releaseSendLock();
+ }
+
+ return closed;
+
+ }
+
+ public void creditStateChanged(boolean hasCredit)
+ {
+
+ if(hasCredit)
+ {
+ if(!updateState(State.SUSPENDED, State.ACTIVE))
+ {
+ // this is a hack to get round the issue of increasing bytes credit
+ getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE);
+ }
+ }
+ else
+ {
+ updateState(State.ACTIVE, State.SUSPENDED);
+ }
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+
+ public static class AddMessageDispositionListenerAction implements Runnable
+ {
+ private MessageTransfer _xfr;
+ private ServerSession.MessageDispositionChangeListener _action;
+ private ServerSession _session;
+
+ public AddMessageDispositionListenerAction(ServerSession session)
+ {
+ _session = session;
+ }
+
+ public void setXfr(MessageTransfer xfr)
+ {
+ _xfr = xfr;
+ }
+
+ public void setAction(ServerSession.MessageDispositionChangeListener action)
+ {
+ _action = action;
+ }
+
+ public void run()
+ {
+ if(_action != null)
+ {
+ _session.onMessageDispositionChange(_xfr, _action);
+ }
+ }
+ }
+
+ private final AddMessageDispositionListenerAction _postIdSettingAction;
+
+ public void send(final MessageInstance entry, boolean batch) throws AMQException
+ {
+ ServerMessage serverMsg = entry.getMessage();
+
+
+ MessageTransfer xfr;
+
+ DeliveryProperties deliveryProps;
+ MessageProperties messageProps = null;
+
+ MessageTransferMessage msg;
+
+ if(serverMsg instanceof MessageTransferMessage)
+ {
+
+ msg = (MessageTransferMessage) serverMsg;
+
+ }
+ else
+ {
+ MessageConverter converter =
+ MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
+
+
+ msg = (MessageTransferMessage) converter.convert(serverMsg, _session.getVirtualHost());
+ }
+ DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
+ messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
+
+ deliveryProps = new DeliveryProperties();
+ if(origDeliveryProps != null)
+ {
+ if(origDeliveryProps.hasDeliveryMode())
+ {
+ deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
+ }
+ if(origDeliveryProps.hasExchange())
+ {
+ deliveryProps.setExchange(origDeliveryProps.getExchange());
+ }
+ if(origDeliveryProps.hasExpiration())
+ {
+ deliveryProps.setExpiration(origDeliveryProps.getExpiration());
+ }
+ if(origDeliveryProps.hasPriority())
+ {
+ deliveryProps.setPriority(origDeliveryProps.getPriority());
+ }
+ if(origDeliveryProps.hasRoutingKey())
+ {
+ deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
+ }
+ if(origDeliveryProps.hasTimestamp())
+ {
+ deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
+ }
+ if(origDeliveryProps.hasTtl())
+ {
+ deliveryProps.setTtl(origDeliveryProps.getTtl());
+ }
+
+
+ }
+
+ deliveryProps.setRedelivered(entry.isRedelivered());
+
+ Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
+
+
+ xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
+ : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody());
+
+ if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
+ {
+ xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
+ }
+ else if(_flowMode == MessageFlowMode.WINDOW)
+ {
+ xfr.setCompletionListener(new Method.CompletionListener()
+ {
+ public void onComplete(Method method)
+ {
+ deferredAddCredit(1, entry.getMessage().getSize());
+ }
+ });
+ }
+
+
+ _postIdSettingAction.setXfr(xfr);
+ if(_acceptMode == MessageAcceptMode.EXPLICIT)
+ {
+ _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this));
+ }
+ else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED)
+ {
+ _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this));
+ }
+ else
+ {
+ _postIdSettingAction.setAction(null);
+ }
+
+
+ _session.sendMessage(xfr, _postIdSettingAction);
+ entry.incrementDeliveryCount();
+ if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+ {
+ forceDequeue(entry, false);
+ }
+ else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+ {
+ recordUnacknowledged(entry);
+ }
+
+ }
+
+ void recordUnacknowledged(MessageInstance entry)
+ {
+ _unacknowledgedCount.incrementAndGet();
+ _unacknowledgedBytes.addAndGet(entry.getMessage().getSize());
+ }
+
+ private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
+ {
+ _deferredMessageCredit += deferredMessageCredit;
+ _deferredSizeCredit += deferredSizeCredit;
+
+ }
+
+ public void flushCreditState(boolean strict)
+ {
+ if(strict || !isSuspended() || _deferredMessageCredit >= 200
+ || !(_creditManager instanceof WindowCreditManager)
+ || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
+ {
+ _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
+ _deferredMessageCredit = 0;
+ _deferredSizeCredit = 0l;
+ }
+ }
+
+ private void forceDequeue(final MessageInstance entry, final boolean restoreCredit)
+ {
+ AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(_session.getVirtualHost().getMessageStore());
+ dequeueTxn.dequeue(entry.getOwningResource(), entry.getMessage(),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ if (restoreCredit)
+ {
+ restoreCredit(entry.getMessage());
+ }
+ entry.delete();
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ }
+
+ void reject(final MessageInstance entry)
+ {
+ entry.setRedelivered();
+ entry.routeToAlternate(null, null);
+ if(entry.isAcquiredBy(getConsumer()))
+ {
+ entry.delete();
+ }
+ }
+
+ void release(final MessageInstance entry, final boolean setRedelivered)
+ {
+ if (setRedelivered)
+ {
+ entry.setRedelivered();
+ }
+
+ if (getSessionModel().isClosing() || !setRedelivered)
+ {
+ entry.decrementDeliveryCount();
+ }
+
+ if (isMaxDeliveryLimitReached(entry))
+ {
+ sendToDLQOrDiscard(entry);
+ }
+ else
+ {
+ entry.release();
+ }
+ }
+
+ protected void sendToDLQOrDiscard(MessageInstance entry)
+ {
+ final LogActor logActor = CurrentActor.get();
+ final ServerMessage msg = entry.getMessage();
+
+ int requeues = entry.routeToAlternate(new Action<MessageInstance>()
+ {
+ @Override
+ public void performAction(final MessageInstance requeueEntry)
+ {
+ logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ requeueEntry.getOwningResource().getName()));
+ }
+ }, null);
+
+ if (requeues == 0)
+ {
+ TransactionLogResource owningResource = entry.getOwningResource();
+ if(owningResource instanceof AMQQueue)
+ {
+ final AMQQueue queue = (AMQQueue)owningResource;
+ final Exchange alternateExchange = queue.getAlternateExchange();
+
+ if(alternateExchange != null)
+ {
+ logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
+ alternateExchange.getName()));
+ }
+ else
+ {
+ logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
+ queue.getName(),
+ msg.getRoutingKey()));
+ }
+ }
+ }
+ }
+
+ private boolean isMaxDeliveryLimitReached(MessageInstance entry)
+ {
+ final int maxDeliveryLimit = entry.getMaximumDeliveryCount();
+ return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit);
+ }
+
+ public void queueDeleted()
+ {
+ _deleted.set(true);
+ }
+
+ public boolean allocateCredit(ServerMessage message)
+ {
+ return _creditManager.useCreditForMessage(message.getSize());
+ }
+
+ public void restoreCredit(ServerMessage message)
+ {
+ _creditManager.restoreCredit(1, message.getSize());
+ }
+
+ public FlowCreditManager_0_10 getCreditManager()
+ {
+ return _creditManager;
+ }
+
+
+ public void stop()
+ {
+ try
+ {
+ getConsumer().getSendLock();
+
+ updateState(State.ACTIVE, State.SUSPENDED);
+ _stopped.set(true);
+ FlowCreditManager_0_10 creditManager = getCreditManager();
+ creditManager.clearCredit();
+ }
+ finally
+ {
+ getConsumer().releaseSendLock();
+ }
+ }
+
+ public void addCredit(MessageCreditUnit unit, long value)
+ {
+ FlowCreditManager_0_10 creditManager = getCreditManager();
+
+ switch (unit)
+ {
+ case MESSAGE:
+
+ creditManager.addCredit(value, 0L);
+ break;
+ case BYTE:
+ creditManager.addCredit(0l, value);
+ break;
+ }
+
+ _stopped.set(false);
+
+ if(creditManager.hasCredit())
+ {
+ updateState(State.SUSPENDED, State.ACTIVE);
+ }
+
+ }
+
+ public void setFlowMode(MessageFlowMode flowMode)
+ {
+
+
+ _creditManager.removeListener(this);
+
+ switch(flowMode)
+ {
+ case CREDIT:
+ _creditManager = new CreditCreditManager(0l,0l);
+ break;
+ case WINDOW:
+ _creditManager = new WindowCreditManager(0l,0l);
+ break;
+ default:
+ throw new RuntimeException("Unknown message flow mode: " + flowMode);
+ }
+ _flowMode = flowMode;
+ updateState(State.ACTIVE, State.SUSPENDED);
+
+ _creditManager.addStateListener(this);
+
+ }
+
+ public boolean isStopped()
+ {
+ return _stopped.get();
+ }
+
+ public void acknowledge(MessageInstance entry)
+ {
+ // TODO Fix Store Context / cleanup
+ if(entry.isAcquiredBy(getConsumer()))
+ {
+ _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
+ _unacknowledgedCount.decrementAndGet();
+ entry.delete();
+ }
+ }
+
+ public void flush() throws AMQException
+ {
+ flushCreditState(true);
+ getConsumer().flush();
+ stop();
+ }
+
+ public ServerSession getSessionModel()
+ {
+ return _session;
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public Map<String, Object> getArguments()
+ {
+ return _arguments;
+ }
+
+ public void queueEmpty()
+ {
+ }
+
+ public void flushBatched()
+ {
+ _session.getConnection().flush();
+ }
+
+
+ @Override
+ public void consumerAdded(final Consumer sub)
+ {
+ _consumer = sub;
+ }
+
+ @Override
+ public void consumerRemoved(final Consumer sub)
+ {
+ }
+
+ public long getUnacknowledgedBytes()
+ {
+ return _unacknowledgedBytes.longValue();
+ }
+
+ public long getUnacknowledgedMessages()
+ {
+ return _unacknowledgedCount.longValue();
+ }
+}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
index 4b38b8a1a3..4420709a91 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageInstance;
class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
@@ -30,21 +30,20 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
private static final Logger _logger = Logger.getLogger(ExplicitAcceptDispositionChangeListener.class);
- private final QueueEntry _entry;
- private final Subscription_0_10 _sub;
+ private final MessageInstance _entry;
+ private final ConsumerTarget_0_10 _target;
- public ExplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+ public ExplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
{
_entry = entry;
- _sub = subscription_0_10;
+ _target = target;
}
public void onAccept()
{
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
{
- subscription.getSessionModel().acknowledge(subscription, _entry);
+ _target.getSessionModel().acknowledge(_target, _entry);
}
else
{
@@ -55,10 +54,9 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onRelease(boolean setRedelivered)
{
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
{
- subscription.release(_entry, setRedelivered);
+ _target.release(_entry, setRedelivered);
}
else
{
@@ -68,10 +66,9 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onReject()
{
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
{
- subscription.reject(_entry);
+ _target.reject(_entry);
}
else
{
@@ -82,12 +79,8 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public boolean acquire()
{
- return _entry.acquire(getSubscription());
+ return _entry.acquire(_target.getConsumer());
}
- private Subscription_0_10 getSubscription()
- {
- return _sub;
- }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
index ce0155b789..c459364dbb 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
@@ -22,20 +22,20 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageInstance;
class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
{
private static final Logger _logger = Logger.getLogger(ImplicitAcceptDispositionChangeListener.class);
- private final QueueEntry _entry;
- private Subscription_0_10 _sub;
+ private final MessageInstance _entry;
+ private ConsumerTarget_0_10 _target;
- public ImplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+ public ImplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
{
_entry = entry;
- _sub = subscription_0_10;
+ _target = target;
}
public void onAccept()
@@ -45,9 +45,9 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onRelease(boolean setRedelivered)
{
- if(_entry.isAcquiredBy(_sub))
+ if(_entry.isAcquiredBy(_target.getConsumer()))
{
- getSubscription().release(_entry, setRedelivered);
+ _target.release(_entry, setRedelivered);
}
else
{
@@ -57,9 +57,9 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onReject()
{
- if(_entry.isAcquiredBy(_sub))
+ if(_entry.isAcquiredBy(_target.getConsumer()))
{
- getSubscription().reject(_entry);
+ _target.reject(_entry);
}
else
{
@@ -70,19 +70,15 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public boolean acquire()
{
- boolean acquired = _entry.acquire(getSubscription());
+ boolean acquired = _entry.acquire(_target.getConsumer());
if(acquired)
{
- getSubscription().recordUnacknowledged(_entry);
+ _target.recordUnacknowledged(_entry);
}
return acquired;
}
- public Subscription_0_10 getSubscription()
- {
- return _sub;
- }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
index f5f2a8d43f..cd1146ac0b 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
@@ -21,17 +21,17 @@
package org.apache.qpid.server.protocol.v0_10;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.transport.Method;
public class MessageAcceptCompletionListener implements Method.CompletionListener
{
- private final Subscription_0_10 _sub;
- private final QueueEntry _entry;
+ private final ConsumerTarget_0_10 _sub;
+ private final MessageInstance _entry;
private final ServerSession _session;
private boolean _restoreCredit;
- public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit)
+ public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
{
super();
_sub = sub;
@@ -44,9 +44,9 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
{
if(_restoreCredit)
{
- _sub.restoreCredit(_entry);
+ _sub.restoreCredit(_entry.getMessage());
}
- if(_entry.isAcquiredBy(_sub))
+ if(_entry.isAcquiredBy(_sub.getConsumer()))
{
_session.acknowledge(_sub, _entry);
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index 2e74621814..687331e51d 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -141,7 +141,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
return buf;
}
- public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
+ public int writeToBuffer(ByteBuffer dest)
{
ByteBuffer buf = _encoded;
@@ -153,7 +153,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
buf = buf.duplicate();
- buf.position(offsetInMetaData);
+ buf.position(0);
if(dest.remaining() < buf.limit())
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index a15fea1200..c85a415ce5 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -282,8 +282,8 @@ public class ServerConnectionDelegate extends ServerDelegate
private void stopAllSubscriptions(Connection conn, SessionDetach dtc)
{
final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
- final Collection<Subscription_0_10> subs = ssn.getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subs)
+ final Collection<ConsumerTarget_0_10> subs = ssn.getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subs)
{
subscription_0_10.stop();
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 93d886687c..53022c333e 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -46,7 +46,7 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
-import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -55,15 +55,16 @@ import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.DistributedTransaction;
@@ -77,6 +78,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
import org.slf4j.Logger;
@@ -104,14 +106,7 @@ public class ServerSession extends Session
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
- private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction()
- {
- @Override
- public void onEnqueue(final QueueEntry entry)
- {
- entry.getQueue().checkCapacity(ServerSession.this);
- }
- };
+ private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction();
public static interface MessageDispositionChangeListener
{
@@ -126,12 +121,6 @@ public class ServerSession extends Session
}
- public static interface Task
- {
- public void doTask(ServerSession session);
- }
-
-
private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
@@ -142,9 +131,9 @@ public class ServerSession extends Session
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
+ private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
- private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>();
private final TransactionTimeoutHelper _transactionTimeoutHelper;
@@ -194,7 +183,7 @@ public class ServerSession extends Session
public int enqueue(final MessageTransferMessage message,
final InstanceProperties instanceProperties,
- final Exchange exchange)
+ final MessageDestination exchange)
{
if(_outstandingCredit.get() != UNLIMITED_CREDIT
&& _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
@@ -386,9 +375,9 @@ public class ServerSession extends Session
}
_messageDispositionListenerMap.clear();
- for (Task task : _taskList)
+ for (Action<ServerSession> task : _taskList)
{
- task.doTask(this);
+ task.performAction(this);
}
LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get();
@@ -405,9 +394,9 @@ public class ServerSession extends Session
// Broker shouldn't block awaiting close - thus do override this method to do nothing
}
- public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry)
+ public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry)
{
- _transaction.dequeue(entry.getQueue(), entry.getMessage(),
+ _transaction.dequeue(entry.getOwningResource(), entry.getMessage(),
new ServerTransaction.Action()
{
@@ -426,42 +415,26 @@ public class ServerSession extends Session
});
}
- public Collection<Subscription_0_10> getSubscriptions()
+ public Collection<ConsumerTarget_0_10> getSubscriptions()
{
return _subscriptions.values();
}
- public void register(String destination, Subscription_0_10 sub)
+ public void register(String destination, ConsumerTarget_0_10 sub)
{
_subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub);
}
- public Subscription_0_10 getSubscription(String destination)
+ public ConsumerTarget_0_10 getSubscription(String destination)
{
return _subscriptions.get(destination == null ? NULL_DESTINATION : destination);
}
- public void unregister(Subscription_0_10 sub)
+ public void unregister(ConsumerTarget_0_10 sub)
{
_subscriptions.remove(sub.getName());
- try
- {
- sub.getSendLock();
- AMQQueue queue = sub.getQueue();
- if(queue != null)
- {
- queue.unregisterSubscription(sub);
- }
- }
- catch (AMQException e)
- {
- // TODO
- _logger.error("Failed to unregister subscription :" + e.getMessage(), e);
- }
- finally
- {
- sub.releaseSendLock();
- }
+ sub.close();
+
}
public boolean isTransactional()
@@ -638,12 +611,12 @@ public class ServerSession extends Session
return getConnection().getAuthorizedSubject();
}
- public void addSessionCloseTask(Task task)
+ public void addSessionCloseTask(Action<ServerSession> task)
{
_taskList.add(task);
}
- public void removeSessionCloseTask(Task task)
+ public void removeSessionCloseTask(Action<ServerSession> task)
{
_taskList.remove(task);
}
@@ -829,8 +802,8 @@ public class ServerSession extends Session
void unregisterSubscriptions()
{
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
+ final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
{
unregister(subscription_0_10);
}
@@ -838,8 +811,8 @@ public class ServerSession extends Session
void stopSubscriptions()
{
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
+ final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
{
subscription_0_10.stop();
}
@@ -848,8 +821,8 @@ public class ServerSession extends Session
public void receivedComplete()
{
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
+ final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
{
subscription_0_10.flushCreditState(false);
}
@@ -955,4 +928,16 @@ public class ServerSession extends Session
return getId().compareTo(o.getId());
}
+ private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<C>>
+ {
+ @Override
+ public void performAction(final MessageInstance<C> entry)
+ {
+ TransactionLogResource queue = entry.getOwningResource();
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(ServerSession.this);
+ }
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index dcca696529..9a90b74656 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.log4j.Logger;
@@ -34,7 +35,9 @@ import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
@@ -45,6 +48,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.DtxNotSelectedException;
import org.apache.qpid.server.txn.IncorrectDtxStateException;
@@ -55,6 +59,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
@@ -193,7 +198,7 @@ public class ServerSessionDelegate extends SessionDelegate
String queueName = method.getQueue();
VirtualHost vhost = getVirtualHost(session);
- final AMQQueue queue = vhost.getQueue(queueName);
+ final MessageSource queue = vhost.getMessageSource(queueName);
if(queue == null)
{
@@ -214,9 +219,9 @@ public class ServerSessionDelegate extends SessionDelegate
ServerSession s = (ServerSession) session;
queue.setExclusiveOwningSession(s);
- ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
+ ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
if(queue.getExclusiveOwningSession() == session)
{
@@ -228,9 +233,9 @@ public class ServerSessionDelegate extends SessionDelegate
if(queue.getAuthorizationHolder() == null)
{
queue.setAuthorizationHolder(s);
- ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
+ ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
if(queue.getAuthorizationHolder() == session)
{
@@ -254,25 +259,42 @@ public class ServerSessionDelegate extends SessionDelegate
return;
}
- Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
- destination,
- method.getAcceptMode(),
- method.getAcquireMode(),
- MessageFlowMode.WINDOW,
- creditManager,
- filterManager,
- method.getArguments());
+ ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination,
+ method.getAcceptMode(),
+ method.getAcquireMode(),
+ MessageFlowMode.WINDOW,
+ creditManager,
+ method.getArguments()
+ );
- ((ServerSession)session).register(destination, sub);
+ ((ServerSession)session).register(destination, target);
try
{
- queue.registerSubscription(sub, method.getExclusive());
+ EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
+ if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED)
+ {
+ options.add(Consumer.Option.ACQUIRES);
+ }
+ if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ {
+ options.add(Consumer.Option.SEES_REQUEUES);
+ }
+ if(method.getExclusive())
+ {
+ options.add(Consumer.Option.EXCLUSIVE);
+ }
+ Consumer sub =
+ queue.addConsumer(target,
+ filterManager,
+ MessageTransferMessage.class,
+ destination,
+ options);
}
- catch (AMQQueue.ExistingExclusiveSubscription existing)
+ catch (AMQQueue.ExistingExclusiveConsumer existing)
{
exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer");
}
- catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive)
+ catch (AMQQueue.ExistingConsumerPreventsExclusive exclusive)
{
exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively");
}
@@ -288,7 +310,7 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageTransfer(Session ssn, final MessageTransfer xfr)
{
- final Exchange exchange = getExchangeForMessage(ssn, xfr);
+ final MessageDestination exchange = getDestinationForMessage(ssn, xfr);
final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
@@ -307,7 +329,6 @@ public class ServerSessionDelegate extends SessionDelegate
return;
}
- final Exchange exchangeInUse;
final MessageStore store = getVirtualHost(ssn).getMessageStore();
final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
final ServerSession serverSession = (ServerSession) ssn;
@@ -385,7 +406,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = method.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -393,12 +414,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- AMQQueue queue = sub.getQueue();
((ServerSession)session).unregister(sub);
- if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0)
- {
- queue.setAuthorizationHolder(null);
- }
}
}
@@ -407,7 +423,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = method.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -814,24 +830,24 @@ public class ServerSessionDelegate extends SessionDelegate
return getVirtualHost(session).getExchange(exchangeName);
}
- private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr)
+ private MessageDestination getDestinationForMessage(Session ssn, MessageTransfer xfr)
{
VirtualHost virtualHost = getVirtualHost(ssn);
- Exchange exchange;
+ MessageDestination destination;
if(xfr.hasDestination())
{
- exchange = virtualHost.getExchange(xfr.getDestination());
- if(exchange == null)
+ destination = virtualHost.getMessageDestination(xfr.getDestination());
+ if(destination == null)
{
- exchange = virtualHost.getDefaultExchange();
+ destination = virtualHost.getDefaultExchange();
}
}
else
{
- exchange = virtualHost.getDefaultExchange();
+ destination = virtualHost.getDefaultExchange();
}
- return exchange;
+ return destination;
}
private VirtualHost getVirtualHost(Session session)
@@ -1249,9 +1265,9 @@ public class ServerSessionDelegate extends SessionDelegate
if (autoDelete && exclusive)
{
final AMQQueue q = queue;
- final ServerSession.Task deleteQueueTask = new ServerSession.Task()
+ final Action<ServerSession> deleteQueueTask = new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
try
{
@@ -1265,9 +1281,9 @@ public class ServerSessionDelegate extends SessionDelegate
};
final ServerSession s = (ServerSession) session;
s.addSessionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue) throws AMQException
+ public void performAction(AMQQueue queue)
{
s.removeSessionCloseTask(deleteQueueTask);
}
@@ -1276,9 +1292,9 @@ public class ServerSessionDelegate extends SessionDelegate
if (exclusive)
{
final AMQQueue q = queue;
- final ServerSession.Task removeExclusive = new ServerSession.Task()
+ final Action<ServerSession> removeExclusive = new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
q.setAuthorizationHolder(null);
q.setExclusiveOwningSession(null);
@@ -1287,9 +1303,9 @@ public class ServerSessionDelegate extends SessionDelegate
final ServerSession s = (ServerSession) session;
q.setExclusiveOwningSession(s);
s.addSessionCloseTask(removeExclusive);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue) throws AMQException
+ public void performAction(AMQQueue queue)
{
s.removeSessionCloseTask(removeExclusive);
}
@@ -1461,7 +1477,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = sfm.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1478,7 +1494,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = stop.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1496,7 +1512,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = flow.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
deleted file mode 100644
index 357b565365..0000000000
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
+++ /dev/null
@@ -1,944 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.messages.ChannelMessages;
-import org.apache.qpid.server.logging.messages.SubscriptionMessages;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Method;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.Struct;
-
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
-
-import java.text.MessageFormat;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, LogSubject
-{
- private final long _subscriptionID;
-
- private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
-
- private static final Option[] BATCHED = new Option[] { Option.BATCH };
-
- private final Lock _stateChangeLock = new ReentrantLock();
-
- private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private volatile AMQQueue.Context _queueContext;
- private final AtomicBoolean _deleted = new AtomicBoolean(false);
-
-
- private FlowCreditManager_0_10 _creditManager;
-
- private StateListener _stateListener = new StateListener()
- {
-
- public void stateChange(Subscription sub, State oldState, State newState)
- {
- CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
- }
- };
- private AMQQueue _queue;
- private final String _destination;
- private boolean _noLocal;
- private final FilterManager _filters;
- private final MessageAcceptMode _acceptMode;
- private final MessageAcquireMode _acquireMode;
- private MessageFlowMode _flowMode;
- private final ServerSession _session;
- private final AtomicBoolean _stopped = new AtomicBoolean(true);
- private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
-
- private LogActor _logActor;
- private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
- private String _traceExclude;
- private String _trace;
- private final long _createTime = System.currentTimeMillis();
- private final AtomicLong _deliveredCount = new AtomicLong(0);
- private final AtomicLong _deliveredBytes = new AtomicLong(0);
- private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
- private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
-
- private final Map<String, Object> _arguments;
- private int _deferredMessageCredit;
- private long _deferredSizeCredit;
-
-
- public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode,
- MessageAcquireMode acquireMode,
- MessageFlowMode flowMode,
- FlowCreditManager_0_10 creditManager,
- FilterManager filters,Map<String, Object> arguments)
- {
- _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
- _session = session;
- _postIdSettingAction = new AddMessageDispositionListenerAction(session);
- _destination = destination;
- _acceptMode = acceptMode;
- _acquireMode = acquireMode;
- _creditManager = creditManager;
- _flowMode = flowMode;
- _filters = filters;
- _creditManager.addStateListener(this);
- _arguments = arguments == null ? Collections.<String, Object> emptyMap() :
- Collections.<String, Object> unmodifiableMap(arguments);
- _state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED);
-
- }
-
- public void setNoLocal(boolean noLocal)
- {
- _noLocal = noLocal;
- }
-
- public AMQQueue getQueue()
- {
- return _queue;
- }
-
- public QueueEntry.SubscriptionAcquiredState getOwningState()
- {
- return _owningState;
- }
-
- public void setQueue(AMQQueue queue, boolean exclusive)
- {
- if(getQueue() != null)
- {
- throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
- }
- _queue = queue;
-
- _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES);
- _trace = (String) queue.getAttribute(Queue.FEDERATION_ID);
- String filterLogString = null;
-
- _logActor = GenericActor.getInstance(this);
- if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, this, SubscriptionMessages.CREATE_LOG_HIERARCHY))
- {
- filterLogString = getFilterLogString();
- CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
- filterLogString.length() > 0));
- }
- }
-
- public String getConsumerName()
- {
- return _destination;
- }
-
- public boolean isSuspended()
- {
- return !isActive() || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
- }
-
- public boolean hasInterest(QueueEntry entry)
- {
-
-
-
- //check that the message hasn't been rejected
- if (entry.isRejectedBy(getSubscriptionID()))
- {
-
- return false;
- }
-
- if (entry.getMessage() instanceof MessageTransferMessage)
- {
- if(_noLocal)
- {
- Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference();
- if (connectionRef != null && connectionRef == _session.getReference())
- {
- return false;
- }
- }
- }
- else
- {
- // no interest in messages we can't convert
- if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), MessageTransferMessage.class)==null)
- {
- return false;
- }
- }
-
-
- return checkFilters(entry);
-
-
- }
-
- private boolean checkFilters(QueueEntry entry)
- {
- return (_filters == null) || _filters.allAllow(entry.asFilterable());
- }
-
- public boolean isClosed()
- {
- return getState() == State.CLOSED;
- }
-
- public boolean isBrowser()
- {
- return _acquireMode == MessageAcquireMode.NOT_ACQUIRED;
- }
-
- public boolean seesRequeues()
- {
- return _acquireMode != MessageAcquireMode.NOT_ACQUIRED || _acceptMode == MessageAcceptMode.EXPLICIT;
- }
-
- public void close()
- {
- boolean closed = false;
- State state = getState();
-
- _stateChangeLock.lock();
- try
- {
- while(!closed && state != State.CLOSED)
- {
- closed = _state.compareAndSet(state, State.CLOSED);
- if(!closed)
- {
- state = getState();
- }
- else
- {
- _stateListener.stateChange(this,state, State.CLOSED);
- }
- }
- _creditManager.removeListener(this);
- CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE());
- }
- finally
- {
- _stateChangeLock.unlock();
- }
-
-
-
- }
-
- public Long getDelivered()
- {
- return _deliveredCount.get();
- }
-
- public void creditStateChanged(boolean hasCredit)
- {
-
- if(hasCredit)
- {
- if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
- {
- _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
- }
- else
- {
- // this is a hack to get round the issue of increasing bytes credit
- _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE);
- }
- }
- else
- {
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
- }
- }
-
-
- public static class AddMessageDispositionListenerAction implements Runnable
- {
- private MessageTransfer _xfr;
- private ServerSession.MessageDispositionChangeListener _action;
- private ServerSession _session;
-
- public AddMessageDispositionListenerAction(ServerSession session)
- {
- _session = session;
- }
-
- public void setXfr(MessageTransfer xfr)
- {
- _xfr = xfr;
- }
-
- public void setAction(ServerSession.MessageDispositionChangeListener action)
- {
- _action = action;
- }
-
- public void run()
- {
- if(_action != null)
- {
- _session.onMessageDispositionChange(_xfr, _action);
- }
- }
- }
-
- private final AddMessageDispositionListenerAction _postIdSettingAction;
-
- public void send(final QueueEntry entry, boolean batch) throws AMQException
- {
- ServerMessage serverMsg = entry.getMessage();
-
-
- MessageTransfer xfr;
-
- DeliveryProperties deliveryProps;
- MessageProperties messageProps = null;
-
- MessageTransferMessage msg;
-
- if(serverMsg instanceof MessageTransferMessage)
- {
-
- msg = (MessageTransferMessage) serverMsg;
-
- }
- else
- {
- MessageConverter converter =
- MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
-
-
- msg = (MessageTransferMessage) converter.convert(serverMsg, getQueue().getVirtualHost());
- }
- DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
- messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
-
- deliveryProps = new DeliveryProperties();
- if(origDeliveryProps != null)
- {
- if(origDeliveryProps.hasDeliveryMode())
- {
- deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
- }
- if(origDeliveryProps.hasExchange())
- {
- deliveryProps.setExchange(origDeliveryProps.getExchange());
- }
- if(origDeliveryProps.hasExpiration())
- {
- deliveryProps.setExpiration(origDeliveryProps.getExpiration());
- }
- if(origDeliveryProps.hasPriority())
- {
- deliveryProps.setPriority(origDeliveryProps.getPriority());
- }
- if(origDeliveryProps.hasRoutingKey())
- {
- deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
- }
- if(origDeliveryProps.hasTimestamp())
- {
- deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
- }
- if(origDeliveryProps.hasTtl())
- {
- deliveryProps.setTtl(origDeliveryProps.getTtl());
- }
-
-
- }
-
- deliveryProps.setRedelivered(entry.isRedelivered());
-
- if(_trace != null && messageProps == null)
- {
- messageProps = new MessageProperties();
- }
-
- Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
-
-
- xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
- : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
-
- boolean excludeDueToFederation = false;
-
- if(_trace != null)
- {
- if(!messageProps.hasApplicationHeaders())
- {
- messageProps.setApplicationHeaders(new HashMap<String,Object>());
- }
- Map<String,Object> appHeaders = messageProps.getApplicationHeaders();
- String trace = (String) appHeaders.get("x-qpid.trace");
- if(trace == null)
- {
- trace = _trace;
- }
- else
- {
- if(_traceExclude != null)
- {
- excludeDueToFederation = Arrays.asList(trace.split(",")).contains(_traceExclude);
- }
- trace+=","+_trace;
- }
- appHeaders.put("x-qpid.trace",trace);
- }
-
- if(!excludeDueToFederation)
- {
- if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
- {
- xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
- }
- else if(_flowMode == MessageFlowMode.WINDOW)
- {
- xfr.setCompletionListener(new Method.CompletionListener()
- {
- public void onComplete(Method method)
- {
- deferredAddCredit(1, entry.getSize());
- }
- });
- }
-
-
- _postIdSettingAction.setXfr(xfr);
- if(_acceptMode == MessageAcceptMode.EXPLICIT)
- {
- _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this));
- }
- else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED)
- {
- _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this));
- }
- else
- {
- _postIdSettingAction.setAction(null);
- }
-
-
- _session.sendMessage(xfr, _postIdSettingAction);
- entry.incrementDeliveryCount();
- _deliveredCount.incrementAndGet();
- _deliveredBytes.addAndGet(entry.getSize());
- if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
- {
- forceDequeue(entry, false);
- }
- else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
- {
- recordUnacknowledged(entry);
- }
- }
- else
- {
- forceDequeue(entry, _flowMode == MessageFlowMode.WINDOW);
-
- }
- }
-
- void recordUnacknowledged(QueueEntry entry)
- {
- _unacknowledgedCount.incrementAndGet();
- _unacknowledgedBytes.addAndGet(entry.getSize());
- }
-
- private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
- {
- _deferredMessageCredit += deferredMessageCredit;
- _deferredSizeCredit += deferredSizeCredit;
-
- }
-
- public void flushCreditState(boolean strict)
- {
- if(strict || !isSuspended() || _deferredMessageCredit >= 200
- || !(_creditManager instanceof WindowCreditManager)
- || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 )
- {
- _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
- _deferredMessageCredit = 0;
- _deferredSizeCredit = 0l;
- }
- }
-
- private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
- {
- AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
- dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(),
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- if (restoreCredit)
- {
- restoreCredit(entry);
- }
- entry.delete();
- }
-
- public void onRollback()
- {
-
- }
- });
- }
-
- void reject(final QueueEntry entry)
- {
- entry.setRedelivered();
- entry.routeToAlternate(null, null);
- if(entry.isAcquiredBy(this))
- {
- entry.delete();
- }
- }
-
- void release(final QueueEntry entry, final boolean setRedelivered)
- {
- if (setRedelivered)
- {
- entry.setRedelivered();
- }
-
- if (getSessionModel().isClosing() || !setRedelivered)
- {
- entry.decrementDeliveryCount();
- }
-
- if (isMaxDeliveryLimitReached(entry))
- {
- sendToDLQOrDiscard(entry);
- }
- else
- {
- entry.release();
- }
- }
-
- protected void sendToDLQOrDiscard(QueueEntry entry)
- {
- final LogActor logActor = CurrentActor.get();
- final ServerMessage msg = entry.getMessage();
-
- int requeues = entry.routeToAlternate(new BaseQueue.PostEnqueueAction()
- {
- @Override
- public void onEnqueue(final QueueEntry requeueEntry)
- {
- logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
- requeueEntry.getQueue().getName()));
- }
- }, null);
-
- if (requeues == 0)
- {
- final AMQQueue queue = entry.getQueue();
- final Exchange alternateExchange = queue.getAlternateExchange();
-
- if(alternateExchange != null)
- {
- logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
- alternateExchange.getName()));
- }
- else
- {
- logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
- queue.getName(),
- msg.getRoutingKey()));
- }
- }
- }
-
- private boolean isMaxDeliveryLimitReached(QueueEntry entry)
- {
- final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount();
- return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit);
- }
-
- public void queueDeleted(AMQQueue queue)
- {
- _deleted.set(true);
- }
-
- public boolean wouldSuspend(QueueEntry entry)
- {
- return !_creditManager.useCreditForMessage(entry.getMessage().getSize());
- }
-
- public boolean trySendLock()
- {
- return _stateChangeLock.tryLock();
- }
-
-
- public void getSendLock()
- {
- _stateChangeLock.lock();
- }
-
- public void releaseSendLock()
- {
- _stateChangeLock.unlock();
- }
-
- public void restoreCredit(QueueEntry queueEntry)
- {
- _creditManager.restoreCredit(1, queueEntry.getSize());
- }
-
- public void onDequeue(QueueEntry queueEntry)
- {
- // no-op for 0-10, credit restored by completing command.
- }
-
- public void releaseQueueEntry(QueueEntry queueEntry)
- {
- // no-op for 0-10, credit restored by completing command.
- }
-
- public void setStateListener(StateListener listener)
- {
- _stateListener = listener;
- }
-
- public State getState()
- {
- return _state.get();
- }
-
- public AMQQueue.Context getQueueContext()
- {
- return _queueContext;
- }
-
- public void setQueueContext(AMQQueue.Context queueContext)
- {
- _queueContext = queueContext;
- }
-
- public boolean isActive()
- {
- return getState() == State.ACTIVE;
- }
-
- public void set(String key, Object value)
- {
- _properties.put(key, value);
- }
-
- public Object get(String key)
- {
- return _properties.get(key);
- }
-
-
- public FlowCreditManager_0_10 getCreditManager()
- {
- return _creditManager;
- }
-
-
- public void stop()
- {
- try
- {
- getSendLock();
-
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
- _stopped.set(true);
- FlowCreditManager_0_10 creditManager = getCreditManager();
- creditManager.clearCredit();
- }
- finally
- {
- releaseSendLock();
- }
- }
-
- public void addCredit(MessageCreditUnit unit, long value)
- {
- FlowCreditManager_0_10 creditManager = getCreditManager();
-
- switch (unit)
- {
- case MESSAGE:
-
- creditManager.addCredit(value, 0L);
- break;
- case BYTE:
- creditManager.addCredit(0l, value);
- break;
- }
-
- _stopped.set(false);
-
- if(creditManager.hasCredit())
- {
- if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
- {
- _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
- }
- }
-
- }
-
- public void setFlowMode(MessageFlowMode flowMode)
- {
-
-
- _creditManager.removeListener(this);
-
- switch(flowMode)
- {
- case CREDIT:
- _creditManager = new CreditCreditManager(0l,0l);
- break;
- case WINDOW:
- _creditManager = new WindowCreditManager(0l,0l);
- break;
- default:
- throw new RuntimeException("Unknown message flow mode: " + flowMode);
- }
- _flowMode = flowMode;
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
-
- _creditManager.addStateListener(this);
-
- }
-
- public boolean isStopped()
- {
- return _stopped.get();
- }
-
- public boolean acquires()
- {
- return _acquireMode == MessageAcquireMode.PRE_ACQUIRED;
- }
-
- public void acknowledge(QueueEntry entry)
- {
- // TODO Fix Store Context / cleanup
- if(entry.isAcquiredBy(this))
- {
- _unacknowledgedBytes.addAndGet(-entry.getSize());
- _unacknowledgedCount.decrementAndGet();
- entry.delete();
- }
- }
-
- public void flush() throws AMQException
- {
- flushCreditState(true);
- _queue.flushSubscription(this);
- stop();
- }
-
- public long getSubscriptionID()
- {
- return _subscriptionID;
- }
-
- public LogActor getLogActor()
- {
- return _logActor;
- }
-
- public boolean isTransient()
- {
- return false;
- }
-
- public ServerSession getSessionModel()
- {
- return _session;
- }
-
- public boolean isBrowsing()
- {
- return _acquireMode == MessageAcquireMode.NOT_ACQUIRED;
- }
-
- public boolean isExclusive()
- {
- return getQueue().hasExclusiveSubscriber();
- }
-
- public boolean isDurable()
- {
- return false;
- }
-
-
- public boolean isExplicitAcknowledge()
- {
- return _acceptMode == MessageAcceptMode.EXPLICIT;
- }
-
- public String getCreditMode()
- {
- return _flowMode.toString();
- }
-
- public String getName()
- {
- return _destination;
- }
-
- public Map<String, Object> getArguments()
- {
- return _arguments;
- }
-
- public boolean isSessionTransactional()
- {
- return _session.isTransactional();
- }
-
- public void queueEmpty()
- {
- }
-
- public long getCreateTime()
- {
- return _createTime;
- }
-
- public String toLogString()
- {
- String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(),
- _queue.getName());
- String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "("
- // queueString is "vh(/{0})/qu({1}) " so need to trim
- + queueInfo.substring(0, queueInfo.length() - 1) + ")" + "] ";
- return result;
- }
-
- private String getFilterLogString()
- {
- StringBuilder filterLogString = new StringBuilder();
- String delimiter = ", ";
- boolean hasEntries = false;
- if (_filters != null && _filters.hasFilters())
- {
- filterLogString.append(_filters.toString());
- hasEntries = true;
- }
-
- if (isBrowser())
- {
- if (hasEntries)
- {
- filterLogString.append(delimiter);
- }
- filterLogString.append("Browser");
- hasEntries = true;
- }
-
- if (isDurable())
- {
- if (hasEntries)
- {
- filterLogString.append(delimiter);
- }
- filterLogString.append("Durable");
- hasEntries = true;
- }
-
- return filterLogString.toString();
- }
-
- public LogSubject getLogSubject()
- {
- return (LogSubject) this;
- }
-
-
- public void flushBatched()
- {
- _session.getConnection().flush();
- }
-
- public long getBytesOut()
- {
- return _deliveredBytes.longValue();
- }
-
- public long getMessagesOut()
- {
- return _deliveredCount.longValue();
- }
-
- public long getUnacknowledgedBytes()
- {
- return _unacknowledgedBytes.longValue();
- }
-
- public long getUnacknowledgedMessages()
- {
- return _unacknowledgedCount.longValue();
- }
-}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index e139887284..aa465d373f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -21,19 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -42,6 +30,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -55,6 +44,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogActor;
@@ -66,25 +56,28 @@ import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;
@@ -122,7 +115,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private IncomingMessage _currentMessage;
/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
- private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
+ private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
private final MessageStore _messageStore;
@@ -155,7 +148,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private volatile boolean _rollingBack;
private static final Runnable NULL_TASK = new Runnable() { public void run() {} };
- private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
+ private List<MessageInstance> _resendList = new ArrayList<MessageInstance>();
private static final
AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
private long _createTime = System.currentTimeMillis();
@@ -266,7 +259,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
return _channelId;
}
- public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException
+ public void setPublishFrame(MessagePublishInfo info, final MessageDestination e) throws AMQSecurityException
{
String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString();
SecurityManager securityManager = getVirtualHost().getSecurityManager();
@@ -275,7 +268,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
throw new AMQSecurityException("Permission denied: " + e.getName());
}
_currentMessage = new IncomingMessage(info);
- _currentMessage.setExchange(e);
+ _currentMessage.setMessageDestination(e);
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody)
@@ -360,7 +353,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
};
- int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction,
+ int enqueues = _currentMessage.getDestination().send(amqMessage, instanceProperties, _transaction,
immediate ? _immediateAction : _capacityCheckAction);
if(enqueues == 0)
{
@@ -497,62 +490,89 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- public Subscription getSubscription(AMQShortString subscription)
+ public Consumer getSubscription(AMQShortString tag)
{
- return _tag2SubscriptionMap.get(subscription);
+ final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
+ return target == null ? null : target.getConsumer();
}
/**
* Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
* up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
*
+ *
* @param tag the tag chosen by the client (if null, server will generate one)
- * @param queue the queue to subscribe to
+ * @param source the queue to subscribe to
* @param acks Are acks enabled for this subscriber
* @param filters Filters to apply to this subscriber
*
- * @param noLocal Flag stopping own messages being received.
* @param exclusive Flag requesting exclusive access to the queue
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
*
* @throws AMQException if something goes wrong
*/
- public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
+ public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
+ FieldTable filters, boolean exclusive) throws AMQException
{
if (tag == null)
{
tag = new AMQShortString("sgen_" + getNextConsumerTag());
}
- if (_tag2SubscriptionMap.containsKey(tag))
+ if (_tag2SubscriptionTargetMap.containsKey(tag))
{
throw new AMQException("Consumer already exists with same tag: " + tag);
}
- Subscription subscription =
- SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
+ ConsumerTarget_0_8 target;
+ EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
+
+ if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
+ {
+ target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+ }
+ else if(acks)
+ {
+ target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
+ }
+ else
+ {
+ target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
+ }
+ if(exclusive)
+ {
+ options.add(Consumer.Option.EXCLUSIVE);
+ }
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
// We add before we register as the Async Delivery process may AutoClose the subscriber
// so calling _cT2QM.remove before we have done put which was after the register succeeded.
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
- _tag2SubscriptionMap.put(tag, subscription);
+ _tag2SubscriptionTargetMap.put(tag, target);
try
{
- queue.registerSubscription(subscription, exclusive);
+ Consumer sub =
+ source.addConsumer(target,
+ FilterManagerFactory.createManager(FieldTable.convertToMap(filters)),
+ AMQMessage.class,
+ AMQShortString.toString(tag),
+ options);
}
catch (AMQException e)
{
- _tag2SubscriptionMap.remove(tag);
+ _tag2SubscriptionTargetMap.remove(tag);
throw e;
}
catch (RuntimeException e)
{
- _tag2SubscriptionMap.remove(tag);
+ _tag2SubscriptionTargetMap.remove(tag);
throw e;
}
return tag;
@@ -567,18 +587,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException
{
- Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
+ ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
+ Consumer sub = target == null ? null : target.getConsumer();
if (sub != null)
{
- try
- {
- sub.getSendLock();
- sub.getQueue().unregisterSubscription(sub);
- }
- finally
- {
- sub.releaseSendLock();
- }
+ sub.close();
return true;
}
else
@@ -633,7 +646,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
if (_logger.isInfoEnabled())
{
- if (!_tag2SubscriptionMap.isEmpty())
+ if (!_tag2SubscriptionTargetMap.isEmpty())
{
_logger.info("Unsubscribing all consumers on channel " + toString());
}
@@ -643,28 +656,21 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
}
- for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet())
+ for (Map.Entry<AMQShortString, ConsumerTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
{
if (_logger.isInfoEnabled())
{
_logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
- Subscription sub = me.getValue();
+ Consumer sub = me.getValue().getConsumer();
- try
- {
- sub.getSendLock();
- sub.getQueue().unregisterSubscription(sub);
- }
- finally
- {
- sub.releaseSendLock();
- }
+
+ sub.close();
}
- _tag2SubscriptionMap.clear();
+ _tag2SubscriptionTargetMap.clear();
}
/**
@@ -673,24 +679,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
* @param entry the record of the message on the queue that was delivered
* @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
* delivery tag)
- * @param subscription The consumer that is to acknowledge this message.
+ * @param consumer The consumer that is to acknowledge this message.
*/
- public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription)
+ public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Consumer consumer)
{
if (_logger.isDebugEnabled())
{
- if (entry.getQueue() == null)
- {
- _logger.debug("Adding unacked message with a null queue:" + entry);
- }
- else
- {
- if (_logger.isDebugEnabled())
- {
_logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
- + ") with a queue(" + entry.getQueue() + ") for " + subscription);
- }
- }
+ + ") for " + consumer + " on " + entry.getOwningResource().getName());
+
}
_unacknowledgedMessageMap.add(deliveryTag, entry);
@@ -713,7 +710,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public void requeue() throws AMQException
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
- Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
+ Collection<MessageInstance> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
if (!messagesToBeDelivered.isEmpty())
{
@@ -724,21 +721,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- for (QueueEntry unacked : messagesToBeDelivered)
+ for (MessageInstance unacked : messagesToBeDelivered)
{
- if (!unacked.isQueueDeleted())
- {
- // Mark message redelivered
- unacked.setRedelivered();
-
- // Ensure message is released for redelivery
- unacked.release();
+ // Mark message redelivered
+ unacked.setRedelivered();
- }
- else
- {
- unacked.delete();
- }
+ // Ensure message is released for redelivery
+ unacked.release();
}
}
@@ -752,7 +741,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
*/
public void requeue(long deliveryTag) throws AMQException
{
- QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag);
+ MessageInstance unacked = _unacknowledgedMessageMap.remove(deliveryTag);
if (unacked != null)
{
@@ -760,20 +749,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
unacked.setRedelivered();
// Ensure message is released for redelivery
- if (!unacked.isQueueDeleted())
- {
-
- // Ensure message is released for redelivery
- unacked.release();
-
- }
- else
- {
- _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
- + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
+ unacked.release();
- unacked.delete();
- }
}
else
{
@@ -786,10 +763,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public boolean isMaxDeliveryCountEnabled(final long deliveryTag)
{
- final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+ final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
if (queueEntry != null)
{
- final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+ final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
return maximumDeliveryCount > 0;
}
@@ -798,10 +775,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public boolean isDeliveredTooManyTimes(final long deliveryTag)
{
- final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+ final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
if (queueEntry != null)
{
- final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+ final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
final int numDeliveries = queueEntry.getDeliveryCount();
return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount;
}
@@ -812,16 +789,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*
- * @param requeue Are the messages to be requeued or dropped.
- *
* @throws AMQException When something goes wrong.
*/
- public void resend(final boolean requeue) throws AMQException
+ public void resend() throws AMQException
{
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+ final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+ final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
if (_logger.isDebugEnabled())
{
@@ -833,9 +808,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
// and those that don't to be requeued.
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap,
msgToRequeue,
- msgToResend,
- requeue,
- _messageStore));
+ msgToResend
+ ));
// Process Messages to Resend
@@ -851,39 +825,20 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
}
- for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet())
+ for (Map.Entry<Long, MessageInstance> entry : msgToResend.entrySet())
{
- QueueEntry message = entry.getValue();
+ MessageInstance message = entry.getValue();
long deliveryTag = entry.getKey();
//Amend the delivery counter as the client hasn't seen these messages yet.
message.decrementDeliveryCount();
- AMQQueue queue = message.getQueue();
-
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
message.setRedelivered();
- Subscription sub = message.getDeliveredSubscription();
-
- if (sub != null)
- {
-
- if(!queue.resend(message,sub))
- {
- msgToRequeue.put(deliveryTag, message);
- }
- }
- else
+ if (!message.resend())
{
-
- if (_logger.isInfoEnabled())
- {
- _logger.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
- + ")to prevent loss");
- }
- // move this message to requeue
msgToRequeue.put(deliveryTag, message);
}
} // for all messages
@@ -898,9 +853,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
// Process Messages to Requeue at the front of the queue
- for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet())
+ for (Map.Entry<Long, MessageInstance> entry : msgToRequeue.entrySet())
{
- QueueEntry message = entry.getValue();
+ MessageInstance message = entry.getValue();
long deliveryTag = entry.getKey();
//Amend the delivery counter as the client hasn't seen these messages yet.
@@ -926,11 +881,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
{
- Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple);
+ Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple);
_transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
}
- private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
+ private Collection<MessageInstance> getAckedMessages(long deliveryTag, boolean multiple)
{
return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
@@ -976,9 +931,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if (wasSuspended)
{
// may need to deliver queued messages
- for (Subscription s : _tag2SubscriptionMap.values())
+ for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
- s.getQueue().deliverAsync(s);
+ s.getConsumer().externalStateChange();
}
}
@@ -992,15 +947,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if (!wasSuspended)
{
// may need to deliver queued messages
- for (Subscription s : _tag2SubscriptionMap.values())
+ for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
try
{
- s.getSendLock();
+ s.getConsumer().getSendLock();
}
finally
{
- s.releaseSendLock();
+ s.getConsumer().releaseSendLock();
}
}
}
@@ -1077,10 +1032,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
boolean requiresSuspend = _suspended.compareAndSet(false,true);
// ensure all subscriptions have seen the change to the channel state
- for(Subscription sub : _tag2SubscriptionMap.values())
+ for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getSendLock();
- sub.releaseSendLock();
+ sub.getConsumer().getSendLock();
+ sub.getConsumer().releaseSendLock();
}
try
@@ -1098,16 +1053,16 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
postRollbackTask.run();
- for(QueueEntry entry : _resendList)
+ for(MessageInstance entry : _resendList)
{
- Subscription sub = entry.getDeliveredSubscription();
+ Consumer sub = entry.getDeliveredConsumer();
if(sub == null || sub.isClosed())
{
entry.release();
}
else
{
- sub.getQueue().resend(entry, sub);
+ entry.resend();
}
}
_resendList.clear();
@@ -1115,9 +1070,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if(requiresSuspend)
{
_suspended.set(false);
- for(Subscription sub : _tag2SubscriptionMap.values())
+ for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getQueue().deliverAsync(sub);
+ sub.getConsumer().externalStateChange();
}
}
@@ -1173,7 +1128,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
{
- public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
{
addUnacknowledgedMessage(entry, deliveryTag, sub);
}
@@ -1234,78 +1189,96 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- private class ImmediateAction implements BaseQueue.PostEnqueueAction
+ private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<C>>
{
public ImmediateAction()
{
}
- public void onEnqueue(QueueEntry entry)
+ public void performAction(MessageInstance<C> entry)
{
- AMQQueue queue = entry.getQueue();
+ TransactionLogResource queue = entry.getOwningResource();
if (!entry.getDeliveredToConsumer() && entry.acquire())
{
ServerTransaction txn = new LocalTransaction(_messageStore);
- Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
- entries.add(entry);
final AMQMessage message = (AMQMessage) entry.getMessage();
- txn.dequeue(queue, entry.getMessage(),
- new MessageAcknowledgeAction(entries)
- {
- @Override
- public void postCommit()
+ MessageReference ref = message.newReference();
+ try
+ {
+ entry.delete();
+ txn.dequeue(queue, message,
+ new ServerTransaction.Action()
{
- try
+ @Override
+ public void postCommit()
{
- final
- ProtocolOutputConverter outputConverter =
- _session.getProtocolOutputConverter();
-
- outputConverter.writeReturn(message.getMessagePublishInfo(),
- message.getContentHeaderBody(),
- message,
- _channelId,
- AMQConstant.NO_CONSUMERS.getCode(),
- IMMEDIATE_DELIVERY_REPLY_TEXT);
+ try
+ {
+ final
+ ProtocolOutputConverter outputConverter =
+ _session.getProtocolOutputConverter();
+
+ outputConverter.writeReturn(message.getMessagePublishInfo(),
+ message.getContentHeaderBody(),
+ message,
+ _channelId,
+ AMQConstant.NO_CONSUMERS.getCode(),
+ IMMEDIATE_DELIVERY_REPLY_TEXT);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- catch (AMQException e)
+
+ @Override
+ public void onRollback()
{
- throw new RuntimeException(e);
+
}
- super.postCommit();
}
- }
- );
- txn.commit();
+ );
+ txn.commit();
+ }
+ finally
+ {
+ ref.release();
+ }
}
else
{
- queue.checkCapacity(AMQChannel.this);
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
+ }
}
}
}
- private final class CapacityCheckAction implements BaseQueue.PostEnqueueAction
+ private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<C>>
{
@Override
- public void onEnqueue(final QueueEntry entry)
+ public void performAction(final MessageInstance<C> entry)
{
- AMQQueue queue = entry.getQueue();
- queue.checkCapacity(AMQChannel.this);
+ TransactionLogResource queue = entry.getOwningResource();
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
+ }
}
}
private class MessageAcknowledgeAction implements ServerTransaction.Action
{
- private final Collection<QueueEntry> _ackedMessages;
+ private final Collection<MessageInstance> _ackedMessages;
- public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages)
+ public MessageAcknowledgeAction(Collection<MessageInstance> ackedMessages)
{
_ackedMessages = ackedMessages;
}
@@ -1314,7 +1287,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
try
{
- for(QueueEntry entry : _ackedMessages)
+ for(MessageInstance entry : _ackedMessages)
{
entry.delete();
}
@@ -1337,10 +1310,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
try
{
- for(QueueEntry entry : _ackedMessages)
- {
- entry.release();
- }
+ for(MessageInstance entry : _ackedMessages)
+ {
+ entry.release();
+ }
}
finally
{
@@ -1505,7 +1478,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public void deadLetter(long deliveryTag) throws AMQException
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
- final QueueEntry rejectedQueueEntry = unackedMap.remove(deliveryTag);
+ final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);
if (rejectedQueueEntry == null)
{
@@ -1514,36 +1487,42 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
else
{
final ServerMessage msg = rejectedQueueEntry.getMessage();
+ final Consumer sub = rejectedQueueEntry.getDeliveredConsumer();
- int requeues = rejectedQueueEntry.routeToAlternate(new BaseQueue.PostEnqueueAction()
+ int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
{
@Override
- public void onEnqueue(final QueueEntry requeueEntry)
+ public void performAction(final MessageInstance requeueEntry)
{
_actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
- requeueEntry.getQueue().getName()));
+ requeueEntry.getOwningResource().getName()));
}
}, null);
if(requeues == 0)
{
- final AMQQueue queue = rejectedQueueEntry.getQueue();
-
- final Exchange altExchange = queue.getAlternateExchange();
- if (altExchange == null)
+ final TransactionLogResource owningResource = rejectedQueueEntry.getOwningResource();
+ if(owningResource instanceof AMQQueue)
{
- _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
- _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+ final AMQQueue queue = (AMQQueue) owningResource;
- }
- else
- {
- _logger.debug(
- "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
- + deliveryTag);
- _actor.message(_logSubject,
- ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+ final Exchange altExchange = queue.getAlternateExchange();
+
+ if (altExchange == null)
+ {
+ _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+ _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+
+ }
+ else
+ {
+ _logger.debug(
+ "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
+ + deliveryTag);
+ _actor.message(_logSubject,
+ ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+ }
}
}
@@ -1604,6 +1583,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
@Override
public int getConsumerCount()
{
- return _tag2SubscriptionMap.size();
+ return _tag2SubscriptionTargetMap.size();
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index c7a84fa3b6..e83e86981b 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -94,8 +94,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
@@ -1669,7 +1668,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
@Override
- public void deliverToClient(final Subscription sub, final ServerMessage message,
+ public void deliverToClient(final Consumer sub, final ServerMessage message,
final InstanceProperties props, final long deliveryTag)
throws AMQException
{
@@ -1678,7 +1677,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
props,
_channelId,
deliveryTag,
- ((SubscriptionImpl)sub).getConsumerTag());
+ new AMQShortString(sub.getName()));
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
index 85d995518a..6bcd4b9d49 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
@@ -39,7 +39,6 @@ import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.virtualhost.VirtualHost;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
new file mode 100644
index 0000000000..2e362c11f8
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
@@ -0,0 +1,32 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.consumer.Consumer;
+
+public interface ClientDeliveryMethod
+{
+ void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props,
+ final long deliveryTag) throws AMQException;
+}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
new file mode 100644
index 0000000000..47700f812f
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -0,0 +1,552 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.StateChangeListener;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
+ * that was given out by the broker and the channel id. <p/>
+ */
+public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
+{
+
+ private final StateChangeListener<MessageInstance, MessageInstance.State> _entryReleaseListener =
+ new StateChangeListener<MessageInstance, MessageInstance.State>()
+ {
+ @Override
+ public void stateChanged(final MessageInstance entry,
+ final MessageInstance.State oldSate,
+ final MessageInstance.State newState)
+ {
+ if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED))
+ {
+ restoreCredit(entry.getMessage());
+ }
+ entry.removeStateChangeListener(this);
+ }
+ };
+
+ private final ClientDeliveryMethod _deliveryMethod;
+ private final RecordDeliveryMethod _recordMethod;
+
+ private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+ private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
+ private Consumer _consumer;
+
+
+ public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager) throws AMQException
+ {
+ return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+ }
+
+ static final class BrowserConsumer extends ConsumerTarget_0_8
+ {
+ public BrowserConsumer(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, consumerTag,
+ filters, creditManager, deliveryMethod, recordMethod);
+ }
+
+ /**
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
+ *
+ *
+ * @param entry
+ * @param batch
+ * @throws org.apache.qpid.AMQException
+ */
+ @Override
+ public void send(MessageInstance entry, boolean batch) throws AMQException
+ {
+ // We don't decrement the reference here as we don't want to consume the message
+ // but we do want to send it to the client.
+
+ synchronized (getChannel())
+ {
+ long deliveryTag = getChannel().getNextDeliveryTag();
+ sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ }
+
+ }
+
+ @Override
+ public boolean allocateCredit(ServerMessage msg)
+ {
+ return true;
+ }
+
+ }
+
+ public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager) throws AMQException
+ {
+ return new NoAckConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+ }
+
+ public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod) throws AMQException
+ {
+ return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ }
+
+ public static class NoAckConsumer extends ConsumerTarget_0_8
+ {
+ private final AutoCommitTransaction _txn;
+
+ public NoAckConsumer(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+
+ _txn = new AutoCommitTransaction(channel.getVirtualHost().getMessageStore());
+ }
+
+ /**
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
+ *
+ *
+ * @param entry The message to send
+ * @param batch
+ * @throws org.apache.qpid.AMQException
+ */
+ @Override
+ public void send(MessageInstance entry, boolean batch) throws AMQException
+ {
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
+
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
+
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ _txn.dequeue(entry.getOwningResource(), entry.getMessage(), NOOP);
+
+ ServerMessage message = entry.getMessage();
+ MessageReference ref = message.newReference();
+ InstanceProperties props = entry.getInstanceProperties();
+ entry.delete();
+
+ synchronized (getChannel())
+ {
+ getChannel().getProtocolSession().setDeferFlush(batch);
+ long deliveryTag = getChannel().getNextDeliveryTag();
+
+ sendToClient(message, props, deliveryTag);
+
+ }
+ ref.release();
+
+
+ }
+
+ @Override
+ public boolean allocateCredit(ServerMessage msg)
+ {
+ return true;
+ }
+
+ private static final ServerTransaction.Action NOOP =
+ new ServerTransaction.Action()
+ {
+ @Override
+ public void postCommit()
+ {
+ }
+
+ @Override
+ public void onRollback()
+ {
+ }
+ };
+ }
+
+ /**
+ * NoAck Subscription for use with BasicGet method.
+ */
+ public static final class GetNoAckConsumer extends NoAckConsumer
+ {
+ public GetNoAckConsumer(AMQChannel channel, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ }
+
+ public boolean allocateCredit(ServerMessage msg)
+ {
+ return getCreditManager().useCreditForMessage(msg.getSize());
+ }
+
+ }
+
+
+ public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager)
+ throws AMQException
+ {
+ return new AckConsumer(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+ }
+
+
+ public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ return new AckConsumer(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod);
+ }
+
+ static final class AckConsumer extends ConsumerTarget_0_8
+ {
+ public AckConsumer(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ }
+
+ /**
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
+ *
+ *
+ * @param entry The message to send
+ * @param batch
+ * @throws org.apache.qpid.AMQException
+ */
+ @Override
+ public void send(MessageInstance entry, boolean batch) throws AMQException
+ {
+
+
+ synchronized (getChannel())
+ {
+ getChannel().getProtocolSession().setDeferFlush(batch);
+ long deliveryTag = getChannel().getNextDeliveryTag();
+
+ addUnacknowledgedMessage(entry);
+ recordMessageDelivery(entry, deliveryTag);
+ entry.addStateChangeListener(getReleasedStateChangeListener());
+ sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ entry.incrementDeliveryCount();
+
+ }
+ }
+
+
+
+ }
+
+
+ private static final Logger _logger = Logger.getLogger(ConsumerTarget_0_8.class);
+
+ private final AMQChannel _channel;
+
+ private final AMQShortString _consumerTag;
+
+ private final FlowCreditManager _creditManager;
+
+ private final Boolean _autoClose;
+
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
+
+
+
+ public ConsumerTarget_0_8(AMQChannel channel,
+ AMQShortString consumerTag,
+ FieldTable arguments,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(State.ACTIVE);
+
+ _channel = channel;
+ _consumerTag = consumerTag;
+
+ _creditManager = creditManager;
+ creditManager.addStateListener(this);
+
+ _deliveryMethod = deliveryMethod;
+ _recordMethod = recordMethod;
+
+ if (arguments != null)
+ {
+ Object autoClose = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
+ if (autoClose != null)
+ {
+ _autoClose = (Boolean) autoClose;
+ }
+ else
+ {
+ _autoClose = false;
+ }
+ }
+ else
+ {
+ _autoClose = false;
+ }
+ }
+
+ public Consumer getConsumer()
+ {
+ return _consumer;
+ }
+
+ @Override
+ public void consumerRemoved(final Consumer sub)
+ {
+ }
+
+ @Override
+ public void consumerAdded(final Consumer sub)
+ {
+ _consumer = sub;
+ }
+
+ public AMQSessionModel getSessionModel()
+ {
+ return _channel;
+ }
+
+ public String toString()
+ {
+ String subscriber = "[channel=" + _channel +
+ ", consumerTag=" + _consumerTag +
+ ", session=" + getProtocolSession().getKey() ;
+
+ return subscriber + "]";
+ }
+
+ public boolean isSuspended()
+ {
+ return getState()!=State.ACTIVE || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped();
+ }
+
+ /**
+ * Callback indicating that a queue has been deleted.
+ *
+ */
+ public void queueDeleted()
+ {
+ _deleted.set(true);
+ }
+
+ public boolean isAutoClose()
+ {
+ return _autoClose;
+ }
+
+ public FlowCreditManager getCreditManager()
+ {
+ return _creditManager;
+ }
+
+
+ public boolean close()
+ {
+ boolean closed = false;
+ State state = getState();
+
+ getConsumer().getSendLock();
+ try
+ {
+ while(!closed && state != State.CLOSED)
+ {
+ closed = updateState(state, State.CLOSED);
+ if(!closed)
+ {
+ state = getState();
+ }
+ }
+ _creditManager.removeListener(this);
+ return closed;
+ }
+ finally
+ {
+ getConsumer().releaseSendLock();
+ }
+ }
+
+
+ public boolean allocateCredit(ServerMessage msg)
+ {
+ return _creditManager.useCreditForMessage(msg.getSize());
+ }
+
+ public AMQChannel getChannel()
+ {
+ return _channel;
+ }
+
+ public AMQShortString getConsumerTag()
+ {
+ return _consumerTag;
+ }
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _channel.getProtocolSession();
+ }
+
+ public void restoreCredit(final ServerMessage message)
+ {
+ _creditManager.restoreCredit(1, message.getSize());
+ }
+
+ protected final StateChangeListener<MessageInstance, MessageInstance.State> getReleasedStateChangeListener()
+ {
+ return _entryReleaseListener;
+ }
+
+ public void creditStateChanged(boolean hasCredit)
+ {
+
+ if(hasCredit)
+ {
+ if(!updateState(State.SUSPENDED, State.ACTIVE))
+ {
+ // this is a hack to get round the issue of increasing bytes credit
+ getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE);
+ }
+ }
+ else
+ {
+ updateState(State.ACTIVE, State.SUSPENDED);
+ }
+ }
+
+ protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
+ throws AMQException
+ {
+ _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag);
+
+ }
+
+
+ protected void recordMessageDelivery(final MessageInstance entry, final long deliveryTag)
+ {
+ _recordMethod.recordMessageDelivery(getConsumer(),entry,deliveryTag);
+ }
+
+
+ public void confirmAutoClose()
+ {
+ ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag());
+ }
+
+ public void queueEmpty() throws AMQException
+ {
+ if (isAutoClose())
+ {
+ close();
+ confirmAutoClose();
+ }
+ }
+
+ public void flushBatched()
+ {
+ _channel.getProtocolSession().setDeferFlush(false);
+
+ _channel.getProtocolSession().flushBatched();
+ }
+
+ protected void addUnacknowledgedMessage(MessageInstance entry)
+ {
+ final long size = entry.getMessage().getSize();
+ _unacknowledgedBytes.addAndGet(size);
+ _unacknowledgedCount.incrementAndGet();
+ entry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.State>()
+ {
+ public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState)
+ {
+ if(oldState.equals(MessageInstance.State.ACQUIRED) && !newState.equals(MessageInstance.State.ACQUIRED))
+ {
+ _unacknowledgedBytes.addAndGet(-size);
+ _unacknowledgedCount.decrementAndGet();
+ entry.removeStateChangeListener(this);
+ }
+ }
+ });
+ }
+
+ public long getUnacknowledgedBytes()
+ {
+ return _unacknowledgedBytes.longValue();
+ }
+
+ public long getUnacknowledgedMessages()
+ {
+ return _unacknowledgedCount.longValue();
+ }
+}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
index 060aebdd65..1de1638c2e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
@@ -23,11 +23,8 @@ package org.apache.qpid.server.protocol.v0_8;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.consumer.Consumer;
import java.util.Map;
@@ -35,34 +32,28 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
{
private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class);
- private final Map<Long, QueueEntry> _msgToRequeue;
- private final Map<Long, QueueEntry> _msgToResend;
- private final boolean _requeueIfUnableToResend;
+ private final Map<Long, MessageInstance> _msgToRequeue;
+ private final Map<Long, MessageInstance> _msgToResend;
private final UnacknowledgedMessageMap _unacknowledgedMessageMap;
- private final MessageStore _transactionLog;
public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap,
- Map<Long, QueueEntry> msgToRequeue,
- Map<Long, QueueEntry> msgToResend,
- boolean requeueIfUnableToResend,
- MessageStore txnLog)
+ Map<Long, MessageInstance> msgToRequeue,
+ Map<Long, MessageInstance> msgToResend)
{
_unacknowledgedMessageMap = unacknowledgedMessageMap;
_msgToRequeue = msgToRequeue;
_msgToResend = msgToResend;
- _requeueIfUnableToResend = requeueIfUnableToResend;
- _transactionLog = txnLog;
}
- public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
+ public boolean callback(final long deliveryTag, MessageInstance message) throws AMQException
{
message.setRedelivered();
- final Subscription subscription = message.getDeliveredSubscription();
- if (subscription != null)
+ final Consumer consumer = message.getDeliveredConsumer();
+ if (consumer != null)
{
// Consumer exists
- if (!subscription.isClosed())
+ if (!consumer.isClosed())
{
_msgToResend.put(deliveryTag, message);
}
@@ -73,58 +64,13 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
}
else
{
- // Message has no consumer tag, so was "delivered" to a GET
- // or consumer no longer registered
- // cannot resend, so re-queue.
- if (!message.isQueueDeleted())
- {
- if (_requeueIfUnableToResend)
- {
- _msgToRequeue.put(deliveryTag, message);
- }
- else
- {
-
- dequeueEntry(message);
- _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
- }
- }
- else
- {
- dequeueEntry(message);
- _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
- }
+ _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
}
// false means continue processing
return false;
}
-
- private void dequeueEntry(final QueueEntry node)
- {
- ServerTransaction txn = new AutoCommitTransaction(_transactionLog);
- dequeueEntry(node, txn);
- }
-
- private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
- {
- txn.dequeue(node.getQueue(), node.getMessage(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- node.delete();
- }
-
- public void onRollback()
- {
-
- }
- });
- }
-
public void visitComplete()
{
_unacknowledgedMessageMap.clear();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
index 5a9a51ff59..80c4c77b65 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
@@ -20,15 +20,12 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.message.MessageDestination;
import java.util.ArrayList;
import java.util.List;
@@ -38,7 +35,7 @@ public class IncomingMessage
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
- private Exchange _exchange;
+ private MessageDestination _messageDestination;
/**
* Keeps a track of how many bytes we have received in body frames
@@ -77,9 +74,9 @@ public class IncomingMessage
return _messagePublishInfo.getExchange();
}
- public Exchange getExchange()
+ public MessageDestination getDestination()
{
- return _exchange;
+ return _messageDestination;
}
public ContentHeaderBody getContentHeader()
@@ -92,9 +89,9 @@ public class IncomingMessage
return getContentHeader().getBodySize();
}
- public void setExchange(final Exchange e)
+ public void setMessageDestination(final MessageDestination e)
{
- _exchange = e;
+ _messageDestination = e;
}
public int getBodyCount() throws AMQException
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index ead28c6e26..3665e7f135 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -105,7 +105,7 @@ public class MessageMetaData implements StorableMessageMetaData
}
- public int writeToBuffer(int offset, ByteBuffer dest)
+ public int writeToBuffer(ByteBuffer dest)
{
int oldPosition = dest.position();
try
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
new file mode 100644
index 0000000000..70d7da3432
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
@@ -0,0 +1,29 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.consumer.Consumer;
+
+public interface RecordDeliveryMethod
+{
+ void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag);
+}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
deleted file mode 100644
index 6646dc0cc2..0000000000
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
-
-/**
- * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory
- * primarily assists testing although in future more sophisticated subscribers may need a different subscription
- * implementation.
- *
- * @see org.apache.qpid.server.queue.AMQQueue
- */
-public interface SubscriptionFactory
-{
- Subscription createSubscription(int channel,
- AMQProtocolSession protocolSession,
- AMQShortString consumerTag,
- boolean acks,
- FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager) throws AMQException;
-
-
- Subscription createSubscription(AMQChannel channel,
- AMQProtocolSession protocolSession,
- AMQShortString consumerTag,
- boolean acks,
- FieldTable filters,
- boolean noLocal,
- FlowCreditManager creditManager,
- ClientDeliveryMethod clientMethod,
- RecordDeliveryMethod recordMethod) throws AMQException;
-
-
- Subscription createBasicGetNoAckSubscription(AMQChannel channel,
- AMQProtocolSession session,
- AMQShortString consumerTag,
- FieldTable filters,
- boolean noLocal,
- FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod) throws AMQException;
-
-}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
deleted file mode 100644
index 93b51a0567..0000000000
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
-
-public class SubscriptionFactoryImpl implements SubscriptionFactory
-{
-
- public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, boolean acks, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager) throws AMQException
- {
- AMQChannel channel = protocolSession.getChannel(channelId);
- if (channel == null)
- {
- throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session");
- }
- ClientDeliveryMethod clientMethod = channel.getClientDeliveryMethod();
- RecordDeliveryMethod recordMethod = channel.getRecordDeliveryMethod();
-
-
- return createSubscription(channel, protocolSession, consumerTag, acks, filters,
- noLocal,
- creditManager,
- clientMethod,
- recordMethod
- );
- }
-
- public Subscription createSubscription(final AMQChannel channel,
- final AMQProtocolSession protocolSession,
- final AMQShortString consumerTag,
- final boolean acks,
- final FieldTable filters,
- final boolean noLocal,
- final FlowCreditManager creditManager,
- final ClientDeliveryMethod clientMethod,
- final RecordDeliveryMethod recordMethod
- )
- throws AMQException
- {
- boolean isBrowser;
-
- if (filters != null)
- {
- Boolean isBrowserObj = (Boolean) filters.get(AMQPFilterTypes.NO_CONSUME.getValue());
- isBrowser = (isBrowserObj != null) && isBrowserObj.booleanValue();
- }
- else
- {
- isBrowser = false;
- }
-
- if(isBrowser)
- {
- return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
- }
- else if(acks)
- {
- return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
- }
- else
- {
- return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
- }
- }
-
- public SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(final AMQChannel channel,
- final AMQProtocolSession session,
- final AMQShortString consumerTag,
- final FieldTable filters,
- final boolean noLocal,
- final FlowCreditManager creditManager,
- final ClientDeliveryMethod deliveryMethod,
- final RecordDeliveryMethod recordMethod) throws AMQException
- {
- return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod);
- }
-
- public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl();
-
-}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
deleted file mode 100644
index 7c52fbe3b0..0000000000
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
+++ /dev/null
@@ -1,858 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.filter.FilterManagerFactory;
-import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.SubscriptionActor;
-import org.apache.qpid.server.logging.messages.SubscriptionMessages;
-import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
- * that was given out by the broker and the channel id. <p/>
- */
-public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener
-{
-
- private StateListener _stateListener = new StateListener()
- {
-
- public void stateChange(Subscription sub, State oldState, State newState)
- {
-
- }
- };
-
-
- private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private volatile AMQQueue.Context _queueContext;
-
- private final ClientDeliveryMethod _deliveryMethod;
- private final RecordDeliveryMethod _recordMethod;
-
- private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
-
- private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
-
- private final Lock _stateChangeLock;
-
- private final long _subscriptionID;
- private LogSubject _logSubject;
- private LogActor _logActor;
- private final AtomicLong _deliveredCount = new AtomicLong(0);
- private final AtomicLong _deliveredBytes = new AtomicLong(0);
-
- private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
- private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
-
- private long _createTime = System.currentTimeMillis();
-
-
- static final class BrowserSubscription extends SubscriptionImpl
- {
- public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
- throws AMQException
- {
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
- }
-
-
- public boolean isBrowser()
- {
- return true;
- }
-
- /**
- * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
- * thread safe.
- *
- *
- * @param entry
- * @param batch
- * @throws AMQException
- */
- @Override
- public void send(QueueEntry entry, boolean batch) throws AMQException
- {
- // We don't decrement the reference here as we don't want to consume the message
- // but we do want to send it to the client.
-
- synchronized (getChannel())
- {
- long deliveryTag = getChannel().getNextDeliveryTag();
- sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
- }
-
- }
-
- @Override
- public boolean wouldSuspend(QueueEntry msg)
- {
- return false;
- }
-
- }
-
- public static class NoAckSubscription extends SubscriptionImpl
- {
- private final AutoCommitTransaction _txn;
-
- public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
- throws AMQException
- {
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
- _txn = new AutoCommitTransaction(protocolSession.getVirtualHost().getMessageStore());
- }
-
-
- public boolean isBrowser()
- {
- return false;
- }
-
- @Override
- public boolean isExplicitAcknowledge()
- {
- return false;
- }
-
- /**
- * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
- * thread safe.
- *
- *
- * @param entry The message to send
- * @param batch
- * @throws AMQException
- */
- @Override
- public void send(QueueEntry entry, boolean batch) throws AMQException
- {
- // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
-
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
-
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
- _txn.dequeue(getQueue(), entry.getMessage(), NOOP);
-
- ServerMessage message = entry.getMessage();
- MessageReference ref = message.newReference();
- InstanceProperties props = entry.getInstanceProperties();
- entry.delete();
-
- synchronized (getChannel())
- {
- getChannel().getProtocolSession().setDeferFlush(batch);
- long deliveryTag = getChannel().getNextDeliveryTag();
-
- sendToClient(message, props, deliveryTag);
-
- }
- ref.release();
-
-
- }
-
- @Override
- public boolean wouldSuspend(QueueEntry msg)
- {
- return false;
- }
-
- private static final ServerTransaction.Action NOOP =
- new ServerTransaction.Action()
- {
- @Override
- public void postCommit()
- {
- }
-
- @Override
- public void onRollback()
- {
- }
- };
- }
-
- /**
- * NoAck Subscription for use with BasicGet method.
- */
- public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
- {
- public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
- throws AMQException
- {
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
- }
-
- public boolean isTransient()
- {
- return true;
- }
-
- public boolean wouldSuspend(QueueEntry msg)
- {
- return !getCreditManager().useCreditForMessage(msg.getMessage().getSize());
- }
-
- }
-
- static final class AckSubscription extends SubscriptionImpl
- {
- public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
- throws AMQException
- {
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
- }
-
-
- public boolean isBrowser()
- {
- return false;
- }
-
-
- /**
- * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
- * thread safe.
- *
- *
- * @param entry The message to send
- * @param batch
- * @throws AMQException
- */
- @Override
- public void send(QueueEntry entry, boolean batch) throws AMQException
- {
-
-
- synchronized (getChannel())
- {
- getChannel().getProtocolSession().setDeferFlush(batch);
- long deliveryTag = getChannel().getNextDeliveryTag();
-
- addUnacknowledgedMessage(entry);
- recordMessageDelivery(entry, deliveryTag);
- sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
- entry.incrementDeliveryCount();
-
- }
- }
-
-
-
- }
-
-
- private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class);
-
- private final AMQChannel _channel;
-
- private final AMQShortString _consumerTag;
-
-
- private boolean _noLocal;
-
- private final FlowCreditManager _creditManager;
-
- private FilterManager _filters;
-
- private final Boolean _autoClose;
-
- private AMQQueue _queue;
- private final AtomicBoolean _deleted = new AtomicBoolean(false);
-
-
-
-
- public SubscriptionImpl(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable arguments,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
- throws AMQException
- {
- _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
- _channel = channel;
- _consumerTag = consumerTag;
-
- _creditManager = creditManager;
- creditManager.addStateListener(this);
-
- _noLocal = noLocal;
-
-
- _filters = FilterManagerFactory.createManager(FieldTable.convertToMap(arguments));
-
- _deliveryMethod = deliveryMethod;
- _recordMethod = recordMethod;
-
-
- _stateChangeLock = new ReentrantLock();
-
-
- if (arguments != null)
- {
- Object autoClose = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
- if (autoClose != null)
- {
- _autoClose = (Boolean) autoClose;
- }
- else
- {
- _autoClose = false;
- }
- }
- else
- {
- _autoClose = false;
- }
-
- }
-
- public AMQSessionModel getSessionModel()
- {
- return _channel;
- }
-
- public Long getDelivered()
- {
- return _deliveredCount.get();
- }
-
- public synchronized void setQueue(AMQQueue queue, boolean exclusive)
- {
- if(getQueue() != null)
- {
- throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
- }
- _queue = queue;
-
- _logSubject = new SubscriptionLogSubject(this);
- _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
-
- if (CurrentActor.get().getRootMessageLogger().
- isMessageEnabled(CurrentActor.get(), _logSubject, SubscriptionMessages.CREATE_LOG_HIERARCHY))
- {
- // Get the string value of the filters
- String filterLogString = null;
- if (_filters != null && _filters.hasFilters())
- {
- filterLogString = _filters.toString();
- }
-
- if (isAutoClose())
- {
- if (filterLogString == null)
- {
- filterLogString = "";
- }
- else
- {
- filterLogString += ",";
- }
- filterLogString += "AutoClose";
- }
-
- if (isBrowser())
- {
- // We do not need to check for null here as all Browsers are AutoClose
- filterLogString +=",Browser";
- }
-
- CurrentActor.get().
- message(_logSubject,
- SubscriptionMessages.CREATE(filterLogString,
- queue.isDurable() && exclusive,
- filterLogString != null));
- }
- }
-
- public String toString()
- {
- String subscriber = "[channel=" + _channel +
- ", consumerTag=" + _consumerTag +
- ", session=" + getProtocolSession().getKey() ;
-
- return subscriber + "]";
- }
-
- /**
- * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
- * thread safe.
- *
- *
- * @param entry
- * @param batch
- * @throws AMQException
- */
- abstract public void send(QueueEntry entry, boolean batch) throws AMQException;
-
-
- public boolean isSuspended()
- {
- return !isActive() || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped();
- }
-
- /**
- * Callback indicating that a queue has been deleted.
- *
- * @param queue The queue to delete
- */
- public void queueDeleted(AMQQueue queue)
- {
- _deleted.set(true);
- }
-
- public boolean hasInterest(QueueEntry entry)
- {
- //check that the message hasn't been rejected
- if (entry.isRejectedBy(getSubscriptionID()))
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Subscription:" + this + " rejected message:" + entry);
- }
- }
-
- if(entry.getMessage() instanceof AMQMessage)
- {
- if (_noLocal)
- {
- AMQMessage message = (AMQMessage) entry.getMessage();
-
- final Object publisherReference = message.getConnectionReference();
-
- // We don't want local messages so check to see if message is one we sent
- Object localReference = getProtocolSession().getReference();
-
- if(publisherReference != null && publisherReference.equals(localReference))
- {
- return false;
- }
- }
- }
- else
- {
- // No interest in messages we can't convert to AMQMessage
- if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), AMQMessage.class)==null)
- {
- return false;
- }
- }
-
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("(" + this + ") checking filters for message (" + entry);
- }
- return checkFilters(entry);
-
- }
-
- private boolean checkFilters(QueueEntry msg)
- {
- return (_filters == null) || _filters.allAllow(msg.asFilterable());
- }
-
- public boolean isAutoClose()
- {
- return _autoClose;
- }
-
- public FlowCreditManager getCreditManager()
- {
- return _creditManager;
- }
-
-
- public void close()
- {
- boolean closed = false;
- State state = getState();
-
- _stateChangeLock.lock();
- try
- {
- while(!closed && state != State.CLOSED)
- {
- closed = _state.compareAndSet(state, State.CLOSED);
- if(!closed)
- {
- state = getState();
- }
- else
- {
- _stateListener.stateChange(this,state, State.CLOSED);
- }
- }
- _creditManager.removeListener(this);
- }
- finally
- {
- _stateChangeLock.unlock();
- }
- //Log Subscription closed
- CurrentActor.get().message(_logSubject, SubscriptionMessages.CLOSE());
- }
-
- public boolean isClosed()
- {
- return getState() == State.CLOSED;
- }
-
-
- public boolean wouldSuspend(QueueEntry msg)
- {
- return !_creditManager.useCreditForMessage(msg.getMessage().getSize());
- }
-
- public boolean trySendLock()
- {
- return _stateChangeLock.tryLock();
- }
-
- public void getSendLock()
- {
- _stateChangeLock.lock();
- }
-
- public void releaseSendLock()
- {
- _stateChangeLock.unlock();
- }
-
- public AMQChannel getChannel()
- {
- return _channel;
- }
-
- public AMQShortString getConsumerTag()
- {
- return _consumerTag;
- }
-
- public String getConsumerName()
- {
- return _consumerTag == null ? null : _consumerTag.asString();
- }
-
- public long getSubscriptionID()
- {
- return _subscriptionID;
- }
-
- public AMQProtocolSession getProtocolSession()
- {
- return _channel.getProtocolSession();
- }
-
- public LogActor getLogActor()
- {
- return _logActor;
- }
-
- public AMQQueue getQueue()
- {
- return _queue;
- }
-
- public void onDequeue(final QueueEntry queueEntry)
- {
- restoreCredit(queueEntry);
- }
-
- public void releaseQueueEntry(final QueueEntry queueEntry)
- {
- restoreCredit(queueEntry);
- }
-
- public void restoreCredit(final QueueEntry queueEntry)
- {
- _creditManager.restoreCredit(1, queueEntry.getSize());
- }
-
- public void creditStateChanged(boolean hasCredit)
- {
-
- if(hasCredit)
- {
- if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
- {
- _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
- }
- else
- {
- // this is a hack to get round the issue of increasing bytes credit
- _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE);
- }
- }
- else
- {
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
- }
- CurrentActor.get().message(_logSubject,SubscriptionMessages.STATE(_state.get().toString()));
- }
-
- public State getState()
- {
- return _state.get();
- }
-
-
- public void setStateListener(final StateListener listener)
- {
- _stateListener = listener;
- }
-
-
- public AMQQueue.Context getQueueContext()
- {
- return _queueContext;
- }
-
- public void setQueueContext(AMQQueue.Context context)
- {
- _queueContext = context;
- }
-
-
- protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
- throws AMQException
- {
- _deliveryMethod.deliverToClient(this, message, props, deliveryTag);
- _deliveredCount.incrementAndGet();
- _deliveredBytes.addAndGet(message.getSize());
- }
-
-
- protected void recordMessageDelivery(final QueueEntry entry, final long deliveryTag)
- {
- _recordMethod.recordMessageDelivery(this,entry,deliveryTag);
- }
-
-
- public boolean isActive()
- {
- return getState() == State.ACTIVE;
- }
-
- public QueueEntry.SubscriptionAcquiredState getOwningState()
- {
- return _owningState;
- }
-
- public void confirmAutoClose()
- {
- ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag());
- }
-
- public boolean acquires()
- {
- return !isBrowser();
- }
-
- public boolean seesRequeues()
- {
- return !isBrowser();
- }
-
- public boolean isTransient()
- {
- return false;
- }
-
- public void set(String key, Object value)
- {
- _properties.put(key, value);
- }
-
- public Object get(String key)
- {
- return _properties.get(key);
- }
-
-
- public void setNoLocal(boolean noLocal)
- {
- _noLocal = noLocal;
- }
-
- abstract boolean isBrowser();
-
- public String getCreditMode()
- {
- return "WINDOW";
- }
-
- public boolean isBrowsing()
- {
- return isBrowser();
- }
-
- public boolean isExplicitAcknowledge()
- {
- return true;
- }
-
- public boolean isDurable()
- {
- return false;
- }
-
- public boolean isExclusive()
- {
- return getQueue().hasExclusiveSubscriber();
- }
-
- public String getName()
- {
- return String.valueOf(_consumerTag);
- }
-
- public Map<String, Object> getArguments()
- {
- return null;
- }
-
- public boolean isSessionTransactional()
- {
- return _channel.isTransactional();
- }
-
- public long getCreateTime()
- {
- return _createTime;
- }
-
- public void queueEmpty() throws AMQException
- {
- if (isAutoClose())
- {
- _queue.unregisterSubscription(this);
-
- confirmAutoClose();
- }
- }
-
- public void flushBatched()
- {
- _channel.getProtocolSession().setDeferFlush(false);
-
- _channel.getProtocolSession().flushBatched();
- }
-
- public long getBytesOut()
- {
- return _deliveredBytes.longValue();
- }
-
- public long getMessagesOut()
- {
- return _deliveredCount.longValue();
- }
-
-
- protected void addUnacknowledgedMessage(QueueEntry entry)
- {
- final long size = entry.getSize();
- _unacknowledgedBytes.addAndGet(size);
- _unacknowledgedCount.incrementAndGet();
- entry.addStateChangeListener(new QueueEntry.StateChangeListener()
- {
- public void stateChanged(QueueEntry entry, QueueEntry.State oldState, QueueEntry.State newState)
- {
- if(oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED))
- {
- _unacknowledgedBytes.addAndGet(-size);
- _unacknowledgedCount.decrementAndGet();
- entry.removeStateChangeListener(this);
- }
- }
- });
- }
-
- public long getUnacknowledgedBytes()
- {
- return _unacknowledgedBytes.longValue();
- }
-
- public long getUnacknowledgedMessages()
- {
- return _unacknowledgedCount.longValue();
- }
-}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
index 1d41bcdcf4..fcbbadd507 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.QueueEntry;
import java.util.Collection;
@@ -36,24 +37,24 @@ public interface UnacknowledgedMessageMap
*@param message the message being iterated over @return true to stop iteration, false to continue
* @throws AMQException
*/
- boolean callback(final long deliveryTag, QueueEntry message) throws AMQException;
+ boolean callback(final long deliveryTag, MessageInstance message) throws AMQException;
void visitComplete();
}
void visit(Visitor visitor) throws AMQException;
- void add(long deliveryTag, QueueEntry message);
+ void add(long deliveryTag, MessageInstance message);
- QueueEntry remove(long deliveryTag);
+ MessageInstance remove(long deliveryTag);
- Collection<QueueEntry> cancelAllMessages();
+ Collection<MessageInstance> cancelAllMessages();
int size();
void clear();
- QueueEntry get(long deliveryTag);
+ MessageInstance get(long deliveryTag);
/**
* Get the set of delivery tags that are outstanding.
@@ -62,7 +63,7 @@ public interface UnacknowledgedMessageMap
*/
Set<Long> getDeliveryTags();
- Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple);
+ Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
index 17b2c7b985..8d70e769d3 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.QueueEntry;
import java.util.Collection;
@@ -34,7 +35,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
private long _unackedSize;
- private Map<Long, QueueEntry> _map;
+ private Map<Long, MessageInstance> _map;
private long _lastDeliveryTag;
@@ -43,10 +44,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
public UnacknowledgedMessageMapImpl(int prefetchLimit)
{
_prefetchLimit = prefetchLimit;
- _map = new LinkedHashMap<Long, QueueEntry>(prefetchLimit);
+ _map = new LinkedHashMap<Long, MessageInstance>(prefetchLimit);
}
- public void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs)
+ public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs)
{
if (multiple)
{
@@ -54,7 +55,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
else
{
- final QueueEntry entry = get(deliveryTag);
+ final MessageInstance entry = get(deliveryTag);
if(entry != null)
{
msgs.put(deliveryTag, entry);
@@ -63,7 +64,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
- public void remove(Map<Long,QueueEntry> msgs)
+ public void remove(Map<Long,MessageInstance> msgs)
{
synchronized (_lock)
{
@@ -74,12 +75,12 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public QueueEntry remove(long deliveryTag)
+ public MessageInstance remove(long deliveryTag)
{
synchronized (_lock)
{
- QueueEntry message = _map.remove(deliveryTag);
+ MessageInstance message = _map.remove(deliveryTag);
if(message != null)
{
_unackedSize -= message.getMessage().getSize();
@@ -94,8 +95,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
synchronized (_lock)
{
- Set<Map.Entry<Long, QueueEntry>> currentEntries = _map.entrySet();
- for (Map.Entry<Long, QueueEntry> entry : currentEntries)
+ Set<Map.Entry<Long, MessageInstance>> currentEntries = _map.entrySet();
+ for (Map.Entry<Long, MessageInstance> entry : currentEntries)
{
visitor.callback(entry.getKey().longValue(), entry.getValue());
}
@@ -103,7 +104,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public void add(long deliveryTag, QueueEntry message)
+ public void add(long deliveryTag, MessageInstance message)
{
synchronized (_lock)
{
@@ -113,12 +114,12 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public Collection<QueueEntry> cancelAllMessages()
+ public Collection<MessageInstance> cancelAllMessages()
{
synchronized (_lock)
{
- Collection<QueueEntry> currentEntries = _map.values();
- _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit);
+ Collection<MessageInstance> currentEntries = _map.values();
+ _map = new LinkedHashMap<Long, MessageInstance>(_prefetchLimit);
_unackedSize = 0l;
return currentEntries;
}
@@ -141,7 +142,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public QueueEntry get(long key)
+ public MessageInstance get(long key)
{
synchronized (_lock)
{
@@ -157,19 +158,19 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple)
+ public Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple)
{
- Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>();
+ Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long,MessageInstance>();
collect(deliveryTag, multiple, ackedMessageMap);
remove(ackedMessageMap);
return ackedMessageMap.values();
}
- private void collect(long key, Map<Long, QueueEntry> msgs)
+ private void collect(long key, Map<Long, MessageInstance> msgs)
{
synchronized (_lock)
{
- for (Map.Entry<Long, QueueEntry> entry : _map.entrySet())
+ for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
{
msgs.put(entry.getKey(),entry.getValue());
if (entry.getKey() == key)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
index 836de44f4e..526bc9b9fe 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -73,7 +74,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
" args:" + body.getArguments());
}
- AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
+ MessageSource queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
if (queue == null)
{
@@ -120,8 +121,11 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
{
- AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
- body.getArguments(), body.getNoLocal(), body.getExclusive());
+ AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
+ queue,
+ !body.getNoAck(),
+ body.getArguments(),
+ body.getExclusive());
if (!body.getNowait())
{
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
@@ -156,14 +160,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
- catch (AMQQueue.ExistingExclusiveSubscription e)
+ catch (AMQQueue.ExistingExclusiveConsumer e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
+ " as it already has an existing exclusive consumer");
}
- catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+ catch (AMQQueue.ExistingConsumerPreventsExclusive e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index 4b569ccc71..d4bd486a99 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -24,27 +24,31 @@ package org.apache.qpid.server.protocol.v0_8.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicGetBody;
import org.apache.qpid.framing.BasicGetEmptyBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.MessageOnlyCreditManager;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl;
+import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
+import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.EnumSet;
+
public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
{
private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
@@ -128,7 +132,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
{
@Override
- public void deliverToClient(final Subscription sub, final ServerMessage message, final
+ public void deliverToClient(final Consumer sub, final ServerMessage message, final
InstanceProperties props, final long deliveryTag)
throws AMQException
{
@@ -145,25 +149,32 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
{
- public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
{
channel.addUnacknowledgedMessage(entry, deliveryTag, null);
}
};
- Subscription sub;
+ ConsumerTarget_0_8 target;
+ EnumSet<Consumer.Option> options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES);
if(acks)
{
- sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+
+ target = ConsumerTarget_0_8.createAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
else
{
- sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ target = ConsumerTarget_0_8.createNoAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
- queue.registerSubscription(sub,false);
- queue.flushSubscription(sub);
- queue.unregisterSubscription(sub);
+ Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
+ sub.flush();
+ sub.close();
return(!singleMessageCredit.hasCredit());
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
index 497e97db3e..f8a7722447 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -67,7 +68,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic
}
VirtualHost vHost = session.getVirtualHost();
- Exchange exch = vHost.getExchange(exchangeName.toString());
+ MessageDestination exch = vHost.getMessageDestination(exchangeName.toString());
// if the exchange does not exist we raise a channel exception
if (exch == null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java
index 0a79466b35..606bcf1693 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java
@@ -56,7 +56,7 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
throw body.getChannelNotFoundException(channelId);
}
- channel.resend(body.getRequeue());
+ channel.resend();
// Qpid 0-8 hacks a synchronous -ok onto recover.
// In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java
index b54e1c7dcf..ef26e60a62 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java
@@ -58,7 +58,7 @@ public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<B
throw body.getChannelNotFoundException(channelId);
}
channel.sync();
- channel.resend(body.getRequeue());
+ channel.resend();
// Qpid 0-8 hacks a synchronous -ok onto recover.
// In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
index d4d8c9aaef..fdbd44b06d 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueEntry;
@@ -65,7 +66,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
long deliveryTag = body.getDeliveryTag();
- QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+ MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
if (message == null)
{
@@ -73,16 +74,6 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
}
else
{
- if (message.isQueueDeleted())
- {
- _logger.warn("Message's Queue has already been purged, dropping message");
- message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
- if(message != null)
- {
- message.delete();
- }
- return;
- }
if (message.getMessage() == null)
{
@@ -98,41 +89,43 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
" on channel:" + channel.debugIdentity());
}
- message.reject();
-
if (body.getRequeue())
{
- channel.requeue(deliveryTag);
-
//this requeue represents a message rejected from the pre-dispatch queue
//therefore we need to amend the delivery counter.
message.decrementDeliveryCount();
+
+ channel.requeue(deliveryTag);
}
else
{
- final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
- _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
- if (maxDeliveryCountEnabled)
- {
- final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
- _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
- if (deliveredTooManyTimes)
- {
- channel.deadLetter(body.getDeliveryTag());
- }
- else
- {
- //this requeue represents a message rejected because of a recover/rollback that we
- //are not ready to DLQ. We rely on the reject command to resend from the unacked map
- //and therefore need to increment the delivery counter so we cancel out the effect
- //of the AMQChannel#resend() decrement.
- message.incrementDeliveryCount();
- }
- }
- else
- {
- channel.requeue(deliveryTag);
- }
+ // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here
+ // as it would prevent redelivery
+ // message.reject();
+
+ final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
+ _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
+ if (maxDeliveryCountEnabled)
+ {
+ final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
+ _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
+ if (deliveredTooManyTimes)
+ {
+ channel.deadLetter(body.getDeliveryTag());
+ }
+ else
+ {
+ //this requeue represents a message rejected because of a recover/rollback that we
+ //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+ //and therefore need to increment the delivery counter so we cancel out the effect
+ //of the AMQChannel#resend() decrement.
+ message.incrementDeliveryCount();
+ }
+ }
+ else
+ {
+ channel.requeue(deliveryTag);
+ }
}
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
index 3fdce83c2a..263175d590 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
@@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
@@ -134,8 +135,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
};
protocolConnection.addSessionCloseTask(sessionCloseTask);
- queue.addQueueDeleteTask(new AMQQueue.Task() {
- public void doTask(AMQQueue queue) throws AMQException
+ queue.addQueueDeleteTask(new Action<AMQQueue>() {
+ public void performAction(AMQQueue queue)
{
protocolConnection.removeSessionCloseTask(sessionCloseTask);
}
@@ -245,9 +246,9 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
session.addSessionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue)
+ public void performAction(AMQQueue queue)
{
session.removeSessionCloseTask(deleteQueueTask);
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java
index 19d0da007b..69ad1a0a21 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java
@@ -74,7 +74,7 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
// Why, are we not allowed to send messages back to client before the ok method?
- channel.resend(false);
+ channel.resend();
}
catch (AMQException e)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index 2243cbff11..c805956b83 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -28,11 +28,11 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.flow.LimitlessCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.Set;
/**
@@ -47,7 +48,8 @@ import java.util.Set;
*/
public class AckTest extends QpidTestCase
{
- private Subscription _subscription;
+ private ConsumerTarget_0_8 _subscriptionTarget;
+ private Consumer _consumer;
private AMQProtocolSession _protocolSession;
@@ -86,7 +88,6 @@ public class AckTest extends QpidTestCase
private void publishMessages(int count, boolean persistent) throws AMQException
{
- _queue.registerSubscription(_subscription,false);
for (int i = 1; i <= count; i++)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -144,7 +145,7 @@ public class AckTest extends QpidTestCase
try
{
- _queue.enqueue(message);
+ _queue.enqueue(message,null);
}
catch (AMQException e)
{
@@ -178,7 +179,13 @@ public class AckTest extends QpidTestCase
*/
public void testAckChannelAssociationTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -190,8 +197,8 @@ public class AckTest extends QpidTestCase
{
assertTrue(deliveryTag == i);
i++;
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
+ MessageInstance unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getOwningResource() == _queue);
}
}
@@ -202,7 +209,16 @@ public class AckTest extends QpidTestCase
public void testNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager());
+ _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget,
+ null,
+ AMQMessage.class,
+ DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -218,7 +234,13 @@ public class AckTest extends QpidTestCase
public void testPersistentNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES, Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
@@ -235,7 +257,15 @@ public class AckTest extends QpidTestCase
*/
public void testSingleAckReceivedTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
+
final int msgCount = 10;
publishMessages(msgCount);
@@ -248,8 +278,8 @@ public class AckTest extends QpidTestCase
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i);
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
+ MessageInstance unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getOwningResource() == _queue);
// 5 is the delivery tag of the message that *should* be removed
if (++i == 5)
{
@@ -264,7 +294,15 @@ public class AckTest extends QpidTestCase
*/
public void testMultiAckReceivedTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
+
final int msgCount = 10;
publishMessages(msgCount);
@@ -279,8 +317,8 @@ public class AckTest extends QpidTestCase
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i + 5);
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
+ MessageInstance unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getOwningResource() == _queue);
++i;
}
}
@@ -290,7 +328,15 @@ public class AckTest extends QpidTestCase
*/
public void testMultiAckAllReceivedTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
+
final int msgCount = 10;
publishMessages(msgCount);
@@ -303,8 +349,8 @@ public class AckTest extends QpidTestCase
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i + 5);
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
+ MessageInstance unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getOwningResource() == _queue);
++i;
}
}
@@ -319,12 +365,16 @@ public class AckTest extends QpidTestCase
// Send 10 messages
Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession,
- DEFAULT_CONSUMER_TAG, true, null, false, creditManager);
+
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
+
final int msgCount = 1;
publishMessages(msgCount);
- _queue.deliverAsync(_subscription);
+ _consumer.externalStateChange();
_channel.acknowledgeMessage(1, false);
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
index bb5fecdfb4..281f7345ff 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
@@ -140,7 +140,7 @@ public class AcknowledgeTest extends QpidTestCase
assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
//Subscribe to the queue
- AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true);
+ AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true);
getQueue().deliverAsync();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
index 36a57fa05f..aa5a75396a 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
@@ -23,20 +23,22 @@ package org.apache.qpid.server.protocol.v0_8;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryIterator;
-import org.apache.qpid.server.queue.SimpleQueueEntryList;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.subscription.MockSubscription;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
*
@@ -59,40 +61,50 @@ public class ExtractResendAndRequeueTest extends TestCase
private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
private static final int INITIAL_MSG_COUNT = 10;
- private AMQQueue _queue = new MockAMQQueue(getName());
- private MessageStore _messageStore = new TestMemoryMessageStore();
- private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
+ private AMQQueue _queue;
+ private LinkedList<MessageInstance> _referenceList = new LinkedList<MessageInstance>();
+ private Consumer _consumer;
+ private boolean _queueDeleted;
@Override
public void setUp() throws AMQException
{
+ _queueDeleted = false;
_unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100);
+ _queue = mock(AMQQueue.class);
+ when(_queue.getName()).thenReturn(getName());
+ when(_queue.isDeleted()).thenReturn(_queueDeleted);
+ _consumer = mock(Consumer.class);
+ when(_consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement());
+
long id = 0;
- SimpleQueueEntryList list = new SimpleQueueEntryList(_queue);
// Add initial messages to QueueEntryList
for (int count = 0; count < INITIAL_MSG_COUNT; count++)
{
- AMQMessage msg = new MockAMQMessage(id);
-
- list.add(msg);
-
+ ServerMessage msg = mock(ServerMessage.class);
+ when(msg.getMessageNumber()).thenReturn(id);
+ final QueueEntry entry = mock(QueueEntry.class);
+ when(entry.getMessage()).thenReturn(msg);
+ when(entry.getQueue()).thenReturn(_queue);
+ when(entry.isQueueDeleted()).thenReturn(_queueDeleted);
+ doAnswer(new Answer()
+ {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ when(entry.isDeleted()).thenReturn(true);
+ return null;
+ }
+ }).when(entry).delete();
+
+ _unacknowledgedMessageMap.add(id, entry);
+ _referenceList.add(entry);
//Increment ID;
id++;
}
- // Iterate through the QueueEntryList and add entries to unacknowledgedMessageMap and referenceList
- QueueEntryIterator queueEntries = list.iterator();
- while(queueEntries.advance())
- {
- QueueEntry entry = queueEntries.getNode();
- _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry);
-
- // Store the entry for future inspection
- _referenceList.add(entry);
- }
-
assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size());
}
@@ -103,17 +115,14 @@ public class ExtractResendAndRequeueTest extends TestCase
*
* @return Subscription that performed the acquire
*/
- private Subscription createSubscriptionAndAcquireMessages(LinkedList<QueueEntry> messageList)
+ private void acquireMessages(LinkedList<MessageInstance> messageList)
{
- Subscription subscription = new MockSubscription();
- // Aquire messages in subscription
- for (QueueEntry entry : messageList)
+ // Acquire messages in subscription
+ for(MessageInstance entry : messageList)
{
- entry.acquire(subscription);
+ when(entry.getDeliveredConsumer()).thenReturn(_consumer);
}
-
- return subscription;
}
/**
@@ -128,14 +137,14 @@ public class ExtractResendAndRequeueTest extends TestCase
public void testResend() throws AMQException
{
//We don't need the subscription object here.
- createSubscriptionAndAcquireMessages(_referenceList);
+ acquireMessages(_referenceList);
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+ final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+ final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
// requeueIfUnableToResend doesn't matter here.
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, _messageStore));
+ msgToResend));
assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size());
assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
@@ -154,100 +163,22 @@ public class ExtractResendAndRequeueTest extends TestCase
*/
public void testRequeueDueToSubscriptionClosure() throws AMQException
{
- Subscription subscription = createSubscriptionAndAcquireMessages(_referenceList);
+ acquireMessages(_referenceList);
// Close subscription
- subscription.close();
+ when(_consumer.isClosed()).thenReturn(true);
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+ final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+ final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
// requeueIfUnableToResend doesn't matter here.
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, _messageStore));
+ msgToResend));
assertEquals("Message count for resend not correct.", 0, msgToResend.size());
assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
}
- /**
- * If the subscription is null, due to message being retrieved via a GET, And we request that messages are requeued
- * requeueIfUnableToResend(set to true) then all messages should be sent to the msgToRequeue map.
- *
- * @throws AMQException the visit interface throws this
- */
-
- public void testRequeueDueToMessageHavingNoConsumerTag() throws AMQException
- {
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- // requeueIfUnableToResend = true so all messages should go to msgToRequeue
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, _messageStore));
-
- assertEquals("Message count for resend not correct.", 0, msgToResend.size());
- assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
- }
-
- /**
- * If the subscription is null, due to message being retrieved via a GET, And we request that we don't
- * requeueIfUnableToResend(set to false) then all messages should be dropped as we do not have a dead letter queue.
- *
- * @throws AMQException the visit interface throws this
- */
-
- public void testDrop() throws AMQException
- {
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- // requeueIfUnableToResend = false so all messages should be dropped all maps should be empty
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, false, _messageStore));
-
- assertEquals("Message count for resend not correct.", 0, msgToResend.size());
- assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-
-
- for (QueueEntry entry : _referenceList)
- {
- assertTrue("Message was not discarded", entry.isDeleted());
- }
-
- }
-
- /**
- * If the subscription is null, due to message being retrieved via a GET, AND the queue upon which the message was
- * delivered has been deleted then it is not possible to requeue. Currently we simply discard the message but in the
- * future we may wish to dead letter the message.
- *
- * Validate that at the end of the visit all Maps are empty and all messages are marked as deleted
- *
- * @throws AMQException the visit interface throws this
- */
- public void testDiscard() throws AMQException
- {
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- _queue.delete();
-
- // requeueIfUnableToResend : value doesn't matter here as queue has been deleted
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, false, _messageStore));
-
- assertEquals("Message count for resend not correct.", 0, msgToResend.size());
- assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-
- for (QueueEntry entry : _referenceList)
- {
- assertTrue("Message was not discarded", entry.isDeleted());
- }
- }
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index ef0837b3c6..1fad8fb41f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -47,11 +47,9 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -60,7 +58,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
{
private static final Logger _logger = Logger.getLogger(InternalTestProtocolSession.class);
// ChannelID(LIST) -> LinkedList<Pair>
- private final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
+ private final Map<Integer, Map<String, LinkedList<DeliveryPair>>> _channelDelivers;
private AtomicInteger _deliveryCount = new AtomicInteger(0);
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
@@ -68,7 +66,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
{
super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), null, null);
- _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
+ _channelDelivers = new HashMap<Integer, Map<String, LinkedList<DeliveryPair>>>();
setTestAuthorizedSubject();
setVirtualHost(virtualHost);
@@ -117,7 +115,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
{
synchronized (_channelDelivers)
{
- List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag);
+ List<DeliveryPair> all =_channelDelivers.get(channelId).get(AMQShortString.toString(consumerTag));
if (all == null)
{
@@ -153,23 +151,23 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
synchronized (_channelDelivers)
{
- Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
+ Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
if (consumers == null)
{
- consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+ consumers = new HashMap<String, LinkedList<DeliveryPair>>();
_channelDelivers.put(channelId, consumers);
}
- LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag);
+ LinkedList<DeliveryPair> consumerDelivers = consumers.get(AMQShortString.toString(consumerTag));
if (consumerDelivers == null)
{
consumerDelivers = new LinkedList<DeliveryPair>();
- consumers.put(consumerTag, consumerDelivers);
+ consumers.put(consumerTag.toString(), consumerDelivers);
}
- consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)msg));
+ consumerDelivers.add(new DeliveryPair(deliveryTag, msg));
}
}
@@ -247,27 +245,27 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
@Override
- public void deliverToClient(Subscription sub, ServerMessage message,
+ public void deliverToClient(Consumer sub, ServerMessage message,
InstanceProperties props, long deliveryTag) throws AMQException
{
_deliveryCount.incrementAndGet();
synchronized (_channelDelivers)
{
- Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
+ Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
if (consumers == null)
{
- consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+ consumers = new HashMap<String, LinkedList<DeliveryPair>>();
_channelDelivers.put(_channelId, consumers);
}
- LinkedList<DeliveryPair> consumerDelivers = consumers.get(((SubscriptionImpl)sub).getConsumerTag());
+ LinkedList<DeliveryPair> consumerDelivers = consumers.get(sub.getName());
if (consumerDelivers == null)
{
consumerDelivers = new LinkedList<DeliveryPair>();
- consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers);
+ consumers.put(sub.getName(), consumerDelivers);
}
consumerDelivers.add(new DeliveryPair(deliveryTag, message));
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
index 8c716a0b56..e895f81c44 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
@@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -130,8 +130,7 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase
//Check the process didn't suspend the subscription as this would
// indicate we are using the prefetch credit. i.e. using acks not No-Ack
assertTrue("The subscription has been suspended",
- !getChannel().getSubscription(browser).getState()
- .equals(Subscription.State.SUSPENDED));
+ !getChannel().getSubscription(browser).isSuspended());
}
private void checkStoreContents(int messageCount)
@@ -144,6 +143,6 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase
FieldTable filters = new FieldTable();
filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
- return channel.subscribeToQueue(null, queue, true, filters, false, true);
+ return channel.consumeFromSource(null, queue, true, filters, true);
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java
deleted file mode 100644
index e0d1b28007..0000000000
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.logging.UnitTestMessageLogger;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class SubscriptionFactoryImplTest extends QpidTestCase
-{
- private AMQChannel _channel;
- private AMQProtocolSession _session;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- BrokerTestHelper.setUp();
- _channel = BrokerTestHelper_0_8.createChannel();
- _session = _channel.getProtocolSession();
- GenericActor.setDefaultMessageLogger(new UnitTestMessageLogger(false));
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- if (_channel != null)
- {
- _channel.getVirtualHost().close();
- }
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
- }
-
- /**
- * Tests that while creating Subscriptions of various types, the
- * ID numbers assigned are allocated from a common sequence
- * (in increasing order).
- */
- public void testDifferingSubscriptionTypesShareCommonIdNumberingSequence() throws Exception
- {
- //create a No-Ack subscription, get the first Subscription ID
- long previousId = 0;
- Subscription noAckSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), false, null, false, _channel.getCreditManager());
- previousId = noAckSub.getSubscriptionID();
-
- //create an ack subscription, verify the next Subscription ID is used
- Subscription ackSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager());
- assertEquals("Unexpected Subscription ID allocated", previousId + 1, ackSub.getSubscriptionID());
- previousId = ackSub.getSubscriptionID();
-
- //create a browser subscription
- FieldTable filters = new FieldTable();
- filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
- Subscription browserSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager());
- assertEquals("Unexpected Subscription ID allocated", previousId + 1, browserSub.getSubscriptionID());
- previousId = browserSub.getSubscriptionID();
-
- //create an BasicGet NoAck subscription
- Subscription getNoAckSub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(_channel, _session, new AMQShortString("1"), null, false,
- _channel.getCreditManager(),_channel.getClientDeliveryMethod(), _channel.getRecordDeliveryMethod());
- assertEquals("Unexpected Subscription ID allocated", previousId + 1, getNoAckSub.getSubscriptionID());
- previousId = getNoAckSub.getSubscriptionID();
-
- }
-
-}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 4082f22e9c..41e2fef03f 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
@@ -53,16 +54,8 @@ public class Connection_1_0 implements ConnectionEventListener
private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
private final Object _reference = new Object();
-
-
- public static interface Task
- {
- public void doTask(Connection_1_0 connection);
- }
-
-
- private List<Task> _closeTasks =
- Collections.synchronizedList(new ArrayList<Task>());
+ private List<Action<Connection_1_0>> _closeTasks =
+ Collections.synchronizedList(new ArrayList<Action<Connection_1_0>>());
@@ -98,26 +91,26 @@ public class Connection_1_0 implements ConnectionEventListener
_sessions.remove(session);
}
- void removeConnectionCloseTask(final Task task)
+ void removeConnectionCloseTask(final Action<Connection_1_0> task)
{
_closeTasks.remove( task );
}
- void addConnectionCloseTask(final Task task)
+ void addConnectionCloseTask(final Action<Connection_1_0> task)
{
_closeTasks.add( task );
}
public void closeReceived()
{
- List<Task> taskCopy;
+ List<Action<Connection_1_0>> taskCopy;
synchronized (_closeTasks)
{
- taskCopy = new ArrayList<Task>(_closeTasks);
+ taskCopy = new ArrayList<Action<Connection_1_0>>(_closeTasks);
}
- for(Task task : taskCopy)
+ for(Action<Connection_1_0> task : taskCopy)
{
- task.doTask(this);
+ task.performAction(this);
}
synchronized (_closeTasks)
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 6a3f5b46e1..027c40aabe 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -20,12 +20,6 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
@@ -41,153 +35,84 @@ import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
import org.apache.qpid.amqp_1_0.type.messaging.Header;
import org.apache.qpid.amqp_1_0.type.messaging.Modified;
import org.apache.qpid.amqp_1_0.type.messaging.Released;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.ServerTransaction;
-class
- Subscription_1_0 implements Subscription
-{
- private SendingLink_1_0 _link;
-
- private AMQQueue _queue;
-
- private final AtomicReference<State> _state = new AtomicReference<State>(State.SUSPENDED);
+import java.nio.ByteBuffer;
+import java.util.List;
- private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
- private final long _id;
+class ConsumerTarget_1_0 extends AbstractConsumerTarget
+{
private final boolean _acquires;
- private volatile AMQQueue.Context _queueContext;
- private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
- private ReentrantLock _stateChangeLock = new ReentrantLock();
-
- private boolean _noLocal;
- private FilterManager _filters;
+ private SendingLink_1_0 _link;
private long _deliveryTag = 0L;
- private StateListener _stateListener;
private Binary _transactionId;
- private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance()
- .registerTransportLayer()
- .registerMessagingLayer()
- .registerTransactionLayer()
- .registerSecurityLayer();
- private SectionEncoder _sectionEncoder = new SectionEncoderImpl(_typeRegistry);
-
- public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination)
- {
- this(link, destination, ((Source)link.getEndpoint().getSource()).getDistributionMode() != StdDistMode.COPY);
- }
+ private final AMQPDescribedTypeRegistry _typeRegistry;
+ private final SectionEncoder _sectionEncoder;
+ private Consumer _consumer;
- public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination, boolean acquires)
+ public ConsumerTarget_1_0(final SendingLink_1_0 link,
+ boolean acquires)
{
+ super(State.SUSPENDED);
_link = link;
- _queue = destination.getQueue();
- _id = getEndpoint().getLocalHandle().longValue();
+ _typeRegistry = link.getEndpoint().getSession().getConnection().getDescribedTypeRegistry();
+ _sectionEncoder = new SectionEncoderImpl(_typeRegistry);
_acquires = acquires;
}
- private SendingLinkEndpoint getEndpoint()
- {
- return _link.getEndpoint();
- }
-
- public LogActor getLogActor()
- {
- return null; //TODO
- }
-
- public boolean isTransient()
- {
- return true; //TODO
- }
-
- public AMQQueue getQueue()
+ public Consumer getConsumer()
{
- return _queue;
+ return _consumer;
}
- public QueueEntry.SubscriptionAcquiredState getOwningState()
- {
- return _owningState;
- }
-
- public void setQueue(final AMQQueue queue, final boolean exclusive)
- {
- //TODO
- }
-
- public void setNoLocal(final boolean noLocal)
- {
- _noLocal = noLocal;
- }
-
- public long getSubscriptionID()
+ private SendingLinkEndpoint getEndpoint()
{
- return _id;
+ return _link.getEndpoint();
}
public boolean isSuspended()
{
- return _link.getSession().getConnectionModel().isStopped() || !isActive();// || !getEndpoint().hasCreditToSend();
+ return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;// || !getEndpoint().hasCreditToSend();
}
- public boolean hasInterest(final QueueEntry entry)
+ public boolean close()
{
- if(_noLocal && entry.getMessage().getConnectionReference() == getSession().getConnection().getReference())
+ boolean closed = false;
+ State state = getState();
+
+ getConsumer().getSendLock();
+ try
{
- return false;
+ while(!closed && state != State.CLOSED)
+ {
+ closed = updateState(state, State.CLOSED);
+ if(!closed)
+ {
+ state = getState();
+ }
+ }
+ return closed;
}
- else if(!(entry.getMessage() instanceof Message_1_0)
- && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null)
+ finally
{
- return false;
+ getConsumer().releaseSendLock();
}
- return checkFilters(entry);
-
- }
-
- private boolean checkFilters(final QueueEntry entry)
- {
- return (_filters == null) || _filters.allAllow(entry.asFilterable());
- }
-
- public boolean isClosed()
- {
- return !getEndpoint().isAttached();
- }
-
- public boolean acquires()
- {
- return _acquires;
- }
-
- public boolean seesRequeues()
- {
- // TODO
- return acquires();
- }
-
- public void close()
- {
- getEndpoint().detach();
}
- public void send(QueueEntry entry, boolean batch) throws AMQException
+ public void send(MessageInstance entry, boolean batch) throws AMQException
{
// TODO
send(entry);
@@ -198,7 +123,7 @@ class
// TODO
}
- public void send(final QueueEntry queueEntry) throws AMQException
+ public void send(final MessageInstance queueEntry) throws AMQException
{
ServerMessage serverMessage = queueEntry.getMessage();
Message_1_0 message;
@@ -209,7 +134,7 @@ class
else
{
final MessageConverter converter = MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class);
- message = (Message_1_0) converter.convert(serverMessage, queueEntry.getQueue().getVirtualHost());
+ message = (Message_1_0) converter.convert(serverMessage, _link.getVirtualHost());
}
Transfer transfer = new Transfer();
@@ -329,7 +254,7 @@ class
public void onRollback()
{
- if(queueEntry.isAcquiredBy(Subscription_1_0.this))
+ if(queueEntry.isAcquiredBy(getConsumer()))
{
queueEntry.release();
_link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
@@ -352,14 +277,14 @@ class
}
- public void queueDeleted(final AMQQueue queue)
+ public void queueDeleted()
{
//TODO
getEndpoint().setSource(null);
getEndpoint().detach();
}
- public boolean wouldSuspend(final QueueEntry msg)
+ public boolean allocateCredit(final ServerMessage msg)
{
synchronized (_link.getLock())
{
@@ -369,103 +294,32 @@ class
suspend();
}
- return !hasCredit;
+ return hasCredit;
}
}
- public boolean trySendLock()
- {
- return _stateChangeLock.tryLock();
- }
public void suspend()
{
synchronized(_link.getLock())
{
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
+ updateState(State.ACTIVE, State.SUSPENDED);
}
}
- public void getSendLock()
- {
- _stateChangeLock.lock();
- }
-
- public void releaseSendLock()
- {
- _stateChangeLock.unlock();
- }
-
- public void releaseQueueEntry(QueueEntry queueEntryImpl)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void onDequeue(final QueueEntry queueEntry)
+ public void restoreCredit(final ServerMessage message)
{
//TODO
}
- public void restoreCredit(final QueueEntry queueEntry)
- {
- //TODO
- }
-
- public void setStateListener(final StateListener listener)
- {
- _stateListener = listener;
- }
-
- public State getState()
- {
- return _state.get();
- }
-
- public AMQQueue.Context getQueueContext()
- {
- return _queueContext;
- }
-
- public void setQueueContext(AMQQueue.Context queueContext)
- {
- _queueContext = queueContext;
- }
-
-
- public boolean isActive()
- {
- return getState() == State.ACTIVE;
- }
-
- public void set(String key, Object value)
- {
- _properties.put(key, value);
- }
-
- public Object get(String key)
- {
- return _properties.get(key);
- }
-
- public boolean isSessionTransactional()
- {
- return false; //TODO
- }
-
public void queueEmpty()
{
synchronized(_link.getLock())
{
if(_link.drained())
{
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
- {
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
- }
+ updateState(State.ACTIVE, State.SUSPENDED);
}
}
}
@@ -476,10 +330,7 @@ class
{
if(isSuspended() && getEndpoint() != null)
{
- if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
- {
- _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
- }
+ updateState(State.SUSPENDED, State.ACTIVE);
_transactionId = _link.getTransactionId();
}
}
@@ -493,10 +344,10 @@ class
private class DispositionAction implements UnsettledAction
{
- private final QueueEntry _queueEntry;
+ private final MessageInstance _queueEntry;
private final Binary _deliveryTag;
- public DispositionAction(Binary tag, QueueEntry queueEntry)
+ public DispositionAction(Binary tag, MessageInstance queueEntry)
{
_deliveryTag = tag;
_queueEntry = queueEntry;
@@ -527,13 +378,13 @@ class
if(outcome instanceof Accepted)
{
- txn.dequeue(_queueEntry.getQueue(), _queueEntry.getMessage(),
+ txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(),
new ServerTransaction.Action()
{
public void postCommit()
{
- if(_queueEntry.isAcquiredBy(Subscription_1_0.this))
+ if(_queueEntry.isAcquiredBy(getConsumer()))
{
_queueEntry.delete();
}
@@ -618,7 +469,7 @@ class
private class DoNothingAction implements UnsettledAction
{
public DoNothingAction(final Binary tag,
- final QueueEntry queueEntry)
+ final MessageInstance queueEntry)
{
}
@@ -640,35 +491,22 @@ class
}
}
- public FilterManager getFilters()
- {
- return _filters;
- }
-
- public void setFilters(final FilterManager filters)
- {
- _filters = filters;
- }
-
@Override
public AMQSessionModel getSessionModel()
{
- // TODO
return getSession();
}
@Override
- public long getBytesOut()
+ public void consumerAdded(final Consumer sub)
{
- // TODO
- return 0;
+ _consumer = sub;
}
@Override
- public long getMessagesOut()
+ public void consumerRemoved(final Consumer sub)
{
- // TODO
- return 0;
+
}
@Override
@@ -685,10 +523,4 @@ class
return 0;
}
- @Override
- public String getConsumerName()
- {
- //TODO
- return "TODO";
- }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index 78ca9ff2a6..a96d951de6 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v1_0;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.ListIterator;
@@ -286,7 +285,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
Binary dataEncoding = sectionEncoder.getEncoding();
final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength());
- metaData.writeToBuffer(0,allData);
+ metaData.writeToBuffer(allData);
allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength());
return allData;
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index 5026007360..be9d7a2d60 100755
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -314,7 +314,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
return buf;
}
- public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
+ public int writeToBuffer(ByteBuffer dest)
{
ByteBuffer buf = _encoded;
@@ -326,7 +326,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
buf = buf.duplicate();
- buf.position(offsetInMetaData);
+ buf.position(0);
if(dest.remaining() < buf.limit())
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
new file mode 100644
index 0000000000..6f37d2d831
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.txn.ServerTransaction;
+
+public class MessageSourceDestination implements SendingDestination
+{
+ private static final Logger _logger = Logger.getLogger(MessageSourceDestination.class);
+ private static final Accepted ACCEPTED = new Accepted();
+ private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
+
+
+ private MessageSource _queue;
+
+ public MessageSourceDestination(MessageSource queue)
+ {
+ _queue = queue;
+ }
+
+ public Outcome[] getOutcomes()
+ {
+ return OUTCOMES;
+ }
+
+ public int getCredit()
+ {
+ // TODO - fix
+ return 100;
+ }
+
+ public MessageSource getQueue()
+ {
+ return _queue;
+ }
+
+}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
new file mode 100644
index 0000000000..70f659b546
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.Rejected;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.txn.ServerTransaction;
+
+public class NodeReceivingDestination implements ReceivingDestination
+{
+ private static final Accepted ACCEPTED = new Accepted();
+ public static final Rejected REJECTED = new Rejected();
+ private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
+
+ private MessageDestination _exchange;
+ private TerminusDurability _durability;
+ private TerminusExpiryPolicy _expiryPolicy;
+
+ public NodeReceivingDestination(MessageDestination exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy)
+ {
+ _exchange = exchange;
+ _durability = durable;
+ _expiryPolicy = expiryPolicy;
+ }
+
+ public Outcome[] getOutcomes()
+ {
+ return OUTCOMES;
+ }
+
+ public Outcome send(final Message_1_0 message, ServerTransaction txn)
+ {
+ final InstanceProperties instanceProperties =
+ new InstanceProperties()
+ {
+
+ @Override
+ public Object getProperty(final Property prop)
+ {
+ switch(prop)
+ {
+ case MANDATORY:
+ return false;
+ case REDELIVERED:
+ return false;
+ case PERSISTENT:
+ return message.isPersistent();
+ case IMMEDIATE:
+ return false;
+ case EXPIRATION:
+ return message.getExpiration();
+ }
+ return null;
+ }};
+
+ int enqueues = _exchange.send(message, instanceProperties, txn, null);
+
+
+ return enqueues == 0 ? REJECTED : ACCEPTED;
+ }
+
+ TerminusDurability getDurability()
+ {
+ return _durability;
+ }
+
+ TerminusExpiryPolicy getExpiryPolicy()
+ {
+ return _expiryPolicy;
+ }
+
+ public int getCredit()
+ {
+ // TODO - fix
+ return 20000;
+ }
+
+ public MessageDestination getDestination()
+ {
+ return _exchange;
+ }
+}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
index b9c10b925f..3d6bb5e3db 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
@@ -24,22 +24,21 @@ import org.apache.log4j.Logger;
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.txn.ServerTransaction;
-public class QueueDestination implements SendingDestination, ReceivingDestination
+public class QueueDestination extends MessageSourceDestination implements SendingDestination, ReceivingDestination
{
private static final Logger _logger = Logger.getLogger(QueueDestination.class);
private static final Accepted ACCEPTED = new Accepted();
private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
- private AMQQueue _queue;
-
public QueueDestination(AMQQueue queue)
{
- _queue = queue;
+ super(queue);
}
public Outcome[] getOutcomes()
@@ -52,7 +51,7 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio
try
{
- txn.enqueue(_queue,message, new ServerTransaction.Action()
+ txn.enqueue(getQueue(),message, new ServerTransaction.Action()
{
@@ -60,8 +59,7 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio
{
try
{
-
- _queue.enqueue(message);
+ getQueue().enqueue(message,null);
}
catch (Exception e)
{
@@ -93,7 +91,7 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio
public AMQQueue getQueue()
{
- return _queue;
+ return (AMQQueue) super.getQueue();
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 4abf1bf76b..9e0327fe76 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -64,11 +65,14 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler
@@ -78,18 +82,22 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
private VirtualHost _vhost;
private SendingDestination _destination;
- private Subscription_1_0 _subscription;
+ private Consumer _consumer;
+ private ConsumerTarget_1_0 _target;
+
private boolean _draining;
- private final Map<Binary, QueueEntry> _unsettledMap =
- new HashMap<Binary, QueueEntry>();
+ private final Map<Binary, MessageInstance> _unsettledMap =
+ new HashMap<Binary, MessageInstance>();
private final ConcurrentHashMap<Binary, UnsettledAction> _unsettledActionMap =
new ConcurrentHashMap<Binary, UnsettledAction>();
private volatile SendingLinkAttachment _linkAttachment;
private TerminusDurability _durability;
- private List<QueueEntry> _resumeFullTransfers = new ArrayList<QueueEntry>();
+ private List<MessageInstance> _resumeFullTransfers = new ArrayList<MessageInstance>();
private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>();
private Runnable _closeAction;
+ private final MessageSource _queue;
+
public SendingLink_1_0(final SendingLinkAttachment linkAttachment,
final VirtualHost vhost,
@@ -103,24 +111,22 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
_durability = source.getDurable();
linkAttachment.setDeliveryStateHandler(this);
QueueDestination qd = null;
- AMQQueue queue = null;
+ EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
boolean noLocal = false;
JMSSelectorFilter messageFilter = null;
- if(destination instanceof QueueDestination)
+ if(destination instanceof MessageSourceDestination)
{
- queue = ((QueueDestination) _destination).getQueue();
+ _queue = ((MessageSourceDestination) _destination).getQueue();
- if(queue.getAvailableAttributes().contains("topic"))
+ if(_queue instanceof AMQQueue && ((AMQQueue)_queue).getAvailableAttributes().contains("topic"))
{
source.setDistributionMode(StdDistMode.COPY);
}
- qd = (QueueDestination) destination;
-
Map<Symbol,Filter> filters = source.getFilter();
Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
@@ -167,7 +173,13 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
- _subscription = new Subscription_1_0(this, qd, source.getDistributionMode() != StdDistMode.COPY);
+ _target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
+ if(source.getDistributionMode() != StdDistMode.COPY)
+ {
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
+ }
+
}
else if(destination instanceof ExchangeDestination)
{
@@ -199,7 +211,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
name = UUID.randomUUID().toString();
}
- queue = _vhost.getQueue(name);
+ AMQQueue queue = _vhost.getQueue(name);
Exchange exchange = exchangeDestination.getExchange();
if(queue == null)
@@ -299,9 +311,10 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
}
}
+ _queue = queue;
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
- exchange.addBinding(binding,queue,null);
+ exchange.addBinding(binding, queue,null);
source.setDistributionMode(StdDistMode.COPY);
if(!isDurable)
@@ -309,10 +322,10 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
final String queueName = name;
final AMQQueue tempQueue = queue;
- final Connection_1_0.Task deleteQueueTask =
- new Connection_1_0.Task()
+ final Action<Connection_1_0> deleteQueueTask =
+ new Action<Connection_1_0>()
{
- public void doTask(Connection_1_0 session)
+ public void performAction(Connection_1_0 session)
{
if (_vhost.getQueue(queueName) == tempQueue)
{
@@ -331,9 +344,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
getSession().getConnection().addConnectionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue)
+ public void performAction(AMQQueue queue)
{
getSession().getConnection().removeConnectionCloseTask(deleteQueueTask);
}
@@ -347,31 +360,46 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
catch (AMQSecurityException e)
{
_logger.error("Security error", e);
+ throw new RuntimeException(e);
}
catch (AMQInternalException e)
{
_logger.error("Internal error", e);
+ throw new RuntimeException(e);
}
catch (AMQException e)
{
_logger.error("Error", e);
+ throw new RuntimeException(e);
}
- _subscription = new Subscription_1_0(this, qd, true);
+
+ _target = new ConsumerTarget_1_0(this, true);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
+
+ }
+ else
+ {
+ throw new RuntimeException("Unknown destination type");
}
- if(_subscription != null)
+ if(_target != null)
{
- _subscription.setNoLocal(noLocal);
- if(messageFilter!=null)
+ if(noLocal)
{
- _subscription.setFilters(new SimpleFilterManager(messageFilter));
+ options.add(Consumer.Option.NO_LOCAL);
}
+
+ _consumer.setNoLocal(noLocal);
+
+
try
{
-
- queue.registerSubscription(_subscription, false);
+ _consumer = _queue.addConsumer(_target,
+ messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+ Message_1_0.class, getEndpoint().getName(), options);
}
catch (AMQException e)
{
@@ -394,12 +422,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
// if not durable or close
if(!TerminusDurability.UNSETTLED_STATE.equals(_durability))
{
- AMQQueue queue = _subscription.getQueue();
try
{
- queue.unregisterSubscription(_subscription);
+ _consumer.close();
}
catch (AMQException e)
@@ -426,7 +453,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
try
{
- queue.getVirtualHost().removeQueue(queue);
+ _vhost.removeQueue((AMQQueue)_queue);
}
catch(AMQException e)
{
@@ -443,7 +470,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
else if(detach == null || detach.getError() != null)
{
_linkAttachment = null;
- _subscription.flowStateChanged();
+ _target.flowStateChanged();
}
else
{
@@ -491,7 +518,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
if(_resumeAcceptedTransfers.isEmpty())
{
- _subscription.flowStateChanged();
+ _target.flowStateChanged();
}
}
@@ -531,7 +558,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
}
- public void addUnsettled(Binary tag, UnsettledAction unsettledAction, QueueEntry queueEntry)
+ public void addUnsettled(Binary tag, UnsettledAction unsettledAction, MessageInstance queueEntry)
{
_unsettledActionMap.put(tag,unsettledAction);
if(getTransactionId() == null)
@@ -593,9 +620,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment)
{
- if(_subscription.isActive())
+ if(_consumer.isActive())
{
- _subscription.suspend();
+ _target.suspend();
}
_linkAttachment = linkAttachment;
@@ -603,14 +630,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();
endpoint.setDeliveryStateHandler(this);
Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
- Map<Binary, QueueEntry> unsettledCopy = new HashMap<Binary, QueueEntry>(_unsettledMap);
+ Map<Binary, MessageInstance> unsettledCopy = new HashMap<Binary, MessageInstance>(_unsettledMap);
_resumeAcceptedTransfers.clear();
_resumeFullTransfers.clear();
- for(Map.Entry<Binary, QueueEntry> entry : unsettledCopy.entrySet())
+ for(Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
{
Binary deliveryTag = entry.getKey();
- final QueueEntry queueEntry = entry.getValue();
+ final MessageInstance queueEntry = entry.getValue();
if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
{
queueEntry.setRedelivered();
@@ -624,7 +651,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
if(outcome instanceof Accepted)
{
AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
- if(_subscription.acquires())
+ if(_consumer.acquires())
{
txn.dequeue(Collections.singleton(queueEntry),
new ServerTransaction.Action()
@@ -644,7 +671,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
else if(outcome instanceof Released)
{
AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
- if(_subscription.acquires())
+ if(_consumer.acquires())
{
txn.dequeue(Collections.singleton(queueEntry),
new ServerTransaction.Action()
@@ -678,9 +705,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
public Map getUnsettledOutcomeMap()
{
- Map<Binary, QueueEntry> unsettled = new HashMap<Binary, QueueEntry>(_unsettledMap);
+ Map<Binary, MessageInstance> unsettled = new HashMap<Binary, MessageInstance>(_unsettledMap);
- for(Map.Entry<Binary, QueueEntry> entry : unsettled.entrySet())
+ for(Map.Entry<Binary, MessageInstance> entry : unsettled.entrySet())
{
entry.setValue(null);
}
@@ -692,4 +719,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
_closeAction = action;
}
+
+ public VirtualHost getVirtualHost()
+ {
+ return _vhost;
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 823e4cb16d..beed6be84b 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -41,6 +41,8 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -48,6 +50,7 @@ import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.*;
@@ -108,11 +111,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
source.setAddress(tempQueue.getName());
}
String addr = source.getAddress();
- AMQQueue queue = _vhost.getQueue(addr);
+ MessageSource queue = _vhost.getMessageSource(addr);
if(queue != null)
{
- destination = new QueueDestination(queue);
+ destination = new MessageSourceDestination(queue);
@@ -249,11 +252,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
}
String addr = target.getAddress();
- Exchange exchg = _vhost.getExchange(addr);
- if(exchg != null)
+ MessageDestination messageDestination = _vhost.getMessageDestination(addr);
+ if(messageDestination != null)
{
- destination = new ExchangeDestination(exchg, target.getDurable(),
- target.getExpiryPolicy());
+ destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
+ target.getExpiryPolicy());
}
else
{
@@ -343,10 +346,10 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose)
{
- final Connection_1_0.Task deleteQueueTask =
- new Connection_1_0.Task()
+ final Action<Connection_1_0> deleteQueueTask =
+ new Action<Connection_1_0>()
{
- public void doTask(Connection_1_0 session)
+ public void performAction(Connection_1_0 session)
{
if (_vhost.getQueue(queueName) == tempQueue)
{
@@ -365,9 +368,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
_connection.addConnectionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue)
+ public void performAction(AMQQueue queue)
{
_connection.removeConnectionCloseTask(deleteQueueTask);
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
index a71d833fc3..9ca23ce1ce 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
@@ -20,7 +20,6 @@ package org.apache.qpid.server.management.plugin.servlet.rest;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -40,7 +39,7 @@ import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
@@ -327,8 +326,8 @@ public class MessageServlet extends AbstractServlet
: entry.isAcquired()
? "Acquired"
: "");
- final Subscription deliveredSubscription = entry.getDeliveredSubscription();
- object.put("deliveredTo", deliveredSubscription == null ? null : deliveredSubscription.getSubscriptionID());
+ final Consumer deliveredConsumer = entry.getDeliveredConsumer();
+ object.put("deliveredTo", deliveredConsumer == null ? null : deliveredConsumer.getId());
ServerMessage message = entry.getMessage();
if(message != null)