summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-01-22 15:05:58 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-01-22 15:05:58 +0000
commit5401782b97dd4185b122f234e67373574b16671d (patch)
tree470c6bddce4f7e39a1b00c2e6794c725dfa741f9
parent80653440840bbd4a846835688bda4ce418e5323d (diff)
downloadqpid-python-5401782b97dd4185b122f234e67373574b16671d.tar.gz
QPID-310 Propagated JMS Exception to client.
QPID-308 Configurable timeout on blockForFrame. Timeouts added but need to be configurable. QPID-311 Dispatcher Thread is not thread safe. Added the missing Synchronized code and renamed vars to make it more readable git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@498637 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java64
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java93
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java89
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java52
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java29
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java29
8 files changed, 265 insertions, 108 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index b82a735f06..a155117a7f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -142,12 +142,21 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
private QpidConnectionMetaData _connectionMetaData;
+ /**
+ * @param broker brokerdetails
+ * @param username username
+ * @param password password
+ * @param clientName clientid
+ * @param virtualHost virtualhost
+ * @throws AMQException
+ * @throws URLSyntaxException
+ */
public AMQConnection(String broker, String username, String password,
String clientName, String virtualHost) throws AMQException, URLSyntaxException
{
this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
username + ":" + password + "@" +
- (clientName==null?"":clientName) +
+ (clientName == null ? "" : clientName) +
virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"));
}
@@ -163,12 +172,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
this(new AMQConnectionURL(useSSL ?
ConnectionURL.AMQ_PROTOCOL + "://" +
username + ":" + password + "@" +
- (clientName==null?"":clientName) +
+ (clientName == null ? "" : clientName) +
virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+ "," + ConnectionURL.OPTIONS_SSL + "='true'" :
ConnectionURL.AMQ_PROTOCOL + "://" +
username + ":" + password + "@" +
- (clientName==null?"":clientName) +
+ (clientName == null ? "" : clientName) +
virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+ "," + ConnectionURL.OPTIONS_SSL + "='false'"
));
@@ -466,22 +475,22 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(
- ChannelOpenBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- null), // outOfBand
- ChannelOpenOkBody.class);
+ ChannelOpenBody.createAMQFrame(channelId,
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ null), // outOfBand
+ ChannelOpenOkBody.class);
//todo send low water mark when protocol allows.
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(
- BasicQosBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- false, // global
- prefetchHigh, // prefetchCount
- 0), // prefetchSize
- BasicQosOkBody.class);
+ BasicQosBody.createAMQFrame(channelId,
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ false, // global
+ prefetchHigh, // prefetchCount
+ 0), // prefetchSize
+ BasicQosOkBody.class);
if (transacted)
{
@@ -492,7 +501,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte)8, (byte)0), TxSelectOkBody.class);
+ _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte) 8, (byte) 0), TxSelectOkBody.class);
}
}
@@ -524,6 +533,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/**
* Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions
* where specified in the JMS spec
+ *
* @param transacted
* @param acknowledgeMode
* @return QueueSession
@@ -537,6 +547,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/**
* Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions
* where specified in the JMS spec
+ *
* @param transacted
* @param acknowledgeMode
* @return TopicSession
@@ -571,7 +582,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
checkNotClosed();
return _connectionMetaData;
-
+
}
public ExceptionListener getExceptionListener() throws JMSException
@@ -622,14 +633,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public void close() throws JMSException
{
- synchronized(getFailoverMutex())
+ close(-1);
+ }
+
+ public void close(long timeout) throws JMSException
+ {
+ synchronized (getFailoverMutex())
{
if (!_closed.getAndSet(true))
{
try
{
- closeAllSessions(null);
- _protocolHandler.closeConnection();
+ closeAllSessions(null, timeout);
+ _protocolHandler.closeConnection(timeout);
}
catch (AMQException e)
{
@@ -666,7 +682,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* <p/>
* The caller must hold the failover mutex before calling this method.
*/
- private void closeAllSessions(Throwable cause) throws JMSException
+ private void closeAllSessions(Throwable cause, long timeout) throws JMSException
{
final LinkedList sessionCopy = new LinkedList(_sessions.values());
final Iterator it = sessionCopy.iterator();
@@ -682,7 +698,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
try
{
- session.close();
+ session.close(timeout);
}
catch (JMSException e)
{
@@ -900,7 +916,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (cause instanceof AMQException)
{
- je = new JMSException(Integer.toString(((AMQException)cause).getErrorCode()) ,"Exception thrown against " + toString() + ": " + cause);
+ je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode()), "Exception thrown against " + toString() + ": " + cause);
}
else
{
@@ -931,7 +947,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.info("Closing AMQConnection due to :" + cause.getMessage());
_closed.set(true);
- closeAllSessions(cause); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
}
catch (JMSException e)
{
@@ -953,8 +969,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
void deregisterSession(int channelId)
{
_sessions.remove(channelId);
- }
-
+ }
+
/**
* For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
* The caller must hold the failover mutex before calling this method.
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index bf812ee302..573c1fcc61 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -140,12 +140,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
*/
- private final AtomicBoolean _pausing = new AtomicBoolean(false);
+ private final AtomicBoolean _pausingDispatcher = new AtomicBoolean(false);
/**
* Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
*/
- private final AtomicBoolean _paused = new AtomicBoolean(false);
+ private final AtomicBoolean _pausedDispatcher = new AtomicBoolean(false);
/**
* Set when recover is called. This is to handle the case where recover() is called by application code
@@ -171,7 +171,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private class Dispatcher extends Thread
{
- private final Logger _logger = Logger.getLogger(Dispatcher.class);
+ private final Logger _logger = Logger.getLogger(Dispatcher.class);
+ private boolean _reDispatching = true;
public Dispatcher()
{
@@ -184,41 +185,47 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
while (!_stopped.get())
{
- if (_pausing.get())
+ synchronized (_pausingDispatcher)
{
- try
+ if (_pausingDispatcher.get())
{
- //Wait for unpausing
- synchronized (_pausing)
+ try
{
- synchronized (_paused)
+
+ _pausingDispatcher.set(false);
+
+ //Wait to continue with pause code.
+ synchronized (_pausedDispatcher)
{
- _paused.notify();
+ _pausedDispatcher.notify();
}
- _logger.info("dispatcher paused");
-
- _pausing.wait();
- _logger.info("dispatcher notified");
- }
+ _reDispatching = true;
+ _logger.info("Dispatcher paused");
+ _pausingDispatcher.wait();
+ _logger.info("Dispatcher notified");
+
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info("dispacher interrupted");
+ }
}
- catch (InterruptedException e)
- {
- //do nothing... occurs when a pause request occurs will already
- // be here if another pause event is pending
- _logger.info("dispacher interrupted");
- }
+ }
+ if (_reDispatching)
+ {
doReDispatch();
-
}
else
{
doNormalDispatch();
}
+
}
+
_logger.info("Dispatcher thread terminating for channel " + _channelId);
}
@@ -227,7 +234,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
UnprocessedMessage message;
try
{
- while (!_stopped.get() && !_pausing.get() && (message = (UnprocessedMessage) _queue.take()) != null)
+ while (!_stopped.get() && !_pausingDispatcher.get() && (message = (UnprocessedMessage) _queue.take()) != null)
{
dispatchMessage(message);
}
@@ -257,7 +264,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_reprocessQueue == null || _reprocessQueue.isEmpty())
{
_logger.info("Reprocess Queue emptied");
- _pausing.set(false);
+
+ _reDispatching = false;
}
else
{
@@ -343,30 +351,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void pause()
{
_logger.info("pausing");
- _pausing.set(true);
+ synchronized (_pausedDispatcher)
+ {
+ _pausingDispatcher.set(true);
- interrupt();
+ interrupt();
- synchronized (_paused)
- {
try
{
- _paused.wait();
+ _pausedDispatcher.wait();
}
catch (InterruptedException e)
{
- //do nothing
+ //do nothing
}
}
}
public void reprocess()
{
- synchronized (_pausing)
+ synchronized (_pausingDispatcher)
{
_logger.info("reprocessing");
- _pausing.notify();
+ _pausingDispatcher.notify();
}
}
}
@@ -578,6 +586,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void close() throws JMSException
{
+ close(-1);
+ }
+
+ public void close(long timeout) throws JMSException
+ {
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session
synchronized (_connection.getFailoverMutex())
@@ -624,8 +637,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @param amqe the exception, may be null to indicate no error has occurred
*/
- private void closeProducersAndConsumers(AMQException amqe)
+ private void closeProducersAndConsumers(AMQException amqe) throws JMSException
{
+ JMSException jmse = null;
try
{
closeProducers();
@@ -633,6 +647,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
catch (JMSException e)
{
_logger.error("Error closing session: " + e, e);
+ jmse = e;
}
try
{
@@ -641,7 +656,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
catch (JMSException e)
{
_logger.error("Error closing session: " + e, e);
+ if (jmse == null)
+ {
+ jmse = e;
+ }
}
+ finally
+ {
+ if (jmse != null)
+ {
+ throw jmse;
+ }
+ }
+
}
/**
@@ -650,7 +677,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @param e the exception that caused this session to be closed. Null causes the
*/
- public void closed(Throwable e)
+ public void closed(Throwable e) throws JMSException
{
synchronized (_connection.getFailoverMutex())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 50596d4bfc..7b789aa09d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -82,8 +82,6 @@ public class FailoverHandler implements Runnable
// client code which runs in a separate thread.
synchronized (_amqProtocolHandler.getConnection().getFailoverMutex())
{
- _logger.info("Starting failover process");
-
// We switch in a new state manager temporarily so that the interaction to get to the "connection open"
// state works, without us having to terminate any existing "state waiters". We could theoretically
// have a state waiter waiting until the connection is closed for some reason. Or in future we may have
@@ -92,6 +90,8 @@ public class FailoverHandler implements Runnable
_amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession()));
if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
{
+ _logger.info("Failover process veto-ed by client");
+
_amqProtocolHandler.setStateManager(existingStateManager);
if (_host != null)
{
@@ -105,6 +105,9 @@ public class FailoverHandler implements Runnable
_amqProtocolHandler.setFailoverLatch(null);
return;
}
+
+ _logger.info("Starting failover process");
+
boolean failoverSucceeded;
// when host is non null we have a specified failover host otherwise we all the client to cycle through
// all specified hosts
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 0d2877c926..fbf195d20e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -29,6 +29,7 @@ import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
@@ -89,6 +90,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private CountDownLatch _failoverLatch;
+ private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
+
public AMQProtocolHandler(AMQConnection con)
{
_connection = con;
@@ -280,7 +283,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void propagateExceptionToWaiters(Exception e)
{
getStateManager().error(e);
- if(!_frameListeners.isEmpty())
+ if (!_frameListeners.isEmpty())
{
final Iterator it = _frameListeners.iterator();
while (it.hasNext())
@@ -319,7 +322,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
- if(!_frameListeners.isEmpty())
+ if (!_frameListeners.isEmpty())
{
Iterator it = _frameListeners.iterator();
while (it.hasNext())
@@ -330,13 +333,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
if (!wasAnyoneInterested)
{
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
}
}
catch (AMQException e)
{
getStateManager().error(e);
- if(!_frameListeners.isEmpty())
+ if (!_frameListeners.isEmpty())
{
Iterator it = _frameListeners.iterator();
while (it.hasNext())
@@ -383,17 +386,18 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_logger.debug("Sent frame " + message);
}
}
-/*
- public void addFrameListener(AMQMethodListener listener)
- {
- _frameListeners.add(listener);
- }
- public void removeFrameListener(AMQMethodListener listener)
- {
- _frameListeners.remove(listener);
- }
- */
+ /*
+ public void addFrameListener(AMQMethodListener listener)
+ {
+ _frameListeners.add(listener);
+ }
+
+ public void removeFrameListener(AMQMethodListener listener)
+ {
+ _frameListeners.remove(listener);
+ }
+ */
public void attainState(AMQState s) throws AMQException
{
getStateManager().attainState(s);
@@ -427,12 +431,27 @@ public class AMQProtocolHandler extends IoHandlerAdapter
BlockingMethodFrameListener listener)
throws AMQException
{
+ return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
+ }
+
+ /**
+ * Convenience method that writes a frame to the protocol session and waits for
+ * a particular response. Equivalent to calling getProtocolSession().write() then
+ * waiting for the response.
+ *
+ * @param frame
+ * @param listener the blocking listener. Note the calling thread will block.
+ */
+ private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
+ BlockingMethodFrameListener listener, long timeout)
+ throws AMQException
+ {
try
{
_frameListeners.add(listener);
_protocolSession.writeFrame(frame);
- AMQMethodEvent e = listener.blockForFrame();
+ AMQMethodEvent e = listener.blockForFrame(timeout);
return e;
// When control resumes before this line, a reply will have been received
// that matches the criteria defined in the blocking listener
@@ -454,8 +473,16 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException
{
+ return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
+ }
+
+ /**
+ * More convenient method to write a frame and wait for it's response.
+ */
+ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException
+ {
return writeCommandFrameAndWaitForReply(frame,
- new SpecificMethodFrameListener(frame.channel, responseClass));
+ new SpecificMethodFrameListener(frame.channel, responseClass), timeout);
}
/**
@@ -488,20 +515,34 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void closeConnection() throws AMQException
{
+ closeConnection(-1);
+ }
+
+ public void closeConnection(long timeout) throws AMQException
+ {
getStateManager().changeState(AMQState.CONNECTION_CLOSING);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
- (byte)8, (byte)0, // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection.")); // replyText
- syncWrite(frame, ConnectionCloseOkBody.class);
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client is closing the connection.")); // replyText
+
+ try
+ {
+ syncWrite(frame, ConnectionCloseOkBody.class, timeout);
+ _protocolSession.closeProtocolSession();
+ }
+ catch (AMQTimeoutException e)
+ {
+ _protocolSession.closeProtocolSession(false);
+ }
+
- _protocolSession.closeProtocolSession();
}
/**
@@ -566,7 +607,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_stateManager = stateManager;
_protocolSession.setStateManager(stateManager);
}
-
+
public AMQProtocolSession getProtocolSession()
{
return _protocolSession;
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 8523e1cfce..b6dd05d761 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -43,7 +43,7 @@ import java.util.concurrent.ConcurrentMap;
/**
* Wrapper for protocol session that provides type-safe access to session attributes.
- *
+ * <p/>
* The underlying protocol session is still available but clients should not
* use it to obtain session attributes.
*/
@@ -110,6 +110,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
_minaProtocolSession = protocolSession;
// properties of the connection are made available to the event handlers
_minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+ //fixme - real value needed
+ _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
_stateManager = new AMQStateManager(this);
}
@@ -119,10 +121,11 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
_minaProtocolSession = protocolSession;
// properties of the connection are made available to the event handlers
_minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
-
+ //fixme - real value needed
+ _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
_stateManager = stateManager;
_stateManager.setProtocolSession(this);
-
+
}
public void init()
@@ -153,12 +156,12 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
{
getAMQConnection().setClientID(clientID);
}
-
+
public AMQStateManager getStateManager()
{
return _stateManager;
}
-
+
public void setStateManager(AMQStateManager stateManager)
{
_stateManager = stateManager;
@@ -191,8 +194,9 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
/**
* Store the SASL client currently being used for the authentication handshake
+ *
* @param client if non-null, stores this in the session. if null clears any existing client
- * being stored
+ * being stored
*/
public void setSaslClient(SaslClient client)
{
@@ -223,6 +227,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
/**
* Callback invoked from the BasicDeliverMethodHandler when a message has been received.
* This is invoked on the MINA dispatcher thread.
+ *
* @param message
* @throws AMQException if this was not expected
*/
@@ -280,8 +285,9 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
/**
* Deliver a message to the appropriate session, removing the unprocessed message
* from our map
+ *
* @param channelId the channel id the message should be delivered to
- * @param msg the message
+ * @param msg the message
*/
private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
{
@@ -306,6 +312,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
WriteFuture f = _minaProtocolSession.write(frame);
if (wait)
{
+ //fixme -- time out?
f.join();
}
else
@@ -340,6 +347,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
/**
* Starts the process of closing a session
+ *
* @param session the AMQSession being closed
*/
public void closeSession(AMQSession session)
@@ -361,19 +369,27 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
* This method decides whether this is a response or an initiation. The latter
* case causes the AMQSession to be closed and an exception to be thrown if
* appropriate.
+ *
* @param channelId the id of the channel (session)
* @return true if the client must respond to the server, i.e. if the server
- * initiated the channel close, false if the channel close is just the server
- * responding to the client's earlier request to close the channel.
+ * initiated the channel close, false if the channel close is just the server
+ * responding to the client's earlier request to close the channel.
*/
- public boolean channelClosed(int channelId, int code, String text)
+ public boolean channelClosed(int channelId, int code, String text) throws AMQException
{
final Integer chId = channelId;
// if this is not a response to an earlier request to close the channel
if (_closingChannels.remove(chId) == null)
{
final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
- session.closed(new AMQException(_logger, code, text));
+ try
+ {
+ session.closed(new AMQException(_logger, code, text));
+ }
+ catch (JMSException e)
+ {
+ throw new AMQException("JMSException received while closing session", e);
+ }
return true;
}
else
@@ -389,15 +405,20 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
public void closeProtocolSession()
{
+ closeProtocolSession(true);
+ }
+
+ public void closeProtocolSession(boolean waitLast)
+ {
_logger.debug("Waiting for last write to join.");
- if (_lastWriteFuture != null)
+ if (waitLast && _lastWriteFuture != null)
{
_lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
}
_logger.debug("Closing protocol session");
final CloseFuture future = _minaProtocolSession.close();
- future.join();
+ future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
}
public void failover(String host, int port)
@@ -408,17 +429,16 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
protected AMQShortString generateQueueName()
{
int id;
- synchronized(_queueIdLock)
+ synchronized (_queueIdLock)
{
id = _queueId++;
}
//get rid of / and : and ; from address for spec conformance
- String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(),"/;:","");
+ String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", "");
return new AMQShortString("tmp_" + localAddress + "_" + id);
}
/**
- *
* @param delay delay in seconds (not ms)
*/
void initHeartbeats(int delay)
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index f96da300ff..1656695ba9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -21,6 +21,8 @@
package org.apache.qpid.client.protocol;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -91,7 +93,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
/**
* This method is called by the thread that wants to wait for a frame.
*/
- public AMQMethodEvent blockForFrame() throws AMQException
+ public AMQMethodEvent blockForFrame(long timeout) throws AMQException
{
synchronized (_lock)
{
@@ -99,11 +101,29 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
{
try
{
- _lock.wait();
+ if (timeout == -1)
+ {
+ _lock.wait();
+ }
+ else
+ {
+
+ _lock.wait(timeout);
+ if (!_ready)
+ {
+ _error = new AMQTimeoutException("Server did not respond in a timely fashion");
+ _ready = true;
+ }
+ }
}
catch (InterruptedException e)
{
- // IGNORE
+ // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
+ if (!_ready && timeout != -1)
+ {
+ _error = new AMQException("Server did not respond timely");
+ _ready = true;
+ }
}
}
}
@@ -115,7 +135,8 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
}
else if (_error instanceof FailoverException)
{
- throw (FailoverException)_error; // needed to expose FailoverException.
+ // This should ensure that FailoverException is not wrapped and can be caught.
+ throw(FailoverException) _error; // needed to expose FailoverException.
}
else
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
index 18e1fdad82..b2940d73ae 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
@@ -25,7 +25,6 @@ import org.apache.qpid.AMQException;
/**
* Waits for a particular state to be reached.
- *
*/
public class StateWaiter implements StateListener
{
@@ -38,6 +37,7 @@ public class StateWaiter implements StateListener
private volatile Throwable _throwable;
private final Object _monitor = new Object();
+ private static final long TIME_OUT = 1000 * 60 * 2;
public StateWaiter(AMQState state)
{
@@ -46,7 +46,7 @@ public class StateWaiter implements StateListener
public void waituntilStateHasChanged() throws AMQException
{
- synchronized(_monitor)
+ synchronized (_monitor)
{
//
// The guard is required in case we are woken up by a spurious
@@ -57,7 +57,7 @@ public class StateWaiter implements StateListener
try
{
_logger.debug("State " + _state + " not achieved so waiting...");
- _monitor.wait();
+ _monitor.wait(TIME_OUT);
}
catch (InterruptedException e)
{
@@ -82,7 +82,7 @@ public class StateWaiter implements StateListener
public void stateChanged(AMQState oldState, AMQState newState)
{
- synchronized(_monitor)
+ synchronized (_monitor)
{
if (_logger.isDebugEnabled())
{
@@ -103,7 +103,7 @@ public class StateWaiter implements StateListener
public void error(Throwable t)
{
- synchronized(_monitor)
+ synchronized (_monitor)
{
if (_logger.isDebugEnabled())
{
diff --git a/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
new file mode 100644
index 0000000000..6af681f479
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid;
+
+public class AMQTimeoutException extends AMQException
+{
+ public AMQTimeoutException(String message)
+ {
+ super(message);
+ }
+}