summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-01-12 01:23:43 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-01-12 01:23:43 +0000
commita13a516a53557635bea11adbd854deb846b62eff (patch)
tree6f83c0ba0afc69735f3aeb4780a228d7b31c37bd
parent1ce0f5dabf4f811d388ab80652ed70ea6bebcbd3 (diff)
downloadqpid-python-a13a516a53557635bea11adbd854deb846b62eff.tar.gz
QPID-276
Update to AMQChannel to remove race condition over UnacknowledgedMessageMap git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@495460 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java24
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java1
4 files changed, 29 insertions, 12 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 470789bb1a..0497d4bb8f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -275,7 +275,7 @@ public class AMQChannel
* @throws AMQException if something goes wrong
*/
public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
- FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException
+ FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -326,20 +326,24 @@ public class AMQChannel
/**
* Add a message to the channel-based list of unacknowledged messages
*
- * @param message the message that was delivered
+ * @param message the message that was delivered
* @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of
- * the delivery tag)
- * @param queue the queue from which the message was delivered
+ * the delivery tag)
+ * @param queue the queue from which the message was delivered
*/
public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
{
- _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
- checkSuspension();
+ synchronized (_unacknowledgedMessageMap.getLock())
+ {
+ _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
+ checkSuspension();
+ }
}
/**
* Called to attempt re-enqueue all outstanding unacknowledged messages on the channel.
* May result in delivery to this same channel or to other subscribers.
+ *
* @throws org.apache.qpid.AMQException if the requeue fails
*/
public void requeue() throws AMQException
@@ -427,8 +431,11 @@ public class AMQChannel
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
{
- _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
- checkSuspension();
+ synchronized (_unacknowledgedMessageMap.getLock())
+ {
+ _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
+ checkSuspension();
+ }
}
/**
@@ -450,6 +457,7 @@ public class AMQChannel
private void checkSuspension()
{
boolean suspend;
+
suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark;
setSuspended(suspend);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
index b7a75d5b71..ef58ba01a3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
@@ -43,6 +43,8 @@ public interface UnacknowledgedMessageMap
void visit(Visitor visitor) throws AMQException;
+ Object getLock();
+
void add(long deliveryTag, UnacknowledgedMessage message);
void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs);
@@ -67,6 +69,7 @@ public interface UnacknowledgedMessageMap
/**
* Get the set of delivery tags that are outstanding.
+ *
* @return a set of delivery tags
*/
Set<Long> getDeliveryTags();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index 0677494134..a21e4cfff6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -75,7 +75,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
synchronized (_lock)
{
- for(UnacknowledgedMessage msg : msgs)
+ for (UnacknowledgedMessage msg : msgs)
{
_map.remove(msg.deliveryTag);
}
@@ -95,7 +95,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
synchronized (_lock)
{
Collection<UnacknowledgedMessage> currentEntries = _map.values();
- for (UnacknowledgedMessage msg: currentEntries)
+ for (UnacknowledgedMessage msg : currentEntries)
{
visitor.callback(msg);
}
@@ -103,9 +103,14 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
+ public Object getLock()
+ {
+ return _lock;
+ }
+
public void add(long deliveryTag, UnacknowledgedMessage message)
{
- synchronized( _lock)
+ synchronized (_lock)
{
_map.put(deliveryTag, message);
_lastDeliveryTag = deliveryTag;
@@ -209,7 +214,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
synchronized (_lock)
{
- for(Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
+ for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
{
msgs.add(entry.getValue());
if (entry.getKey() == key)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index f4f443b162..10e23caac3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -134,6 +134,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
{
_logger.info("Protocol Session closed");
final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
+ //fixme -- this can be null
amqProtocolSession.closeSession();
}