diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-07 16:57:49 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-07 16:57:49 +0000 |
| commit | 65b1a1ddfe95b9e273d4cdaf23067a0aaff9b1d1 (patch) | |
| tree | 427d5ce851b9336fea70eb8fc6f87135ad239065 /qpid/java/broker-plugins | |
| parent | 3ab4f9bdc9bbc8375534f45022a02257eb6e030d (diff) | |
| download | qpid-python-65b1a1ddfe95b9e273d4cdaf23067a0aaff9b1d1.tar.gz | |
QPID-5504 : Refactoring to allow for nodes other than queues to be subscribed from, and nodes other than exchanges to be sent to (merged from separate branch)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1565726 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
47 files changed, 2109 insertions, 3070 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java new file mode 100644 index 0000000000..6ad9de22cb --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -0,0 +1,580 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.protocol.MessageConverterRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.consumer.AbstractConsumerTarget; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.transport.*; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener +{ + + private static final Option[] BATCHED = new Option[] { Option.BATCH }; + + private final AtomicBoolean _deleted = new AtomicBoolean(false); + private final String _name; + + + private FlowCreditManager_0_10 _creditManager; + + private final MessageAcceptMode _acceptMode; + private final MessageAcquireMode _acquireMode; + private MessageFlowMode _flowMode; + private final ServerSession _session; + private final AtomicBoolean _stopped = new AtomicBoolean(true); + + private final AtomicLong _unacknowledgedCount = new AtomicLong(0); + private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); + + private final Map<String, Object> _arguments; + private int _deferredMessageCredit; + private long _deferredSizeCredit; + private Consumer _consumer; + + + public ConsumerTarget_0_10(ServerSession session, + String name, + MessageAcceptMode acceptMode, + MessageAcquireMode acquireMode, + MessageFlowMode flowMode, + FlowCreditManager_0_10 creditManager, + Map<String, Object> arguments) + { + super(State.SUSPENDED); + _session = session; + _postIdSettingAction = new AddMessageDispositionListenerAction(session); + _acceptMode = acceptMode; + _acquireMode = acquireMode; + _creditManager = creditManager; + _flowMode = flowMode; + _creditManager.addStateListener(this); + _arguments = arguments == null ? Collections.<String, Object> emptyMap() : + Collections.<String, Object> unmodifiableMap(arguments); + _name = name; + } + + public Consumer getConsumer() + { + return _consumer; + } + + public boolean isSuspended() + { + return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension + } + + public boolean close() + { + boolean closed = false; + State state = getState(); + + getConsumer().getSendLock(); + try + { + while(!closed && state != State.CLOSED) + { + closed = updateState(state, State.CLOSED); + if(!closed) + { + state = getState(); + } + } + _creditManager.removeListener(this); + } + finally + { + getConsumer().releaseSendLock(); + } + + return closed; + + } + + public void creditStateChanged(boolean hasCredit) + { + + if(hasCredit) + { + if(!updateState(State.SUSPENDED, State.ACTIVE)) + { + // this is a hack to get round the issue of increasing bytes credit + getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE); + } + } + else + { + updateState(State.ACTIVE, State.SUSPENDED); + } + } + + public String getName() + { + return _name; + } + + + public static class AddMessageDispositionListenerAction implements Runnable + { + private MessageTransfer _xfr; + private ServerSession.MessageDispositionChangeListener _action; + private ServerSession _session; + + public AddMessageDispositionListenerAction(ServerSession session) + { + _session = session; + } + + public void setXfr(MessageTransfer xfr) + { + _xfr = xfr; + } + + public void setAction(ServerSession.MessageDispositionChangeListener action) + { + _action = action; + } + + public void run() + { + if(_action != null) + { + _session.onMessageDispositionChange(_xfr, _action); + } + } + } + + private final AddMessageDispositionListenerAction _postIdSettingAction; + + public void send(final MessageInstance entry, boolean batch) throws AMQException + { + ServerMessage serverMsg = entry.getMessage(); + + + MessageTransfer xfr; + + DeliveryProperties deliveryProps; + MessageProperties messageProps = null; + + MessageTransferMessage msg; + + if(serverMsg instanceof MessageTransferMessage) + { + + msg = (MessageTransferMessage) serverMsg; + + } + else + { + MessageConverter converter = + MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class); + + + msg = (MessageTransferMessage) converter.convert(serverMsg, _session.getVirtualHost()); + } + DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties(); + messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties(); + + deliveryProps = new DeliveryProperties(); + if(origDeliveryProps != null) + { + if(origDeliveryProps.hasDeliveryMode()) + { + deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode()); + } + if(origDeliveryProps.hasExchange()) + { + deliveryProps.setExchange(origDeliveryProps.getExchange()); + } + if(origDeliveryProps.hasExpiration()) + { + deliveryProps.setExpiration(origDeliveryProps.getExpiration()); + } + if(origDeliveryProps.hasPriority()) + { + deliveryProps.setPriority(origDeliveryProps.getPriority()); + } + if(origDeliveryProps.hasRoutingKey()) + { + deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey()); + } + if(origDeliveryProps.hasTimestamp()) + { + deliveryProps.setTimestamp(origDeliveryProps.getTimestamp()); + } + if(origDeliveryProps.hasTtl()) + { + deliveryProps.setTtl(origDeliveryProps.getTtl()); + } + + + } + + deliveryProps.setRedelivered(entry.isRedelivered()); + + Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); + + + xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED) + : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody()); + + if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED) + { + xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW)); + } + else if(_flowMode == MessageFlowMode.WINDOW) + { + xfr.setCompletionListener(new Method.CompletionListener() + { + public void onComplete(Method method) + { + deferredAddCredit(1, entry.getMessage().getSize()); + } + }); + } + + + _postIdSettingAction.setXfr(xfr); + if(_acceptMode == MessageAcceptMode.EXPLICIT) + { + _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this)); + } + else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED) + { + _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this)); + } + else + { + _postIdSettingAction.setAction(null); + } + + + _session.sendMessage(xfr, _postIdSettingAction); + entry.incrementDeliveryCount(); + if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED) + { + forceDequeue(entry, false); + } + else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED) + { + recordUnacknowledged(entry); + } + + } + + void recordUnacknowledged(MessageInstance entry) + { + _unacknowledgedCount.incrementAndGet(); + _unacknowledgedBytes.addAndGet(entry.getMessage().getSize()); + } + + private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit) + { + _deferredMessageCredit += deferredMessageCredit; + _deferredSizeCredit += deferredSizeCredit; + + } + + public void flushCreditState(boolean strict) + { + if(strict || !isSuspended() || _deferredMessageCredit >= 200 + || !(_creditManager instanceof WindowCreditManager) + || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 ) + { + _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit); + _deferredMessageCredit = 0; + _deferredSizeCredit = 0l; + } + } + + private void forceDequeue(final MessageInstance entry, final boolean restoreCredit) + { + AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(_session.getVirtualHost().getMessageStore()); + dequeueTxn.dequeue(entry.getOwningResource(), entry.getMessage(), + new ServerTransaction.Action() + { + public void postCommit() + { + if (restoreCredit) + { + restoreCredit(entry.getMessage()); + } + entry.delete(); + } + + public void onRollback() + { + + } + }); + } + + void reject(final MessageInstance entry) + { + entry.setRedelivered(); + entry.routeToAlternate(null, null); + if(entry.isAcquiredBy(getConsumer())) + { + entry.delete(); + } + } + + void release(final MessageInstance entry, final boolean setRedelivered) + { + if (setRedelivered) + { + entry.setRedelivered(); + } + + if (getSessionModel().isClosing() || !setRedelivered) + { + entry.decrementDeliveryCount(); + } + + if (isMaxDeliveryLimitReached(entry)) + { + sendToDLQOrDiscard(entry); + } + else + { + entry.release(); + } + } + + protected void sendToDLQOrDiscard(MessageInstance entry) + { + final LogActor logActor = CurrentActor.get(); + final ServerMessage msg = entry.getMessage(); + + int requeues = entry.routeToAlternate(new Action<MessageInstance>() + { + @Override + public void performAction(final MessageInstance requeueEntry) + { + logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), + requeueEntry.getOwningResource().getName())); + } + }, null); + + if (requeues == 0) + { + TransactionLogResource owningResource = entry.getOwningResource(); + if(owningResource instanceof AMQQueue) + { + final AMQQueue queue = (AMQQueue)owningResource; + final Exchange alternateExchange = queue.getAlternateExchange(); + + if(alternateExchange != null) + { + logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), + alternateExchange.getName())); + } + else + { + logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), + queue.getName(), + msg.getRoutingKey())); + } + } + } + } + + private boolean isMaxDeliveryLimitReached(MessageInstance entry) + { + final int maxDeliveryLimit = entry.getMaximumDeliveryCount(); + return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit); + } + + public void queueDeleted() + { + _deleted.set(true); + } + + public boolean allocateCredit(ServerMessage message) + { + return _creditManager.useCreditForMessage(message.getSize()); + } + + public void restoreCredit(ServerMessage message) + { + _creditManager.restoreCredit(1, message.getSize()); + } + + public FlowCreditManager_0_10 getCreditManager() + { + return _creditManager; + } + + + public void stop() + { + try + { + getConsumer().getSendLock(); + + updateState(State.ACTIVE, State.SUSPENDED); + _stopped.set(true); + FlowCreditManager_0_10 creditManager = getCreditManager(); + creditManager.clearCredit(); + } + finally + { + getConsumer().releaseSendLock(); + } + } + + public void addCredit(MessageCreditUnit unit, long value) + { + FlowCreditManager_0_10 creditManager = getCreditManager(); + + switch (unit) + { + case MESSAGE: + + creditManager.addCredit(value, 0L); + break; + case BYTE: + creditManager.addCredit(0l, value); + break; + } + + _stopped.set(false); + + if(creditManager.hasCredit()) + { + updateState(State.SUSPENDED, State.ACTIVE); + } + + } + + public void setFlowMode(MessageFlowMode flowMode) + { + + + _creditManager.removeListener(this); + + switch(flowMode) + { + case CREDIT: + _creditManager = new CreditCreditManager(0l,0l); + break; + case WINDOW: + _creditManager = new WindowCreditManager(0l,0l); + break; + default: + throw new RuntimeException("Unknown message flow mode: " + flowMode); + } + _flowMode = flowMode; + updateState(State.ACTIVE, State.SUSPENDED); + + _creditManager.addStateListener(this); + + } + + public boolean isStopped() + { + return _stopped.get(); + } + + public void acknowledge(MessageInstance entry) + { + // TODO Fix Store Context / cleanup + if(entry.isAcquiredBy(getConsumer())) + { + _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize()); + _unacknowledgedCount.decrementAndGet(); + entry.delete(); + } + } + + public void flush() throws AMQException + { + flushCreditState(true); + getConsumer().flush(); + stop(); + } + + public ServerSession getSessionModel() + { + return _session; + } + + public boolean isDurable() + { + return false; + } + + public Map<String, Object> getArguments() + { + return _arguments; + } + + public void queueEmpty() + { + } + + public void flushBatched() + { + _session.getConnection().flush(); + } + + + @Override + public void consumerAdded(final Consumer sub) + { + _consumer = sub; + } + + @Override + public void consumerRemoved(final Consumer sub) + { + } + + public long getUnacknowledgedBytes() + { + return _unacknowledgedBytes.longValue(); + } + + public long getUnacknowledgedMessages() + { + return _unacknowledgedCount.longValue(); + } +} diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java index 4b38b8a1a3..4420709a91 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; -import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.MessageInstance; class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener @@ -30,21 +30,20 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi private static final Logger _logger = Logger.getLogger(ExplicitAcceptDispositionChangeListener.class); - private final QueueEntry _entry; - private final Subscription_0_10 _sub; + private final MessageInstance _entry; + private final ConsumerTarget_0_10 _target; - public ExplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10) + public ExplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target) { _entry = entry; - _sub = subscription_0_10; + _target = target; } public void onAccept() { - final Subscription_0_10 subscription = getSubscription(); - if(subscription != null && _entry.isAcquiredBy(_sub)) + if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) { - subscription.getSessionModel().acknowledge(subscription, _entry); + _target.getSessionModel().acknowledge(_target, _entry); } else { @@ -55,10 +54,9 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onRelease(boolean setRedelivered) { - final Subscription_0_10 subscription = getSubscription(); - if(subscription != null && _entry.isAcquiredBy(_sub)) + if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) { - subscription.release(_entry, setRedelivered); + _target.release(_entry, setRedelivered); } else { @@ -68,10 +66,9 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onReject() { - final Subscription_0_10 subscription = getSubscription(); - if(subscription != null && _entry.isAcquiredBy(_sub)) + if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) { - subscription.reject(_entry); + _target.reject(_entry); } else { @@ -82,12 +79,8 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public boolean acquire() { - return _entry.acquire(getSubscription()); + return _entry.acquire(_target.getConsumer()); } - private Subscription_0_10 getSubscription() - { - return _sub; - } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java index ce0155b789..c459364dbb 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java @@ -22,20 +22,20 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.log4j.Logger; -import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.MessageInstance; class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener { private static final Logger _logger = Logger.getLogger(ImplicitAcceptDispositionChangeListener.class); - private final QueueEntry _entry; - private Subscription_0_10 _sub; + private final MessageInstance _entry; + private ConsumerTarget_0_10 _target; - public ImplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10) + public ImplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target) { _entry = entry; - _sub = subscription_0_10; + _target = target; } public void onAccept() @@ -45,9 +45,9 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onRelease(boolean setRedelivered) { - if(_entry.isAcquiredBy(_sub)) + if(_entry.isAcquiredBy(_target.getConsumer())) { - getSubscription().release(_entry, setRedelivered); + _target.release(_entry, setRedelivered); } else { @@ -57,9 +57,9 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onReject() { - if(_entry.isAcquiredBy(_sub)) + if(_entry.isAcquiredBy(_target.getConsumer())) { - getSubscription().reject(_entry); + _target.reject(_entry); } else { @@ -70,19 +70,15 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public boolean acquire() { - boolean acquired = _entry.acquire(getSubscription()); + boolean acquired = _entry.acquire(_target.getConsumer()); if(acquired) { - getSubscription().recordUnacknowledged(_entry); + _target.recordUnacknowledged(_entry); } return acquired; } - public Subscription_0_10 getSubscription() - { - return _sub; - } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java index f5f2a8d43f..cd1146ac0b 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java @@ -21,17 +21,17 @@ package org.apache.qpid.server.protocol.v0_10; -import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.transport.Method; public class MessageAcceptCompletionListener implements Method.CompletionListener { - private final Subscription_0_10 _sub; - private final QueueEntry _entry; + private final ConsumerTarget_0_10 _sub; + private final MessageInstance _entry; private final ServerSession _session; private boolean _restoreCredit; - public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit) + public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit) { super(); _sub = sub; @@ -44,9 +44,9 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene { if(_restoreCredit) { - _sub.restoreCredit(_entry); + _sub.restoreCredit(_entry.getMessage()); } - if(_entry.isAcquiredBy(_sub)) + if(_entry.isAcquiredBy(_sub.getConsumer())) { _session.acknowledge(_sub, _entry); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java index 2e74621814..687331e51d 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java @@ -141,7 +141,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData return buf; } - public int writeToBuffer(int offsetInMetaData, ByteBuffer dest) + public int writeToBuffer(ByteBuffer dest) { ByteBuffer buf = _encoded; @@ -153,7 +153,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData buf = buf.duplicate(); - buf.position(offsetInMetaData); + buf.position(0); if(dest.remaining() < buf.limit()) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index a15fea1200..c85a415ce5 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -282,8 +282,8 @@ public class ServerConnectionDelegate extends ServerDelegate private void stopAllSubscriptions(Connection conn, SessionDetach dtc) { final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel()); - final Collection<Subscription_0_10> subs = ssn.getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subs) + final Collection<ConsumerTarget_0_10> subs = ssn.getSubscriptions(); + for (ConsumerTarget_0_10 subscription_0_10 : subs) { subscription_0_10.stop(); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 93d886687c..53022c333e 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -46,7 +46,7 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; -import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -55,15 +55,16 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.DistributedTransaction; @@ -77,6 +78,7 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.SuspendAndFailDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; import org.slf4j.Logger; @@ -104,14 +106,7 @@ public class ServerSession extends Session private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); - private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction() - { - @Override - public void onEnqueue(final QueueEntry entry) - { - entry.getQueue().checkCapacity(ServerSession.this); - } - }; + private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction(); public static interface MessageDispositionChangeListener { @@ -126,12 +121,6 @@ public class ServerSession extends Session } - public static interface Task - { - public void doTask(ServerSession session); - } - - private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap = new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>(); @@ -142,9 +131,9 @@ public class ServerSession extends Session private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>(); + private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>(); - private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); + private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>(); private final TransactionTimeoutHelper _transactionTimeoutHelper; @@ -194,7 +183,7 @@ public class ServerSession extends Session public int enqueue(final MessageTransferMessage message, final InstanceProperties instanceProperties, - final Exchange exchange) + final MessageDestination exchange) { if(_outstandingCredit.get() != UNLIMITED_CREDIT && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD)) @@ -386,9 +375,9 @@ public class ServerSession extends Session } _messageDispositionListenerMap.clear(); - for (Task task : _taskList) + for (Action<ServerSession> task : _taskList) { - task.doTask(this); + task.performAction(this); } LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get(); @@ -405,9 +394,9 @@ public class ServerSession extends Session // Broker shouldn't block awaiting close - thus do override this method to do nothing } - public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry) + public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry) { - _transaction.dequeue(entry.getQueue(), entry.getMessage(), + _transaction.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action() { @@ -426,42 +415,26 @@ public class ServerSession extends Session }); } - public Collection<Subscription_0_10> getSubscriptions() + public Collection<ConsumerTarget_0_10> getSubscriptions() { return _subscriptions.values(); } - public void register(String destination, Subscription_0_10 sub) + public void register(String destination, ConsumerTarget_0_10 sub) { _subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub); } - public Subscription_0_10 getSubscription(String destination) + public ConsumerTarget_0_10 getSubscription(String destination) { return _subscriptions.get(destination == null ? NULL_DESTINATION : destination); } - public void unregister(Subscription_0_10 sub) + public void unregister(ConsumerTarget_0_10 sub) { _subscriptions.remove(sub.getName()); - try - { - sub.getSendLock(); - AMQQueue queue = sub.getQueue(); - if(queue != null) - { - queue.unregisterSubscription(sub); - } - } - catch (AMQException e) - { - // TODO - _logger.error("Failed to unregister subscription :" + e.getMessage(), e); - } - finally - { - sub.releaseSendLock(); - } + sub.close(); + } public boolean isTransactional() @@ -638,12 +611,12 @@ public class ServerSession extends Session return getConnection().getAuthorizedSubject(); } - public void addSessionCloseTask(Task task) + public void addSessionCloseTask(Action<ServerSession> task) { _taskList.add(task); } - public void removeSessionCloseTask(Task task) + public void removeSessionCloseTask(Action<ServerSession> task) { _taskList.remove(task); } @@ -829,8 +802,8 @@ public class ServerSession extends Session void unregisterSubscriptions() { - final Collection<Subscription_0_10> subscriptions = getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subscriptions) + final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions(); + for (ConsumerTarget_0_10 subscription_0_10 : subscriptions) { unregister(subscription_0_10); } @@ -838,8 +811,8 @@ public class ServerSession extends Session void stopSubscriptions() { - final Collection<Subscription_0_10> subscriptions = getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subscriptions) + final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions(); + for (ConsumerTarget_0_10 subscription_0_10 : subscriptions) { subscription_0_10.stop(); } @@ -848,8 +821,8 @@ public class ServerSession extends Session public void receivedComplete() { - final Collection<Subscription_0_10> subscriptions = getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subscriptions) + final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions(); + for (ConsumerTarget_0_10 subscription_0_10 : subscriptions) { subscription_0_10.flushCreditState(false); } @@ -955,4 +928,16 @@ public class ServerSession extends Session return getId().compareTo(o.getId()); } + private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<C>> + { + @Override + public void performAction(final MessageInstance<C> entry) + { + TransactionLogResource queue = entry.getOwningResource(); + if(queue instanceof CapacityChecker) + { + ((CapacityChecker)queue).checkCapacity(ServerSession.this); + } + } + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index dcca696529..9a90b74656 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; @@ -34,7 +35,9 @@ import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; @@ -45,6 +48,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.DtxNotSelectedException; import org.apache.qpid.server.txn.IncorrectDtxStateException; @@ -55,6 +59,7 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.SuspendAndFailDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; import org.apache.qpid.server.virtualhost.RequiredExchangeException; @@ -193,7 +198,7 @@ public class ServerSessionDelegate extends SessionDelegate String queueName = method.getQueue(); VirtualHost vhost = getVirtualHost(session); - final AMQQueue queue = vhost.getQueue(queueName); + final MessageSource queue = vhost.getMessageSource(queueName); if(queue == null) { @@ -214,9 +219,9 @@ public class ServerSessionDelegate extends SessionDelegate ServerSession s = (ServerSession) session; queue.setExclusiveOwningSession(s); - ((ServerSession) session).addSessionCloseTask(new ServerSession.Task() + ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>() { - public void doTask(ServerSession session) + public void performAction(ServerSession session) { if(queue.getExclusiveOwningSession() == session) { @@ -228,9 +233,9 @@ public class ServerSessionDelegate extends SessionDelegate if(queue.getAuthorizationHolder() == null) { queue.setAuthorizationHolder(s); - ((ServerSession) session).addSessionCloseTask(new ServerSession.Task() + ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>() { - public void doTask(ServerSession session) + public void performAction(ServerSession session) { if(queue.getAuthorizationHolder() == session) { @@ -254,25 +259,42 @@ public class ServerSessionDelegate extends SessionDelegate return; } - Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, - destination, - method.getAcceptMode(), - method.getAcquireMode(), - MessageFlowMode.WINDOW, - creditManager, - filterManager, - method.getArguments()); + ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination, + method.getAcceptMode(), + method.getAcquireMode(), + MessageFlowMode.WINDOW, + creditManager, + method.getArguments() + ); - ((ServerSession)session).register(destination, sub); + ((ServerSession)session).register(destination, target); try { - queue.registerSubscription(sub, method.getExclusive()); + EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class); + if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED) + { + options.add(Consumer.Option.ACQUIRES); + } + if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT) + { + options.add(Consumer.Option.SEES_REQUEUES); + } + if(method.getExclusive()) + { + options.add(Consumer.Option.EXCLUSIVE); + } + Consumer sub = + queue.addConsumer(target, + filterManager, + MessageTransferMessage.class, + destination, + options); } - catch (AMQQueue.ExistingExclusiveSubscription existing) + catch (AMQQueue.ExistingExclusiveConsumer existing) { exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer"); } - catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive) + catch (AMQQueue.ExistingConsumerPreventsExclusive exclusive) { exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively"); } @@ -288,7 +310,7 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageTransfer(Session ssn, final MessageTransfer xfr) { - final Exchange exchange = getExchangeForMessage(ssn, xfr); + final MessageDestination exchange = getDestinationForMessage(ssn, xfr); final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties(); if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration()) @@ -307,7 +329,6 @@ public class ServerSessionDelegate extends SessionDelegate return; } - final Exchange exchangeInUse; final MessageStore store = getVirtualHost(ssn).getMessageStore(); final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); final ServerSession serverSession = (ServerSession) ssn; @@ -385,7 +406,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = method.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -393,12 +414,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - AMQQueue queue = sub.getQueue(); ((ServerSession)session).unregister(sub); - if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0) - { - queue.setAuthorizationHolder(null); - } } } @@ -407,7 +423,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = method.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -814,24 +830,24 @@ public class ServerSessionDelegate extends SessionDelegate return getVirtualHost(session).getExchange(exchangeName); } - private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr) + private MessageDestination getDestinationForMessage(Session ssn, MessageTransfer xfr) { VirtualHost virtualHost = getVirtualHost(ssn); - Exchange exchange; + MessageDestination destination; if(xfr.hasDestination()) { - exchange = virtualHost.getExchange(xfr.getDestination()); - if(exchange == null) + destination = virtualHost.getMessageDestination(xfr.getDestination()); + if(destination == null) { - exchange = virtualHost.getDefaultExchange(); + destination = virtualHost.getDefaultExchange(); } } else { - exchange = virtualHost.getDefaultExchange(); + destination = virtualHost.getDefaultExchange(); } - return exchange; + return destination; } private VirtualHost getVirtualHost(Session session) @@ -1249,9 +1265,9 @@ public class ServerSessionDelegate extends SessionDelegate if (autoDelete && exclusive) { final AMQQueue q = queue; - final ServerSession.Task deleteQueueTask = new ServerSession.Task() + final Action<ServerSession> deleteQueueTask = new Action<ServerSession>() { - public void doTask(ServerSession session) + public void performAction(ServerSession session) { try { @@ -1265,9 +1281,9 @@ public class ServerSessionDelegate extends SessionDelegate }; final ServerSession s = (ServerSession) session; s.addSessionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new AMQQueue.Task() + queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void doTask(AMQQueue queue) throws AMQException + public void performAction(AMQQueue queue) { s.removeSessionCloseTask(deleteQueueTask); } @@ -1276,9 +1292,9 @@ public class ServerSessionDelegate extends SessionDelegate if (exclusive) { final AMQQueue q = queue; - final ServerSession.Task removeExclusive = new ServerSession.Task() + final Action<ServerSession> removeExclusive = new Action<ServerSession>() { - public void doTask(ServerSession session) + public void performAction(ServerSession session) { q.setAuthorizationHolder(null); q.setExclusiveOwningSession(null); @@ -1287,9 +1303,9 @@ public class ServerSessionDelegate extends SessionDelegate final ServerSession s = (ServerSession) session; q.setExclusiveOwningSession(s); s.addSessionCloseTask(removeExclusive); - queue.addQueueDeleteTask(new AMQQueue.Task() + queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void doTask(AMQQueue queue) throws AMQException + public void performAction(AMQQueue queue) { s.removeSessionCloseTask(removeExclusive); } @@ -1461,7 +1477,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = sfm.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -1478,7 +1494,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = stop.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -1496,7 +1512,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = flow.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java deleted file mode 100644 index 357b565365..0000000000 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java +++ /dev/null @@ -1,944 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.logging.messages.ChannelMessages; -import org.apache.qpid.server.logging.messages.SubscriptionMessages; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.plugin.MessageConverter; -import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.Struct; - -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; - -import java.text.MessageFormat; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, LogSubject -{ - private final long _subscriptionID; - - private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); - - private static final Option[] BATCHED = new Option[] { Option.BATCH }; - - private final Lock _stateChangeLock = new ReentrantLock(); - - private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); - private volatile AMQQueue.Context _queueContext; - private final AtomicBoolean _deleted = new AtomicBoolean(false); - - - private FlowCreditManager_0_10 _creditManager; - - private StateListener _stateListener = new StateListener() - { - - public void stateChange(Subscription sub, State oldState, State newState) - { - CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString())); - } - }; - private AMQQueue _queue; - private final String _destination; - private boolean _noLocal; - private final FilterManager _filters; - private final MessageAcceptMode _acceptMode; - private final MessageAcquireMode _acquireMode; - private MessageFlowMode _flowMode; - private final ServerSession _session; - private final AtomicBoolean _stopped = new AtomicBoolean(true); - private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0]; - - private LogActor _logActor; - private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); - private String _traceExclude; - private String _trace; - private final long _createTime = System.currentTimeMillis(); - private final AtomicLong _deliveredCount = new AtomicLong(0); - private final AtomicLong _deliveredBytes = new AtomicLong(0); - private final AtomicLong _unacknowledgedCount = new AtomicLong(0); - private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); - - private final Map<String, Object> _arguments; - private int _deferredMessageCredit; - private long _deferredSizeCredit; - - - public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode, - MessageAcquireMode acquireMode, - MessageFlowMode flowMode, - FlowCreditManager_0_10 creditManager, - FilterManager filters,Map<String, Object> arguments) - { - _subscriptionID = SUB_ID_GENERATOR.getAndIncrement(); - _session = session; - _postIdSettingAction = new AddMessageDispositionListenerAction(session); - _destination = destination; - _acceptMode = acceptMode; - _acquireMode = acquireMode; - _creditManager = creditManager; - _flowMode = flowMode; - _filters = filters; - _creditManager.addStateListener(this); - _arguments = arguments == null ? Collections.<String, Object> emptyMap() : - Collections.<String, Object> unmodifiableMap(arguments); - _state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED); - - } - - public void setNoLocal(boolean noLocal) - { - _noLocal = noLocal; - } - - public AMQQueue getQueue() - { - return _queue; - } - - public QueueEntry.SubscriptionAcquiredState getOwningState() - { - return _owningState; - } - - public void setQueue(AMQQueue queue, boolean exclusive) - { - if(getQueue() != null) - { - throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue()); - } - _queue = queue; - - _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES); - _trace = (String) queue.getAttribute(Queue.FEDERATION_ID); - String filterLogString = null; - - _logActor = GenericActor.getInstance(this); - if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, this, SubscriptionMessages.CREATE_LOG_HIERARCHY)) - { - filterLogString = getFilterLogString(); - CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive, - filterLogString.length() > 0)); - } - } - - public String getConsumerName() - { - return _destination; - } - - public boolean isSuspended() - { - return !isActive() || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension - } - - public boolean hasInterest(QueueEntry entry) - { - - - - //check that the message hasn't been rejected - if (entry.isRejectedBy(getSubscriptionID())) - { - - return false; - } - - if (entry.getMessage() instanceof MessageTransferMessage) - { - if(_noLocal) - { - Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference(); - if (connectionRef != null && connectionRef == _session.getReference()) - { - return false; - } - } - } - else - { - // no interest in messages we can't convert - if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), MessageTransferMessage.class)==null) - { - return false; - } - } - - - return checkFilters(entry); - - - } - - private boolean checkFilters(QueueEntry entry) - { - return (_filters == null) || _filters.allAllow(entry.asFilterable()); - } - - public boolean isClosed() - { - return getState() == State.CLOSED; - } - - public boolean isBrowser() - { - return _acquireMode == MessageAcquireMode.NOT_ACQUIRED; - } - - public boolean seesRequeues() - { - return _acquireMode != MessageAcquireMode.NOT_ACQUIRED || _acceptMode == MessageAcceptMode.EXPLICIT; - } - - public void close() - { - boolean closed = false; - State state = getState(); - - _stateChangeLock.lock(); - try - { - while(!closed && state != State.CLOSED) - { - closed = _state.compareAndSet(state, State.CLOSED); - if(!closed) - { - state = getState(); - } - else - { - _stateListener.stateChange(this,state, State.CLOSED); - } - } - _creditManager.removeListener(this); - CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE()); - } - finally - { - _stateChangeLock.unlock(); - } - - - - } - - public Long getDelivered() - { - return _deliveredCount.get(); - } - - public void creditStateChanged(boolean hasCredit) - { - - if(hasCredit) - { - if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) - { - _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); - } - else - { - // this is a hack to get round the issue of increasing bytes credit - _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE); - } - } - else - { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - } - } - - - public static class AddMessageDispositionListenerAction implements Runnable - { - private MessageTransfer _xfr; - private ServerSession.MessageDispositionChangeListener _action; - private ServerSession _session; - - public AddMessageDispositionListenerAction(ServerSession session) - { - _session = session; - } - - public void setXfr(MessageTransfer xfr) - { - _xfr = xfr; - } - - public void setAction(ServerSession.MessageDispositionChangeListener action) - { - _action = action; - } - - public void run() - { - if(_action != null) - { - _session.onMessageDispositionChange(_xfr, _action); - } - } - } - - private final AddMessageDispositionListenerAction _postIdSettingAction; - - public void send(final QueueEntry entry, boolean batch) throws AMQException - { - ServerMessage serverMsg = entry.getMessage(); - - - MessageTransfer xfr; - - DeliveryProperties deliveryProps; - MessageProperties messageProps = null; - - MessageTransferMessage msg; - - if(serverMsg instanceof MessageTransferMessage) - { - - msg = (MessageTransferMessage) serverMsg; - - } - else - { - MessageConverter converter = - MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class); - - - msg = (MessageTransferMessage) converter.convert(serverMsg, getQueue().getVirtualHost()); - } - DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties(); - messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties(); - - deliveryProps = new DeliveryProperties(); - if(origDeliveryProps != null) - { - if(origDeliveryProps.hasDeliveryMode()) - { - deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode()); - } - if(origDeliveryProps.hasExchange()) - { - deliveryProps.setExchange(origDeliveryProps.getExchange()); - } - if(origDeliveryProps.hasExpiration()) - { - deliveryProps.setExpiration(origDeliveryProps.getExpiration()); - } - if(origDeliveryProps.hasPriority()) - { - deliveryProps.setPriority(origDeliveryProps.getPriority()); - } - if(origDeliveryProps.hasRoutingKey()) - { - deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey()); - } - if(origDeliveryProps.hasTimestamp()) - { - deliveryProps.setTimestamp(origDeliveryProps.getTimestamp()); - } - if(origDeliveryProps.hasTtl()) - { - deliveryProps.setTtl(origDeliveryProps.getTtl()); - } - - - } - - deliveryProps.setRedelivered(entry.isRedelivered()); - - if(_trace != null && messageProps == null) - { - messageProps = new MessageProperties(); - } - - Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); - - - xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED) - : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody()); - - boolean excludeDueToFederation = false; - - if(_trace != null) - { - if(!messageProps.hasApplicationHeaders()) - { - messageProps.setApplicationHeaders(new HashMap<String,Object>()); - } - Map<String,Object> appHeaders = messageProps.getApplicationHeaders(); - String trace = (String) appHeaders.get("x-qpid.trace"); - if(trace == null) - { - trace = _trace; - } - else - { - if(_traceExclude != null) - { - excludeDueToFederation = Arrays.asList(trace.split(",")).contains(_traceExclude); - } - trace+=","+_trace; - } - appHeaders.put("x-qpid.trace",trace); - } - - if(!excludeDueToFederation) - { - if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED) - { - xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW)); - } - else if(_flowMode == MessageFlowMode.WINDOW) - { - xfr.setCompletionListener(new Method.CompletionListener() - { - public void onComplete(Method method) - { - deferredAddCredit(1, entry.getSize()); - } - }); - } - - - _postIdSettingAction.setXfr(xfr); - if(_acceptMode == MessageAcceptMode.EXPLICIT) - { - _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this)); - } - else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED) - { - _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this)); - } - else - { - _postIdSettingAction.setAction(null); - } - - - _session.sendMessage(xfr, _postIdSettingAction); - entry.incrementDeliveryCount(); - _deliveredCount.incrementAndGet(); - _deliveredBytes.addAndGet(entry.getSize()); - if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED) - { - forceDequeue(entry, false); - } - else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED) - { - recordUnacknowledged(entry); - } - } - else - { - forceDequeue(entry, _flowMode == MessageFlowMode.WINDOW); - - } - } - - void recordUnacknowledged(QueueEntry entry) - { - _unacknowledgedCount.incrementAndGet(); - _unacknowledgedBytes.addAndGet(entry.getSize()); - } - - private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit) - { - _deferredMessageCredit += deferredMessageCredit; - _deferredSizeCredit += deferredSizeCredit; - - } - - public void flushCreditState(boolean strict) - { - if(strict || !isSuspended() || _deferredMessageCredit >= 200 - || !(_creditManager instanceof WindowCreditManager) - || ((WindowCreditManager)_creditManager).getMessageCreditLimit() < 400 ) - { - _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit); - _deferredMessageCredit = 0; - _deferredSizeCredit = 0l; - } - } - - private void forceDequeue(final QueueEntry entry, final boolean restoreCredit) - { - AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); - dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(), - new ServerTransaction.Action() - { - public void postCommit() - { - if (restoreCredit) - { - restoreCredit(entry); - } - entry.delete(); - } - - public void onRollback() - { - - } - }); - } - - void reject(final QueueEntry entry) - { - entry.setRedelivered(); - entry.routeToAlternate(null, null); - if(entry.isAcquiredBy(this)) - { - entry.delete(); - } - } - - void release(final QueueEntry entry, final boolean setRedelivered) - { - if (setRedelivered) - { - entry.setRedelivered(); - } - - if (getSessionModel().isClosing() || !setRedelivered) - { - entry.decrementDeliveryCount(); - } - - if (isMaxDeliveryLimitReached(entry)) - { - sendToDLQOrDiscard(entry); - } - else - { - entry.release(); - } - } - - protected void sendToDLQOrDiscard(QueueEntry entry) - { - final LogActor logActor = CurrentActor.get(); - final ServerMessage msg = entry.getMessage(); - - int requeues = entry.routeToAlternate(new BaseQueue.PostEnqueueAction() - { - @Override - public void onEnqueue(final QueueEntry requeueEntry) - { - logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), - requeueEntry.getQueue().getName())); - } - }, null); - - if (requeues == 0) - { - final AMQQueue queue = entry.getQueue(); - final Exchange alternateExchange = queue.getAlternateExchange(); - - if(alternateExchange != null) - { - logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), - alternateExchange.getName())); - } - else - { - logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), - queue.getName(), - msg.getRoutingKey())); - } - } - } - - private boolean isMaxDeliveryLimitReached(QueueEntry entry) - { - final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount(); - return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit); - } - - public void queueDeleted(AMQQueue queue) - { - _deleted.set(true); - } - - public boolean wouldSuspend(QueueEntry entry) - { - return !_creditManager.useCreditForMessage(entry.getMessage().getSize()); - } - - public boolean trySendLock() - { - return _stateChangeLock.tryLock(); - } - - - public void getSendLock() - { - _stateChangeLock.lock(); - } - - public void releaseSendLock() - { - _stateChangeLock.unlock(); - } - - public void restoreCredit(QueueEntry queueEntry) - { - _creditManager.restoreCredit(1, queueEntry.getSize()); - } - - public void onDequeue(QueueEntry queueEntry) - { - // no-op for 0-10, credit restored by completing command. - } - - public void releaseQueueEntry(QueueEntry queueEntry) - { - // no-op for 0-10, credit restored by completing command. - } - - public void setStateListener(StateListener listener) - { - _stateListener = listener; - } - - public State getState() - { - return _state.get(); - } - - public AMQQueue.Context getQueueContext() - { - return _queueContext; - } - - public void setQueueContext(AMQQueue.Context queueContext) - { - _queueContext = queueContext; - } - - public boolean isActive() - { - return getState() == State.ACTIVE; - } - - public void set(String key, Object value) - { - _properties.put(key, value); - } - - public Object get(String key) - { - return _properties.get(key); - } - - - public FlowCreditManager_0_10 getCreditManager() - { - return _creditManager; - } - - - public void stop() - { - try - { - getSendLock(); - - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - _stopped.set(true); - FlowCreditManager_0_10 creditManager = getCreditManager(); - creditManager.clearCredit(); - } - finally - { - releaseSendLock(); - } - } - - public void addCredit(MessageCreditUnit unit, long value) - { - FlowCreditManager_0_10 creditManager = getCreditManager(); - - switch (unit) - { - case MESSAGE: - - creditManager.addCredit(value, 0L); - break; - case BYTE: - creditManager.addCredit(0l, value); - break; - } - - _stopped.set(false); - - if(creditManager.hasCredit()) - { - if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) - { - _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); - } - } - - } - - public void setFlowMode(MessageFlowMode flowMode) - { - - - _creditManager.removeListener(this); - - switch(flowMode) - { - case CREDIT: - _creditManager = new CreditCreditManager(0l,0l); - break; - case WINDOW: - _creditManager = new WindowCreditManager(0l,0l); - break; - default: - throw new RuntimeException("Unknown message flow mode: " + flowMode); - } - _flowMode = flowMode; - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - - _creditManager.addStateListener(this); - - } - - public boolean isStopped() - { - return _stopped.get(); - } - - public boolean acquires() - { - return _acquireMode == MessageAcquireMode.PRE_ACQUIRED; - } - - public void acknowledge(QueueEntry entry) - { - // TODO Fix Store Context / cleanup - if(entry.isAcquiredBy(this)) - { - _unacknowledgedBytes.addAndGet(-entry.getSize()); - _unacknowledgedCount.decrementAndGet(); - entry.delete(); - } - } - - public void flush() throws AMQException - { - flushCreditState(true); - _queue.flushSubscription(this); - stop(); - } - - public long getSubscriptionID() - { - return _subscriptionID; - } - - public LogActor getLogActor() - { - return _logActor; - } - - public boolean isTransient() - { - return false; - } - - public ServerSession getSessionModel() - { - return _session; - } - - public boolean isBrowsing() - { - return _acquireMode == MessageAcquireMode.NOT_ACQUIRED; - } - - public boolean isExclusive() - { - return getQueue().hasExclusiveSubscriber(); - } - - public boolean isDurable() - { - return false; - } - - - public boolean isExplicitAcknowledge() - { - return _acceptMode == MessageAcceptMode.EXPLICIT; - } - - public String getCreditMode() - { - return _flowMode.toString(); - } - - public String getName() - { - return _destination; - } - - public Map<String, Object> getArguments() - { - return _arguments; - } - - public boolean isSessionTransactional() - { - return _session.isTransactional(); - } - - public void queueEmpty() - { - } - - public long getCreateTime() - { - return _createTime; - } - - public String toLogString() - { - String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(), - _queue.getName()); - String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "(" - // queueString is "vh(/{0})/qu({1}) " so need to trim - + queueInfo.substring(0, queueInfo.length() - 1) + ")" + "] "; - return result; - } - - private String getFilterLogString() - { - StringBuilder filterLogString = new StringBuilder(); - String delimiter = ", "; - boolean hasEntries = false; - if (_filters != null && _filters.hasFilters()) - { - filterLogString.append(_filters.toString()); - hasEntries = true; - } - - if (isBrowser()) - { - if (hasEntries) - { - filterLogString.append(delimiter); - } - filterLogString.append("Browser"); - hasEntries = true; - } - - if (isDurable()) - { - if (hasEntries) - { - filterLogString.append(delimiter); - } - filterLogString.append("Durable"); - hasEntries = true; - } - - return filterLogString.toString(); - } - - public LogSubject getLogSubject() - { - return (LogSubject) this; - } - - - public void flushBatched() - { - _session.getConnection().flush(); - } - - public long getBytesOut() - { - return _deliveredBytes.longValue(); - } - - public long getMessagesOut() - { - return _deliveredCount.longValue(); - } - - public long getUnacknowledgedBytes() - { - return _unacknowledgedBytes.longValue(); - } - - public long getUnacknowledgedMessages() - { - return _unacknowledgedCount.longValue(); - } -} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index e139887284..aa465d373f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -21,19 +21,7 @@ package org.apache.qpid.server.protocol.v0_8; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.UUID; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -42,6 +30,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -55,6 +44,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.logging.LogActor; @@ -66,25 +56,28 @@ import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.RecordDeliveryMethod; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TransportException; @@ -122,7 +115,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private IncomingMessage _currentMessage; /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */ - private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>(); + private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>(); private final MessageStore _messageStore; @@ -155,7 +148,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private volatile boolean _rollingBack; private static final Runnable NULL_TASK = new Runnable() { public void run() {} }; - private List<QueueEntry> _resendList = new ArrayList<QueueEntry>(); + private List<MessageInstance> _resendList = new ArrayList<MessageInstance>(); private static final AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible."); private long _createTime = System.currentTimeMillis(); @@ -266,7 +259,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F return _channelId; } - public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException + public void setPublishFrame(MessagePublishInfo info, final MessageDestination e) throws AMQSecurityException { String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString(); SecurityManager securityManager = getVirtualHost().getSecurityManager(); @@ -275,7 +268,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F throw new AMQSecurityException("Permission denied: " + e.getName()); } _currentMessage = new IncomingMessage(info); - _currentMessage.setExchange(e); + _currentMessage.setMessageDestination(e); } public void publishContentHeader(ContentHeaderBody contentHeaderBody) @@ -360,7 +353,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } }; - int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction, + int enqueues = _currentMessage.getDestination().send(amqMessage, instanceProperties, _transaction, immediate ? _immediateAction : _capacityCheckAction); if(enqueues == 0) { @@ -497,62 +490,89 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - public Subscription getSubscription(AMQShortString subscription) + public Consumer getSubscription(AMQShortString tag) { - return _tag2SubscriptionMap.get(subscription); + final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag); + return target == null ? null : target.getConsumer(); } /** * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean * up all subscriptions, even if the client does not explicitly unsubscribe from all queues. * + * * @param tag the tag chosen by the client (if null, server will generate one) - * @param queue the queue to subscribe to + * @param source the queue to subscribe to * @param acks Are acks enabled for this subscriber * @param filters Filters to apply to this subscriber * - * @param noLocal Flag stopping own messages being received. * @param exclusive Flag requesting exclusive access to the queue * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests * * @throws AMQException if something goes wrong */ - public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks, - FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException + public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, + FieldTable filters, boolean exclusive) throws AMQException { if (tag == null) { tag = new AMQShortString("sgen_" + getNextConsumerTag()); } - if (_tag2SubscriptionMap.containsKey(tag)) + if (_tag2SubscriptionTargetMap.containsKey(tag)) { throw new AMQException("Consumer already exists with same tag: " + tag); } - Subscription subscription = - SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager); + ConsumerTarget_0_8 target; + EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class); + + if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue()))) + { + target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager); + } + else if(acks) + { + target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager); + options.add(Consumer.Option.ACQUIRES); + options.add(Consumer.Option.SEES_REQUEUES); + } + else + { + target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager); + options.add(Consumer.Option.ACQUIRES); + options.add(Consumer.Option.SEES_REQUEUES); + } + if(exclusive) + { + options.add(Consumer.Option.EXCLUSIVE); + } // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. // We add before we register as the Async Delivery process may AutoClose the subscriber // so calling _cT2QM.remove before we have done put which was after the register succeeded. // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. - _tag2SubscriptionMap.put(tag, subscription); + _tag2SubscriptionTargetMap.put(tag, target); try { - queue.registerSubscription(subscription, exclusive); + Consumer sub = + source.addConsumer(target, + FilterManagerFactory.createManager(FieldTable.convertToMap(filters)), + AMQMessage.class, + AMQShortString.toString(tag), + options); } catch (AMQException e) { - _tag2SubscriptionMap.remove(tag); + _tag2SubscriptionTargetMap.remove(tag); throw e; } catch (RuntimeException e) { - _tag2SubscriptionMap.remove(tag); + _tag2SubscriptionTargetMap.remove(tag); throw e; } return tag; @@ -567,18 +587,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException { - Subscription sub = _tag2SubscriptionMap.remove(consumerTag); + ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag); + Consumer sub = target == null ? null : target.getConsumer(); if (sub != null) { - try - { - sub.getSendLock(); - sub.getQueue().unregisterSubscription(sub); - } - finally - { - sub.releaseSendLock(); - } + sub.close(); return true; } else @@ -633,7 +646,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { if (_logger.isInfoEnabled()) { - if (!_tag2SubscriptionMap.isEmpty()) + if (!_tag2SubscriptionTargetMap.isEmpty()) { _logger.info("Unsubscribing all consumers on channel " + toString()); } @@ -643,28 +656,21 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } } - for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet()) + for (Map.Entry<AMQShortString, ConsumerTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet()) { if (_logger.isInfoEnabled()) { _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); } - Subscription sub = me.getValue(); + Consumer sub = me.getValue().getConsumer(); - try - { - sub.getSendLock(); - sub.getQueue().unregisterSubscription(sub); - } - finally - { - sub.releaseSendLock(); - } + + sub.close(); } - _tag2SubscriptionMap.clear(); + _tag2SubscriptionTargetMap.clear(); } /** @@ -673,24 +679,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F * @param entry the record of the message on the queue that was delivered * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the * delivery tag) - * @param subscription The consumer that is to acknowledge this message. + * @param consumer The consumer that is to acknowledge this message. */ - public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription) + public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Consumer consumer) { if (_logger.isDebugEnabled()) { - if (entry.getQueue() == null) - { - _logger.debug("Adding unacked message with a null queue:" + entry); - } - else - { - if (_logger.isDebugEnabled()) - { _logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag - + ") with a queue(" + entry.getQueue() + ") for " + subscription); - } - } + + ") for " + consumer + " on " + entry.getOwningResource().getName()); + } _unacknowledgedMessageMap.add(deliveryTag, entry); @@ -713,7 +710,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public void requeue() throws AMQException { // we must create a new map since all the messages will get a new delivery tag when they are redelivered - Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); + Collection<MessageInstance> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); if (!messagesToBeDelivered.isEmpty()) { @@ -724,21 +721,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - for (QueueEntry unacked : messagesToBeDelivered) + for (MessageInstance unacked : messagesToBeDelivered) { - if (!unacked.isQueueDeleted()) - { - // Mark message redelivered - unacked.setRedelivered(); - - // Ensure message is released for redelivery - unacked.release(); + // Mark message redelivered + unacked.setRedelivered(); - } - else - { - unacked.delete(); - } + // Ensure message is released for redelivery + unacked.release(); } } @@ -752,7 +741,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F */ public void requeue(long deliveryTag) throws AMQException { - QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag); + MessageInstance unacked = _unacknowledgedMessageMap.remove(deliveryTag); if (unacked != null) { @@ -760,20 +749,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F unacked.setRedelivered(); // Ensure message is released for redelivery - if (!unacked.isQueueDeleted()) - { - - // Ensure message is released for redelivery - unacked.release(); - - } - else - { - _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked - + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); + unacked.release(); - unacked.delete(); - } } else { @@ -786,10 +763,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public boolean isMaxDeliveryCountEnabled(final long deliveryTag) { - final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag); + final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag); if (queueEntry != null) { - final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount(); + final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount(); return maximumDeliveryCount > 0; } @@ -798,10 +775,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public boolean isDeliveredTooManyTimes(final long deliveryTag) { - final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag); + final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag); if (queueEntry != null) { - final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount(); + final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount(); final int numDeliveries = queueEntry.getDeliveryCount(); return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount; } @@ -812,16 +789,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F /** * Called to resend all outstanding unacknowledged messages to this same channel. * - * @param requeue Are the messages to be requeued or dropped. - * * @throws AMQException When something goes wrong. */ - public void resend(final boolean requeue) throws AMQException + public void resend() throws AMQException { - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); + final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>(); + final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>(); if (_logger.isDebugEnabled()) { @@ -833,9 +808,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F // and those that don't to be requeued. _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, - requeue, - _messageStore)); + msgToResend + )); // Process Messages to Resend @@ -851,39 +825,20 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } } - for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet()) + for (Map.Entry<Long, MessageInstance> entry : msgToResend.entrySet()) { - QueueEntry message = entry.getValue(); + MessageInstance message = entry.getValue(); long deliveryTag = entry.getKey(); //Amend the delivery counter as the client hasn't seen these messages yet. message.decrementDeliveryCount(); - AMQQueue queue = message.getQueue(); - // Without any details from the client about what has been processed we have to mark // all messages in the unacked map as redelivered. message.setRedelivered(); - Subscription sub = message.getDeliveredSubscription(); - - if (sub != null) - { - - if(!queue.resend(message,sub)) - { - msgToRequeue.put(deliveryTag, message); - } - } - else + if (!message.resend()) { - - if (_logger.isInfoEnabled()) - { - _logger.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() - + ")to prevent loss"); - } - // move this message to requeue msgToRequeue.put(deliveryTag, message); } } // for all messages @@ -898,9 +853,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } // Process Messages to Requeue at the front of the queue - for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet()) + for (Map.Entry<Long, MessageInstance> entry : msgToRequeue.entrySet()) { - QueueEntry message = entry.getValue(); + MessageInstance message = entry.getValue(); long deliveryTag = entry.getKey(); //Amend the delivery counter as the client hasn't seen these messages yet. @@ -926,11 +881,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F */ public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException { - Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple); + Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple); _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages)); } - private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple) + private Collection<MessageInstance> getAckedMessages(long deliveryTag, boolean multiple) { return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple); @@ -976,9 +931,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if (wasSuspended) { // may need to deliver queued messages - for (Subscription s : _tag2SubscriptionMap.values()) + for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values()) { - s.getQueue().deliverAsync(s); + s.getConsumer().externalStateChange(); } } @@ -992,15 +947,15 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if (!wasSuspended) { // may need to deliver queued messages - for (Subscription s : _tag2SubscriptionMap.values()) + for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values()) { try { - s.getSendLock(); + s.getConsumer().getSendLock(); } finally { - s.releaseSendLock(); + s.getConsumer().releaseSendLock(); } } } @@ -1077,10 +1032,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F boolean requiresSuspend = _suspended.compareAndSet(false,true); // ensure all subscriptions have seen the change to the channel state - for(Subscription sub : _tag2SubscriptionMap.values()) + for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values()) { - sub.getSendLock(); - sub.releaseSendLock(); + sub.getConsumer().getSendLock(); + sub.getConsumer().releaseSendLock(); } try @@ -1098,16 +1053,16 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F postRollbackTask.run(); - for(QueueEntry entry : _resendList) + for(MessageInstance entry : _resendList) { - Subscription sub = entry.getDeliveredSubscription(); + Consumer sub = entry.getDeliveredConsumer(); if(sub == null || sub.isClosed()) { entry.release(); } else { - sub.getQueue().resend(entry, sub); + entry.resend(); } } _resendList.clear(); @@ -1115,9 +1070,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if(requiresSuspend) { _suspended.set(false); - for(Subscription sub : _tag2SubscriptionMap.values()) + for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values()) { - sub.getQueue().deliverAsync(sub); + sub.getConsumer().externalStateChange(); } } @@ -1173,7 +1128,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod() { - public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag) + public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag) { addUnacknowledgedMessage(entry, deliveryTag, sub); } @@ -1234,78 +1189,96 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - private class ImmediateAction implements BaseQueue.PostEnqueueAction + private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<C>> { public ImmediateAction() { } - public void onEnqueue(QueueEntry entry) + public void performAction(MessageInstance<C> entry) { - AMQQueue queue = entry.getQueue(); + TransactionLogResource queue = entry.getOwningResource(); if (!entry.getDeliveredToConsumer() && entry.acquire()) { ServerTransaction txn = new LocalTransaction(_messageStore); - Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1); - entries.add(entry); final AMQMessage message = (AMQMessage) entry.getMessage(); - txn.dequeue(queue, entry.getMessage(), - new MessageAcknowledgeAction(entries) - { - @Override - public void postCommit() + MessageReference ref = message.newReference(); + try + { + entry.delete(); + txn.dequeue(queue, message, + new ServerTransaction.Action() { - try + @Override + public void postCommit() { - final - ProtocolOutputConverter outputConverter = - _session.getProtocolOutputConverter(); - - outputConverter.writeReturn(message.getMessagePublishInfo(), - message.getContentHeaderBody(), - message, - _channelId, - AMQConstant.NO_CONSUMERS.getCode(), - IMMEDIATE_DELIVERY_REPLY_TEXT); + try + { + final + ProtocolOutputConverter outputConverter = + _session.getProtocolOutputConverter(); + + outputConverter.writeReturn(message.getMessagePublishInfo(), + message.getContentHeaderBody(), + message, + _channelId, + AMQConstant.NO_CONSUMERS.getCode(), + IMMEDIATE_DELIVERY_REPLY_TEXT); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } } - catch (AMQException e) + + @Override + public void onRollback() { - throw new RuntimeException(e); + } - super.postCommit(); } - } - ); - txn.commit(); + ); + txn.commit(); + } + finally + { + ref.release(); + } } else { - queue.checkCapacity(AMQChannel.this); + if(queue instanceof CapacityChecker) + { + ((CapacityChecker)queue).checkCapacity(AMQChannel.this); + } } } } - private final class CapacityCheckAction implements BaseQueue.PostEnqueueAction + private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<C>> { @Override - public void onEnqueue(final QueueEntry entry) + public void performAction(final MessageInstance<C> entry) { - AMQQueue queue = entry.getQueue(); - queue.checkCapacity(AMQChannel.this); + TransactionLogResource queue = entry.getOwningResource(); + if(queue instanceof CapacityChecker) + { + ((CapacityChecker)queue).checkCapacity(AMQChannel.this); + } } } private class MessageAcknowledgeAction implements ServerTransaction.Action { - private final Collection<QueueEntry> _ackedMessages; + private final Collection<MessageInstance> _ackedMessages; - public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages) + public MessageAcknowledgeAction(Collection<MessageInstance> ackedMessages) { _ackedMessages = ackedMessages; } @@ -1314,7 +1287,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { try { - for(QueueEntry entry : _ackedMessages) + for(MessageInstance entry : _ackedMessages) { entry.delete(); } @@ -1337,10 +1310,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { try { - for(QueueEntry entry : _ackedMessages) - { - entry.release(); - } + for(MessageInstance entry : _ackedMessages) + { + entry.release(); + } } finally { @@ -1505,7 +1478,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public void deadLetter(long deliveryTag) throws AMQException { final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); - final QueueEntry rejectedQueueEntry = unackedMap.remove(deliveryTag); + final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag); if (rejectedQueueEntry == null) { @@ -1514,36 +1487,42 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F else { final ServerMessage msg = rejectedQueueEntry.getMessage(); + final Consumer sub = rejectedQueueEntry.getDeliveredConsumer(); - int requeues = rejectedQueueEntry.routeToAlternate(new BaseQueue.PostEnqueueAction() + int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>() { @Override - public void onEnqueue(final QueueEntry requeueEntry) + public void performAction(final MessageInstance requeueEntry) { _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), - requeueEntry.getQueue().getName())); + requeueEntry.getOwningResource().getName())); } }, null); if(requeues == 0) { - final AMQQueue queue = rejectedQueueEntry.getQueue(); - - final Exchange altExchange = queue.getAlternateExchange(); - if (altExchange == null) + final TransactionLogResource owningResource = rejectedQueueEntry.getOwningResource(); + if(owningResource instanceof AMQQueue) { - _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); - _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); + final AMQQueue queue = (AMQQueue) owningResource; - } - else - { - _logger.debug( - "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " - + deliveryTag); - _actor.message(_logSubject, - ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); + final Exchange altExchange = queue.getAlternateExchange(); + + if (altExchange == null) + { + _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); + _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); + + } + else + { + _logger.debug( + "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + + deliveryTag); + _actor.message(_logSubject, + ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); + } } } @@ -1604,6 +1583,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F @Override public int getConsumerCount() { - return _tag2SubscriptionMap.size(); + return _tag2SubscriptionTargetMap.size(); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index c7a84fa3b6..e83e86981b 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -94,8 +94,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; @@ -1669,7 +1668,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } @Override - public void deliverToClient(final Subscription sub, final ServerMessage message, + public void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) throws AMQException { @@ -1678,7 +1677,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi props, _channelId, deliveryTag, - ((SubscriptionImpl)sub).getConsumerTag()); + new AMQShortString(sub.getName())); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java index 85d995518a..6bcd4b9d49 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java @@ -39,7 +39,6 @@ import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.virtualhost.VirtualHost; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java new file mode 100644 index 0000000000..2e362c11f8 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java @@ -0,0 +1,32 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.consumer.Consumer; + +public interface ClientDeliveryMethod +{ + void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props, + final long deliveryTag) throws AMQException; +} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java new file mode 100644 index 0000000000..47700f812f --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -0,0 +1,552 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.consumer.AbstractConsumerTarget; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.StateChangeListener; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag + * that was given out by the broker and the channel id. <p/> + */ +public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener +{ + + private final StateChangeListener<MessageInstance, MessageInstance.State> _entryReleaseListener = + new StateChangeListener<MessageInstance, MessageInstance.State>() + { + @Override + public void stateChanged(final MessageInstance entry, + final MessageInstance.State oldSate, + final MessageInstance.State newState) + { + if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED)) + { + restoreCredit(entry.getMessage()); + } + entry.removeStateChangeListener(this); + } + }; + + private final ClientDeliveryMethod _deliveryMethod; + private final RecordDeliveryMethod _recordMethod; + + private final AtomicLong _unacknowledgedCount = new AtomicLong(0); + private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); + private Consumer _consumer; + + + public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager) throws AMQException + { + return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); + } + + static final class BrowserConsumer extends ConsumerTarget_0_8 + { + public BrowserConsumer(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) + throws AMQException + { + super(channel, consumerTag, + filters, creditManager, deliveryMethod, recordMethod); + } + + /** + * This method can be called by each of the publisher threads. As a result all changes to the channel object must be + * thread safe. + * + * + * @param entry + * @param batch + * @throws org.apache.qpid.AMQException + */ + @Override + public void send(MessageInstance entry, boolean batch) throws AMQException + { + // We don't decrement the reference here as we don't want to consume the message + // but we do want to send it to the client. + + synchronized (getChannel()) + { + long deliveryTag = getChannel().getNextDeliveryTag(); + sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + } + + } + + @Override + public boolean allocateCredit(ServerMessage msg) + { + return true; + } + + } + + public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager) throws AMQException + { + return new NoAckConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); + } + + public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) throws AMQException + { + return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); + } + + public static class NoAckConsumer extends ConsumerTarget_0_8 + { + private final AutoCommitTransaction _txn; + + public NoAckConsumer(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) + throws AMQException + { + super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); + + _txn = new AutoCommitTransaction(channel.getVirtualHost().getMessageStore()); + } + + /** + * This method can be called by each of the publisher threads. As a result all changes to the channel object must be + * thread safe. + * + * + * @param entry The message to send + * @param batch + * @throws org.apache.qpid.AMQException + */ + @Override + public void send(MessageInstance entry, boolean batch) throws AMQException + { + // if we do not need to wait for client acknowledgements + // we can decrement the reference count immediately. + + // By doing this _before_ the send we ensure that it + // doesn't get sent if it can't be dequeued, preventing + // duplicate delivery on recovery. + + // The send may of course still fail, in which case, as + // the message is unacked, it will be lost. + _txn.dequeue(entry.getOwningResource(), entry.getMessage(), NOOP); + + ServerMessage message = entry.getMessage(); + MessageReference ref = message.newReference(); + InstanceProperties props = entry.getInstanceProperties(); + entry.delete(); + + synchronized (getChannel()) + { + getChannel().getProtocolSession().setDeferFlush(batch); + long deliveryTag = getChannel().getNextDeliveryTag(); + + sendToClient(message, props, deliveryTag); + + } + ref.release(); + + + } + + @Override + public boolean allocateCredit(ServerMessage msg) + { + return true; + } + + private static final ServerTransaction.Action NOOP = + new ServerTransaction.Action() + { + @Override + public void postCommit() + { + } + + @Override + public void onRollback() + { + } + }; + } + + /** + * NoAck Subscription for use with BasicGet method. + */ + public static final class GetNoAckConsumer extends NoAckConsumer + { + public GetNoAckConsumer(AMQChannel channel, AMQProtocolSession protocolSession, + AMQShortString consumerTag, FieldTable filters, + boolean noLocal, FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) + throws AMQException + { + super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); + } + + public boolean allocateCredit(ServerMessage msg) + { + return getCreditManager().useCreditForMessage(msg.getSize()); + } + + } + + + public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager) + throws AMQException + { + return new AckConsumer(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); + } + + + public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) + throws AMQException + { + return new AckConsumer(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod); + } + + static final class AckConsumer extends ConsumerTarget_0_8 + { + public AckConsumer(AMQChannel channel, + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) + throws AMQException + { + super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); + } + + /** + * This method can be called by each of the publisher threads. As a result all changes to the channel object must be + * thread safe. + * + * + * @param entry The message to send + * @param batch + * @throws org.apache.qpid.AMQException + */ + @Override + public void send(MessageInstance entry, boolean batch) throws AMQException + { + + + synchronized (getChannel()) + { + getChannel().getProtocolSession().setDeferFlush(batch); + long deliveryTag = getChannel().getNextDeliveryTag(); + + addUnacknowledgedMessage(entry); + recordMessageDelivery(entry, deliveryTag); + entry.addStateChangeListener(getReleasedStateChangeListener()); + sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + entry.incrementDeliveryCount(); + + } + } + + + + } + + + private static final Logger _logger = Logger.getLogger(ConsumerTarget_0_8.class); + + private final AMQChannel _channel; + + private final AMQShortString _consumerTag; + + private final FlowCreditManager _creditManager; + + private final Boolean _autoClose; + + private final AtomicBoolean _deleted = new AtomicBoolean(false); + + + + + public ConsumerTarget_0_8(AMQChannel channel, + AMQShortString consumerTag, + FieldTable arguments, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) + throws AMQException + { + super(State.ACTIVE); + + _channel = channel; + _consumerTag = consumerTag; + + _creditManager = creditManager; + creditManager.addStateListener(this); + + _deliveryMethod = deliveryMethod; + _recordMethod = recordMethod; + + if (arguments != null) + { + Object autoClose = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue()); + if (autoClose != null) + { + _autoClose = (Boolean) autoClose; + } + else + { + _autoClose = false; + } + } + else + { + _autoClose = false; + } + } + + public Consumer getConsumer() + { + return _consumer; + } + + @Override + public void consumerRemoved(final Consumer sub) + { + } + + @Override + public void consumerAdded(final Consumer sub) + { + _consumer = sub; + } + + public AMQSessionModel getSessionModel() + { + return _channel; + } + + public String toString() + { + String subscriber = "[channel=" + _channel + + ", consumerTag=" + _consumerTag + + ", session=" + getProtocolSession().getKey() ; + + return subscriber + "]"; + } + + public boolean isSuspended() + { + return getState()!=State.ACTIVE || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped(); + } + + /** + * Callback indicating that a queue has been deleted. + * + */ + public void queueDeleted() + { + _deleted.set(true); + } + + public boolean isAutoClose() + { + return _autoClose; + } + + public FlowCreditManager getCreditManager() + { + return _creditManager; + } + + + public boolean close() + { + boolean closed = false; + State state = getState(); + + getConsumer().getSendLock(); + try + { + while(!closed && state != State.CLOSED) + { + closed = updateState(state, State.CLOSED); + if(!closed) + { + state = getState(); + } + } + _creditManager.removeListener(this); + return closed; + } + finally + { + getConsumer().releaseSendLock(); + } + } + + + public boolean allocateCredit(ServerMessage msg) + { + return _creditManager.useCreditForMessage(msg.getSize()); + } + + public AMQChannel getChannel() + { + return _channel; + } + + public AMQShortString getConsumerTag() + { + return _consumerTag; + } + + public AMQProtocolSession getProtocolSession() + { + return _channel.getProtocolSession(); + } + + public void restoreCredit(final ServerMessage message) + { + _creditManager.restoreCredit(1, message.getSize()); + } + + protected final StateChangeListener<MessageInstance, MessageInstance.State> getReleasedStateChangeListener() + { + return _entryReleaseListener; + } + + public void creditStateChanged(boolean hasCredit) + { + + if(hasCredit) + { + if(!updateState(State.SUSPENDED, State.ACTIVE)) + { + // this is a hack to get round the issue of increasing bytes credit + getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE); + } + } + else + { + updateState(State.ACTIVE, State.SUSPENDED); + } + } + + protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag) + throws AMQException + { + _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag); + + } + + + protected void recordMessageDelivery(final MessageInstance entry, final long deliveryTag) + { + _recordMethod.recordMessageDelivery(getConsumer(),entry,deliveryTag); + } + + + public void confirmAutoClose() + { + ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag()); + } + + public void queueEmpty() throws AMQException + { + if (isAutoClose()) + { + close(); + confirmAutoClose(); + } + } + + public void flushBatched() + { + _channel.getProtocolSession().setDeferFlush(false); + + _channel.getProtocolSession().flushBatched(); + } + + protected void addUnacknowledgedMessage(MessageInstance entry) + { + final long size = entry.getMessage().getSize(); + _unacknowledgedBytes.addAndGet(size); + _unacknowledgedCount.incrementAndGet(); + entry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.State>() + { + public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState) + { + if(oldState.equals(MessageInstance.State.ACQUIRED) && !newState.equals(MessageInstance.State.ACQUIRED)) + { + _unacknowledgedBytes.addAndGet(-size); + _unacknowledgedCount.decrementAndGet(); + entry.removeStateChangeListener(this); + } + } + }); + } + + public long getUnacknowledgedBytes() + { + return _unacknowledgedBytes.longValue(); + } + + public long getUnacknowledgedMessages() + { + return _unacknowledgedCount.longValue(); + } +} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java index 060aebdd65..1de1638c2e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java @@ -23,11 +23,8 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.consumer.Consumer; import java.util.Map; @@ -35,34 +32,28 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor { private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class); - private final Map<Long, QueueEntry> _msgToRequeue; - private final Map<Long, QueueEntry> _msgToResend; - private final boolean _requeueIfUnableToResend; + private final Map<Long, MessageInstance> _msgToRequeue; + private final Map<Long, MessageInstance> _msgToResend; private final UnacknowledgedMessageMap _unacknowledgedMessageMap; - private final MessageStore _transactionLog; public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap, - Map<Long, QueueEntry> msgToRequeue, - Map<Long, QueueEntry> msgToResend, - boolean requeueIfUnableToResend, - MessageStore txnLog) + Map<Long, MessageInstance> msgToRequeue, + Map<Long, MessageInstance> msgToResend) { _unacknowledgedMessageMap = unacknowledgedMessageMap; _msgToRequeue = msgToRequeue; _msgToResend = msgToResend; - _requeueIfUnableToResend = requeueIfUnableToResend; - _transactionLog = txnLog; } - public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException + public boolean callback(final long deliveryTag, MessageInstance message) throws AMQException { message.setRedelivered(); - final Subscription subscription = message.getDeliveredSubscription(); - if (subscription != null) + final Consumer consumer = message.getDeliveredConsumer(); + if (consumer != null) { // Consumer exists - if (!subscription.isClosed()) + if (!consumer.isClosed()) { _msgToResend.put(deliveryTag, message); } @@ -73,58 +64,13 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor } else { - // Message has no consumer tag, so was "delivered" to a GET - // or consumer no longer registered - // cannot resend, so re-queue. - if (!message.isQueueDeleted()) - { - if (_requeueIfUnableToResend) - { - _msgToRequeue.put(deliveryTag, message); - } - else - { - - dequeueEntry(message); - _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); - } - } - else - { - dequeueEntry(message); - _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message); - } + _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); } // false means continue processing return false; } - - private void dequeueEntry(final QueueEntry node) - { - ServerTransaction txn = new AutoCommitTransaction(_transactionLog); - dequeueEntry(node, txn); - } - - private void dequeueEntry(final QueueEntry node, ServerTransaction txn) - { - txn.dequeue(node.getQueue(), node.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - node.delete(); - } - - public void onRollback() - { - - } - }); - } - public void visitComplete() { _unacknowledgedMessageMap.clear(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java index 5a9a51ff59..80c4c77b65 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java @@ -20,15 +20,12 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.message.MessageDestination; import java.util.ArrayList; import java.util.List; @@ -38,7 +35,7 @@ public class IncomingMessage private final MessagePublishInfo _messagePublishInfo; private ContentHeaderBody _contentHeaderBody; - private Exchange _exchange; + private MessageDestination _messageDestination; /** * Keeps a track of how many bytes we have received in body frames @@ -77,9 +74,9 @@ public class IncomingMessage return _messagePublishInfo.getExchange(); } - public Exchange getExchange() + public MessageDestination getDestination() { - return _exchange; + return _messageDestination; } public ContentHeaderBody getContentHeader() @@ -92,9 +89,9 @@ public class IncomingMessage return getContentHeader().getBodySize(); } - public void setExchange(final Exchange e) + public void setMessageDestination(final MessageDestination e) { - _exchange = e; + _messageDestination = e; } public int getBodyCount() throws AMQException diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java index ead28c6e26..3665e7f135 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java @@ -105,7 +105,7 @@ public class MessageMetaData implements StorableMessageMetaData } - public int writeToBuffer(int offset, ByteBuffer dest) + public int writeToBuffer(ByteBuffer dest) { int oldPosition = dest.position(); try diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java new file mode 100644 index 0000000000..70d7da3432 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java @@ -0,0 +1,29 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.consumer.Consumer; + +public interface RecordDeliveryMethod +{ + void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag); +} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java deleted file mode 100644 index 6646dc0cc2..0000000000 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.RecordDeliveryMethod; -import org.apache.qpid.server.subscription.Subscription; - -/** - * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory - * primarily assists testing although in future more sophisticated subscribers may need a different subscription - * implementation. - * - * @see org.apache.qpid.server.queue.AMQQueue - */ -public interface SubscriptionFactory -{ - Subscription createSubscription(int channel, - AMQProtocolSession protocolSession, - AMQShortString consumerTag, - boolean acks, - FieldTable filters, - boolean noLocal, FlowCreditManager creditManager) throws AMQException; - - - Subscription createSubscription(AMQChannel channel, - AMQProtocolSession protocolSession, - AMQShortString consumerTag, - boolean acks, - FieldTable filters, - boolean noLocal, - FlowCreditManager creditManager, - ClientDeliveryMethod clientMethod, - RecordDeliveryMethod recordMethod) throws AMQException; - - - Subscription createBasicGetNoAckSubscription(AMQChannel channel, - AMQProtocolSession session, - AMQShortString consumerTag, - FieldTable filters, - boolean noLocal, - FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) throws AMQException; - -} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java deleted file mode 100644 index 93b51a0567..0000000000 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.AMQException; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.RecordDeliveryMethod; -import org.apache.qpid.server.subscription.Subscription; - -public class SubscriptionFactoryImpl implements SubscriptionFactory -{ - - public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession, - AMQShortString consumerTag, boolean acks, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager) throws AMQException - { - AMQChannel channel = protocolSession.getChannel(channelId); - if (channel == null) - { - throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session"); - } - ClientDeliveryMethod clientMethod = channel.getClientDeliveryMethod(); - RecordDeliveryMethod recordMethod = channel.getRecordDeliveryMethod(); - - - return createSubscription(channel, protocolSession, consumerTag, acks, filters, - noLocal, - creditManager, - clientMethod, - recordMethod - ); - } - - public Subscription createSubscription(final AMQChannel channel, - final AMQProtocolSession protocolSession, - final AMQShortString consumerTag, - final boolean acks, - final FieldTable filters, - final boolean noLocal, - final FlowCreditManager creditManager, - final ClientDeliveryMethod clientMethod, - final RecordDeliveryMethod recordMethod - ) - throws AMQException - { - boolean isBrowser; - - if (filters != null) - { - Boolean isBrowserObj = (Boolean) filters.get(AMQPFilterTypes.NO_CONSUME.getValue()); - isBrowser = (isBrowserObj != null) && isBrowserObj.booleanValue(); - } - else - { - isBrowser = false; - } - - if(isBrowser) - { - return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod); - } - else if(acks) - { - return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod); - } - else - { - return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod); - } - } - - public SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(final AMQChannel channel, - final AMQProtocolSession session, - final AMQShortString consumerTag, - final FieldTable filters, - final boolean noLocal, - final FlowCreditManager creditManager, - final ClientDeliveryMethod deliveryMethod, - final RecordDeliveryMethod recordMethod) throws AMQException - { - return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod); - } - - public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl(); - -} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java deleted file mode 100644 index 7c52fbe3b0..0000000000 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java +++ /dev/null @@ -1,858 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.filter.FilterManagerFactory; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.SubscriptionActor; -import org.apache.qpid.server.logging.messages.SubscriptionMessages; -import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.RecordDeliveryMethod; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -/** - * Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag - * that was given out by the broker and the channel id. <p/> - */ -public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener -{ - - private StateListener _stateListener = new StateListener() - { - - public void stateChange(Subscription sub, State oldState, State newState) - { - - } - }; - - - private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE); - private volatile AMQQueue.Context _queueContext; - - private final ClientDeliveryMethod _deliveryMethod; - private final RecordDeliveryMethod _recordMethod; - - private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); - - private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); - - private final Lock _stateChangeLock; - - private final long _subscriptionID; - private LogSubject _logSubject; - private LogActor _logActor; - private final AtomicLong _deliveredCount = new AtomicLong(0); - private final AtomicLong _deliveredBytes = new AtomicLong(0); - - private final AtomicLong _unacknowledgedCount = new AtomicLong(0); - private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); - - private long _createTime = System.currentTimeMillis(); - - - static final class BrowserSubscription extends SubscriptionImpl - { - public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) - throws AMQException - { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); - } - - - public boolean isBrowser() - { - return true; - } - - /** - * This method can be called by each of the publisher threads. As a result all changes to the channel object must be - * thread safe. - * - * - * @param entry - * @param batch - * @throws AMQException - */ - @Override - public void send(QueueEntry entry, boolean batch) throws AMQException - { - // We don't decrement the reference here as we don't want to consume the message - // but we do want to send it to the client. - - synchronized (getChannel()) - { - long deliveryTag = getChannel().getNextDeliveryTag(); - sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); - } - - } - - @Override - public boolean wouldSuspend(QueueEntry msg) - { - return false; - } - - } - - public static class NoAckSubscription extends SubscriptionImpl - { - private final AutoCommitTransaction _txn; - - public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) - throws AMQException - { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); - _txn = new AutoCommitTransaction(protocolSession.getVirtualHost().getMessageStore()); - } - - - public boolean isBrowser() - { - return false; - } - - @Override - public boolean isExplicitAcknowledge() - { - return false; - } - - /** - * This method can be called by each of the publisher threads. As a result all changes to the channel object must be - * thread safe. - * - * - * @param entry The message to send - * @param batch - * @throws AMQException - */ - @Override - public void send(QueueEntry entry, boolean batch) throws AMQException - { - // if we do not need to wait for client acknowledgements - // we can decrement the reference count immediately. - - // By doing this _before_ the send we ensure that it - // doesn't get sent if it can't be dequeued, preventing - // duplicate delivery on recovery. - - // The send may of course still fail, in which case, as - // the message is unacked, it will be lost. - _txn.dequeue(getQueue(), entry.getMessage(), NOOP); - - ServerMessage message = entry.getMessage(); - MessageReference ref = message.newReference(); - InstanceProperties props = entry.getInstanceProperties(); - entry.delete(); - - synchronized (getChannel()) - { - getChannel().getProtocolSession().setDeferFlush(batch); - long deliveryTag = getChannel().getNextDeliveryTag(); - - sendToClient(message, props, deliveryTag); - - } - ref.release(); - - - } - - @Override - public boolean wouldSuspend(QueueEntry msg) - { - return false; - } - - private static final ServerTransaction.Action NOOP = - new ServerTransaction.Action() - { - @Override - public void postCommit() - { - } - - @Override - public void onRollback() - { - } - }; - } - - /** - * NoAck Subscription for use with BasicGet method. - */ - public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription - { - public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) - throws AMQException - { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); - } - - public boolean isTransient() - { - return true; - } - - public boolean wouldSuspend(QueueEntry msg) - { - return !getCreditManager().useCreditForMessage(msg.getMessage().getSize()); - } - - } - - static final class AckSubscription extends SubscriptionImpl - { - public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) - throws AMQException - { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); - } - - - public boolean isBrowser() - { - return false; - } - - - /** - * This method can be called by each of the publisher threads. As a result all changes to the channel object must be - * thread safe. - * - * - * @param entry The message to send - * @param batch - * @throws AMQException - */ - @Override - public void send(QueueEntry entry, boolean batch) throws AMQException - { - - - synchronized (getChannel()) - { - getChannel().getProtocolSession().setDeferFlush(batch); - long deliveryTag = getChannel().getNextDeliveryTag(); - - addUnacknowledgedMessage(entry); - recordMessageDelivery(entry, deliveryTag); - sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); - entry.incrementDeliveryCount(); - - } - } - - - - } - - - private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class); - - private final AMQChannel _channel; - - private final AMQShortString _consumerTag; - - - private boolean _noLocal; - - private final FlowCreditManager _creditManager; - - private FilterManager _filters; - - private final Boolean _autoClose; - - private AMQQueue _queue; - private final AtomicBoolean _deleted = new AtomicBoolean(false); - - - - - public SubscriptionImpl(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable arguments, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) - throws AMQException - { - _subscriptionID = SUB_ID_GENERATOR.getAndIncrement(); - _channel = channel; - _consumerTag = consumerTag; - - _creditManager = creditManager; - creditManager.addStateListener(this); - - _noLocal = noLocal; - - - _filters = FilterManagerFactory.createManager(FieldTable.convertToMap(arguments)); - - _deliveryMethod = deliveryMethod; - _recordMethod = recordMethod; - - - _stateChangeLock = new ReentrantLock(); - - - if (arguments != null) - { - Object autoClose = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue()); - if (autoClose != null) - { - _autoClose = (Boolean) autoClose; - } - else - { - _autoClose = false; - } - } - else - { - _autoClose = false; - } - - } - - public AMQSessionModel getSessionModel() - { - return _channel; - } - - public Long getDelivered() - { - return _deliveredCount.get(); - } - - public synchronized void setQueue(AMQQueue queue, boolean exclusive) - { - if(getQueue() != null) - { - throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue()); - } - _queue = queue; - - _logSubject = new SubscriptionLogSubject(this); - _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this); - - if (CurrentActor.get().getRootMessageLogger(). - isMessageEnabled(CurrentActor.get(), _logSubject, SubscriptionMessages.CREATE_LOG_HIERARCHY)) - { - // Get the string value of the filters - String filterLogString = null; - if (_filters != null && _filters.hasFilters()) - { - filterLogString = _filters.toString(); - } - - if (isAutoClose()) - { - if (filterLogString == null) - { - filterLogString = ""; - } - else - { - filterLogString += ","; - } - filterLogString += "AutoClose"; - } - - if (isBrowser()) - { - // We do not need to check for null here as all Browsers are AutoClose - filterLogString +=",Browser"; - } - - CurrentActor.get(). - message(_logSubject, - SubscriptionMessages.CREATE(filterLogString, - queue.isDurable() && exclusive, - filterLogString != null)); - } - } - - public String toString() - { - String subscriber = "[channel=" + _channel + - ", consumerTag=" + _consumerTag + - ", session=" + getProtocolSession().getKey() ; - - return subscriber + "]"; - } - - /** - * This method can be called by each of the publisher threads. As a result all changes to the channel object must be - * thread safe. - * - * - * @param entry - * @param batch - * @throws AMQException - */ - abstract public void send(QueueEntry entry, boolean batch) throws AMQException; - - - public boolean isSuspended() - { - return !isActive() || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped(); - } - - /** - * Callback indicating that a queue has been deleted. - * - * @param queue The queue to delete - */ - public void queueDeleted(AMQQueue queue) - { - _deleted.set(true); - } - - public boolean hasInterest(QueueEntry entry) - { - //check that the message hasn't been rejected - if (entry.isRejectedBy(getSubscriptionID())) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Subscription:" + this + " rejected message:" + entry); - } - } - - if(entry.getMessage() instanceof AMQMessage) - { - if (_noLocal) - { - AMQMessage message = (AMQMessage) entry.getMessage(); - - final Object publisherReference = message.getConnectionReference(); - - // We don't want local messages so check to see if message is one we sent - Object localReference = getProtocolSession().getReference(); - - if(publisherReference != null && publisherReference.equals(localReference)) - { - return false; - } - } - } - else - { - // No interest in messages we can't convert to AMQMessage - if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), AMQMessage.class)==null) - { - return false; - } - } - - - if (_logger.isDebugEnabled()) - { - _logger.debug("(" + this + ") checking filters for message (" + entry); - } - return checkFilters(entry); - - } - - private boolean checkFilters(QueueEntry msg) - { - return (_filters == null) || _filters.allAllow(msg.asFilterable()); - } - - public boolean isAutoClose() - { - return _autoClose; - } - - public FlowCreditManager getCreditManager() - { - return _creditManager; - } - - - public void close() - { - boolean closed = false; - State state = getState(); - - _stateChangeLock.lock(); - try - { - while(!closed && state != State.CLOSED) - { - closed = _state.compareAndSet(state, State.CLOSED); - if(!closed) - { - state = getState(); - } - else - { - _stateListener.stateChange(this,state, State.CLOSED); - } - } - _creditManager.removeListener(this); - } - finally - { - _stateChangeLock.unlock(); - } - //Log Subscription closed - CurrentActor.get().message(_logSubject, SubscriptionMessages.CLOSE()); - } - - public boolean isClosed() - { - return getState() == State.CLOSED; - } - - - public boolean wouldSuspend(QueueEntry msg) - { - return !_creditManager.useCreditForMessage(msg.getMessage().getSize()); - } - - public boolean trySendLock() - { - return _stateChangeLock.tryLock(); - } - - public void getSendLock() - { - _stateChangeLock.lock(); - } - - public void releaseSendLock() - { - _stateChangeLock.unlock(); - } - - public AMQChannel getChannel() - { - return _channel; - } - - public AMQShortString getConsumerTag() - { - return _consumerTag; - } - - public String getConsumerName() - { - return _consumerTag == null ? null : _consumerTag.asString(); - } - - public long getSubscriptionID() - { - return _subscriptionID; - } - - public AMQProtocolSession getProtocolSession() - { - return _channel.getProtocolSession(); - } - - public LogActor getLogActor() - { - return _logActor; - } - - public AMQQueue getQueue() - { - return _queue; - } - - public void onDequeue(final QueueEntry queueEntry) - { - restoreCredit(queueEntry); - } - - public void releaseQueueEntry(final QueueEntry queueEntry) - { - restoreCredit(queueEntry); - } - - public void restoreCredit(final QueueEntry queueEntry) - { - _creditManager.restoreCredit(1, queueEntry.getSize()); - } - - public void creditStateChanged(boolean hasCredit) - { - - if(hasCredit) - { - if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) - { - _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); - } - else - { - // this is a hack to get round the issue of increasing bytes credit - _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE); - } - } - else - { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } - } - CurrentActor.get().message(_logSubject,SubscriptionMessages.STATE(_state.get().toString())); - } - - public State getState() - { - return _state.get(); - } - - - public void setStateListener(final StateListener listener) - { - _stateListener = listener; - } - - - public AMQQueue.Context getQueueContext() - { - return _queueContext; - } - - public void setQueueContext(AMQQueue.Context context) - { - _queueContext = context; - } - - - protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag) - throws AMQException - { - _deliveryMethod.deliverToClient(this, message, props, deliveryTag); - _deliveredCount.incrementAndGet(); - _deliveredBytes.addAndGet(message.getSize()); - } - - - protected void recordMessageDelivery(final QueueEntry entry, final long deliveryTag) - { - _recordMethod.recordMessageDelivery(this,entry,deliveryTag); - } - - - public boolean isActive() - { - return getState() == State.ACTIVE; - } - - public QueueEntry.SubscriptionAcquiredState getOwningState() - { - return _owningState; - } - - public void confirmAutoClose() - { - ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter(); - converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag()); - } - - public boolean acquires() - { - return !isBrowser(); - } - - public boolean seesRequeues() - { - return !isBrowser(); - } - - public boolean isTransient() - { - return false; - } - - public void set(String key, Object value) - { - _properties.put(key, value); - } - - public Object get(String key) - { - return _properties.get(key); - } - - - public void setNoLocal(boolean noLocal) - { - _noLocal = noLocal; - } - - abstract boolean isBrowser(); - - public String getCreditMode() - { - return "WINDOW"; - } - - public boolean isBrowsing() - { - return isBrowser(); - } - - public boolean isExplicitAcknowledge() - { - return true; - } - - public boolean isDurable() - { - return false; - } - - public boolean isExclusive() - { - return getQueue().hasExclusiveSubscriber(); - } - - public String getName() - { - return String.valueOf(_consumerTag); - } - - public Map<String, Object> getArguments() - { - return null; - } - - public boolean isSessionTransactional() - { - return _channel.isTransactional(); - } - - public long getCreateTime() - { - return _createTime; - } - - public void queueEmpty() throws AMQException - { - if (isAutoClose()) - { - _queue.unregisterSubscription(this); - - confirmAutoClose(); - } - } - - public void flushBatched() - { - _channel.getProtocolSession().setDeferFlush(false); - - _channel.getProtocolSession().flushBatched(); - } - - public long getBytesOut() - { - return _deliveredBytes.longValue(); - } - - public long getMessagesOut() - { - return _deliveredCount.longValue(); - } - - - protected void addUnacknowledgedMessage(QueueEntry entry) - { - final long size = entry.getSize(); - _unacknowledgedBytes.addAndGet(size); - _unacknowledgedCount.incrementAndGet(); - entry.addStateChangeListener(new QueueEntry.StateChangeListener() - { - public void stateChanged(QueueEntry entry, QueueEntry.State oldState, QueueEntry.State newState) - { - if(oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED)) - { - _unacknowledgedBytes.addAndGet(-size); - _unacknowledgedCount.decrementAndGet(); - entry.removeStateChangeListener(this); - } - } - }); - } - - public long getUnacknowledgedBytes() - { - return _unacknowledgedBytes.longValue(); - } - - public long getUnacknowledgedMessages() - { - return _unacknowledgedCount.longValue(); - } -} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java index 1d41bcdcf4..fcbbadd507 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.QueueEntry; import java.util.Collection; @@ -36,24 +37,24 @@ public interface UnacknowledgedMessageMap *@param message the message being iterated over @return true to stop iteration, false to continue * @throws AMQException */ - boolean callback(final long deliveryTag, QueueEntry message) throws AMQException; + boolean callback(final long deliveryTag, MessageInstance message) throws AMQException; void visitComplete(); } void visit(Visitor visitor) throws AMQException; - void add(long deliveryTag, QueueEntry message); + void add(long deliveryTag, MessageInstance message); - QueueEntry remove(long deliveryTag); + MessageInstance remove(long deliveryTag); - Collection<QueueEntry> cancelAllMessages(); + Collection<MessageInstance> cancelAllMessages(); int size(); void clear(); - QueueEntry get(long deliveryTag); + MessageInstance get(long deliveryTag); /** * Get the set of delivery tags that are outstanding. @@ -62,7 +63,7 @@ public interface UnacknowledgedMessageMap */ Set<Long> getDeliveryTags(); - Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple); + Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java index 17b2c7b985..8d70e769d3 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.QueueEntry; import java.util.Collection; @@ -34,7 +35,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap private long _unackedSize; - private Map<Long, QueueEntry> _map; + private Map<Long, MessageInstance> _map; private long _lastDeliveryTag; @@ -43,10 +44,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap public UnacknowledgedMessageMapImpl(int prefetchLimit) { _prefetchLimit = prefetchLimit; - _map = new LinkedHashMap<Long, QueueEntry>(prefetchLimit); + _map = new LinkedHashMap<Long, MessageInstance>(prefetchLimit); } - public void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs) + public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs) { if (multiple) { @@ -54,7 +55,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } else { - final QueueEntry entry = get(deliveryTag); + final MessageInstance entry = get(deliveryTag); if(entry != null) { msgs.put(deliveryTag, entry); @@ -63,7 +64,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } - public void remove(Map<Long,QueueEntry> msgs) + public void remove(Map<Long,MessageInstance> msgs) { synchronized (_lock) { @@ -74,12 +75,12 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public QueueEntry remove(long deliveryTag) + public MessageInstance remove(long deliveryTag) { synchronized (_lock) { - QueueEntry message = _map.remove(deliveryTag); + MessageInstance message = _map.remove(deliveryTag); if(message != null) { _unackedSize -= message.getMessage().getSize(); @@ -94,8 +95,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { synchronized (_lock) { - Set<Map.Entry<Long, QueueEntry>> currentEntries = _map.entrySet(); - for (Map.Entry<Long, QueueEntry> entry : currentEntries) + Set<Map.Entry<Long, MessageInstance>> currentEntries = _map.entrySet(); + for (Map.Entry<Long, MessageInstance> entry : currentEntries) { visitor.callback(entry.getKey().longValue(), entry.getValue()); } @@ -103,7 +104,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public void add(long deliveryTag, QueueEntry message) + public void add(long deliveryTag, MessageInstance message) { synchronized (_lock) { @@ -113,12 +114,12 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public Collection<QueueEntry> cancelAllMessages() + public Collection<MessageInstance> cancelAllMessages() { synchronized (_lock) { - Collection<QueueEntry> currentEntries = _map.values(); - _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit); + Collection<MessageInstance> currentEntries = _map.values(); + _map = new LinkedHashMap<Long, MessageInstance>(_prefetchLimit); _unackedSize = 0l; return currentEntries; } @@ -141,7 +142,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public QueueEntry get(long key) + public MessageInstance get(long key) { synchronized (_lock) { @@ -157,19 +158,19 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple) + public Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple) { - Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>(); + Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long,MessageInstance>(); collect(deliveryTag, multiple, ackedMessageMap); remove(ackedMessageMap); return ackedMessageMap.values(); } - private void collect(long key, Map<Long, QueueEntry> msgs) + private void collect(long key, Map<Long, MessageInstance> msgs) { synchronized (_lock) { - for (Map.Entry<Long, QueueEntry> entry : _map.entrySet()) + for (Map.Entry<Long, MessageInstance> entry : _map.entrySet()) { msgs.put(entry.getKey(),entry.getValue()); if (entry.getKey() == key) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index 836de44f4e..526bc9b9fe 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -73,7 +74,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic " args:" + body.getArguments()); } - AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString()); + MessageSource queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString()); if (queue == null) { @@ -120,8 +121,11 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic if(consumerTagName == null || channel.getSubscription(consumerTagName) == null) { - AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(), - body.getArguments(), body.getNoLocal(), body.getExclusive()); + AMQShortString consumerTag = channel.consumeFromSource(consumerTagName, + queue, + !body.getNoAck(), + body.getArguments(), + body.getExclusive()); if (!body.getNowait()) { MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); @@ -156,14 +160,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } - catch (AMQQueue.ExistingExclusiveSubscription e) + catch (AMQQueue.ExistingExclusiveConsumer e) { throw body.getChannelException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " + queue.getName() + " as it already has an existing exclusive consumer"); } - catch (AMQQueue.ExistingSubscriptionPreventsExclusive e) + catch (AMQQueue.ExistingConsumerPreventsExclusive e) { throw body.getChannelException(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue " diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index 4b569ccc71..d4bd486a99 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -24,27 +24,31 @@ package org.apache.qpid.server.protocol.v0_8.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicGetBody; import org.apache.qpid.framing.BasicGetEmptyBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.MessageOnlyCreditManager; +import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.RecordDeliveryMethod; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl; +import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; +import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.EnumSet; + public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody> { private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class); @@ -128,7 +132,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB { @Override - public void deliverToClient(final Subscription sub, final ServerMessage message, final + public void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) throws AMQException { @@ -145,25 +149,32 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() { - public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag) + public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag) { channel.addUnacknowledgedMessage(entry, deliveryTag, null); } }; - Subscription sub; + ConsumerTarget_0_8 target; + EnumSet<Consumer.Option> options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES); if(acks) { - sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod); + + target = ConsumerTarget_0_8.createAckTarget(channel, + AMQShortString.EMPTY_STRING, null, + singleMessageCredit, getDeliveryMethod, getRecordMethod); } else { - sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod); + target = ConsumerTarget_0_8.createNoAckTarget(channel, + AMQShortString.EMPTY_STRING, null, + singleMessageCredit, getDeliveryMethod, getRecordMethod); } - queue.registerSubscription(sub,false); - queue.flushSubscription(sub); - queue.unregisterSubscription(sub); + Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options); + sub.flush(); + sub.close(); return(!singleMessageCredit.hasCredit()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java index 497e97db3e..f8a7722447 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; @@ -67,7 +68,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basic } VirtualHost vHost = session.getVirtualHost(); - Exchange exch = vHost.getExchange(exchangeName.toString()); + MessageDestination exch = vHost.getMessageDestination(exchangeName.toString()); // if the exchange does not exist we raise a channel exception if (exch == null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java index 0a79466b35..606bcf1693 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java @@ -56,7 +56,7 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic throw body.getChannelNotFoundException(channelId); } - channel.resend(body.getRequeue()); + channel.resend(); // Qpid 0-8 hacks a synchronous -ok onto recover. // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java index b54e1c7dcf..ef26e60a62 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java @@ -58,7 +58,7 @@ public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<B throw body.getChannelNotFoundException(channelId);
}
channel.sync();
- channel.resend(body.getRequeue());
+ channel.resend();
// Qpid 0-8 hacks a synchronous -ok onto recover.
// In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java index d4d8c9aaef..fdbd44b06d 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicRejectBody; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.queue.QueueEntry; @@ -65,7 +66,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR long deliveryTag = body.getDeliveryTag(); - QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag); + MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag); if (message == null) { @@ -73,16 +74,6 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR } else { - if (message.isQueueDeleted()) - { - _logger.warn("Message's Queue has already been purged, dropping message"); - message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); - if(message != null) - { - message.delete(); - } - return; - } if (message.getMessage() == null) { @@ -98,41 +89,43 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR " on channel:" + channel.debugIdentity()); } - message.reject(); - if (body.getRequeue()) { - channel.requeue(deliveryTag); - //this requeue represents a message rejected from the pre-dispatch queue //therefore we need to amend the delivery counter. message.decrementDeliveryCount(); + + channel.requeue(deliveryTag); } else { - final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag); - _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag); - if (maxDeliveryCountEnabled) - { - final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag); - _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag); - if (deliveredTooManyTimes) - { - channel.deadLetter(body.getDeliveryTag()); - } - else - { - //this requeue represents a message rejected because of a recover/rollback that we - //are not ready to DLQ. We rely on the reject command to resend from the unacked map - //and therefore need to increment the delivery counter so we cancel out the effect - //of the AMQChannel#resend() decrement. - message.incrementDeliveryCount(); - } - } - else - { - channel.requeue(deliveryTag); - } + // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here + // as it would prevent redelivery + // message.reject(); + + final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag); + _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag); + if (maxDeliveryCountEnabled) + { + final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag); + _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag); + if (deliveredTooManyTimes) + { + channel.deadLetter(body.getDeliveryTag()); + } + else + { + //this requeue represents a message rejected because of a recover/rollback that we + //are not ready to DLQ. We rely on the reject command to resend from the unacked map + //and therefore need to increment the delivery counter so we cancel out the effect + //of the AMQChannel#resend() decrement. + message.incrementDeliveryCount(); + } + } + else + { + channel.requeue(deliveryTag); + } } } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java index 3fdce83c2a..263175d590 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java @@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; @@ -134,8 +135,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } }; protocolConnection.addSessionCloseTask(sessionCloseTask); - queue.addQueueDeleteTask(new AMQQueue.Task() { - public void doTask(AMQQueue queue) throws AMQException + queue.addQueueDeleteTask(new Action<AMQQueue>() { + public void performAction(AMQQueue queue) { protocolConnection.removeSessionCloseTask(sessionCloseTask); } @@ -245,9 +246,9 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar session.addSessionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new AMQQueue.Task() + queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void doTask(AMQQueue queue) + public void performAction(AMQQueue queue) { session.removeSessionCloseTask(deleteQueueTask); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java index 19d0da007b..69ad1a0a21 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java @@ -74,7 +74,7 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). // Why, are we not allowed to send messages back to client before the ok method? - channel.resend(false); + channel.resend(); } catch (AMQException e) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index 2243cbff11..c805956b83 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -28,11 +28,11 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.BrokerTestHelper; @@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; import java.util.ArrayList; +import java.util.EnumSet; import java.util.Set; /** @@ -47,7 +48,8 @@ import java.util.Set; */ public class AckTest extends QpidTestCase { - private Subscription _subscription; + private ConsumerTarget_0_8 _subscriptionTarget; + private Consumer _consumer; private AMQProtocolSession _protocolSession; @@ -86,7 +88,6 @@ public class AckTest extends QpidTestCase private void publishMessages(int count, boolean persistent) throws AMQException { - _queue.registerSubscription(_subscription,false); for (int i = 1; i <= count; i++) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) @@ -144,7 +145,7 @@ public class AckTest extends QpidTestCase try { - _queue.enqueue(message); + _queue.enqueue(message,null); } catch (AMQException e) { @@ -178,7 +179,13 @@ public class AckTest extends QpidTestCase */ public void testAckChannelAssociationTest() throws AMQException { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager()); + _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount, true); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); @@ -190,8 +197,8 @@ public class AckTest extends QpidTestCase { assertTrue(deliveryTag == i); i++; - QueueEntry unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.getQueue() == _queue); + MessageInstance unackedMsg = map.get(deliveryTag); + assertTrue(unackedMsg.getOwningResource() == _queue); } } @@ -202,7 +209,16 @@ public class AckTest extends QpidTestCase public void testNoAckMode() throws AMQException { // false arg means no acks expected - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager()); + _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, + null, + AMQMessage.class, + DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); @@ -218,7 +234,13 @@ public class AckTest extends QpidTestCase public void testPersistentNoAckMode() throws AMQException { // false arg means no acks expected - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager()); + + _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, Consumer.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount, true); @@ -235,7 +257,15 @@ public class AckTest extends QpidTestCase */ public void testSingleAckReceivedTest() throws AMQException { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); + + _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); + final int msgCount = 10; publishMessages(msgCount); @@ -248,8 +278,8 @@ public class AckTest extends QpidTestCase for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i); - QueueEntry unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.getQueue() == _queue); + MessageInstance unackedMsg = map.get(deliveryTag); + assertTrue(unackedMsg.getOwningResource() == _queue); // 5 is the delivery tag of the message that *should* be removed if (++i == 5) { @@ -264,7 +294,15 @@ public class AckTest extends QpidTestCase */ public void testMultiAckReceivedTest() throws AMQException { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); + + _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); + final int msgCount = 10; publishMessages(msgCount); @@ -279,8 +317,8 @@ public class AckTest extends QpidTestCase for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i + 5); - QueueEntry unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.getQueue() == _queue); + MessageInstance unackedMsg = map.get(deliveryTag); + assertTrue(unackedMsg.getOwningResource() == _queue); ++i; } } @@ -290,7 +328,15 @@ public class AckTest extends QpidTestCase */ public void testMultiAckAllReceivedTest() throws AMQException { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); + + _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, + DEFAULT_CONSUMER_TAG, + null, + new LimitlessCreditManager()); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); + final int msgCount = 10; publishMessages(msgCount); @@ -303,8 +349,8 @@ public class AckTest extends QpidTestCase for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i + 5); - QueueEntry unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.getQueue() == _queue); + MessageInstance unackedMsg = map.get(deliveryTag); + assertTrue(unackedMsg.getOwningResource() == _queue); ++i; } } @@ -319,12 +365,16 @@ public class AckTest extends QpidTestCase // Send 10 messages Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1); - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, - DEFAULT_CONSUMER_TAG, true, null, false, creditManager); + + _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager); + _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); + final int msgCount = 1; publishMessages(msgCount); - _queue.deliverAsync(_subscription); + _consumer.externalStateChange(); _channel.acknowledgeMessage(1, false); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java index bb5fecdfb4..281f7345ff 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java @@ -140,7 +140,7 @@ public class AcknowledgeTest extends QpidTestCase assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size()); //Subscribe to the queue - AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true); + AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true); getQueue().deliverAsync(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java index 36a57fa05f..aa5a75396a 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java @@ -23,20 +23,22 @@ package org.apache.qpid.server.protocol.v0_8; import junit.framework.TestCase; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.QueueEntryIterator; -import org.apache.qpid.server.queue.SimpleQueueEntryList; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestMemoryMessageStore; -import org.apache.qpid.server.subscription.MockSubscription; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * QPID-1385 : Race condition between added to unacked map and resending due to a rollback. * @@ -59,40 +61,50 @@ public class ExtractResendAndRequeueTest extends TestCase private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap; private static final int INITIAL_MSG_COUNT = 10; - private AMQQueue _queue = new MockAMQQueue(getName()); - private MessageStore _messageStore = new TestMemoryMessageStore(); - private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>(); + private AMQQueue _queue; + private LinkedList<MessageInstance> _referenceList = new LinkedList<MessageInstance>(); + private Consumer _consumer; + private boolean _queueDeleted; @Override public void setUp() throws AMQException { + _queueDeleted = false; _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100); + _queue = mock(AMQQueue.class); + when(_queue.getName()).thenReturn(getName()); + when(_queue.isDeleted()).thenReturn(_queueDeleted); + _consumer = mock(Consumer.class); + when(_consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement()); + long id = 0; - SimpleQueueEntryList list = new SimpleQueueEntryList(_queue); // Add initial messages to QueueEntryList for (int count = 0; count < INITIAL_MSG_COUNT; count++) { - AMQMessage msg = new MockAMQMessage(id); - - list.add(msg); - + ServerMessage msg = mock(ServerMessage.class); + when(msg.getMessageNumber()).thenReturn(id); + final QueueEntry entry = mock(QueueEntry.class); + when(entry.getMessage()).thenReturn(msg); + when(entry.getQueue()).thenReturn(_queue); + when(entry.isQueueDeleted()).thenReturn(_queueDeleted); + doAnswer(new Answer() + { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + when(entry.isDeleted()).thenReturn(true); + return null; + } + }).when(entry).delete(); + + _unacknowledgedMessageMap.add(id, entry); + _referenceList.add(entry); //Increment ID; id++; } - // Iterate through the QueueEntryList and add entries to unacknowledgedMessageMap and referenceList - QueueEntryIterator queueEntries = list.iterator(); - while(queueEntries.advance()) - { - QueueEntry entry = queueEntries.getNode(); - _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry); - - // Store the entry for future inspection - _referenceList.add(entry); - } - assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size()); } @@ -103,17 +115,14 @@ public class ExtractResendAndRequeueTest extends TestCase * * @return Subscription that performed the acquire */ - private Subscription createSubscriptionAndAcquireMessages(LinkedList<QueueEntry> messageList) + private void acquireMessages(LinkedList<MessageInstance> messageList) { - Subscription subscription = new MockSubscription(); - // Aquire messages in subscription - for (QueueEntry entry : messageList) + // Acquire messages in subscription + for(MessageInstance entry : messageList) { - entry.acquire(subscription); + when(entry.getDeliveredConsumer()).thenReturn(_consumer); } - - return subscription; } /** @@ -128,14 +137,14 @@ public class ExtractResendAndRequeueTest extends TestCase public void testResend() throws AMQException { //We don't need the subscription object here. - createSubscriptionAndAcquireMessages(_referenceList); + acquireMessages(_referenceList); - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); + final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>(); + final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>(); // requeueIfUnableToResend doesn't matter here. _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, true, _messageStore)); + msgToResend)); assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size()); assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size()); @@ -154,100 +163,22 @@ public class ExtractResendAndRequeueTest extends TestCase */ public void testRequeueDueToSubscriptionClosure() throws AMQException { - Subscription subscription = createSubscriptionAndAcquireMessages(_referenceList); + acquireMessages(_referenceList); // Close subscription - subscription.close(); + when(_consumer.isClosed()).thenReturn(true); - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); + final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>(); + final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>(); // requeueIfUnableToResend doesn't matter here. _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, true, _messageStore)); + msgToResend)); assertEquals("Message count for resend not correct.", 0, msgToResend.size()); assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size()); assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size()); } - /** - * If the subscription is null, due to message being retrieved via a GET, And we request that messages are requeued - * requeueIfUnableToResend(set to true) then all messages should be sent to the msgToRequeue map. - * - * @throws AMQException the visit interface throws this - */ - - public void testRequeueDueToMessageHavingNoConsumerTag() throws AMQException - { - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); - - // requeueIfUnableToResend = true so all messages should go to msgToRequeue - _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, true, _messageStore)); - - assertEquals("Message count for resend not correct.", 0, msgToResend.size()); - assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size()); - assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size()); - } - - /** - * If the subscription is null, due to message being retrieved via a GET, And we request that we don't - * requeueIfUnableToResend(set to false) then all messages should be dropped as we do not have a dead letter queue. - * - * @throws AMQException the visit interface throws this - */ - - public void testDrop() throws AMQException - { - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); - - // requeueIfUnableToResend = false so all messages should be dropped all maps should be empty - _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, false, _messageStore)); - - assertEquals("Message count for resend not correct.", 0, msgToResend.size()); - assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size()); - assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size()); - - - for (QueueEntry entry : _referenceList) - { - assertTrue("Message was not discarded", entry.isDeleted()); - } - - } - - /** - * If the subscription is null, due to message being retrieved via a GET, AND the queue upon which the message was - * delivered has been deleted then it is not possible to requeue. Currently we simply discard the message but in the - * future we may wish to dead letter the message. - * - * Validate that at the end of the visit all Maps are empty and all messages are marked as deleted - * - * @throws AMQException the visit interface throws this - */ - public void testDiscard() throws AMQException - { - final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>(); - final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>(); - - _queue.delete(); - - // requeueIfUnableToResend : value doesn't matter here as queue has been deleted - _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue, - msgToResend, false, _messageStore)); - - assertEquals("Message count for resend not correct.", 0, msgToResend.size()); - assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size()); - assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size()); - - for (QueueEntry entry : _referenceList) - { - assertTrue("Message was not discarded", entry.isDeleted()); - } - } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index ef0837b3c6..1fad8fb41f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -47,11 +47,9 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.auth.UsernamePrincipal; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; @@ -60,7 +58,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr { private static final Logger _logger = Logger.getLogger(InternalTestProtocolSession.class); // ChannelID(LIST) -> LinkedList<Pair> - private final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers; + private final Map<Integer, Map<String, LinkedList<DeliveryPair>>> _channelDelivers; private AtomicInteger _deliveryCount = new AtomicInteger(0); private static final AtomicLong ID_GENERATOR = new AtomicLong(0); @@ -68,7 +66,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr { super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), null, null); - _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>(); + _channelDelivers = new HashMap<Integer, Map<String, LinkedList<DeliveryPair>>>(); setTestAuthorizedSubject(); setVirtualHost(virtualHost); @@ -117,7 +115,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr { synchronized (_channelDelivers) { - List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag); + List<DeliveryPair> all =_channelDelivers.get(channelId).get(AMQShortString.toString(consumerTag)); if (all == null) { @@ -153,23 +151,23 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr synchronized (_channelDelivers) { - Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId); + Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId); if (consumers == null) { - consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>(); + consumers = new HashMap<String, LinkedList<DeliveryPair>>(); _channelDelivers.put(channelId, consumers); } - LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag); + LinkedList<DeliveryPair> consumerDelivers = consumers.get(AMQShortString.toString(consumerTag)); if (consumerDelivers == null) { consumerDelivers = new LinkedList<DeliveryPair>(); - consumers.put(consumerTag, consumerDelivers); + consumers.put(consumerTag.toString(), consumerDelivers); } - consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)msg)); + consumerDelivers.add(new DeliveryPair(deliveryTag, msg)); } } @@ -247,27 +245,27 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr @Override - public void deliverToClient(Subscription sub, ServerMessage message, + public void deliverToClient(Consumer sub, ServerMessage message, InstanceProperties props, long deliveryTag) throws AMQException { _deliveryCount.incrementAndGet(); synchronized (_channelDelivers) { - Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId); + Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId); if (consumers == null) { - consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>(); + consumers = new HashMap<String, LinkedList<DeliveryPair>>(); _channelDelivers.put(_channelId, consumers); } - LinkedList<DeliveryPair> consumerDelivers = consumers.get(((SubscriptionImpl)sub).getConsumerTag()); + LinkedList<DeliveryPair> consumerDelivers = consumers.get(sub.getName()); if (consumerDelivers == null) { consumerDelivers = new LinkedList<DeliveryPair>(); - consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers); + consumers.put(sub.getName(), consumerDelivers); } consumerDelivers.add(new DeliveryPair(deliveryTag, message)); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java index 8c716a0b56..e895f81c44 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java @@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -130,8 +130,7 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase //Check the process didn't suspend the subscription as this would // indicate we are using the prefetch credit. i.e. using acks not No-Ack assertTrue("The subscription has been suspended", - !getChannel().getSubscription(browser).getState() - .equals(Subscription.State.SUSPENDED)); + !getChannel().getSubscription(browser).isSuspended()); } private void checkStoreContents(int messageCount) @@ -144,6 +143,6 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase FieldTable filters = new FieldTable(); filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); - return channel.subscribeToQueue(null, queue, true, filters, false, true); + return channel.consumeFromSource(null, queue, true, filters, true); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java deleted file mode 100644 index e0d1b28007..0000000000 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.logging.UnitTestMessageLogger; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.test.utils.QpidTestCase; - -public class SubscriptionFactoryImplTest extends QpidTestCase -{ - private AMQChannel _channel; - private AMQProtocolSession _session; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _channel = BrokerTestHelper_0_8.createChannel(); - _session = _channel.getProtocolSession(); - GenericActor.setDefaultMessageLogger(new UnitTestMessageLogger(false)); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_channel != null) - { - _channel.getVirtualHost().close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - /** - * Tests that while creating Subscriptions of various types, the - * ID numbers assigned are allocated from a common sequence - * (in increasing order). - */ - public void testDifferingSubscriptionTypesShareCommonIdNumberingSequence() throws Exception - { - //create a No-Ack subscription, get the first Subscription ID - long previousId = 0; - Subscription noAckSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), false, null, false, _channel.getCreditManager()); - previousId = noAckSub.getSubscriptionID(); - - //create an ack subscription, verify the next Subscription ID is used - Subscription ackSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager()); - assertEquals("Unexpected Subscription ID allocated", previousId + 1, ackSub.getSubscriptionID()); - previousId = ackSub.getSubscriptionID(); - - //create a browser subscription - FieldTable filters = new FieldTable(); - filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); - Subscription browserSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager()); - assertEquals("Unexpected Subscription ID allocated", previousId + 1, browserSub.getSubscriptionID()); - previousId = browserSub.getSubscriptionID(); - - //create an BasicGet NoAck subscription - Subscription getNoAckSub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(_channel, _session, new AMQShortString("1"), null, false, - _channel.getCreditManager(),_channel.getClientDeliveryMethod(), _channel.getRecordDeliveryMethod()); - assertEquals("Unexpected Subscription ID allocated", previousId + 1, getNoAckSub.getSubscriptionID()); - previousId = getNoAckSub.getSubscriptionID(); - - } - -} diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 4082f22e9c..41e2fef03f 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; @@ -53,16 +54,8 @@ public class Connection_1_0 implements ConnectionEventListener private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>()); private final Object _reference = new Object(); - - - public static interface Task - { - public void doTask(Connection_1_0 connection); - } - - - private List<Task> _closeTasks = - Collections.synchronizedList(new ArrayList<Task>()); + private List<Action<Connection_1_0>> _closeTasks = + Collections.synchronizedList(new ArrayList<Action<Connection_1_0>>()); @@ -98,26 +91,26 @@ public class Connection_1_0 implements ConnectionEventListener _sessions.remove(session); } - void removeConnectionCloseTask(final Task task) + void removeConnectionCloseTask(final Action<Connection_1_0> task) { _closeTasks.remove( task ); } - void addConnectionCloseTask(final Task task) + void addConnectionCloseTask(final Action<Connection_1_0> task) { _closeTasks.add( task ); } public void closeReceived() { - List<Task> taskCopy; + List<Action<Connection_1_0>> taskCopy; synchronized (_closeTasks) { - taskCopy = new ArrayList<Task>(_closeTasks); + taskCopy = new ArrayList<Action<Connection_1_0>>(_closeTasks); } - for(Task task : taskCopy) + for(Action<Connection_1_0> task : taskCopy) { - task.doTask(this); + task.performAction(this); } synchronized (_closeTasks) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index 6a3f5b46e1..027c40aabe 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -20,12 +20,6 @@ */ package org.apache.qpid.server.protocol.v1_0; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; import org.apache.qpid.AMQException; import org.apache.qpid.amqp_1_0.codec.ValueHandler; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; @@ -41,153 +35,84 @@ import org.apache.qpid.amqp_1_0.type.messaging.Accepted; import org.apache.qpid.amqp_1_0.type.messaging.Header; import org.apache.qpid.amqp_1_0.type.messaging.Modified; import org.apache.qpid.amqp_1_0.type.messaging.Released; -import org.apache.qpid.amqp_1_0.type.messaging.Source; -import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; -import org.apache.qpid.server.plugin.MessageConverter; -import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.protocol.MessageConverterRegistry; +import org.apache.qpid.server.consumer.AbstractConsumerTarget; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.ServerTransaction; -class - Subscription_1_0 implements Subscription -{ - private SendingLink_1_0 _link; - - private AMQQueue _queue; - - private final AtomicReference<State> _state = new AtomicReference<State>(State.SUSPENDED); +import java.nio.ByteBuffer; +import java.util.List; - private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); - private final long _id; +class ConsumerTarget_1_0 extends AbstractConsumerTarget +{ private final boolean _acquires; - private volatile AMQQueue.Context _queueContext; - private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); - private ReentrantLock _stateChangeLock = new ReentrantLock(); - - private boolean _noLocal; - private FilterManager _filters; + private SendingLink_1_0 _link; private long _deliveryTag = 0L; - private StateListener _stateListener; private Binary _transactionId; - private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance() - .registerTransportLayer() - .registerMessagingLayer() - .registerTransactionLayer() - .registerSecurityLayer(); - private SectionEncoder _sectionEncoder = new SectionEncoderImpl(_typeRegistry); - - public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination) - { - this(link, destination, ((Source)link.getEndpoint().getSource()).getDistributionMode() != StdDistMode.COPY); - } + private final AMQPDescribedTypeRegistry _typeRegistry; + private final SectionEncoder _sectionEncoder; + private Consumer _consumer; - public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination, boolean acquires) + public ConsumerTarget_1_0(final SendingLink_1_0 link, + boolean acquires) { + super(State.SUSPENDED); _link = link; - _queue = destination.getQueue(); - _id = getEndpoint().getLocalHandle().longValue(); + _typeRegistry = link.getEndpoint().getSession().getConnection().getDescribedTypeRegistry(); + _sectionEncoder = new SectionEncoderImpl(_typeRegistry); _acquires = acquires; } - private SendingLinkEndpoint getEndpoint() - { - return _link.getEndpoint(); - } - - public LogActor getLogActor() - { - return null; //TODO - } - - public boolean isTransient() - { - return true; //TODO - } - - public AMQQueue getQueue() + public Consumer getConsumer() { - return _queue; + return _consumer; } - public QueueEntry.SubscriptionAcquiredState getOwningState() - { - return _owningState; - } - - public void setQueue(final AMQQueue queue, final boolean exclusive) - { - //TODO - } - - public void setNoLocal(final boolean noLocal) - { - _noLocal = noLocal; - } - - public long getSubscriptionID() + private SendingLinkEndpoint getEndpoint() { - return _id; + return _link.getEndpoint(); } public boolean isSuspended() { - return _link.getSession().getConnectionModel().isStopped() || !isActive();// || !getEndpoint().hasCreditToSend(); + return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;// || !getEndpoint().hasCreditToSend(); } - public boolean hasInterest(final QueueEntry entry) + public boolean close() { - if(_noLocal && entry.getMessage().getConnectionReference() == getSession().getConnection().getReference()) + boolean closed = false; + State state = getState(); + + getConsumer().getSendLock(); + try { - return false; + while(!closed && state != State.CLOSED) + { + closed = updateState(state, State.CLOSED); + if(!closed) + { + state = getState(); + } + } + return closed; } - else if(!(entry.getMessage() instanceof Message_1_0) - && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null) + finally { - return false; + getConsumer().releaseSendLock(); } - return checkFilters(entry); - - } - - private boolean checkFilters(final QueueEntry entry) - { - return (_filters == null) || _filters.allAllow(entry.asFilterable()); - } - - public boolean isClosed() - { - return !getEndpoint().isAttached(); - } - - public boolean acquires() - { - return _acquires; - } - - public boolean seesRequeues() - { - // TODO - return acquires(); - } - - public void close() - { - getEndpoint().detach(); } - public void send(QueueEntry entry, boolean batch) throws AMQException + public void send(MessageInstance entry, boolean batch) throws AMQException { // TODO send(entry); @@ -198,7 +123,7 @@ class // TODO } - public void send(final QueueEntry queueEntry) throws AMQException + public void send(final MessageInstance queueEntry) throws AMQException { ServerMessage serverMessage = queueEntry.getMessage(); Message_1_0 message; @@ -209,7 +134,7 @@ class else { final MessageConverter converter = MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class); - message = (Message_1_0) converter.convert(serverMessage, queueEntry.getQueue().getVirtualHost()); + message = (Message_1_0) converter.convert(serverMessage, _link.getVirtualHost()); } Transfer transfer = new Transfer(); @@ -329,7 +254,7 @@ class public void onRollback() { - if(queueEntry.isAcquiredBy(Subscription_1_0.this)) + if(queueEntry.isAcquiredBy(getConsumer())) { queueEntry.release(); _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true); @@ -352,14 +277,14 @@ class } - public void queueDeleted(final AMQQueue queue) + public void queueDeleted() { //TODO getEndpoint().setSource(null); getEndpoint().detach(); } - public boolean wouldSuspend(final QueueEntry msg) + public boolean allocateCredit(final ServerMessage msg) { synchronized (_link.getLock()) { @@ -369,103 +294,32 @@ class suspend(); } - return !hasCredit; + return hasCredit; } } - public boolean trySendLock() - { - return _stateChangeLock.tryLock(); - } public void suspend() { synchronized(_link.getLock()) { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } + updateState(State.ACTIVE, State.SUSPENDED); } } - public void getSendLock() - { - _stateChangeLock.lock(); - } - - public void releaseSendLock() - { - _stateChangeLock.unlock(); - } - - public void releaseQueueEntry(QueueEntry queueEntryImpl) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void onDequeue(final QueueEntry queueEntry) + public void restoreCredit(final ServerMessage message) { //TODO } - public void restoreCredit(final QueueEntry queueEntry) - { - //TODO - } - - public void setStateListener(final StateListener listener) - { - _stateListener = listener; - } - - public State getState() - { - return _state.get(); - } - - public AMQQueue.Context getQueueContext() - { - return _queueContext; - } - - public void setQueueContext(AMQQueue.Context queueContext) - { - _queueContext = queueContext; - } - - - public boolean isActive() - { - return getState() == State.ACTIVE; - } - - public void set(String key, Object value) - { - _properties.put(key, value); - } - - public Object get(String key) - { - return _properties.get(key); - } - - public boolean isSessionTransactional() - { - return false; //TODO - } - public void queueEmpty() { synchronized(_link.getLock()) { if(_link.drained()) { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) - { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); - } + updateState(State.ACTIVE, State.SUSPENDED); } } } @@ -476,10 +330,7 @@ class { if(isSuspended() && getEndpoint() != null) { - if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) - { - _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); - } + updateState(State.SUSPENDED, State.ACTIVE); _transactionId = _link.getTransactionId(); } } @@ -493,10 +344,10 @@ class private class DispositionAction implements UnsettledAction { - private final QueueEntry _queueEntry; + private final MessageInstance _queueEntry; private final Binary _deliveryTag; - public DispositionAction(Binary tag, QueueEntry queueEntry) + public DispositionAction(Binary tag, MessageInstance queueEntry) { _deliveryTag = tag; _queueEntry = queueEntry; @@ -527,13 +378,13 @@ class if(outcome instanceof Accepted) { - txn.dequeue(_queueEntry.getQueue(), _queueEntry.getMessage(), + txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(), new ServerTransaction.Action() { public void postCommit() { - if(_queueEntry.isAcquiredBy(Subscription_1_0.this)) + if(_queueEntry.isAcquiredBy(getConsumer())) { _queueEntry.delete(); } @@ -618,7 +469,7 @@ class private class DoNothingAction implements UnsettledAction { public DoNothingAction(final Binary tag, - final QueueEntry queueEntry) + final MessageInstance queueEntry) { } @@ -640,35 +491,22 @@ class } } - public FilterManager getFilters() - { - return _filters; - } - - public void setFilters(final FilterManager filters) - { - _filters = filters; - } - @Override public AMQSessionModel getSessionModel() { - // TODO return getSession(); } @Override - public long getBytesOut() + public void consumerAdded(final Consumer sub) { - // TODO - return 0; + _consumer = sub; } @Override - public long getMessagesOut() + public void consumerRemoved(final Consumer sub) { - // TODO - return 0; + } @Override @@ -685,10 +523,4 @@ class return 0; } - @Override - public String getConsumerName() - { - //TODO - return "TODO"; - } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index 78ca9ff2a6..a96d951de6 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v1_0; import java.io.EOFException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.ListIterator; @@ -286,7 +285,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement Binary dataEncoding = sectionEncoder.getEncoding(); final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength()); - metaData.writeToBuffer(0,allData); + metaData.writeToBuffer(allData); allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength()); return allData; } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java index 5026007360..be9d7a2d60 100755 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java @@ -314,7 +314,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData return buf; } - public int writeToBuffer(int offsetInMetaData, ByteBuffer dest) + public int writeToBuffer(ByteBuffer dest) { ByteBuffer buf = _encoded; @@ -326,7 +326,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData buf = buf.duplicate(); - buf.position(offsetInMetaData); + buf.position(0); if(dest.remaining() < buf.limit()) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java new file mode 100644 index 0000000000..6f37d2d831 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.log4j.Logger; +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.txn.ServerTransaction; + +public class MessageSourceDestination implements SendingDestination +{ + private static final Logger _logger = Logger.getLogger(MessageSourceDestination.class); + private static final Accepted ACCEPTED = new Accepted(); + private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED }; + + + private MessageSource _queue; + + public MessageSourceDestination(MessageSource queue) + { + _queue = queue; + } + + public Outcome[] getOutcomes() + { + return OUTCOMES; + } + + public int getCredit() + { + // TODO - fix + return 100; + } + + public MessageSource getQueue() + { + return _queue; + } + +} diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java new file mode 100644 index 0000000000..70f659b546 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.amqp_1_0.type.messaging.Rejected; +import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; +import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.txn.ServerTransaction; + +public class NodeReceivingDestination implements ReceivingDestination +{ + private static final Accepted ACCEPTED = new Accepted(); + public static final Rejected REJECTED = new Rejected(); + private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED}; + + private MessageDestination _exchange; + private TerminusDurability _durability; + private TerminusExpiryPolicy _expiryPolicy; + + public NodeReceivingDestination(MessageDestination exchange, TerminusDurability durable, TerminusExpiryPolicy expiryPolicy) + { + _exchange = exchange; + _durability = durable; + _expiryPolicy = expiryPolicy; + } + + public Outcome[] getOutcomes() + { + return OUTCOMES; + } + + public Outcome send(final Message_1_0 message, ServerTransaction txn) + { + final InstanceProperties instanceProperties = + new InstanceProperties() + { + + @Override + public Object getProperty(final Property prop) + { + switch(prop) + { + case MANDATORY: + return false; + case REDELIVERED: + return false; + case PERSISTENT: + return message.isPersistent(); + case IMMEDIATE: + return false; + case EXPIRATION: + return message.getExpiration(); + } + return null; + }}; + + int enqueues = _exchange.send(message, instanceProperties, txn, null); + + + return enqueues == 0 ? REJECTED : ACCEPTED; + } + + TerminusDurability getDurability() + { + return _durability; + } + + TerminusExpiryPolicy getExpiryPolicy() + { + return _expiryPolicy; + } + + public int getCredit() + { + // TODO - fix + return 20000; + } + + public MessageDestination getDestination() + { + return _exchange; + } +} diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java index b9c10b925f..3d6bb5e3db 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java @@ -24,22 +24,21 @@ import org.apache.log4j.Logger; import org.apache.qpid.amqp_1_0.type.Outcome; import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.txn.ServerTransaction; -public class QueueDestination implements SendingDestination, ReceivingDestination +public class QueueDestination extends MessageSourceDestination implements SendingDestination, ReceivingDestination { private static final Logger _logger = Logger.getLogger(QueueDestination.class); private static final Accepted ACCEPTED = new Accepted(); private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED }; - private AMQQueue _queue; - public QueueDestination(AMQQueue queue) { - _queue = queue; + super(queue); } public Outcome[] getOutcomes() @@ -52,7 +51,7 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio try { - txn.enqueue(_queue,message, new ServerTransaction.Action() + txn.enqueue(getQueue(),message, new ServerTransaction.Action() { @@ -60,8 +59,7 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio { try { - - _queue.enqueue(message); + getQueue().enqueue(message,null); } catch (Exception e) { @@ -93,7 +91,7 @@ public class QueueDestination implements SendingDestination, ReceivingDestinatio public AMQQueue getQueue() { - return _queue; + return (AMQQueue) super.getQueue(); } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 4abf1bf76b..9e0327fe76 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -64,11 +65,14 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.filter.JMSSelectorFilter; import org.apache.qpid.server.filter.SimpleFilterManager; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler @@ -78,18 +82,22 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS private VirtualHost _vhost; private SendingDestination _destination; - private Subscription_1_0 _subscription; + private Consumer _consumer; + private ConsumerTarget_1_0 _target; + private boolean _draining; - private final Map<Binary, QueueEntry> _unsettledMap = - new HashMap<Binary, QueueEntry>(); + private final Map<Binary, MessageInstance> _unsettledMap = + new HashMap<Binary, MessageInstance>(); private final ConcurrentHashMap<Binary, UnsettledAction> _unsettledActionMap = new ConcurrentHashMap<Binary, UnsettledAction>(); private volatile SendingLinkAttachment _linkAttachment; private TerminusDurability _durability; - private List<QueueEntry> _resumeFullTransfers = new ArrayList<QueueEntry>(); + private List<MessageInstance> _resumeFullTransfers = new ArrayList<MessageInstance>(); private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>(); private Runnable _closeAction; + private final MessageSource _queue; + public SendingLink_1_0(final SendingLinkAttachment linkAttachment, final VirtualHost vhost, @@ -103,24 +111,22 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _durability = source.getDurable(); linkAttachment.setDeliveryStateHandler(this); QueueDestination qd = null; - AMQQueue queue = null; + EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class); boolean noLocal = false; JMSSelectorFilter messageFilter = null; - if(destination instanceof QueueDestination) + if(destination instanceof MessageSourceDestination) { - queue = ((QueueDestination) _destination).getQueue(); + _queue = ((MessageSourceDestination) _destination).getQueue(); - if(queue.getAvailableAttributes().contains("topic")) + if(_queue instanceof AMQQueue && ((AMQQueue)_queue).getAvailableAttributes().contains("topic")) { source.setDistributionMode(StdDistMode.COPY); } - qd = (QueueDestination) destination; - Map<Symbol,Filter> filters = source.getFilter(); Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>(); @@ -167,7 +173,13 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } source.setFilter(actualFilters.isEmpty() ? null : actualFilters); - _subscription = new Subscription_1_0(this, qd, source.getDistributionMode() != StdDistMode.COPY); + _target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY); + if(source.getDistributionMode() != StdDistMode.COPY) + { + options.add(Consumer.Option.ACQUIRES); + options.add(Consumer.Option.SEES_REQUEUES); + } + } else if(destination instanceof ExchangeDestination) { @@ -199,7 +211,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS name = UUID.randomUUID().toString(); } - queue = _vhost.getQueue(name); + AMQQueue queue = _vhost.getQueue(name); Exchange exchange = exchangeDestination.getExchange(); if(queue == null) @@ -299,9 +311,10 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } } } + _queue = queue; source.setFilter(actualFilters.isEmpty() ? null : actualFilters); - exchange.addBinding(binding,queue,null); + exchange.addBinding(binding, queue,null); source.setDistributionMode(StdDistMode.COPY); if(!isDurable) @@ -309,10 +322,10 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS final String queueName = name; final AMQQueue tempQueue = queue; - final Connection_1_0.Task deleteQueueTask = - new Connection_1_0.Task() + final Action<Connection_1_0> deleteQueueTask = + new Action<Connection_1_0>() { - public void doTask(Connection_1_0 session) + public void performAction(Connection_1_0 session) { if (_vhost.getQueue(queueName) == tempQueue) { @@ -331,9 +344,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS getSession().getConnection().addConnectionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new AMQQueue.Task() + queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void doTask(AMQQueue queue) + public void performAction(AMQQueue queue) { getSession().getConnection().removeConnectionCloseTask(deleteQueueTask); } @@ -347,31 +360,46 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS catch (AMQSecurityException e) { _logger.error("Security error", e); + throw new RuntimeException(e); } catch (AMQInternalException e) { _logger.error("Internal error", e); + throw new RuntimeException(e); } catch (AMQException e) { _logger.error("Error", e); + throw new RuntimeException(e); } - _subscription = new Subscription_1_0(this, qd, true); + + _target = new ConsumerTarget_1_0(this, true); + options.add(Consumer.Option.ACQUIRES); + options.add(Consumer.Option.SEES_REQUEUES); + + } + else + { + throw new RuntimeException("Unknown destination type"); } - if(_subscription != null) + if(_target != null) { - _subscription.setNoLocal(noLocal); - if(messageFilter!=null) + if(noLocal) { - _subscription.setFilters(new SimpleFilterManager(messageFilter)); + options.add(Consumer.Option.NO_LOCAL); } + + _consumer.setNoLocal(noLocal); + + try { - - queue.registerSubscription(_subscription, false); + _consumer = _queue.addConsumer(_target, + messageFilter == null ? null : new SimpleFilterManager(messageFilter), + Message_1_0.class, getEndpoint().getName(), options); } catch (AMQException e) { @@ -394,12 +422,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS // if not durable or close if(!TerminusDurability.UNSETTLED_STATE.equals(_durability)) { - AMQQueue queue = _subscription.getQueue(); try { - queue.unregisterSubscription(_subscription); + _consumer.close(); } catch (AMQException e) @@ -426,7 +453,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { try { - queue.getVirtualHost().removeQueue(queue); + _vhost.removeQueue((AMQQueue)_queue); } catch(AMQException e) { @@ -443,7 +470,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS else if(detach == null || detach.getError() != null) { _linkAttachment = null; - _subscription.flowStateChanged(); + _target.flowStateChanged(); } else { @@ -491,7 +518,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } if(_resumeAcceptedTransfers.isEmpty()) { - _subscription.flowStateChanged(); + _target.flowStateChanged(); } } @@ -531,7 +558,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } } - public void addUnsettled(Binary tag, UnsettledAction unsettledAction, QueueEntry queueEntry) + public void addUnsettled(Binary tag, UnsettledAction unsettledAction, MessageInstance queueEntry) { _unsettledActionMap.put(tag,unsettledAction); if(getTransactionId() == null) @@ -593,9 +620,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment) { - if(_subscription.isActive()) + if(_consumer.isActive()) { - _subscription.suspend(); + _target.suspend(); } _linkAttachment = linkAttachment; @@ -603,14 +630,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS SendingLinkEndpoint endpoint = linkAttachment.getEndpoint(); endpoint.setDeliveryStateHandler(this); Map initialUnsettledMap = endpoint.getInitialUnsettledMap(); - Map<Binary, QueueEntry> unsettledCopy = new HashMap<Binary, QueueEntry>(_unsettledMap); + Map<Binary, MessageInstance> unsettledCopy = new HashMap<Binary, MessageInstance>(_unsettledMap); _resumeAcceptedTransfers.clear(); _resumeFullTransfers.clear(); - for(Map.Entry<Binary, QueueEntry> entry : unsettledCopy.entrySet()) + for(Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet()) { Binary deliveryTag = entry.getKey(); - final QueueEntry queueEntry = entry.getValue(); + final MessageInstance queueEntry = entry.getValue(); if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag)) { queueEntry.setRedelivered(); @@ -624,7 +651,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(outcome instanceof Accepted) { AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore()); - if(_subscription.acquires()) + if(_consumer.acquires()) { txn.dequeue(Collections.singleton(queueEntry), new ServerTransaction.Action() @@ -644,7 +671,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS else if(outcome instanceof Released) { AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore()); - if(_subscription.acquires()) + if(_consumer.acquires()) { txn.dequeue(Collections.singleton(queueEntry), new ServerTransaction.Action() @@ -678,9 +705,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS public Map getUnsettledOutcomeMap() { - Map<Binary, QueueEntry> unsettled = new HashMap<Binary, QueueEntry>(_unsettledMap); + Map<Binary, MessageInstance> unsettled = new HashMap<Binary, MessageInstance>(_unsettledMap); - for(Map.Entry<Binary, QueueEntry> entry : unsettled.entrySet()) + for(Map.Entry<Binary, MessageInstance> entry : unsettled.entrySet()) { entry.setValue(null); } @@ -692,4 +719,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { _closeAction = action; } + + public VirtualHost getVirtualHost() + { + return _vhost; + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 823e4cb16d..beed6be84b 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -41,6 +41,8 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -48,6 +50,7 @@ import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.*; @@ -108,11 +111,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu source.setAddress(tempQueue.getName()); } String addr = source.getAddress(); - AMQQueue queue = _vhost.getQueue(addr); + MessageSource queue = _vhost.getMessageSource(addr); if(queue != null) { - destination = new QueueDestination(queue); + destination = new MessageSourceDestination(queue); @@ -249,11 +252,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } String addr = target.getAddress(); - Exchange exchg = _vhost.getExchange(addr); - if(exchg != null) + MessageDestination messageDestination = _vhost.getMessageDestination(addr); + if(messageDestination != null) { - destination = new ExchangeDestination(exchg, target.getDurable(), - target.getExpiryPolicy()); + destination = new NodeReceivingDestination(messageDestination, target.getDurable(), + target.getExpiryPolicy()); } else { @@ -343,10 +346,10 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose) { - final Connection_1_0.Task deleteQueueTask = - new Connection_1_0.Task() + final Action<Connection_1_0> deleteQueueTask = + new Action<Connection_1_0>() { - public void doTask(Connection_1_0 session) + public void performAction(Connection_1_0 session) { if (_vhost.getQueue(queueName) == tempQueue) { @@ -365,9 +368,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu _connection.addConnectionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new AMQQueue.Task() + queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void doTask(AMQQueue queue) + public void performAction(AMQQueue queue) { _connection.removeConnectionCloseTask(deleteQueueTask); } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java index a71d833fc3..9ca23ce1ce 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java @@ -20,7 +20,6 @@ package org.apache.qpid.server.management.plugin.servlet.rest; import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -40,7 +39,7 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.consumer.Consumer; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; @@ -327,8 +326,8 @@ public class MessageServlet extends AbstractServlet : entry.isAcquired() ? "Acquired" : ""); - final Subscription deliveredSubscription = entry.getDeliveredSubscription(); - object.put("deliveredTo", deliveredSubscription == null ? null : deliveredSubscription.getSubscriptionID()); + final Consumer deliveredConsumer = entry.getDeliveredConsumer(); + object.put("deliveredTo", deliveredConsumer == null ? null : deliveredConsumer.getId()); ServerMessage message = entry.getMessage(); if(message != null) |
