summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/subscription')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java7
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java5
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java100
11 files changed, 80 insertions, 97 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
index f511cc0dc9..6b2dff7165 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
@@ -20,11 +20,12 @@
*/
package org.apache.qpid.server.subscription;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
@@ -47,7 +48,10 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
private static int pow2(final int i)
{
int val = 1;
- while(val < i) val<<=1;
+ while(val < i)
+ {
+ val<<=1;
+ }
return val;
}
@@ -111,11 +115,15 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
public boolean visit(final QueueEntry entry)
{
if(!entry.isAvailable())
+ {
return false;
+ }
Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId);
if(groupId == null)
+ {
return false;
+ }
Integer group = groupId.hashCode() & _groupMask;
Subscription assignedSub = _groupMap.get(group);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java
index fbc8b3af7d..632b59d3fa 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ClientDeliveryMethod.java
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.server.subscription;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.QueueEntry;
public interface ClientDeliveryMethod
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
index 689e48b4cf..62e94f6f2e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
@@ -20,12 +20,13 @@
*/
package org.apache.qpid.server.subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
@@ -188,7 +189,9 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
public boolean visit(final QueueEntry entry)
{
if(!entry.isAvailable())
+ {
return false;
+ }
Object groupId = getKey(entry);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
index 80c5e2866c..cf2754862d 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
@@ -20,10 +20,11 @@
*/
package org.apache.qpid.server.subscription;
-import org.apache.qpid.server.transport.ServerSession;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.transport.ServerSession;
+
class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
index a61b0b4e82..1e37675c98 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
@@ -20,10 +20,11 @@
*/
package org.apache.qpid.server.subscription;
-import org.apache.qpid.server.transport.ServerSession;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.transport.ServerSession;
+
class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
{
private static final Logger _logger = Logger.getLogger(ImplicitAcceptDispositionChangeListener.class);
@@ -71,8 +72,6 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public boolean acquire()
{
boolean acquired = _entry.acquire(getSubscription());
- //TODO - why acknowledge here??? seems bizarre...
- // getSubscription().getSession().acknowledge(getSubscription(), _entry);
return acquired;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index bc1be90531..66825caa24 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.subscription;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java
index 0fd7fdffe5..3659243cea 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactory.java
@@ -20,22 +20,21 @@
*/
package org.apache.qpid.server.subscription;
-import java.util.Map;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.FlowCreditManager_0_10;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.transport.ServerSession;
-import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageFlowMode;
+import java.util.Map;
+
/**
* Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory
* primarily assists testing although in future more sophisticated subscribers may need a different subscription
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java
index 1622d63648..a2e30b6ae7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionFactoryImpl.java
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.server.subscription;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
@@ -38,6 +35,9 @@ import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageFlowMode;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
public class SubscriptionFactoryImpl implements SubscriptionFactory
{
private static final AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 23ae14eef1..1f25c215cc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -24,7 +24,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
@@ -320,9 +319,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private final Boolean _autoClose;
-
- private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
-
private AMQQueue _queue;
private final AtomicBoolean _deleted = new AtomicBoolean(false);
@@ -479,10 +475,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public boolean hasInterest(QueueEntry entry)
{
-
-
-
-
//check that the message hasn't been rejected
if (entry.isRejectedBy(getSubscriptionID()))
{
@@ -490,27 +482,21 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
{
_logger.debug("Subscription:" + this + " rejected message:" + entry);
}
-// return false;
}
if (_noLocal)
{
-
AMQMessage message = (AMQMessage) entry.getMessage();
- //todo - client id should be recorded so we don't have to handle
- // the case where this is null.
- final Object publisher = message.getPublisherIdentifier();
+ final Object publisherReference = message.getConnectionIdentifier();
// We don't want local messages so check to see if message is one we sent
- Object localInstance = getProtocolSession();
+ Object localReference = getProtocolSession().getReference();
- if(publisher.equals(localInstance))
+ if(publisherReference != null && publisherReference.equals(localReference))
{
return false;
}
-
-
}
@@ -585,7 +571,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public boolean wouldSuspend(QueueEntry msg)
{
- return !_creditManager.useCreditForMessage(msg.getMessage().getSize());//_channel.wouldSuspend(msg.getMessage());
+ return !_creditManager.useCreditForMessage(msg.getMessage().getSize());
}
public boolean trySendLock()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java
index 3e6299cb8a..bf5ce31bd9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionList.java
@@ -20,11 +20,9 @@
*/
package org.apache.qpid.server.subscription;
-import org.apache.qpid.server.subscription.Subscription;
-
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
public class SubscriptionList
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index bde756dd03..76d975a789 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -20,46 +20,58 @@
*/
package org.apache.qpid.server.subscription;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT;
-
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.InboundMessageAdapter;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.configuration.SubscriptionConfig;
import org.apache.qpid.server.configuration.SubscriptionConfigType;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.CreditCreditManager;
-import org.apache.qpid.server.flow.WindowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.FlowCreditManager_0_10;
-import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.flow.WindowCreditManager;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.InboundMessageAdapter;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.transport.ServerSession;
-import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.transport.*;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.Struct;
import org.apache.qpid.url.AMQBindingURL;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
+
import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Collections;
@@ -67,13 +79,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.ConcurrentHashMap;
-import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
{
@@ -387,6 +398,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
}
+ if(origDeliveryProps.hasTtl())
+ {
+ deliveryProps.setTtl(origDeliveryProps.getTtl());
+ }
}
@@ -537,36 +552,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
}
-
// TODO - ReplyTo
-
- final Map<String, Object> appHeaders = new HashMap<String, Object>();
-
- /*properties.getHeaders().processOverElements(
- new FieldTable.FieldTableElementProcessor()
- {
-
- public boolean processElement(String propertyName, AMQTypedValue value)
- {
- Object val = value.getValue();
- if(val instanceof AMQShortString)
- {
- val = val.toString();
- }
- appHeaders.put(propertyName, val);
- return true;
- }
-
- public Object getResult()
- {
- return appHeaders;
- }
- });
-
-
- messageProps.setApplicationHeaders(appHeaders);
-*/
Header header = new Header(deliveryProps, messageProps, null);
xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED)
: new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
@@ -690,7 +677,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
entry.setRedelivered();
entry.routeToAlternate();
-
+ if(entry.isAcquiredBy(this))
+ {
+ entry.discard();
+ }
}
void release(final QueueEntry entry, final boolean setRedelivered)