summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/subscription')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java29
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java93
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java86
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java57
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java28
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java109
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java59
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java95
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java785
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java245
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java957
11 files changed, 2543 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java
new file mode 100644
index 0000000000..fbc8b3af7d
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java
@@ -0,0 +1,29 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.AMQException;
+
+public interface ClientDeliveryMethod
+{
+ void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) throws AMQException;
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
new file mode 100755
index 0000000000..b49b12fb79
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
@@ -0,0 +1,93 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.log4j.Logger;
+
+
+class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
+{
+ private static final Logger _logger = Logger.getLogger(ExplicitAcceptDispositionChangeListener.class);
+
+
+ private final QueueEntry _entry;
+ private final Subscription_0_10 _sub;
+
+ public ExplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+ {
+ _entry = entry;
+ _sub = subscription_0_10;
+ }
+
+ public void onAccept()
+ {
+ final Subscription_0_10 subscription = getSubscription();
+ if(subscription != null && _entry.isAcquiredBy(_sub))
+ {
+ subscription.getSession().acknowledge(subscription, _entry);
+ }
+ else
+ {
+ _logger.warn("MessageAccept received for message which has not been acquired (likely client error)");
+ }
+
+ }
+
+ public void onRelease()
+ {
+ final Subscription_0_10 subscription = getSubscription();
+ if(subscription != null && _entry.isAcquiredBy(_sub))
+ {
+ subscription.release(_entry);
+ }
+ else
+ {
+ _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
+ }
+ }
+
+ public void onReject()
+ {
+ final Subscription_0_10 subscription = getSubscription();
+ if(subscription != null && _entry.isAcquiredBy(_sub))
+ {
+ subscription.reject(_entry);
+ }
+ else
+ {
+ _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
+ }
+
+ }
+
+ public boolean acquire()
+ {
+ return _entry.acquire(getSubscription());
+ }
+
+
+ private Subscription_0_10 getSubscription()
+ {
+ return _sub;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
new file mode 100755
index 0000000000..b5bb2014b5
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.log4j.Logger;
+
+class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
+{
+ private static final Logger _logger = Logger.getLogger(ImplicitAcceptDispositionChangeListener.class);
+
+
+ private final QueueEntry _entry;
+ private Subscription_0_10 _sub;
+
+ public ImplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+ {
+ _entry = entry;
+ _sub = subscription_0_10;
+ }
+
+ public void onAccept()
+ {
+ _logger.warn("MessageAccept received for message which is using NONE as the accept mode (likely client error)");
+ }
+
+ public void onRelease()
+ {
+ if(_entry.isAcquiredBy(_sub))
+ {
+ getSubscription().release(_entry);
+ }
+ else
+ {
+ _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
+ }
+ }
+
+ public void onReject()
+ {
+ if(_entry.isAcquiredBy(_sub))
+ {
+ getSubscription().reject(_entry);
+ }
+ else
+ {
+ _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
+ }
+
+ }
+
+ public boolean acquire()
+ {
+ boolean acquired = _entry.acquire(getSubscription());
+ //TODO - why acknowledge here??? seems bizarre...
+ // getSubscription().getSession().acknowledge(getSubscription(), _entry);
+ return acquired;
+
+ }
+
+ public Subscription_0_10 getSubscription()
+ {
+ return _sub;
+ }
+
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java
new file mode 100755
index 0000000000..8a2a370236
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.transport.Method;
+
+public class MessageAcceptCompletionListener implements Method.CompletionListener
+{
+ private final Subscription_0_10 _sub;
+ private final QueueEntry _entry;
+ private final ServerSession _session;
+ private boolean _restoreCredit;
+
+ public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit)
+ {
+ super();
+ _sub = sub;
+ _entry = entry;
+ _session = session;
+ _restoreCredit = restoreCredit;
+ }
+
+ public void onComplete(Method method)
+ {
+ if(_restoreCredit)
+ {
+ _sub.restoreCredit(_entry);
+ }
+ if(_entry.isAcquiredBy(_sub))
+ {
+ _session.acknowledge(_sub, _entry);
+ }
+
+ _session.removeDispositionListener(method);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java
new file mode 100644
index 0000000000..e2ed4104de
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/RecordDeliveryMethod.java
@@ -0,0 +1,28 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.queue.QueueEntry;
+
+public interface RecordDeliveryMethod
+{
+ void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
new file mode 100644
index 0000000000..0a3576ff42
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+
+public interface Subscription
+{
+ LogActor getLogActor();
+
+ boolean isTransient();
+
+ public static enum State
+ {
+ ACTIVE,
+ SUSPENDED,
+ CLOSED
+ }
+
+ public static interface StateListener
+ {
+ public void stateChange(Subscription sub, State oldState, State newState);
+ }
+
+ AMQQueue getQueue();
+
+ QueueEntry.SubscriptionAcquiredState getOwningState();
+ QueueEntry.SubscriptionAssignedState getAssignedState();
+
+
+ void setQueue(AMQQueue queue, boolean exclusive);
+
+ void setNoLocal(boolean noLocal);
+
+ AMQShortString getConsumerTag();
+
+ long getSubscriptionID();
+
+ boolean isSuspended();
+
+ boolean hasInterest(QueueEntry msg);
+
+ boolean isAutoClose();
+
+ boolean isClosed();
+
+ boolean acquires();
+
+ boolean seesRequeues();
+
+ void close();
+
+ void send(QueueEntry msg) throws AMQException;
+
+ void queueDeleted(AMQQueue queue);
+
+
+ boolean wouldSuspend(QueueEntry msg);
+
+ void getSendLock();
+
+ void releaseSendLock();
+
+ void onDequeue(final QueueEntry queueEntry);
+
+ void restoreCredit(final QueueEntry queueEntry);
+
+ void setStateListener(final StateListener listener);
+
+ public State getState();
+
+ AMQQueue.Context getQueueContext();
+
+ void setQueueContext(AMQQueue.Context queueContext);
+
+
+ boolean isActive();
+
+ void confirmAutoClose();
+
+ public void set(String key, Object value);
+
+ public Object get(String key);
+
+ boolean isSessionTransactional();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java
new file mode 100644
index 0000000000..ce0362d73f
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.AMQChannel;
+
+/**
+ * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory
+ * primarily assists testing although in future more sophisticated subscribers may need a different subscription
+ * implementation.
+ *
+ * @see org.apache.qpid.server.queue.AMQQueue
+ */
+public interface SubscriptionFactory
+{
+ Subscription createSubscription(int channel,
+ AMQProtocolSession protocolSession,
+ AMQShortString consumerTag,
+ boolean acks,
+ FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager) throws AMQException;
+
+
+ Subscription createSubscription(AMQChannel channel,
+ AMQProtocolSession protocolSession,
+ AMQShortString consumerTag,
+ boolean acks,
+ FieldTable filters,
+ boolean noLocal,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod clientMethod,
+ RecordDeliveryMethod recordMethod
+ )
+ throws AMQException;
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java
new file mode 100644
index 0000000000..1bba2529c6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+public class SubscriptionFactoryImpl implements SubscriptionFactory
+{
+ public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, boolean acks, FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager) throws AMQException
+ {
+ AMQChannel channel = protocolSession.getChannel(channelId);
+ if (channel == null)
+ {
+ throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session");
+ }
+ ClientDeliveryMethod clientMethod = channel.getClientDeliveryMethod();
+ RecordDeliveryMethod recordMethod = channel.getRecordDeliveryMethod();
+
+
+ return createSubscription(channel, protocolSession, consumerTag, acks, filters,
+ noLocal,
+ creditManager,
+ clientMethod,
+ recordMethod
+ );
+ }
+
+ public Subscription createSubscription(final AMQChannel channel,
+ final AMQProtocolSession protocolSession,
+ final AMQShortString consumerTag,
+ final boolean acks,
+ final FieldTable filters,
+ final boolean noLocal,
+ final FlowCreditManager creditManager,
+ final ClientDeliveryMethod clientMethod,
+ final RecordDeliveryMethod recordMethod
+ )
+ throws AMQException
+ {
+ boolean isBrowser;
+
+ if (filters != null)
+ {
+ Boolean isBrowserObj = (Boolean) filters.get(AMQPFilterTypes.NO_CONSUME.getValue());
+ isBrowser = (isBrowserObj != null) && isBrowserObj.booleanValue();
+ }
+ else
+ {
+ isBrowser = false;
+ }
+
+ if(isBrowser)
+ {
+ return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
+ }
+ else if(acks)
+ {
+ return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
+ }
+ else
+ {
+ return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
+ }
+ }
+
+
+ public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
new file mode 100644
index 0000000000..d8f44c9f7f
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -0,0 +1,785 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.configuration.SubscriptionConfig;
+import org.apache.qpid.server.configuration.SubscriptionConfigType;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.SubscriptionActor;
+import org.apache.qpid.server.logging.messages.SubscriptionMessages;
+import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
+ * that was given out by the broker and the channel id. <p/>
+ */
+public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener,
+ SubscriptionConfig
+{
+
+ private StateListener _stateListener = new StateListener()
+ {
+
+ public void stateChange(Subscription sub, State oldState, State newState)
+ {
+
+ }
+ };
+
+
+ private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
+ private AMQQueue.Context _queueContext;
+
+ private final ClientDeliveryMethod _deliveryMethod;
+ private final RecordDeliveryMethod _recordMethod;
+
+ private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+ private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
+
+ private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+
+ private final Lock _stateChangeLock;
+
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+ // Create a simple ID that increments for ever new Subscription
+ private final long _subscriptionID = idGenerator.getAndIncrement();
+ private LogSubject _logSubject;
+ private LogActor _logActor;
+ private UUID _id;
+ private final AtomicLong _deliveredCount = new AtomicLong(0);
+ private long _createTime = System.currentTimeMillis();
+
+
+ static final class BrowserSubscription extends SubscriptionImpl
+ {
+ public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+ }
+
+
+ public boolean isBrowser()
+ {
+ return true;
+ }
+
+ /**
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
+ *
+ * @param msg The message to send
+ * @throws AMQException
+ */
+ @Override
+ public void send(QueueEntry msg) throws AMQException
+ {
+ // We don't decrement the reference here as we don't want to consume the message
+ // but we do want to send it to the client.
+
+ synchronized (getChannel())
+ {
+ long deliveryTag = getChannel().getNextDeliveryTag();
+ sendToClient(msg, deliveryTag);
+ }
+
+ }
+
+ @Override
+ public boolean wouldSuspend(QueueEntry msg)
+ {
+ return false;
+ }
+
+ }
+
+ public static class NoAckSubscription extends SubscriptionImpl
+ {
+ public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+ }
+
+
+ public boolean isBrowser()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isExplicitAcknowledge()
+ {
+ return false;
+ }
+
+ /**
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
+ *
+ * @param entry The message to send
+ * @throws AMQException
+ */
+ @Override
+ public void send(QueueEntry entry) throws AMQException
+ {
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
+
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
+
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ entry.dequeue();
+
+
+ synchronized (getChannel())
+ {
+ long deliveryTag = getChannel().getNextDeliveryTag();
+
+ sendToClient(entry, deliveryTag);
+
+ }
+ entry.dispose();
+
+
+ }
+
+ @Override
+ public boolean wouldSuspend(QueueEntry msg)
+ {
+ return false;
+ }
+
+ }
+
+ static final class AckSubscription extends SubscriptionImpl
+ {
+ public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+ }
+
+
+ public boolean isBrowser()
+ {
+ return false;
+ }
+
+
+ /**
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
+ *
+ * @param entry The message to send
+ * @throws AMQException
+ */
+ @Override
+ public void send(QueueEntry entry) throws AMQException
+ {
+
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
+
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
+
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+
+ synchronized (getChannel())
+ {
+ long deliveryTag = getChannel().getNextDeliveryTag();
+
+
+ recordMessageDelivery(entry, deliveryTag);
+ sendToClient(entry, deliveryTag);
+
+
+ }
+ }
+
+
+
+ }
+
+
+ private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class);
+
+ private final AMQChannel _channel;
+
+ private final AMQShortString _consumerTag;
+
+
+ private boolean _noLocal;
+
+ private final FlowCreditManager _creditManager;
+
+ private FilterManager _filters;
+
+ private final Boolean _autoClose;
+
+
+ private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+
+ private AMQQueue _queue;
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
+
+
+
+ public SubscriptionImpl(AMQChannel channel , AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, FieldTable arguments,
+ boolean noLocal, FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+
+ _channel = channel;
+ _consumerTag = consumerTag;
+
+ _creditManager = creditManager;
+ creditManager.addStateListener(this);
+
+ _noLocal = noLocal;
+
+
+ _filters = FilterManagerFactory.createManager(arguments);
+
+ _deliveryMethod = deliveryMethod;
+ _recordMethod = recordMethod;
+
+
+ _stateChangeLock = new ReentrantLock();
+
+
+ if (arguments != null)
+ {
+ Object autoClose = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
+ if (autoClose != null)
+ {
+ _autoClose = (Boolean) autoClose;
+ }
+ else
+ {
+ _autoClose = false;
+ }
+ }
+ else
+ {
+ _autoClose = false;
+ }
+
+ }
+
+ public ConfigStore getConfigStore()
+ {
+ return getQueue().getConfigStore();
+ }
+
+ public Long getDelivered()
+ {
+ return _deliveredCount.get();
+ }
+
+ public synchronized void setQueue(AMQQueue queue, boolean exclusive)
+ {
+ if(getQueue() != null)
+ {
+ throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
+ }
+ _queue = queue;
+
+ _id = getConfigStore().createId();
+ getConfigStore().addConfiguredObject(this);
+
+ _logSubject = new SubscriptionLogSubject(this);
+ _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
+
+ if (CurrentActor.get().getRootMessageLogger().
+ isMessageEnabled(CurrentActor.get(), _logSubject, SubscriptionMessages.CREATE_LOG_HIERARCHY))
+ {
+ // Get the string value of the filters
+ String filterLogString = null;
+ if (_filters != null && _filters.hasFilters())
+ {
+ filterLogString = _filters.toString();
+ }
+
+ if (isAutoClose())
+ {
+ if (filterLogString == null)
+ {
+ filterLogString = "";
+ }
+ else
+ {
+ filterLogString += ",";
+ }
+ filterLogString += "AutoClose";
+ }
+
+ if (isBrowser())
+ {
+ // We do not need to check for null here as all Browsers are AutoClose
+ filterLogString +=",Browser";
+ }
+
+ CurrentActor.get().
+ message(_logSubject,
+ SubscriptionMessages.CREATE(filterLogString,
+ queue.isDurable() && exclusive,
+ filterLogString != null));
+ }
+ }
+
+ public String toString()
+ {
+ String subscriber = "[channel=" + _channel +
+ ", consumerTag=" + _consumerTag +
+ ", session=" + getProtocolSession().getKey() ;
+
+ return subscriber + "]";
+ }
+
+ /**
+ * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
+ * thread safe.
+ *
+ * @param msg The message to send
+ * @throws AMQException
+ */
+ abstract public void send(QueueEntry msg) throws AMQException;
+
+
+ public boolean isSuspended()
+ {
+ return !isActive() || _channel.isSuspended() || _deleted.get();
+ }
+
+ /**
+ * Callback indicating that a queue has been deleted.
+ *
+ * @param queue The queue to delete
+ */
+ public void queueDeleted(AMQQueue queue)
+ {
+ _deleted.set(true);
+// _channel.queueDeleted(queue);
+ }
+
+ public boolean filtersMessages()
+ {
+ return _filters != null || _noLocal;
+ }
+
+ public boolean hasInterest(QueueEntry entry)
+ {
+
+
+
+
+ //check that the message hasn't been rejected
+ if (entry.isRejectedBy(this))
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Subscription:" + this + " rejected message:" + entry);
+ }
+// return false;
+ }
+
+ if (_noLocal)
+ {
+
+ AMQMessage message = (AMQMessage) entry.getMessage();
+
+ //todo - client id should be recorded so we don't have to handle
+ // the case where this is null.
+ final Object publisher = message.getPublisherIdentifier();
+
+ // We don't want local messages so check to see if message is one we sent
+ Object localInstance = getProtocolSession();
+
+ if(publisher.equals(localInstance))
+ {
+ return false;
+ }
+
+
+ }
+
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("(" + this + ") checking filters for message (" + entry);
+ }
+ return checkFilters(entry);
+
+ }
+
+ private String id = String.valueOf(System.identityHashCode(this));
+
+ private String debugIdentity()
+ {
+ return id;
+ }
+
+ private boolean checkFilters(QueueEntry msg)
+ {
+ return (_filters == null) || _filters.allAllow(msg);
+ }
+
+ public boolean isAutoClose()
+ {
+ return _autoClose;
+ }
+
+ public FlowCreditManager getCreditManager()
+ {
+ return _creditManager;
+ }
+
+
+ public void close()
+ {
+ boolean closed = false;
+ State state = getState();
+
+ _stateChangeLock.lock();
+ try
+ {
+ while(!closed && state != State.CLOSED)
+ {
+ closed = _state.compareAndSet(state, State.CLOSED);
+ if(!closed)
+ {
+ state = getState();
+ }
+ else
+ {
+ _stateListener.stateChange(this,state, State.CLOSED);
+ }
+ }
+ _creditManager.removeListener(this);
+ }
+ finally
+ {
+ _stateChangeLock.unlock();
+ }
+ getConfigStore().removeConfiguredObject(this);
+
+ //Log Subscription closed
+ CurrentActor.get().message(_logSubject, SubscriptionMessages.CLOSE());
+ }
+
+ public boolean isClosed()
+ {
+ return getState() == State.CLOSED;
+ }
+
+
+ public boolean wouldSuspend(QueueEntry msg)
+ {
+ return !_creditManager.useCreditForMessage(msg.getMessage());//_channel.wouldSuspend(msg.getMessage());
+ }
+
+ public void getSendLock()
+ {
+ _stateChangeLock.lock();
+ }
+
+ public void releaseSendLock()
+ {
+ _stateChangeLock.unlock();
+ }
+
+ public AMQChannel getChannel()
+ {
+ return _channel;
+ }
+
+ public AMQShortString getConsumerTag()
+ {
+ return _consumerTag;
+ }
+
+ public long getSubscriptionID()
+ {
+ return _subscriptionID;
+ }
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _channel.getProtocolSession();
+ }
+
+ public LogActor getLogActor()
+ {
+ return _logActor;
+ }
+
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ public void onDequeue(final QueueEntry queueEntry)
+ {
+ restoreCredit(queueEntry);
+ }
+
+ public void restoreCredit(final QueueEntry queueEntry)
+ {
+ _creditManager.restoreCredit(1, queueEntry.getSize());
+ }
+
+
+
+ public void creditStateChanged(boolean hasCredit)
+ {
+
+ if(hasCredit)
+ {
+ if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
+ {
+ _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+ }
+ else
+ {
+ // this is a hack to get round the issue of increasing bytes credit
+ _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE);
+ }
+ }
+ else
+ {
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ }
+ }
+ CurrentActor.get().message(_logSubject,SubscriptionMessages.STATE(_state.get().toString()));
+ }
+
+ public State getState()
+ {
+ return _state.get();
+ }
+
+
+ public void setStateListener(final StateListener listener)
+ {
+ _stateListener = listener;
+ }
+
+
+ public AMQQueue.Context getQueueContext()
+ {
+ return _queueContext;
+ }
+
+ public void setQueueContext(AMQQueue.Context context)
+ {
+ _queueContext = context;
+ }
+
+
+ protected void sendToClient(final QueueEntry entry, final long deliveryTag)
+ throws AMQException
+ {
+ _deliveryMethod.deliverToClient(this,entry,deliveryTag);
+ _deliveredCount.incrementAndGet();
+ }
+
+
+ protected void recordMessageDelivery(final QueueEntry entry, final long deliveryTag)
+ {
+ _recordMethod.recordMessageDelivery(this,entry,deliveryTag);
+ }
+
+
+ public boolean isActive()
+ {
+ return getState() == State.ACTIVE;
+ }
+
+ public QueueEntry.SubscriptionAcquiredState getOwningState()
+ {
+ return _owningState;
+ }
+
+ public QueueEntry.SubscriptionAssignedState getAssignedState()
+ {
+ return _assignedState;
+ }
+
+
+ public void confirmAutoClose()
+ {
+ ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag());
+ }
+
+ public boolean acquires()
+ {
+ return !isBrowser();
+ }
+
+ public boolean seesRequeues()
+ {
+ return !isBrowser();
+ }
+
+ public boolean isTransient()
+ {
+ return false;
+ }
+
+ public void set(String key, Object value)
+ {
+ _properties.put(key, value);
+ }
+
+ public Object get(String key)
+ {
+ return _properties.get(key);
+ }
+
+
+ public void setNoLocal(boolean noLocal)
+ {
+ _noLocal = noLocal;
+ }
+
+ abstract boolean isBrowser();
+
+ public String getCreditMode()
+ {
+ return "WINDOW";
+ }
+
+ public SessionConfig getSessionConfig()
+ {
+ return getChannel();
+ }
+
+ public boolean isBrowsing()
+ {
+ return isBrowser();
+ }
+
+ public boolean isExplicitAcknowledge()
+ {
+ return true;
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public SubscriptionConfigType getConfigType()
+ {
+ return SubscriptionConfigType.getInstance();
+ }
+
+ public boolean isExclusive()
+ {
+ return getQueue().hasExclusiveSubscriber();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return getSessionConfig();
+ }
+
+ public String getName()
+ {
+ return String.valueOf(_consumerTag);
+ }
+
+ public Map<String, Object> getArguments()
+ {
+ return null;
+ }
+
+ public boolean isSessionTransactional()
+ {
+ return _channel.isTransactional();
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java
new file mode 100644
index 0000000000..9ea81660c6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java
@@ -0,0 +1,245 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.subscription.Subscription;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.nio.ByteBuffer;
+
+public class SubscriptionList
+{
+
+ private final SubscriptionNode _head = new SubscriptionNode();
+
+ private AtomicReference<SubscriptionNode> _tail = new AtomicReference<SubscriptionNode>(_head);
+ private AtomicInteger _size = new AtomicInteger();
+
+
+ public final class SubscriptionNode
+ {
+ private final AtomicBoolean _deleted = new AtomicBoolean();
+ private final AtomicReference<SubscriptionNode> _next = new AtomicReference<SubscriptionNode>();
+ private final Subscription _sub;
+
+
+ public SubscriptionNode()
+ {
+
+ _sub = null;
+ _deleted.set(true);
+ }
+
+ public SubscriptionNode(final Subscription sub)
+ {
+ _sub = sub;
+ }
+
+
+ public SubscriptionNode getNext()
+ {
+
+ SubscriptionNode next = nextNode();
+ while(next != null && next.isDeleted())
+ {
+
+ final SubscriptionNode newNext = next.nextNode();
+ if(newNext != null)
+ {
+ _next.compareAndSet(next, newNext);
+ next = nextNode();
+ }
+ else
+ {
+ next = null;
+ }
+
+ }
+ return next;
+ }
+
+ private SubscriptionNode nextNode()
+ {
+ return _next.get();
+ }
+
+ public boolean isDeleted()
+ {
+ return _deleted.get();
+ }
+
+
+ public boolean delete()
+ {
+ if(_deleted.compareAndSet(false,true))
+ {
+ _size.decrementAndGet();
+ advanceHead();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+
+ public Subscription getSubscription()
+ {
+ return _sub;
+ }
+ }
+
+
+ public SubscriptionList(AMQQueue queue)
+ {
+ }
+
+ private void advanceHead()
+ {
+ SubscriptionNode head = _head.nextNode();
+ while(head._next.get() != null && head.isDeleted())
+ {
+
+ final SubscriptionNode newhead = head.nextNode();
+ if(newhead != null)
+ {
+ _head._next.compareAndSet(head, newhead);
+ }
+ head = _head.nextNode();
+ }
+ }
+
+
+ public SubscriptionNode add(Subscription sub)
+ {
+ SubscriptionNode node = new SubscriptionNode(sub);
+ for (;;)
+ {
+ SubscriptionNode tail = _tail.get();
+ SubscriptionNode next = tail.nextNode();
+ if (tail == _tail.get())
+ {
+ if (next == null)
+ {
+ if (tail._next.compareAndSet(null, node))
+ {
+ _tail.compareAndSet(tail, node);
+ _size.incrementAndGet();
+ return node;
+ }
+ }
+ else
+ {
+ _tail.compareAndSet(tail, next);
+ }
+ }
+ }
+
+ }
+
+ public boolean remove(Subscription sub)
+ {
+ SubscriptionNode node = _head.getNext();
+ while(node != null)
+ {
+ if(sub.equals(node._sub) && node.delete())
+ {
+ return true;
+ }
+ node = node.getNext();
+ }
+ return false;
+ }
+
+
+ public static class SubscriptionNodeIterator
+ {
+
+ private SubscriptionNode _lastNode;
+
+ SubscriptionNodeIterator(SubscriptionNode startNode)
+ {
+ _lastNode = startNode;
+ }
+
+
+ public boolean atTail()
+ {
+ return _lastNode.nextNode() == null;
+ }
+
+ public SubscriptionNode getNode()
+ {
+
+ return _lastNode;
+
+ }
+
+ public boolean advance()
+ {
+
+ if(!atTail())
+ {
+ SubscriptionNode nextNode = _lastNode.nextNode();
+ while(nextNode.isDeleted() && nextNode.nextNode() != null)
+ {
+ nextNode = nextNode.nextNode();
+ }
+ _lastNode = nextNode;
+ return true;
+
+ }
+ else
+ {
+ return false;
+ }
+
+ }
+
+ }
+
+
+ public SubscriptionNodeIterator iterator()
+ {
+ return new SubscriptionNodeIterator(_head);
+ }
+
+
+ public SubscriptionNode getHead()
+ {
+ return _head;
+ }
+
+ public int size()
+ {
+ return _size.get();
+ }
+
+
+
+}
+
+
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
new file mode 100644
index 0000000000..68e47fd86a
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -0,0 +1,957 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.configuration.SubscriptionConfig;
+import org.apache.qpid.server.configuration.SubscriptionConfigType;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.CreditCreditManager;
+import org.apache.qpid.server.flow.WindowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager_0_10;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.logging.messages.SubscriptionMessages;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.SubscriptionActor;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
+import java.nio.ByteBuffer;
+
+public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
+{
+
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+ // Create a simple ID that increments for ever new Subscription
+ private final long _subscriptionID = idGenerator.getAndIncrement();
+
+ private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+ private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
+
+ private final Lock _stateChangeLock = new ReentrantLock();
+
+ private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
+ private AMQQueue.Context _queueContext;
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
+
+ private FlowCreditManager_0_10 _creditManager;
+
+ private StateListener _stateListener = new StateListener()
+ {
+
+ public void stateChange(Subscription sub, State oldState, State newState)
+ {
+ CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
+ }
+ };
+ private AMQQueue _queue;
+ private final String _destination;
+ private boolean _noLocal;
+ private final FilterManager _filters;
+ private final MessageAcceptMode _acceptMode;
+ private final MessageAcquireMode _acquireMode;
+ private MessageFlowMode _flowMode;
+ private final ServerSession _session;
+ private AtomicBoolean _stopped = new AtomicBoolean(true);
+ private ConcurrentHashMap<Integer, QueueEntry> _sentMap = new ConcurrentHashMap<Integer, QueueEntry>();
+ private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
+
+ private LogActor _logActor;
+ private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+ private UUID _id;
+ private String _traceExclude;
+ private String _trace;
+ private long _createTime = System.currentTimeMillis();
+ private final AtomicLong _deliveredCount = new AtomicLong(0);
+ private final Map<String, Object> _arguments;
+
+
+ public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ MessageFlowMode flowMode,
+ FlowCreditManager_0_10 creditManager,
+ FilterManager filters,Map<String, Object> arguments)
+ {
+ _session = session;
+ _destination = destination;
+ _acceptMode = acceptMode;
+ _acquireMode = acquireMode;
+ _creditManager = creditManager;
+ _flowMode = flowMode;
+ _filters = filters;
+ _creditManager.addStateListener(this);
+ _arguments = arguments == null ? Collections.<String, Object> emptyMap() :
+ Collections.<String, Object> unmodifiableMap(arguments);
+ _state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED);
+
+ }
+
+ public void setNoLocal(boolean noLocal)
+ {
+ _noLocal = noLocal;
+ }
+
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ public QueueEntry.SubscriptionAcquiredState getOwningState()
+ {
+ return _owningState;
+ }
+
+ public QueueEntry.SubscriptionAssignedState getAssignedState()
+ {
+ return _assignedState;
+ }
+
+ public void setQueue(AMQQueue queue, boolean exclusive)
+ {
+ if(getQueue() != null)
+ {
+ throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
+ }
+ _queue = queue;
+ Map<String, Object> arguments = queue.getArguments() == null ? Collections.EMPTY_MAP : queue.getArguments();
+ _traceExclude = (String) arguments.get("qpid.trace.exclude");
+ _trace = (String) arguments.get("qpid.trace.id");
+ _id = getConfigStore().createId();
+ getConfigStore().addConfiguredObject(this);
+ String filterLogString = null;
+
+ _logActor = GenericActor.getInstance(this);
+ if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, this, SubscriptionMessages.CREATE_LOG_HIERARCHY))
+ {
+ filterLogString = getFilterLogString();
+ CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
+ filterLogString.length() > 0));
+ }
+
+ }
+
+ public AMQShortString getConsumerTag()
+ {
+ return new AMQShortString(_destination);
+ }
+
+ public boolean isSuspended()
+ {
+ return !isActive() || _deleted.get(); // TODO check for Session suspension
+ }
+
+ public boolean hasInterest(QueueEntry entry)
+ {
+
+
+
+ //check that the message hasn't been rejected
+ if (entry.isRejectedBy(this))
+ {
+
+ return false;
+ }
+
+
+
+ if (_noLocal
+ && (entry.getMessage() instanceof MessageTransferMessage)
+ && ((MessageTransferMessage)entry.getMessage()).getSession() == _session)
+ {
+ return false;
+ }
+
+
+ return checkFilters(entry);
+
+
+ }
+
+ private boolean checkFilters(QueueEntry entry)
+ {
+ return (_filters == null) || _filters.allAllow(entry);
+ }
+
+ public boolean isAutoClose()
+ {
+ // no such thing in 0-10
+ return false;
+ }
+
+ public boolean isClosed()
+ {
+ return getState() == State.CLOSED;
+ }
+
+ public boolean isBrowser()
+ {
+ return _acquireMode == MessageAcquireMode.NOT_ACQUIRED;
+ }
+
+ public boolean seesRequeues()
+ {
+ return _acquireMode != MessageAcquireMode.NOT_ACQUIRED || _acceptMode == MessageAcceptMode.EXPLICIT;
+ }
+
+ public void close()
+ {
+ boolean closed = false;
+ State state = getState();
+
+ _stateChangeLock.lock();
+ try
+ {
+ while(!closed && state != State.CLOSED)
+ {
+ closed = _state.compareAndSet(state, State.CLOSED);
+ if(!closed)
+ {
+ state = getState();
+ }
+ else
+ {
+ _stateListener.stateChange(this,state, State.CLOSED);
+ }
+ }
+ _creditManager.removeListener(this);
+ getConfigStore().removeConfiguredObject(this);
+ CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE());
+ }
+ finally
+ {
+ _stateChangeLock.unlock();
+ }
+
+
+
+ }
+
+ public ConfigStore getConfigStore()
+ {
+ return getQueue().getConfigStore();
+ }
+
+ public Long getDelivered()
+ {
+ return _deliveredCount.get();
+ }
+
+ public void creditStateChanged(boolean hasCredit)
+ {
+
+ if(hasCredit)
+ {
+ if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
+ {
+ _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+ }
+ else
+ {
+ // this is a hack to get round the issue of increasing bytes credit
+ _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE);
+ }
+ }
+ else
+ {
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ }
+ }
+ }
+
+
+ private class AddMessageDispositionListnerAction implements Runnable
+ {
+ public MessageTransfer _xfr;
+ public ServerSession.MessageDispositionChangeListener _action;
+
+ public void run()
+ {
+ if(_action != null)
+ {
+ _session.onMessageDispositionChange(_xfr, _action);
+ }
+ }
+ }
+
+ private final AddMessageDispositionListnerAction _postIdSettingAction = new AddMessageDispositionListnerAction();
+
+ public void send(final QueueEntry entry) throws AMQException
+ {
+ ServerMessage serverMsg = entry.getMessage();
+
+
+ MessageTransfer xfr;
+
+ DeliveryProperties deliveryProps;
+ MessageProperties messageProps = null;
+
+ if(serverMsg instanceof MessageTransferMessage)
+ {
+
+ MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
+
+
+ Struct[] headers;
+ if(msg.getHeader() == null)
+ {
+ headers = EMPTY_STRUCT_ARRAY;
+ }
+ else
+ {
+ headers = msg.getHeader().getStructs();
+ }
+
+ ArrayList<Struct> newHeaders = new ArrayList<Struct>(headers.length);
+ DeliveryProperties origDeliveryProps = null;
+ for(Struct header : headers)
+ {
+ if(header instanceof DeliveryProperties)
+ {
+ origDeliveryProps = (DeliveryProperties) header;
+ }
+ else
+ {
+ if(header instanceof MessageProperties)
+ {
+ messageProps = (MessageProperties) header;
+ }
+ newHeaders.add(header);
+ }
+ }
+
+ deliveryProps = new DeliveryProperties();
+ if(origDeliveryProps != null)
+ {
+ if(origDeliveryProps.hasDeliveryMode())
+ {
+ deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
+ }
+ if(origDeliveryProps.hasExchange())
+ {
+ deliveryProps.setExchange(origDeliveryProps.getExchange());
+ }
+ if(origDeliveryProps.hasExpiration())
+ {
+ deliveryProps.setExpiration(origDeliveryProps.getExpiration());
+ }
+ if(origDeliveryProps.hasPriority())
+ {
+ deliveryProps.setPriority(origDeliveryProps.getPriority());
+ }
+ if(origDeliveryProps.hasRoutingKey())
+ {
+ deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
+ }
+ if(origDeliveryProps.hasTimestamp())
+ {
+ deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
+ }
+
+
+ }
+
+ deliveryProps.setRedelivered(entry.isRedelivered());
+
+ newHeaders.add(deliveryProps);
+
+ if(_trace != null && messageProps == null)
+ {
+ messageProps = new MessageProperties();
+ newHeaders.add(messageProps);
+ }
+
+ Header header = new Header(newHeaders);
+
+ xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
+ }
+ else if(serverMsg instanceof AMQMessage)
+ {
+ AMQMessage message_0_8 = (AMQMessage) serverMsg;
+ deliveryProps = new DeliveryProperties();
+ messageProps = new MessageProperties();
+
+ int size = (int) message_0_8.getSize();
+ ByteBuffer body = ByteBuffer.allocate(size);
+ message_0_8.getContent(body, 0);
+ body.flip();
+
+ Struct[] headers = new Struct[] { deliveryProps, messageProps };
+
+ BasicContentHeaderProperties properties =
+ (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
+ final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
+ if(exchange != null)
+ {
+ deliveryProps.setExchange(exchange.toString());
+ }
+ deliveryProps.setExpiration(message_0_8.getExpiration());
+ deliveryProps.setImmediate(message_0_8.isImmediate());
+ deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority()));
+ deliveryProps.setRedelivered(entry.isRedelivered());
+ deliveryProps.setRoutingKey(message_0_8.getRoutingKey());
+ deliveryProps.setTimestamp(properties.getTimestamp());
+
+ messageProps.setContentEncoding(properties.getEncodingAsString());
+ messageProps.setContentLength(size);
+ if(properties.getAppId() != null)
+ {
+ messageProps.setAppId(properties.getAppId().getBytes());
+ }
+ messageProps.setContentType(properties.getContentTypeAsString());
+ if(properties.getCorrelationId() != null)
+ {
+ messageProps.setCorrelationId(properties.getCorrelationId().getBytes());
+ }
+
+ // TODO - ReplyTo
+
+ if(properties.getUserId() != null)
+ {
+ messageProps.setUserId(properties.getUserId().getBytes());
+ }
+
+ FieldTable fieldTable = properties.getHeaders();
+
+ final Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable);
+
+
+ messageProps.setApplicationHeaders(appHeaders);
+
+ Header header = new Header(headers);
+ xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
+ }
+ else
+ {
+
+ deliveryProps = new DeliveryProperties();
+ messageProps = new MessageProperties();
+
+ int size = (int) serverMsg.getSize();
+ ByteBuffer body = ByteBuffer.allocate(size);
+ serverMsg.getContent(body, 0);
+ body.flip();
+
+ Struct[] headers = new Struct[] { deliveryProps, messageProps };
+
+
+ deliveryProps.setExpiration(serverMsg.getExpiration());
+ deliveryProps.setImmediate(serverMsg.isImmediate());
+ deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
+ deliveryProps.setRedelivered(entry.isRedelivered());
+ deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
+ deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
+
+ messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
+ messageProps.setContentLength(size);
+ messageProps.setContentType(serverMsg.getMessageHeader().getMimeType());
+ if(serverMsg.getMessageHeader().getCorrelationId() != null)
+ {
+ messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
+ }
+
+
+ // TODO - ReplyTo
+
+
+ final Map<String, Object> appHeaders = new HashMap<String, Object>();
+
+ /*properties.getHeaders().processOverElements(
+ new FieldTable.FieldTableElementProcessor()
+ {
+
+ public boolean processElement(String propertyName, AMQTypedValue value)
+ {
+ Object val = value.getValue();
+ if(val instanceof AMQShortString)
+ {
+ val = val.toString();
+ }
+ appHeaders.put(propertyName, val);
+ return true;
+ }
+
+ public Object getResult()
+ {
+ return appHeaders;
+ }
+ });
+
+
+ messageProps.setApplicationHeaders(appHeaders);
+*/
+ Header header = new Header(headers);
+ xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
+ }
+
+ boolean excludeDueToFederation = false;
+
+ if(_trace != null)
+ {
+ if(!messageProps.hasApplicationHeaders())
+ {
+ messageProps.setApplicationHeaders(new HashMap<String,Object>());
+ }
+ Map<String,Object> appHeaders = messageProps.getApplicationHeaders();
+ String trace = (String) appHeaders.get("x-qpid.trace");
+ if(trace == null)
+ {
+ trace = _trace;
+ }
+ else
+ {
+ if(_traceExclude != null)
+ {
+ excludeDueToFederation = Arrays.asList(trace.split(",")).contains(_traceExclude);
+ }
+ trace+=","+_trace;
+ }
+ appHeaders.put("x-qpid.trace",trace);
+ }
+
+ if(!excludeDueToFederation)
+ {
+ if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
+ {
+ xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
+ }
+ else if(_flowMode == MessageFlowMode.WINDOW)
+ {
+ xfr.setCompletionListener(new Method.CompletionListener()
+ {
+ public void onComplete(Method method)
+ {
+ restoreCredit(entry);
+ }
+ });
+ }
+
+
+ _postIdSettingAction._xfr = xfr;
+ if(_acceptMode == MessageAcceptMode.EXPLICIT)
+ {
+ _postIdSettingAction._action = new ExplicitAcceptDispositionChangeListener(entry, this);
+ }
+ else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED)
+ {
+ _postIdSettingAction._action = new ImplicitAcceptDispositionChangeListener(entry, this);
+ }
+ else
+ {
+ _postIdSettingAction._action = null;
+ }
+
+ _session.sendMessage(xfr, _postIdSettingAction);
+ _deliveredCount.incrementAndGet();
+ if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+ {
+ forceDequeue(entry, false);
+ }
+ }
+ else
+ {
+ forceDequeue(entry, _flowMode == MessageFlowMode.WINDOW);
+
+ }
+ }
+
+ private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
+ {
+ ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+ txn.dequeue(entry.getQueue(),entry.getMessage(),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ if(restoreCredit)
+ {
+ restoreCredit(entry);
+ }
+ entry.discard();
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ }
+
+ void reject(QueueEntry entry)
+ {
+ entry.setRedelivered();
+ entry.routeToAlternate();
+
+ }
+
+ void release(QueueEntry entry)
+ {
+ entry.setRedelivered();
+ entry.release();
+ }
+
+ public void queueDeleted(AMQQueue queue)
+ {
+ _deleted.set(true);
+ }
+
+ public boolean wouldSuspend(QueueEntry msg)
+ {
+ return !_creditManager.useCreditForMessage(msg.getMessage());
+ }
+
+ public void getSendLock()
+ {
+ _stateChangeLock.lock();
+ }
+
+ public void releaseSendLock()
+ {
+ _stateChangeLock.unlock();
+ }
+
+ public void restoreCredit(QueueEntry queueEntry)
+ {
+ _creditManager.restoreCredit(1, queueEntry.getSize());
+ }
+
+ public void onDequeue(QueueEntry queueEntry)
+ {
+
+ }
+
+ public void setStateListener(StateListener listener)
+ {
+ _stateListener = listener;
+ }
+
+ public State getState()
+ {
+ return _state.get();
+ }
+
+ public AMQQueue.Context getQueueContext()
+ {
+ return _queueContext;
+ }
+
+ public void setQueueContext(AMQQueue.Context queueContext)
+ {
+ _queueContext = queueContext;
+ }
+
+ public boolean isActive()
+ {
+ return getState() == State.ACTIVE;
+ }
+
+ public void confirmAutoClose()
+ {
+ //No such thing in 0-10
+ }
+
+ public void set(String key, Object value)
+ {
+ _properties.put(key, value);
+ }
+
+ public Object get(String key)
+ {
+ return _properties.get(key);
+ }
+
+
+ public FlowCreditManager_0_10 getCreditManager()
+ {
+ return _creditManager;
+ }
+
+
+ public void stop()
+ {
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ }
+ _stopped.set(true);
+ FlowCreditManager_0_10 creditManager = getCreditManager();
+ creditManager.clearCredit();
+ }
+
+ public void addCredit(MessageCreditUnit unit, long value)
+ {
+ FlowCreditManager_0_10 creditManager = getCreditManager();
+
+ switch (unit)
+ {
+ case MESSAGE:
+
+ creditManager.addCredit(value, 0L);
+ break;
+ case BYTE:
+ creditManager.addCredit(0l, value);
+ break;
+ }
+
+ _stopped.set(false);
+
+ if(creditManager.hasCredit())
+ {
+ if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
+ {
+ _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+ }
+ }
+
+ }
+
+ public void setFlowMode(MessageFlowMode flowMode)
+ {
+
+
+ _creditManager.removeListener(this);
+
+ switch(flowMode)
+ {
+ case CREDIT:
+ _creditManager = new CreditCreditManager(0l,0l);
+ break;
+ case WINDOW:
+ _creditManager = new WindowCreditManager(0l,0l);
+ break;
+ default:
+ throw new RuntimeException("Unknown message flow mode: " + flowMode);
+ }
+ _flowMode = flowMode;
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ }
+
+ _creditManager.addStateListener(this);
+
+ }
+
+ public boolean isStopped()
+ {
+ return _stopped.get();
+ }
+
+ public boolean acquires()
+ {
+ return _acquireMode == MessageAcquireMode.PRE_ACQUIRED;
+ }
+
+ public void acknowledge(QueueEntry entry)
+ {
+ // TODO Fix Store Context / cleanup
+ if(entry.isAcquiredBy(this))
+ {
+ entry.discard();
+ }
+ }
+
+ public void flush() throws AMQException
+ {
+ _queue.flushSubscription(this);
+ stop();
+ }
+
+ public long getSubscriptionID()
+ {
+ return _subscriptionID;
+ }
+
+ public LogActor getLogActor()
+ {
+ return _logActor;
+ }
+
+ public boolean isTransient()
+ {
+ return false;
+ }
+
+ ServerSession getSession()
+ {
+ return _session;
+ }
+
+
+ public SessionConfig getSessionConfig()
+ {
+ return getSession();
+ }
+
+ public boolean isBrowsing()
+ {
+ return _acquireMode == MessageAcquireMode.NOT_ACQUIRED;
+ }
+
+ public boolean isExclusive()
+ {
+ return getQueue().hasExclusiveSubscriber();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return getSessionConfig();
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public SubscriptionConfigType getConfigType()
+ {
+ return SubscriptionConfigType.getInstance();
+ }
+
+ public boolean isExplicitAcknowledge()
+ {
+ return _acceptMode == MessageAcceptMode.EXPLICIT;
+ }
+
+ public String getCreditMode()
+ {
+ return _flowMode.toString();
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public String getName()
+ {
+ return _destination;
+ }
+
+ public Map<String, Object> getArguments()
+ {
+ return _arguments;
+ }
+
+ public boolean isSessionTransactional()
+ {
+ return _session.isTransactional();
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+ public String toLogString()
+ {
+ String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(),
+ _queue.getNameShortString());
+ String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "("
+ // queueString is "vh(/{0})/qu({1}) " so need to trim
+ + queueInfo.substring(0, queueInfo.length() - 1) + ")" + "] ";
+ return result;
+ }
+
+ private String getFilterLogString()
+ {
+ StringBuilder filterLogString = new StringBuilder();
+ String delimiter = ", ";
+ boolean hasEntries = false;
+ if (_filters != null && _filters.hasFilters())
+ {
+ filterLogString.append(_filters.toString());
+ hasEntries = true;
+ }
+
+ if (isBrowser())
+ {
+ if (hasEntries)
+ {
+ filterLogString.append(delimiter);
+ }
+ filterLogString.append("Browser");
+ hasEntries = true;
+ }
+
+ if (isDurable())
+ {
+ if (hasEntries)
+ {
+ filterLogString.append(delimiter);
+ }
+ filterLogString.append("Durable");
+ hasEntries = true;
+ }
+
+ return filterLogString.toString();
+ }
+
+ public LogSubject getLogSubject()
+ {
+ return (LogSubject) this;
+ }
+
+}