diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-02-16 11:00:46 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-16 11:00:46 +0000 |
commit | 0c7c587920299fdc5b2639e5ed7efc143a83c532 (patch) | |
tree | cc3d73d92cef05ebf83f003af69bb30bedd068d6 | |
parent | f45338ccb5afdf1ff2484f0ebf3f74abf2acd263 (diff) | |
download | qpid-python-0c7c587920299fdc5b2639e5ed7efc143a83c532.tar.gz |
QPID-372 Broker doesn't wait for ChannelClose-Ok.
Updated AMQProtocolSession to have new methods to query and release a channel from the awaiting close-ok state. Once a channel has been signalled to be closed any further methods on that channel are ignored until a close-ok is sent.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508366 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 144 insertions, 98 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java index 988413a3de..ad5604e7ea 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java @@ -44,6 +44,10 @@ public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCl public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException { - _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId()); + int channelId = evt.getChannelId(); + _logger.info("Received channel-close-ok for channel-id " + channelId); + + // Let the Protocol Session know the channel is now closed. + stateManager.getProtocolSession().closeChannelOk(channelId); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 27a61f4967..e53410420f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -64,6 +64,7 @@ import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -87,7 +88,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>(); - private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE+1]; + private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>(); @@ -108,12 +109,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private long _maxNoOfChannels = 1000; /* AMQP Version for this session */ - private byte _major = pv[pv.length-1][PROTOCOL_MAJOR]; - private byte _minor = pv[pv.length-1][PROTOCOL_MINOR]; + private byte _major = pv[pv.length - 1][PROTOCOL_MAJOR]; + private byte _minor = pv[pv.length - 1][PROTOCOL_MINOR]; private FieldTable _clientProperties; private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); - private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]); - + private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length - 1][PROTOCOL_MAJOR], pv[pv.length - 1][PROTOCOL_MINOR]); + private List<Integer> _closingChannelsList = new ArrayList<Integer>(); public ManagedObject getManagedObject() @@ -129,7 +130,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _stateManager = new AMQStateManager(virtualHostRegistry, this); _minaProtocolSession = session; session.setAttachment(this); - _codecFactory = codecFactory; @@ -145,24 +145,20 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, catch (RuntimeException e) { e.printStackTrace(); - // throw e; + // throw e; } - - - // this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); } - public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, + public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException { _stateManager = stateManager; _minaProtocolSession = session; session.setAttachment(this); - _codecFactory = codecFactory; @@ -205,7 +201,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, pi.checkVersion(this); // Fails if not correct // This sets the protocol version (and hence framing classes) for this session. - setProtocolVersion(pi.protocolMajor,pi.protocolMinor); + setProtocolVersion(pi.protocolMajor, pi.protocolMinor); String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); @@ -213,12 +209,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, // Interfacing with generated code - be aware of possible changes to parameter order as versions change. AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, - _major, _minor, // AMQP version (major, minor) - locales.getBytes(), // locales - mechanisms.getBytes(), // mechanisms - null, // serverProperties - (short)_major, // versionMajor - (short)_minor); // versionMinor + _major, _minor, // AMQP version (major, minor) + locales.getBytes(), // locales + mechanisms.getBytes(), // mechanisms + null, // serverProperties + (short) _major, // versionMajor + (short) _minor); // versionMinor _minaProtocolSession.write(response); } catch (AMQException e) @@ -266,7 +262,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { boolean wasAnyoneInterested = _stateManager.methodReceived(evt); - if(!_frameListeners.isEmpty()) + if (!_frameListeners.isEmpty()) { for (AMQMethodListener listener : _frameListeners) { @@ -276,7 +272,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } if (!wasAnyoneInterested) { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener on Broker."); } } catch (AMQChannelException e) @@ -338,12 +334,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content body frame received: " + frame); } - getChannel(frame.getChannel()).publishContentBody((ContentBody)frame.getBodyFrame(), this); + getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this); } /** - * Convenience method that writes a frame to the protocol session. Equivalent - * to calling getProtocolSession().write(). + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). * * @param frame the frame to write */ @@ -370,22 +366,40 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public AMQChannel getChannel(int channelId) throws AMQException { - return ((channelId & CHANNEL_CACHE_SIZE) == channelId) - ? _cachedChannels[channelId] - : _channelMap.get(channelId); + if (channelAwaitingClosure(channelId)) + { + return null; + } + else + { + return ((channelId & CHANNEL_CACHE_SIZE) == channelId) + ? _cachedChannels[channelId] + : _channelMap.get(channelId); + } + } + + public boolean channelAwaitingClosure(int channelId) + { + return _closingChannelsList.contains(channelId); } public void addChannel(AMQChannel channel) throws AMQException { if (_closed) { - throw new AMQException("Session is closed"); + throw new AMQException("Session is closed"); } final int channelId = channel.getChannelId(); + + if (_closingChannelsList.contains(channelId)) + { + throw new AMQException("Session is marked awaiting channel close"); + } + _channelMap.put(channelId, channel); - if(((channelId & CHANNEL_CACHE_SIZE) == channelId)) + if (((channelId & CHANNEL_CACHE_SIZE) == channelId)) { _cachedChannels[channelId] = channel; } @@ -428,12 +442,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } /** - * Close a specific channel. This will remove any resources used by the channel, including: - * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li> - * </ul> + * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue + * subscriptions (this may in turn remove queues if they are auto delete</li> </ul> * * @param channelId id of the channel to close - * @throws AMQException if an error occurs closing the channel + * + * @throws AMQException if an error occurs closing the channel * @throws IllegalArgumentException if the channel id is not valid */ public void closeChannel(int channelId) throws AMQException @@ -447,16 +461,26 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { try { + markChannelawaitingCloseOk(channelId); channel.close(this); } finally { removeChannel(channelId); - } } } + public void closeChannelOk(int channelId) + { + _closingChannelsList.remove(new Integer(channelId)); + } + + private void markChannelawaitingCloseOk(int channelId) + { + _closingChannelsList.add(channelId); + } + /** * In our current implementation this is used by the clustering code. * @@ -465,7 +489,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void removeChannel(int channelId) { _channelMap.remove(channelId); - if((channelId & CHANNEL_CACHE_SIZE) == channelId) + if ((channelId & CHANNEL_CACHE_SIZE) == channelId) { _cachedChannels[channelId] = null; } @@ -486,8 +510,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } /** - * Closes all channels that were opened by this protocol session. This frees up all resources - * used by the channel. + * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel. * * @throws AMQException if an error occurs while closing any channel */ @@ -498,16 +521,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, channel.close(this); } _channelMap.clear(); - for(int i = 0; i <= CHANNEL_CACHE_SIZE; i++) + for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++) { - _cachedChannels[i]=null; + _cachedChannels[i] = null; } } - /** - * This must be called when the session is _closed in order to free up any resources - * managed by the session. - */ + /** This must be called when the session is _closed in order to free up any resources managed by the session. */ public void closeSession() throws AMQException { if (!_closed) @@ -518,7 +538,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _managedObject.unregister(); } - for(Task task : _taskList) + for (Task task : _taskList) { task.doTask(this); } @@ -535,17 +555,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived; } - /** - * @return an object that can be used to identity - */ + /** @return an object that can be used to identity */ public Object getKey() { return _minaProtocolSession.getRemoteAddress(); } /** - * Get the fully qualified domain name of the local address to which this session is bound. Since some servers - * may be bound to multiple addresses this could vary depending on the acceptor this session was created from. + * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may + * be bound to multiple addresses this could vary depending on the acceptor this session was created from. * * @return a String FQDN */ @@ -586,7 +604,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void setClientProperties(FieldTable clientProperties) { _clientProperties = clientProperties; - if((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null)) + if ((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null)) { setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE))); } @@ -596,7 +614,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _major = major; _minor = minor; - _registry = MainRegistry.getVersionSpecificRegistry(major,minor); + _registry = MainRegistry.getVersionSpecificRegistry(major, minor); } public byte getProtocolMajorVersion() @@ -620,10 +638,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } - public Object getClientIdentifier() { - return _minaProtocolSession.getRemoteAddress(); + return _minaProtocolSession.getRemoteAddress(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index e0fb1eca2f..503dc8b554 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -34,8 +34,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost; public interface AMQProtocolSession extends AMQVersionAwareProtocolSession { - - public static interface Task { public void doTask(AMQProtocolSession session) throws AMQException; @@ -43,88 +41,108 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession /** * Called when a protocol data block is received + * * @param message the data block that has been received + * * @throws Exception if processing the datablock fails */ void dataBlockReceived(AMQDataBlock message) throws Exception; /** - * Get the context key associated with this session. Context key is described - * in the AMQ protocol specification (RFC 6). + * Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC + * 6). + * * @return the context key */ AMQShortString getContextKey(); /** - * Set the context key associated with this session. Context key is described - * in the AMQ protocol specification (RFC 6). + * Set the context key associated with this session. Context key is described in the AMQ protocol specification (RFC + * 6). + * * @param contextKey the context key */ void setContextKey(AMQShortString contextKey); /** - * Get the channel for this session associated with the specified id. A channel - * id is unique per connection (i.e. per session). + * Get the channel for this session associated with the specified id. A channel id is unique per connection (i.e. + * per session). + * * @param channelId the channel id which must be valid + * * @return null if no channel exists, the channel otherwise */ AMQChannel getChannel(int channelId) throws AMQException; /** * Associate a channel with this session. - * @param channel the channel to associate with this session. It is an error to - * associate the same channel with more than one session but this is not validated. + * + * @param channel the channel to associate with this session. It is an error to associate the same channel with more + * than one session but this is not validated. */ void addChannel(AMQChannel channel) throws AMQException; /** - * Close a specific channel. This will remove any resources used by the channel, including: - * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li> - * </ul> + * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue + * subscriptions (this may in turn remove queues if they are auto delete</li> </ul> + * * @param channelId id of the channel to close + * * @throws org.apache.qpid.AMQException if an error occurs closing the channel - * @throws IllegalArgumentException if the channel id is not valid + * @throws IllegalArgumentException if the channel id is not valid */ void closeChannel(int channelId) throws AMQException; /** + * Markes the specific channel as closed. This will release the lock for that channel id so a new channel can be + * created on that id. + * + * @param channelId id of the channel to close + */ + void closeChannelOk(int channelId); + + /** + * Check to see if this chanel is closing + * + * @param channelId id to check + * @return boolean with state of channel awaiting closure + */ + boolean channelAwaitingClosure(int channelId); + + /** * Remove a channel from the session but do not close it. + * * @param channelId */ void removeChannel(int channelId); /** * Initialise heartbeats on the session. + * * @param delay delay in seconds (not ms) */ void initHeartbeats(int delay); - /** - * This must be called when the session is _closed in order to free up any resources - * managed by the session. - */ + /** This must be called when the session is _closed in order to free up any resources managed by the session. */ void closeSession() throws AMQException; - /** - * @return a key that uniquely identifies this session - */ + /** @return a key that uniquely identifies this session */ Object getKey(); /** - * Get the fully qualified domain name of the local address to which this session is bound. Since some servers - * may be bound to multiple addresses this could vary depending on the acceptor this session was created from. + * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may + * be bound to multiple addresses this could vary depending on the acceptor this session was created from. * * @return a String FQDN */ String getLocalFQDN(); - /** - * @return the sasl server that can perform authentication for this session. - */ + /** @return the sasl server that can perform authentication for this session. */ SaslServer getSaslServer(); /** * Set the sasl server that is to perform authentication for this session. + * * @param saslServer */ void setSaslServer(SaslServer saslServer); diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 982443f1e8..6d1e9ce99d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -89,10 +89,8 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; /** - * The state manager is responsible for managing the state of the protocol session. - * <p/> - * For each AMQProtocolHandler there is a separate state manager. - * + * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler + * there is a separate state manager. */ public class AMQStateManager implements AMQMethodListener { @@ -100,14 +98,12 @@ public class AMQStateManager implements AMQMethodListener private final VirtualHostRegistry _virtualHostRegistry; private final AMQProtocolSession _protocolSession; - /** - * The current state - */ + /** The current state */ private AMQState _currentState; /** - * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. - * The class must be a subclass of AMQFrame. + * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of + * AMQFrame. */ private final EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap = new EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>(AMQState.class); @@ -206,7 +202,7 @@ public class AMQStateManager implements AMQMethodListener public void error(Exception e) { - _logger.error("State manager received error notification: " + e, e); + _logger.error("State manager received error notification[Current State:" + _currentState + "]: " + e, e); for (StateListener l : _stateListeners) { l.error(e); @@ -221,7 +217,7 @@ public class AMQStateManager implements AMQMethodListener checkChannel(evt, _protocolSession); - handler.methodReceived(this, evt); + handler.methodReceived(this, evt); return true; } return false; @@ -230,16 +226,17 @@ public class AMQStateManager implements AMQMethodListener private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession) throws AMQException { - if(evt.getChannelId() != 0 - && !(evt.getMethod() instanceof ChannelOpenBody) - && protocolSession.getChannel(evt.getChannelId()) == null) + if (evt.getChannelId() != 0 + && !(evt.getMethod() instanceof ChannelOpenBody) + && (protocolSession.getChannel(evt.getChannelId()) == null) + && !protocolSession.channelAwaitingClosure(evt.getChannelId())) { - throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(),"No such channel: " + evt.getChannelId()); + throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(), "No such channel: " + evt.getChannelId()); } } protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState, - B frame) + B frame) throws IllegalStateTransitionException { if (_logger.isDebugEnabled()) @@ -250,8 +247,8 @@ public class AMQStateManager implements AMQMethodListener classToHandlerMap = _state2HandlersMap.get(currentState); final StateAwareMethodListener<B> handler = classToHandlerMap == null - ? null - : (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass()); + ? null + : (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass()); if (handler == null) { diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java index 0dd7744d1f..bab7954d11 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -94,6 +94,16 @@ public class MockProtocolSession implements AMQProtocolSession { } + public void closeChannelOk(int channelId) + { + + } + + public boolean channelAwaitingClosure(int channelId) + { + return false; + } + public void removeChannel(int channelId) { _channelMap.remove(channelId); |