summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-19 10:35:21 +0000
committerRobert Greig <rgreig@apache.org>2007-01-19 10:35:21 +0000
commitcbee9e6623bd4c1a9790613c39517a600ca289d6 (patch)
tree165522a99560d49de2eaf2e73b803a28cdec9dd4 /java
parenteae73349c9690704b54c3e5a3a77c7f95482f593 (diff)
downloadqpid-python-cbee9e6623bd4c1a9790613c39517a600ca289d6.tar.gz
QPID-275 : Patch supplied by Rob Godfrey - Add support for get / purge / qos size / default exchanges and some other small fixes highlighted by the python tests
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@497770 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java83
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java28
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java24
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java44
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java55
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java82
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java139
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java367
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java77
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java59
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java268
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java19
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java9
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQConnectionException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java14
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java51
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java5
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java6
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java55
38 files changed, 592 insertions, 900 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 10f039779c..2529ddc064 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.mina.common.ByteBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,6 +57,8 @@ public class AMQChannel
private long _prefetch_LowWaterMark;
+ private long _prefetchSize;
+
/**
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
* value of this represents the <b>last</b> tag sent out
@@ -108,6 +111,8 @@ public class AMQChannel
private Set<Long> _browsedAcks = new HashSet<Long>();
+
+
public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
{
@@ -151,6 +156,17 @@ public class AMQChannel
_prefetch_HighWaterMark = prefetchCount;
}
+ public long getPrefetchSize()
+ {
+ return _prefetchSize;
+ }
+
+
+ public void setPrefetchSize(long prefetchSize)
+ {
+ _prefetchSize = prefetchSize;
+ }
+
public long getPrefetchLowMarkCount()
{
return _prefetch_LowWaterMark;
@@ -213,14 +229,15 @@ public class AMQChannel
throw new AMQException("Received content body without previously receiving a JmsPublishBody");
}
- // returns true iff the message was delivered (i.e. if all data was
- // received
if (_log.isDebugEnabled())
{
_log.debug("Content body received on channel " + _channelId);
}
try
{
+
+ // returns true iff the message was delivered (i.e. if all data was
+ // received
if (_currentMessage.addContentBodyFrame(_storeContext, contentBody))
{
// callback to allow the context to do any post message processing
@@ -269,13 +286,14 @@ public class AMQChannel
* @param queue the queue to subscribe to
* @param session the protocol session of the subscriber
* @param noLocal
+ * @param exclusive
* @return the consumer tag. This is returned to the subscriber and used in
* subsequent unsubscribe requests
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
- FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException
+ FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -286,7 +304,7 @@ public class AMQChannel
throw new ConsumerTagNotUniqueException();
}
- queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal);
+ queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
_consumerTag2QueueMap.put(tag, queue);
return tag;
}
@@ -364,8 +382,10 @@ public class AMQChannel
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*/
- public void resend(final AMQProtocolSession session) throws AMQException
+ public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
{
+ final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
+
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
public boolean callback(UnacknowledgedMessage message) throws AMQException
@@ -374,7 +394,20 @@ public class AMQChannel
AMQShortString consumerTag = message.consumerTag;
AMQMessage msg = message.message;
msg.setRedelivered(true);
- msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+ if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag))
+ {
+ msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+ }
+ else
+ {
+ // Message has no consumer tag, so was "delivered" to a GET
+ // or consumer no longer registered
+ // cannot resend, so re-queue.
+ if (message.queue != null && (consumerTag == null || requeue))
+ {
+ msgToRequeue.add(message);
+ }
+ }
// false means continue processing
return false;
}
@@ -383,6 +416,12 @@ public class AMQChannel
{
}
});
+
+ for(UnacknowledgedMessage message : msgToRequeue)
+ {
+ _txnContext.deliver(message.message, message.queue);
+ _unacknowledgedMessageMap.remove(message.deliveryTag);
+ }
}
/**
@@ -459,8 +498,9 @@ public class AMQChannel
{
boolean suspend;
- suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark;
-
+ suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
+ || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
+
setSuspended(suspend);
}
@@ -545,4 +585,31 @@ public class AMQChannel
}
_returnMessages.clear();
}
+
+
+ public boolean wouldSuspend(AMQMessage msg)
+ {
+ if (isSuspended())
+ {
+ return true;
+ }
+ else
+ {
+ boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
+ if(!willSuspend)
+ {
+ final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();
+
+ willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < msg.getSize() + unackedSize);
+ }
+
+
+ if(willSuspend)
+ {
+ setSuspended(true);
+ }
+ return willSuspend;
+ }
+
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
index ef58ba01a3..7ea22a447f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
@@ -73,5 +73,7 @@ public interface UnacknowledgedMessageMap
* @return a set of delivery tags
*/
Set<Long> getDeliveryTags();
+
+ public long getUnacknowledgeBytes();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index a21e4cfff6..e50d239d57 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -32,6 +32,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
private final Object _lock = new Object();
+ private long _unackedSize;
+
private Map<Long, UnacknowledgedMessage> _map;
private long _lastDeliveryTag;
@@ -77,7 +79,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
for (UnacknowledgedMessage msg : msgs)
{
- _map.remove(msg.deliveryTag);
+ remove(msg.deliveryTag);
+
}
}
}
@@ -86,7 +89,14 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
synchronized (_lock)
{
- return _map.remove(deliveryTag);
+
+ UnacknowledgedMessage message = _map.remove(deliveryTag);
+ if(message != null)
+ {
+ _unackedSize -= message.message.getSize();
+ }
+
+ return message;
}
}
@@ -113,6 +123,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
synchronized (_lock)
{
_map.put(deliveryTag, message);
+ _unackedSize += message.message.getSize();
_lastDeliveryTag = deliveryTag;
}
}
@@ -123,6 +134,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
Collection<UnacknowledgedMessage> currentEntries = _map.values();
_map = new LinkedHashMap<Long, UnacknowledgedMessage>(_prefetchLimit);
+ _unackedSize = 0l;
return currentEntries;
}
}
@@ -149,6 +161,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
synchronized (_lock)
{
_map.clear();
+ _unackedSize = 0l;
}
}
@@ -169,6 +182,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
it.remove();
+ _unackedSize -= unacked.getValue().message.getSize();
destination.add(unacked.getValue());
if (unacked.getKey() == deliveryTag)
@@ -189,7 +203,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
AMQShortString consumerTag = entry.getValue().consumerTag;
AMQMessage msg = entry.getValue().message;
- msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag);
+ if(consumerTag != null)
+ {
+ msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag);
+ }
}
}
}
@@ -224,4 +241,9 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
}
+
+ public long getUnacknowledgeBytes()
+ {
+ return _unackedSize;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index cadcd22001..374772bc4a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -38,6 +38,8 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
*/
private ConcurrentMap<AMQShortString, Exchange> _exchangeMap = new ConcurrentHashMap<AMQShortString, Exchange>();
+ private Exchange _defaultExchange;
+
public DefaultExchangeRegistry(ExchangeFactory exchangeFactory)
{
//create 'standard' exchanges:
@@ -53,9 +55,18 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public void registerExchange(Exchange exchange)
{
+ if(_defaultExchange == null)
+ {
+ setDefaultExchange(exchange);
+ }
_exchangeMap.put(exchange.getName(), exchange);
}
+ public void setDefaultExchange(Exchange exchange)
+ {
+ _defaultExchange = exchange;
+ }
+
public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException
{
// TODO: check inUse argument
@@ -72,7 +83,16 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public Exchange getExchange(AMQShortString name)
{
- return _exchangeMap.get(name);
+
+ if(name == null || name.length() == 0)
+ {
+ return _defaultExchange;
+ }
+ else
+ {
+ return _exchangeMap.get(name);
+ }
+
}
/**
@@ -83,7 +103,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public void routeContent(AMQMessage payload) throws AMQException
{
final AMQShortString exchange = payload.getPublishBody().exchange;
- final Exchange exch = _exchangeMap.get(exchange);
+ final Exchange exch = getExchange(exchange);
// there is a small window of opportunity for the exchange to be deleted in between
// the BasicPublish being received (where the exchange is validated) and the final
// content body being received (which triggers this method)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
index efcb963f8b..24884d20d7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
@@ -38,4 +38,6 @@ public interface ExchangeRegistry extends MessageRouter
void unregisterExchange(AMQShortString name, boolean inUse) throws ExchangeInUseException, AMQException;
Exchange getExchange(AMQShortString name);
+
+ void setDefaultExchange(Exchange exchange);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index e078b0cdee..721001b454 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.*;
import org.apache.qpid.server.AMQChannel;
@@ -66,6 +68,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
else
{
+
AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
if (queue == null)
@@ -73,29 +76,13 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
_log.info("No queue for '" + body.queue + "'");
if(body.queue!=null)
{
- AMQShortString msg = new AMQShortString("No such queue, '" + body.queue + "'");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
- BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
- AMQConstant.NOT_FOUND.getCode(), // replyCode
- msg)); // replyText
+ String msg = "No such queue, '" + body.queue + "'";
+ throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), msg);
}
else
{
- AMQShortString msg = new AMQShortString("No queue name provided, no default queue defined.");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
- BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- msg)); // replyText
+ String msg = "No queue name provided, no default queue defined.";
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),msg );
}
}
else
@@ -103,7 +90,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
try
{
AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
- body.arguments, body.noLocal);
+ body.arguments, body.noLocal, body.exclusive);
if (!body.nowait)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -143,6 +130,21 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
AMQConstant.NOT_ALLOWED.getCode(), // replyCode
msg)); // replyText
}
+ catch (AMQQueue.ExistingExclusiveSubscription e)
+ {
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " as it already has an existing exclusive consumer");
+ }
+ catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+ {
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " exclusively as it already has a consumer");
+ }
+
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
index 25cc981693..0ef30be265 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQChannelException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicPublishBody;
@@ -42,7 +43,6 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler();
- private static final AMQShortString UNKNOWN_EXCHANGE_NAME = new AMQShortString("Unknown exchange name");
public static BasicPublishMethodHandler getInstance()
{
@@ -74,19 +74,8 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi
// if the exchange does not exist we raise a channel exception
if (e == null)
{
- protocolSession.closeChannel(evt.getChannelId());
- // TODO: modify code gen to make getClazz and getMethod public methods rather than protected
- // then we can remove the hardcoded 0,0
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- ChannelCloseBody.getClazz((byte)8, (byte)0), // classId
- ChannelCloseBody.getMethod((byte)8, (byte)0), // methodId
- 500, // replyCode
- UNKNOWN_EXCHANGE_NAME); // replyText
- protocolSession.writeFrame(cf);
+ throw body.getChannelException(500, "Unknown exchange name");
+
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
index 60f6458b8c..2bab4cac5c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
@@ -44,6 +44,8 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException
{
session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
+ session.getChannel(evt.getChannelId()).setPrefetchSize(evt.getMethod().prefetchSize);
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
index 0e37871439..f3e0cc3a63 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
@@ -52,6 +52,8 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
{
throw new AMQException("Unknown channel " + evt.getChannelId());
}
- channel.resend(protocolSession);
+ BasicRecoverBody body = evt.getMethod();
+ channel.resend(protocolSession, body.requeue);
+
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
index b16de88851..8056ff9adb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
@@ -56,21 +56,22 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con
AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
{
ConnectionOpenBody body = evt.getMethod();
- AMQShortString contextKey = body.virtualHost;
+
+
//todo //FIXME The virtual host must be validated by the server for the connection to open-ok
// See Spec (0.8.2). Section 3.1.2 Virtual Hosts
- if (contextKey == null)
+ if (protocolSession.getContextKey() == null)
{
- contextKey = generateClientID();
+ protocolSession.setContextKey(generateClientID());
}
- protocolSession.setContextKey(contextKey);
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0,
(byte)8, (byte)0, // AMQP version (major, minor)
- contextKey); // knownHosts
+ body.virtualHost); // knownHosts
stateManager.changeState(AMQState.CONNECTION_OPEN);
protocolSession.writeFrame(response);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index f6897227aa..84e9a4e3f4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -76,7 +76,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
{
if(body.passive && ((body.type == null) || body.type.length() ==0))
{
- throw new AMQChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " + body.exchange,body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
+ throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " + body.exchange);
}
else
{
@@ -89,7 +89,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
}
catch(AMQUnknownExchangeType e)
{
- throw new AMQConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange: " + body.exchange,body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),e);
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange: " + body.exchange,e);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index c2bec68b89..19df23b103 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -77,22 +77,19 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
{
body.queue = createName();
}
+
+ AMQQueue queue = null;
//TODO: do we need to check that the queue already exists with exactly the same "configuration"?
synchronized (queueRegistry)
{
- AMQQueue queue;
+
if (((queue = queueRegistry.getQueue(body.queue)) == null) )
{
if(body.passive)
{
String msg = "Queue: " + body.queue + " not found.";
- throw new AMQChannelException(AMQConstant.NOT_FOUND.getCode(),
- msg,
- body.getClazz(),
- body.getMethod(),
- (byte)8,
- (byte)0 );
+ throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(),msg );
}
else
@@ -112,9 +109,16 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
}
}
+ else if(queue.getOwner() != null && !protocolSession.getContextKey().equals(queue.getOwner()))
+ {
+ // todo - constant
+ throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection");
+
+ }
//set this as the default queue on the channel:
protocolSession.getChannel(evt.getChannelId()).setDefaultQueue(queue);
}
+
if (!body.nowait)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -122,8 +126,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
// Be aware of possible changes to parameter order as versions change.
AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
(byte)8, (byte)0, // AMQP version (major, minor)
- 0L, // consumerCount
- 0L, // messageCount
+ queue.getConsumerCount(), // consumerCount
+ queue.getMessageCount(), // messageCount
body.queue); // queue
_log.info("Queue " + body.queue + " declared successfully");
protocolSession.writeFrame(response);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 3f6d752f74..245d86a7a6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -34,6 +34,7 @@ import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQChannelException;
import org.apache.qpid.protocol.AMQConstant;
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
@@ -84,15 +85,12 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete
{
if(body.ifEmpty && !queue.isEmpty())
{
- AMQShortString msg = new AMQShortString("Queue: " + body.queue + " is not empty.");
- // TODO - Error code
- session.writeFrame(ChannelCloseBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0, body.getClazz(), body.getMethod(), 406, msg ));
+ throw body.getChannelException(406, "Queue: " + body.queue + " is not empty." );
}
else if(body.ifUnused && !queue.isUnused())
- {
- AMQShortString msg = new AMQShortString("Queue: " + body.queue + " is still used.");
+ {
// TODO - Error code
- session.writeFrame(ChannelCloseBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0, body.getClazz(), body.getMethod(), 406, msg ));
+ throw body.getChannelException(406, "Queue: " + body.queue + " is still used." );
}
else
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index 7dd1f9579b..d71c93a6c6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -57,7 +57,7 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
- channel.resend(protocolSession);
+ channel.resend(protocolSession, false);
}catch(AMQException e){
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index fa43b8809d..ed74263596 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -27,6 +27,7 @@ import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -57,6 +58,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
+ private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+
+
private final IoSession _minaProtocolSession;
private AMQShortString _contextKey;
@@ -218,31 +222,36 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
(AMQMethodBody) frame.bodyFrame);
try
{
- boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
-
- if(!_frameListeners.isEmpty())
+ try
{
- for (AMQMethodListener listener : _frameListeners)
+ boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
+
+ if(!_frameListeners.isEmpty())
{
- wasAnyoneInterested = listener.methodReceived(evt) ||
- wasAnyoneInterested;
+ for (AMQMethodListener listener : _frameListeners)
+ {
+ wasAnyoneInterested = listener.methodReceived(evt) ||
+ wasAnyoneInterested;
+ }
+ }
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
}
}
- if (!wasAnyoneInterested)
+ catch (AMQChannelException e)
{
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+ _logger.error("Closing channel due to: " + e.getMessage());
+ writeFrame(e.getCloseFrame(frame.channel));
+ closeChannel(frame.channel);
+ }
+ catch (AMQConnectionException e)
+ {
+ _logger.error("Closing connection due to: " + e.getMessage());
+ closeSession();
+ writeFrame(e.getCloseFrame(frame.channel));
}
}
- catch (AMQChannelException e)
- {
- _logger.error("Closing channel due to: " + e.getMessage());
- writeFrame(e.getCloseFrame(frame.channel));
- }
- catch (AMQConnectionException e)
- {
- _logger.error("Closing connection due to: " + e.getMessage());
- writeFrame(e.getCloseFrame(frame.channel));
- }
catch (Exception e)
{
_stateManager.error(e);
@@ -516,6 +525,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public void setClientProperties(FieldTable clientProperties)
{
_clientProperties = clientProperties;
+ if((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null))
+ {
+ setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
+ }
}
/**
@@ -537,4 +550,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
return _major == major && _minor == minor;
}
+
+
+ public Object getClientIdentifier()
+ {
+ return _minaProtocolSession.getRemoteAddress();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 934d1ccff8..a1249723ee 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -124,4 +124,6 @@ public interface AMQProtocolSession extends AMQProtocolWriter
FieldTable getClientProperties();
void setClientProperties(FieldTable clientProperties);
+
+ Object getClientIdentifier();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
index e1fac55d3b..8b5f05e8ea 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
@@ -33,6 +33,8 @@ public class ExchangeInitialiser
define(registry, factory, ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
+
+ registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME));
}
private void define(ExchangeRegistry r, ExchangeFactory f,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index c55d24d507..c227cd5094 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -43,8 +43,6 @@ public class AMQMessage
{
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- public static final String JMS_MESSAGE = "jms.message";
-
/**
* Used in clustering
*/
@@ -75,6 +73,8 @@ public class AMQMessage
private TransientMessageData _transientMessageData = new TransientMessageData();
+
+
/**
* Used to iterate through all the body frames associated with this message. Will not
* keep all the data in memory therefore is memory-efficient.
@@ -550,6 +550,7 @@ public class AMQMessage
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
contentHeader);
+
protocolSession.writeFrame(compositeBlock);
}
else
@@ -582,6 +583,50 @@ public class AMQMessage
}
+ public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ ByteBuffer deliver = createEncodedGetOkFrame(channelId, deliveryTag, queueSize);
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ getContentHeaderBody());
+
+ final int bodyCount = _messageHandle.getBodyCount(_messageId);
+ if(bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentBody cb = _messageHandle.getContentBody(_messageId, 0);
+
+ AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for(int i = 1; i < bodyCount; i++)
+ {
+ cb = _messageHandle.getContentBody(_messageId, i);
+ protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
+ }
+
+
+ }
+
+
+ }
+
+
private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
@@ -595,6 +640,21 @@ public class AMQMessage
return buf;
}
+ private ByteBuffer createEncodedGetOkFrame(int channelId, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ BasicPublishBody pb = getPublishBody();
+ AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, (byte) 8, (byte) 0,
+ deliveryTag, pb.exchange,
+ queueSize,
+ _messageHandle.isRedelivered(),
+ pb.routingKey);
+ ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
+ getOkFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, (byte) 8, (byte) 0, getPublishBody().exchange,
@@ -643,6 +703,24 @@ public class AMQMessage
}
}
+
+ public long getSize()
+ {
+ try
+ {
+ long size = getContentHeaderBody().bodySize;
+
+ return size;
+ }
+ catch (AMQException e)
+ {
+ _log.error(e);
+ return 0;
+ }
+
+ }
+
+
public String toString()
{
return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 368cb979e8..18b3adc635 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -29,11 +29,14 @@ import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.AMQChannel;
import javax.management.JMException;
import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like
@@ -41,6 +44,30 @@ import java.util.concurrent.Executor;
*/
public class AMQQueue implements Managable, Comparable
{
+
+ public static final class ExistingExclusiveSubscription extends AMQException
+ {
+
+ public ExistingExclusiveSubscription()
+ {
+ super("");
+ }
+ }
+
+ public static final class ExistingSubscriptionPreventsExclusive extends AMQException
+ {
+
+ public ExistingSubscriptionPreventsExclusive()
+ {
+ super("");
+ }
+ }
+
+ private static final ExistingExclusiveSubscription EXISTING_EXCLUSIVE = new ExistingExclusiveSubscription();
+ private static final ExistingSubscriptionPreventsExclusive EXISTING_SUBSCRIPTION = new ExistingSubscriptionPreventsExclusive();
+
+
+
private static final Logger _logger = Logger.getLogger(AMQQueue.class);
private final AMQShortString _name;
@@ -64,6 +91,11 @@ public class AMQQueue implements Managable, Comparable
private final SubscriptionFactory _subscriptionFactory;
+ private final AtomicInteger _subscriberCount = new AtomicInteger();
+
+ private final AtomicBoolean _isExclusive = new AtomicBoolean();
+
+
/**
* Manages message delivery.
*/
@@ -187,31 +219,7 @@ public class AMQQueue implements Managable, Comparable
_managedObject.register();
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
-
- //fixme - Make this configurable via the broker config.xml
- if (System.getProperties().getProperty("deliverymanager") != null)
- {
- if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
- {
- _logger.info("Using ConcurrentSelectorDeliveryManager");
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
- }
- else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
- {
- _logger.info("Using ConcurrentDeliveryManager");
- _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
- }
- else
- {
- _logger.info("Using SynchronizedDeliveryManager");
- _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
- }
- }
- else
- {
- _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager");
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
- }
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
}
private AMQQueueMBean createMBean() throws AMQException
@@ -352,9 +360,9 @@ public class AMQQueue implements Managable, Comparable
/**
* removes all the messages from the queue.
*/
- public void clearQueue(StoreContext storeContext) throws AMQException
+ public long clearQueue(StoreContext storeContext) throws AMQException
{
- _deliveryMgr.clearAllMessages(storeContext);
+ return _deliveryMgr.clearAllMessages(storeContext);
}
public void bind(AMQShortString routingKey, Exchange exchange)
@@ -362,14 +370,30 @@ public class AMQQueue implements Managable, Comparable
_bindings.addBinding(routingKey, exchange);
}
- public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, FieldTable filters) throws AMQException
- {
- registerProtocolSession(ps, channel, consumerTag, acks, filters, false);
- }
- public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks,
+ FieldTable filters, boolean noLocal, boolean exclusive)
throws AMQException
{
+ if(incrementSubscriberCount() > 1)
+ {
+ if(isExclusive())
+ {
+ decrementSubscriberCount();
+ throw EXISTING_EXCLUSIVE;
+ }
+ else if(exclusive)
+ {
+ decrementSubscriberCount();
+ throw EXISTING_SUBSCRIPTION;
+ }
+
+ }
+ else if(exclusive)
+ {
+ setExclusive(true);
+ }
+
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
@@ -385,6 +409,28 @@ public class AMQQueue implements Managable, Comparable
_subscribers.addSubscriber(subscription);
}
+
+ private boolean isExclusive()
+ {
+ return _isExclusive.get();
+ }
+
+ private void setExclusive(boolean exclusive)
+ {
+ _isExclusive.set(exclusive);
+ }
+
+ private int incrementSubscriberCount()
+ {
+ return _subscriberCount.incrementAndGet();
+ }
+
+ private int decrementSubscriberCount()
+ {
+ return _subscriberCount.decrementAndGet();
+ }
+
+
public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException
{
debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag,
@@ -400,6 +446,10 @@ public class AMQQueue implements Managable, Comparable
" and protocol session key " + ps.getKey() + " not registered with queue " + this);
}
+ setExclusive(false);
+ decrementSubscriberCount();
+
+
// if we are eligible for auto deletion, unregister from the queue registry
if (_autoDelete && _subscribers.isEmpty())
{
@@ -454,6 +504,23 @@ public class AMQQueue implements Managable, Comparable
delete();
}
+ public void processGet(StoreContext storeContext, AMQMessage msg) throws AMQException
+ {
+ _deliveryMgr.deliver(storeContext, getName(), msg);
+ try
+ {
+ msg.checkDeliveredToConsumer();
+ updateReceivedMessageCount(msg);
+ }
+ catch (NoConsumersException e)
+ {
+ // as this message will be returned, it should be removed
+ // from the queue:
+ dequeue(storeContext, msg);
+ }
+ }
+
+
public void process(StoreContext storeContext, AMQMessage msg) throws AMQException
{
_deliveryMgr.deliver(storeContext, getName(), msg);
@@ -547,4 +614,12 @@ public class AMQQueue implements Managable, Comparable
_logger.debug(MessageFormat.format(msg, args));
}
}
+
+ public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException
+ {
+ return _deliveryMgr.performGet(session, channel, acks);
+ }
+
+
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
deleted file mode 100644
index 1a44e86f1a..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-/**
- * Manages delivery of messages on behalf of a queue
- */
-public class ConcurrentDeliveryManager implements DeliveryManager
-{
- private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class);
-
- @Configured(path = "advanced.compressBufferOnQueue",
- defaultValue = "false")
- public boolean compressBufferOnQueue;
-
- /**
- * Holds any queued messages
- */
- private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
-
- //private int _messageCount;
- /**
- * Ensures that only one asynchronous task is running for this manager at
- * any time.
- */
- private final AtomicBoolean _processing = new AtomicBoolean();
-
- /**
- * The subscriptions on the queue to whom messages are delivered
- */
- private final SubscriptionManager _subscriptions;
-
- /**
- * A reference to the queue we are delivering messages for. We need this to be able
- * to pass the code that handles acknowledgements a handle on the queue.
- */
- private final AMQQueue _queue;
-
- /**
- * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced
- * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered
- * via the async thread.
- * <p/>
- * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
- */
- private ReentrantLock _lock = new ReentrantLock();
-
- ConcurrentDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
- {
-
- //Set values from configuration
- Configurator.configure(this);
-
- if (compressBufferOnQueue)
- {
- _log.info("Compressing Buffers on queue.");
- }
-
- _subscriptions = subscriptions;
- _queue = queue;
- }
-
- /**
- * @return boolean if we are queueing
- */
- private boolean queueing()
- {
- return hasQueuedMessages();
- }
-
- /**
- * @param msg to enqueue
- * @return true if we are queue this message
- */
- private boolean enqueue(AMQMessage msg) throws AMQException
- {
- if (msg.getPublishBody().immediate)
- {
- return false;
- }
- else
- {
- _lock.lock();
- try
- {
- if (queueing())
- {
- return addMessageToQueue(msg);
- }
- else
- {
- return false;
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
- }
-
- private void startQueueing(AMQMessage msg) throws AMQException
- {
- if (!msg.getPublishBody().immediate)
- {
- addMessageToQueue(msg);
- }
- }
-
- private boolean addMessageToQueue(AMQMessage msg)
- {
- // Shrink the ContentBodies to their actual size to save memory.
- /* TODO need to reimplement this - probably not in this class though
- * for obvious reasons
-
- if (compressBufferOnQueue)
- {
- Iterator it = msg.getContentBodies().iterator();
- while (it.hasNext())
- {
- ContentBody cb = (ContentBody) it.next();
- cb.reduceBufferToFit();
- }
- }
- */
- _messages.offer(msg);
-
- return true;
- }
-
- public boolean hasQueuedMessages()
- {
- _lock.lock();
- try
- {
- return !_messages.isEmpty();
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- public int getQueueMessageCount()
- {
- return getMessageCount();
- }
-
- /**
- * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
- * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
- *
- * @return int the number of messages in the delivery queue.
- */
- private int getMessageCount()
- {
- return _messages.size();
- }
-
-
- public synchronized List<AMQMessage> getMessages()
- {
- return new ArrayList<AMQMessage>(_messages);
- }
-
- public void populatePreDeliveryQueue(Subscription subscription)
- {
- //no-op . This DM has no PreDeliveryQueues
- }
-
- public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
- {
- AMQMessage msg = poll();
- if (msg != null)
- {
- msg.dequeue(storeContext, _queue);
- }
- }
-
- public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException
- {
- AMQMessage msg = poll();
- while (msg != null)
- {
- msg.dequeue(storeContext, _queue);
- msg = poll();
- }
- }
-
- /**
- * Only one thread should ever execute this method concurrently, but
- * it can do so while other threads invoke deliver().
- */
- private void processQueue() throws AMQException
- {
- try
- {
- boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
- AMQMessage message = peek();
-
- //While we have messages to send and subscribers to send them to.
- while (message != null && hasSubscribers)
- {
- // _log.debug("Have messages(" + _messages.size() + ") and subscribers");
- Subscription next = _subscriptions.nextSubscriber(message);
- //FIXME Is there still not the chance that this subscribe could be suspended between here and the send?
-
- //We don't synchronize access to subscribers so need to re-check
- if (next != null)
- {
- next.send(message, _queue);
- poll();
- message = peek();
- }
- else
- {
- hasSubscribers = false;
- }
- }
- }
- catch (FailedDequeueException e)
- {
- _log.error("Unable to deliver message as dequeue failed: " + e, e);
- }
- finally
- {
- _log.debug("End of processQueue: (" + getQueueMessageCount() + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers());
- }
- }
-
- private AMQMessage peek()
- {
- return _messages.peek();
- }
-
- private AMQMessage poll()
- {
- return _messages.poll();
- }
-
- Runner asyncDelivery = new Runner();
-
- public void processAsync(Executor executor)
- {
- _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
- " Active:" + _subscriptions.hasActiveSubscribers() +
- " Processing:" + _processing.get());
-
- if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
- {
- //are we already running? if so, don't re-run
- if (_processing.compareAndSet(false, true))
- {
- // Do we need this?
- // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok.
- //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown())
- {
- executor.execute(asyncDelivery);
- }
- }
- }
- }
-
- public void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg) throws FailedDequeueException, AMQException
- {
- // first check whether we are queueing, and enqueue if we are
- if (!enqueue(msg))
- {
- // not queueing so deliver message to 'next' subscriber
- _lock.lock();
- try
- {
- Subscription s = _subscriptions.nextSubscriber(msg);
- if (s == null)
- {
- if (!msg.getPublishBody().immediate)
- {
- // no subscribers yet so enter 'queueing' mode and queue this message
- startQueueing(msg);
- }
- }
- else
- {
- s.send(msg, _queue);
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
- }
-
- private class Runner implements Runnable
- {
- public void run()
- {
- boolean running = true;
- while (running)
- {
- try
- {
- processQueue();
- }
- catch (AMQException e)
- {
- _log.error("Error processing queue: " + e, e);
- _log.error("Delivery manager terminating.");
- running = false;
- _processing.set(false);
- break;
- }
-
- //Check that messages have not been added since we did our last peek();
- // Synchronize with the thread that adds to the queue.
- // If the queue is still empty then we can exit
- _lock.lock();
- try
- {
- if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers()))
- {
- running = false;
- _processing.set(false);
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
- }
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 91c49a4cf9..ba4d0bf4ba 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -28,6 +28,8 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import java.util.ArrayList;
import java.util.Iterator;
@@ -52,6 +54,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
* Holds any queued messages
*/
private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+
+ private final ReentrantLock _messageAccessLock = new ReentrantLock();
+
//private int _messageCount;
/**
* Ensures that only one asynchronous task is running for this manager at
@@ -169,6 +174,56 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
+ public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException
+ {
+ AMQMessage msg = getNextMessage();
+ if(msg == null)
+ {
+ return false;
+ }
+ else
+ {
+
+ try
+ {
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
+
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
+
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ if (!acks)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ }
+ _queue.dequeue(channel.getStoreContext(), msg);
+ }
+ synchronized(channel)
+ {
+ long deliveryTag = channel.getNextDeliveryTag();
+
+ if (acks)
+ {
+ channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue);
+ }
+
+ msg.writeGetOk(protocolSession, channel.getChannelId(), deliveryTag, _queue.getMessageCount());
+ }
+ }
+ finally
+ {
+ msg.setDeliveredToConsumer();
+ }
+ return true;
+
+ }
+ }
+
public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
@@ -178,22 +233,35 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException
+ public synchronized long clearAllMessages(StoreContext storeContext) throws AMQException
{
+ long count = 0;
AMQMessage msg = poll();
while (msg != null)
{
msg.dequeue(storeContext, _queue);
+ count++;
msg = poll();
}
+ return count;
+ }
+
+ public synchronized AMQMessage getNextMessage() throws AMQException
+ {
+ return getNextMessage(_messages);
}
- private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages)
+ {
+ return getNextMessage(messages, false);
+ }
+
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages, boolean browsing)
{
AMQMessage message = messages.peek();
- while (message != null && (sub.isBrowser() || message.taken()))
+ while (message != null && (browsing || message.taken()))
{
//remove the already taken message
messages.poll();
@@ -208,7 +276,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
AMQMessage message = null;
try
{
- message = getNextMessage(messageQueue, sub);
+ message = getNextMessage(messageQueue, sub.isBrowser());
// message will be null if we have no messages in the messageQueue.
if (message == null)
@@ -287,6 +355,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
_log.debug(id() + "deliver :" + msg);
}
+ msg.release();
//Check if we have someone to deliver the message to.
_lock.lock();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index d3d235f07f..6954be8473 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import java.util.concurrent.Executor;
import java.util.List;
@@ -72,9 +74,11 @@ interface DeliveryManager
void removeAMessageFromTop(StoreContext storeContext) throws AMQException;
- void clearAllMessages(StoreContext storeContext) throws AMQException;
+ long clearAllMessages(StoreContext storeContext) throws AMQException;
List<AMQMessage> getMessages();
void populatePreDeliveryQueue(Subscription subscription);
+
+ boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index 2dab551e07..5277069d33 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -45,4 +45,6 @@ public interface Subscription
void close();
boolean isBrowser();
+
+ boolean wouldSuspend(AMQMessage msg);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index e2356faaf5..e120752959 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -66,6 +66,7 @@ public class SubscriptionImpl implements Subscription
private final boolean _isBrowser;
private final Boolean _autoClose;
private boolean _closed = false;
+ private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
public static class Factory implements SubscriptionFactory
{
@@ -300,37 +301,54 @@ public class SubscriptionImpl implements Subscription
{
if (_noLocal)
{
+ boolean isLocal;
// We don't want local messages so check to see if message is one we sent
- Object localInstance = protocolSession.getClientProperties().getObject(ClientProperties.instance.toString());
- Object msgInstance = msg.getPublisher().getClientProperties().getObject(ClientProperties.instance.toString());
+ Object localInstance;
+ Object msgInstance;
- if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+ if((protocolSession.getClientProperties() != null) &&
+ (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- if (_logger.isTraceEnabled())
+ if((msg.getPublisher().getClientProperties() != null) &&
+ (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
- System.identityHashCode(msg) + ")");
+ if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+ System.identityHashCode(msg) + ")");
+ }
+ return false;
+ }
}
- return false;
}
- else // if not then filter the message.
+ else
{
- if (_logger.isTraceEnabled())
+ localInstance = protocolSession.getClientIdentifier();
+ msgInstance = msg.getPublisher().getClientIdentifier();
+ if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
{
- _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) +
- ") but not ours so filtering");
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+ System.identityHashCode(msg) + ")");
+ }
+ return false;
}
- return checkFilters(msg);
+
}
+
+
}
- else
+
+
+ if (_logger.isTraceEnabled())
{
- if (_logger.isTraceEnabled())
- {
- _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
- }
- return checkFilters(msg);
+ _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
}
+ return checkFilters(msg);
+
}
private boolean checkFilters(AMQMessage msg)
@@ -393,6 +411,11 @@ public class SubscriptionImpl implements Subscription
return _isBrowser;
}
+ public boolean wouldSuspend(AMQMessage msg)
+ {
+ return channel.wouldSuspend(msg);
+ }
+
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, AMQShortString routingKey, AMQShortString exchange)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index 8272202571..e7c90fb201 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -137,7 +137,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
++_currentSubscriber;
subscriberScanned();
- if (!subscription.isSuspended())
+ if (!(subscription.isSuspended() || subscription.wouldSuspend(msg)))
{
if (subscription.hasInterest(msg))
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
deleted file mode 100644
index 02fe86a083..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.log4j.Logger;
-
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Manages delivery of messages on behalf of a queue
- */
-class SynchronizedDeliveryManager implements DeliveryManager
-{
- private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class);
-
- /**
- * Holds any queued messages
- */
- private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>();
-
- /**
- * Ensures that only one asynchronous task is running for this manager at
- * any time.
- */
- private final AtomicBoolean _processing = new AtomicBoolean();
-
- /**
- * The subscriptions on the queue to whom messages are delivered
- */
- private final SubscriptionManager _subscriptions;
-
- /**
- * An indication of the mode we are in. If this is true then messages are
- * being queued up in _messages for asynchronous delivery. If it is false
- * then messages can be delivered directly as they come in.
- */
- private volatile boolean _queueing;
-
- /**
- * A reference to the queue we are delivering messages for. We need this to be able
- * to pass the code that handles acknowledgements a handle on the queue.
- */
- private final AMQQueue _queue;
-
- SynchronizedDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
- {
- _subscriptions = subscriptions;
- _queue = queue;
- }
-
- private synchronized boolean enqueue(AMQMessage msg) throws AMQException
- {
- if (msg.getPublishBody().immediate)
- {
- return false;
- }
- else
- {
- if (_queueing)
- {
- _messages.offer(msg);
- return true;
- }
- else
- {
- return false;
- }
- }
- }
-
- private synchronized void startQueueing(AMQMessage msg) throws AMQException
- {
- _queueing = true;
- enqueue(msg);
- }
-
- /**
- * Determines whether there are queued messages. Sets _queueing to false if
- * there are no queued messages. This needs to be atomic.
- *
- * @return true if there are queued messages
- */
- public synchronized boolean hasQueuedMessages()
- {
- boolean empty = _messages.isEmpty();
- if (empty)
- {
- _queueing = false;
- }
- return !empty;
- }
-
- public synchronized int getQueueMessageCount()
- {
- return _messages.size();
- }
-
- public synchronized List<AMQMessage> getMessages()
- {
- return new ArrayList<AMQMessage>(_messages);
- }
-
- public void populatePreDeliveryQueue(Subscription subscription)
- {
- //no-op . This DM has no PreDeliveryQueues
- }
-
- public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
- {
- AMQMessage msg = poll();
- if (msg != null)
- {
- msg.dequeue(storeContext, _queue);
- }
- }
-
- public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException
- {
- AMQMessage msg = poll();
- while (msg != null)
- {
- msg.dequeue(storeContext, _queue);
- msg = poll();
- }
- }
-
- /**
- * Only one thread should ever execute this method concurrently, but
- * it can do so while other threads invoke deliver().
- */
- private void processQueue()
- {
- try
- {
- boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
- while (hasQueuedMessages() && hasSubscribers)
- {
- Subscription next = _subscriptions.nextSubscriber(peek());
- //We don't synchronize access to subscribers so need to re-check
- if (next != null)
- {
- try
- {
- next.send(poll(), _queue);
- }
- catch (AMQException e)
- {
- _log.error("Unable to deliver message: " + e, e);
- }
- }
- else
- {
- hasSubscribers = false;
- }
- }
- }
- finally
- {
- _processing.set(false);
- }
- }
-
- private synchronized AMQMessage peek()
- {
- return _messages.peek();
- }
-
- private synchronized AMQMessage poll()
- {
- return _messages.poll();
- }
-
- /**
- * Requests that the delivery manager start processing the queue asynchronously
- * if there is work that can be done (i.e. there are messages queued up and
- * subscribers that can receive them.
- * <p/>
- * This should be called when subscribers are added, but only after the consume-ok
- * message has been returned as message delivery may start immediately. It should also
- * be called after unsuspending a client.
- * <p/>
- *
- * @param executor the executor on which the delivery should take place
- */
- public void processAsync(Executor executor)
- {
- if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
- {
- //are we already running? if so, don't re-run
- if (_processing.compareAndSet(false, true))
- {
- // Do we need this?
- // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok.
- //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown())
- {
- executor.execute(new Runner());
- }
- }
- }
- }
-
- /**
- * Handles message delivery. The delivery manager is always in one of two modes;
- * it is either queueing messages for asynchronous delivery or delivering
- * directly.
- *
- * @param name the name of the entity on whose behalf we are delivering the message
- * @param msg the message to deliver
- * @throws NoConsumersException if there are no active subscribers to deliver
- * the message to
- */
- public void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg) throws FailedDequeueException, AMQException
- {
- // first check whether we are queueing, and enqueue if we are
- if (!enqueue(msg))
- {
- synchronized(this)
- {
- // not queueing so deliver message to 'next' subscriber
- Subscription s = _subscriptions.nextSubscriber(msg);
- if (s == null)
- {
- // no subscribers yet so enter 'queueing' mode and queue this message
- startQueueing(msg);
- }
- else
- {
- s.send(msg, _queue);
- }
- }
- }
-
- }
-
- private class Runner implements Runnable
- {
- public void run()
- {
- processQueue();
- }
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index 70e530699e..81ce704026 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.state;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.*;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.handler.*;
@@ -28,6 +30,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.AMQChannel;
import org.apache.log4j.Logger;
import java.util.HashMap;
@@ -118,12 +121,14 @@ public class AMQStateManager implements AMQMethodListener
frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance());
frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance());
frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance());
+ frame2handlerMap.put(BasicGetBody.class, BasicGetMethodHandler.getInstance());
frame2handlerMap.put(BasicCancelBody.class, BasicCancelMethodHandler.getInstance());
frame2handlerMap.put(BasicPublishBody.class, BasicPublishMethodHandler.getInstance());
frame2handlerMap.put(BasicQosBody.class, BasicQosHandler.getInstance());
frame2handlerMap.put(QueueBindBody.class, QueueBindHandler.getInstance());
frame2handlerMap.put(QueueDeclareBody.class, QueueDeclareHandler.getInstance());
frame2handlerMap.put(QueueDeleteBody.class, QueueDeleteHandler.getInstance());
+ frame2handlerMap.put(QueuePurgeBody.class, QueuePurgeHandler.getInstance());
frame2handlerMap.put(ChannelFlowBody.class, ChannelFlowHandler.getInstance());
frame2handlerMap.put(TxSelectBody.class, TxSelectHandler.getInstance());
frame2handlerMap.put(TxCommitBody.class, TxCommitHandler.getInstance());
@@ -168,12 +173,26 @@ public class AMQStateManager implements AMQMethodListener
StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
{
+
+ checkChannel(evt, _protocolSession);
+
handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt);
return true;
}
return false;
}
+ private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession)
+ throws AMQException
+ {
+ if(evt.getChannelId() != 0
+ && !(evt.getMethod() instanceof ChannelOpenBody)
+ && protocolSession.getChannel(evt.getChannelId()) == null)
+ {
+ throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(),"No such channel: " + evt.getChannelId());
+ }
+ }
+
protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState,
B frame)
throws IllegalStateTransitionException
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index b3ae54f982..f038f1fdea 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -269,14 +269,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
{
+ byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName());
+ Destination dest = AMQDestination.createDestination(url);
+ jmsMsg.setJMSDestination(dest);
+
if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
_unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
- byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName());
- Destination dest = AMQDestination.createDestination(url);
- jmsMsg.setJMSDestination(dest);
-
}
+
_session.setInRecovery(false);
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index ef5239fc87..364aea81c0 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -134,11 +134,12 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
public boolean isBrowser()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
- public void sendNextMessage(AMQQueue queue)
+ public boolean wouldSuspend(AMQMessage msg)
{
-
+ return _suspended;
}
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
index 477123a4ec..c6a874bcf3 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
@@ -32,6 +32,7 @@ public class AMQConnectionException extends AMQException
/* AMQP version for which exception ocurred */
private final byte major;
private final byte minor;
+ boolean _closeConnetion;
public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t)
{
@@ -51,9 +52,12 @@ public class AMQConnectionException extends AMQException
this.minor = minor;
}
+
+
public AMQFrame getCloseFrame(int channel)
{
return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode(), new AMQShortString(getMessage()));
}
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
index 5ccc900b2c..cd178a6197 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
@@ -22,6 +22,7 @@ package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
public abstract class AMQMethodBody extends AMQBody
{
@@ -101,4 +102,17 @@ public abstract class AMQMethodBody extends AMQBody
{
return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause);
}
+
+ public AMQConnectionException getConnectionException(int code, String message)
+ {
+ return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor);
+ }
+
+
+
+ public AMQConnectionException getConnectionException(int code, String message, Throwable cause)
+ {
+ return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause);
+ }
+
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 91a26632a1..64492e3d67 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -78,7 +78,7 @@ public class AMQQueueMBeanTest extends TestCase
_protocolSession = new MockProtocolSession(_messageStore);
_protocolSession.addChannel(_channel);
- _queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null);
+ _queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null,false,false);
assertTrue(_queueMBean.getActiveConsumerCount() == 1);
SubscriptionSet _subscribers = (SubscriptionSet) mgr;
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
index e428b9ef60..f090f431c3 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
@@ -53,7 +53,7 @@ public class ConcurrencyTest extends MessageTestHelper
public ConcurrencyTest() throws Exception
{
- _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false,
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false,
new DefaultQueueRegistry()));
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
deleted file mode 100644
index 1943532a51..0000000000
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.queue.ConcurrentDeliveryManager;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.DeliveryManagerTest;
-
-public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest
-{
- public ConcurrentDeliveryManagerTest() throws Exception
- {
- try
- {
- System.setProperty("concurrentdeliverymanager","true");
- _mgr = new ConcurrentDeliveryManager(_subscriptions, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false,
- new DefaultQueueRegistry()));
- }
- catch (Throwable t)
- {
- t.printStackTrace();
- throw new AMQException("Could not initialise delivery manager", t);
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(ConcurrentDeliveryManagerTest.class);
- }
-}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
index d88614298f..e1be640c8e 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
@@ -172,8 +172,6 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
public static junit.framework.Test suite()
{
TestSuite suite = new TestSuite();
- suite.addTestSuite(ConcurrentDeliveryManagerTest.class);
- suite.addTestSuite(SynchronizedDeliveryManagerTest.class);
return suite;
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
index 3586749f53..1fb2a1024f 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -132,4 +132,9 @@ public class MockProtocolSession implements AMQProtocolSession
public void setClientProperties(FieldTable clientProperties)
{
}
+
+ public Object getClientIdentifier()
+ {
+ return null;
+ }
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index fea3c93280..b3574ecba4 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -67,6 +67,12 @@ public class SubscriptionTestHelper implements Subscription
return isSuspended;
}
+ public boolean wouldSuspend(AMQMessage msg)
+ {
+ return isSuspended;
+ }
+
+
public void queueDeleted(AMQQueue queue)
{
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
deleted file mode 100644
index 3c5aab0911..0000000000
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.queue.SynchronizedDeliveryManager;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.DeliveryManagerTest;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-
-import junit.framework.TestSuite;
-
-public class SynchronizedDeliveryManagerTest extends DeliveryManagerTest
-{
- public SynchronizedDeliveryManagerTest() throws Exception
- {
- try
- {
- System.setProperty("concurrentdeliverymanager","false");
- _mgr = new SynchronizedDeliveryManager(_subscriptions, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false,
- new DefaultQueueRegistry()));
- }
- catch (Throwable t)
- {
- t.printStackTrace();
- throw new AMQException("Could not initialise delivery manager", t);
- }
- }
-
- public static junit.framework.Test suite()
- {
- TestSuite suite = new TestSuite();
- suite.addTestSuite(SynchronizedDeliveryManagerTest.class);
- return suite;
- }
-}