summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-16 11:00:46 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-16 11:00:46 +0000
commit0c7c587920299fdc5b2639e5ed7efc143a83c532 (patch)
treecc3d73d92cef05ebf83f003af69bb30bedd068d6
parentf45338ccb5afdf1ff2484f0ebf3f74abf2acd263 (diff)
downloadqpid-python-0c7c587920299fdc5b2639e5ed7efc143a83c532.tar.gz
QPID-372 Broker doesn't wait for ChannelClose-Ok.
Updated AMQProtocolSession to have new methods to query and release a channel from the awaiting close-ok state. Once a channel has been signalled to be closed any further methods on that channel are ignored until a close-ok is sent. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508366 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java123
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java70
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java33
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java10
5 files changed, 144 insertions, 98 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
index 988413a3de..ad5604e7ea 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
@@ -44,6 +44,10 @@ public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCl
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException
{
- _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
+ int channelId = evt.getChannelId();
+ _logger.info("Received channel-close-ok for channel-id " + channelId);
+
+ // Let the Protocol Session know the channel is now closed.
+ stateManager.getProtocolSession().closeChannelOk(channelId);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 27a61f4967..e53410420f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -64,6 +64,7 @@ import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -87,7 +88,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
- private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE+1];
+ private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
@@ -108,12 +109,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private long _maxNoOfChannels = 1000;
/* AMQP Version for this session */
- private byte _major = pv[pv.length-1][PROTOCOL_MAJOR];
- private byte _minor = pv[pv.length-1][PROTOCOL_MINOR];
+ private byte _major = pv[pv.length - 1][PROTOCOL_MAJOR];
+ private byte _minor = pv[pv.length - 1][PROTOCOL_MINOR];
private FieldTable _clientProperties;
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
- private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]);
-
+ private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length - 1][PROTOCOL_MAJOR], pv[pv.length - 1][PROTOCOL_MINOR]);
+ private List<Integer> _closingChannelsList = new ArrayList<Integer>();
public ManagedObject getManagedObject()
@@ -129,7 +130,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_minaProtocolSession = session;
session.setAttachment(this);
-
_codecFactory = codecFactory;
@@ -145,24 +145,20 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
catch (RuntimeException e)
{
e.printStackTrace();
- // throw e;
+ // throw e;
}
-
-
-
// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
}
- public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
AMQCodecFactory codecFactory, AMQStateManager stateManager)
throws AMQException
{
_stateManager = stateManager;
_minaProtocolSession = session;
session.setAttachment(this);
-
_codecFactory = codecFactory;
@@ -205,7 +201,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
pi.checkVersion(this); // Fails if not correct
// This sets the protocol version (and hence framing classes) for this session.
- setProtocolVersion(pi.protocolMajor,pi.protocolMinor);
+ setProtocolVersion(pi.protocolMajor, pi.protocolMinor);
String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
@@ -213,12 +209,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
// Interfacing with generated code - be aware of possible changes to parameter order as versions change.
AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
- _major, _minor, // AMQP version (major, minor)
- locales.getBytes(), // locales
- mechanisms.getBytes(), // mechanisms
- null, // serverProperties
- (short)_major, // versionMajor
- (short)_minor); // versionMinor
+ _major, _minor, // AMQP version (major, minor)
+ locales.getBytes(), // locales
+ mechanisms.getBytes(), // mechanisms
+ null, // serverProperties
+ (short) _major, // versionMajor
+ (short) _minor); // versionMinor
_minaProtocolSession.write(response);
}
catch (AMQException e)
@@ -266,7 +262,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
- if(!_frameListeners.isEmpty())
+ if (!_frameListeners.isEmpty())
{
for (AMQMethodListener listener : _frameListeners)
{
@@ -276,7 +272,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
if (!wasAnyoneInterested)
{
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
}
}
catch (AMQChannelException e)
@@ -338,12 +334,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content body frame received: " + frame);
}
- getChannel(frame.getChannel()).publishContentBody((ContentBody)frame.getBodyFrame(), this);
+ getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this);
}
/**
- * Convenience method that writes a frame to the protocol session. Equivalent
- * to calling getProtocolSession().write().
+ * Convenience method that writes a frame to the protocol session. Equivalent to calling
+ * getProtocolSession().write().
*
* @param frame the frame to write
*/
@@ -370,22 +366,40 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public AMQChannel getChannel(int channelId) throws AMQException
{
- return ((channelId & CHANNEL_CACHE_SIZE) == channelId)
- ? _cachedChannels[channelId]
- : _channelMap.get(channelId);
+ if (channelAwaitingClosure(channelId))
+ {
+ return null;
+ }
+ else
+ {
+ return ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+ ? _cachedChannels[channelId]
+ : _channelMap.get(channelId);
+ }
+ }
+
+ public boolean channelAwaitingClosure(int channelId)
+ {
+ return _closingChannelsList.contains(channelId);
}
public void addChannel(AMQChannel channel) throws AMQException
{
if (_closed)
{
- throw new AMQException("Session is closed");
+ throw new AMQException("Session is closed");
}
final int channelId = channel.getChannelId();
+
+ if (_closingChannelsList.contains(channelId))
+ {
+ throw new AMQException("Session is marked awaiting channel close");
+ }
+
_channelMap.put(channelId, channel);
- if(((channelId & CHANNEL_CACHE_SIZE) == channelId))
+ if (((channelId & CHANNEL_CACHE_SIZE) == channelId))
{
_cachedChannels[channelId] = channel;
}
@@ -428,12 +442,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
/**
- * Close a specific channel. This will remove any resources used by the channel, including:
- * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
- * </ul>
+ * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
+ * subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
*
* @param channelId id of the channel to close
- * @throws AMQException if an error occurs closing the channel
+ *
+ * @throws AMQException if an error occurs closing the channel
* @throws IllegalArgumentException if the channel id is not valid
*/
public void closeChannel(int channelId) throws AMQException
@@ -447,16 +461,26 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
try
{
+ markChannelawaitingCloseOk(channelId);
channel.close(this);
}
finally
{
removeChannel(channelId);
-
}
}
}
+ public void closeChannelOk(int channelId)
+ {
+ _closingChannelsList.remove(new Integer(channelId));
+ }
+
+ private void markChannelawaitingCloseOk(int channelId)
+ {
+ _closingChannelsList.add(channelId);
+ }
+
/**
* In our current implementation this is used by the clustering code.
*
@@ -465,7 +489,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public void removeChannel(int channelId)
{
_channelMap.remove(channelId);
- if((channelId & CHANNEL_CACHE_SIZE) == channelId)
+ if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
{
_cachedChannels[channelId] = null;
}
@@ -486,8 +510,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
/**
- * Closes all channels that were opened by this protocol session. This frees up all resources
- * used by the channel.
+ * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel.
*
* @throws AMQException if an error occurs while closing any channel
*/
@@ -498,16 +521,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
channel.close(this);
}
_channelMap.clear();
- for(int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
+ for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
{
- _cachedChannels[i]=null;
+ _cachedChannels[i] = null;
}
}
- /**
- * This must be called when the session is _closed in order to free up any resources
- * managed by the session.
- */
+ /** This must be called when the session is _closed in order to free up any resources managed by the session. */
public void closeSession() throws AMQException
{
if (!_closed)
@@ -518,7 +538,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_managedObject.unregister();
}
- for(Task task : _taskList)
+ for (Task task : _taskList)
{
task.doTask(this);
}
@@ -535,17 +555,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived;
}
- /**
- * @return an object that can be used to identity
- */
+ /** @return an object that can be used to identity */
public Object getKey()
{
return _minaProtocolSession.getRemoteAddress();
}
/**
- * Get the fully qualified domain name of the local address to which this session is bound. Since some servers
- * may be bound to multiple addresses this could vary depending on the acceptor this session was created from.
+ * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may
+ * be bound to multiple addresses this could vary depending on the acceptor this session was created from.
*
* @return a String FQDN
*/
@@ -586,7 +604,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public void setClientProperties(FieldTable clientProperties)
{
_clientProperties = clientProperties;
- if((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null))
+ if ((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null))
{
setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
}
@@ -596,7 +614,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_major = major;
_minor = minor;
- _registry = MainRegistry.getVersionSpecificRegistry(major,minor);
+ _registry = MainRegistry.getVersionSpecificRegistry(major, minor);
}
public byte getProtocolMajorVersion()
@@ -620,10 +638,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
-
public Object getClientIdentifier()
{
- return _minaProtocolSession.getRemoteAddress();
+ return _minaProtocolSession.getRemoteAddress();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index e0fb1eca2f..503dc8b554 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -34,8 +34,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
{
-
-
public static interface Task
{
public void doTask(AMQProtocolSession session) throws AMQException;
@@ -43,88 +41,108 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
/**
* Called when a protocol data block is received
+ *
* @param message the data block that has been received
+ *
* @throws Exception if processing the datablock fails
*/
void dataBlockReceived(AMQDataBlock message) throws Exception;
/**
- * Get the context key associated with this session. Context key is described
- * in the AMQ protocol specification (RFC 6).
+ * Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC
+ * 6).
+ *
* @return the context key
*/
AMQShortString getContextKey();
/**
- * Set the context key associated with this session. Context key is described
- * in the AMQ protocol specification (RFC 6).
+ * Set the context key associated with this session. Context key is described in the AMQ protocol specification (RFC
+ * 6).
+ *
* @param contextKey the context key
*/
void setContextKey(AMQShortString contextKey);
/**
- * Get the channel for this session associated with the specified id. A channel
- * id is unique per connection (i.e. per session).
+ * Get the channel for this session associated with the specified id. A channel id is unique per connection (i.e.
+ * per session).
+ *
* @param channelId the channel id which must be valid
+ *
* @return null if no channel exists, the channel otherwise
*/
AMQChannel getChannel(int channelId) throws AMQException;
/**
* Associate a channel with this session.
- * @param channel the channel to associate with this session. It is an error to
- * associate the same channel with more than one session but this is not validated.
+ *
+ * @param channel the channel to associate with this session. It is an error to associate the same channel with more
+ * than one session but this is not validated.
*/
void addChannel(AMQChannel channel) throws AMQException;
/**
- * Close a specific channel. This will remove any resources used by the channel, including:
- * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
- * </ul>
+ * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
+ * subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
+ *
* @param channelId id of the channel to close
+ *
* @throws org.apache.qpid.AMQException if an error occurs closing the channel
- * @throws IllegalArgumentException if the channel id is not valid
+ * @throws IllegalArgumentException if the channel id is not valid
*/
void closeChannel(int channelId) throws AMQException;
/**
+ * Markes the specific channel as closed. This will release the lock for that channel id so a new channel can be
+ * created on that id.
+ *
+ * @param channelId id of the channel to close
+ */
+ void closeChannelOk(int channelId);
+
+ /**
+ * Check to see if this chanel is closing
+ *
+ * @param channelId id to check
+ * @return boolean with state of channel awaiting closure
+ */
+ boolean channelAwaitingClosure(int channelId);
+
+ /**
* Remove a channel from the session but do not close it.
+ *
* @param channelId
*/
void removeChannel(int channelId);
/**
* Initialise heartbeats on the session.
+ *
* @param delay delay in seconds (not ms)
*/
void initHeartbeats(int delay);
- /**
- * This must be called when the session is _closed in order to free up any resources
- * managed by the session.
- */
+ /** This must be called when the session is _closed in order to free up any resources managed by the session. */
void closeSession() throws AMQException;
- /**
- * @return a key that uniquely identifies this session
- */
+ /** @return a key that uniquely identifies this session */
Object getKey();
/**
- * Get the fully qualified domain name of the local address to which this session is bound. Since some servers
- * may be bound to multiple addresses this could vary depending on the acceptor this session was created from.
+ * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may
+ * be bound to multiple addresses this could vary depending on the acceptor this session was created from.
*
* @return a String FQDN
*/
String getLocalFQDN();
- /**
- * @return the sasl server that can perform authentication for this session.
- */
+ /** @return the sasl server that can perform authentication for this session. */
SaslServer getSaslServer();
/**
* Set the sasl server that is to perform authentication for this session.
+ *
* @param saslServer
*/
void setSaslServer(SaslServer saslServer);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index 982443f1e8..6d1e9ce99d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -89,10 +89,8 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
/**
- * The state manager is responsible for managing the state of the protocol session.
- * <p/>
- * For each AMQProtocolHandler there is a separate state manager.
- *
+ * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
+ * there is a separate state manager.
*/
public class AMQStateManager implements AMQMethodListener
{
@@ -100,14 +98,12 @@ public class AMQStateManager implements AMQMethodListener
private final VirtualHostRegistry _virtualHostRegistry;
private final AMQProtocolSession _protocolSession;
- /**
- * The current state
- */
+ /** The current state */
private AMQState _currentState;
/**
- * Maps from an AMQState instance to a Map from Class to StateTransitionHandler.
- * The class must be a subclass of AMQFrame.
+ * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
+ * AMQFrame.
*/
private final EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
new EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>(AMQState.class);
@@ -206,7 +202,7 @@ public class AMQStateManager implements AMQMethodListener
public void error(Exception e)
{
- _logger.error("State manager received error notification: " + e, e);
+ _logger.error("State manager received error notification[Current State:" + _currentState + "]: " + e, e);
for (StateListener l : _stateListeners)
{
l.error(e);
@@ -221,7 +217,7 @@ public class AMQStateManager implements AMQMethodListener
checkChannel(evt, _protocolSession);
- handler.methodReceived(this, evt);
+ handler.methodReceived(this, evt);
return true;
}
return false;
@@ -230,16 +226,17 @@ public class AMQStateManager implements AMQMethodListener
private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession)
throws AMQException
{
- if(evt.getChannelId() != 0
- && !(evt.getMethod() instanceof ChannelOpenBody)
- && protocolSession.getChannel(evt.getChannelId()) == null)
+ if (evt.getChannelId() != 0
+ && !(evt.getMethod() instanceof ChannelOpenBody)
+ && (protocolSession.getChannel(evt.getChannelId()) == null)
+ && !protocolSession.channelAwaitingClosure(evt.getChannelId()))
{
- throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(),"No such channel: " + evt.getChannelId());
+ throw evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR.getCode(), "No such channel: " + evt.getChannelId());
}
}
protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState,
- B frame)
+ B frame)
throws IllegalStateTransitionException
{
if (_logger.isDebugEnabled())
@@ -250,8 +247,8 @@ public class AMQStateManager implements AMQMethodListener
classToHandlerMap = _state2HandlersMap.get(currentState);
final StateAwareMethodListener<B> handler = classToHandlerMap == null
- ? null
- : (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass());
+ ? null
+ : (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass());
if (handler == null)
{
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
index 0dd7744d1f..bab7954d11 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -94,6 +94,16 @@ public class MockProtocolSession implements AMQProtocolSession
{
}
+ public void closeChannelOk(int channelId)
+ {
+
+ }
+
+ public boolean channelAwaitingClosure(int channelId)
+ {
+ return false;
+ }
+
public void removeChannel(int channelId)
{
_channelMap.remove(channelId);