diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-07-25 12:40:24 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-07-25 12:40:24 +0000 |
commit | 593d2628c0378594d2e51d1c5149ffeeea660d62 (patch) | |
tree | 73f2075adeeaa7ef2770c75f0790042840459ebf | |
parent | 6d6b33149a156052d6d768ec839b60f3afb62c9e (diff) | |
download | qpid-python-593d2628c0378594d2e51d1c5149ffeeea660d62.tar.gz |
AMQMessage - added //todo-s and removed unused parameter StoreContext from expired() method call.
ConcurrentSelectorDeliveryManager - Update to reflect expired() call change. Created new _reaperContextStore to be used when performing reaper operations such as message dequeue due to expiration. Removed old commented code.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@559427 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 32 insertions, 34 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index 29d6c26b66..bb9eeed1d4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -35,6 +35,7 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.HeartbeatConfig; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.AuthenticationResult; @@ -72,7 +73,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< SaslServer ss = null; try - { + { ss = authMgr.createSaslServer(String.valueOf(body.mechanism), session.getLocalFQDN()); if (ss == null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index ac43cb7e2f..476e608b01 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -204,6 +204,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter if (message instanceof AMQDataBlock) { amqProtocolSession.dataBlockReceived((AMQDataBlock) message); + } else if (message instanceof ByteBuffer) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 695c2611cb..afa581f0c5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -81,12 +81,17 @@ public class AMQMessage // private AtomicBoolean _taken = new AtomicBoolean(false); private TransientMessageData _transientMessageData = new TransientMessageData(); + //todo: this should be part of a messageOnQueue object private Set<Subscription> _rejectedBy = null; + //todo: this should be part of a messageOnQueue object private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>(); + //todo: this should be part of a messageOnQueue object private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>(); private final int hashcode = System.identityHashCode(this); + + //todo: this should be part of a messageOnQueue object private long _expiration; public String debugIdentity() @@ -652,14 +657,13 @@ public class AMQMessage /** * Checks to see if the message has expired. If it has the message is dequeued. * - * @param storecontext - * @param queue + * @param queue The queue to check the expiration against. (Currently not used) * * @return true if the message has expire * * @throws AMQException */ - public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException + public boolean expired(AMQQueue queue) throws AMQException { // note: If the storecontext isn't need then we can remove the getChannel() from Subscription. diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 1cdee20598..907d68b733 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -87,6 +87,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private final Object _queueHeadLock = new Object(); private String _processingThreadName = ""; + + /** Used by any reaping thread to purge messages */ + private StoreContext _reapingStoreContext = new StoreContext(); + ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { @@ -463,17 +467,19 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager assert removed == message; // if the message expired then the _totalMessageSize needs adjusting - if (message.expired(sub.getChannel().getStoreContext(), _queue)) + if (message.expired(_queue)) { _totalMessageSize.addAndGet(-message.getSize()); - message.dequeue(sub.getChannel().getStoreContext(), _queue); + // Use the reapingStoreContext as any sub(if we have one) may be in a tx. + message.dequeue(_reapingStoreContext, _queue); if (_log.isInfoEnabled()) { _log.info(debugIdentity() + " Doing clean up of the main _message queue."); } } + //else the clean up is not required as the message has already been taken for this queue therefore // it was the responsibility of the code that took the message to ensure the _totalMessageSize was updated. @@ -513,15 +519,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // if the message is null then don't purge as we have no messagse. if (message != null) { + // Check that the message hasn't expired. + if (message.expired(_queue)) + { + return true; + } + // if we have a subscriber perform message checks if (sub != null) { - // Check that the message hasn't expired. - if (message.expired(sub.getChannel().getStoreContext(), _queue)) - { - return true; - } - // if we have a queue browser(we don't purge) so check mark the message as taken purge = ((!sub.isBrowser() || message.isTaken(_queue))); } @@ -640,7 +646,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } catch (AMQException e) { - message.release(_queue); + if (message != null) + { + message.release(_queue); + } + else + { + _log.error(debugIdentity() + "Unable to release message as it is null. " + e, e); + } _log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e); } } @@ -719,25 +732,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } -// private void sendNextMessage(Subscription sub) -// { -// if (sub.filtersMessages()) -// { -// sendNextMessage(sub, sub.getPreDeliveryQueue()); -// if (sub.isAutoClose()) -// { -// if (sub.getPreDeliveryQueue().isEmpty()) -// { -// sub.close(); -// } -// } -// } -// else -// { -// sendNextMessage(sub, _messages); -// } -// } - public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException { @@ -746,8 +740,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg); } - // This shouldn't be done here. -// msg.release(); //Check if we have someone to deliver the message to. _lock.lock(); |