summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java100
1 files changed, 45 insertions, 55 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index bde756dd03..76d975a789 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -20,46 +20,58 @@
*/
package org.apache.qpid.server.subscription;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT;
-
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.InboundMessageAdapter;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.configuration.SubscriptionConfig;
import org.apache.qpid.server.configuration.SubscriptionConfigType;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.CreditCreditManager;
-import org.apache.qpid.server.flow.WindowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.FlowCreditManager_0_10;
-import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.flow.WindowCreditManager;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.InboundMessageAdapter;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.transport.ServerSession;
-import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.transport.*;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.Struct;
import org.apache.qpid.url.AMQBindingURL;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
+
import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Collections;
@@ -67,13 +79,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.ConcurrentHashMap;
-import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
{
@@ -387,6 +398,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
}
+ if(origDeliveryProps.hasTtl())
+ {
+ deliveryProps.setTtl(origDeliveryProps.getTtl());
+ }
}
@@ -537,36 +552,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
}
-
// TODO - ReplyTo
-
- final Map<String, Object> appHeaders = new HashMap<String, Object>();
-
- /*properties.getHeaders().processOverElements(
- new FieldTable.FieldTableElementProcessor()
- {
-
- public boolean processElement(String propertyName, AMQTypedValue value)
- {
- Object val = value.getValue();
- if(val instanceof AMQShortString)
- {
- val = val.toString();
- }
- appHeaders.put(propertyName, val);
- return true;
- }
-
- public Object getResult()
- {
- return appHeaders;
- }
- });
-
-
- messageProps.setApplicationHeaders(appHeaders);
-*/
Header header = new Header(deliveryProps, messageProps, null);
xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED)
: new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
@@ -690,7 +677,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
entry.setRedelivered();
entry.routeToAlternate();
-
+ if(entry.isAcquiredBy(this))
+ {
+ entry.discard();
+ }
}
void release(final QueueEntry entry, final boolean setRedelivered)