summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-03-02 16:37:28 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-03-02 16:37:28 +0000
commit74e1b99486b47d788f193533c2f630f8ce5cbff3 (patch)
tree6d580b945fcefa73ca05327ca1eb8cff0dea43ba
parent409f1c667a0eb41ebaefb8c072565e23641152dd (diff)
downloadqpid-python-74e1b99486b47d788f193533c2f630f8ce5cbff3.tar.gz
QPID-308
Broker: AMQStateManager - Added extra logging to keep track of what is going on as the findStateTransitionHandler is recursive. Client: AMQConnection - Comment Changes. Added timeouts to connections. AMQSession - Added timeout on closure FailoverHandler - Comment changes and adjusted logging AMQProtocolHandler - Comments changed and added timeouts to the syncwait calls. AMQProtocolSession - Added timeouts to writeFrame joins. BlockingMethodFrameListener - Added timeouts to blockFrame waits. AMQStateManager - Added additional logging ResetMessageListenerTest - Fixed logging level on a single log line. Created ManualTests Added MessageAgeAlert test case supplied by customer. MessageRequeueTest - Moved QpidClientConnection to its own class QpidClientConnection - Added based on a class from a customer. AMQTimeoutException - Added new exception based on timeouts AMQConstant - Added timeout constant AMQQueueAlertTest - adjusted values as my dual core would fail occasionally. BrokerFillMemoryRun - added test to fill the broker's memory. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@513835 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java30
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java109
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java28
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java29
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java107
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java91
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java36
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java25
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/ManualTests/BrokerFillMemoryRun.java102
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/ManualTests/MessageAgeAlert.java61
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java259
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java273
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java31
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java2
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java39
16 files changed, 749 insertions, 475 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index 4e9deeb8db..7c3988cd74 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -35,23 +35,19 @@ import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
/**
- * The state manager is responsible for managing the state of the protocol session.
- * <p/>
- * For each AMQProtocolHandler there is a separate state manager.
- *
+ * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
+ * there is a separate state manager.
*/
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
- /**
- * The current state
- */
+ /** The current state */
private AMQState _currentState;
/**
- * Maps from an AMQState instance to a Map from Class to StateTransitionHandler.
- * The class must be a subclass of AMQFrame.
+ * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
+ * AMQFrame.
*/
private final Map<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
new HashMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>();
@@ -159,9 +155,9 @@ public class AMQStateManager implements AMQMethodListener
}
public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt,
- AMQProtocolSession protocolSession,
- QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry) throws AMQException
+ AMQProtocolSession protocolSession,
+ QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry) throws AMQException
{
StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
@@ -173,20 +169,19 @@ public class AMQStateManager implements AMQMethodListener
}
protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState,
- B frame)
+ B frame)
throws IllegalStateTransitionException
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Looking for state transition handler for frame " + frame.getClass());
+ _logger.debug("Looking for state[" + currentState + "] transition handler for frame " + frame.getClass());
}
final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>
classToHandlerMap = _state2HandlersMap.get(currentState);
if (classToHandlerMap == null)
{
- // if no specialised per state handler is registered look for a
- // handler registered for "all" states
+ _logger.debug("No specialised per state handler is registered look for a handler registered for 'all' states");
return findStateTransitionHandler(null, frame);
}
final StateAwareMethodListener<B> handler = (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass());
@@ -199,8 +194,7 @@ public class AMQStateManager implements AMQMethodListener
}
else
{
- // if no specialised per state handler is registered look for a
- // handler registered for "all" states
+ _logger.debug("No specialised per state handler is registered look for a handler registered for 'all' states");
return findStateTransitionHandler(null, frame);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 994ed7944e..879f2f9548 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -64,23 +64,23 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
private static final Logger _logger = Logger.getLogger(AMQConnection.class);
+ private static final long MAXIMUM_WAIT_TIME = 30000l;
+
private AtomicInteger _idFactory = new AtomicInteger(0);
/**
- * This is the "root" mutex that must be held when doing anything that could be impacted by failover.
- * This must be held by any child objects of this connection such as the session, producers and consumers.
+ * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
+ * held by any child objects of this connection such as the session, producers and consumers.
*/
private final Object _failoverMutex = new Object();
/**
- * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels
- * per session and we must prevent the client from opening too many. Zero means unlimited.
+ * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
+ * and we must prevent the client from opening too many. Zero means unlimited.
*/
private long _maximumChannelCount;
- /**
- * The maximum size of frame supported by the server
- */
+ /** The maximum size of frame supported by the server */
private long _maximumFrameSize;
/**
@@ -90,26 +90,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
private AMQProtocolHandler _protocolHandler;
- /**
- * Maps from session id (Integer) to AMQSession instance
- */
+ /** Maps from session id (Integer) to AMQSession instance */
private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
private String _clientName;
- /**
- * The user name to use for authentication
- */
+ /** The user name to use for authentication */
private String _username;
- /**
- * The password to use for authentication
- */
+ /** The password to use for authentication */
private String _password;
- /**
- * The virtual path to connect to on the AMQ server
- */
+ /** The virtual path to connect to on the AMQ server */
private String _virtualHost;
private ExceptionListener _exceptionListener;
@@ -119,14 +111,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private ConnectionURL _connectionURL;
/**
- * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for
- * message publication.
+ * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
+ * publication.
*/
private boolean _started;
- /**
- * Policy dictating how to failover
- */
+ /** Policy dictating how to failover */
private FailoverPolicy _failoverPolicy;
/*
@@ -353,8 +343,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/**
* Get the details of the currently active broker
*
- * @return null if no broker is active (i.e. no successful connection has been made, or
- * the BrokerDetail instance otherwise
+ * @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance
+ * otherwise
*/
public BrokerDetails getActiveBrokerDetails()
{
@@ -507,12 +497,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions
- * where specified in the JMS spec
+ * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in
+ * the JMS spec
*
* @param transacted
* @param acknowledgeMode
+ *
* @return QueueSession
+ *
* @throws JMSException
*/
public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
@@ -521,12 +513,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions
- * where specified in the JMS spec
+ * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in
+ * the JMS spec
*
* @param transacted
* @param acknowledgeMode
+ *
* @return TopicSession
+ *
* @throws JMSException
*/
public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
@@ -623,14 +617,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public void close() throws JMSException
{
+ close(-1);
+ }
+
+ public void close(long timeout) throws JMSException
+ {
synchronized (getFailoverMutex())
{
if (!_closed.getAndSet(true))
{
try
{
- closeAllSessions(null);
- _protocolHandler.closeConnection();
+ closeAllSessions(null, timeout);
+ _protocolHandler.closeConnection(timeout);
}
catch (AMQException e)
{
@@ -641,11 +640,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Marks all sessions and their children as closed without sending any protocol messages. Useful when
- * you need to mark objects "visible" in userland as closed after failover or other significant event that
- * impacts the connection.
- * <p/>
- * The caller must hold the failover mutex before calling this method.
+ * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
+ * mark objects "visible" in userland as closed after failover or other significant event that impacts the
+ * connection. <p/> The caller must hold the failover mutex before calling this method.
*/
private void markAllSessionsClosed()
{
@@ -663,11 +660,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/**
* Close all the sessions, either due to normal connection closure or due to an error occurring.
*
- * @param cause if not null, the error that is causing this shutdown
- * <p/>
- * The caller must hold the failover mutex before calling this method.
+ * @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex
+ * before calling this method.
*/
- private void closeAllSessions(Throwable cause) throws JMSException
+ private void closeAllSessions(Throwable cause, long timeout) throws JMSException
{
final LinkedList sessionCopy = new LinkedList(_sessions.values());
final Iterator it = sessionCopy.iterator();
@@ -683,7 +679,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
try
{
- session.close();
+ session.close(timeout);
}
catch (JMSException e)
{
@@ -788,7 +784,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _protocolHandler;
}
-
+
public boolean started()
{
return _started;
@@ -814,6 +810,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* Fire the preFailover event to the registered connection listener (if any)
*
* @param redirect true if this is the result of a redirect request rather than a connection error
+ *
* @return true if no listener or listener does not veto change
*/
public boolean firePreFailover(boolean redirect)
@@ -827,10 +824,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Fire the preResubscribe event to the registered connection listener (if any). If the listener
- * vetoes resubscription then all the sessions are closed.
+ * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes
+ * resubscription then all the sessions are closed.
*
* @return true if no listener or listener does not veto resubscription.
+ *
* @throws JMSException
*/
public boolean firePreResubscribe() throws JMSException
@@ -850,9 +848,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- /**
- * Fires a failover complete event to the registered connection listener (if any).
- */
+ /** Fires a failover complete event to the registered connection listener (if any). */
public void fireFailoverComplete()
{
if (_connectionListener != null)
@@ -862,8 +858,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * In order to protect the consistency of the connection and its child sessions, consumers and producers,
- * the "failover mutex" must be held when doing any operations that could be corrupted during failover.
+ * In order to protect the consistency of the connection and its child sessions, consumers and producers, the
+ * "failover mutex" must be held when doing any operations that could be corrupted during failover.
*
* @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs.
*/
@@ -873,8 +869,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * If failover is taking place this will block until it has completed. If failover
- * is not taking place it will return immediately.
+ * If failover is taking place this will block until it has completed. If failover is not taking place it will
+ * return immediately.
*
* @throws InterruptedException
*/
@@ -884,10 +880,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Invoked by the AMQProtocolSession when a protocol session exception has occurred.
- * This method sends the exception to a JMS exception listener, if configured, and
- * propagates the exception to sessions, which in turn will propagate to consumers.
- * This allows synchronous consumers to have exceptions thrown to them.
+ * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
+ * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
+ * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
*
* @param cause the exception
*/
@@ -937,7 +932,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.info("Closing AMQConnection due to :" + cause.getMessage());
_closed.set(true);
- closeAllSessions(cause); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ closeAllSessions(cause, MAXIMUM_WAIT_TIME); // FIXME: when doing this end up with RejectedExecutionException from executor.
}
catch (JMSException e)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 0648ed472d..6de63be63a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -338,7 +338,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
_queue.add(msg);
}
-
+
if (stopped)
{
_dispatcher.setConnectionStopped(stopped);
@@ -648,6 +648,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void close() throws JMSException
{
+ close(-1);
+ }
+
+ public void close(long timeout) throws JMSException
+ {
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session
synchronized (_connection.getFailoverMutex())
@@ -657,7 +662,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
// we pass null since this is not an error case
- closeProducersAndConsumers(null);
+ closeProducersAndConsumers(null);
try
{
@@ -671,7 +676,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
0, // methodId
AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
"JMS client closing channel"); // replyText
- _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
+
+ _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully
@@ -696,8 +702,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @param amqe the exception, may be null to indicate no error has occurred
*/
- private void closeProducersAndConsumers(AMQException amqe)
+ private void closeProducersAndConsumers(AMQException amqe) throws JMSException
{
+ JMSException jmse = null;
+
try
{
closeProducers();
@@ -705,6 +713,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
catch (JMSException e)
{
_logger.error("Error closing session: " + e, e);
+ jmse = e;
}
try
{
@@ -713,6 +722,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
catch (JMSException e)
{
_logger.error("Error closing session: " + e, e);
+ if (jmse == null)
+ {
+ jmse = e;
+ }
+ }
+
+ if (jmse != null)
+ {
+ throw jmse;
}
}
@@ -727,7 +745,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @param e the exception that caused this session to be closed. Null causes the
*/
- public void closed(Throwable e)
+ public void closed(Throwable e) throws JMSException
{
synchronized (_connection.getFailoverMutex())
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 53291a3fd0..d9ae7e91ec 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -31,13 +31,11 @@ import org.apache.log4j.Logger;
import java.util.concurrent.CountDownLatch;
/**
- * When failover is required, we need a separate thread to handle the establishment of the new connection and
- * the transfer of subscriptions.
- * </p>
- * The reason this needs to be a separate thread is because you cannot do this work inside the MINA IO processor
- * thread. One significant task is the connection setup which involves a protocol exchange until a particular state
- * is achieved. However if you do this in the MINA thread, you have to block until the state is achieved which means
- * the IO processor is not able to do anything at all.
+ * When failover is required, we need a separate thread to handle the establishment of the new connection and the
+ * transfer of subscriptions. </p> The reason this needs to be a separate thread is because you cannot do this work
+ * inside the MINA IO processor thread. One significant task is the connection setup which involves a protocol exchange
+ * until a particular state is achieved. However if you do this in the MINA thread, you have to block until the state is
+ * achieved which means the IO processor is not able to do anything at all.
*/
public class FailoverHandler implements Runnable
{
@@ -46,14 +44,10 @@ public class FailoverHandler implements Runnable
private final IoSession _session;
private AMQProtocolHandler _amqProtocolHandler;
- /**
- * Used where forcing the failover host
- */
+ /** Used where forcing the failover host */
private String _host;
- /**
- * Used where forcing the failover port
- */
+ /** Used where forcing the failover port */
private int _port;
public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session)
@@ -82,8 +76,6 @@ public class FailoverHandler implements Runnable
// client code which runs in a separate thread.
synchronized (_amqProtocolHandler.getConnection().getFailoverMutex())
{
- _logger.info("Starting failover process");
-
// We switch in a new state manager temporarily so that the interaction to get to the "connection open"
// state works, without us having to terminate any existing "state waiters". We could theoretically
// have a state waiter waiting until the connection is closed for some reason. Or in future we may have
@@ -92,6 +84,8 @@ public class FailoverHandler implements Runnable
_amqProtocolHandler.setStateManager(new AMQStateManager());
if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
{
+ _logger.info("Starting failover process");
+
_amqProtocolHandler.setStateManager(existingStateManager);
if (_host != null)
{
@@ -105,6 +99,9 @@ public class FailoverHandler implements Runnable
_amqProtocolHandler.setFailoverLatch(null);
return;
}
+
+ _logger.info("Starting failover process");
+
boolean failoverSucceeded;
// when host is non null we have a specified failover host otherwise we all the client to cycle through
// all specified hosts
@@ -123,7 +120,7 @@ public class FailoverHandler implements Runnable
_amqProtocolHandler.setStateManager(existingStateManager);
_amqProtocolHandler.getConnection().exceptionReceived(
new AMQDisconnectedException("Server closed connection and no failover " +
- "was successful"));
+ "was successful"));
}
else
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 09a6f3be38..3d8c208ea4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -29,6 +29,7 @@ import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
@@ -58,21 +59,18 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
/**
- * The connection that this protocol handler is associated with. There is a 1-1
- * mapping between connection instances and protocol handler instances.
+ * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances
+ * and protocol handler instances.
*/
private AMQConnection _connection;
/**
- * Used only when determining whether to add the SSL filter or not. This should be made more
- * generic in future since we will potentially have many transport layer options
+ * Used only when determining whether to add the SSL filter or not. This should be made more generic in future since
+ * we will potentially have many transport layer options
*/
private boolean _useSSL;
- /**
- * Our wrapper for a protocol session that provides access to session values
- * in a typesafe manner.
- */
+ /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */
private volatile AMQProtocolSession _protocolSession;
private AMQStateManager _stateManager = new AMQStateManager();
@@ -86,6 +84,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
private FailoverHandler _failoverHandler;
+ private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
+
/**
* This flag is used to track whether failover is being attempted. It is used to prevent the application constantly
* attempting failover where it is failing.
@@ -175,6 +175,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* sessionClosed() depending on whether we were trying to send data at the time of failure.
*
* @param session
+ *
* @throws Exception
*/
public void sessionClosed(IoSession session) throws Exception
@@ -229,9 +230,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_logger.info("Protocol Session [" + this + "] closed");
}
- /**
- * See {@link FailoverHandler} to see rationale for separate thread.
- */
+ /** See {@link FailoverHandler} to see rationale for separate thread. */
private void startFailoverThread()
{
Thread failoverThread = new Thread(_failoverHandler);
@@ -294,10 +293,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
- * There are two cases where we have other threads potentially blocking for events to be handled by this
- * class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a
- * particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can
- * react appropriately.
+ * There are two cases where we have other threads potentially blocking for events to be handled by this class.
+ * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type
+ * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately.
*
* @param e the exception to propagate
*/
@@ -406,8 +404,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
- * Convenience method that writes a frame to the protocol session. Equivalent
- * to calling getProtocolSession().write().
+ * Convenience method that writes a frame to the protocol session. Equivalent to calling
+ * getProtocolSession().write().
*
* @param frame the frame to write
*/
@@ -422,9 +420,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
/**
- * Convenience method that writes a frame to the protocol session and waits for
- * a particular response. Equivalent to calling getProtocolSession().write() then
- * waiting for the response.
+ * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
+ * calling getProtocolSession().write() then waiting for the response.
*
* @param frame
* @param listener the blocking listener. Note the calling thread will block.
@@ -433,12 +430,27 @@ public class AMQProtocolHandler extends IoHandlerAdapter
BlockingMethodFrameListener listener)
throws AMQException
{
+ return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
+ }
+
+ /**
+ * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
+ * calling getProtocolSession().write() then waiting for the response.
+ *
+ * @param frame
+ * @param listener the blocking listener. Note the calling thread will block.
+ */
+ private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
+ BlockingMethodFrameListener listener,
+ long timeout)
+ throws AMQException
+ {
try
{
_frameListeners.add(listener);
_protocolSession.writeFrame(frame);
- AMQMethodEvent e = listener.blockForFrame();
+ AMQMethodEvent e = listener.blockForFrame(timeout);
return e;
// When control resumes before this line, a reply will have been received
// that matches the criteria defined in the blocking listener
@@ -451,19 +463,23 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
- /**
- * More convenient method to write a frame and wait for it's response.
- */
+ /** More convenient method to write a frame and wait for it's response. */
public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException
{
+ return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
+ }
+
+ /** More convenient method to write a frame and wait for it's response. */
+ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException
+ {
return writeCommandFrameAndWaitForReply(frame,
- new SpecificMethodFrameListener(frame.channel, responseClass));
+ new SpecificMethodFrameListener(frame.channel, responseClass),
+ timeout);
}
/**
- * Convenience method to register an AMQSession with the protocol handler. Registering
- * a session with the protocol handler will ensure that messages are delivered to the
- * consumer(s) on that session.
+ * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol
+ * handler will ensure that messages are delivered to the consumer(s) on that session.
*
* @param channelId the channel id of the session
* @param session the session instance.
@@ -490,33 +506,40 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void closeConnection() throws AMQException
{
+ closeConnection(-1);
+ }
+
+ public void closeConnection(long timeout) throws AMQException
+ {
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
- (byte)8, (byte)0, // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- "JMS client is closing the connection."); // replyText
- syncWrite(frame, ConnectionCloseOkBody.class);
-
- _protocolSession.closeProtocolSession();
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ "JMS client is closing the connection."); // replyText
+ try
+ {
+ syncWrite(frame, ConnectionCloseOkBody.class, timeout);
+ _protocolSession.closeProtocolSession();
+ }
+ catch (AMQTimeoutException e)
+ {
+ _protocolSession.closeProtocolSession(false);
+ }
}
- /**
- * @return the number of bytes read from this protocol session
- */
+ /** @return the number of bytes read from this protocol session */
public long getReadBytes()
{
return _protocolSession.getIoSession().getReadBytes();
}
- /**
- * @return the number of bytes written to this protocol session
- */
+ /** @return the number of bytes written to this protocol session */
public long getWrittenBytes()
{
return _protocolSession.getIoSession().getWrittenBytes();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 6a40fd3133..7b27d5b1f9 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -46,13 +46,12 @@ import java.util.concurrent.ConcurrentMap;
/**
* Wrapper for protocol session that provides type-safe access to session attributes.
*
- * The underlying protocol session is still available but clients should not
- * use it to obtain session attributes.
+ * The underlying protocol session is still available but clients should not use it to obtain session attributes.
*/
public class AMQProtocolSession implements ProtocolVersionList
{
- protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
+ protected static final int WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 30;
protected static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
@@ -69,33 +68,29 @@ public class AMQProtocolSession implements ProtocolVersionList
protected WriteFuture _lastWriteFuture;
/**
- * The handler from which this session was created and which is used to handle protocol events.
- * We send failover events to the handler.
+ * The handler from which this session was created and which is used to handle protocol events. We send failover
+ * events to the handler.
*/
protected final AMQProtocolHandler _protocolHandler;
- /**
- * Maps from the channel id to the AMQSession that it represents.
- */
+ /** Maps from the channel id to the AMQSession that it represents. */
protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
/**
- * Maps from a channel id to an unprocessed message. This is used to tie together the
- * JmsDeliverBody (which arrives first) with the subsequent content header and content bodies.
+ * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
+ * first) with the subsequent content header and content bodies.
*/
protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
- /**
- * Counter to ensure unique queue names
- */
+ /** Counter to ensure unique queue names */
protected int _queueId = 1;
protected final Object _queueIdLock = new Object();
/**
- * No-arg constructor for use by test subclass - has to initialise final vars
- * NOT intended for use other then for test
+ * No-arg constructor for use by test subclass - has to initialise final vars NOT intended for use other then for
+ * test
*/
public AMQProtocolSession()
{
@@ -167,8 +162,8 @@ public class AMQProtocolSession implements ProtocolVersionList
/**
* Store the SASL client currently being used for the authentication handshake
- * @param client if non-null, stores this in the session. if null clears any existing client
- * being stored
+ *
+ * @param client if non-null, stores this in the session. if null clears any existing client being stored
*/
public void setSaslClient(SaslClient client)
{
@@ -197,9 +192,11 @@ public class AMQProtocolSession implements ProtocolVersionList
}
/**
- * Callback invoked from the BasicDeliverMethodHandler when a message has been received.
- * This is invoked on the MINA dispatcher thread.
+ * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA
+ * dispatcher thread.
+ *
* @param message
+ *
* @throws AMQException if this was not expected
*/
public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
@@ -254,10 +251,10 @@ public class AMQProtocolSession implements ProtocolVersionList
}
/**
- * Deliver a message to the appropriate session, removing the unprocessed message
- * from our map
+ * Deliver a message to the appropriate session, removing the unprocessed message from our map
+ *
* @param channelId the channel id the message should be delivered to
- * @param msg the message
+ * @param msg the message
*/
private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
{
@@ -267,8 +264,8 @@ public class AMQProtocolSession implements ProtocolVersionList
}
/**
- * Convenience method that writes a frame to the protocol session. Equivalent
- * to calling getProtocolSession().write().
+ * Convenience method that writes a frame to the protocol session. Equivalent to calling
+ * getProtocolSession().write().
*
* @param frame the frame to write
*/
@@ -282,7 +279,7 @@ public class AMQProtocolSession implements ProtocolVersionList
WriteFuture f = _minaProtocolSession.write(frame);
if (wait)
{
- f.join();
+ f.join(WRITE_FUTURE_JOIN_TIMEOUT);
}
else
{
@@ -316,6 +313,7 @@ public class AMQProtocolSession implements ProtocolVersionList
/**
* Starts the process of closing a session
+ *
* @param session the AMQSession being closed
*/
public void closeSession(AMQSession session)
@@ -333,23 +331,30 @@ public class AMQProtocolSession implements ProtocolVersionList
}
/**
- * Called from the ChannelClose handler when a channel close frame is received.
- * This method decides whether this is a response or an initiation. The latter
- * case causes the AMQSession to be closed and an exception to be thrown if
+ * Called from the ChannelClose handler when a channel close frame is received. This method decides whether this is
+ * a response or an initiation. The latter case causes the AMQSession to be closed and an exception to be thrown if
* appropriate.
+ *
* @param channelId the id of the channel (session)
- * @return true if the client must respond to the server, i.e. if the server
- * initiated the channel close, false if the channel close is just the server
- * responding to the client's earlier request to close the channel.
+ *
+ * @return true if the client must respond to the server, i.e. if the server initiated the channel close, false if
+ * the channel close is just the server responding to the client's earlier request to close the channel.
*/
- public boolean channelClosed(int channelId, int code, String text)
+ public boolean channelClosed(int channelId, int code, String text) throws AMQException
{
final Integer chId = channelId;
// if this is not a response to an earlier request to close the channel
if (_closingChannels.remove(chId) == null)
{
final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
- session.closed(new AMQException(_logger, code, text));
+ try
+ {
+ session.closed(new AMQException(_logger, code, text));
+ }
+ catch (JMSException e)
+ {
+ throw new AMQException("JMSException received while closing session", e);
+ }
return true;
}
else
@@ -365,15 +370,20 @@ public class AMQProtocolSession implements ProtocolVersionList
public void closeProtocolSession()
{
+ closeProtocolSession(true);
+ }
+
+ public void closeProtocolSession(boolean waitLast)
+ {
_logger.debug("Waiting for last write to join.");
- if (_lastWriteFuture != null)
+ if (waitLast && _lastWriteFuture != null)
{
- _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+ _lastWriteFuture.join(WRITE_FUTURE_JOIN_TIMEOUT);
}
_logger.debug("Closing protocol session");
final CloseFuture future = _minaProtocolSession.close();
- future.join();
+ future.join(WRITE_FUTURE_JOIN_TIMEOUT);
}
public void failover(String host, int port)
@@ -384,19 +394,16 @@ public class AMQProtocolSession implements ProtocolVersionList
protected String generateQueueName()
{
int id;
- synchronized(_queueIdLock)
+ synchronized (_queueIdLock)
{
id = _queueId++;
}
//get rid of / and : and ; from address for spec conformance
- String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(),"/;:","");
+ String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", "");
return "tmp_" + localAddress + "_" + id;
}
- /**
- *
- * @param delay delay in seconds (not ms)
- */
+ /** @param delay delay in seconds (not ms) */
void initHeartbeats(int delay)
{
if (delay > 0)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 5342eb86f6..48cf8d9366 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client.protocol;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.framing.AMQMethodBody;
@@ -33,8 +34,8 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
private final Object _lock = new Object();
/**
- * This is set if there is an exception thrown from processCommandFrame and the
- * exception is rethrown to the caller of blockForFrame()
+ * This is set if there is an exception thrown from processCommandFrame and the exception is rethrown to the caller
+ * of blockForFrame()
*/
private volatile Exception _error;
@@ -48,11 +49,13 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
}
/**
- * This method is called by the MINA dispatching thread. Note that it could
- * be called before blockForFrame() has been called.
+ * This method is called by the MINA dispatching thread. Note that it could be called before blockForFrame() has
+ * been called.
*
* @param evt the frame event
+ *
* @return true if the listener has dealt with this frame
+ *
* @throws AMQException
*/
public boolean methodReceived(AMQMethodEvent evt) throws AMQException
@@ -85,10 +88,8 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
}
}
- /**
- * This method is called by the thread that wants to wait for a frame.
- */
- public AMQMethodEvent blockForFrame() throws AMQException
+ /** This method is called by the thread that wants to wait for a frame. */
+ public AMQMethodEvent blockForFrame(long timeout) throws AMQException
{
synchronized (_lock)
{
@@ -96,7 +97,20 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
{
try
{
- _lock.wait();
+ if (timeout == -1)
+ {
+ _lock.wait();
+ }
+ else
+ {
+
+ _lock.wait(timeout);
+ if (!_ready)
+ {
+ _error = new AMQTimeoutException("Server did not respond in a timely fashion");
+ _ready = true;
+ }
+ }
}
catch (InterruptedException e)
{
@@ -125,8 +139,8 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
}
/**
- * This is a callback, called by the MINA dispatcher thread only. It is also called from within this
- * class to avoid code repetition but again is only called by the MINA dispatcher thread.
+ * This is a callback, called by the MINA dispatcher thread only. It is also called from within this class to avoid
+ * code repetition but again is only called by the MINA dispatcher thread.
*
* @param e
*/
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 56b49230ff..105ac462ae 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -33,22 +33,19 @@ import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
/**
- * The state manager is responsible for managing the state of the protocol session.
- * <p/>
- * For each AMQProtocolHandler there is a separate state manager.
+ * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
+ * there is a separate state manager.
*/
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
- /**
- * The current state
- */
+ /** The current state */
private AMQState _currentState;
/**
- * Maps from an AMQState instance to a Map from Class to StateTransitionHandler.
- * The class must be a subclass of AMQFrame.
+ * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
+ * AMQFrame.
*/
private final Map _state2HandlersMap = new HashMap();
@@ -159,14 +156,13 @@ public class AMQStateManager implements AMQMethodListener
final Class clazz = frame.getClass();
if (_logger.isDebugEnabled())
{
- _logger.debug("Looking for state transition handler for frame " + clazz);
+ _logger.debug("Looking for state[" + currentState + "] transition handler for frame " + clazz);
}
final Map classToHandlerMap = (Map) _state2HandlersMap.get(currentState);
if (classToHandlerMap == null)
{
- // if no specialised per state handler is registered look for a
- // handler registered for "all" states
+ _logger.debug("no specialised per state handler is registered look for a handler registered for 'all' states");
return findStateTransitionHandler(null, frame);
}
final StateAwareMethodListener handler = (StateAwareMethodListener) classToHandlerMap.get(clazz);
@@ -174,13 +170,12 @@ public class AMQStateManager implements AMQMethodListener
{
if (currentState == null)
{
- _logger.debug("No state transition handler defined for receiving frame " + frame);
+ _logger.debug("No state[" + currentState + "] transition handler defined for receiving frame " + frame);
return null;
}
else
{
- // if no specialised per state handler is registered look for a
- // handler registered for "all" states
+ _logger.debug("No specialised per state handler is registered look for a handler registered for 'all' states");
return findStateTransitionHandler(null, frame);
}
}
@@ -214,7 +209,7 @@ public class AMQStateManager implements AMQMethodListener
}
if (_currentState != s)
{
- _logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s);
+ _logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s);
throw new AMQException("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s);
}
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/BrokerFillMemoryRun.java b/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/BrokerFillMemoryRun.java
new file mode 100644
index 0000000000..13c44d14f5
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/BrokerFillMemoryRun.java
@@ -0,0 +1,102 @@
+package org.apache.qpid.ManualTests;
+
+import junit.framework.TestCase;
+import org.apache.qpid.testutil.QpidClientConnection;
+
+import javax.jms.JMSException;
+
+/** NF101: heap exhaustion behaviour
+ * Provided by customer
+ */
+public class BrokerFillMemoryRun extends TestCase
+{
+ protected QpidClientConnection conn;
+ protected final String vhost = "/test";
+ protected final String queue = "direct://amq.direct//queue";
+
+ protected String hundredK;
+ protected String megabyte;
+
+ protected final String BROKER = "tcp://localhost:5672";
+
+
+ protected void log(String msg)
+ {
+ System.out.println(msg);
+ }
+
+ protected String generatePayloadOfSize(Integer numBytes)
+ {
+ return new String(new byte[numBytes]);
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ conn = new QpidClientConnection(BROKER);
+ conn.setVirtualHost(vhost);
+
+ conn.connect();
+ // clear queue
+ log("setup: clearing test queue");
+ conn.consume(queue, 2000);
+
+ hundredK = generatePayloadOfSize(1024 * 100);
+ megabyte = generatePayloadOfSize(1024 * 1024);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ try
+ {
+ conn.disconnect();
+ }
+ catch (JMSException d)
+ {
+ log("disconnectJMSE:" + d.getMessage());
+ }
+ }
+
+
+ /** PUT at maximum rate (although we commit after each PUT) until failure */
+ public void testUntilFailure() throws Exception
+ {
+ int copies = 0;
+ int total = 0;
+ String payload = hundredK;
+ int size = payload.getBytes().length + String.valueOf("0").getBytes().length;
+ while (true)
+ {
+ conn.put(queue, payload, 1);
+ copies++;
+ total += size;
+ log("put copy " + copies + " OK for total bytes: " + total);
+ }
+ }
+
+ /** PUT at lower rate (5 per second) until failure */
+ public void testUntilFailureWithDelays() throws Exception
+ {
+ try
+ {
+ int copies = 0;
+ int total = 0;
+ String payload = hundredK;
+ int size = payload.getBytes().length + String.valueOf("0").getBytes().length;
+ while (true)
+ {
+ conn.put(queue, payload, 1);
+ copies++;
+ total += size;
+ log("put copy " + copies + " OK for total bytes: " + total);
+ Thread.sleep(200);
+ }
+ }
+ catch (JMSException e)
+ {
+ log("putJMSE:" + e.getMessage());
+ }
+ }
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/MessageAgeAlert.java b/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/MessageAgeAlert.java
new file mode 100644
index 0000000000..93810936d2
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/MessageAgeAlert.java
@@ -0,0 +1,61 @@
+package org.apache.qpid.ManualTests;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.testutil.QpidClientConnection;
+import org.apache.qpid.client.transport.TransportConnection;
+
+/** FT401: alert on message age
+ * Provided by customer
+ */
+public class MessageAgeAlert extends TestCase
+{
+ protected QpidClientConnection conn;
+ protected final String queue = "direct://amq.direct//queue";
+ protected final String vhost = "/test";
+
+ protected String payload = "xyzzy";
+
+ protected Integer agePeriod = 30000;
+
+ protected final String BROKER = "localhost";
+
+ protected void log(String msg)
+ {
+ System.out.println(msg);
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+// TransportConnection.createVMBroker(1);
+
+ conn = new QpidClientConnection(BROKER);
+ conn.setVirtualHost(vhost);
+
+ conn.connect();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ conn.disconnect();
+// TransportConnection.killVMBroker(1);
+ }
+
+ /**
+ * put a message and wait for age alert
+ *
+ * @throws Exception on error
+ */
+ public void testSinglePutThenWait() throws Exception
+ {
+ conn.put(queue, payload, 1);
+ log("waiting ms: " + agePeriod);
+ Thread.sleep(agePeriod);
+ log("wait period over");
+ conn.put(queue, payload, 1);
+ }
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
index 28bb2b614b..b792359906 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
@@ -239,7 +239,7 @@ public class ResetMessageListenerTest extends TestCase
try
{
- _logger.error("Send additional messages");
+ _logger.info("Send additional messages");
for (int msg = 0; msg < MSG_COUNT; msg++)
{
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
index 67049765a5..86721d54c6 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -40,9 +40,14 @@ import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.testutil.QpidClientConnection;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
+/**
+ * based on FT304 - Competing consumers
+ * provided by customer
+ */
public class MessageRequeueTest extends TestCase
{
@@ -67,7 +72,7 @@ public class MessageRequeueTest extends TestCase
Logger session = Logger.getLogger("org.apache.qpid.client.AMQSession");
session.setLevel(Level.ERROR);
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
// clear queue
@@ -77,7 +82,7 @@ public class MessageRequeueTest extends TestCase
conn.put(queue, payload, numTestMessages);
// close this connection
conn.disconnect();
- }
+ }
protected void tearDown() throws Exception
{
@@ -139,7 +144,7 @@ public class MessageRequeueTest extends TestCase
try
{
_logger.info("consumer-" + id + ": starting");
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
@@ -179,252 +184,4 @@ public class MessageRequeueTest extends TestCase
return id;
}
}
-
-
- public class QpidClientConnection implements ExceptionListener
- {
- private boolean transacted = true;
- private int ackMode = Session.CLIENT_ACKNOWLEDGE;
- private Connection connection;
-
- private String virtualHost;
- private String brokerlist;
- private int prefetch;
- protected Session session;
- protected boolean connected;
-
- public QpidClientConnection()
- {
- super();
- setVirtualHost("/test");
- setBrokerList(BROKER);
- setPrefetch(5000);
- }
-
-
- public void connect() throws JMSException
- {
- if (!connected)
- {
- /*
- * amqp://[user:pass@][clientid]/virtualhost?
- * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
- * [&failover='method[?option='value'[&option='value']]']
- * [&option='value']"
- */
- String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- try
- {
- AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
- _logger.info("connecting to Qpid :" + brokerUrl);
- connection = factory.createConnection();
-
- // register exception listener
- connection.setExceptionListener(this);
-
- session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
-
-
- _logger.info("starting connection");
- connection.start();
-
- connected = true;
- }
- catch (URLSyntaxException e)
- {
- throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
- }
- }
- }
-
- public void disconnect() throws JMSException
- {
- if (connected)
- {
- session.commit();
- session.close();
- connection.close();
- connected = false;
- _logger.info("disconnected");
- }
- }
-
- public void disconnectWithoutCommit() throws JMSException
- {
- if (connected)
- {
- session.close();
- connection.close();
- connected = false;
- _logger.info("disconnected without commit");
- }
- }
-
- public String getBrokerList()
- {
- return brokerlist;
- }
-
- public void setBrokerList(String brokerlist)
- {
- this.brokerlist = brokerlist;
- }
-
- public String getVirtualHost()
- {
- return virtualHost;
- }
-
- public void setVirtualHost(String virtualHost)
- {
- this.virtualHost = virtualHost;
- }
-
- public void setPrefetch(int prefetch)
- {
- this.prefetch = prefetch;
- }
-
-
- /** override as necessary */
- public void onException(JMSException exception)
- {
- _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
- }
-
- public boolean isConnected()
- {
- return connected;
- }
-
- public Session getSession()
- {
- return session;
- }
-
- /**
- * Put a String as a text messages, repeat n times. A null payload will result in a null message.
- *
- * @param queueName The queue name to put to
- * @param payload the content of the payload
- * @param copies the number of messages to put
- *
- * @throws javax.jms.JMSException any exception that occurs
- */
- public void put(String queueName, String payload, int copies) throws JMSException
- {
- if (!connected)
- {
- connect();
- }
-
- _logger.info("putting to queue " + queueName);
- Queue queue = session.createQueue(queueName);
-
- final MessageProducer sender = session.createProducer(queue);
-
- for (int i = 0; i < copies; i++)
- {
- Message m = session.createTextMessage(payload + i);
- sender.send(m);
- }
-
- session.commit();
- sender.close();
- _logger.info("put " + copies + " copies");
- }
-
- /**
- * GET the top message on a queue. Consumes the message. Accepts timeout value.
- *
- * @param queueName The quename to get from
- * @param readTimeout The timeout to use
- *
- * @return the content of the text message if any
- *
- * @throws javax.jms.JMSException any exception that occured
- */
- public String getNextMessage(String queueName, long readTimeout) throws JMSException
- {
- if (!connected)
- {
- connect();
- }
-
- Queue queue = session.createQueue(queueName);
-
- final MessageConsumer consumer = session.createConsumer(queue);
-
- Message message = consumer.receive(readTimeout);
- session.commit();
- consumer.close();
-
- String result;
-
- // all messages we consume should be TextMessages
- if (message instanceof TextMessage)
- {
- result = ((TextMessage) message).getText();
- }
- else if (null == message)
- {
- result = null;
- }
- else
- {
- _logger.info("warning: received non-text message");
- result = message.toString();
- }
-
- return result;
- }
-
- /**
- * GET the top message on a queue. Consumes the message.
- *
- * @param queueName The Queuename to get from
- *
- * @return The string content of the text message, if any received
- *
- * @throws javax.jms.JMSException any exception that occurs
- */
- public String getNextMessage(String queueName) throws JMSException
- {
- return getNextMessage(queueName, 0);
- }
-
- /**
- * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
- *
- * @param queueName The Queue name to consume from
- * @param readTimeout The timeout for each consume
- *
- * @throws javax.jms.JMSException Any exception that occurs during the consume
- * @throws InterruptedException If the consume thread was interrupted during a consume.
- */
- public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
- {
- if (!connected)
- {
- connect();
- }
-
- _logger.info("consuming queue " + queueName);
- Queue queue = session.createQueue(queueName);
-
- final MessageConsumer consumer = session.createConsumer(queue);
- int messagesReceived = 0;
-
- _logger.info("consuming...");
- while ((consumer.receive(readTimeout)) != null)
- {
- messagesReceived++;
- }
-
- session.commit();
- consumer.close();
- _logger.info("consumed: " + messagesReceived);
- }
- }
-
} \ No newline at end of file
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
new file mode 100644
index 0000000000..c6bbcc416b
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
@@ -0,0 +1,273 @@
+package org.apache.qpid.testutil;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
+
+import javax.jms.ExceptionListener;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+/**
+ * This shouldn't be used for making production connections.
+ * As it has some unusual operations.
+ *
+ * Supplied by customer
+ */
+public class QpidClientConnection implements ExceptionListener
+{
+ private static final Logger _logger = Logger.getLogger(QpidClientConnection.class);
+
+
+ private boolean transacted = true;
+ private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+ private Connection connection;
+
+ private String virtualHost;
+ private String brokerlist;
+ private int prefetch;
+ protected Session session;
+ protected boolean connected;
+
+ public QpidClientConnection(String BROKER)
+ {
+ super();
+ setVirtualHost("/test");
+ setBrokerList(BROKER);
+ setPrefetch(5000);
+ }
+
+
+ public void connect() throws JMSException
+ {
+ if (!connected)
+ {
+ /*
+ * amqp://[user:pass@][clientid]/virtualhost?
+ * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+ * [&failover='method[?option='value'[&option='value']]']
+ * [&option='value']"
+ */
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+ try
+ {
+ AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
+ _logger.info("connecting to Qpid :" + brokerUrl);
+ connection = factory.createConnection();
+
+ // register exception listener
+ connection.setExceptionListener(this);
+
+ session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+
+
+ _logger.info("starting connection");
+ connection.start();
+
+ connected = true;
+ }
+ catch (URLSyntaxException e)
+ {
+ throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+ }
+ }
+ }
+
+ public void disconnect() throws JMSException
+ {
+ if (connected)
+ {
+ session.commit();
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected");
+ }
+ }
+
+ public void disconnectWithoutCommit() throws JMSException
+ {
+ if (connected)
+ {
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected without commit");
+ }
+ }
+
+ public String getBrokerList()
+ {
+ return brokerlist;
+ }
+
+ public void setBrokerList(String brokerlist)
+ {
+ this.brokerlist = brokerlist;
+ }
+
+ public String getVirtualHost()
+ {
+ return virtualHost;
+ }
+
+ public void setVirtualHost(String virtualHost)
+ {
+ this.virtualHost = virtualHost;
+ }
+
+ public void setPrefetch(int prefetch)
+ {
+ this.prefetch = prefetch;
+ }
+
+
+ /** override as necessary */
+ public void onException(JMSException exception)
+ {
+ _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
+ }
+
+ public boolean isConnected()
+ {
+ return connected;
+ }
+
+ public Session getSession()
+ {
+ return session;
+ }
+
+ /**
+ * Put a String as a text messages, repeat n times. A null payload will result in a null message.
+ *
+ * @param queueName The queue name to put to
+ * @param payload the content of the payload
+ * @param copies the number of messages to put
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public void put(String queueName, String payload, int copies) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("putting to queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageProducer sender = session.createProducer(queue);
+
+ for (int i = 0; i < copies; i++)
+ {
+ Message m = session.createTextMessage(payload + i);
+ sender.send(m);
+ }
+
+ session.commit();
+ sender.close();
+ _logger.info("put " + copies + " copies");
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message. Accepts timeout value.
+ *
+ * @param queueName The quename to get from
+ * @param readTimeout The timeout to use
+ *
+ * @return the content of the text message if any
+ *
+ * @throws javax.jms.JMSException any exception that occured
+ */
+ public String getNextMessage(String queueName, long readTimeout) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = consumer.receive(readTimeout);
+ session.commit();
+ consumer.close();
+
+ String result;
+
+ // all messages we consume should be TextMessages
+ if (message instanceof TextMessage)
+ {
+ result = ((TextMessage) message).getText();
+ }
+ else if (null == message)
+ {
+ result = null;
+ }
+ else
+ {
+ _logger.info("warning: received non-text message");
+ result = message.toString();
+ }
+
+ return result;
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message.
+ *
+ * @param queueName The Queuename to get from
+ *
+ * @return The string content of the text message, if any received
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public String getNextMessage(String queueName) throws JMSException
+ {
+ return getNextMessage(queueName, 0);
+ }
+
+ /**
+ * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
+ *
+ * @param queueName The Queue name to consume from
+ * @param readTimeout The timeout for each consume
+ *
+ * @throws javax.jms.JMSException Any exception that occurs during the consume
+ * @throws InterruptedException If the consume thread was interrupted during a consume.
+ */
+ public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("consuming queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+ int messagesReceived = 0;
+
+ _logger.info("consuming...");
+ while ((consumer.receive(readTimeout)) != null)
+ {
+ messagesReceived++;
+ }
+
+ session.commit();
+ consumer.close();
+ _logger.info("consumed: " + messagesReceived);
+ }
+ }
+
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
new file mode 100644
index 0000000000..9dea7a7d01
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid;
+
+import org.apache.qpid.protocol.AMQConstant;
+
+public class AMQTimeoutException extends AMQException
+{
+ public AMQTimeoutException(String message)
+ {
+ super(AMQConstant.REQUEST_TIMEOUT.getCode(), message);
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
index a0d243ca30..52ebc0485c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
@@ -82,6 +82,8 @@ public final class AMQConstant
public static final AMQConstant NOT_FOUND = new AMQConstant(404, "not found", true);
+ public static final AMQConstant REQUEST_TIMEOUT = new AMQConstant(408, "Request Timeout", true);
+
public static final AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true);
public static final AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true);
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index fee091e3bd..978126fa39 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -26,13 +26,11 @@ import org.apache.qpid.framing.ContentHeaderBody;
import javax.management.Notification;
-/**
- * This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters
- */
+/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
public class AMQQueueAlertTest extends TestCase
{
- private final static int MAX_MESSAGE_COUNT = 50;
- private final static long MAX_MESSAGE_AGE = 2000; // 2 sec
+ private final static int MAX_MESSAGE_COUNT = 50;
+ private final static long MAX_MESSAGE_AGE = 250; // 0.25 sec
private final static long MAX_MESSAGE_SIZE = 2000; // 2 KB
private final static long MAX_QUEUE_DEPTH = 10000; // 10 KB
private AMQQueue _queue;
@@ -42,19 +40,20 @@ public class AMQQueueAlertTest extends TestCase
/**
* Tests if the alert gets thrown when message count increases the threshold limit
+ *
* @throws Exception
*/
public void testMessageCountAlert() throws Exception
{
_queue = new AMQQueue("testQueue1", false, "AMQueueAlertTest", false, _queueRegistry);
- _queueMBean = (AMQQueueMBean)_queue.getManagedObject();
+ _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
_queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
sendMessages(MAX_MESSAGE_COUNT, 256l);
assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT);
- Notification lastNotification= _queueMBean.getLastNotification();
+ Notification lastNotification = _queueMBean.getLastNotification();
assertNotNull(lastNotification);
String notificationMsg = lastNotification.getMessage();
@@ -63,19 +62,20 @@ public class AMQQueueAlertTest extends TestCase
/**
* Tests if the Message Size alert gets thrown when message of higher than threshold limit is sent
+ *
* @throws Exception
*/
public void testMessageSizeAlert() throws Exception
{
_queue = new AMQQueue("testQueue2", false, "AMQueueAlertTest", false, _queueRegistry);
- _queueMBean = (AMQQueueMBean)_queue.getManagedObject();
+ _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
_queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
_queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE);
sendMessages(1, MAX_MESSAGE_SIZE * 2);
assertTrue(_queueMBean.getMessageCount() == 1);
- Notification lastNotification= _queueMBean.getLastNotification();
+ Notification lastNotification = _queueMBean.getLastNotification();
assertNotNull(lastNotification);
String notificationMsg = lastNotification.getMessage();
@@ -84,12 +84,13 @@ public class AMQQueueAlertTest extends TestCase
/**
* Tests if Queue Depth alert is thrown when queue depth reaches the threshold value
+ *
* @throws Exception
*/
public void testQueueDepthAlert() throws Exception
{
_queue = new AMQQueue("testQueue3", false, "AMQueueAlertTest", false, _queueRegistry);
- _queueMBean = (AMQQueueMBean)_queue.getManagedObject();
+ _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
_queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
_queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);
@@ -98,7 +99,7 @@ public class AMQQueueAlertTest extends TestCase
sendMessages(1, MAX_MESSAGE_SIZE);
}
- Notification lastNotification= _queueMBean.getLastNotification();
+ Notification lastNotification = _queueMBean.getLastNotification();
assertNotNull(lastNotification);
String notificationMsg = lastNotification.getMessage();
@@ -106,23 +107,27 @@ public class AMQQueueAlertTest extends TestCase
}
/**
- * Tests if MESSAGE AGE alert is thrown, when a message is in the queue for time higher than
- * threshold value of message age
+ * Tests if MESSAGE AGE alert is thrown, when a message is in the queue for time higher than threshold value of
+ * message age
+ *
* @throws Exception
*/
public void testMessageAgeAlert() throws Exception
{
_queue = new AMQQueue("testQueue4", false, "AMQueueAlertTest", false, _queueRegistry);
- _queueMBean = (AMQQueueMBean)_queue.getManagedObject();
+ _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
_queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
_queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE);
sendMessages(1, MAX_MESSAGE_SIZE);
- Thread.sleep(MAX_MESSAGE_AGE);
+
+ // Ensure message sits on queue long enough to age.
+ Thread.sleep(MAX_MESSAGE_AGE * 2);
+
sendMessages(1, MAX_MESSAGE_SIZE);
assertTrue(_queueMBean.getMessageCount() == 2);
- Notification lastNotification= _queueMBean.getLastNotification();
+ Notification lastNotification = _queueMBean.getLastNotification();
assertNotNull(lastNotification);
String notificationMsg = lastNotification.getMessage();
@@ -133,7 +138,7 @@ public class AMQQueueAlertTest extends TestCase
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Establish some way to determine the version for the test.
- BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0);
+ BasicPublishBody publish = new BasicPublishBody((byte) 8, (byte) 0);
publish.immediate = immediate;
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = size; // in bytes