diff options
author | Robert Gemmell <robbie@apache.org> | 2010-04-09 14:16:26 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2010-04-09 14:16:26 +0000 |
commit | a485a09a10e0ce6b7b8705cab9cd2322f80ad748 (patch) | |
tree | c0e705a6f0676870c562c77e26b45a02e4c0e1e5 | |
parent | 2a6a181b22220724b6a3a68d898f5260e1e884b1 (diff) | |
download | qpid-python-a485a09a10e0ce6b7b8705cab9cd2322f80ad748.tar.gz |
QPID-2379: add Session.close() method implementation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@932429 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 78 insertions, 13 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index c32fcfd73a..4491f3f7d5 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -21,6 +21,7 @@ package org.apache.qpid.qmf; +import org.apache.qpid.AMQException; import org.apache.qpid.qmf.schema.BrokerSchema; import org.apache.qpid.server.configuration.*; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -1336,8 +1337,17 @@ public class QMFService implements ConfigStore.ConfigEventListener public BrokerSchema.SessionClass.CloseMethodResponseCommand close(final BrokerSchema.SessionClass.CloseMethodResponseCommandFactory factory) { - //todo - throw new UnsupportedOperationException(); + try + { + _obj.mgmtClose(); + } + catch (AMQException e) + { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + return factory.createResponseCommand(); } public UUID getId() diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index fe5da20fa5..454b731e5f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -141,7 +141,7 @@ public class AMQChannel implements SessionConfig // Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; - private boolean _closing; + private AtomicBoolean _closing = new AtomicBoolean(false); private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>(); @@ -480,7 +480,13 @@ public class AMQChannel implements SessionConfig */ public void close() throws AMQException { - setClosing(true); + if(!_closing.compareAndSet(false, true)) + { + //Channel is already closing + return; + } + + CurrentActor.get().message(_logSubject, ChannelMessages.CHN_CLOSE()); unsubscribeAllConsumers(); _transaction.rollback(); @@ -498,13 +504,6 @@ public class AMQChannel implements SessionConfig } - private void setClosing(boolean closing) - { - _closing = closing; - - CurrentActor.get().message(_logSubject, ChannelMessages.CHN_CLOSE()); - } - private void unsubscribeAllConsumers() throws AMQException { if (_logger.isInfoEnabled()) @@ -881,7 +880,7 @@ public class AMQChannel implements SessionConfig public boolean isSuspended() { - return _suspended.get() || _closing || _session.isClosing(); + return _suspended.get() || _closing.get() || _session.isClosing(); } public void commit() throws AMQException @@ -981,7 +980,7 @@ public class AMQChannel implements SessionConfig public boolean isClosing() { - return _closing; + return _closing.get(); } public AMQProtocolSession getProtocolSession() @@ -1377,4 +1376,9 @@ public class AMQChannel implements SessionConfig { return _createTime; } + + public void mgmtClose() throws AMQException + { + _session.mgmtCloseChannel(_channelId); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java index 5e5dd10e57..8fef642eff 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/SessionConfig.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.configuration; +import org.apache.qpid.AMQException; + public interface SessionConfig extends ConfiguredObject<SessionConfigType, SessionConfig> { VirtualHostConfig getVirtualHost(); @@ -48,4 +50,6 @@ public interface SessionConfig extends ConfiguredObject<SessionConfigType, Sessi Long getTxnCount(); boolean isTransactional(); + + void mgmtClose() throws AMQException; }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 01ea7e5bb1..3206d4fce7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -1190,4 +1190,45 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol } } + public void mgmtCloseChannel(int channelId) + { + MethodRegistry methodRegistry = getMethodRegistry(); + ChannelCloseBody responseBody = + methodRegistry.createChannelCloseBody( + AMQConstant.REPLY_SUCCESS.getCode(), + new AMQShortString("The channel was closed using the broker's management interface."), + 0,0); + + // This seems ugly but because we use AMQChannel.close() in both normal + // broker operation and as part of the management interface it cannot + // be avoided. The Current Actor will be null when this method is + // called via the QMF management interface. As such we need to set one. + boolean removeActor = false; + if (CurrentActor.get() == null) + { + removeActor = true; + CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger())); + } + + try + { + writeFrame(responseBody.generateFrame(channelId)); + + try + { + closeChannel(channelId); + } + catch (AMQException ex) + { + throw new RuntimeException(ex); + } + } + finally + { + if (removeActor) + { + CurrentActor.remove(); + } + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 48dd16a98c..5ed270c80d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -233,4 +233,5 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Prin void closeIfLingeringClosedChannels(); + void mgmtCloseChannel(int channelId); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index b5d5d7bba9..2195cc4154 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -547,4 +547,9 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo { return _createTime; } + + public void mgmtClose() + { + close(); + } } |