summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-08-31 11:36:26 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-08-31 11:36:26 +0000
commitbede462542128d1bb0060bc5bec8f31e3f2b5aff (patch)
tree4a4c0d2daa69140b3f500bd010f4490caf0529c2
parent2c5cf5d0db04b5163605ff20682fe057ace41e1d (diff)
downloadqpid-python-bede462542128d1bb0060bc5bec8f31e3f2b5aff.tar.gz
more changes to pass the 0-10 python tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@809544 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java25
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager_0_10.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java21
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java33
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java289
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java1
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java8
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java88
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java116
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java413
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java5
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java10
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java28
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java12
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java10
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java22
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java63
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java21
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java1
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java17
39 files changed, 1026 insertions, 299 deletions
diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
index c17d8379f4..615a02a112 100644
--- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
+++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
@@ -21,6 +21,7 @@
package org.apache.qpid.extras.exchanges.diagnostic;
import java.util.ArrayList;
+import java.util.Map;
import javax.management.JMException;
import javax.management.openmbean.OpenDataException;
@@ -153,6 +154,12 @@ public class DiagnosticExchange extends AbstractExchange
// No op
}
+ public void registerQueue(String routingKey, AMQQueue queue, Map<String, Object> args) throws AMQException
+ {
+ // No op
+ }
+
+
/**
* Does nothing.
*
@@ -214,4 +221,5 @@ public class DiagnosticExchange extends AbstractExchange
// TODO Auto-generated method stub
return false;
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index f1f87eb1d2..04c59fd63f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -43,6 +43,8 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Map;
+
public abstract class AbstractExchange implements Exchange, Managable
{
private AMQShortString _name;
@@ -206,6 +208,12 @@ public abstract class AbstractExchange implements Exchange, Managable
return getVirtualHost().getQueueRegistry();
}
+ public boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue)
+ {
+ return isBound(new AMQShortString(bindingKey), queue);
+ }
+
+
public boolean isBound(String bindingKey, AMQQueue queue)
{
return isBound(new AMQShortString(bindingKey), queue);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index 02e83f3dd3..b40576f258 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -160,8 +160,18 @@ public class DirectExchange extends AbstractExchange
return ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
}
+ public void registerQueue(String routingKey, AMQQueue queue, Map<String,Object> args) throws AMQException
+ {
+ registerQueue(new AMQShortString(routingKey), queue);
+ }
+
public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
+ registerQueue(routingKey, queue);
+ }
+
+ private void registerQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ {
assert queue != null;
assert routingKey != null;
if (!_index.add(routingKey, queue))
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 7ff5e4ef96..40a765f420 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.InboundMessage;
import java.util.ArrayList;
+import java.util.Map;
public interface Exchange
{
@@ -51,6 +52,8 @@ public interface Exchange
void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
+
+
void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
ArrayList<AMQQueue> route(InboundMessage message) throws AMQException;
@@ -96,4 +99,5 @@ public interface Exchange
boolean isBound(String bindingKey, AMQQueue queue);
boolean isBound(String bindingKey);
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
index fde4cfd6a2..db3b023479 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
@@ -189,14 +189,14 @@ class HeadersBinding
{
for(Map.Entry<String,Object> entry : matches.entrySet())
{
- if(!headers.containsHeader(entry.getKey())
- || !((entry.getValue() == null && headers.getHeader(entry.getKey()) == null)
+ if(headers.containsHeader(entry.getKey())
+ || ((entry.getValue() == null && headers.getHeader(entry.getKey()) == null)
|| (entry.getValue().equals(headers.getHeader(entry.getKey())))))
{
- return false;
+ return true;
}
}
- return true;
+ return false;
}
private boolean passesRequiredOr(AMQMessageHeader headers)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 02e129621c..0a2d678073 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -52,6 +52,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ConcurrentHashMap;
/**
* An exchange that binds queues based on a set of required headers and header values
@@ -114,6 +115,7 @@ public class HeadersExchange extends AbstractExchange
private final List<Registration> _bindings = new CopyOnWriteArrayList<Registration>();
+ private Map<AMQShortString, Registration> _bindingByKey = new ConcurrentHashMap<AMQShortString, Registration>();
/**
* HeadersExchangeMBean class implements the management interface for the
@@ -211,7 +213,7 @@ public class HeadersExchange extends AbstractExchange
bindingMap.setString(keyAndValue[0], keyAndValue[1]);
}
- _bindings.add(new Registration(new HeadersBinding(bindingMap), queue));
+ _bindings.add(new Registration(new HeadersBinding(bindingMap), queue, new AMQShortString(binding)));
}
} // End of MBean class
@@ -224,13 +226,17 @@ public class HeadersExchange extends AbstractExchange
public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
_logger.debug("Exchange " + getName() + ": Binding " + queue.getName() + " with " + args);
- _bindings.add(new Registration(new HeadersBinding(args), queue));
+
+ Registration registration = new Registration(new HeadersBinding(args), queue, routingKey);
+ _bindings.add(registration);
+
}
public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
_logger.debug("Exchange " + getName() + ": Unbinding " + queue.getName());
- if(!_bindings.remove(new Registration(new HeadersBinding(args), queue)))
+
+ if(!_bindings.remove(new Registration(args == null ? null : new HeadersBinding(args), queue, routingKey)))
{
throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
+ " with headers args " + args);
@@ -326,21 +332,28 @@ public class HeadersExchange extends AbstractExchange
{
private final HeadersBinding binding;
private final AMQQueue queue;
+ private final AMQShortString routingKey;
- Registration(HeadersBinding binding, AMQQueue queue)
+ Registration(HeadersBinding binding, AMQQueue queue, AMQShortString routingKey)
{
this.binding = binding;
this.queue = queue;
+ this.routingKey = routingKey;
}
public int hashCode()
{
- return queue.hashCode();
+ int queueHash = queue.hashCode();
+ int routingHash = routingKey == null ? 0 : routingKey.hashCode();
+ return queueHash + routingHash;
}
public boolean equals(Object o)
{
- return o instanceof Registration && ((Registration) o).queue.equals(queue);
+ return o instanceof Registration
+ && ((Registration) o).queue.equals(queue)
+ && (routingKey == null ? ((Registration)o).routingKey == null
+ : routingKey.equals(((Registration)o).routingKey));
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager_0_10.java
index 9f6fccc7f8..48c336c0b1 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager_0_10.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.flow;
*/
public interface FlowCreditManager_0_10 extends FlowCreditManager
{
- public void addCredit(long bytes, long count);
+ public void addCredit(long count, long bytes);
void clearCredit();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
index 940b89dba9..10f578551a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
@@ -182,7 +182,7 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
}
- public synchronized void addCredit(long bytes, long count)
+ public synchronized void addCredit(long count, long bytes)
{
if(bytes > 0)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index b705ea7dba..1ab1ea916e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -109,6 +109,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
else
{
queue = createQueue(queueName, body, virtualHost, session);
+ queue.setPrincipalHolder(session);
if (queue.isDurable() && !queue.isAutoDelete())
{
store.createQueue(queue, body.getArguments());
@@ -123,12 +124,15 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
}
}
- else if (queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
+ else if (queue.getPrincipalHolder() != null
+ && queue.getPrincipalHolder().getPrincipal() != null
+ && queue.getPrincipalHolder().getPrincipal().getName() != null
+ && !session.getContextKey().equals(new AMQShortString(queue.getPrincipalHolder().getPrincipal().getName())))
{
throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
+ "declared on another client ID('"
- + queue.getOwner() + "')");
+ + queue.getPrincipalHolder().getPrincipal().getName() + "')");
}
AMQChannel channel = session.getChannel(channelId);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
index 9546025019..b9143ece91 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
@@ -24,6 +24,7 @@ import org.apache.qpid.transport.*;
import java.util.concurrent.atomic.AtomicLong;
import java.nio.ByteBuffer;
+import java.lang.ref.WeakReference;
public class MessageTransferMessage implements InboundMessage, ServerMessage
@@ -36,16 +37,26 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage
private final AMQMessageHeader _messageHeader;
private final long _messageNumber;
private final long _arrivalTime;
+ private WeakReference<Session> _sessionRef;
- public MessageTransferMessage(MessageTransfer xfr)
+ public MessageTransferMessage(MessageTransfer xfr, WeakReference<Session> sessionRef)
{
_xfr = xfr;
_messageNumber = _numberSource.getAndIncrement();
Header header = _xfr.getHeader();
- _deliveryProps = header.get(DeliveryProperties.class);
- _messageProps = header.get(MessageProperties.class);
+ if(header != null)
+ {
+ _deliveryProps = header.get(DeliveryProperties.class);
+ _messageProps = header.get(MessageProperties.class);
+ }
+ else
+ {
+ _deliveryProps = null;
+ _messageProps = null;
+ }
_messageHeader = new MessageTransferHeader(_deliveryProps, _messageProps);
_arrivalTime = System.currentTimeMillis();
+ _sessionRef = sessionRef;
}
public String getRoutingKey()
@@ -110,5 +121,9 @@ public class MessageTransferMessage implements InboundMessage, ServerMessage
return _xfr.getBody();
}
+ public Session getSession()
+ {
+ return _sessionRef == null ? null : _sessionRef.get();
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
index 0339245b04..2383d6e0be 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
@@ -61,16 +61,19 @@ public class AMQPriorityQueue extends SimpleAMQQueue
while(subIter.advance() && !entry.isAcquired())
{
final Subscription subscription = subIter.getNode().getSubscription();
- QueueEntry subnode = subscription.getLastSeenEntry();
- while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired())
+ QueueContext context = (QueueContext) subscription.getQueueContext();
+ QueueEntry subnode = context._lastSeenEntry;
+ QueueEntry released = context._releasedEntry;
+ while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0))
{
- if(subscription.setLastSeenEntry(subnode,entry))
+ if(_releasedUpdater.compareAndSet(context,released,entry))
{
break;
}
else
{
- subnode = subscription.getLastSeenEntry();
+ subnode = context._lastSeenEntry;
+ released = context._releasedEntry;
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 8b69aac7c3..ac0363e76a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -28,16 +28,24 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.PrincipalHolder;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import java.util.List;
import java.util.Set;
+import java.util.Map;
public interface AMQQueue extends Managable, Comparable<AMQQueue>
{
+
+ public interface Context
+ {
+ QueueEntry getLastSeenEntry();
+ }
+
AMQShortString getName();
boolean isDurable();
@@ -45,6 +53,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
boolean isAutoDelete();
AMQShortString getOwner();
+ PrincipalHolder getPrincipalHolder();
+ void setPrincipalHolder(PrincipalHolder principalHolder);
VirtualHost getVirtualHost();
@@ -90,6 +100,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
+ void requeue(QueueEntryImpl storeContext, Subscription subscription);
+
void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
@@ -166,6 +178,10 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void stop();
+ boolean isExclusive();
+
+ Map<String, Object> getArguments();
+
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
* already exists.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 8bb958ed3f..049b6b7604 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -165,7 +165,11 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
public String getOwner()
{
- return String.valueOf(_queue.getOwner());
+ return String.valueOf(_queue.getPrincipalHolder() == null
+ ? null
+ : _queue.getPrincipalHolder().getPrincipal() == null
+ ? null
+ : _queue.getPrincipalHolder().getPrincipal().getName());
}
public boolean isAutoDelete()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 589f6919d5..7e4871158c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -3,6 +3,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.message.ServerMessage;
/*
@@ -151,6 +152,7 @@ public interface QueueEntry extends Comparable<QueueEntry>
boolean isDeleted();
boolean acquiredBySubscription();
+ boolean isAcquiredBy(Subscription subscription);
void setDeliveredToSubscription();
@@ -172,6 +174,8 @@ public interface QueueEntry extends Comparable<QueueEntry>
void requeue(StoreContext storeContext) throws AMQException;
+ void requeue(Subscription subscription);
+
void dequeue(final StoreContext storeContext) throws FailedDequeueException;
void dispose(final StoreContext storeContext) throws MessageCleanupException;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index d69f4271d9..67bc87145a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -124,7 +124,7 @@ public class QueueEntryImpl implements QueueEntry
public long getSize()
{
- return getMessage().getSize();
+ return getMessage() == null ? 0 : getMessage().getSize();
}
public boolean getDeliveredToConsumer()
@@ -134,14 +134,17 @@ public class QueueEntryImpl implements QueueEntry
public boolean expired() throws AMQException
{
- long expiration = getMessage().getExpiration();
- if (expiration != 0L)
+ ServerMessage message = getMessage();
+ if(message != null)
{
- long now = System.currentTimeMillis();
+ long expiration = message.getExpiration();
+ if (expiration != 0L)
+ {
+ long now = System.currentTimeMillis();
- return (now > expiration);
+ return (now > expiration);
+ }
}
-
return false;
}
@@ -178,6 +181,13 @@ public class QueueEntryImpl implements QueueEntry
return (_state instanceof SubscriptionAcquiredState);
}
+ public boolean isAcquiredBy(Subscription subscription)
+ {
+ EntryState state = _state;
+ return state instanceof SubscriptionAcquiredState
+ && ((SubscriptionAcquiredState)state).getSubscription() == subscription;
+ }
+
public void setDeliveredToSubscription()
{
_deliveredToConsumer = true;
@@ -263,6 +273,15 @@ public class QueueEntryImpl implements QueueEntry
}
}
+ public void requeue(Subscription subscription)
+ {
+ getQueue().requeue(this, subscription);
+ if(_stateChangeListeners != null)
+ {
+ notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+ }
+ }
+
public void dequeue(final StoreContext storeContext) throws FailedDequeueException
{
EntryState state = _state;
@@ -272,7 +291,7 @@ public class QueueEntryImpl implements QueueEntry
if (state instanceof SubscriptionAcquiredState)
{
Subscription s = ((SubscriptionAcquiredState) state).getSubscription();
- s.restoreCredit(this);
+ s.onDequeue(this);
}
getQueue().dequeue(storeContext, this);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 86d1411450..9f8a956448 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1,15 +1,9 @@
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.*;
import javax.management.JMException;
@@ -22,15 +16,14 @@ import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.PrincipalHolder;
/*
*
@@ -56,6 +49,38 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
+
+ private PrincipalHolder _prinicpalHolder;
+
+
+ static final class QueueContext implements Context
+ {
+ volatile QueueEntry _lastSeenEntry;
+ volatile QueueEntry _releasedEntry;
+
+ public QueueContext(QueueEntry head)
+ {
+ _lastSeenEntry = head;
+ }
+
+ public QueueEntry getLastSeenEntry()
+ {
+ return _lastSeenEntry;
+ }
+ }
+
+
+ static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+ _lastSeenUpdater =
+ AtomicReferenceFieldUpdater.newUpdater
+ (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
+
+ static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+ _releasedUpdater =
+ AtomicReferenceFieldUpdater.newUpdater
+ (QueueContext.class, QueueEntry.class, "_releasedEntry");
+
+
private final AMQShortString _name;
/** null means shared */
@@ -167,7 +192,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
- this(new AMQShortString(queueName), durable, new AMQShortString(owner),autoDelete,virtualHost);
+ this(new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner),autoDelete,virtualHost);
}
public void resetNotifications()
@@ -191,6 +216,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return _durable;
}
+ public boolean isExclusive()
+ {
+ return _owner != null;
+ }
+
+ public Map<String, Object> getArguments()
+ {
+ return null;
+ }
+
public boolean isAutoDelete()
{
return _autoDelete;
@@ -201,6 +236,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return _owner;
}
+ public PrincipalHolder getPrincipalHolder()
+ {
+ return _prinicpalHolder;
+ }
+
+ public void setPrincipalHolder(PrincipalHolder prinicpalHolder)
+ {
+ _prinicpalHolder = prinicpalHolder;
+ }
+
+
public VirtualHost getVirtualHost()
{
return _virtualHost;
@@ -208,6 +254,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// ------ bind and unbind
+ public void bind(Exchange exchange, String bindingKey, Map<String, Object> arguments) throws AMQException
+ {
+
+ FieldTable fieldTable = FieldTable.convertToFieldTable(arguments);
+ AMQShortString routingKey = new AMQShortString(bindingKey);
+
+ exchange.registerQueue(routingKey, this, fieldTable);
+
+ if (isDurable() && exchange.isDurable())
+ {
+
+ _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, fieldTable);
+ }
+
+ _bindings.addBinding(routingKey, fieldTable, exchange);
+ }
+
+
public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
{
exchange.registerQueue(routingKey, this, arguments);
@@ -264,7 +328,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_activeSubscriberCount.incrementAndGet();
subscription.setStateListener(this);
- subscription.setLastSeenEntry(null, _entries.getHead());
+ subscription.setQueueContext(new QueueContext(_entries.getHead()));
if (!isDeleted())
{
@@ -298,17 +362,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
subscription.close();
// No longer can the queue have an exclusive consumer
setExclusiveSubscriber(null);
-
- QueueEntry lastSeen;
-
- while ((lastSeen = subscription.getLastSeenEntry()) != null)
- {
- subscription.setLastSeenEntry(lastSeen, null);
- }
+ subscription.setQueueContext(null);
// auto-delete queues must be deleted if there are no remaining subscribers
- if (_autoDelete && getConsumerCount() == 0)
+ if (_autoDelete && getConsumerCount() == 0 && !isExclusive())
{
if (_logger.isInfoEnabled())
{
@@ -449,13 +507,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
// to acquire the entry for this subscription
- sub.restoreCredit(entry);
+ sub.onDequeue(entry);
}
else
{
-
deliverMessage(sub, entry);
-
}
}
}
@@ -487,69 +543,43 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
_deliveredMessages.incrementAndGet();
sub.send(entry);
-
+ setLastSeenEntry(sub,entry);
}
- private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
+ private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
{
+ return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
+ }
- // We need to move this subscription on, past entries which are already acquired, or deleted or ones it has no
- // interest in.
- QueueEntry node = sub.getLastSeenEntry();
- while (node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)))
- {
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
- else
- {
- node = null;
- break;
- }
-
- }
+ private void setLastSeenEntry(final Subscription sub, final QueueEntry entry)
+ {
+ QueueContext subContext = (QueueContext) sub.getQueueContext();
+ QueueEntry releasedEntry = subContext._releasedEntry;
- if (node == entry)
+ _lastSeenUpdater.set(subContext, entry);
+ if(releasedEntry == entry)
{
- // If the first entry that subscription can process is the one we are trying to deliver to it, then we are
- // good
- return true;
+ _releasedUpdater.compareAndSet(subContext, releasedEntry, null);
}
- else
- {
- // Otherwise we should try to update the subscription's last seen entry to the entry we got to, providing
- // no-one else has updated it to something furhter on in the list
- //TODO - check
- //updateLastSeenEntry(sub, entry);
- return false;
- }
-
}
- private void updateLastSeenEntry(final Subscription sub, final QueueEntry entry)
+ private void updateSubRequeueEntry(final Subscription sub, final QueueEntry entry)
{
- QueueEntry node = sub.getLastSeenEntry();
- if (node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry))
+ QueueContext subContext = (QueueContext) sub.getQueueContext();
+ if(subContext != null)
{
- do
+ QueueEntry oldEntry;
+
+ while((oldEntry = subContext._releasedEntry) == null || oldEntry.compareTo(entry) > 0)
{
- if (sub.setLastSeenEntry(node, entry))
- {
- return;
- }
- else
+ if(_releasedUpdater.compareAndSet(subContext, oldEntry, entry))
{
- node = sub.getLastSeenEntry();
+ break;
}
}
- while (node != null && entry.compareTo(node) < 0);
}
-
}
public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException
@@ -564,7 +594,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
{
- updateLastSeenEntry(sub, entry);
+ updateSubRequeueEntry(sub, entry);
}
}
@@ -572,6 +602,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ public void requeue(QueueEntryImpl entry, Subscription subscription)
+ {
+ SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+ while (subscriberIter.advance())
+ {
+ Subscription sub = subscriberIter.getNode().getSubscription();
+
+ // we don't make browsers send the same stuff twice
+ if (sub.seesRequeues() && (!sub.acquires() && sub == subscription))
+ {
+ updateSubRequeueEntry(sub, entry);
+ }
+ }
+
+ deliverAsync();
+ }
+
public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
{
decrementQueueCount();
@@ -1248,12 +1296,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private boolean attemptDelivery(Subscription sub) throws AMQException
{
boolean atTail = false;
- boolean advanced = false;
+
boolean subActive = sub.isActive();
if (subActive)
{
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (!(node.isAcquired() || node.isDeleted()))
+
+
+ QueueEntry node = getNextAvailableEntry(sub);
+
+ if (node != null && !(node.isAcquired() || node.isDeleted()))
{
if (!sub.isSuspended())
{
@@ -1263,23 +1314,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (sub.acquires() && !node.acquire(sub))
{
- sub.restoreCredit(node);
+ sub.onDequeue(node);
}
else
{
deliverMessage(sub, node);
-
- if (!sub.acquires())
- {
- QueueEntry newNode = _entries.next(node);
-
- if (newNode != null)
- {
- advanced = true;
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
- }
}
}
@@ -1291,19 +1330,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
node.addStateChangeListener(new QueueEntryListener(sub, node));
}
}
- else
- {
- // this subscription is not interested in this node so we can skip over it
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- }
- }
+
}
}
- atTail = (_entries.next(node) == null) && !advanced;
+ atTail = (node == null) || (_entries.next(node) == null);
}
return atTail || !subActive;
}
@@ -1315,40 +1346,59 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
Subscription sub = subNode.getSubscription();
- moveSubscriptionToNextNode(sub);
+ if(sub.acquires())
+ {
+ getNextAvailableEntry(sub);
+ }
+ else
+ {
+ // TODO
+ }
}
}
- private QueueEntry moveSubscriptionToNextNode(final Subscription sub)
+ private QueueEntry getNextAvailableEntry(final Subscription sub)
throws AMQException
{
- QueueEntry node = sub.getLastSeenEntry();
-
- while (node != null && (node.isAcquired() || node.isDeleted() || node.expired()))
+ QueueContext context = (QueueContext) sub.getQueueContext();
+ if(context != null)
{
- if (!node.isAcquired() && !node.isDeleted() && node.expired())
+ QueueEntry lastSeen = context._lastSeenEntry;
+ QueueEntry releasedNode = context._releasedEntry;
+
+ QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
+
+ boolean expired = false;
+ while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired())))
{
- if (node.acquire())
+ if (expired)
{
- final StoreContext reapingStoreContext = new StoreContext();
- node.discard(reapingStoreContext);
+ expired = false;
+ if (node.acquire())
+ {
+ final StoreContext reapingStoreContext = new StoreContext();
+ node.discard(reapingStoreContext);
+ }
}
- }
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
- else
- {
- break;
- }
+ if(_lastSeenUpdater.compareAndSet(context, lastSeen, node))
+ {
+ _releasedUpdater.compareAndSet(context, releasedNode, null);
+ }
+
+ lastSeen = context._lastSeenEntry;
+ releasedNode = context._releasedEntry;
+ node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : _entries.next(lastSeen);
+ }
+ return node;
+ }
+ else
+ {
+ return null;
}
- return node;
}
+
private void processQueue(Runnable runner) throws AMQException
{
long stateChangeCount;
@@ -1386,12 +1436,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (sub != null)
{
-
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (node != null)
- {
- done = attemptDelivery(sub);
- }
+ done = attemptDelivery(sub);
}
if (done)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index c5a2972720..7bdf516e44 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
*/
public class SimpleQueueEntryList implements QueueEntryList
{
+
private final QueueEntryImpl _head;
private volatile QueueEntryImpl _tail;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
index f852514444..33213055ca 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
@@ -507,8 +507,9 @@ public class PrincipalPermissions
// This will allow consumption from any temporary queue including ones not owned by this user.
// Of course the exclusivity will not be broken.
{
+
// if not limited to ownQueuesOnly then ok else check queue Owner.
- return (!ownQueuesOnly || queue.getOwner().equals(_user)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+ return (!ownQueuesOnly || new AMQShortString(queue.getPrincipalHolder().getPrincipal().getName()).equals(_user)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
}
else
{
@@ -522,7 +523,7 @@ public class PrincipalPermissions
// if no queues are listed then ALL are ok othereise it must be specified.
if (ownQueuesOnly)
{
- if (queue.getOwner().equals(_user))
+ if ( new AMQShortString(queue.getPrincipalHolder().getPrincipal().getName()).equals(_user))
{
return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 34e35171e5..a8ba372f46 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -782,8 +782,14 @@ public class DerbyMessageStore implements MessageStore
{
stmt = conn.prepareStatement(INSERT_INTO_QUEUE);
+ String owner = queue.getPrincipalHolder() == null
+ ? null
+ : queue.getPrincipalHolder().getPrincipal() == null
+ ? null
+ : queue.getPrincipalHolder().getPrincipal().getName();
+
stmt.setString(1, queue.getName().toString());
- stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString());
+ stmt.setString(2, owner);
stmt.execute();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java
index 76a6580408..a980347633 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java
@@ -30,18 +30,24 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
private final Subscription_0_10 _sub;
private final QueueEntry _entry;
private final ServerSession _session;
+ private boolean _restoreCredit;
- public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry)
+ public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit)
{
super();
_sub = sub;
_entry = entry;
_session = session;
+ _restoreCredit = restoreCredit;
}
public void onComplete(Method method)
{
_session.acknowledge(_sub, _entry);
+ if(_restoreCredit)
+ {
+ _sub.restoreCredit(_entry);
+ }
_session.removeDispositionListener(method);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index a99ca3b118..87f4fd5c7c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -30,7 +30,6 @@ public interface Subscription
{
-
public static enum State
{
ACTIVE,
@@ -76,15 +75,17 @@ public interface Subscription
void releaseSendLock();
+ void onDequeue(final QueueEntry queueEntry);
+
void restoreCredit(final QueueEntry queueEntry);
void setStateListener(final StateListener listener);
public State getState();
- QueueEntry getLastSeenEntry();
+ AMQQueue.Context getQueueContext();
- boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue);
+ void setQueueContext(AMQQueue.Context queueContext);
boolean isActive();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 382cda08da..68581acc14 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -61,7 +61,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null);
+ private AMQQueue.Context _queueContext;
+
private final ClientDeliveryMethod _deliveryMethod;
private final RecordDeliveryMethod _recordMethod;
@@ -544,12 +545,18 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
return _queue;
}
+ public void onDequeue(final QueueEntry queueEntry)
+ {
+ restoreCredit(queueEntry);
+ }
+
public void restoreCredit(final QueueEntry queueEntry)
{
_creditManager.restoreCredit(1, queueEntry.getSize());
}
+
public void creditStateChanged(boolean hasCredit)
{
@@ -586,14 +593,14 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
- public QueueEntry getLastSeenEntry()
+ public AMQQueue.Context getQueueContext()
{
- return _queueContext.get();
+ return _queueContext;
}
- public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue)
+ public void setQueueContext(AMQQueue.Context context)
{
- return _queueContext.compareAndSet(expected,newvalue);
+ _queueContext = context;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 999d268181..9046175c84 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -43,7 +43,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
-import java.nio.ByteBuffer;
public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener
{
@@ -51,7 +50,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
private final Lock _stateChangeLock = new ReentrantLock();
private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null);
+ private AMQQueue.Context _queueContext;
private final AtomicBoolean _deleted = new AtomicBoolean(false);
@@ -76,16 +75,21 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
private final ServerSession _session;
private AtomicBoolean _stopped = new AtomicBoolean(true);
private ConcurrentHashMap<Integer, QueueEntry> _sentMap = new ConcurrentHashMap<Integer, QueueEntry>();
+ private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
public Subscription_0_10(ServerSession session, String destination, MessageAcceptMode acceptMode,
- MessageAcquireMode acquireMode, FlowCreditManager_0_10 creditManager, FilterManager filters)
+ MessageAcquireMode acquireMode,
+ MessageFlowMode flowMode,
+ FlowCreditManager_0_10 creditManager,
+ FilterManager filters)
{
_session = session;
_destination = destination;
_acceptMode = acceptMode;
_acquireMode = acquireMode;
_creditManager = creditManager;
+ _flowMode = flowMode;
_filters = filters;
_creditManager.addStateListener(this);
_state.set(_creditManager.hasCredit() ? State.ACTIVE : State.SUSPENDED);
@@ -139,10 +143,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
- if (_noLocal)
+ if (_noLocal
+ && (entry.getMessage() instanceof MessageTransferMessage)
+ && ((MessageTransferMessage)entry.getMessage()).getSession() == _session)
{
-
-
+ return false;
}
@@ -241,18 +246,15 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
-
- MessageTransfer xfr = new MessageTransfer();
- xfr.setDestination(_destination);
- if(msg.getBody() != null)
+ Struct[] headers;
+ if(msg.getHeader() == null)
{
- xfr.setBody(msg.getBody());
+ headers = EMPTY_STRUCT_ARRAY;
+ }
+ else
+ {
+ headers = msg.getHeader().getStructs();
}
-
- xfr.setAcceptMode(_acceptMode);
- xfr.setAcquireMode(_acquireMode);
-
- Struct[] headers = msg.getHeader().getStructs();
ArrayList<Struct> newHeaders = new ArrayList<Struct>(headers.length);
DeliveryProperties origDeliveryProps = null;
@@ -297,11 +299,23 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
deliveryProps.setRedelivered(entry.isRedelivered());
newHeaders.add(deliveryProps);
- xfr.setHeader(new Header(newHeaders));
+ Header header = new Header(newHeaders);
+
+ MessageTransfer xfr = new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
if(_acceptMode == MessageAcceptMode.NONE)
{
- xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry));
+ xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
+ }
+ else if(_flowMode == MessageFlowMode.WINDOW)
+ {
+ xfr.setCompletionListener(new Method.CompletionListener()
+ {
+ public void onComplete(Method method)
+ {
+ restoreCredit(entry);
+ }
+ });
}
@@ -329,6 +343,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
reject(entry);
}
+
+ public boolean acquire()
+ {
+ return entry.acquire(Subscription_0_10.this);
+ }
});
}
else
@@ -350,6 +369,14 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
reject(entry);
}
+ public boolean acquire()
+ {
+ boolean acquired = entry.acquire(Subscription_0_10.this);
+ _session.acknowledge(Subscription_0_10.this,entry);
+ return acquired;
+
+ }
+
});
}
@@ -367,6 +394,14 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
{
entry.setRedelivered(true);
entry.release();
+ try
+ {
+ entry.requeue(new StoreContext());
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
}
public void queueDeleted(AMQQueue queue)
@@ -394,6 +429,11 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
_creditManager.restoreCredit(1, queueEntry.getSize());
}
+ public void onDequeue(QueueEntry queueEntry)
+ {
+
+ }
+
public void setStateListener(StateListener listener)
{
_stateListener = listener;
@@ -404,14 +444,14 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
return _state.get();
}
- public QueueEntry getLastSeenEntry()
+ public AMQQueue.Context getQueueContext()
{
- return _queueContext.get();
+ return _queueContext;
}
- public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
+ public void setQueueContext(AMQQueue.Context queueContext)
{
- return _queueContext.compareAndSet(expected, newValue);
+ _queueContext = queueContext;
}
public boolean isActive()
@@ -453,7 +493,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
creditManager.addCredit(value, 0L);
break;
case BYTE:
- creditManager.addCredit(0L, value);
+ creditManager.addCredit(0l, value);
break;
}
@@ -472,6 +512,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public void setFlowMode(MessageFlowMode flowMode)
{
+
_creditManager.removeListener(this);
switch(flowMode)
@@ -485,6 +526,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
default:
throw new RuntimeException("Unknown message flow mode: " + flowMode);
}
+ _flowMode = flowMode;
if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
{
_stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 8fc426a6f6..5eab6c14c1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -29,20 +29,23 @@ import org.apache.qpid.server.txn.Transaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ConcurrentHashMap;
import java.security.Principal;
+import java.lang.ref.WeakReference;
import static org.apache.qpid.util.Serial.*;
import com.sun.security.auth.UserPrincipal;
public class ServerSession extends Session implements PrincipalHolder
{
-
+ private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
public static interface MessageDispositionChangeListener
{
@@ -51,6 +54,10 @@ public class ServerSession extends Session implements PrincipalHolder
public void onRelease();
public void onReject();
+
+ public boolean acquire();
+
+
}
public static interface Task
@@ -66,22 +73,27 @@ public class ServerSession extends Session implements PrincipalHolder
private Principal _principal;
- private Map<String, Subscription_0_10> _subscriptions = new HashMap<String, Subscription_0_10>();
+ private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+ private final WeakReference<Session> _reference;
+
ServerSession(Connection connection, Binary name, long expiry)
{
super(connection, name, expiry);
_transaction = new AutoCommitTransaction();
_principal = new UserPrincipal(connection.getAuthorizationID());
+ _reference = new WeakReference(this);
}
ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
super(connection, delegate, name, expiry);
_transaction = new AutoCommitTransaction();
+ _principal = new UserPrincipal(connection.getAuthorizationID());
+ _reference = new WeakReference(this);
}
public void enqueue(final ServerMessage message, ArrayList<AMQQueue> queues)
@@ -104,6 +116,11 @@ public class ServerSession extends Session implements PrincipalHolder
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
+
+ public void onRollback()
+ {
+ // NO-OP
+ }
});
}
@@ -160,6 +177,54 @@ public class ServerSession extends Session implements PrincipalHolder
});
}
+ public RangeSet acquire(RangeSet transfers)
+ {
+ RangeSet acquired = new RangeSet();
+
+ if(!_messageDispositionListenerMap.isEmpty())
+ {
+ Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
+ Iterator<Range> rangeIter = transfers.iterator();
+
+ if(rangeIter.hasNext())
+ {
+ Range range = rangeIter.next();
+
+ while(range != null && unacceptedMessages.hasNext())
+ {
+ int next = unacceptedMessages.next();
+ while(gt(next, range.getUpper()))
+ {
+ if(rangeIter.hasNext())
+ {
+ range = rangeIter.next();
+ }
+ else
+ {
+ range = null;
+ break;
+ }
+ }
+ if(range != null && range.includes(next))
+ {
+ MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.get(next);
+ if(changeListener.acquire())
+ {
+ acquired.add(next);
+ }
+ }
+
+
+ }
+
+ }
+
+
+ }
+
+ return acquired;
+ }
+
public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
{
if(!_messageDispositionListenerMap.isEmpty())
@@ -208,6 +273,7 @@ public class ServerSession extends Session implements PrincipalHolder
public void onClose()
{
+ _transaction.rollback();
for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
{
listener.onRelease();
@@ -217,7 +283,7 @@ public class ServerSession extends Session implements PrincipalHolder
for (Task task : _taskList)
{
task.doTask(this);
- }
+ }
}
@@ -231,32 +297,53 @@ public class ServerSession extends Session implements PrincipalHolder
{
sub.acknowledge(entry);
}
+
+ public void onRollback()
+ {
+ entry.release();
+
+ try
+ {
+ entry.requeue(new StoreContext());
+ }
+ catch (AMQException e)
+ {
+ //TODO
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
+ }
+
+ }
});
}
- public Map<String, Subscription_0_10> getSubscriptions()
+ public Collection<Subscription_0_10> getSubscriptions()
{
- return _subscriptions;
+ return _subscriptions.values();
}
public void register(String destination, Subscription_0_10 sub)
{
- _subscriptions.put(destination, sub);
+ _subscriptions.put(destination == null ? NULL_DESTINTATION : destination, sub);
}
public Subscription_0_10 getSubscription(String destination)
{
- return _subscriptions.get(destination);
+ return _subscriptions.get(destination == null ? NULL_DESTINTATION : destination);
}
public void unregister(Subscription_0_10 sub)
{
- _subscriptions.remove(sub);
+ _subscriptions.remove(sub.getConsumerTag().toString());
try
{
sub.getSendLock();
- sub.getQueue().unregisterSubscription(sub);
+ AMQQueue queue = sub.getQueue();
+ if(queue != null)
+ {
+ queue.unregisterSubscription(sub);
+ }
}
catch (AMQException e)
@@ -285,11 +372,6 @@ public class ServerSession extends Session implements PrincipalHolder
_transaction.rollback();
}
- void setPrincipal(Principal principal)
- {
- _principal = principal;
- }
-
public Principal getPrincipal()
{
return _principal;
@@ -305,4 +387,10 @@ public class ServerSession extends Session implements PrincipalHolder
_taskList.remove(task);
}
+ public WeakReference<Session> getReference()
+ {
+ return _reference;
+ }
+
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 9f08434fe8..c4a40c4676 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -24,10 +24,7 @@ import org.apache.qpid.transport.*;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeInUseException;
+import org.apache.qpid.server.exchange.*;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
@@ -39,14 +36,13 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.framing.*;
import java.util.ArrayList;
import java.util.Map;
+import java.util.Collection;
public class ServerSessionDelegate extends SessionDelegate
{
@@ -90,7 +86,14 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageAcquire(Session session, MessageAcquire method)
{
- super.messageAcquire(session, method);
+ RangeSet acquiredRanges = ((ServerSession)session).acquire(method.getTransfers());
+
+ Acquired result = new Acquired(acquiredRanges);
+
+
+ session.executionResult((int) method.getId(), result);
+
+
}
@Override
@@ -129,35 +132,60 @@ public class ServerSessionDelegate extends SessionDelegate
else
{
String destination = method.getDestination();
- String queueName = method.getQueue();
- QueueRegistry queueRegistry = getQueueRegistry(session);
-
-
- AMQQueue queue = queueRegistry.getQueue(queueName);
- if(queue == null)
+ if(((ServerSession)session).getSubscription(destination)!=null)
{
- exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Subscription already exists with destaination: '"+destination+"'");
}
else
{
+ String queueName = method.getQueue();
+ QueueRegistry queueRegistry = getQueueRegistry(session);
- FlowCreditManager_0_10 creditManager = new CreditCreditManager(0L,0L);
-
- // TODO filters
- Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination,method.getAcceptMode(),method.getAcquireMode(), creditManager, null);
+ AMQQueue queue = queueRegistry.getQueue(queueName);
- ((ServerSession)session).register(destination, sub);
- try
+ if(queue == null)
{
- queue.registerSubscription(sub, method.getExclusive());
+ exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
}
- catch (AMQException e)
+ else if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
{
- // TODO
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- throw new RuntimeException(e);
+ exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
+ }
+ else
+ {
+
+ FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
+
+ // TODO filters
+
+ Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
+ destination,
+ method.getAcceptMode(),
+ method.getAcquireMode(),
+ MessageFlowMode.WINDOW,
+ creditManager, null);
+
+ ((ServerSession)session).register(destination, sub);
+ try
+ {
+ queue.registerSubscription(sub, method.getExclusive());
+ }
+ catch (AMQQueue.ExistingExclusiveSubscription existing)
+ {
+ exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer");
+ }
+ catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive)
+ {
+ exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively");
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
+ }
}
}
}
@@ -172,18 +200,34 @@ public class ServerSessionDelegate extends SessionDelegate
if(xfr.hasDestination())
{
exchange = exchangeRegistry.getExchange(xfr.getDestination());
+ if(exchange == null)
+ {
+ exchange = exchangeRegistry.getDefaultExchange();
+ }
}
else
{
exchange = exchangeRegistry.getDefaultExchange();
}
- MessageTransferMessage message = new MessageTransferMessage(xfr);
+ MessageTransferMessage message = new MessageTransferMessage(xfr, ((ServerSession)ssn).getReference());
+
+ DeliveryProperties delvProps = null;
+ if(message.getHeader() != null && (delvProps = message.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+ {
+ delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
+ }
+
try
{
ArrayList<AMQQueue> queues = exchange.route(message);
- ((ServerSession) ssn).enqueue(message, queues);
+
+
+ if(queues != null)
+ {
+ ((ServerSession) ssn).enqueue(message, queues);
+ }
ssn.processed(xfr);
}
@@ -281,6 +325,10 @@ public class ServerSessionDelegate extends SessionDelegate
else
{
// TODO - check exchange has same properties
+ if(!exchange.getType().toString().equals(method.getType()))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type");
+ }
}
}
@@ -329,7 +377,10 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- // TODO check same as declared
+ if(!exchange.getType().toString().equals(method.getType()))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Cannot redeclare with a different exchange type");
+ }
}
}
@@ -343,7 +394,7 @@ public class ServerSessionDelegate extends SessionDelegate
ex.setDescription(description);
session.invoke(ex);
- session.close();
+ //session.close();
}
private Exchange getExchange(Session session, String exchangeName)
@@ -431,12 +482,128 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void exchangeBind(Session session, ExchangeBind method)
{
- super.exchangeBind(session, method);
+
+ VirtualHost virtualHost = getVirtualHost(session);
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ if (!method.hasQueue())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
+ }
+ else if (!method.hasExchange())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set");
+ }
+/*
+ else if (!method.hasBindingKey())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "binding-key not set");
+ }
+*/
+ else
+ {
+ //TODO - here because of non-compiant python tests
+ if (!method.hasBindingKey())
+ {
+ method.setBindingKey(method.getQueue());
+ }
+ AMQQueue queue = queueRegistry.getQueue(method.getQueue());
+ Exchange exchange = exchangeRegistry.getExchange(method.getExchange());
+ if(queue == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
+ }
+ else if(exchange == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found");
+ }
+ else if (!virtualHost.getAccessManager().authoriseBind((ServerSession)session, exchange,
+ queue, new AMQShortString(method.getBindingKey())))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Bind Exchange: '" + method.getExchange()
+ + "' to Queue: '" + method.getQueue()
+ + "' not allowed");
+ }
+ else if(exchange.getType().equals(HeadersExchange.TYPE.getName()) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match")))
+ {
+ exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getName() + " require an x-match header");
+ }
+ else
+ {
+ try
+ {
+ AMQShortString routingKey = new AMQShortString(method.getBindingKey());
+ FieldTable fieldTable = FieldTable.convertToFieldTable(method.getArguments());
+
+ if (!exchange.isBound(routingKey, fieldTable, queue))
+ {
+ queue.bind(exchange, routingKey, fieldTable);
+ }
+ else
+ {
+ // todo
+ }
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+
+
+ }
+
+
+
}
@Override
public void exchangeUnbind(Session session, ExchangeUnbind method)
{
+ VirtualHost virtualHost = getVirtualHost(session);
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ if (!method.hasQueue())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not set");
+ }
+ else if (!method.hasExchange())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "exchange not set");
+ }
+ else if (!method.hasBindingKey())
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "binding-key not set");
+ }
+ else
+ {
+ AMQQueue queue = queueRegistry.getQueue(method.getQueue());
+ Exchange exchange = exchangeRegistry.getExchange(method.getExchange());
+ if(queue == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: '" + method.getQueue() + "' not found");
+ }
+ else if(exchange == null)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found");
+ }
+ else
+ {
+ try
+ {
+ queue.unBind(exchange, new AMQShortString(method.getBindingKey()), null);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+
super.exchangeUnbind(session, method);
}
@@ -445,41 +612,65 @@ public class ServerSessionDelegate extends SessionDelegate
{
ExchangeBoundResult result = new ExchangeBoundResult();
+ Exchange exchange;
+ AMQQueue queue;
if(method.hasExchange())
{
- Exchange exchange = getExchange(session, method.getExchange());
+ exchange = getExchange(session, method.getExchange());
if(exchange == null)
{
result.setExchangeNotFound(true);
}
+ }
+ else
+ {
+ exchange = getExchangeRegistry(session).getDefaultExchange();
+ }
- if(method.hasQueue())
+
+ if(method.hasQueue())
+ {
+
+ queue = getQueue(session, method.getQueue());
+ if(queue == null)
{
+ result.setQueueNotFound(true);
+ }
+
- AMQQueue queue = getQueue(session, method.getQueue());
- if(queue == null)
- {
- result.setQueueNotFound(true);
- }
+ if(exchange != null && queue != null)
+ {
+
+ boolean queueMatched = exchange.isBound(queue);
+
+ result.setQueueNotMatched(!queueMatched);
- if(exchange != null && queue != null)
+
+ if(method.hasBindingKey())
{
- if(method.hasBindingKey())
+ if(method.hasArguments())
+ {
+ // TODO
+ }
+ if(queueMatched)
{
-
- if(method.hasArguments())
- {
- // TODO
- }
result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue));
-
}
+ else
+ {
+ result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
+ }
+ }
+ else if (method.hasArguments())
+ {
+ // TODO
+
+ }
- result.setQueueNotMatched(!exchange.isBound(queue));
+ result.setQueueNotMatched(!exchange.isBound(queue));
- }
}
else if(exchange != null && method.hasBindingKey())
{
@@ -492,31 +683,20 @@ public class ServerSessionDelegate extends SessionDelegate
}
}
- else if(method.hasQueue())
+ else if(exchange != null && method.hasBindingKey())
{
- AMQQueue queue = getQueue(session, method.getQueue());
- if(queue == null)
+ if(method.hasArguments())
{
- result.setQueueNotFound(true);
- }
- else
- {
- if(method.hasBindingKey())
- {
- if(method.hasArguments())
- {
- // TODO
- }
-
- // TODO
- }
+ // TODO
}
+ result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
}
session.executionResult((int) method.getId(), result);
+
}
private AMQQueue getQueue(Session session, String queue)
@@ -580,6 +760,11 @@ public class ServerSessionDelegate extends SessionDelegate
try
{
queue = createQueue(queueName, method, virtualHost, (ServerSession)session);
+ if(method.getExclusive())
+ {
+ queue.setPrincipalHolder((ServerSession)session);
+ }
+
if (queue.isDurable() && !queue.isAutoDelete())
{
@@ -597,6 +782,64 @@ public class ServerSessionDelegate extends SessionDelegate
queue.bind(defaultExchange, new AMQShortString(queueName), null);
}
+
+ if(method.hasAutoDelete()
+ && method.getAutoDelete()
+ && method.hasExclusive()
+ && method.getExclusive())
+ {
+ final AMQQueue q = queue;
+ final ServerSession.Task deleteQueueTask = new ServerSession.Task()
+ {
+
+ public void doTask(ServerSession session)
+ {
+ try
+ {
+ q.delete();
+ }
+ catch (AMQException e)
+ {
+ //TODO
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ };
+ final ServerSession s = (ServerSession) session;
+ s.addSessionCloseTask(deleteQueueTask);
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+
+ public void doTask(AMQQueue queue) throws AMQException
+ {
+ s.removeSessionCloseTask(deleteQueueTask);
+ }
+ });
+ }
+ else if(method.getExclusive())
+ {
+ {
+ final AMQQueue q = queue;
+ final ServerSession.Task removeExclusive = new ServerSession.Task()
+ {
+
+ public void doTask(ServerSession session)
+ {
+ q.setPrincipalHolder(null);
+ }
+ };
+ final ServerSession s = (ServerSession) session;
+ s.addSessionCloseTask(removeExclusive);
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+
+ public void doTask(AMQQueue queue) throws AMQException
+ {
+ s.removeSessionCloseTask(removeExclusive);
+ }
+ });
+ }
+ }
}
catch (AMQException e)
{
@@ -605,13 +848,12 @@ public class ServerSessionDelegate extends SessionDelegate
}
}
}
- else if (method.getExclusive() && (queue.getOwner() != null && !queue.getOwner().equals(((ServerSession)session).getPrincipal().getName())))
+ else if (method.getExclusive() && (queue.getPrincipalHolder() != null && !queue.getPrincipalHolder().equals(session)))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
- + "declared on another client ID('"
- + queue.getOwner() + "')";
+ + "declared on another session";
ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED;
exception(session, method, errorCode, description);
@@ -695,7 +937,11 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- if (method.getIfEmpty() && !queue.isEmpty())
+ if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
+ {
+ exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
+ }
+ else if (method.getIfEmpty() && !queue.isEmpty())
{
exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " not empty");
}
@@ -746,7 +992,7 @@ public class ServerSessionDelegate extends SessionDelegate
String queueName = method.getQueue();
if(queueName == null || queueName.length()==0)
{
- exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "No queue name supplied");
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "No queue name supplied");
}
else
@@ -778,7 +1024,25 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void queueQuery(Session session, QueueQuery method)
{
- super.queueQuery(session, method);
+ QueueQueryResult result = new QueueQueryResult();
+
+ AMQQueue queue = getQueue(session, method.getQueue());
+
+ if(queue != null)
+ {
+ result.setQueue(queue.getName().toString());
+ result.setDurable(queue.isDurable());
+ result.setExclusive(queue.isExclusive());
+ result.setAutoDelete(queue.isAutoDelete());
+ result.setArguments(queue.getArguments());
+ result.setMessageCount(queue.getMessageCount());
+ result.setSubscriberCount(queue.getConsumerCount());
+
+ }
+
+
+ session.executionResult((int) method.getId(), result);
+
}
@Override
@@ -835,15 +1099,14 @@ public class ServerSessionDelegate extends SessionDelegate
public void closed(Session session)
{
super.closed(session);
- for(Subscription_0_10 sub : getSubscriptions(session).values())
+ for(Subscription_0_10 sub : getSubscriptions(session))
{
- sub.close();
+ ((ServerSession)session).unregister(sub);
}
((ServerSession)session).onClose();
- ((ServerSession)session).onClose();
}
- public Map<String, Subscription_0_10> getSubscriptions(Session session)
+ public Collection<Subscription_0_10> getSubscriptions(Session session)
{
return ((ServerSession)session).getSubscriptions();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 92347b9927..6db98ef101 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -40,7 +40,10 @@ public class LocalTransaction implements Transaction
try
{
-
+ for(Action action : _postCommitActions)
+ {
+ action.onRollback();
+ }
}
finally
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
index 91f429f48e..85719737f1 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
@@ -30,6 +30,8 @@ public interface Transaction
public static interface Action
{
public void postCommit();
+
+ public void onRollback();
}
void dequeue(AMQQueue queue, ServerMessage message, Action postCommitAction);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index db0ec1c4fa..ef345f45c4 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -318,6 +318,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean isAcquiredBy(Subscription subscription)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void setDeliveredToSubscription()
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -368,6 +373,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
+ public void requeue(Subscription subscription)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void dequeue(final StoreContext storeContext) throws FailedDequeueException
{
//To change body of implemented methods use File | Settings | File Templates.
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 966fb63186..2cdc002f27 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.PrincipalHolder;
import org.apache.qpid.AMQException;
import org.apache.commons.configuration.Configuration;
@@ -46,6 +47,8 @@ public class MockAMQQueue implements AMQQueue
private boolean _deleted = false;
private AMQShortString _name;
+ private PrincipalHolder _principalHolder;
+
public MockAMQQueue(String name)
{
_name = new AMQShortString(name);
@@ -171,6 +174,11 @@ public class MockAMQQueue implements AMQQueue
//To change body of implemented methods use File | Settings | File Templates.
}
+ public void requeue(QueueEntryImpl storeContext, Subscription subscription)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -312,6 +320,16 @@ public class MockAMQQueue implements AMQQueue
//To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean isExclusive()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Map<String, Object> getArguments()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public ManagedObject getManagedObject()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -333,4 +351,14 @@ public class MockAMQQueue implements AMQQueue
}
+ public PrincipalHolder getPrincipalHolder()
+ {
+ return _principalHolder;
+ }
+
+ public void setPrincipalHolder(PrincipalHolder principalHolder)
+ {
+ _principalHolder = principalHolder;
+ }
+
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 08b4573f33..84b3b09c8e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -44,6 +44,11 @@ public class MockQueueEntry implements QueueEntry
return false;
}
+ public boolean isAcquiredBy(Subscription subscription)
+ {
+ return false;
+ }
+
public void addStateChangeListener(StateChangeListener listener)
{
@@ -163,7 +168,12 @@ public class MockQueueEntry implements QueueEntry
}
-
+ public void requeue(Subscription subscription)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
public void setDeliveredToSubscription()
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 72bbd7fe0c..cc2a56e1d2 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -186,7 +186,7 @@ public class SimpleAMQQueueTest extends TestCase
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
- assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+ assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
// Check removing the subscription removes it's information from the queue
_queue.unregisterSubscription(_subscription);
@@ -197,7 +197,7 @@ public class SimpleAMQQueueTest extends TestCase
AMQMessage messageB = createMessage(new Long (25));
_queue.enqueue(messageB);
- QueueEntry entry = _subscription.getLastSeenEntry();
+ QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
assertNull(entry);
}
@@ -207,7 +207,7 @@ public class SimpleAMQQueueTest extends TestCase
_queue.enqueue(messageA);
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
- assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+ assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
}
public void testExclusiveConsumer() throws AMQException
@@ -224,7 +224,7 @@ public class SimpleAMQQueueTest extends TestCase
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
- assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+ assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
Subscription subB = new MockSubscription();
@@ -273,7 +273,7 @@ public class SimpleAMQQueueTest extends TestCase
Long id = new Long(26);
AMQMessage message = createMessage(id);
_queue.enqueue(message);
- QueueEntry entry = _subscription.getLastSeenEntry();
+ QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
entry.setRedelivered(true);
_queue.resend(entry, _subscription);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 51d9bd8be2..c7ea2067a6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -39,7 +39,7 @@ public class MockSubscription implements Subscription
private AMQShortString tag = new AMQShortString("mocktag");
private AMQQueue queue = null;
private StateListener _listener = null;
- private QueueEntry lastSeen = null;
+ private AMQQueue.Context _queueContext = null;
private State _state = State.ACTIVE;
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
@@ -69,9 +69,9 @@ public class MockSubscription implements Subscription
return tag ;
}
- public QueueEntry getLastSeenEntry()
+ public AMQQueue.Context getQueueContext()
{
- return lastSeen;
+ return _queueContext;
}
public SubscriptionAcquiredState getOwningState()
@@ -147,25 +147,23 @@ public class MockSubscription implements Subscription
{
}
+ public void onDequeue(QueueEntry queueEntry)
+ {
+ }
+
public void restoreCredit(QueueEntry queueEntry)
{
+ //To change body of implemented methods use File | Settings | File Templates.
}
public void send(QueueEntry msg) throws AMQException
{
- lastSeen = msg;
messages.add(msg);
}
- public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
+ public void setQueueContext(AMQQueue.Context queueContext)
{
- boolean result = false;
- if (expected != null)
- {
- result = (expected.equals(lastSeen));
- }
- lastSeen = newValue;
- return result;
+ _queueContext = queueContext;
}
public void setQueue(AMQQueue queue)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
index 1ff39ca790..647d531476 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
@@ -22,6 +22,10 @@ package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import java.util.Date;
+import java.util.Map;
+import java.math.BigDecimal;
+
/**
* AMQTypedValue combines together a native Java Object value, and an {@link AMQType}, as a fully typed AMQP parameter
* value. It provides the ability to read and write fully typed parameters to and from byte buffers. It also provides
@@ -113,4 +117,63 @@ public class AMQTypedValue
return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
}
+
+ public static AMQTypedValue toTypedValue(Object val)
+ {
+ if(val == null)
+ {
+ return AMQType.VOID.asTypedValue(null);
+ }
+
+ Class klass = val.getClass();
+ if(klass == String.class)
+ {
+ return AMQType.ASCII_STRING.asTypedValue(val);
+ }
+ else if(klass == Character.class)
+ {
+ return AMQType.ASCII_CHARACTER.asTypedValue(val);
+ }
+ else if(klass == Integer.class)
+ {
+ return AMQType.INT.asTypedValue(val);
+ }
+ else if(klass == Long.class)
+ {
+ return AMQType.LONG.asTypedValue(val);
+ }
+ else if(klass == Float.class)
+ {
+ return AMQType.FLOAT.asTypedValue(val);
+ }
+ else if(klass == Double.class)
+ {
+ return AMQType.DOUBLE.asTypedValue(val);
+ }
+ else if(klass == Date.class)
+ {
+ return AMQType.TIMESTAMP.asTypedValue(val);
+ }
+ else if(klass == Byte.class)
+ {
+ return AMQType.BYTE.asTypedValue(val);
+ }
+ else if(klass == Boolean.class)
+ {
+ return AMQType.BOOLEAN.asTypedValue(val);
+ }
+ else if(klass == byte[].class)
+ {
+ return AMQType.BINARY.asTypedValue(val);
+ }
+ else if(klass == BigDecimal.class)
+ {
+ return AMQType.DECIMAL.asTypedValue(val);
+ }
+ else if(val instanceof Map)
+ {
+ return AMQType.FIELD_TABLE.asTypedValue(FieldTable.convertToFieldTable((Map)val));
+ }
+ return null;
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index e9034d25d3..9b2f9b3969 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -828,6 +828,7 @@ public class FieldTable
recalculateEncodedSize();
}
+
public static interface FieldTableElementProcessor
{
public boolean processElement(String propertyName, AMQTypedValue value);
@@ -1187,4 +1188,24 @@ public class FieldTable
return _properties.equals(f._properties);
}
+
+ public static FieldTable convertToFieldTable(Map<String, Object> map)
+ {
+ if (map != null)
+ {
+ FieldTable table = new FieldTable();
+ for(Map.Entry<String,Object> entry : map.entrySet())
+ {
+ table.put(new AMQShortString(entry.getKey()), entry.getValue());
+ }
+
+ return table;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 08842c94c0..3403b591f3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -316,7 +316,14 @@ public class Connection extends ConnectionInvoker
public void dispatch(Method method)
{
Session ssn = getSession(method.getChannel());
- ssn.received(method);
+ if(ssn != null)
+ {
+ ssn.received(method);
+ }
+ else
+ {
+ // TODO
+ }
}
public int getChannelMax()
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
index 4b7f711bff..3c80180d0b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
@@ -179,6 +179,11 @@ public abstract class Method extends Struct implements ProtocolEvent
}
}
+ public boolean hasCompletionListener()
+ {
+ return completionListener != null;
+ }
+
public String toString()
{
StringBuilder str = new StringBuilder();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 6a69c62300..68d9a13cef 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -617,7 +617,7 @@ public class Session extends SessionInvoker
{
sessionCommandPoint(0, 0);
}
- if (expiry > 0 && !m.isUnreliable())
+ if ((expiry > 0 && !m.isUnreliable()) || m.hasCompletionListener())
{
commands[mod(next, commands.length)] = m;
commandBytes += m.getBodySize();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index 6144edb947..ea48e48721 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -137,6 +137,7 @@ final class IoReceiver implements Runnable
}
catch (Throwable t)
{
+ t.printStackTrace();
if (!(shutdownBroken &&
t instanceof SocketException &&
t.getMessage().equalsIgnoreCase("socket closed") &&
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 9271e1ce16..737ed2322f 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -33,6 +33,7 @@ public class SubscriptionTestHelper implements Subscription
private final List<QueueEntry> messages;
private final Object key;
private boolean isSuspended;
+ private AMQQueue.Context _queueContext;
public SubscriptionTestHelper(Object key)
{
@@ -101,11 +102,16 @@ public class SubscriptionTestHelper implements Subscription
//To change body of implemented methods use File | Settings | File Templates.
}
- public void restoreCredit(final QueueEntry queueEntry)
+ public void onDequeue(final QueueEntry queueEntry)
{
}
+ public void restoreCredit(QueueEntry queueEntry)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void setStateListener(final StateListener listener)
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -116,9 +122,14 @@ public class SubscriptionTestHelper implements Subscription
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public QueueEntry getLastSeenEntry()
+ public AMQQueue.Context getQueueContext()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _queueContext;
+ }
+
+ public void setQueueContext(AMQQueue.Context queueContext)
+ {
+ _queueContext = queueContext;
}
public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)