diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-10-25 14:15:47 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-10-25 14:15:47 +0000 |
commit | bca9448fbbe9b7009ebad15589c57edb62fa1e24 (patch) | |
tree | 5a8dcf850cd52cb2c40959089c44c687309d39ef | |
parent | 283aaa337b4ab30cd59c321ef6abe20cb1dd53a5 (diff) | |
download | qpid-python-bca9448fbbe9b7009ebad15589c57edb62fa1e24.tar.gz |
implemented some toString() and added better checking for Channel/Connection closure in the broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@829575 13f79535-47bb-0310-9956-ffa450edef68
8 files changed, 83 insertions, 29 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 52059836b7..ea5d0fdc9b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -393,6 +393,7 @@ public class AMQChannel */ public void close() throws AMQException { + setClosing(true); unsubscribeAllConsumers(); _transaction.rollback(); @@ -406,7 +407,6 @@ public class AMQChannel _logger.error("Caught AMQException whilst attempting to reque:" + e); } - setClosing(true); } private void setClosing(boolean closing) @@ -792,7 +792,7 @@ public class AMQChannel public boolean isSuspended() { - return _suspended.get(); + return _suspended.get() || _closing || _session.isClosing(); } public void commit() throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 1d0ca19e91..04b3e75f8c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -34,6 +34,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicMarkableReference; +import java.util.concurrent.atomic.AtomicBoolean; import javax.management.JMException; import javax.security.sasl.SaslServer; @@ -124,7 +126,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol private Object _lastSent; - protected boolean _closed; + protected volatile boolean _closed; // maximum number of channels this session should have private long _maxNoOfChannels = 1000; @@ -158,6 +160,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); private long _maxFrameSize; + private final AtomicBoolean _closing = new AtomicBoolean(false); public ManagedObject getManagedObject() { @@ -204,6 +207,11 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol return _maxFrameSize; } + public boolean isClosing() + { + return _closing.get(); + } + public void received(final ByteBuffer msg) { _lastIoTime = System.currentTimeMillis(); @@ -679,34 +687,58 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol /** This must be called when the session is _closed in order to free up any resources managed by the session. */ public void closeSession() throws AMQException { - // REMOVE THIS SHOULD NOT BE HERE. - if (CurrentActor.get() == null) - { - CurrentActor.set(_actor); - } - if (!_closed) + if(_closing.compareAndSet(false,true)) { - if (_virtualHost != null) + // REMOVE THIS SHOULD NOT BE HERE. + if (CurrentActor.get() == null) { - _virtualHost.getConnectionRegistry().deregisterConnection(this); + CurrentActor.set(_actor); } - - closeAllChannels(); - if (_managedObject != null) + if (!_closed) { - _managedObject.unregister(); - // Ensure we only do this once. - _managedObject = null; - } + if (_virtualHost != null) + { + _virtualHost.getConnectionRegistry().deregisterConnection(this); + } - for (Task task : _taskList) - { - task.doTask(this); + closeAllChannels(); + if (_managedObject != null) + { + _managedObject.unregister(); + // Ensure we only do this once. + _managedObject = null; + } + + for (Task task : _taskList) + { + task.doTask(this); + } + + synchronized(this) + { + _closed = true; + notifyAll(); + } + _poolReference.releaseExecutorService(); + CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002()); } + } + else + { + synchronized(this) + { + while(!_closed) + { + try + { + wait(1000); + } + catch (InterruptedException e) + { - _closed = true; - _poolReference.releaseExecutorService(); - CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002()); + } + } + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 54b38a7f26..48dd16a98c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -47,6 +47,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Prin long getMaxFrameSize(); + boolean isClosing(); + public static final class ProtocolSessionIdentifier { private final Object _sessionIdentifier; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 8712a1eaa5..77a6fb9328 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1890,4 +1890,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _resourceName; } + + + @Override + public String toString() + { + return String.valueOf(getName()); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index e4e5ba26b4..c321fdf3e0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -599,4 +599,10 @@ public class VirtualHostImpl implements Accessable, VirtualHost } } } + + @Override + public String toString() + { + return _name; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index cd049c24a1..8910920017 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -102,7 +102,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) { - _protocolHandler = protocolHandler; + _protocolHandler = protocolHandler; _protocolVersion = connection.getProtocolVersion(); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), this); @@ -156,7 +156,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public SaslClient getSaslClient() { - return _saslClient; + return _saslClient; } /** @@ -192,7 +192,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession * @throws AMQException if this was not expected */ public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException - { + { if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) { _channelId2UnprocessedMsgArray[channelId] = message; @@ -468,4 +468,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { // No-op, interface munging } + + + @Override + public String toString() + { + return "AMQProtocolSession[" + _connection + ']'; + } } diff --git a/qpid/java/test-profiles/java-derby.testprofile b/qpid/java/test-profiles/java-derby.testprofile index fb339c6823..2887501a9b 100644 --- a/qpid/java/test-profiles/java-derby.testprofile +++ b/qpid/java/test-profiles/java-derby.testprofile @@ -4,5 +4,5 @@ broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work broker.ready=BRK-1004 broker.stopped=Exception broker.config=${project.root}/build/etc/config-systests-derby.xml - +qpid.amqp.version=0-9 profile.excludes=08StandaloneExcludes diff --git a/qpid/java/test-profiles/java.testprofile b/qpid/java/test-profiles/java.testprofile index a9e067e143..8dd835a335 100644 --- a/qpid/java/test-profiles/java.testprofile +++ b/qpid/java/test-profiles/java.testprofile @@ -3,5 +3,5 @@ broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT --exclude-0-10 @ broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work/derbyDB broker.ready=BRK-1004 broker.stopped=Exception - +qpid.amqp.version=0-9 profile.excludes=08TransientExcludes 08StandaloneExcludes |