summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-07-25 12:40:24 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-07-25 12:40:24 +0000
commit593d2628c0378594d2e51d1c5149ffeeea660d62 (patch)
tree73f2075adeeaa7ef2770c75f0790042840459ebf
parent6d6b33149a156052d6d768ec839b60f3afb62c9e (diff)
downloadqpid-python-593d2628c0378594d2e51d1c5149ffeeea660d62.tar.gz
AMQMessage - added //todo-s and removed unused parameter StoreContext from expired() method call.
ConcurrentSelectorDeliveryManager - Update to reflect expired() call change. Created new _reaperContextStore to be used when performing reaper operations such as message dequeue due to expiration. Removed old commented code. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@559427 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java52
4 files changed, 32 insertions, 34 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
index 29d6c26b66..bb9eeed1d4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
@@ -35,6 +35,7 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.HeartbeatConfig;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.AuthenticationResult;
@@ -72,7 +73,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
SaslServer ss = null;
try
- {
+ {
ss = authMgr.createSaslServer(String.valueOf(body.mechanism), session.getLocalFQDN());
if (ss == null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index ac43cb7e2f..476e608b01 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -204,6 +204,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
if (message instanceof AMQDataBlock)
{
amqProtocolSession.dataBlockReceived((AMQDataBlock) message);
+
}
else if (message instanceof ByteBuffer)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 695c2611cb..afa581f0c5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -81,12 +81,17 @@ public class AMQMessage
// private AtomicBoolean _taken = new AtomicBoolean(false);
private TransientMessageData _transientMessageData = new TransientMessageData();
+ //todo: this should be part of a messageOnQueue object
private Set<Subscription> _rejectedBy = null;
+ //todo: this should be part of a messageOnQueue object
private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
+ //todo: this should be part of a messageOnQueue object
private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
private final int hashcode = System.identityHashCode(this);
+
+ //todo: this should be part of a messageOnQueue object
private long _expiration;
public String debugIdentity()
@@ -652,14 +657,13 @@ public class AMQMessage
/**
* Checks to see if the message has expired. If it has the message is dequeued.
*
- * @param storecontext
- * @param queue
+ * @param queue The queue to check the expiration against. (Currently not used)
*
* @return true if the message has expire
*
* @throws AMQException
*/
- public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException
+ public boolean expired(AMQQueue queue) throws AMQException
{
// note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 1cdee20598..907d68b733 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -87,6 +87,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
private final Object _queueHeadLock = new Object();
private String _processingThreadName = "";
+
+ /** Used by any reaping thread to purge messages */
+ private StoreContext _reapingStoreContext = new StoreContext();
+
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -463,17 +467,19 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
assert removed == message;
// if the message expired then the _totalMessageSize needs adjusting
- if (message.expired(sub.getChannel().getStoreContext(), _queue))
+ if (message.expired(_queue))
{
_totalMessageSize.addAndGet(-message.getSize());
- message.dequeue(sub.getChannel().getStoreContext(), _queue);
+ // Use the reapingStoreContext as any sub(if we have one) may be in a tx.
+ message.dequeue(_reapingStoreContext, _queue);
if (_log.isInfoEnabled())
{
_log.info(debugIdentity() + " Doing clean up of the main _message queue.");
}
}
+
//else the clean up is not required as the message has already been taken for this queue therefore
// it was the responsibility of the code that took the message to ensure the _totalMessageSize was updated.
@@ -513,15 +519,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// if the message is null then don't purge as we have no messagse.
if (message != null)
{
+ // Check that the message hasn't expired.
+ if (message.expired(_queue))
+ {
+ return true;
+ }
+
// if we have a subscriber perform message checks
if (sub != null)
{
- // Check that the message hasn't expired.
- if (message.expired(sub.getChannel().getStoreContext(), _queue))
- {
- return true;
- }
-
// if we have a queue browser(we don't purge) so check mark the message as taken
purge = ((!sub.isBrowser() || message.isTaken(_queue)));
}
@@ -640,7 +646,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
catch (AMQException e)
{
- message.release(_queue);
+ if (message != null)
+ {
+ message.release(_queue);
+ }
+ else
+ {
+ _log.error(debugIdentity() + "Unable to release message as it is null. " + e, e);
+ }
_log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e);
}
}
@@ -719,25 +732,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
-// private void sendNextMessage(Subscription sub)
-// {
-// if (sub.filtersMessages())
-// {
-// sendNextMessage(sub, sub.getPreDeliveryQueue());
-// if (sub.isAutoClose())
-// {
-// if (sub.getPreDeliveryQueue().isEmpty())
-// {
-// sub.close();
-// }
-// }
-// }
-// else
-// {
-// sendNextMessage(sub, _messages);
-// }
-// }
-
public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException
{
@@ -746,8 +740,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
_log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg);
}
- // This shouldn't be done here.
-// msg.release();
//Check if we have someone to deliver the message to.
_lock.lock();