diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-01-22 15:05:58 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-01-22 15:05:58 +0000 |
commit | 5401782b97dd4185b122f234e67373574b16671d (patch) | |
tree | 470c6bddce4f7e39a1b00c2e6794c725dfa741f9 | |
parent | 80653440840bbd4a846835688bda4ce418e5323d (diff) | |
download | qpid-python-5401782b97dd4185b122f234e67373574b16671d.tar.gz |
QPID-310 Propagated JMS Exception to client.
QPID-308 Configurable timeout on blockForFrame.
Timeouts added but need to be configurable.
QPID-311 Dispatcher Thread is not thread safe.
Added the missing Synchronized code and renamed vars to make it more readable
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@498637 13f79535-47bb-0310-9956-ffa450edef68
8 files changed, 265 insertions, 108 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index b82a735f06..a155117a7f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -142,12 +142,21 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ private QpidConnectionMetaData _connectionMetaData; + /** + * @param broker brokerdetails + * @param username username + * @param password password + * @param clientName clientid + * @param virtualHost virtualhost + * @throws AMQException + * @throws URLSyntaxException + */ public AMQConnection(String broker, String username, String password, String clientName, String virtualHost) throws AMQException, URLSyntaxException { this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + - (clientName==null?"":clientName) + + (clientName == null ? "" : clientName) + virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'")); } @@ -163,12 +172,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect this(new AMQConnectionURL(useSSL ? ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + - (clientName==null?"":clientName) + + (clientName == null ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'" : ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + - (clientName==null?"":clientName) + + (clientName == null ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'" )); @@ -466,22 +475,22 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. _protocolHandler.syncWrite( - ChannelOpenBody.createAMQFrame(channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - null), // outOfBand - ChannelOpenOkBody.class); + ChannelOpenBody.createAMQFrame(channelId, + (byte) 8, (byte) 0, // AMQP version (major, minor) + null), // outOfBand + ChannelOpenOkBody.class); //todo send low water mark when protocol allows. // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. _protocolHandler.syncWrite( - BasicQosBody.createAMQFrame(channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - false, // global - prefetchHigh, // prefetchCount - 0), // prefetchSize - BasicQosOkBody.class); + BasicQosBody.createAMQFrame(channelId, + (byte) 8, (byte) 0, // AMQP version (major, minor) + false, // global + prefetchHigh, // prefetchCount + 0), // prefetchSize + BasicQosOkBody.class); if (transacted) { @@ -492,7 +501,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte)8, (byte)0), TxSelectOkBody.class); + _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte) 8, (byte) 0), TxSelectOkBody.class); } } @@ -524,6 +533,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions * where specified in the JMS spec + * * @param transacted * @param acknowledgeMode * @return QueueSession @@ -537,6 +547,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions * where specified in the JMS spec + * * @param transacted * @param acknowledgeMode * @return TopicSession @@ -571,7 +582,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { checkNotClosed(); return _connectionMetaData; - + } public ExceptionListener getExceptionListener() throws JMSException @@ -622,14 +633,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void close() throws JMSException { - synchronized(getFailoverMutex()) + close(-1); + } + + public void close(long timeout) throws JMSException + { + synchronized (getFailoverMutex()) { if (!_closed.getAndSet(true)) { try { - closeAllSessions(null); - _protocolHandler.closeConnection(); + closeAllSessions(null, timeout); + _protocolHandler.closeConnection(timeout); } catch (AMQException e) { @@ -666,7 +682,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * <p/> * The caller must hold the failover mutex before calling this method. */ - private void closeAllSessions(Throwable cause) throws JMSException + private void closeAllSessions(Throwable cause, long timeout) throws JMSException { final LinkedList sessionCopy = new LinkedList(_sessions.values()); final Iterator it = sessionCopy.iterator(); @@ -682,7 +698,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - session.close(); + session.close(timeout); } catch (JMSException e) { @@ -900,7 +916,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (cause instanceof AMQException) { - je = new JMSException(Integer.toString(((AMQException)cause).getErrorCode()) ,"Exception thrown against " + toString() + ": " + cause); + je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode()), "Exception thrown against " + toString() + ": " + cause); } else { @@ -931,7 +947,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.info("Closing AMQConnection due to :" + cause.getMessage()); _closed.set(true); - closeAllSessions(cause); // FIXME: when doing this end up with RejectedExecutionException from executor. + closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. } catch (JMSException e) { @@ -953,8 +969,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect void deregisterSession(int channelId) { _sessions.remove(channelId); - } - + } + /** * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. * The caller must hold the failover mutex before calling this method. diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index bf812ee302..573c1fcc61 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -140,12 +140,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer */ - private final AtomicBoolean _pausing = new AtomicBoolean(false); + private final AtomicBoolean _pausingDispatcher = new AtomicBoolean(false); /** * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer */ - private final AtomicBoolean _paused = new AtomicBoolean(false); + private final AtomicBoolean _pausedDispatcher = new AtomicBoolean(false); /** * Set when recover is called. This is to handle the case where recover() is called by application code @@ -171,7 +171,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private class Dispatcher extends Thread { - private final Logger _logger = Logger.getLogger(Dispatcher.class); + private final Logger _logger = Logger.getLogger(Dispatcher.class); + private boolean _reDispatching = true; public Dispatcher() { @@ -184,41 +185,47 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi while (!_stopped.get()) { - if (_pausing.get()) + synchronized (_pausingDispatcher) { - try + if (_pausingDispatcher.get()) { - //Wait for unpausing - synchronized (_pausing) + try { - synchronized (_paused) + + _pausingDispatcher.set(false); + + //Wait to continue with pause code. + synchronized (_pausedDispatcher) { - _paused.notify(); + _pausedDispatcher.notify(); } - _logger.info("dispatcher paused"); - - _pausing.wait(); - _logger.info("dispatcher notified"); - } + _reDispatching = true; + _logger.info("Dispatcher paused"); + _pausingDispatcher.wait(); + _logger.info("Dispatcher notified"); + + } + catch (InterruptedException e) + { + _logger.info("dispacher interrupted"); + } } - catch (InterruptedException e) - { - //do nothing... occurs when a pause request occurs will already - // be here if another pause event is pending - _logger.info("dispacher interrupted"); - } + } + if (_reDispatching) + { doReDispatch(); - } else { doNormalDispatch(); } + } + _logger.info("Dispatcher thread terminating for channel " + _channelId); } @@ -227,7 +234,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi UnprocessedMessage message; try { - while (!_stopped.get() && !_pausing.get() && (message = (UnprocessedMessage) _queue.take()) != null) + while (!_stopped.get() && !_pausingDispatcher.get() && (message = (UnprocessedMessage) _queue.take()) != null) { dispatchMessage(message); } @@ -257,7 +264,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_reprocessQueue == null || _reprocessQueue.isEmpty()) { _logger.info("Reprocess Queue emptied"); - _pausing.set(false); + + _reDispatching = false; } else { @@ -343,30 +351,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void pause() { _logger.info("pausing"); - _pausing.set(true); + synchronized (_pausedDispatcher) + { + _pausingDispatcher.set(true); - interrupt(); + interrupt(); - synchronized (_paused) - { try { - _paused.wait(); + _pausedDispatcher.wait(); } catch (InterruptedException e) { - //do nothing + //do nothing } } } public void reprocess() { - synchronized (_pausing) + synchronized (_pausingDispatcher) { _logger.info("reprocessing"); - _pausing.notify(); + _pausingDispatcher.notify(); } } } @@ -578,6 +586,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void close() throws JMSException { + close(-1); + } + + public void close(long timeout) throws JMSException + { // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session synchronized (_connection.getFailoverMutex()) @@ -624,8 +637,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @param amqe the exception, may be null to indicate no error has occurred */ - private void closeProducersAndConsumers(AMQException amqe) + private void closeProducersAndConsumers(AMQException amqe) throws JMSException { + JMSException jmse = null; try { closeProducers(); @@ -633,6 +647,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi catch (JMSException e) { _logger.error("Error closing session: " + e, e); + jmse = e; } try { @@ -641,7 +656,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi catch (JMSException e) { _logger.error("Error closing session: " + e, e); + if (jmse == null) + { + jmse = e; + } } + finally + { + if (jmse != null) + { + throw jmse; + } + } + } /** @@ -650,7 +677,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @param e the exception that caused this session to be closed. Null causes the */ - public void closed(Throwable e) + public void closed(Throwable e) throws JMSException { synchronized (_connection.getFailoverMutex()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 50596d4bfc..7b789aa09d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -82,8 +82,6 @@ public class FailoverHandler implements Runnable // client code which runs in a separate thread. synchronized (_amqProtocolHandler.getConnection().getFailoverMutex()) { - _logger.info("Starting failover process"); - // We switch in a new state manager temporarily so that the interaction to get to the "connection open" // state works, without us having to terminate any existing "state waiters". We could theoretically // have a state waiter waiting until the connection is closed for some reason. Or in future we may have @@ -92,6 +90,8 @@ public class FailoverHandler implements Runnable _amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession())); if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null)) { + _logger.info("Failover process veto-ed by client"); + _amqProtocolHandler.setStateManager(existingStateManager); if (_host != null) { @@ -105,6 +105,9 @@ public class FailoverHandler implements Runnable _amqProtocolHandler.setFailoverLatch(null); return; } + + _logger.info("Starting failover process"); + boolean failoverSucceeded; // when host is non null we have a specified failover host otherwise we all the client to cycle through // all specified hosts diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 0d2877c926..fbf195d20e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -29,6 +29,7 @@ import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; @@ -89,6 +90,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter private CountDownLatch _failoverLatch; + private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; + public AMQProtocolHandler(AMQConnection con) { _connection = con; @@ -280,7 +283,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void propagateExceptionToWaiters(Exception e) { getStateManager().error(e); - if(!_frameListeners.isEmpty()) + if (!_frameListeners.isEmpty()) { final Iterator it = _frameListeners.iterator(); while (it.hasNext()) @@ -319,7 +322,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter { boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - if(!_frameListeners.isEmpty()) + if (!_frameListeners.isEmpty()) { Iterator it = _frameListeners.iterator(); while (it.hasNext()) @@ -330,13 +333,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter } if (!wasAnyoneInterested) { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners); + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners); } } catch (AMQException e) { getStateManager().error(e); - if(!_frameListeners.isEmpty()) + if (!_frameListeners.isEmpty()) { Iterator it = _frameListeners.iterator(); while (it.hasNext()) @@ -383,17 +386,18 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.debug("Sent frame " + message); } } -/* - public void addFrameListener(AMQMethodListener listener) - { - _frameListeners.add(listener); - } - public void removeFrameListener(AMQMethodListener listener) - { - _frameListeners.remove(listener); - } - */ + /* + public void addFrameListener(AMQMethodListener listener) + { + _frameListeners.add(listener); + } + + public void removeFrameListener(AMQMethodListener listener) + { + _frameListeners.remove(listener); + } + */ public void attainState(AMQState s) throws AMQException { getStateManager().attainState(s); @@ -427,12 +431,27 @@ public class AMQProtocolHandler extends IoHandlerAdapter BlockingMethodFrameListener listener) throws AMQException { + return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT); + } + + /** + * Convenience method that writes a frame to the protocol session and waits for + * a particular response. Equivalent to calling getProtocolSession().write() then + * waiting for the response. + * + * @param frame + * @param listener the blocking listener. Note the calling thread will block. + */ + private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, + BlockingMethodFrameListener listener, long timeout) + throws AMQException + { try { _frameListeners.add(listener); _protocolSession.writeFrame(frame); - AMQMethodEvent e = listener.blockForFrame(); + AMQMethodEvent e = listener.blockForFrame(timeout); return e; // When control resumes before this line, a reply will have been received // that matches the criteria defined in the blocking listener @@ -454,8 +473,16 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException { + return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT); + } + + /** + * More convenient method to write a frame and wait for it's response. + */ + public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException + { return writeCommandFrameAndWaitForReply(frame, - new SpecificMethodFrameListener(frame.channel, responseClass)); + new SpecificMethodFrameListener(frame.channel, responseClass), timeout); } /** @@ -488,20 +515,34 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void closeConnection() throws AMQException { + closeConnection(-1); + } + + public void closeConnection(long timeout) throws AMQException + { getStateManager().changeState(AMQState.CONNECTION_CLOSING); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0, - (byte)8, (byte)0, // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection.")); // replyText - syncWrite(frame, ConnectionCloseOkBody.class); + (byte) 8, (byte) 0, // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client is closing the connection.")); // replyText + + try + { + syncWrite(frame, ConnectionCloseOkBody.class, timeout); + _protocolSession.closeProtocolSession(); + } + catch (AMQTimeoutException e) + { + _protocolSession.closeProtocolSession(false); + } + - _protocolSession.closeProtocolSession(); } /** @@ -566,7 +607,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter _stateManager = stateManager; _protocolSession.setStateManager(stateManager); } - + public AMQProtocolSession getProtocolSession() { return _protocolSession; diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 8523e1cfce..b6dd05d761 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -43,7 +43,7 @@ import java.util.concurrent.ConcurrentMap; /** * Wrapper for protocol session that provides type-safe access to session attributes. - * + * <p/> * The underlying protocol session is still available but clients should not * use it to obtain session attributes. */ @@ -110,6 +110,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis _minaProtocolSession = protocolSession; // properties of the connection are made available to the event handlers _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); + //fixme - real value needed + _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT); _stateManager = new AMQStateManager(this); } @@ -119,10 +121,11 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis _minaProtocolSession = protocolSession; // properties of the connection are made available to the event handlers _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); - + //fixme - real value needed + _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT); _stateManager = stateManager; _stateManager.setProtocolSession(this); - + } public void init() @@ -153,12 +156,12 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis { getAMQConnection().setClientID(clientID); } - + public AMQStateManager getStateManager() { return _stateManager; } - + public void setStateManager(AMQStateManager stateManager) { _stateManager = stateManager; @@ -191,8 +194,9 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis /** * Store the SASL client currently being used for the authentication handshake + * * @param client if non-null, stores this in the session. if null clears any existing client - * being stored + * being stored */ public void setSaslClient(SaslClient client) { @@ -223,6 +227,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis /** * Callback invoked from the BasicDeliverMethodHandler when a message has been received. * This is invoked on the MINA dispatcher thread. + * * @param message * @throws AMQException if this was not expected */ @@ -280,8 +285,9 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis /** * Deliver a message to the appropriate session, removing the unprocessed message * from our map + * * @param channelId the channel id the message should be delivered to - * @param msg the message + * @param msg the message */ private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg) { @@ -306,6 +312,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis WriteFuture f = _minaProtocolSession.write(frame); if (wait) { + //fixme -- time out? f.join(); } else @@ -340,6 +347,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis /** * Starts the process of closing a session + * * @param session the AMQSession being closed */ public void closeSession(AMQSession session) @@ -361,19 +369,27 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis * This method decides whether this is a response or an initiation. The latter * case causes the AMQSession to be closed and an exception to be thrown if * appropriate. + * * @param channelId the id of the channel (session) * @return true if the client must respond to the server, i.e. if the server - * initiated the channel close, false if the channel close is just the server - * responding to the client's earlier request to close the channel. + * initiated the channel close, false if the channel close is just the server + * responding to the client's earlier request to close the channel. */ - public boolean channelClosed(int channelId, int code, String text) + public boolean channelClosed(int channelId, int code, String text) throws AMQException { final Integer chId = channelId; // if this is not a response to an earlier request to close the channel if (_closingChannels.remove(chId) == null) { final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId); - session.closed(new AMQException(_logger, code, text)); + try + { + session.closed(new AMQException(_logger, code, text)); + } + catch (JMSException e) + { + throw new AMQException("JMSException received while closing session", e); + } return true; } else @@ -389,15 +405,20 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis public void closeProtocolSession() { + closeProtocolSession(true); + } + + public void closeProtocolSession(boolean waitLast) + { _logger.debug("Waiting for last write to join."); - if (_lastWriteFuture != null) + if (waitLast && _lastWriteFuture != null) { _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); } _logger.debug("Closing protocol session"); final CloseFuture future = _minaProtocolSession.close(); - future.join(); + future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); } public void failover(String host, int port) @@ -408,17 +429,16 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis protected AMQShortString generateQueueName() { int id; - synchronized(_queueIdLock) + synchronized (_queueIdLock) { id = _queueId++; } //get rid of / and : and ; from address for spec conformance - String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(),"/;:",""); + String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", ""); return new AMQShortString("tmp_" + localAddress + "_" + id); } /** - * * @param delay delay in seconds (not ms) */ void initHeartbeats(int delay) diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index f96da300ff..1656695ba9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -21,6 +21,8 @@ package org.apache.qpid.client.protocol; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQDisconnectedException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; @@ -91,7 +93,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener /** * This method is called by the thread that wants to wait for a frame. */ - public AMQMethodEvent blockForFrame() throws AMQException + public AMQMethodEvent blockForFrame(long timeout) throws AMQException { synchronized (_lock) { @@ -99,11 +101,29 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener { try { - _lock.wait(); + if (timeout == -1) + { + _lock.wait(); + } + else + { + + _lock.wait(timeout); + if (!_ready) + { + _error = new AMQTimeoutException("Server did not respond in a timely fashion"); + _ready = true; + } + } } catch (InterruptedException e) { - // IGNORE + // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess + if (!_ready && timeout != -1) + { + _error = new AMQException("Server did not respond timely"); + _ready = true; + } } } } @@ -115,7 +135,8 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener } else if (_error instanceof FailoverException) { - throw (FailoverException)_error; // needed to expose FailoverException. + // This should ensure that FailoverException is not wrapped and can be caught. + throw(FailoverException) _error; // needed to expose FailoverException. } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java index 18e1fdad82..b2940d73ae 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java @@ -25,7 +25,6 @@ import org.apache.qpid.AMQException; /** * Waits for a particular state to be reached. - * */ public class StateWaiter implements StateListener { @@ -38,6 +37,7 @@ public class StateWaiter implements StateListener private volatile Throwable _throwable; private final Object _monitor = new Object(); + private static final long TIME_OUT = 1000 * 60 * 2; public StateWaiter(AMQState state) { @@ -46,7 +46,7 @@ public class StateWaiter implements StateListener public void waituntilStateHasChanged() throws AMQException { - synchronized(_monitor) + synchronized (_monitor) { // // The guard is required in case we are woken up by a spurious @@ -57,7 +57,7 @@ public class StateWaiter implements StateListener try { _logger.debug("State " + _state + " not achieved so waiting..."); - _monitor.wait(); + _monitor.wait(TIME_OUT); } catch (InterruptedException e) { @@ -82,7 +82,7 @@ public class StateWaiter implements StateListener public void stateChanged(AMQState oldState, AMQState newState) { - synchronized(_monitor) + synchronized (_monitor) { if (_logger.isDebugEnabled()) { @@ -103,7 +103,7 @@ public class StateWaiter implements StateListener public void error(Throwable t) { - synchronized(_monitor) + synchronized (_monitor) { if (_logger.isDebugEnabled()) { diff --git a/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java new file mode 100644 index 0000000000..6af681f479 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid; + +public class AMQTimeoutException extends AMQException +{ + public AMQTimeoutException(String message) + { + super(message); + } +} |