summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java')
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java506
1 files changed, 506 insertions, 0 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
new file mode 100644
index 0000000000..6c91e6130e
--- /dev/null
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
@@ -0,0 +1,506 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.SubscriptionMessages;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.RecordDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.StateChangeListener;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
+ * that was given out by the broker and the channel id. <p/>
+ */
+public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget implements FlowCreditManager.FlowCreditManagerListener
+{
+
+ private final StateChangeListener<QueueEntry, QueueEntry.State> _entryReleaseListener =
+ new StateChangeListener<QueueEntry, QueueEntry.State>()
+ {
+ @Override
+ public void stateChanged(final QueueEntry entry,
+ final QueueEntry.State oldSate,
+ final QueueEntry.State newState)
+ {
+ if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED))
+ {
+ restoreCredit(entry);
+ }
+ entry.removeStateChangeListener(this);
+ }
+ };
+
+ private final ClientDeliveryMethod _deliveryMethod;
+ private final RecordDeliveryMethod _recordMethod;
+
+ private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+ private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
+ private Subscription _subscription;
+
+
+ static final class BrowserSubscription extends SubscriptionTarget_0_8
+ {
+ public BrowserSubscription(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, consumerTag,
+ filters, creditManager, deliveryMethod, recordMethod);
+ }
+
+ /**
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
+ *
+ *
+ * @param entry
+ * @param batch
+ * @throws org.apache.qpid.AMQException
+ */
+ @Override
+ public void send(QueueEntry entry, boolean batch) throws AMQException
+ {
+ // We don't decrement the reference here as we don't want to consume the message
+ // but we do want to send it to the client.
+
+ synchronized (getChannel())
+ {
+ long deliveryTag = getChannel().getNextDeliveryTag();
+ sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ }
+
+ }
+
+ @Override
+ public boolean allocateCredit(QueueEntry msg)
+ {
+ return true;
+ }
+
+ }
+
+ public static class NoAckSubscription extends SubscriptionTarget_0_8
+ {
+ private final AutoCommitTransaction _txn;
+
+ public NoAckSubscription(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+
+ _txn = new AutoCommitTransaction(channel.getVirtualHost().getMessageStore());
+ }
+
+ /**
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
+ *
+ *
+ * @param entry The message to send
+ * @param batch
+ * @throws org.apache.qpid.AMQException
+ */
+ @Override
+ public void send(QueueEntry entry, boolean batch) throws AMQException
+ {
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
+
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
+
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ _txn.dequeue(getSubscription().getQueue(), entry.getMessage(), NOOP);
+
+ ServerMessage message = entry.getMessage();
+ MessageReference ref = message.newReference();
+ InstanceProperties props = entry.getInstanceProperties();
+ entry.delete();
+
+ synchronized (getChannel())
+ {
+ getChannel().getProtocolSession().setDeferFlush(batch);
+ long deliveryTag = getChannel().getNextDeliveryTag();
+
+ sendToClient(message, props, deliveryTag);
+
+ }
+ ref.release();
+
+
+ }
+
+ @Override
+ public boolean allocateCredit(QueueEntry msg)
+ {
+ return true;
+ }
+
+ private static final ServerTransaction.Action NOOP =
+ new ServerTransaction.Action()
+ {
+ @Override
+ public void postCommit()
+ {
+ }
+
+ @Override
+ public void onRollback()
+ {
+ }
+ };
+ }
+
+ /**
+ * NoAck Subscription for use with BasicGet method.
+ */
+ public static final class GetNoAckSubscription extends SubscriptionTarget_0_8.NoAckSubscription
+ {
+ public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ }
+
+ public boolean allocateCredit(QueueEntry msg)
+ {
+ return getCreditManager().useCreditForMessage(msg.getMessage().getSize());
+ }
+
+ }
+
+ static final class AckSubscription extends SubscriptionTarget_0_8
+ {
+ public AckSubscription(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ }
+
+ /**
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
+ *
+ *
+ * @param entry The message to send
+ * @param batch
+ * @throws org.apache.qpid.AMQException
+ */
+ @Override
+ public void send(QueueEntry entry, boolean batch) throws AMQException
+ {
+
+
+ synchronized (getChannel())
+ {
+ getChannel().getProtocolSession().setDeferFlush(batch);
+ long deliveryTag = getChannel().getNextDeliveryTag();
+
+ addUnacknowledgedMessage(entry);
+ recordMessageDelivery(entry, deliveryTag);
+ entry.addStateChangeListener(getReleasedStateChangeListener());
+ sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ entry.incrementDeliveryCount();
+
+ }
+ }
+
+
+
+ }
+
+
+ private static final Logger _logger = Logger.getLogger(SubscriptionTarget_0_8.class);
+
+ private final AMQChannel _channel;
+
+ private final AMQShortString _consumerTag;
+
+ private final FlowCreditManager _creditManager;
+
+ private final Boolean _autoClose;
+
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
+
+
+
+ public SubscriptionTarget_0_8(AMQChannel channel,
+ AMQShortString consumerTag,
+ FieldTable arguments,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(State.ACTIVE);
+
+ _channel = channel;
+ _consumerTag = consumerTag;
+
+ _creditManager = creditManager;
+ creditManager.addStateListener(this);
+
+ _deliveryMethod = deliveryMethod;
+ _recordMethod = recordMethod;
+
+ if (arguments != null)
+ {
+ Object autoClose = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
+ if (autoClose != null)
+ {
+ _autoClose = (Boolean) autoClose;
+ }
+ else
+ {
+ _autoClose = false;
+ }
+ }
+ else
+ {
+ _autoClose = false;
+ }
+ }
+
+ public void setSubscription(Subscription subscription)
+ {
+ _subscription = subscription;
+ }
+
+ public Subscription getSubscription()
+ {
+ return _subscription;
+ }
+
+ public AMQSessionModel getSessionModel()
+ {
+ return _channel;
+ }
+
+ public String toString()
+ {
+ String subscriber = "[channel=" + _channel +
+ ", consumerTag=" + _consumerTag +
+ ", session=" + getProtocolSession().getKey() ;
+
+ return subscriber + "]";
+ }
+
+ public boolean isSuspended()
+ {
+ return getState()!=State.ACTIVE || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped();
+ }
+
+ /**
+ * Callback indicating that a queue has been deleted.
+ *
+ */
+ public void queueDeleted()
+ {
+ _deleted.set(true);
+ }
+
+ public boolean isAutoClose()
+ {
+ return _autoClose;
+ }
+
+ public FlowCreditManager getCreditManager()
+ {
+ return _creditManager;
+ }
+
+
+ public boolean close()
+ {
+ boolean closed = false;
+ State state = getState();
+
+ getSubscription().getSendLock();
+ try
+ {
+ while(!closed && state != State.CLOSED)
+ {
+ closed = updateState(state, State.CLOSED);
+ if(!closed)
+ {
+ state = getState();
+ }
+ }
+ _creditManager.removeListener(this);
+ return closed;
+ }
+ finally
+ {
+ getSubscription().releaseSendLock();
+ }
+ }
+
+
+ public boolean allocateCredit(QueueEntry msg)
+ {
+ return _creditManager.useCreditForMessage(msg.getMessage().getSize());
+ }
+
+ public AMQChannel getChannel()
+ {
+ return _channel;
+ }
+
+ public AMQShortString getConsumerTag()
+ {
+ return _consumerTag;
+ }
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _channel.getProtocolSession();
+ }
+
+ public void restoreCredit(final QueueEntry queueEntry)
+ {
+ _creditManager.restoreCredit(1, queueEntry.getSize());
+ }
+
+ protected final StateChangeListener<QueueEntry, QueueEntry.State> getReleasedStateChangeListener()
+ {
+ return _entryReleaseListener;
+ }
+
+ public void creditStateChanged(boolean hasCredit)
+ {
+
+ if(hasCredit)
+ {
+ if(!updateState(State.SUSPENDED, State.ACTIVE))
+ {
+ // this is a hack to get round the issue of increasing bytes credit
+ getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE);
+ }
+ }
+ else
+ {
+ updateState(State.ACTIVE, State.SUSPENDED);
+ }
+ }
+
+ protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
+ throws AMQException
+ {
+ _deliveryMethod.deliverToClient(getSubscription(), message, props, deliveryTag);
+
+ }
+
+
+ protected void recordMessageDelivery(final QueueEntry entry, final long deliveryTag)
+ {
+ _recordMethod.recordMessageDelivery(getSubscription(),entry,deliveryTag);
+ }
+
+
+ public void confirmAutoClose()
+ {
+ ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag());
+ }
+
+ public void queueEmpty() throws AMQException
+ {
+ if (isAutoClose())
+ {
+ close();
+ confirmAutoClose();
+ }
+ }
+
+ public void flushBatched()
+ {
+ _channel.getProtocolSession().setDeferFlush(false);
+
+ _channel.getProtocolSession().flushBatched();
+ }
+
+ protected void addUnacknowledgedMessage(QueueEntry entry)
+ {
+ final long size = entry.getSize();
+ _unacknowledgedBytes.addAndGet(size);
+ _unacknowledgedCount.incrementAndGet();
+ entry.addStateChangeListener(new StateChangeListener<QueueEntry, QueueEntry.State>()
+ {
+ public void stateChanged(QueueEntry entry, QueueEntry.State oldState, QueueEntry.State newState)
+ {
+ if(oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED))
+ {
+ _unacknowledgedBytes.addAndGet(-size);
+ _unacknowledgedCount.decrementAndGet();
+ entry.removeStateChangeListener(this);
+ }
+ }
+ });
+ }
+
+ public long getUnacknowledgedBytes()
+ {
+ return _unacknowledgedBytes.longValue();
+ }
+
+ public long getUnacknowledgedMessages()
+ {
+ return _unacknowledgedCount.longValue();
+ }
+}