summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-25 14:15:47 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-25 14:15:47 +0000
commitbca9448fbbe9b7009ebad15589c57edb62fa1e24 (patch)
tree5a8dcf850cd52cb2c40959089c44c687309d39ef
parent283aaa337b4ab30cd59c321ef6abe20cb1dd53a5 (diff)
downloadqpid-python-bca9448fbbe9b7009ebad15589c57edb62fa1e24.tar.gz
implemented some toString() and added better checking for Channel/Connection closure in the broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@829575 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java76
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java13
-rw-r--r--qpid/java/test-profiles/java-derby.testprofile2
-rw-r--r--qpid/java/test-profiles/java.testprofile2
8 files changed, 83 insertions, 29 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 52059836b7..ea5d0fdc9b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -393,6 +393,7 @@ public class AMQChannel
*/
public void close() throws AMQException
{
+ setClosing(true);
unsubscribeAllConsumers();
_transaction.rollback();
@@ -406,7 +407,6 @@ public class AMQChannel
_logger.error("Caught AMQException whilst attempting to reque:" + e);
}
- setClosing(true);
}
private void setClosing(boolean closing)
@@ -792,7 +792,7 @@ public class AMQChannel
public boolean isSuspended()
{
- return _suspended.get();
+ return _suspended.get() || _closing || _session.isClosing();
}
public void commit() throws AMQException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 1d0ca19e91..04b3e75f8c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -34,6 +34,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicMarkableReference;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.JMException;
import javax.security.sasl.SaslServer;
@@ -124,7 +126,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
private Object _lastSent;
- protected boolean _closed;
+ protected volatile boolean _closed;
// maximum number of channels this session should have
private long _maxNoOfChannels = 1000;
@@ -158,6 +160,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
private long _maxFrameSize;
+ private final AtomicBoolean _closing = new AtomicBoolean(false);
public ManagedObject getManagedObject()
{
@@ -204,6 +207,11 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
return _maxFrameSize;
}
+ public boolean isClosing()
+ {
+ return _closing.get();
+ }
+
public void received(final ByteBuffer msg)
{
_lastIoTime = System.currentTimeMillis();
@@ -679,34 +687,58 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
public void closeSession() throws AMQException
{
- // REMOVE THIS SHOULD NOT BE HERE.
- if (CurrentActor.get() == null)
- {
- CurrentActor.set(_actor);
- }
- if (!_closed)
+ if(_closing.compareAndSet(false,true))
{
- if (_virtualHost != null)
+ // REMOVE THIS SHOULD NOT BE HERE.
+ if (CurrentActor.get() == null)
{
- _virtualHost.getConnectionRegistry().deregisterConnection(this);
+ CurrentActor.set(_actor);
}
-
- closeAllChannels();
- if (_managedObject != null)
+ if (!_closed)
{
- _managedObject.unregister();
- // Ensure we only do this once.
- _managedObject = null;
- }
+ if (_virtualHost != null)
+ {
+ _virtualHost.getConnectionRegistry().deregisterConnection(this);
+ }
- for (Task task : _taskList)
- {
- task.doTask(this);
+ closeAllChannels();
+ if (_managedObject != null)
+ {
+ _managedObject.unregister();
+ // Ensure we only do this once.
+ _managedObject = null;
+ }
+
+ for (Task task : _taskList)
+ {
+ task.doTask(this);
+ }
+
+ synchronized(this)
+ {
+ _closed = true;
+ notifyAll();
+ }
+ _poolReference.releaseExecutorService();
+ CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002());
}
+ }
+ else
+ {
+ synchronized(this)
+ {
+ while(!_closed)
+ {
+ try
+ {
+ wait(1000);
+ }
+ catch (InterruptedException e)
+ {
- _closed = true;
- _poolReference.releaseExecutorService();
- CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002());
+ }
+ }
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 54b38a7f26..48dd16a98c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -47,6 +47,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Prin
long getMaxFrameSize();
+ boolean isClosing();
+
public static final class ProtocolSessionIdentifier
{
private final Object _sessionIdentifier;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 8712a1eaa5..77a6fb9328 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1890,4 +1890,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return _resourceName;
}
+
+
+ @Override
+ public String toString()
+ {
+ return String.valueOf(getName());
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index e4e5ba26b4..c321fdf3e0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -599,4 +599,10 @@ public class VirtualHostImpl implements Accessable, VirtualHost
}
}
}
+
+ @Override
+ public String toString()
+ {
+ return _name;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index cd049c24a1..8910920017 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -102,7 +102,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
{
- _protocolHandler = protocolHandler;
+ _protocolHandler = protocolHandler;
_protocolVersion = connection.getProtocolVersion();
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
this);
@@ -156,7 +156,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public SaslClient getSaslClient()
{
- return _saslClient;
+ return _saslClient;
}
/**
@@ -192,7 +192,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
* @throws AMQException if this was not expected
*/
public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException
- {
+ {
if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
_channelId2UnprocessedMsgArray[channelId] = message;
@@ -468,4 +468,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
// No-op, interface munging
}
+
+
+ @Override
+ public String toString()
+ {
+ return "AMQProtocolSession[" + _connection + ']';
+ }
}
diff --git a/qpid/java/test-profiles/java-derby.testprofile b/qpid/java/test-profiles/java-derby.testprofile
index fb339c6823..2887501a9b 100644
--- a/qpid/java/test-profiles/java-derby.testprofile
+++ b/qpid/java/test-profiles/java-derby.testprofile
@@ -4,5 +4,5 @@ broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work
broker.ready=BRK-1004
broker.stopped=Exception
broker.config=${project.root}/build/etc/config-systests-derby.xml
-
+qpid.amqp.version=0-9
profile.excludes=08StandaloneExcludes
diff --git a/qpid/java/test-profiles/java.testprofile b/qpid/java/test-profiles/java.testprofile
index a9e067e143..8dd835a335 100644
--- a/qpid/java/test-profiles/java.testprofile
+++ b/qpid/java/test-profiles/java.testprofile
@@ -3,5 +3,5 @@ broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT --exclude-0-10 @
broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work/derbyDB
broker.ready=BRK-1004
broker.stopped=Exception
-
+qpid.amqp.version=0-9
profile.excludes=08TransientExcludes 08StandaloneExcludes