diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/subscription')
11 files changed, 80 insertions, 97 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java index f511cc0dc9..6b2dff7165 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java @@ -20,11 +20,12 @@ */ package org.apache.qpid.server.subscription; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; + import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; @@ -47,7 +48,10 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana private static int pow2(final int i) { int val = 1; - while(val < i) val<<=1; + while(val < i) + { + val<<=1; + } return val; } @@ -111,11 +115,15 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana public boolean visit(final QueueEntry entry) { if(!entry.isAvailable()) + { return false; + } Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId); if(groupId == null) + { return false; + } Integer group = groupId.hashCode() & _groupMask; Subscription assignedSub = _groupMap.get(group); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java index fbc8b3af7d..632b59d3fa 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.server.subscription; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.QueueEntry; public interface ClientDeliveryMethod { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java index 689e48b4cf..62e94f6f2e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java @@ -20,12 +20,13 @@ */ package org.apache.qpid.server.subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; @@ -188,7 +189,9 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager public boolean visit(final QueueEntry entry) { if(!entry.isAvailable()) + { return false; + } Object groupId = getKey(entry); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java index 80c5e2866c..cf2754862d 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java @@ -20,10 +20,11 @@ */ package org.apache.qpid.server.subscription; -import org.apache.qpid.server.transport.ServerSession; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.log4j.Logger; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.transport.ServerSession; + class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java index a61b0b4e82..1e37675c98 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java @@ -20,10 +20,11 @@ */ package org.apache.qpid.server.subscription; -import org.apache.qpid.server.transport.ServerSession; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.log4j.Logger; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.transport.ServerSession; + class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener { private static final Logger _logger = Logger.getLogger(ImplicitAcceptDispositionChangeListener.class); @@ -71,8 +72,6 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public boolean acquire() { boolean acquired = _entry.acquire(getSubscription()); - //TODO - why acknowledge here??? seems bizarre... - // getSubscription().getSession().acknowledge(getSubscription(), _entry); return acquired; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index bc1be90531..66825caa24 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.subscription; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java index 0fd7fdffe5..3659243cea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java @@ -20,22 +20,21 @@ */ package org.apache.qpid.server.subscription; -import java.util.Map; - import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.FlowCreditManager_0_10; -import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.transport.ServerSession; -import org.apache.qpid.server.AMQChannel; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageFlowMode; +import java.util.Map; + /** * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory * primarily assists testing although in future more sophisticated subscribers may need a different subscription diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java index 1622d63648..a2e30b6ae7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java @@ -20,9 +20,6 @@ */ package org.apache.qpid.server.subscription; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.qpid.AMQException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; @@ -38,6 +35,9 @@ import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageFlowMode; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + public class SubscriptionFactoryImpl implements SubscriptionFactory { private static final AtomicLong SUB_ID_GENERATOR = new AtomicLong(0); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 23ae14eef1..1f25c215cc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -24,7 +24,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; @@ -320,9 +319,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private final Boolean _autoClose; - - private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); - private AMQQueue _queue; private final AtomicBoolean _deleted = new AtomicBoolean(false); @@ -479,10 +475,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public boolean hasInterest(QueueEntry entry) { - - - - //check that the message hasn't been rejected if (entry.isRejectedBy(getSubscriptionID())) { @@ -490,27 +482,21 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage { _logger.debug("Subscription:" + this + " rejected message:" + entry); } -// return false; } if (_noLocal) { - AMQMessage message = (AMQMessage) entry.getMessage(); - //todo - client id should be recorded so we don't have to handle - // the case where this is null. - final Object publisher = message.getPublisherIdentifier(); + final Object publisherReference = message.getConnectionIdentifier(); // We don't want local messages so check to see if message is one we sent - Object localInstance = getProtocolSession(); + Object localReference = getProtocolSession().getReference(); - if(publisher.equals(localInstance)) + if(publisherReference != null && publisherReference.equals(localReference)) { return false; } - - } @@ -585,7 +571,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public boolean wouldSuspend(QueueEntry msg) { - return !_creditManager.useCreditForMessage(msg.getMessage().getSize());//_channel.wouldSuspend(msg.getMessage()); + return !_creditManager.useCreditForMessage(msg.getMessage().getSize()); } public boolean trySendLock() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java index 3e6299cb8a..bf5ce31bd9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java @@ -20,11 +20,9 @@ */ package org.apache.qpid.server.subscription; -import org.apache.qpid.server.subscription.Subscription; - -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; public class SubscriptionList { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index bde756dd03..76d975a789 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -20,46 +20,58 @@ */ package org.apache.qpid.server.subscription; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT; - -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.InboundMessageAdapter; -import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.configuration.SubscriptionConfig; import org.apache.qpid.server.configuration.SubscriptionConfigType; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.flow.CreditCreditManager; -import org.apache.qpid.server.flow.WindowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.FlowCreditManager_0_10; -import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.flow.WindowCreditManager; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.SubscriptionMessages; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageTransferMessage; -import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.InboundMessageAdapter; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.transport.ServerSession; -import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.transport.*; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.AMQException; -import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageDeliveryPriority; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.ReplyTo; +import org.apache.qpid.transport.Struct; import org.apache.qpid.url.AMQBindingURL; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; + import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.text.MessageFormat; import java.util.Arrays; import java.util.Collections; @@ -67,13 +79,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.ConcurrentHashMap; -import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject { @@ -387,6 +398,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { deliveryProps.setTimestamp(origDeliveryProps.getTimestamp()); } + if(origDeliveryProps.hasTtl()) + { + deliveryProps.setTtl(origDeliveryProps.getTtl()); + } } @@ -537,36 +552,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes()); } - // TODO - ReplyTo - - final Map<String, Object> appHeaders = new HashMap<String, Object>(); - - /*properties.getHeaders().processOverElements( - new FieldTable.FieldTableElementProcessor() - { - - public boolean processElement(String propertyName, AMQTypedValue value) - { - Object val = value.getValue(); - if(val instanceof AMQShortString) - { - val = val.toString(); - } - appHeaders.put(propertyName, val); - return true; - } - - public Object getResult() - { - return appHeaders; - } - }); - - - messageProps.setApplicationHeaders(appHeaders); -*/ Header header = new Header(deliveryProps, messageProps, null); xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED) : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body); @@ -690,7 +677,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { entry.setRedelivered(); entry.routeToAlternate(); - + if(entry.isAcquiredBy(this)) + { + entry.discard(); + } } void release(final QueueEntry entry, final boolean setRedelivered) |