diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-01-12 01:23:43 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-01-12 01:23:43 +0000 |
commit | a13a516a53557635bea11adbd854deb846b62eff (patch) | |
tree | 6f83c0ba0afc69735f3aeb4780a228d7b31c37bd | |
parent | 1ce0f5dabf4f811d388ab80652ed70ea6bebcbd3 (diff) | |
download | qpid-python-a13a516a53557635bea11adbd854deb846b62eff.tar.gz |
QPID-276
Update to AMQChannel to remove race condition over UnacknowledgedMessageMap
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@495460 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 29 insertions, 12 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 470789bb1a..0497d4bb8f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -275,7 +275,7 @@ public class AMQChannel * @throws AMQException if something goes wrong */ public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks, - FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException + FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException { if (tag == null) { @@ -326,20 +326,24 @@ public class AMQChannel /** * Add a message to the channel-based list of unacknowledged messages * - * @param message the message that was delivered + * @param message the message that was delivered * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of - * the delivery tag) - * @param queue the queue from which the message was delivered + * the delivery tag) + * @param queue the queue from which the message was delivered */ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue) { - _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); - checkSuspension(); + synchronized (_unacknowledgedMessageMap.getLock()) + { + _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); + checkSuspension(); + } } /** * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. * May result in delivery to this same channel or to other subscribers. + * * @throws org.apache.qpid.AMQException if the requeue fails */ public void requeue() throws AMQException @@ -427,8 +431,11 @@ public class AMQChannel */ public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException { - _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext); - checkSuspension(); + synchronized (_unacknowledgedMessageMap.getLock()) + { + _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext); + checkSuspension(); + } } /** @@ -450,6 +457,7 @@ public class AMQChannel private void checkSuspension() { boolean suspend; + suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark; setSuspended(suspend); diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java index b7a75d5b71..ef58ba01a3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java @@ -43,6 +43,8 @@ public interface UnacknowledgedMessageMap void visit(Visitor visitor) throws AMQException; + Object getLock(); + void add(long deliveryTag, UnacknowledgedMessage message); void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs); @@ -67,6 +69,7 @@ public interface UnacknowledgedMessageMap /** * Get the set of delivery tags that are outstanding. + * * @return a set of delivery tags */ Set<Long> getDeliveryTags(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index 0677494134..a21e4cfff6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -75,7 +75,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { synchronized (_lock) { - for(UnacknowledgedMessage msg : msgs) + for (UnacknowledgedMessage msg : msgs) { _map.remove(msg.deliveryTag); } @@ -95,7 +95,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap synchronized (_lock) { Collection<UnacknowledgedMessage> currentEntries = _map.values(); - for (UnacknowledgedMessage msg: currentEntries) + for (UnacknowledgedMessage msg : currentEntries) { visitor.callback(msg); } @@ -103,9 +103,14 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } + public Object getLock() + { + return _lock; + } + public void add(long deliveryTag, UnacknowledgedMessage message) { - synchronized( _lock) + synchronized (_lock) { _map.put(deliveryTag, message); _lastDeliveryTag = deliveryTag; @@ -209,7 +214,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap { synchronized (_lock) { - for(Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet()) + for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet()) { msgs.add(entry.getValue()); if (entry.getKey() == key) diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index f4f443b162..10e23caac3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -134,6 +134,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco { _logger.info("Protocol Session closed"); final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession); + //fixme -- this can be null amqProtocolSession.closeSession(); } |