From bede462542128d1bb0060bc5bec8f31e3f2b5aff Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 31 Aug 2009 11:36:26 +0000 Subject: more changes to pass the 0-10 python tests git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@809544 13f79535-47bb-0310-9956-ffa450edef68 --- .../exchanges/diagnostic/DiagnosticExchange.java | 8 + .../qpid/server/exchange/AbstractExchange.java | 8 + .../qpid/server/exchange/DirectExchange.java | 10 + .../org/apache/qpid/server/exchange/Exchange.java | 4 + .../qpid/server/exchange/HeadersBinding.java | 8 +- .../qpid/server/exchange/HeadersExchange.java | 25 +- .../qpid/server/flow/FlowCreditManager_0_10.java | 2 +- .../qpid/server/flow/WindowCreditManager.java | 2 +- .../qpid/server/handler/QueueDeclareHandler.java | 8 +- .../server/message/MessageTransferMessage.java | 21 +- .../apache/qpid/server/queue/AMQPriorityQueue.java | 11 +- .../org/apache/qpid/server/queue/AMQQueue.java | 16 + .../apache/qpid/server/queue/AMQQueueMBean.java | 6 +- .../org/apache/qpid/server/queue/QueueEntry.java | 4 + .../apache/qpid/server/queue/QueueEntryImpl.java | 33 +- .../apache/qpid/server/queue/SimpleAMQQueue.java | 289 ++++++++------ .../qpid/server/queue/SimpleQueueEntryList.java | 1 + .../security/access/PrincipalPermissions.java | 5 +- .../qpid/server/store/DerbyMessageStore.java | 8 +- .../MessageAcceptCompletionListener.java | 8 +- .../qpid/server/subscription/Subscription.java | 7 +- .../qpid/server/subscription/SubscriptionImpl.java | 17 +- .../server/subscription/Subscription_0_10.java | 88 +++-- .../qpid/server/transport/ServerSession.java | 116 +++++- .../server/transport/ServerSessionDelegate.java | 413 +++++++++++++++++---- .../apache/qpid/server/txn/LocalTransaction.java | 5 +- .../org/apache/qpid/server/txn/Transaction.java | 2 + .../exchange/AbstractHeadersExchangeTestBase.java | 10 + .../org/apache/qpid/server/queue/MockAMQQueue.java | 28 ++ .../apache/qpid/server/queue/MockQueueEntry.java | 12 +- .../qpid/server/queue/SimpleAMQQueueTest.java | 10 +- .../qpid/server/subscription/MockSubscription.java | 22 +- .../org/apache/qpid/framing/AMQTypedValue.java | 63 ++++ .../java/org/apache/qpid/framing/FieldTable.java | 21 ++ .../java/org/apache/qpid/transport/Connection.java | 9 +- .../java/org/apache/qpid/transport/Method.java | 5 + .../java/org/apache/qpid/transport/Session.java | 2 +- .../qpid/transport/network/io/IoReceiver.java | 1 + .../qpid/server/queue/SubscriptionTestHelper.java | 17 +- 39 files changed, 1026 insertions(+), 299 deletions(-) diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java index c17d8379f4..615a02a112 100644 --- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java +++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java @@ -21,6 +21,7 @@ package org.apache.qpid.extras.exchanges.diagnostic; import java.util.ArrayList; +import java.util.Map; import javax.management.JMException; import javax.management.openmbean.OpenDataException; @@ -153,6 +154,12 @@ public class DiagnosticExchange extends AbstractExchange // No op } + public void registerQueue(String routingKey, AMQQueue queue, Map args) throws AMQException + { + // No op + } + + /** * Does nothing. * @@ -214,4 +221,5 @@ public class DiagnosticExchange extends AbstractExchange // TODO Auto-generated method stub return false; } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index f1f87eb1d2..04c59fd63f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -43,6 +43,8 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.Map; + public abstract class AbstractExchange implements Exchange, Managable { private AMQShortString _name; @@ -206,6 +208,12 @@ public abstract class AbstractExchange implements Exchange, Managable return getVirtualHost().getQueueRegistry(); } + public boolean isBound(String bindingKey, Map arguments, AMQQueue queue) + { + return isBound(new AMQShortString(bindingKey), queue); + } + + public boolean isBound(String bindingKey, AMQQueue queue) { return isBound(new AMQShortString(bindingKey), queue); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 02e83f3dd3..b40576f258 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -160,7 +160,17 @@ public class DirectExchange extends AbstractExchange return ExchangeDefaults.DIRECT_EXCHANGE_CLASS; } + public void registerQueue(String routingKey, AMQQueue queue, Map args) throws AMQException + { + registerQueue(new AMQShortString(routingKey), queue); + } + public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + registerQueue(routingKey, queue); + } + + private void registerQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException { assert queue != null; assert routingKey != null; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 7ff5e4ef96..40a765f420 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.message.InboundMessage; import java.util.ArrayList; +import java.util.Map; public interface Exchange { @@ -51,6 +52,8 @@ public interface Exchange void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; + + void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; ArrayList route(InboundMessage message) throws AMQException; @@ -96,4 +99,5 @@ public interface Exchange boolean isBound(String bindingKey, AMQQueue queue); boolean isBound(String bindingKey); + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java index fde4cfd6a2..db3b023479 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java @@ -189,14 +189,14 @@ class HeadersBinding { for(Map.Entry entry : matches.entrySet()) { - if(!headers.containsHeader(entry.getKey()) - || !((entry.getValue() == null && headers.getHeader(entry.getKey()) == null) + if(headers.containsHeader(entry.getKey()) + || ((entry.getValue() == null && headers.getHeader(entry.getKey()) == null) || (entry.getValue().equals(headers.getHeader(entry.getKey()))))) { - return false; + return true; } } - return true; + return false; } private boolean passesRequiredOr(AMQMessageHeader headers) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 02e129621c..0a2d678073 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -52,6 +52,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ConcurrentHashMap; /** * An exchange that binds queues based on a set of required headers and header values @@ -114,6 +115,7 @@ public class HeadersExchange extends AbstractExchange private final List _bindings = new CopyOnWriteArrayList(); + private Map _bindingByKey = new ConcurrentHashMap(); /** * HeadersExchangeMBean class implements the management interface for the @@ -211,7 +213,7 @@ public class HeadersExchange extends AbstractExchange bindingMap.setString(keyAndValue[0], keyAndValue[1]); } - _bindings.add(new Registration(new HeadersBinding(bindingMap), queue)); + _bindings.add(new Registration(new HeadersBinding(bindingMap), queue, new AMQShortString(binding))); } } // End of MBean class @@ -224,13 +226,17 @@ public class HeadersExchange extends AbstractExchange public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { _logger.debug("Exchange " + getName() + ": Binding " + queue.getName() + " with " + args); - _bindings.add(new Registration(new HeadersBinding(args), queue)); + + Registration registration = new Registration(new HeadersBinding(args), queue, routingKey); + _bindings.add(registration); + } public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { _logger.debug("Exchange " + getName() + ": Unbinding " + queue.getName()); - if(!_bindings.remove(new Registration(new HeadersBinding(args), queue))) + + if(!_bindings.remove(new Registration(args == null ? null : new HeadersBinding(args), queue, routingKey))) { throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() + " with headers args " + args); @@ -326,21 +332,28 @@ public class HeadersExchange extends AbstractExchange { private final HeadersBinding binding; private final AMQQueue queue; + private final AMQShortString routingKey; - Registration(HeadersBinding binding, AMQQueue queue) + Registration(HeadersBinding binding, AMQQueue queue, AMQShortString routingKey) { this.binding = binding; this.queue = queue; + this.routingKey = routingKey; } public int hashCode() { - return queue.hashCode(); + int queueHash = queue.hashCode(); + int routingHash = routingKey == null ? 0 : routingKey.hashCode(); + return queueHash + routingHash; } public boolean equals(Object o) { - return o instanceof Registration && ((Registration) o).queue.equals(queue); + return o instanceof Registration + && ((Registration) o).queue.equals(queue) + && (routingKey == null ? ((Registration)o).routingKey == null + : routingKey.equals(((Registration)o).routingKey)); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager_0_10.java index 9f6fccc7f8..48c336c0b1 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager_0_10.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.flow; */ public interface FlowCreditManager_0_10 extends FlowCreditManager { - public void addCredit(long bytes, long count); + public void addCredit(long count, long bytes); void clearCredit(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java index 940b89dba9..10f578551a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java @@ -182,7 +182,7 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl } - public synchronized void addCredit(long bytes, long count) + public synchronized void addCredit(long count, long bytes) { if(bytes > 0) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index b705ea7dba..1ab1ea916e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -109,6 +109,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener _sessionRef; - public MessageTransferMessage(MessageTransfer xfr) + public MessageTransferMessage(MessageTransfer xfr, WeakReference sessionRef) { _xfr = xfr; _messageNumber = _numberSource.getAndIncrement(); Header header = _xfr.getHeader(); - _deliveryProps = header.get(DeliveryProperties.class); - _messageProps = header.get(MessageProperties.class); + if(header != null) + { + _deliveryProps = header.get(DeliveryProperties.class); + _messageProps = header.get(MessageProperties.class); + } + else + { + _deliveryProps = null; + _messageProps = null; + } _messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps); _arrivalTime = System.currentTimeMillis(); + _sessionRef = sessionRef; } public String getRoutingKey() @@ -110,5 +121,9 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage return _xfr.getBody(); } + public Session getSession() + { + return _sessionRef == null ? null : _sessionRef.get(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java index 0339245b04..2383d6e0be 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java @@ -61,16 +61,19 @@ public class AMQPriorityQueue extends SimpleAMQQueue while(subIter.advance() && !entry.isAcquired()) { final Subscription subscription = subIter.getNode().getSubscription(); - QueueEntry subnode = subscription.getLastSeenEntry(); - while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired()) + QueueContext context = (QueueContext) subscription.getQueueContext(); + QueueEntry subnode = context._lastSeenEntry; + QueueEntry released = context._releasedEntry; + while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0)) { - if(subscription.setLastSeenEntry(subnode,entry)) + if(_releasedUpdater.compareAndSet(context,released,entry)) { break; } else { - subnode = subscription.getLastSeenEntry(); + subnode = context._lastSeenEntry; + released = context._releasedEntry; } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 8b69aac7c3..ac0363e76a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -28,16 +28,24 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.PrincipalHolder; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import java.util.List; import java.util.Set; +import java.util.Map; public interface AMQQueue extends Managable, Comparable { + + public interface Context + { + QueueEntry getLastSeenEntry(); + } + AMQShortString getName(); boolean isDurable(); @@ -45,6 +53,8 @@ public interface AMQQueue extends Managable, Comparable boolean isAutoDelete(); AMQShortString getOwner(); + PrincipalHolder getPrincipalHolder(); + void setPrincipalHolder(PrincipalHolder principalHolder); VirtualHost getVirtualHost(); @@ -90,6 +100,8 @@ public interface AMQQueue extends Managable, Comparable void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException; + void requeue(QueueEntryImpl storeContext, Subscription subscription); + void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException; @@ -166,6 +178,10 @@ public interface AMQQueue extends Managable, Comparable void stop(); + boolean isExclusive(); + + Map getArguments(); + /** * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription * already exists. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 8bb958ed3f..049b6b7604 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -165,7 +165,11 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que public String getOwner() { - return String.valueOf(_queue.getOwner()); + return String.valueOf(_queue.getPrincipalHolder() == null + ? null + : _queue.getPrincipalHolder().getPrincipal() == null + ? null + : _queue.getPrincipalHolder().getPrincipal().getName()); } public boolean isAutoDelete() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 589f6919d5..7e4871158c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -3,6 +3,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.message.ServerMessage; /* @@ -151,6 +152,7 @@ public interface QueueEntry extends Comparable boolean isDeleted(); boolean acquiredBySubscription(); + boolean isAcquiredBy(Subscription subscription); void setDeliveredToSubscription(); @@ -172,6 +174,8 @@ public interface QueueEntry extends Comparable void requeue(StoreContext storeContext) throws AMQException; + void requeue(Subscription subscription); + void dequeue(final StoreContext storeContext) throws FailedDequeueException; void dispose(final StoreContext storeContext) throws MessageCleanupException; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index d69f4271d9..67bc87145a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -124,7 +124,7 @@ public class QueueEntryImpl implements QueueEntry public long getSize() { - return getMessage().getSize(); + return getMessage() == null ? 0 : getMessage().getSize(); } public boolean getDeliveredToConsumer() @@ -134,14 +134,17 @@ public class QueueEntryImpl implements QueueEntry public boolean expired() throws AMQException { - long expiration = getMessage().getExpiration(); - if (expiration != 0L) + ServerMessage message = getMessage(); + if(message != null) { - long now = System.currentTimeMillis(); + long expiration = message.getExpiration(); + if (expiration != 0L) + { + long now = System.currentTimeMillis(); - return (now > expiration); + return (now > expiration); + } } - return false; } @@ -178,6 +181,13 @@ public class QueueEntryImpl implements QueueEntry return (_state instanceof SubscriptionAcquiredState); } + public boolean isAcquiredBy(Subscription subscription) + { + EntryState state = _state; + return state instanceof SubscriptionAcquiredState + && ((SubscriptionAcquiredState)state).getSubscription() == subscription; + } + public void setDeliveredToSubscription() { _deliveredToConsumer = true; @@ -263,6 +273,15 @@ public class QueueEntryImpl implements QueueEntry } } + public void requeue(Subscription subscription) + { + getQueue().requeue(this, subscription); + if(_stateChangeListeners != null) + { + notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); + } + } + public void dequeue(final StoreContext storeContext) throws FailedDequeueException { EntryState state = _state; @@ -272,7 +291,7 @@ public class QueueEntryImpl implements QueueEntry if (state instanceof SubscriptionAcquiredState) { Subscription s = ((SubscriptionAcquiredState) state).getSubscription(); - s.restoreCredit(this); + s.onDequeue(this); } getQueue().dequeue(storeContext, this); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 86d1411450..9f8a956448 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1,15 +1,9 @@ package org.apache.qpid.server.queue; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; import javax.management.JMException; @@ -22,15 +16,14 @@ import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.PrincipalHolder; /* * @@ -56,6 +49,38 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); + + private PrincipalHolder _prinicpalHolder; + + + static final class QueueContext implements Context + { + volatile QueueEntry _lastSeenEntry; + volatile QueueEntry _releasedEntry; + + public QueueContext(QueueEntry head) + { + _lastSeenEntry = head; + } + + public QueueEntry getLastSeenEntry() + { + return _lastSeenEntry; + } + } + + + static final AtomicReferenceFieldUpdater + _lastSeenUpdater = + AtomicReferenceFieldUpdater.newUpdater + (QueueContext.class, QueueEntry.class, "_lastSeenEntry"); + + static final AtomicReferenceFieldUpdater + _releasedUpdater = + AtomicReferenceFieldUpdater.newUpdater + (QueueContext.class, QueueEntry.class, "_releasedEntry"); + + private final AMQShortString _name; /** null means shared */ @@ -167,7 +192,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { - this(new AMQShortString(queueName), durable, new AMQShortString(owner),autoDelete,virtualHost); + this(new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner),autoDelete,virtualHost); } public void resetNotifications() @@ -191,6 +216,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return _durable; } + public boolean isExclusive() + { + return _owner != null; + } + + public Map getArguments() + { + return null; + } + public boolean isAutoDelete() { return _autoDelete; @@ -201,6 +236,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return _owner; } + public PrincipalHolder getPrincipalHolder() + { + return _prinicpalHolder; + } + + public void setPrincipalHolder(PrincipalHolder prinicpalHolder) + { + _prinicpalHolder = prinicpalHolder; + } + + public VirtualHost getVirtualHost() { return _virtualHost; @@ -208,6 +254,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // ------ bind and unbind + public void bind(Exchange exchange, String bindingKey, Map arguments) throws AMQException + { + + FieldTable fieldTable = FieldTable.convertToFieldTable(arguments); + AMQShortString routingKey = new AMQShortString(bindingKey); + + exchange.registerQueue(routingKey, this, fieldTable); + + if (isDurable() && exchange.isDurable()) + { + + _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, fieldTable); + } + + _bindings.addBinding(routingKey, fieldTable, exchange); + } + + public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException { exchange.registerQueue(routingKey, this, arguments); @@ -264,7 +328,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _activeSubscriberCount.incrementAndGet(); subscription.setStateListener(this); - subscription.setLastSeenEntry(null, _entries.getHead()); + subscription.setQueueContext(new QueueContext(_entries.getHead())); if (!isDeleted()) { @@ -298,17 +362,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener subscription.close(); // No longer can the queue have an exclusive consumer setExclusiveSubscriber(null); - - QueueEntry lastSeen; - - while ((lastSeen = subscription.getLastSeenEntry()) != null) - { - subscription.setLastSeenEntry(lastSeen, null); - } + subscription.setQueueContext(null); // auto-delete queues must be deleted if there are no remaining subscribers - if (_autoDelete && getConsumerCount() == 0) + if (_autoDelete && getConsumerCount() == 0 && !isExclusive()) { if (_logger.isInfoEnabled()) { @@ -449,13 +507,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { // restore credit here that would have been taken away by wouldSuspend since we didn't manage // to acquire the entry for this subscription - sub.restoreCredit(entry); + sub.onDequeue(entry); } else { - deliverMessage(sub, entry); - } } } @@ -487,69 +543,43 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { _deliveredMessages.incrementAndGet(); sub.send(entry); - + setLastSeenEntry(sub,entry); } - private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) + private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException { + return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry); + } - // We need to move this subscription on, past entries which are already acquired, or deleted or ones it has no - // interest in. - QueueEntry node = sub.getLastSeenEntry(); - while (node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node))) - { - QueueEntry newNode = _entries.next(node); - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - node = sub.getLastSeenEntry(); - } - else - { - node = null; - break; - } - - } + private void setLastSeenEntry(final Subscription sub, final QueueEntry entry) + { + QueueContext subContext = (QueueContext) sub.getQueueContext(); + QueueEntry releasedEntry = subContext._releasedEntry; - if (node == entry) + _lastSeenUpdater.set(subContext, entry); + if(releasedEntry == entry) { - // If the first entry that subscription can process is the one we are trying to deliver to it, then we are - // good - return true; + _releasedUpdater.compareAndSet(subContext, releasedEntry, null); } - else - { - // Otherwise we should try to update the subscription's last seen entry to the entry we got to, providing - // no-one else has updated it to something furhter on in the list - //TODO - check - //updateLastSeenEntry(sub, entry); - return false; - } - } - private void updateLastSeenEntry(final Subscription sub, final QueueEntry entry) + private void updateSubRequeueEntry(final Subscription sub, final QueueEntry entry) { - QueueEntry node = sub.getLastSeenEntry(); - if (node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry)) + QueueContext subContext = (QueueContext) sub.getQueueContext(); + if(subContext != null) { - do + QueueEntry oldEntry; + + while((oldEntry = subContext._releasedEntry) == null || oldEntry.compareTo(entry) > 0) { - if (sub.setLastSeenEntry(node, entry)) - { - return; - } - else + if(_releasedUpdater.compareAndSet(subContext, oldEntry, entry)) { - node = sub.getLastSeenEntry(); + break; } } - while (node != null && entry.compareTo(node) < 0); } - } public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException @@ -564,7 +594,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // we don't make browsers send the same stuff twice if (sub.seesRequeues()) { - updateLastSeenEntry(sub, entry); + updateSubRequeueEntry(sub, entry); } } @@ -572,6 +602,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + public void requeue(QueueEntryImpl entry, Subscription subscription) + { + SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards + while (subscriberIter.advance()) + { + Subscription sub = subscriberIter.getNode().getSubscription(); + + // we don't make browsers send the same stuff twice + if (sub.seesRequeues() && (!sub.acquires() && sub == subscription)) + { + updateSubRequeueEntry(sub, entry); + } + } + + deliverAsync(); + } + public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException { decrementQueueCount(); @@ -1248,12 +1296,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private boolean attemptDelivery(Subscription sub) throws AMQException { boolean atTail = false; - boolean advanced = false; + boolean subActive = sub.isActive(); if (subActive) { - QueueEntry node = moveSubscriptionToNextNode(sub); - if (!(node.isAcquired() || node.isDeleted())) + + + QueueEntry node = getNextAvailableEntry(sub); + + if (node != null && !(node.isAcquired() || node.isDeleted())) { if (!sub.isSuspended()) { @@ -1263,23 +1314,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (sub.acquires() && !node.acquire(sub)) { - sub.restoreCredit(node); + sub.onDequeue(node); } else { deliverMessage(sub, node); - - if (!sub.acquires()) - { - QueueEntry newNode = _entries.next(node); - - if (newNode != null) - { - advanced = true; - sub.setLastSeenEntry(node, newNode); - node = sub.getLastSeenEntry(); - } - } } } @@ -1291,19 +1330,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener node.addStateChangeListener(new QueueEntryListener(sub, node)); } } - else - { - // this subscription is not interested in this node so we can skip over it - QueueEntry newNode = _entries.next(node); - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - } - } + } } - atTail = (_entries.next(node) == null) && !advanced; + atTail = (node == null) || (_entries.next(node) == null); } return atTail || !subActive; } @@ -1315,40 +1346,59 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode(); Subscription sub = subNode.getSubscription(); - moveSubscriptionToNextNode(sub); + if(sub.acquires()) + { + getNextAvailableEntry(sub); + } + else + { + // TODO + } } } - private QueueEntry moveSubscriptionToNextNode(final Subscription sub) + private QueueEntry getNextAvailableEntry(final Subscription sub) throws AMQException { - QueueEntry node = sub.getLastSeenEntry(); - - while (node != null && (node.isAcquired() || node.isDeleted() || node.expired())) + QueueContext context = (QueueContext) sub.getQueueContext(); + if(context != null) { - if (!node.isAcquired() && !node.isDeleted() && node.expired()) + QueueEntry lastSeen = context._lastSeenEntry; + QueueEntry releasedNode = context._releasedEntry; + + QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); + + boolean expired = false; + while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()))) { - if (node.acquire()) + if (expired) { - final StoreContext reapingStoreContext = new StoreContext(); - node.discard(reapingStoreContext); + expired = false; + if (node.acquire()) + { + final StoreContext reapingStoreContext = new StoreContext(); + node.discard(reapingStoreContext); + } } - } - QueueEntry newNode = _entries.next(node); - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - node = sub.getLastSeenEntry(); - } - else - { - break; - } + if(_lastSeenUpdater.compareAndSet(context, lastSeen, node)) + { + _releasedUpdater.compareAndSet(context, releasedNode, null); + } + + lastSeen = context._lastSeenEntry; + releasedNode = context._releasedEntry; + node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : _entries.next(lastSeen); + } + return node; + } + else + { + return null; } - return node; } + private void processQueue(Runnable runner) throws AMQException { long stateChangeCount; @@ -1386,12 +1436,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (sub != null) { - - QueueEntry node = moveSubscriptionToNextNode(sub); - if (node != null) - { - done = attemptDelivery(sub); - } + done = attemptDelivery(sub); } if (done) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index c5a2972720..7bdf516e44 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; */ public class SimpleQueueEntryList implements QueueEntryList { + private final QueueEntryImpl _head; private volatile QueueEntryImpl _tail; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java index f852514444..33213055ca 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java @@ -507,8 +507,9 @@ public class PrincipalPermissions // This will allow consumption from any temporary queue including ones not owned by this user. // Of course the exclusivity will not be broken. { + // if not limited to ownQueuesOnly then ok else check queue Owner. - return (!ownQueuesOnly || queue.getOwner().equals(_user)) ? AuthzResult.ALLOWED : AuthzResult.DENIED; + return (!ownQueuesOnly || new AMQShortString(queue.getPrincipalHolder().getPrincipal().getName()).equals(_user)) ? AuthzResult.ALLOWED : AuthzResult.DENIED; } else { @@ -522,7 +523,7 @@ public class PrincipalPermissions // if no queues are listed then ALL are ok othereise it must be specified. if (ownQueuesOnly) { - if (queue.getOwner().equals(_user)) + if ( new AMQShortString(queue.getPrincipalHolder().getPrincipal().getName()).equals(_user)) { return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 34e35171e5..a8ba372f46 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -782,8 +782,14 @@ public class DerbyMessageStore implements MessageStore { stmt = conn.prepareStatement(INSERT_INTO_QUEUE); + String owner = queue.getPrincipalHolder() == null + ? null + : queue.getPrincipalHolder().getPrincipal() == null + ? null + : queue.getPrincipalHolder().getPrincipal().getName(); + stmt.setString(1, queue.getName().toString()); - stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString()); + stmt.setString(2, owner); stmt.execute(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java index 76a6580408..a980347633 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java @@ -30,18 +30,24 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene private final Subscription_0_10 _sub; private final QueueEntry _entry; private final ServerSession _session; + private boolean _restoreCredit; - public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry) + public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit) { super(); _sub = sub; _entry = entry; _session = session; + _restoreCredit = restoreCredit; } public void onComplete(Method method) { _session.acknowledge(_sub, _entry); + if(_restoreCredit) + { + _sub.restoreCredit(_entry); + } _session.removeDispositionListener(method); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index a99ca3b118..87f4fd5c7c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -30,7 +30,6 @@ public interface Subscription { - public static enum State { ACTIVE, @@ -76,15 +75,17 @@ public interface Subscription void releaseSendLock(); + void onDequeue(final QueueEntry queueEntry); + void restoreCredit(final QueueEntry queueEntry); void setStateListener(final StateListener listener); public State getState(); - QueueEntry getLastSeenEntry(); + AMQQueue.Context getQueueContext(); - boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue); + void setQueueContext(AMQQueue.Context queueContext); boolean isActive(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 382cda08da..68581acc14 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -61,7 +61,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private final AtomicReference _state = new AtomicReference(State.ACTIVE); - private final AtomicReference _queueContext = new AtomicReference(null); + private AMQQueue.Context _queueContext; + private final ClientDeliveryMethod _deliveryMethod; private final RecordDeliveryMethod _recordMethod; @@ -544,12 +545,18 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return _queue; } + public void onDequeue(final QueueEntry queueEntry) + { + restoreCredit(queueEntry); + } + public void restoreCredit(final QueueEntry queueEntry) { _creditManager.restoreCredit(1, queueEntry.getSize()); } + public void creditStateChanged(boolean hasCredit) { @@ -586,14 +593,14 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } - public QueueEntry getLastSeenEntry() + public AMQQueue.Context getQueueContext() { - return _queueContext.get(); + return _queueContext; } - public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue) + public void setQueueContext(AMQQueue.Context context) { - return _queueContext.compareAndSet(expected,newvalue); + _queueContext = context; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 999d268181..9046175c84 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -43,7 +43,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.ConcurrentHashMap; import java.util.ArrayList; -import java.nio.ByteBuffer; public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener { @@ -51,7 +50,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private final Lock _stateChangeLock = new ReentrantLock(); private final AtomicReference _state = new AtomicReference(State.ACTIVE); - private final AtomicReference _queueContext = new AtomicReference(null); + private AMQQueue.Context _queueContext; private final AtomicBoolean _deleted = new AtomicBoolean(false); @@ -76,16 +75,21 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private final ServerSession _session; private AtomicBoolean _stopped = new AtomicBoolean(true); private ConcurrentHashMap _sentMap = new ConcurrentHashMap(); + private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0]; public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode, - MessageAcquireMode acquireMode, FlowCreditManager_0_10 creditManager, FilterManager filters) + MessageAcquireMode acquireMode, + MessageFlowMode flowMode, + FlowCreditManager_0_10 creditManager, + FilterManager filters) { _session = session; _destination = destination; _acceptMode = acceptMode; _acquireMode = acquireMode; _creditManager = creditManager; + _flowMode = flowMode; _filters = filters; _creditManager.addStateListener(this); _state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED); @@ -139,10 +143,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr - if (_noLocal) + if (_noLocal + && (entry.getMessage() instanceof MessageTransferMessage) + && ((MessageTransferMessage)entry.getMessage()).getSession() == _session) { - - + return false; } @@ -241,18 +246,15 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr MessageTransferMessage msg = (MessageTransferMessage) serverMsg; - - MessageTransfer xfr = new MessageTransfer(); - xfr.setDestination(_destination); - if(msg.getBody() != null) + Struct[] headers; + if(msg.getHeader() == null) { - xfr.setBody(msg.getBody()); + headers = EMPTY_STRUCT_ARRAY; + } + else + { + headers = msg.getHeader().getStructs(); } - - xfr.setAcceptMode(_acceptMode); - xfr.setAcquireMode(_acquireMode); - - Struct[] headers = msg.getHeader().getStructs(); ArrayList newHeaders = new ArrayList(headers.length); DeliveryProperties origDeliveryProps = null; @@ -297,11 +299,23 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr deliveryProps.setRedelivered(entry.isRedelivered()); newHeaders.add(deliveryProps); - xfr.setHeader(new Header(newHeaders)); + Header header = new Header(newHeaders); + + MessageTransfer xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody()); if(_acceptMode == MessageAcceptMode.NONE) { - xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry)); + xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW)); + } + else if(_flowMode == MessageFlowMode.WINDOW) + { + xfr.setCompletionListener(new Method.CompletionListener() + { + public void onComplete(Method method) + { + restoreCredit(entry); + } + }); } @@ -329,6 +343,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { reject(entry); } + + public boolean acquire() + { + return entry.acquire(Subscription_0_10.this); + } }); } else @@ -350,6 +369,14 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr reject(entry); } + public boolean acquire() + { + boolean acquired = entry.acquire(Subscription_0_10.this); + _session.acknowledge(Subscription_0_10.this,entry); + return acquired; + + } + }); } @@ -367,6 +394,14 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr { entry.setRedelivered(true); entry.release(); + try + { + entry.requeue(new StoreContext()); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } } public void queueDeleted(AMQQueue queue) @@ -394,6 +429,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr _creditManager.restoreCredit(1, queueEntry.getSize()); } + public void onDequeue(QueueEntry queueEntry) + { + + } + public void setStateListener(StateListener listener) { _stateListener = listener; @@ -404,14 +444,14 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return _state.get(); } - public QueueEntry getLastSeenEntry() + public AMQQueue.Context getQueueContext() { - return _queueContext.get(); + return _queueContext; } - public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue) + public void setQueueContext(AMQQueue.Context queueContext) { - return _queueContext.compareAndSet(expected, newValue); + _queueContext = queueContext; } public boolean isActive() @@ -453,7 +493,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr creditManager.addCredit(value, 0L); break; case BYTE: - creditManager.addCredit(0L, value); + creditManager.addCredit(0l, value); break; } @@ -472,6 +512,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public void setFlowMode(MessageFlowMode flowMode) { + _creditManager.removeListener(this); switch(flowMode) @@ -485,6 +526,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr default: throw new RuntimeException("Unknown message flow mode: " + flowMode); } + _flowMode = flowMode; if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) { _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 8fc426a6f6..5eab6c14c1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -29,20 +29,23 @@ import org.apache.qpid.server.txn.Transaction; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.PrincipalHolder; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; import java.util.*; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ConcurrentHashMap; import java.security.Principal; +import java.lang.ref.WeakReference; import static org.apache.qpid.util.Serial.*; import com.sun.security.auth.UserPrincipal; public class ServerSession extends Session implements PrincipalHolder { - + private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); public static interface MessageDispositionChangeListener { @@ -51,6 +54,10 @@ public class ServerSession extends Session implements PrincipalHolder public void onRelease(); public void onReject(); + + public boolean acquire(); + + } public static interface Task @@ -66,22 +73,27 @@ public class ServerSession extends Session implements PrincipalHolder private Principal _principal; - private Map _subscriptions = new HashMap(); + private Map _subscriptions = new ConcurrentHashMap(); private final List _taskList = new CopyOnWriteArrayList(); + private final WeakReference _reference; + ServerSession(Connection connection, Binary name, long expiry) { super(connection, name, expiry); _transaction = new AutoCommitTransaction(); _principal = new UserPrincipal(connection.getAuthorizationID()); + _reference = new WeakReference(this); } ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) { super(connection, delegate, name, expiry); _transaction = new AutoCommitTransaction(); + _principal = new UserPrincipal(connection.getAuthorizationID()); + _reference = new WeakReference(this); } public void enqueue(final ServerMessage message, ArrayList queues) @@ -104,6 +116,11 @@ public class ServerSession extends Session implements PrincipalHolder e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } + + public void onRollback() + { + // NO-OP + } }); } @@ -160,6 +177,54 @@ public class ServerSession extends Session implements PrincipalHolder }); } + public RangeSet acquire(RangeSet transfers) + { + RangeSet acquired = new RangeSet(); + + if(!_messageDispositionListenerMap.isEmpty()) + { + Iterator unacceptedMessages = _messageDispositionListenerMap.keySet().iterator(); + Iterator rangeIter = transfers.iterator(); + + if(rangeIter.hasNext()) + { + Range range = rangeIter.next(); + + while(range != null && unacceptedMessages.hasNext()) + { + int next = unacceptedMessages.next(); + while(gt(next, range.getUpper())) + { + if(rangeIter.hasNext()) + { + range = rangeIter.next(); + } + else + { + range = null; + break; + } + } + if(range != null && range.includes(next)) + { + MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.get(next); + if(changeListener.acquire()) + { + acquired.add(next); + } + } + + + } + + } + + + } + + return acquired; + } + public void dispositionChange(RangeSet ranges, MessageDispositionAction action) { if(!_messageDispositionListenerMap.isEmpty()) @@ -208,6 +273,7 @@ public class ServerSession extends Session implements PrincipalHolder public void onClose() { + _transaction.rollback(); for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values()) { listener.onRelease(); @@ -217,7 +283,7 @@ public class ServerSession extends Session implements PrincipalHolder for (Task task : _taskList) { task.doTask(this); - } + } } @@ -231,32 +297,53 @@ public class ServerSession extends Session implements PrincipalHolder { sub.acknowledge(entry); } + + public void onRollback() + { + entry.release(); + + try + { + entry.requeue(new StoreContext()); + } + catch (AMQException e) + { + //TODO + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + throw new RuntimeException(e); + } + + } }); } - public Map getSubscriptions() + public Collection getSubscriptions() { - return _subscriptions; + return _subscriptions.values(); } public void register(String destination, Subscription_0_10 sub) { - _subscriptions.put(destination, sub); + _subscriptions.put(destination == null ? NULL_DESTINTATION : destination, sub); } public Subscription_0_10 getSubscription(String destination) { - return _subscriptions.get(destination); + return _subscriptions.get(destination == null ? NULL_DESTINTATION : destination); } public void unregister(Subscription_0_10 sub) { - _subscriptions.remove(sub); + _subscriptions.remove(sub.getConsumerTag().toString()); try { sub.getSendLock(); - sub.getQueue().unregisterSubscription(sub); + AMQQueue queue = sub.getQueue(); + if(queue != null) + { + queue.unregisterSubscription(sub); + } } catch (AMQException e) @@ -285,11 +372,6 @@ public class ServerSession extends Session implements PrincipalHolder _transaction.rollback(); } - void setPrincipal(Principal principal) - { - _principal = principal; - } - public Principal getPrincipal() { return _principal; @@ -305,4 +387,10 @@ public class ServerSession extends Session implements PrincipalHolder _taskList.remove(task); } + public WeakReference getReference() + { + return _reference; + } + + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 9f08434fe8..c4a40c4676 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -24,10 +24,7 @@ import org.apache.qpid.transport.*; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeInUseException; +import org.apache.qpid.server.exchange.*; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; @@ -39,14 +36,13 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.framing.ExchangeDeleteOkBody; +import org.apache.qpid.framing.*; import java.util.ArrayList; import java.util.Map; +import java.util.Collection; public class ServerSessionDelegate extends SessionDelegate { @@ -90,7 +86,14 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageAcquire(Session session, MessageAcquire method) { - super.messageAcquire(session, method); + RangeSet acquiredRanges = ((ServerSession)session).acquire(method.getTransfers()); + + Acquired result = new Acquired(acquiredRanges); + + + session.executionResult((int) method.getId(), result); + + } @Override @@ -129,35 +132,60 @@ public class ServerSessionDelegate extends SessionDelegate else { String destination = method.getDestination(); - String queueName = method.getQueue(); - QueueRegistry queueRegistry = getQueueRegistry(session); - - - AMQQueue queue = queueRegistry.getQueue(queueName); - if(queue == null) + if(((ServerSession)session).getSubscription(destination)!=null) { - exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Subscription already exists with destaination: '"+destination+"'"); } else { + String queueName = method.getQueue(); + QueueRegistry queueRegistry = getQueueRegistry(session); - FlowCreditManager_0_10 creditManager = new CreditCreditManager(0L,0L); - - // TODO filters - Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, null); + AMQQueue queue = queueRegistry.getQueue(queueName); - ((ServerSession)session).register(destination, sub); - try + if(queue == null) { - queue.registerSubscription(sub, method.getExclusive()); + exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); } - catch (AMQException e) + else if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session) { - // TODO - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - throw new RuntimeException(e); + exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); + } + else + { + + FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L); + + // TODO filters + + Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, + destination, + method.getAcceptMode(), + method.getAcquireMode(), + MessageFlowMode.WINDOW, + creditManager, null); + + ((ServerSession)session).register(destination, sub); + try + { + queue.registerSubscription(sub, method.getExclusive()); + } + catch (AMQQueue.ExistingExclusiveSubscription existing) + { + exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer"); + } + catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive) + { + exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively"); + } + catch (AMQException e) + { + // TODO + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + throw new RuntimeException(e); + } } } } @@ -172,18 +200,34 @@ public class ServerSessionDelegate extends SessionDelegate if(xfr.hasDestination()) { exchange = exchangeRegistry.getExchange(xfr.getDestination()); + if(exchange == null) + { + exchange = exchangeRegistry.getDefaultExchange(); + } } else { exchange = exchangeRegistry.getDefaultExchange(); } - MessageTransferMessage message = new MessageTransferMessage(xfr); + MessageTransferMessage message = new MessageTransferMessage(xfr, ((ServerSession)ssn).getReference()); + + DeliveryProperties delvProps = null; + if(message.getHeader() != null && (delvProps = message.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration()) + { + delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl()); + } + try { ArrayList queues = exchange.route(message); - ((ServerSession) ssn).enqueue(message, queues); + + + if(queues != null) + { + ((ServerSession) ssn).enqueue(message, queues); + } ssn.processed(xfr); } @@ -281,6 +325,10 @@ public class ServerSessionDelegate extends SessionDelegate else { // TODO - check exchange has same properties + if(!exchange.getType().toString().equals(method.getType())) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type"); + } } } @@ -329,7 +377,10 @@ public class ServerSessionDelegate extends SessionDelegate } else { - // TODO check same as declared + if(!exchange.getType().toString().equals(method.getType())) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type"); + } } } @@ -343,7 +394,7 @@ public class ServerSessionDelegate extends SessionDelegate ex.setDescription(description); session.invoke(ex); - session.close(); + //session.close(); } private Exchange getExchange(Session session, String exchangeName) @@ -431,12 +482,128 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void exchangeBind(Session session, ExchangeBind method) { - super.exchangeBind(session, method); + + VirtualHost virtualHost = getVirtualHost(session); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + if (!method.hasQueue()) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set"); + } + else if (!method.hasExchange()) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set"); + } +/* + else if (!method.hasBindingKey()) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "binding-key not set"); + } +*/ + else + { + //TODO - here because of non-compiant python tests + if (!method.hasBindingKey()) + { + method.setBindingKey(method.getQueue()); + } + AMQQueue queue = queueRegistry.getQueue(method.getQueue()); + Exchange exchange = exchangeRegistry.getExchange(method.getExchange()); + if(queue == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found"); + } + else if(exchange == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found"); + } + else if (!virtualHost.getAccessManager().authoriseBind((ServerSession)session, exchange, + queue, new AMQShortString(method.getBindingKey()))) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Bind Exchange: '" + method.getExchange() + + "' to Queue: '" + method.getQueue() + + "' not allowed"); + } + else if(exchange.getType().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match"))) + { + exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header"); + } + else + { + try + { + AMQShortString routingKey = new AMQShortString(method.getBindingKey()); + FieldTable fieldTable = FieldTable.convertToFieldTable(method.getArguments()); + + if (!exchange.isBound(routingKey, fieldTable, queue)) + { + queue.bind(exchange, routingKey, fieldTable); + } + else + { + // todo + } + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + + + } + + + } @Override public void exchangeUnbind(Session session, ExchangeUnbind method) { + VirtualHost virtualHost = getVirtualHost(session); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + if (!method.hasQueue()) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set"); + } + else if (!method.hasExchange()) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set"); + } + else if (!method.hasBindingKey()) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "binding-key not set"); + } + else + { + AMQQueue queue = queueRegistry.getQueue(method.getQueue()); + Exchange exchange = exchangeRegistry.getExchange(method.getExchange()); + if(queue == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found"); + } + else if(exchange == null) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found"); + } + else + { + try + { + queue.unBind(exchange, new AMQShortString(method.getBindingKey()), null); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + throw new RuntimeException(e); + } + } + } + + super.exchangeUnbind(session, method); } @@ -445,41 +612,65 @@ public class ServerSessionDelegate extends SessionDelegate { ExchangeBoundResult result = new ExchangeBoundResult(); + Exchange exchange; + AMQQueue queue; if(method.hasExchange()) { - Exchange exchange = getExchange(session, method.getExchange()); + exchange = getExchange(session, method.getExchange()); if(exchange == null) { result.setExchangeNotFound(true); } + } + else + { + exchange = getExchangeRegistry(session).getDefaultExchange(); + } - if(method.hasQueue()) + + if(method.hasQueue()) + { + + queue = getQueue(session, method.getQueue()); + if(queue == null) { + result.setQueueNotFound(true); + } + - AMQQueue queue = getQueue(session, method.getQueue()); - if(queue == null) - { - result.setQueueNotFound(true); - } + if(exchange != null && queue != null) + { + + boolean queueMatched = exchange.isBound(queue); + + result.setQueueNotMatched(!queueMatched); - if(exchange != null && queue != null) + + if(method.hasBindingKey()) { - if(method.hasBindingKey()) + if(method.hasArguments()) + { + // TODO + } + if(queueMatched) { - - if(method.hasArguments()) - { - // TODO - } result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue)); - } + else + { + result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); + } + } + else if (method.hasArguments()) + { + // TODO + + } - result.setQueueNotMatched(!exchange.isBound(queue)); + result.setQueueNotMatched(!exchange.isBound(queue)); - } } else if(exchange != null && method.hasBindingKey()) { @@ -492,31 +683,20 @@ public class ServerSessionDelegate extends SessionDelegate } } - else if(method.hasQueue()) + else if(exchange != null && method.hasBindingKey()) { - AMQQueue queue = getQueue(session, method.getQueue()); - if(queue == null) + if(method.hasArguments()) { - result.setQueueNotFound(true); - } - else - { - if(method.hasBindingKey()) - { - if(method.hasArguments()) - { - // TODO - } - - // TODO - } + // TODO } + result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); } session.executionResult((int) method.getId(), result); + } private AMQQueue getQueue(Session session, String queue) @@ -580,6 +760,11 @@ public class ServerSessionDelegate extends SessionDelegate try { queue = createQueue(queueName, method, virtualHost, (ServerSession)session); + if(method.getExclusive()) + { + queue.setPrincipalHolder((ServerSession)session); + } + if (queue.isDurable() && !queue.isAutoDelete()) { @@ -597,6 +782,64 @@ public class ServerSessionDelegate extends SessionDelegate queue.bind(defaultExchange, new AMQShortString(queueName), null); } + + if(method.hasAutoDelete() + && method.getAutoDelete() + && method.hasExclusive() + && method.getExclusive()) + { + final AMQQueue q = queue; + final ServerSession.Task deleteQueueTask = new ServerSession.Task() + { + + public void doTask(ServerSession session) + { + try + { + q.delete(); + } + catch (AMQException e) + { + //TODO + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + }; + final ServerSession s = (ServerSession) session; + s.addSessionCloseTask(deleteQueueTask); + queue.addQueueDeleteTask(new AMQQueue.Task() + { + + public void doTask(AMQQueue queue) throws AMQException + { + s.removeSessionCloseTask(deleteQueueTask); + } + }); + } + else if(method.getExclusive()) + { + { + final AMQQueue q = queue; + final ServerSession.Task removeExclusive = new ServerSession.Task() + { + + public void doTask(ServerSession session) + { + q.setPrincipalHolder(null); + } + }; + final ServerSession s = (ServerSession) session; + s.addSessionCloseTask(removeExclusive); + queue.addQueueDeleteTask(new AMQQueue.Task() + { + + public void doTask(AMQQueue queue) throws AMQException + { + s.removeSessionCloseTask(removeExclusive); + } + }); + } + } } catch (AMQException e) { @@ -605,13 +848,12 @@ public class ServerSessionDelegate extends SessionDelegate } } } - else if (method.getExclusive() && (queue.getOwner() != null && !queue.getOwner().equals(((ServerSession)session).getPrincipal().getName()))) + else if (method.getExclusive() && (queue.getPrincipalHolder() != null && !queue.getPrincipalHolder().equals(session))) { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " - + "declared on another client ID('" - + queue.getOwner() + "')"; + + "declared on another session"; ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED; exception(session, method, errorCode, description); @@ -695,7 +937,11 @@ public class ServerSessionDelegate extends SessionDelegate } else { - if (method.getIfEmpty() && !queue.isEmpty()) + if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session) + { + exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); + } + else if (method.getIfEmpty() && !queue.isEmpty()) { exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " not empty"); } @@ -746,7 +992,7 @@ public class ServerSessionDelegate extends SessionDelegate String queueName = method.getQueue(); if(queueName == null || queueName.length()==0) { - exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "No queue name supplied"); + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "No queue name supplied"); } else @@ -778,7 +1024,25 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void queueQuery(Session session, QueueQuery method) { - super.queueQuery(session, method); + QueueQueryResult result = new QueueQueryResult(); + + AMQQueue queue = getQueue(session, method.getQueue()); + + if(queue != null) + { + result.setQueue(queue.getName().toString()); + result.setDurable(queue.isDurable()); + result.setExclusive(queue.isExclusive()); + result.setAutoDelete(queue.isAutoDelete()); + result.setArguments(queue.getArguments()); + result.setMessageCount(queue.getMessageCount()); + result.setSubscriberCount(queue.getConsumerCount()); + + } + + + session.executionResult((int) method.getId(), result); + } @Override @@ -835,15 +1099,14 @@ public class ServerSessionDelegate extends SessionDelegate public void closed(Session session) { super.closed(session); - for(Subscription_0_10 sub : getSubscriptions(session).values()) + for(Subscription_0_10 sub : getSubscriptions(session)) { - sub.close(); + ((ServerSession)session).unregister(sub); } ((ServerSession)session).onClose(); - ((ServerSession)session).onClose(); } - public Map getSubscriptions(Session session) + public Collection getSubscriptions(Session session) { return ((ServerSession)session).getSubscriptions(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 92347b9927..6db98ef101 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -40,7 +40,10 @@ public class LocalTransaction implements Transaction try { - + for(Action action : _postCommitActions) + { + action.onRollback(); + } } finally { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java index 91f429f48e..85719737f1 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java @@ -30,6 +30,8 @@ public interface Transaction public static interface Action { public void postCommit(); + + public void onRollback(); } void dequeue(AMQQueue queue, ServerMessage message, Action postCommitAction); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index db0ec1c4fa..ef345f45c4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -318,6 +318,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase return false; //To change body of implemented methods use File | Settings | File Templates. } + public boolean isAcquiredBy(Subscription subscription) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + public void setDeliveredToSubscription() { //To change body of implemented methods use File | Settings | File Templates. @@ -368,6 +373,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase //To change body of implemented methods use File | Settings | File Templates. } + public void requeue(Subscription subscription) + { + //To change body of implemented methods use File | Settings | File Templates. + } + public void dequeue(final StoreContext storeContext) throws FailedDequeueException { //To change body of implemented methods use File | Settings | File Templates. diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 966fb63186..2cdc002f27 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.PrincipalHolder; import org.apache.qpid.AMQException; import org.apache.commons.configuration.Configuration; @@ -46,6 +47,8 @@ public class MockAMQQueue implements AMQQueue private boolean _deleted = false; private AMQShortString _name; + private PrincipalHolder _principalHolder; + public MockAMQQueue(String name) { _name = new AMQShortString(name); @@ -171,6 +174,11 @@ public class MockAMQQueue implements AMQQueue //To change body of implemented methods use File | Settings | File Templates. } + public void requeue(QueueEntryImpl storeContext, Subscription subscription) + { + //To change body of implemented methods use File | Settings | File Templates. + } + public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException { //To change body of implemented methods use File | Settings | File Templates. @@ -312,6 +320,16 @@ public class MockAMQQueue implements AMQQueue //To change body of implemented methods use File | Settings | File Templates. } + public boolean isExclusive() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public Map getArguments() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public ManagedObject getManagedObject() { return null; //To change body of implemented methods use File | Settings | File Templates. @@ -333,4 +351,14 @@ public class MockAMQQueue implements AMQQueue } + public PrincipalHolder getPrincipalHolder() + { + return _principalHolder; + } + + public void setPrincipalHolder(PrincipalHolder principalHolder) + { + _principalHolder = principalHolder; + } + } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 08b4573f33..84b3b09c8e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -44,6 +44,11 @@ public class MockQueueEntry implements QueueEntry return false; } + public boolean isAcquiredBy(Subscription subscription) + { + return false; + } + public void addStateChangeListener(StateChangeListener listener) { @@ -163,7 +168,12 @@ public class MockQueueEntry implements QueueEntry } - + public void requeue(Subscription subscription) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void setDeliveredToSubscription() { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 72bbd7fe0c..cc2a56e1d2 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -186,7 +186,7 @@ public class SimpleAMQQueueTest extends TestCase // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); - assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); + assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); // Check removing the subscription removes it's information from the queue _queue.unregisterSubscription(_subscription); @@ -197,7 +197,7 @@ public class SimpleAMQQueueTest extends TestCase AMQMessage messageB = createMessage(new Long (25)); _queue.enqueue(messageB); - QueueEntry entry = _subscription.getLastSeenEntry(); + QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry(); assertNull(entry); } @@ -207,7 +207,7 @@ public class SimpleAMQQueueTest extends TestCase _queue.enqueue(messageA); _queue.registerSubscription(_subscription, false); Thread.sleep(150); - assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); + assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); } public void testExclusiveConsumer() throws AMQException @@ -224,7 +224,7 @@ public class SimpleAMQQueueTest extends TestCase // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA); - assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); + assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage()); // Check we cannot add a second subscriber to the queue Subscription subB = new MockSubscription(); @@ -273,7 +273,7 @@ public class SimpleAMQQueueTest extends TestCase Long id = new Long(26); AMQMessage message = createMessage(id); _queue.enqueue(message); - QueueEntry entry = _subscription.getLastSeenEntry(); + QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry(); entry.setRedelivered(true); _queue.resend(entry, _subscription); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 51d9bd8be2..c7ea2067a6 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -39,7 +39,7 @@ public class MockSubscription implements Subscription private AMQShortString tag = new AMQShortString("mocktag"); private AMQQueue queue = null; private StateListener _listener = null; - private QueueEntry lastSeen = null; + private AMQQueue.Context _queueContext = null; private State _state = State.ACTIVE; private ArrayList messages = new ArrayList(); private final Lock _stateChangeLock = new ReentrantLock(); @@ -69,9 +69,9 @@ public class MockSubscription implements Subscription return tag ; } - public QueueEntry getLastSeenEntry() + public AMQQueue.Context getQueueContext() { - return lastSeen; + return _queueContext; } public SubscriptionAcquiredState getOwningState() @@ -147,25 +147,23 @@ public class MockSubscription implements Subscription { } + public void onDequeue(QueueEntry queueEntry) + { + } + public void restoreCredit(QueueEntry queueEntry) { + //To change body of implemented methods use File | Settings | File Templates. } public void send(QueueEntry msg) throws AMQException { - lastSeen = msg; messages.add(msg); } - public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue) + public void setQueueContext(AMQQueue.Context queueContext) { - boolean result = false; - if (expected != null) - { - result = (expected.equals(lastSeen)); - } - lastSeen = newValue; - return result; + _queueContext = queueContext; } public void setQueue(AMQQueue queue) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java index 1ff39ca790..647d531476 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -22,6 +22,10 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import java.util.Date; +import java.util.Map; +import java.math.BigDecimal; + /** * AMQTypedValue combines together a native Java Object value, and an {@link AMQType}, as a fully typed AMQP parameter * value. It provides the ability to read and write fully typed parameters to and from byte buffers. It also provides @@ -113,4 +117,63 @@ public class AMQTypedValue return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode()); } + + public static AMQTypedValue toTypedValue(Object val) + { + if(val == null) + { + return AMQType.VOID.asTypedValue(null); + } + + Class klass = val.getClass(); + if(klass == String.class) + { + return AMQType.ASCII_STRING.asTypedValue(val); + } + else if(klass == Character.class) + { + return AMQType.ASCII_CHARACTER.asTypedValue(val); + } + else if(klass == Integer.class) + { + return AMQType.INT.asTypedValue(val); + } + else if(klass == Long.class) + { + return AMQType.LONG.asTypedValue(val); + } + else if(klass == Float.class) + { + return AMQType.FLOAT.asTypedValue(val); + } + else if(klass == Double.class) + { + return AMQType.DOUBLE.asTypedValue(val); + } + else if(klass == Date.class) + { + return AMQType.TIMESTAMP.asTypedValue(val); + } + else if(klass == Byte.class) + { + return AMQType.BYTE.asTypedValue(val); + } + else if(klass == Boolean.class) + { + return AMQType.BOOLEAN.asTypedValue(val); + } + else if(klass == byte[].class) + { + return AMQType.BINARY.asTypedValue(val); + } + else if(klass == BigDecimal.class) + { + return AMQType.DECIMAL.asTypedValue(val); + } + else if(val instanceof Map) + { + return AMQType.FIELD_TABLE.asTypedValue(FieldTable.convertToFieldTable((Map)val)); + } + return null; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index e9034d25d3..9b2f9b3969 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -828,6 +828,7 @@ public class FieldTable recalculateEncodedSize(); } + public static interface FieldTableElementProcessor { public boolean processElement(String propertyName, AMQTypedValue value); @@ -1187,4 +1188,24 @@ public class FieldTable return _properties.equals(f._properties); } + + public static FieldTable convertToFieldTable(Map map) + { + if (map != null) + { + FieldTable table = new FieldTable(); + for(Map.Entry entry : map.entrySet()) + { + table.put(new AMQShortString(entry.getKey()), entry.getValue()); + } + + return table; + } + else + { + return null; + } + } + + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 08842c94c0..3403b591f3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -316,7 +316,14 @@ public class Connection extends ConnectionInvoker public void dispatch(Method method) { Session ssn = getSession(method.getChannel()); - ssn.received(method); + if(ssn != null) + { + ssn.received(method); + } + else + { + // TODO + } } public int getChannelMax() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java index 4b7f711bff..3c80180d0b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java @@ -179,6 +179,11 @@ public abstract class Method extends Struct implements ProtocolEvent } } + public boolean hasCompletionListener() + { + return completionListener != null; + } + public String toString() { StringBuilder str = new StringBuilder(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 6a69c62300..68d9a13cef 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -617,7 +617,7 @@ public class Session extends SessionInvoker { sessionCommandPoint(0, 0); } - if (expiry > 0 && !m.isUnreliable()) + if ((expiry > 0 && !m.isUnreliable()) || m.hasCompletionListener()) { commands[mod(next, commands.length)] = m; commandBytes += m.getBodySize(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 6144edb947..ea48e48721 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -137,6 +137,7 @@ final class IoReceiver implements Runnable } catch (Throwable t) { + t.printStackTrace(); if (!(shutdownBroken && t instanceof SocketException && t.getMessage().equalsIgnoreCase("socket closed") && diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 9271e1ce16..737ed2322f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -33,6 +33,7 @@ public class SubscriptionTestHelper implements Subscription private final List messages; private final Object key; private boolean isSuspended; + private AMQQueue.Context _queueContext; public SubscriptionTestHelper(Object key) { @@ -101,11 +102,16 @@ public class SubscriptionTestHelper implements Subscription //To change body of implemented methods use File | Settings | File Templates. } - public void restoreCredit(final QueueEntry queueEntry) + public void onDequeue(final QueueEntry queueEntry) { } + public void restoreCredit(QueueEntry queueEntry) + { + //To change body of implemented methods use File | Settings | File Templates. + } + public void setStateListener(final StateListener listener) { //To change body of implemented methods use File | Settings | File Templates. @@ -116,9 +122,14 @@ public class SubscriptionTestHelper implements Subscription return null; //To change body of implemented methods use File | Settings | File Templates. } - public QueueEntry getLastSeenEntry() + public AMQQueue.Context getQueueContext() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return _queueContext; + } + + public void setQueueContext(AMQQueue.Context queueContext) + { + _queueContext = queueContext; } public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue) -- cgit v1.2.1