summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-04-09 14:16:26 +0000
committerRobert Gemmell <robbie@apache.org>2010-04-09 14:16:26 +0000
commita485a09a10e0ce6b7b8705cab9cd2322f80ad748 (patch)
treec0e705a6f0676870c562c77e26b45a02e4c0e1e5
parent2a6a181b22220724b6a3a68d898f5260e1e884b1 (diff)
downloadqpid-python-a485a09a10e0ce6b7b8705cab9cd2322f80ad748.tar.gz
QPID-2379: add Session.close() method implementation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@932429 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java41
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java5
6 files changed, 78 insertions, 13 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index c32fcfd73a..4491f3f7d5 100644
--- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -21,6 +21,7 @@
package org.apache.qpid.qmf;
+import org.apache.qpid.AMQException;
import org.apache.qpid.qmf.schema.BrokerSchema;
import org.apache.qpid.server.configuration.*;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -1336,8 +1337,17 @@ public class QMFService implements ConfigStore.ConfigEventListener
public BrokerSchema.SessionClass.CloseMethodResponseCommand close(final BrokerSchema.SessionClass.CloseMethodResponseCommandFactory factory)
{
- //todo
- throw new UnsupportedOperationException();
+ try
+ {
+ _obj.mgmtClose();
+ }
+ catch (AMQException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ return factory.createResponseCommand();
}
public UUID getId()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index fe5da20fa5..454b731e5f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -141,7 +141,7 @@ public class AMQChannel implements SessionConfig
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
- private boolean _closing;
+ private AtomicBoolean _closing = new AtomicBoolean(false);
private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
@@ -480,7 +480,13 @@ public class AMQChannel implements SessionConfig
*/
public void close() throws AMQException
{
- setClosing(true);
+ if(!_closing.compareAndSet(false, true))
+ {
+ //Channel is already closing
+ return;
+ }
+
+ CurrentActor.get().message(_logSubject, ChannelMessages.CHN_CLOSE());
unsubscribeAllConsumers();
_transaction.rollback();
@@ -498,13 +504,6 @@ public class AMQChannel implements SessionConfig
}
- private void setClosing(boolean closing)
- {
- _closing = closing;
-
- CurrentActor.get().message(_logSubject, ChannelMessages.CHN_CLOSE());
- }
-
private void unsubscribeAllConsumers() throws AMQException
{
if (_logger.isInfoEnabled())
@@ -881,7 +880,7 @@ public class AMQChannel implements SessionConfig
public boolean isSuspended()
{
- return _suspended.get() || _closing || _session.isClosing();
+ return _suspended.get() || _closing.get() || _session.isClosing();
}
public void commit() throws AMQException
@@ -981,7 +980,7 @@ public class AMQChannel implements SessionConfig
public boolean isClosing()
{
- return _closing;
+ return _closing.get();
}
public AMQProtocolSession getProtocolSession()
@@ -1377,4 +1376,9 @@ public class AMQChannel implements SessionConfig
{
return _createTime;
}
+
+ public void mgmtClose() throws AMQException
+ {
+ _session.mgmtCloseChannel(_channelId);
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
index 5e5dd10e57..8fef642eff 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.configuration;
+import org.apache.qpid.AMQException;
+
public interface SessionConfig extends ConfiguredObject<SessionConfigType, SessionConfig>
{
VirtualHostConfig getVirtualHost();
@@ -48,4 +50,6 @@ public interface SessionConfig extends ConfiguredObject<SessionConfigType, Sessi
Long getTxnCount();
boolean isTransactional();
+
+ void mgmtClose() throws AMQException;
} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 01ea7e5bb1..3206d4fce7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -1190,4 +1190,45 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
}
}
+ public void mgmtCloseChannel(int channelId)
+ {
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ChannelCloseBody responseBody =
+ methodRegistry.createChannelCloseBody(
+ AMQConstant.REPLY_SUCCESS.getCode(),
+ new AMQShortString("The channel was closed using the broker's management interface."),
+ 0,0);
+
+ // This seems ugly but because we use AMQChannel.close() in both normal
+ // broker operation and as part of the management interface it cannot
+ // be avoided. The Current Actor will be null when this method is
+ // called via the QMF management interface. As such we need to set one.
+ boolean removeActor = false;
+ if (CurrentActor.get() == null)
+ {
+ removeActor = true;
+ CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger()));
+ }
+
+ try
+ {
+ writeFrame(responseBody.generateFrame(channelId));
+
+ try
+ {
+ closeChannel(channelId);
+ }
+ catch (AMQException ex)
+ {
+ throw new RuntimeException(ex);
+ }
+ }
+ finally
+ {
+ if (removeActor)
+ {
+ CurrentActor.remove();
+ }
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 48dd16a98c..5ed270c80d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -233,4 +233,5 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Prin
void closeIfLingeringClosedChannels();
+ void mgmtCloseChannel(int channelId);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index b5d5d7bba9..2195cc4154 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -547,4 +547,9 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
{
return _createTime;
}
+
+ public void mgmtClose()
+ {
+ close();
+ }
}