diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java | 100 |
1 files changed, 45 insertions, 55 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index bde756dd03..76d975a789 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -20,46 +20,58 @@ */ package org.apache.qpid.server.subscription; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT; - -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.InboundMessageAdapter; -import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.configuration.SubscriptionConfig; import org.apache.qpid.server.configuration.SubscriptionConfigType; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.flow.CreditCreditManager; -import org.apache.qpid.server.flow.WindowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.FlowCreditManager_0_10; -import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.flow.WindowCreditManager; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.SubscriptionMessages; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageTransferMessage; -import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.InboundMessageAdapter; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.transport.ServerSession; -import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.transport.*; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.AMQException; -import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageDeliveryPriority; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.ReplyTo; +import org.apache.qpid.transport.Struct; import org.apache.qpid.url.AMQBindingURL; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT; +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; + import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.text.MessageFormat; import java.util.Arrays; import java.util.Collections; @@ -67,13 +79,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.ConcurrentHashMap; -import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject { @@ -387,6 +398,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { deliveryProps.setTimestamp(origDeliveryProps.getTimestamp()); } + if(origDeliveryProps.hasTtl()) + { + deliveryProps.setTtl(origDeliveryProps.getTtl()); + } } @@ -537,36 +552,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes()); } - // TODO - ReplyTo - - final Map<String, Object> appHeaders = new HashMap<String, Object>(); - - /*properties.getHeaders().processOverElements( - new FieldTable.FieldTableElementProcessor() - { - - public boolean processElement(String propertyName, AMQTypedValue value) - { - Object val = value.getValue(); - if(val instanceof AMQShortString) - { - val = val.toString(); - } - appHeaders.put(propertyName, val); - return true; - } - - public Object getResult() - { - return appHeaders; - } - }); - - - messageProps.setApplicationHeaders(appHeaders); -*/ Header header = new Header(deliveryProps, messageProps, null); xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED) : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body); @@ -690,7 +677,10 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { entry.setRedelivered(); entry.routeToAlternate(); - + if(entry.isAcquiredBy(this)) + { + entry.discard(); + } } void release(final QueueEntry entry, final boolean setRedelivered) |