diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-19 10:35:21 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-19 10:35:21 +0000 |
commit | cbee9e6623bd4c1a9790613c39517a600ca289d6 (patch) | |
tree | 165522a99560d49de2eaf2e73b803a28cdec9dd4 /java | |
parent | eae73349c9690704b54c3e5a3a77c7f95482f593 (diff) | |
download | qpid-python-cbee9e6623bd4c1a9790613c39517a600ca289d6.tar.gz |
QPID-275 : Patch supplied by Rob Godfrey - Add support for get / purge / qos size / default exchanges and some other small fixes highlighted by the python tests
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@497770 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
38 files changed, 592 insertions, 900 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 10f039779c..2529ddc064 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.mina.common.ByteBuffer; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -56,6 +57,8 @@ public class AMQChannel private long _prefetch_LowWaterMark; + private long _prefetchSize; + /** * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that * value of this represents the <b>last</b> tag sent out @@ -108,6 +111,8 @@ public class AMQChannel private Set<Long> _browsedAcks = new HashSet<Long>(); + + public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges) throws AMQException { @@ -151,6 +156,17 @@ public class AMQChannel _prefetch_HighWaterMark = prefetchCount; } + public long getPrefetchSize() + { + return _prefetchSize; + } + + + public void setPrefetchSize(long prefetchSize) + { + _prefetchSize = prefetchSize; + } + public long getPrefetchLowMarkCount() { return _prefetch_LowWaterMark; @@ -213,14 +229,15 @@ public class AMQChannel throw new AMQException("Received content body without previously receiving a JmsPublishBody"); } - // returns true iff the message was delivered (i.e. if all data was - // received if (_log.isDebugEnabled()) { _log.debug("Content body received on channel " + _channelId); } try { + + // returns true iff the message was delivered (i.e. if all data was + // received if (_currentMessage.addContentBodyFrame(_storeContext, contentBody)) { // callback to allow the context to do any post message processing @@ -269,13 +286,14 @@ public class AMQChannel * @param queue the queue to subscribe to * @param session the protocol session of the subscriber * @param noLocal + * @param exclusive * @return the consumer tag. This is returned to the subscriber and used in * subsequent unsubscribe requests * @throws ConsumerTagNotUniqueException if the tag is not unique * @throws AMQException if something goes wrong */ public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks, - FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException + FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException { if (tag == null) { @@ -286,7 +304,7 @@ public class AMQChannel throw new ConsumerTagNotUniqueException(); } - queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal); + queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive); _consumerTag2QueueMap.put(tag, queue); return tag; } @@ -364,8 +382,10 @@ public class AMQChannel /** * Called to resend all outstanding unacknowledged messages to this same channel. */ - public void resend(final AMQProtocolSession session) throws AMQException + public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException { + final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>(); + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { public boolean callback(UnacknowledgedMessage message) throws AMQException @@ -374,7 +394,20 @@ public class AMQChannel AMQShortString consumerTag = message.consumerTag; AMQMessage msg = message.message; msg.setRedelivered(true); - msg.writeDeliver(session, _channelId, deliveryTag, consumerTag); + if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag)) + { + msg.writeDeliver(session, _channelId, deliveryTag, consumerTag); + } + else + { + // Message has no consumer tag, so was "delivered" to a GET + // or consumer no longer registered + // cannot resend, so re-queue. + if (message.queue != null && (consumerTag == null || requeue)) + { + msgToRequeue.add(message); + } + } // false means continue processing return false; } @@ -383,6 +416,12 @@ public class AMQChannel { } }); + + for(UnacknowledgedMessage message : msgToRequeue) + { + _txnContext.deliver(message.message, message.queue); + _unacknowledgedMessageMap.remove(message.deliveryTag); + } } /** @@ -459,8 +498,9 @@ public class AMQChannel { boolean suspend; - suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark; - + suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark) + || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()); + setSuspended(suspend); } @@ -545,4 +585,31 @@ public class AMQChannel } _returnMessages.clear(); } + + + public boolean wouldSuspend(AMQMessage msg) + { + if (isSuspended()) + { + return true; + } + else + { + boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark); + if(!willSuspend) + { + final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes(); + + willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < msg.getSize() + unackedSize); + } + + + if(willSuspend) + { + setSuspended(true); + } + return willSuspend; + } + + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java index ef58ba01a3..7ea22a447f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java @@ -73,5 +73,7 @@ public interface UnacknowledgedMessageMap * @return a set of delivery tags */ Set<Long> getDeliveryTags(); + + public long getUnacknowledgeBytes(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index a21e4cfff6..e50d239d57 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -32,6 +32,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { private final Object _lock = new Object(); + private long _unackedSize; + private Map<Long, UnacknowledgedMessage> _map; private long _lastDeliveryTag; @@ -77,7 +79,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { for (UnacknowledgedMessage msg : msgs) { - _map.remove(msg.deliveryTag); + remove(msg.deliveryTag); + } } } @@ -86,7 +89,14 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { synchronized (_lock) { - return _map.remove(deliveryTag); + + UnacknowledgedMessage message = _map.remove(deliveryTag); + if(message != null) + { + _unackedSize -= message.message.getSize(); + } + + return message; } } @@ -113,6 +123,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap synchronized (_lock) { _map.put(deliveryTag, message); + _unackedSize += message.message.getSize(); _lastDeliveryTag = deliveryTag; } } @@ -123,6 +134,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { Collection<UnacknowledgedMessage> currentEntries = _map.values(); _map = new LinkedHashMap<Long, UnacknowledgedMessage>(_prefetchLimit); + _unackedSize = 0l; return currentEntries; } } @@ -149,6 +161,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap synchronized (_lock) { _map.clear(); + _unackedSize = 0l; } } @@ -169,6 +182,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } it.remove(); + _unackedSize -= unacked.getValue().message.getSize(); destination.add(unacked.getValue()); if (unacked.getKey() == deliveryTag) @@ -189,7 +203,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap AMQShortString consumerTag = entry.getValue().consumerTag; AMQMessage msg = entry.getValue().message; - msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag); + if(consumerTag != null) + { + msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag); + } } } } @@ -224,4 +241,9 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } } + + public long getUnacknowledgeBytes() + { + return _unackedSize; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index cadcd22001..374772bc4a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -38,6 +38,8 @@ public class DefaultExchangeRegistry implements ExchangeRegistry */ private ConcurrentMap<AMQShortString, Exchange> _exchangeMap = new ConcurrentHashMap<AMQShortString, Exchange>(); + private Exchange _defaultExchange; + public DefaultExchangeRegistry(ExchangeFactory exchangeFactory) { //create 'standard' exchanges: @@ -53,9 +55,18 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void registerExchange(Exchange exchange) { + if(_defaultExchange == null) + { + setDefaultExchange(exchange); + } _exchangeMap.put(exchange.getName(), exchange); } + public void setDefaultExchange(Exchange exchange) + { + _defaultExchange = exchange; + } + public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException { // TODO: check inUse argument @@ -72,7 +83,16 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public Exchange getExchange(AMQShortString name) { - return _exchangeMap.get(name); + + if(name == null || name.length() == 0) + { + return _defaultExchange; + } + else + { + return _exchangeMap.get(name); + } + } /** @@ -83,7 +103,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void routeContent(AMQMessage payload) throws AMQException { final AMQShortString exchange = payload.getPublishBody().exchange; - final Exchange exch = _exchangeMap.get(exchange); + final Exchange exch = getExchange(exchange); // there is a small window of opportunity for the exchange to be deleted in between // the BasicPublish being received (where the exchange is validated) and the final // content body being received (which triggers this method) diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index efcb963f8b..24884d20d7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -38,4 +38,6 @@ public interface ExchangeRegistry extends MessageRouter void unregisterExchange(AMQShortString name, boolean inUse) throws ExchangeInUseException, AMQException; Exchange getExchange(AMQShortString name); + + void setDefaultExchange(Exchange exchange); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index e078b0cdee..721001b454 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidSelectorException; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.*; import org.apache.qpid.server.AMQChannel; @@ -66,6 +68,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } else { + AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue); if (queue == null) @@ -73,29 +76,13 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic _log.info("No queue for '" + body.queue + "'"); if(body.queue!=null) { - AMQShortString msg = new AMQShortString("No such queue, '" + body.queue + "'"); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - BasicConsumeBody.getClazz((byte)8, (byte)0), // classId - BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId - AMQConstant.NOT_FOUND.getCode(), // replyCode - msg)); // replyText + String msg = "No such queue, '" + body.queue + "'"; + throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), msg); } else { - AMQShortString msg = new AMQShortString("No queue name provided, no default queue defined."); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - BasicConsumeBody.getClazz((byte)8, (byte)0), // classId - BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId - AMQConstant.NOT_ALLOWED.getCode(), // replyCode - msg)); // replyText + String msg = "No queue name provided, no default queue defined."; + throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),msg ); } } else @@ -103,7 +90,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic try { AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, - body.arguments, body.noLocal); + body.arguments, body.noLocal, body.exclusive); if (!body.nowait) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) @@ -143,6 +130,21 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic AMQConstant.NOT_ALLOWED.getCode(), // replyCode msg)); // replyText } + catch (AMQQueue.ExistingExclusiveSubscription e) + { + throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(), + "Cannot subscribe to queue " + + queue.getName() + + " as it already has an existing exclusive consumer"); + } + catch (AMQQueue.ExistingSubscriptionPreventsExclusive e) + { + throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(), + "Cannot subscribe to queue " + + queue.getName() + + " exclusively as it already has a consumer"); + } + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index 25cc981693..0ef30be265 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQChannelException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicPublishBody; @@ -42,7 +43,6 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler(); - private static final AMQShortString UNKNOWN_EXCHANGE_NAME = new AMQShortString("Unknown exchange name"); public static BasicPublishMethodHandler getInstance() { @@ -74,19 +74,8 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi // if the exchange does not exist we raise a channel exception if (e == null) { - protocolSession.closeChannel(evt.getChannelId()); - // TODO: modify code gen to make getClazz and getMethod public methods rather than protected - // then we can remove the hardcoded 0,0 - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(), - (byte)8, (byte)0, // AMQP version (major, minor) - ChannelCloseBody.getClazz((byte)8, (byte)0), // classId - ChannelCloseBody.getMethod((byte)8, (byte)0), // methodId - 500, // replyCode - UNKNOWN_EXCHANGE_NAME); // replyText - protocolSession.writeFrame(cf); + throw body.getChannelException(500, "Unknown exchange name"); + } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java index 60f6458b8c..2bab4cac5c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java @@ -44,6 +44,8 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException { session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount); + session.getChannel(evt.getChannelId()).setPrefetchSize(evt.getMethod().prefetchSize); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index 0e37871439..f3e0cc3a63 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -52,6 +52,8 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic { throw new AMQException("Unknown channel " + evt.getChannelId()); } - channel.resend(protocolSession); + BasicRecoverBody body = evt.getMethod(); + channel.resend(protocolSession, body.requeue); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index b16de88851..8056ff9adb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -56,21 +56,22 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException { ConnectionOpenBody body = evt.getMethod(); - AMQShortString contextKey = body.virtualHost; + + //todo //FIXME The virtual host must be validated by the server for the connection to open-ok // See Spec (0.8.2). Section 3.1.2 Virtual Hosts - if (contextKey == null) + if (protocolSession.getContextKey() == null) { - contextKey = generateClientID(); + protocolSession.setContextKey(generateClientID()); } - protocolSession.setContextKey(contextKey); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, (byte)8, (byte)0, // AMQP version (major, minor) - contextKey); // knownHosts + body.virtualHost); // knownHosts stateManager.changeState(AMQState.CONNECTION_OPEN); protocolSession.writeFrame(response); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index f6897227aa..84e9a4e3f4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -76,7 +76,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange { if(body.passive && ((body.type == null) || body.type.length() ==0)) { - throw new AMQChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " + body.exchange,body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor()); + throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " + body.exchange); } else { @@ -89,7 +89,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange } catch(AMQUnknownExchangeType e) { - throw new AMQConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange: " + body.exchange,body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),e); + throw body.getConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange: " + body.exchange,e); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index c2bec68b89..19df23b103 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -77,22 +77,19 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar { body.queue = createName(); } + + AMQQueue queue = null; //TODO: do we need to check that the queue already exists with exactly the same "configuration"? synchronized (queueRegistry) { - AMQQueue queue; + if (((queue = queueRegistry.getQueue(body.queue)) == null) ) { if(body.passive) { String msg = "Queue: " + body.queue + " not found."; - throw new AMQChannelException(AMQConstant.NOT_FOUND.getCode(), - msg, - body.getClazz(), - body.getMethod(), - (byte)8, - (byte)0 ); + throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(),msg ); } else @@ -112,9 +109,16 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } } } + else if(queue.getOwner() != null && !protocolSession.getContextKey().equals(queue.getOwner())) + { + // todo - constant + throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection"); + + } //set this as the default queue on the channel: protocolSession.getChannel(evt.getChannelId()).setDefaultQueue(queue); } + if (!body.nowait) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) @@ -122,8 +126,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar // Be aware of possible changes to parameter order as versions change. AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0, // AMQP version (major, minor) - 0L, // consumerCount - 0L, // messageCount + queue.getConsumerCount(), // consumerCount + queue.getMessageCount(), // messageCount body.queue); // queue _log.info("Queue " + body.queue + " declared successfully"); protocolSession.writeFrame(response); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 3f6d752f74..245d86a7a6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -34,6 +34,7 @@ import org.apache.qpid.framing.QueueDeleteOkBody; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQChannelException; import org.apache.qpid.protocol.AMQConstant; public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody> @@ -84,15 +85,12 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete { if(body.ifEmpty && !queue.isEmpty()) { - AMQShortString msg = new AMQShortString("Queue: " + body.queue + " is not empty."); - // TODO - Error code - session.writeFrame(ChannelCloseBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0, body.getClazz(), body.getMethod(), 406, msg )); + throw body.getChannelException(406, "Queue: " + body.queue + " is not empty." ); } else if(body.ifUnused && !queue.isUnused()) - { - AMQShortString msg = new AMQShortString("Queue: " + body.queue + " is still used."); + { // TODO - Error code - session.writeFrame(ChannelCloseBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0, body.getClazz(), body.getMethod(), 406, msg )); + throw body.getChannelException(406, "Queue: " + body.queue + " is still used." ); } else diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index 7dd1f9579b..d71c93a6c6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -57,7 +57,7 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0)); //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). - channel.resend(protocolSession); + channel.resend(protocolSession, false); }catch(AMQException e){ throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index fa43b8809d..ed74263596 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -27,6 +27,7 @@ import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; @@ -57,6 +58,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); + private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); + + private final IoSession _minaProtocolSession; private AMQShortString _contextKey; @@ -218,31 +222,36 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, (AMQMethodBody) frame.bodyFrame); try { - boolean wasAnyoneInterested = _stateManager.methodReceived(evt); - - if(!_frameListeners.isEmpty()) + try { - for (AMQMethodListener listener : _frameListeners) + boolean wasAnyoneInterested = _stateManager.methodReceived(evt); + + if(!_frameListeners.isEmpty()) { - wasAnyoneInterested = listener.methodReceived(evt) || - wasAnyoneInterested; + for (AMQMethodListener listener : _frameListeners) + { + wasAnyoneInterested = listener.methodReceived(evt) || + wasAnyoneInterested; + } + } + if (!wasAnyoneInterested) + { + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); } } - if (!wasAnyoneInterested) + catch (AMQChannelException e) { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); + _logger.error("Closing channel due to: " + e.getMessage()); + writeFrame(e.getCloseFrame(frame.channel)); + closeChannel(frame.channel); + } + catch (AMQConnectionException e) + { + _logger.error("Closing connection due to: " + e.getMessage()); + closeSession(); + writeFrame(e.getCloseFrame(frame.channel)); } } - catch (AMQChannelException e) - { - _logger.error("Closing channel due to: " + e.getMessage()); - writeFrame(e.getCloseFrame(frame.channel)); - } - catch (AMQConnectionException e) - { - _logger.error("Closing connection due to: " + e.getMessage()); - writeFrame(e.getCloseFrame(frame.channel)); - } catch (Exception e) { _stateManager.error(e); @@ -516,6 +525,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void setClientProperties(FieldTable clientProperties) { _clientProperties = clientProperties; + if((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null)) + { + setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE))); + } } /** @@ -537,4 +550,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { return _major == major && _minor == minor; } + + + public Object getClientIdentifier() + { + return _minaProtocolSession.getRemoteAddress(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 934d1ccff8..a1249723ee 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -124,4 +124,6 @@ public interface AMQProtocolSession extends AMQProtocolWriter FieldTable getClientProperties(); void setClientProperties(FieldTable clientProperties); + + Object getClientIdentifier(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java index e1fac55d3b..8b5f05e8ea 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java @@ -33,6 +33,8 @@ public class ExchangeInitialiser define(registry, factory, ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); + + registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME)); } private void define(ExchangeRegistry r, ExchangeFactory f, diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index c55d24d507..c227cd5094 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -43,8 +43,6 @@ public class AMQMessage { private static final Logger _log = Logger.getLogger(AMQMessage.class); - public static final String JMS_MESSAGE = "jms.message"; - /** * Used in clustering */ @@ -75,6 +73,8 @@ public class AMQMessage private TransientMessageData _transientMessageData = new TransientMessageData(); + + /** * Used to iterate through all the body frames associated with this message. Will not * keep all the data in memory therefore is memory-efficient. @@ -550,6 +550,7 @@ public class AMQMessage { SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, contentHeader); + protocolSession.writeFrame(compositeBlock); } else @@ -582,6 +583,50 @@ public class AMQMessage } + public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException + { + ByteBuffer deliver = createEncodedGetOkFrame(channelId, deliveryTag, queueSize); + AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, + getContentHeaderBody()); + + final int bodyCount = _messageHandle.getBodyCount(_messageId); + if(bodyCount == 0) + { + SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, + contentHeader); + protocolSession.writeFrame(compositeBlock); + } + else + { + + + // + // Optimise the case where we have a single content body. In that case we create a composite block + // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. + // + ContentBody cb = _messageHandle.getContentBody(_messageId, 0); + + AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb); + AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent); + protocolSession.writeFrame(compositeBlock); + + // + // Now start writing out the other content bodies + // + for(int i = 1; i < bodyCount; i++) + { + cb = _messageHandle.getContentBody(_messageId, i); + protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb)); + } + + + } + + + } + + private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException { @@ -595,6 +640,21 @@ public class AMQMessage return buf; } + private ByteBuffer createEncodedGetOkFrame(int channelId, long deliveryTag, int queueSize) + throws AMQException + { + BasicPublishBody pb = getPublishBody(); + AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId, (byte) 8, (byte) 0, + deliveryTag, pb.exchange, + queueSize, + _messageHandle.isRedelivered(), + pb.routingKey); + ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem? + getOkFrame.writePayload(buf); + buf.flip(); + return buf; + } + private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, AMQShortString replyText) throws AMQException { AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, (byte) 8, (byte) 0, getPublishBody().exchange, @@ -643,6 +703,24 @@ public class AMQMessage } } + + public long getSize() + { + try + { + long size = getContentHeaderBody().bodySize; + + return size; + } + catch (AMQException e) + { + _log.error(e); + return 0; + } + + } + + public String toString() { return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " + diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 368cb979e8..18b3adc635 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -29,11 +29,14 @@ import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.AMQChannel; import javax.management.JMException; import java.text.MessageFormat; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; /** * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like @@ -41,6 +44,30 @@ import java.util.concurrent.Executor; */ public class AMQQueue implements Managable, Comparable { + + public static final class ExistingExclusiveSubscription extends AMQException + { + + public ExistingExclusiveSubscription() + { + super(""); + } + } + + public static final class ExistingSubscriptionPreventsExclusive extends AMQException + { + + public ExistingSubscriptionPreventsExclusive() + { + super(""); + } + } + + private static final ExistingExclusiveSubscription EXISTING_EXCLUSIVE = new ExistingExclusiveSubscription(); + private static final ExistingSubscriptionPreventsExclusive EXISTING_SUBSCRIPTION = new ExistingSubscriptionPreventsExclusive(); + + + private static final Logger _logger = Logger.getLogger(AMQQueue.class); private final AMQShortString _name; @@ -64,6 +91,11 @@ public class AMQQueue implements Managable, Comparable private final SubscriptionFactory _subscriptionFactory; + private final AtomicInteger _subscriberCount = new AtomicInteger(); + + private final AtomicBoolean _isExclusive = new AtomicBoolean(); + + /** * Manages message delivery. */ @@ -187,31 +219,7 @@ public class AMQQueue implements Managable, Comparable _managedObject.register(); _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; - - //fixme - Make this configurable via the broker config.xml - if (System.getProperties().getProperty("deliverymanager") != null) - { - if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager")) - { - _logger.info("Using ConcurrentSelectorDeliveryManager"); - _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); - } - else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager")) - { - _logger.info("Using ConcurrentDeliveryManager"); - _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this); - } - else - { - _logger.info("Using SynchronizedDeliveryManager"); - _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this); - } - } - else - { - _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager"); - _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); - } + _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); } private AMQQueueMBean createMBean() throws AMQException @@ -352,9 +360,9 @@ public class AMQQueue implements Managable, Comparable /** * removes all the messages from the queue. */ - public void clearQueue(StoreContext storeContext) throws AMQException + public long clearQueue(StoreContext storeContext) throws AMQException { - _deliveryMgr.clearAllMessages(storeContext); + return _deliveryMgr.clearAllMessages(storeContext); } public void bind(AMQShortString routingKey, Exchange exchange) @@ -362,14 +370,30 @@ public class AMQQueue implements Managable, Comparable _bindings.addBinding(routingKey, exchange); } - public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, FieldTable filters) throws AMQException - { - registerProtocolSession(ps, channel, consumerTag, acks, filters, false); - } - public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal) + public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, + FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException { + if(incrementSubscriberCount() > 1) + { + if(isExclusive()) + { + decrementSubscriberCount(); + throw EXISTING_EXCLUSIVE; + } + else if(exclusive) + { + decrementSubscriberCount(); + throw EXISTING_SUBSCRIPTION; + } + + } + else if(exclusive) + { + setExclusive(true); + } + debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal); @@ -385,6 +409,28 @@ public class AMQQueue implements Managable, Comparable _subscribers.addSubscriber(subscription); } + + private boolean isExclusive() + { + return _isExclusive.get(); + } + + private void setExclusive(boolean exclusive) + { + _isExclusive.set(exclusive); + } + + private int incrementSubscriberCount() + { + return _subscriberCount.incrementAndGet(); + } + + private int decrementSubscriberCount() + { + return _subscriberCount.decrementAndGet(); + } + + public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException { debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag, @@ -400,6 +446,10 @@ public class AMQQueue implements Managable, Comparable " and protocol session key " + ps.getKey() + " not registered with queue " + this); } + setExclusive(false); + decrementSubscriberCount(); + + // if we are eligible for auto deletion, unregister from the queue registry if (_autoDelete && _subscribers.isEmpty()) { @@ -454,6 +504,23 @@ public class AMQQueue implements Managable, Comparable delete(); } + public void processGet(StoreContext storeContext, AMQMessage msg) throws AMQException + { + _deliveryMgr.deliver(storeContext, getName(), msg); + try + { + msg.checkDeliveredToConsumer(); + updateReceivedMessageCount(msg); + } + catch (NoConsumersException e) + { + // as this message will be returned, it should be removed + // from the queue: + dequeue(storeContext, msg); + } + } + + public void process(StoreContext storeContext, AMQMessage msg) throws AMQException { _deliveryMgr.deliver(storeContext, getName(), msg); @@ -547,4 +614,12 @@ public class AMQQueue implements Managable, Comparable _logger.debug(MessageFormat.format(msg, args)); } } + + public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException + { + return _deliveryMgr.performGet(session, channel, acks); + } + + + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java deleted file mode 100644 index 1a44e86f1a..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java +++ /dev/null @@ -1,367 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.configuration.Configured; -import org.apache.qpid.server.configuration.Configurator; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; - -import java.util.ArrayList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; - - -/** - * Manages delivery of messages on behalf of a queue - */ -public class ConcurrentDeliveryManager implements DeliveryManager -{ - private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class); - - @Configured(path = "advanced.compressBufferOnQueue", - defaultValue = "false") - public boolean compressBufferOnQueue; - - /** - * Holds any queued messages - */ - private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); - - //private int _messageCount; - /** - * Ensures that only one asynchronous task is running for this manager at - * any time. - */ - private final AtomicBoolean _processing = new AtomicBoolean(); - - /** - * The subscriptions on the queue to whom messages are delivered - */ - private final SubscriptionManager _subscriptions; - - /** - * A reference to the queue we are delivering messages for. We need this to be able - * to pass the code that handles acknowledgements a handle on the queue. - */ - private final AMQQueue _queue; - - /** - * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced - * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered - * via the async thread. - * <p/> - * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue. - */ - private ReentrantLock _lock = new ReentrantLock(); - - ConcurrentDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) - { - - //Set values from configuration - Configurator.configure(this); - - if (compressBufferOnQueue) - { - _log.info("Compressing Buffers on queue."); - } - - _subscriptions = subscriptions; - _queue = queue; - } - - /** - * @return boolean if we are queueing - */ - private boolean queueing() - { - return hasQueuedMessages(); - } - - /** - * @param msg to enqueue - * @return true if we are queue this message - */ - private boolean enqueue(AMQMessage msg) throws AMQException - { - if (msg.getPublishBody().immediate) - { - return false; - } - else - { - _lock.lock(); - try - { - if (queueing()) - { - return addMessageToQueue(msg); - } - else - { - return false; - } - } - finally - { - _lock.unlock(); - } - } - } - - private void startQueueing(AMQMessage msg) throws AMQException - { - if (!msg.getPublishBody().immediate) - { - addMessageToQueue(msg); - } - } - - private boolean addMessageToQueue(AMQMessage msg) - { - // Shrink the ContentBodies to their actual size to save memory. - /* TODO need to reimplement this - probably not in this class though - * for obvious reasons - - if (compressBufferOnQueue) - { - Iterator it = msg.getContentBodies().iterator(); - while (it.hasNext()) - { - ContentBody cb = (ContentBody) it.next(); - cb.reduceBufferToFit(); - } - } - */ - _messages.offer(msg); - - return true; - } - - public boolean hasQueuedMessages() - { - _lock.lock(); - try - { - return !_messages.isEmpty(); - } - finally - { - _lock.unlock(); - } - } - - public int getQueueMessageCount() - { - return getMessageCount(); - } - - /** - * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size. - * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue. - * - * @return int the number of messages in the delivery queue. - */ - private int getMessageCount() - { - return _messages.size(); - } - - - public synchronized List<AMQMessage> getMessages() - { - return new ArrayList<AMQMessage>(_messages); - } - - public void populatePreDeliveryQueue(Subscription subscription) - { - //no-op . This DM has no PreDeliveryQueues - } - - public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException - { - AMQMessage msg = poll(); - if (msg != null) - { - msg.dequeue(storeContext, _queue); - } - } - - public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException - { - AMQMessage msg = poll(); - while (msg != null) - { - msg.dequeue(storeContext, _queue); - msg = poll(); - } - } - - /** - * Only one thread should ever execute this method concurrently, but - * it can do so while other threads invoke deliver(). - */ - private void processQueue() throws AMQException - { - try - { - boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); - AMQMessage message = peek(); - - //While we have messages to send and subscribers to send them to. - while (message != null && hasSubscribers) - { - // _log.debug("Have messages(" + _messages.size() + ") and subscribers"); - Subscription next = _subscriptions.nextSubscriber(message); - //FIXME Is there still not the chance that this subscribe could be suspended between here and the send? - - //We don't synchronize access to subscribers so need to re-check - if (next != null) - { - next.send(message, _queue); - poll(); - message = peek(); - } - else - { - hasSubscribers = false; - } - } - } - catch (FailedDequeueException e) - { - _log.error("Unable to deliver message as dequeue failed: " + e, e); - } - finally - { - _log.debug("End of processQueue: (" + getQueueMessageCount() + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers()); - } - } - - private AMQMessage peek() - { - return _messages.peek(); - } - - private AMQMessage poll() - { - return _messages.poll(); - } - - Runner asyncDelivery = new Runner(); - - public void processAsync(Executor executor) - { - _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + - " Active:" + _subscriptions.hasActiveSubscribers() + - " Processing:" + _processing.get()); - - if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) - { - //are we already running? if so, don't re-run - if (_processing.compareAndSet(false, true)) - { - // Do we need this? - // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok. - //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown()) - { - executor.execute(asyncDelivery); - } - } - } - } - - public void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg) throws FailedDequeueException, AMQException - { - // first check whether we are queueing, and enqueue if we are - if (!enqueue(msg)) - { - // not queueing so deliver message to 'next' subscriber - _lock.lock(); - try - { - Subscription s = _subscriptions.nextSubscriber(msg); - if (s == null) - { - if (!msg.getPublishBody().immediate) - { - // no subscribers yet so enter 'queueing' mode and queue this message - startQueueing(msg); - } - } - else - { - s.send(msg, _queue); - } - } - finally - { - _lock.unlock(); - } - } - } - - private class Runner implements Runnable - { - public void run() - { - boolean running = true; - while (running) - { - try - { - processQueue(); - } - catch (AMQException e) - { - _log.error("Error processing queue: " + e, e); - _log.error("Delivery manager terminating."); - running = false; - _processing.set(false); - break; - } - - //Check that messages have not been added since we did our last peek(); - // Synchronize with the thread that adds to the queue. - // If the queue is still empty then we can exit - _lock.lock(); - try - { - if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers())) - { - running = false; - _processing.set(false); - } - } - finally - { - _lock.unlock(); - } - } - } - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 91c49a4cf9..ba4d0bf4ba 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -28,6 +28,8 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.protocol.AMQProtocolSession; import java.util.ArrayList; import java.util.Iterator; @@ -52,6 +54,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager * Holds any queued messages */ private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); + + private final ReentrantLock _messageAccessLock = new ReentrantLock(); + //private int _messageCount; /** * Ensures that only one asynchronous task is running for this manager at @@ -169,6 +174,56 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException + { + AMQMessage msg = getNextMessage(); + if(msg == null) + { + return false; + } + else + { + + try + { + // if we do not need to wait for client acknowledgements + // we can decrement the reference count immediately. + + // By doing this _before_ the send we ensure that it + // doesn't get sent if it can't be dequeued, preventing + // duplicate delivery on recovery. + + // The send may of course still fail, in which case, as + // the message is unacked, it will be lost. + if (!acks) + { + if (_log.isDebugEnabled()) + { + _log.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); + } + _queue.dequeue(channel.getStoreContext(), msg); + } + synchronized(channel) + { + long deliveryTag = channel.getNextDeliveryTag(); + + if (acks) + { + channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue); + } + + msg.writeGetOk(protocolSession, channel.getChannelId(), deliveryTag, _queue.getMessageCount()); + } + } + finally + { + msg.setDeliveredToConsumer(); + } + return true; + + } + } + public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException { AMQMessage msg = poll(); @@ -178,22 +233,35 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException + public synchronized long clearAllMessages(StoreContext storeContext) throws AMQException { + long count = 0; AMQMessage msg = poll(); while (msg != null) { msg.dequeue(storeContext, _queue); + count++; msg = poll(); } + return count; + } + + public synchronized AMQMessage getNextMessage() throws AMQException + { + return getNextMessage(_messages); } - private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) + private AMQMessage getNextMessage(Queue<AMQMessage> messages) + { + return getNextMessage(messages, false); + } + + private AMQMessage getNextMessage(Queue<AMQMessage> messages, boolean browsing) { AMQMessage message = messages.peek(); - while (message != null && (sub.isBrowser() || message.taken())) + while (message != null && (browsing || message.taken())) { //remove the already taken message messages.poll(); @@ -208,7 +276,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager AMQMessage message = null; try { - message = getNextMessage(messageQueue, sub); + message = getNextMessage(messageQueue, sub.isBrowser()); // message will be null if we have no messages in the messageQueue. if (message == null) @@ -287,6 +355,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { _log.debug(id() + "deliver :" + msg); } + msg.release(); //Check if we have someone to deliver the message to. _lock.lock(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index d3d235f07f..6954be8473 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.protocol.AMQProtocolSession; import java.util.concurrent.Executor; import java.util.List; @@ -72,9 +74,11 @@ interface DeliveryManager void removeAMessageFromTop(StoreContext storeContext) throws AMQException; - void clearAllMessages(StoreContext storeContext) throws AMQException; + long clearAllMessages(StoreContext storeContext) throws AMQException; List<AMQMessage> getMessages(); void populatePreDeliveryQueue(Subscription subscription); + + boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index 2dab551e07..5277069d33 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -45,4 +45,6 @@ public interface Subscription void close(); boolean isBrowser(); + + boolean wouldSuspend(AMQMessage msg); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index e2356faaf5..e120752959 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -66,6 +66,7 @@ public class SubscriptionImpl implements Subscription private final boolean _isBrowser; private final Boolean _autoClose; private boolean _closed = false; + private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); public static class Factory implements SubscriptionFactory { @@ -300,37 +301,54 @@ public class SubscriptionImpl implements Subscription { if (_noLocal) { + boolean isLocal; // We don't want local messages so check to see if message is one we sent - Object localInstance = protocolSession.getClientProperties().getObject(ClientProperties.instance.toString()); - Object msgInstance = msg.getPublisher().getClientProperties().getObject(ClientProperties.instance.toString()); + Object localInstance; + Object msgInstance; - if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) + if((protocolSession.getClientProperties() != null) && + (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if (_logger.isTraceEnabled()) + if((msg.getPublisher().getClientProperties() != null) && + (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + - System.identityHashCode(msg) + ")"); + if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) + { + if (_logger.isTraceEnabled()) + { + _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + + System.identityHashCode(msg) + ")"); + } + return false; + } } - return false; } - else // if not then filter the message. + else { - if (_logger.isTraceEnabled()) + localInstance = protocolSession.getClientIdentifier(); + msgInstance = msg.getPublisher().getClientIdentifier(); + if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) { - _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) + - ") but not ours so filtering"); + if (_logger.isTraceEnabled()) + { + _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + + System.identityHashCode(msg) + ")"); + } + return false; } - return checkFilters(msg); + } + + } - else + + + if (_logger.isTraceEnabled()) { - if (_logger.isTraceEnabled()) - { - _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg)); - } - return checkFilters(msg); + _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg)); } + return checkFilters(msg); + } private boolean checkFilters(AMQMessage msg) @@ -393,6 +411,11 @@ public class SubscriptionImpl implements Subscription return _isBrowser; } + public boolean wouldSuspend(AMQMessage msg) + { + return channel.wouldSuspend(msg); + } + private ByteBuffer createEncodedDeliverFrame(long deliveryTag, AMQShortString routingKey, AMQShortString exchange) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index 8272202571..e7c90fb201 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -137,7 +137,7 @@ class SubscriptionSet implements WeightedSubscriptionManager ++_currentSubscriber; subscriberScanned(); - if (!subscription.isSuspended()) + if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) { if (subscription.hasInterest(msg)) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java deleted file mode 100644 index 02fe86a083..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.store.StoreContext; -import org.apache.log4j.Logger; - -import java.util.LinkedList; -import java.util.Queue; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Manages delivery of messages on behalf of a queue - */ -class SynchronizedDeliveryManager implements DeliveryManager -{ - private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class); - - /** - * Holds any queued messages - */ - private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>(); - - /** - * Ensures that only one asynchronous task is running for this manager at - * any time. - */ - private final AtomicBoolean _processing = new AtomicBoolean(); - - /** - * The subscriptions on the queue to whom messages are delivered - */ - private final SubscriptionManager _subscriptions; - - /** - * An indication of the mode we are in. If this is true then messages are - * being queued up in _messages for asynchronous delivery. If it is false - * then messages can be delivered directly as they come in. - */ - private volatile boolean _queueing; - - /** - * A reference to the queue we are delivering messages for. We need this to be able - * to pass the code that handles acknowledgements a handle on the queue. - */ - private final AMQQueue _queue; - - SynchronizedDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) - { - _subscriptions = subscriptions; - _queue = queue; - } - - private synchronized boolean enqueue(AMQMessage msg) throws AMQException - { - if (msg.getPublishBody().immediate) - { - return false; - } - else - { - if (_queueing) - { - _messages.offer(msg); - return true; - } - else - { - return false; - } - } - } - - private synchronized void startQueueing(AMQMessage msg) throws AMQException - { - _queueing = true; - enqueue(msg); - } - - /** - * Determines whether there are queued messages. Sets _queueing to false if - * there are no queued messages. This needs to be atomic. - * - * @return true if there are queued messages - */ - public synchronized boolean hasQueuedMessages() - { - boolean empty = _messages.isEmpty(); - if (empty) - { - _queueing = false; - } - return !empty; - } - - public synchronized int getQueueMessageCount() - { - return _messages.size(); - } - - public synchronized List<AMQMessage> getMessages() - { - return new ArrayList<AMQMessage>(_messages); - } - - public void populatePreDeliveryQueue(Subscription subscription) - { - //no-op . This DM has no PreDeliveryQueues - } - - public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException - { - AMQMessage msg = poll(); - if (msg != null) - { - msg.dequeue(storeContext, _queue); - } - } - - public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException - { - AMQMessage msg = poll(); - while (msg != null) - { - msg.dequeue(storeContext, _queue); - msg = poll(); - } - } - - /** - * Only one thread should ever execute this method concurrently, but - * it can do so while other threads invoke deliver(). - */ - private void processQueue() - { - try - { - boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); - while (hasQueuedMessages() && hasSubscribers) - { - Subscription next = _subscriptions.nextSubscriber(peek()); - //We don't synchronize access to subscribers so need to re-check - if (next != null) - { - try - { - next.send(poll(), _queue); - } - catch (AMQException e) - { - _log.error("Unable to deliver message: " + e, e); - } - } - else - { - hasSubscribers = false; - } - } - } - finally - { - _processing.set(false); - } - } - - private synchronized AMQMessage peek() - { - return _messages.peek(); - } - - private synchronized AMQMessage poll() - { - return _messages.poll(); - } - - /** - * Requests that the delivery manager start processing the queue asynchronously - * if there is work that can be done (i.e. there are messages queued up and - * subscribers that can receive them. - * <p/> - * This should be called when subscribers are added, but only after the consume-ok - * message has been returned as message delivery may start immediately. It should also - * be called after unsuspending a client. - * <p/> - * - * @param executor the executor on which the delivery should take place - */ - public void processAsync(Executor executor) - { - if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) - { - //are we already running? if so, don't re-run - if (_processing.compareAndSet(false, true)) - { - // Do we need this? - // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok. - //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown()) - { - executor.execute(new Runner()); - } - } - } - } - - /** - * Handles message delivery. The delivery manager is always in one of two modes; - * it is either queueing messages for asynchronous delivery or delivering - * directly. - * - * @param name the name of the entity on whose behalf we are delivering the message - * @param msg the message to deliver - * @throws NoConsumersException if there are no active subscribers to deliver - * the message to - */ - public void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg) throws FailedDequeueException, AMQException - { - // first check whether we are queueing, and enqueue if we are - if (!enqueue(msg)) - { - synchronized(this) - { - // not queueing so deliver message to 'next' subscriber - Subscription s = _subscriptions.nextSubscriber(msg); - if (s == null) - { - // no subscribers yet so enter 'queueing' mode and queue this message - startQueueing(msg); - } - else - { - s.send(msg, _queue); - } - } - } - - } - - private class Runner implements Runnable - { - public void run() - { - processQueue(); - } - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 70e530699e..81ce704026 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.state; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.*; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.handler.*; @@ -28,6 +30,7 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.AMQChannel; import org.apache.log4j.Logger; import java.util.HashMap; @@ -118,12 +121,14 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance()); frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance()); frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance()); + frame2handlerMap.put(BasicGetBody.class, BasicGetMethodHandler.getInstance()); frame2handlerMap.put(BasicCancelBody.class, BasicCancelMethodHandler.getInstance()); frame2handlerMap.put(BasicPublishBody.class, BasicPublishMethodHandler.getInstance()); frame2handlerMap.put(BasicQosBody.class, BasicQosHandler.getInstance()); frame2handlerMap.put(QueueBindBody.class, QueueBindHandler.getInstance()); frame2handlerMap.put(QueueDeclareBody.class, QueueDeclareHandler.getInstance()); frame2handlerMap.put(QueueDeleteBody.class, QueueDeleteHandler.getInstance()); + frame2handlerMap.put(QueuePurgeBody.class, QueuePurgeHandler.getInstance()); frame2handlerMap.put(ChannelFlowBody.class, ChannelFlowHandler.getInstance()); frame2handlerMap.put(TxSelectBody.class, TxSelectHandler.getInstance()); frame2handlerMap.put(TxCommitBody.class, TxCommitHandler.getInstance()); @@ -168,12 +173,26 @@ public class AMQStateManager implements AMQMethodListener StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod()); if (handler != null) { + + checkChannel(evt, _protocolSession); + handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt); return true; } return false; } + private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession) + throws AMQException + { + if(evt.getChannelId() != 0 + && !(evt.getMethod() instanceof ChannelOpenBody) + && protocolSession.getChannel(evt.getChannelId()) == null) + { + throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(),"No such channel: " + evt.getChannelId()); + } + } + protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState, B frame) throws IllegalStateTransitionException diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index b3ae54f982..f038f1fdea 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -269,14 +269,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException { + byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName()); + Destination dest = AMQDestination.createDestination(url); + jmsMsg.setJMSDestination(dest); + if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag()); - byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName()); - Destination dest = AMQDestination.createDestination(url); - jmsMsg.setJMSDestination(dest); - } + _session.setInRecovery(false); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java index ef5239fc87..364aea81c0 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java @@ -134,11 +134,12 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage public boolean isBrowser() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } - public void sendNextMessage(AMQQueue queue) + public boolean wouldSuspend(AMQMessage msg) { - + return _suspended; } + } diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java index 477123a4ec..c6a874bcf3 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java @@ -32,6 +32,7 @@ public class AMQConnectionException extends AMQException /* AMQP version for which exception ocurred */ private final byte major; private final byte minor; + boolean _closeConnetion; public AMQConnectionException(int errorCode, String msg, int classId, int methodId, byte major, byte minor, Throwable t) { @@ -51,9 +52,12 @@ public class AMQConnectionException extends AMQException this.minor = minor; } + + public AMQFrame getCloseFrame(int channel) { return ConnectionCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode(), new AMQShortString(getMessage())); } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index 5ccc900b2c..cd178a6197 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -22,6 +22,7 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; public abstract class AMQMethodBody extends AMQBody { @@ -101,4 +102,17 @@ public abstract class AMQMethodBody extends AMQBody { return new AMQChannelException(code, message, getClazz(), getMethod(), major, minor, cause); } + + public AMQConnectionException getConnectionException(int code, String message) + { + return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor); + } + + + + public AMQConnectionException getConnectionException(int code, String message, Throwable cause) + { + return new AMQConnectionException(code, message, getClazz(), getMethod(), major, minor, cause); + } + } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 91a26632a1..64492e3d67 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -78,7 +78,7 @@ public class AMQQueueMBeanTest extends TestCase _protocolSession = new MockProtocolSession(_messageStore); _protocolSession.addChannel(_channel); - _queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null); + _queue.registerProtocolSession(_protocolSession, 1, new AMQShortString("test"), false, null,false,false); assertTrue(_queueMBean.getActiveConsumerCount() == 1); SubscriptionSet _subscribers = (SubscriptionSet) mgr; diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java index e428b9ef60..f090f431c3 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java @@ -53,7 +53,7 @@ public class ConcurrencyTest extends MessageTestHelper public ConcurrencyTest() throws Exception { - _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false, + _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false, new DefaultQueueRegistry())); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java deleted file mode 100644 index 1943532a51..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.queue.ConcurrentDeliveryManager; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.DefaultQueueRegistry; -import org.apache.qpid.server.queue.DeliveryManagerTest; - -public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest -{ - public ConcurrentDeliveryManagerTest() throws Exception - { - try - { - System.setProperty("concurrentdeliverymanager","true"); - _mgr = new ConcurrentDeliveryManager(_subscriptions, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false, - new DefaultQueueRegistry())); - } - catch (Throwable t) - { - t.printStackTrace(); - throw new AMQException("Could not initialise delivery manager", t); - } - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(ConcurrentDeliveryManagerTest.class); - } -} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java index d88614298f..e1be640c8e 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java @@ -172,8 +172,6 @@ abstract public class DeliveryManagerTest extends MessageTestHelper public static junit.framework.Test suite() { TestSuite suite = new TestSuite(); - suite.addTestSuite(ConcurrentDeliveryManagerTest.class); - suite.addTestSuite(SynchronizedDeliveryManagerTest.class); return suite; } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java index 3586749f53..1fb2a1024f 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -132,4 +132,9 @@ public class MockProtocolSession implements AMQProtocolSession public void setClientProperties(FieldTable clientProperties) { } + + public Object getClientIdentifier() + { + return null; + } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index fea3c93280..b3574ecba4 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -67,6 +67,12 @@ public class SubscriptionTestHelper implements Subscription return isSuspended; } + public boolean wouldSuspend(AMQMessage msg) + { + return isSuspended; + } + + public void queueDeleted(AMQQueue queue) { } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java deleted file mode 100644 index 3c5aab0911..0000000000 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.queue.SynchronizedDeliveryManager; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.DefaultQueueRegistry; -import org.apache.qpid.server.queue.DeliveryManagerTest; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; - -import junit.framework.TestSuite; - -public class SynchronizedDeliveryManagerTest extends DeliveryManagerTest -{ - public SynchronizedDeliveryManagerTest() throws Exception - { - try - { - System.setProperty("concurrentdeliverymanager","false"); - _mgr = new SynchronizedDeliveryManager(_subscriptions, new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), false, - new DefaultQueueRegistry())); - } - catch (Throwable t) - { - t.printStackTrace(); - throw new AMQException("Could not initialise delivery manager", t); - } - } - - public static junit.framework.Test suite() - { - TestSuite suite = new TestSuite(); - suite.addTestSuite(SynchronizedDeliveryManagerTest.class); - return suite; - } -} |