diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-03-02 16:37:28 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-03-02 16:37:28 +0000 |
commit | 74e1b99486b47d788f193533c2f630f8ce5cbff3 (patch) | |
tree | 6d580b945fcefa73ca05327ca1eb8cff0dea43ba | |
parent | 409f1c667a0eb41ebaefb8c072565e23641152dd (diff) | |
download | qpid-python-74e1b99486b47d788f193533c2f630f8ce5cbff3.tar.gz |
QPID-308
Broker:
AMQStateManager - Added extra logging to keep track of what is going on as the findStateTransitionHandler is recursive.
Client:
AMQConnection - Comment Changes. Added timeouts to connections.
AMQSession - Added timeout on closure
FailoverHandler - Comment changes and adjusted logging
AMQProtocolHandler - Comments changed and added timeouts to the syncwait calls.
AMQProtocolSession - Added timeouts to writeFrame joins.
BlockingMethodFrameListener - Added timeouts to blockFrame waits.
AMQStateManager - Added additional logging
ResetMessageListenerTest - Fixed logging level on a single log line.
Created ManualTests
Added MessageAgeAlert test case supplied by customer.
MessageRequeueTest - Moved QpidClientConnection to its own class
QpidClientConnection - Added based on a class from a customer.
AMQTimeoutException - Added new exception based on timeouts
AMQConstant - Added timeout constant
AMQQueueAlertTest - adjusted values as my dual core would fail occasionally.
BrokerFillMemoryRun - added test to fill the broker's memory.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@513835 13f79535-47bb-0310-9956-ffa450edef68
16 files changed, 749 insertions, 475 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 4e9deeb8db..7c3988cd74 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -35,23 +35,19 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; /** - * The state manager is responsible for managing the state of the protocol session. - * <p/> - * For each AMQProtocolHandler there is a separate state manager. - * + * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler + * there is a separate state manager. */ public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); - /** - * The current state - */ + /** The current state */ private AMQState _currentState; /** - * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. - * The class must be a subclass of AMQFrame. + * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of + * AMQFrame. */ private final Map<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap = new HashMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>(); @@ -159,9 +155,9 @@ public class AMQStateManager implements AMQMethodListener } public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt, - AMQProtocolSession protocolSession, - QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry) throws AMQException + AMQProtocolSession protocolSession, + QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry) throws AMQException { StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod()); if (handler != null) @@ -173,20 +169,19 @@ public class AMQStateManager implements AMQMethodListener } protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState, - B frame) + B frame) throws IllegalStateTransitionException { if (_logger.isDebugEnabled()) { - _logger.debug("Looking for state transition handler for frame " + frame.getClass()); + _logger.debug("Looking for state[" + currentState + "] transition handler for frame " + frame.getClass()); } final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> classToHandlerMap = _state2HandlersMap.get(currentState); if (classToHandlerMap == null) { - // if no specialised per state handler is registered look for a - // handler registered for "all" states + _logger.debug("No specialised per state handler is registered look for a handler registered for 'all' states"); return findStateTransitionHandler(null, frame); } final StateAwareMethodListener<B> handler = (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass()); @@ -199,8 +194,7 @@ public class AMQStateManager implements AMQMethodListener } else { - // if no specialised per state handler is registered look for a - // handler registered for "all" states + _logger.debug("No specialised per state handler is registered look for a handler registered for 'all' states"); return findStateTransitionHandler(null, frame); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 994ed7944e..879f2f9548 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -64,23 +64,23 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { private static final Logger _logger = Logger.getLogger(AMQConnection.class); + private static final long MAXIMUM_WAIT_TIME = 30000l; + private AtomicInteger _idFactory = new AtomicInteger(0); /** - * This is the "root" mutex that must be held when doing anything that could be impacted by failover. - * This must be held by any child objects of this connection such as the session, producers and consumers. + * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be + * held by any child objects of this connection such as the session, producers and consumers. */ private final Object _failoverMutex = new Object(); /** - * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels - * per session and we must prevent the client from opening too many. Zero means unlimited. + * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session + * and we must prevent the client from opening too many. Zero means unlimited. */ private long _maximumChannelCount; - /** - * The maximum size of frame supported by the server - */ + /** The maximum size of frame supported by the server */ private long _maximumFrameSize; /** @@ -90,26 +90,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ private AMQProtocolHandler _protocolHandler; - /** - * Maps from session id (Integer) to AMQSession instance - */ + /** Maps from session id (Integer) to AMQSession instance */ private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap private String _clientName; - /** - * The user name to use for authentication - */ + /** The user name to use for authentication */ private String _username; - /** - * The password to use for authentication - */ + /** The password to use for authentication */ private String _password; - /** - * The virtual path to connect to on the AMQ server - */ + /** The virtual path to connect to on the AMQ server */ private String _virtualHost; private ExceptionListener _exceptionListener; @@ -119,14 +111,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private ConnectionURL _connectionURL; /** - * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for - * message publication. + * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message + * publication. */ private boolean _started; - /** - * Policy dictating how to failover - */ + /** Policy dictating how to failover */ private FailoverPolicy _failoverPolicy; /* @@ -353,8 +343,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * Get the details of the currently active broker * - * @return null if no broker is active (i.e. no successful connection has been made, or - * the BrokerDetail instance otherwise + * @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance + * otherwise */ public BrokerDetails getActiveBrokerDetails() { @@ -507,12 +497,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions - * where specified in the JMS spec + * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in + * the JMS spec * * @param transacted * @param acknowledgeMode + * * @return QueueSession + * * @throws JMSException */ public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException @@ -521,12 +513,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions - * where specified in the JMS spec + * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in + * the JMS spec * * @param transacted * @param acknowledgeMode + * * @return TopicSession + * * @throws JMSException */ public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException @@ -623,14 +617,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void close() throws JMSException { + close(-1); + } + + public void close(long timeout) throws JMSException + { synchronized (getFailoverMutex()) { if (!_closed.getAndSet(true)) { try { - closeAllSessions(null); - _protocolHandler.closeConnection(); + closeAllSessions(null, timeout); + _protocolHandler.closeConnection(timeout); } catch (AMQException e) { @@ -641,11 +640,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * Marks all sessions and their children as closed without sending any protocol messages. Useful when - * you need to mark objects "visible" in userland as closed after failover or other significant event that - * impacts the connection. - * <p/> - * The caller must hold the failover mutex before calling this method. + * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to + * mark objects "visible" in userland as closed after failover or other significant event that impacts the + * connection. <p/> The caller must hold the failover mutex before calling this method. */ private void markAllSessionsClosed() { @@ -663,11 +660,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * Close all the sessions, either due to normal connection closure or due to an error occurring. * - * @param cause if not null, the error that is causing this shutdown - * <p/> - * The caller must hold the failover mutex before calling this method. + * @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex + * before calling this method. */ - private void closeAllSessions(Throwable cause) throws JMSException + private void closeAllSessions(Throwable cause, long timeout) throws JMSException { final LinkedList sessionCopy = new LinkedList(_sessions.values()); final Iterator it = sessionCopy.iterator(); @@ -683,7 +679,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - session.close(); + session.close(timeout); } catch (JMSException e) { @@ -788,7 +784,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _protocolHandler; } - + public boolean started() { return _started; @@ -814,6 +810,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * Fire the preFailover event to the registered connection listener (if any) * * @param redirect true if this is the result of a redirect request rather than a connection error + * * @return true if no listener or listener does not veto change */ public boolean firePreFailover(boolean redirect) @@ -827,10 +824,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * Fire the preResubscribe event to the registered connection listener (if any). If the listener - * vetoes resubscription then all the sessions are closed. + * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes + * resubscription then all the sessions are closed. * * @return true if no listener or listener does not veto resubscription. + * * @throws JMSException */ public boolean firePreResubscribe() throws JMSException @@ -850,9 +848,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - /** - * Fires a failover complete event to the registered connection listener (if any). - */ + /** Fires a failover complete event to the registered connection listener (if any). */ public void fireFailoverComplete() { if (_connectionListener != null) @@ -862,8 +858,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * In order to protect the consistency of the connection and its child sessions, consumers and producers, - * the "failover mutex" must be held when doing any operations that could be corrupted during failover. + * In order to protect the consistency of the connection and its child sessions, consumers and producers, the + * "failover mutex" must be held when doing any operations that could be corrupted during failover. * * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs. */ @@ -873,8 +869,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * If failover is taking place this will block until it has completed. If failover - * is not taking place it will return immediately. + * If failover is taking place this will block until it has completed. If failover is not taking place it will + * return immediately. * * @throws InterruptedException */ @@ -884,10 +880,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * Invoked by the AMQProtocolSession when a protocol session exception has occurred. - * This method sends the exception to a JMS exception listener, if configured, and - * propagates the exception to sessions, which in turn will propagate to consumers. - * This allows synchronous consumers to have exceptions thrown to them. + * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception + * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will + * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them. * * @param cause the exception */ @@ -937,7 +932,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.info("Closing AMQConnection due to :" + cause.getMessage()); _closed.set(true); - closeAllSessions(cause); // FIXME: when doing this end up with RejectedExecutionException from executor. + closeAllSessions(cause, MAXIMUM_WAIT_TIME); // FIXME: when doing this end up with RejectedExecutionException from executor. } catch (JMSException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0648ed472d..6de63be63a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -338,7 +338,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { _queue.add(msg); } - + if (stopped) { _dispatcher.setConnectionStopped(stopped); @@ -648,6 +648,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void close() throws JMSException { + close(-1); + } + + public void close(long timeout) throws JMSException + { // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session synchronized (_connection.getFailoverMutex()) @@ -657,7 +662,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // we pass null since this is not an error case - closeProducersAndConsumers(null); + closeProducersAndConsumers(null); try { @@ -671,7 +676,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi 0, // methodId AMQConstant.REPLY_SUCCESS.getCode(), // replyCode "JMS client closing channel"); // replyText - _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class); + + _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully @@ -696,8 +702,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @param amqe the exception, may be null to indicate no error has occurred */ - private void closeProducersAndConsumers(AMQException amqe) + private void closeProducersAndConsumers(AMQException amqe) throws JMSException { + JMSException jmse = null; + try { closeProducers(); @@ -705,6 +713,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi catch (JMSException e) { _logger.error("Error closing session: " + e, e); + jmse = e; } try { @@ -713,6 +722,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi catch (JMSException e) { _logger.error("Error closing session: " + e, e); + if (jmse == null) + { + jmse = e; + } + } + + if (jmse != null) + { + throw jmse; } } @@ -727,7 +745,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @param e the exception that caused this session to be closed. Null causes the */ - public void closed(Throwable e) + public void closed(Throwable e) throws JMSException { synchronized (_connection.getFailoverMutex()) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 53291a3fd0..d9ae7e91ec 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -31,13 +31,11 @@ import org.apache.log4j.Logger; import java.util.concurrent.CountDownLatch; /** - * When failover is required, we need a separate thread to handle the establishment of the new connection and - * the transfer of subscriptions. - * </p> - * The reason this needs to be a separate thread is because you cannot do this work inside the MINA IO processor - * thread. One significant task is the connection setup which involves a protocol exchange until a particular state - * is achieved. However if you do this in the MINA thread, you have to block until the state is achieved which means - * the IO processor is not able to do anything at all. + * When failover is required, we need a separate thread to handle the establishment of the new connection and the + * transfer of subscriptions. </p> The reason this needs to be a separate thread is because you cannot do this work + * inside the MINA IO processor thread. One significant task is the connection setup which involves a protocol exchange + * until a particular state is achieved. However if you do this in the MINA thread, you have to block until the state is + * achieved which means the IO processor is not able to do anything at all. */ public class FailoverHandler implements Runnable { @@ -46,14 +44,10 @@ public class FailoverHandler implements Runnable private final IoSession _session; private AMQProtocolHandler _amqProtocolHandler; - /** - * Used where forcing the failover host - */ + /** Used where forcing the failover host */ private String _host; - /** - * Used where forcing the failover port - */ + /** Used where forcing the failover port */ private int _port; public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session) @@ -82,8 +76,6 @@ public class FailoverHandler implements Runnable // client code which runs in a separate thread. synchronized (_amqProtocolHandler.getConnection().getFailoverMutex()) { - _logger.info("Starting failover process"); - // We switch in a new state manager temporarily so that the interaction to get to the "connection open" // state works, without us having to terminate any existing "state waiters". We could theoretically // have a state waiter waiting until the connection is closed for some reason. Or in future we may have @@ -92,6 +84,8 @@ public class FailoverHandler implements Runnable _amqProtocolHandler.setStateManager(new AMQStateManager()); if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null)) { + _logger.info("Starting failover process"); + _amqProtocolHandler.setStateManager(existingStateManager); if (_host != null) { @@ -105,6 +99,9 @@ public class FailoverHandler implements Runnable _amqProtocolHandler.setFailoverLatch(null); return; } + + _logger.info("Starting failover process"); + boolean failoverSucceeded; // when host is non null we have a specified failover host otherwise we all the client to cycle through // all specified hosts @@ -123,7 +120,7 @@ public class FailoverHandler implements Runnable _amqProtocolHandler.setStateManager(existingStateManager); _amqProtocolHandler.getConnection().exceptionReceived( new AMQDisconnectedException("Server closed connection and no failover " + - "was successful")); + "was successful")); } else { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 09a6f3be38..3d8c208ea4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -29,6 +29,7 @@ import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; @@ -58,21 +59,18 @@ public class AMQProtocolHandler extends IoHandlerAdapter private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class); /** - * The connection that this protocol handler is associated with. There is a 1-1 - * mapping between connection instances and protocol handler instances. + * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances + * and protocol handler instances. */ private AMQConnection _connection; /** - * Used only when determining whether to add the SSL filter or not. This should be made more - * generic in future since we will potentially have many transport layer options + * Used only when determining whether to add the SSL filter or not. This should be made more generic in future since + * we will potentially have many transport layer options */ private boolean _useSSL; - /** - * Our wrapper for a protocol session that provides access to session values - * in a typesafe manner. - */ + /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */ private volatile AMQProtocolSession _protocolSession; private AMQStateManager _stateManager = new AMQStateManager(); @@ -86,6 +84,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ private FailoverHandler _failoverHandler; + private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; + /** * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly * attempting failover where it is failing. @@ -175,6 +175,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter * sessionClosed() depending on whether we were trying to send data at the time of failure. * * @param session + * * @throws Exception */ public void sessionClosed(IoSession session) throws Exception @@ -229,9 +230,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.info("Protocol Session [" + this + "] closed"); } - /** - * See {@link FailoverHandler} to see rationale for separate thread. - */ + /** See {@link FailoverHandler} to see rationale for separate thread. */ private void startFailoverThread() { Thread failoverThread = new Thread(_failoverHandler); @@ -294,10 +293,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * There are two cases where we have other threads potentially blocking for events to be handled by this - * class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a - * particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can - * react appropriately. + * There are two cases where we have other threads potentially blocking for events to be handled by this class. + * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type + * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately. * * @param e the exception to propagate */ @@ -406,8 +404,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * Convenience method that writes a frame to the protocol session. Equivalent - * to calling getProtocolSession().write(). + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). * * @param frame the frame to write */ @@ -422,9 +420,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter } /** - * Convenience method that writes a frame to the protocol session and waits for - * a particular response. Equivalent to calling getProtocolSession().write() then - * waiting for the response. + * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to + * calling getProtocolSession().write() then waiting for the response. * * @param frame * @param listener the blocking listener. Note the calling thread will block. @@ -433,12 +430,27 @@ public class AMQProtocolHandler extends IoHandlerAdapter BlockingMethodFrameListener listener) throws AMQException { + return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT); + } + + /** + * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to + * calling getProtocolSession().write() then waiting for the response. + * + * @param frame + * @param listener the blocking listener. Note the calling thread will block. + */ + private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, + BlockingMethodFrameListener listener, + long timeout) + throws AMQException + { try { _frameListeners.add(listener); _protocolSession.writeFrame(frame); - AMQMethodEvent e = listener.blockForFrame(); + AMQMethodEvent e = listener.blockForFrame(timeout); return e; // When control resumes before this line, a reply will have been received // that matches the criteria defined in the blocking listener @@ -451,19 +463,23 @@ public class AMQProtocolHandler extends IoHandlerAdapter } - /** - * More convenient method to write a frame and wait for it's response. - */ + /** More convenient method to write a frame and wait for it's response. */ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException { + return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT); + } + + /** More convenient method to write a frame and wait for it's response. */ + public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException + { return writeCommandFrameAndWaitForReply(frame, - new SpecificMethodFrameListener(frame.channel, responseClass)); + new SpecificMethodFrameListener(frame.channel, responseClass), + timeout); } /** - * Convenience method to register an AMQSession with the protocol handler. Registering - * a session with the protocol handler will ensure that messages are delivered to the - * consumer(s) on that session. + * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol + * handler will ensure that messages are delivered to the consumer(s) on that session. * * @param channelId the channel id of the session * @param session the session instance. @@ -490,33 +506,40 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void closeConnection() throws AMQException { + closeConnection(-1); + } + + public void closeConnection(long timeout) throws AMQException + { _stateManager.changeState(AMQState.CONNECTION_CLOSING); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0, - (byte)8, (byte)0, // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - "JMS client is closing the connection."); // replyText - syncWrite(frame, ConnectionCloseOkBody.class); - - _protocolSession.closeProtocolSession(); + (byte) 8, (byte) 0, // AMQP version (major, minor) + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + "JMS client is closing the connection."); // replyText + try + { + syncWrite(frame, ConnectionCloseOkBody.class, timeout); + _protocolSession.closeProtocolSession(); + } + catch (AMQTimeoutException e) + { + _protocolSession.closeProtocolSession(false); + } } - /** - * @return the number of bytes read from this protocol session - */ + /** @return the number of bytes read from this protocol session */ public long getReadBytes() { return _protocolSession.getIoSession().getReadBytes(); } - /** - * @return the number of bytes written to this protocol session - */ + /** @return the number of bytes written to this protocol session */ public long getWrittenBytes() { return _protocolSession.getIoSession().getWrittenBytes(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 6a40fd3133..7b27d5b1f9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -46,13 +46,12 @@ import java.util.concurrent.ConcurrentMap; /** * Wrapper for protocol session that provides type-safe access to session attributes. * - * The underlying protocol session is still available but clients should not - * use it to obtain session attributes. + * The underlying protocol session is still available but clients should not use it to obtain session attributes. */ public class AMQProtocolSession implements ProtocolVersionList { - protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2; + protected static final int WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 30; protected static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); @@ -69,33 +68,29 @@ public class AMQProtocolSession implements ProtocolVersionList protected WriteFuture _lastWriteFuture; /** - * The handler from which this session was created and which is used to handle protocol events. - * We send failover events to the handler. + * The handler from which this session was created and which is used to handle protocol events. We send failover + * events to the handler. */ protected final AMQProtocolHandler _protocolHandler; - /** - * Maps from the channel id to the AMQSession that it represents. - */ + /** Maps from the channel id to the AMQSession that it represents. */ protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); protected ConcurrentMap _closingChannels = new ConcurrentHashMap(); /** - * Maps from a channel id to an unprocessed message. This is used to tie together the - * JmsDeliverBody (which arrives first) with the subsequent content header and content bodies. + * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives + * first) with the subsequent content header and content bodies. */ protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); - /** - * Counter to ensure unique queue names - */ + /** Counter to ensure unique queue names */ protected int _queueId = 1; protected final Object _queueIdLock = new Object(); /** - * No-arg constructor for use by test subclass - has to initialise final vars - * NOT intended for use other then for test + * No-arg constructor for use by test subclass - has to initialise final vars NOT intended for use other then for + * test */ public AMQProtocolSession() { @@ -167,8 +162,8 @@ public class AMQProtocolSession implements ProtocolVersionList /** * Store the SASL client currently being used for the authentication handshake - * @param client if non-null, stores this in the session. if null clears any existing client - * being stored + * + * @param client if non-null, stores this in the session. if null clears any existing client being stored */ public void setSaslClient(SaslClient client) { @@ -197,9 +192,11 @@ public class AMQProtocolSession implements ProtocolVersionList } /** - * Callback invoked from the BasicDeliverMethodHandler when a message has been received. - * This is invoked on the MINA dispatcher thread. + * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA + * dispatcher thread. + * * @param message + * * @throws AMQException if this was not expected */ public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException @@ -254,10 +251,10 @@ public class AMQProtocolSession implements ProtocolVersionList } /** - * Deliver a message to the appropriate session, removing the unprocessed message - * from our map + * Deliver a message to the appropriate session, removing the unprocessed message from our map + * * @param channelId the channel id the message should be delivered to - * @param msg the message + * @param msg the message */ private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg) { @@ -267,8 +264,8 @@ public class AMQProtocolSession implements ProtocolVersionList } /** - * Convenience method that writes a frame to the protocol session. Equivalent - * to calling getProtocolSession().write(). + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). * * @param frame the frame to write */ @@ -282,7 +279,7 @@ public class AMQProtocolSession implements ProtocolVersionList WriteFuture f = _minaProtocolSession.write(frame); if (wait) { - f.join(); + f.join(WRITE_FUTURE_JOIN_TIMEOUT); } else { @@ -316,6 +313,7 @@ public class AMQProtocolSession implements ProtocolVersionList /** * Starts the process of closing a session + * * @param session the AMQSession being closed */ public void closeSession(AMQSession session) @@ -333,23 +331,30 @@ public class AMQProtocolSession implements ProtocolVersionList } /** - * Called from the ChannelClose handler when a channel close frame is received. - * This method decides whether this is a response or an initiation. The latter - * case causes the AMQSession to be closed and an exception to be thrown if + * Called from the ChannelClose handler when a channel close frame is received. This method decides whether this is + * a response or an initiation. The latter case causes the AMQSession to be closed and an exception to be thrown if * appropriate. + * * @param channelId the id of the channel (session) - * @return true if the client must respond to the server, i.e. if the server - * initiated the channel close, false if the channel close is just the server - * responding to the client's earlier request to close the channel. + * + * @return true if the client must respond to the server, i.e. if the server initiated the channel close, false if + * the channel close is just the server responding to the client's earlier request to close the channel. */ - public boolean channelClosed(int channelId, int code, String text) + public boolean channelClosed(int channelId, int code, String text) throws AMQException { final Integer chId = channelId; // if this is not a response to an earlier request to close the channel if (_closingChannels.remove(chId) == null) { final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId); - session.closed(new AMQException(_logger, code, text)); + try + { + session.closed(new AMQException(_logger, code, text)); + } + catch (JMSException e) + { + throw new AMQException("JMSException received while closing session", e); + } return true; } else @@ -365,15 +370,20 @@ public class AMQProtocolSession implements ProtocolVersionList public void closeProtocolSession() { + closeProtocolSession(true); + } + + public void closeProtocolSession(boolean waitLast) + { _logger.debug("Waiting for last write to join."); - if (_lastWriteFuture != null) + if (waitLast && _lastWriteFuture != null) { - _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); + _lastWriteFuture.join(WRITE_FUTURE_JOIN_TIMEOUT); } _logger.debug("Closing protocol session"); final CloseFuture future = _minaProtocolSession.close(); - future.join(); + future.join(WRITE_FUTURE_JOIN_TIMEOUT); } public void failover(String host, int port) @@ -384,19 +394,16 @@ public class AMQProtocolSession implements ProtocolVersionList protected String generateQueueName() { int id; - synchronized(_queueIdLock) + synchronized (_queueIdLock) { id = _queueId++; } //get rid of / and : and ; from address for spec conformance - String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(),"/;:",""); + String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", ""); return "tmp_" + localAddress + "_" + id; } - /** - * - * @param delay delay in seconds (not ms) - */ + /** @param delay delay in seconds (not ms) */ void initHeartbeats(int delay) { if (delay > 0) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 5342eb86f6..48cf8d9366 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -21,6 +21,7 @@ package org.apache.qpid.client.protocol; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.framing.AMQMethodBody; @@ -33,8 +34,8 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener private final Object _lock = new Object(); /** - * This is set if there is an exception thrown from processCommandFrame and the - * exception is rethrown to the caller of blockForFrame() + * This is set if there is an exception thrown from processCommandFrame and the exception is rethrown to the caller + * of blockForFrame() */ private volatile Exception _error; @@ -48,11 +49,13 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener } /** - * This method is called by the MINA dispatching thread. Note that it could - * be called before blockForFrame() has been called. + * This method is called by the MINA dispatching thread. Note that it could be called before blockForFrame() has + * been called. * * @param evt the frame event + * * @return true if the listener has dealt with this frame + * * @throws AMQException */ public boolean methodReceived(AMQMethodEvent evt) throws AMQException @@ -85,10 +88,8 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener } } - /** - * This method is called by the thread that wants to wait for a frame. - */ - public AMQMethodEvent blockForFrame() throws AMQException + /** This method is called by the thread that wants to wait for a frame. */ + public AMQMethodEvent blockForFrame(long timeout) throws AMQException { synchronized (_lock) { @@ -96,7 +97,20 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener { try { - _lock.wait(); + if (timeout == -1) + { + _lock.wait(); + } + else + { + + _lock.wait(timeout); + if (!_ready) + { + _error = new AMQTimeoutException("Server did not respond in a timely fashion"); + _ready = true; + } + } } catch (InterruptedException e) { @@ -125,8 +139,8 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener } /** - * This is a callback, called by the MINA dispatcher thread only. It is also called from within this - * class to avoid code repetition but again is only called by the MINA dispatcher thread. + * This is a callback, called by the MINA dispatcher thread only. It is also called from within this class to avoid + * code repetition but again is only called by the MINA dispatcher thread. * * @param e */ diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 56b49230ff..105ac462ae 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -33,22 +33,19 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; /** - * The state manager is responsible for managing the state of the protocol session. - * <p/> - * For each AMQProtocolHandler there is a separate state manager. + * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler + * there is a separate state manager. */ public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); - /** - * The current state - */ + /** The current state */ private AMQState _currentState; /** - * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. - * The class must be a subclass of AMQFrame. + * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of + * AMQFrame. */ private final Map _state2HandlersMap = new HashMap(); @@ -159,14 +156,13 @@ public class AMQStateManager implements AMQMethodListener final Class clazz = frame.getClass(); if (_logger.isDebugEnabled()) { - _logger.debug("Looking for state transition handler for frame " + clazz); + _logger.debug("Looking for state[" + currentState + "] transition handler for frame " + clazz); } final Map classToHandlerMap = (Map) _state2HandlersMap.get(currentState); if (classToHandlerMap == null) { - // if no specialised per state handler is registered look for a - // handler registered for "all" states + _logger.debug("no specialised per state handler is registered look for a handler registered for 'all' states"); return findStateTransitionHandler(null, frame); } final StateAwareMethodListener handler = (StateAwareMethodListener) classToHandlerMap.get(clazz); @@ -174,13 +170,12 @@ public class AMQStateManager implements AMQMethodListener { if (currentState == null) { - _logger.debug("No state transition handler defined for receiving frame " + frame); + _logger.debug("No state[" + currentState + "] transition handler defined for receiving frame " + frame); return null; } else { - // if no specialised per state handler is registered look for a - // handler registered for "all" states + _logger.debug("No specialised per state handler is registered look for a handler registered for 'all' states"); return findStateTransitionHandler(null, frame); } } @@ -214,7 +209,7 @@ public class AMQStateManager implements AMQMethodListener } if (_currentState != s) { - _logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s); + _logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s); throw new AMQException("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s); } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/BrokerFillMemoryRun.java b/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/BrokerFillMemoryRun.java new file mode 100644 index 0000000000..13c44d14f5 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/BrokerFillMemoryRun.java @@ -0,0 +1,102 @@ +package org.apache.qpid.ManualTests; + +import junit.framework.TestCase; +import org.apache.qpid.testutil.QpidClientConnection; + +import javax.jms.JMSException; + +/** NF101: heap exhaustion behaviour + * Provided by customer + */ +public class BrokerFillMemoryRun extends TestCase +{ + protected QpidClientConnection conn; + protected final String vhost = "/test"; + protected final String queue = "direct://amq.direct//queue"; + + protected String hundredK; + protected String megabyte; + + protected final String BROKER = "tcp://localhost:5672"; + + + protected void log(String msg) + { + System.out.println(msg); + } + + protected String generatePayloadOfSize(Integer numBytes) + { + return new String(new byte[numBytes]); + } + + protected void setUp() throws Exception + { + super.setUp(); + conn = new QpidClientConnection(BROKER); + conn.setVirtualHost(vhost); + + conn.connect(); + // clear queue + log("setup: clearing test queue"); + conn.consume(queue, 2000); + + hundredK = generatePayloadOfSize(1024 * 100); + megabyte = generatePayloadOfSize(1024 * 1024); + } + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + try + { + conn.disconnect(); + } + catch (JMSException d) + { + log("disconnectJMSE:" + d.getMessage()); + } + } + + + /** PUT at maximum rate (although we commit after each PUT) until failure */ + public void testUntilFailure() throws Exception + { + int copies = 0; + int total = 0; + String payload = hundredK; + int size = payload.getBytes().length + String.valueOf("0").getBytes().length; + while (true) + { + conn.put(queue, payload, 1); + copies++; + total += size; + log("put copy " + copies + " OK for total bytes: " + total); + } + } + + /** PUT at lower rate (5 per second) until failure */ + public void testUntilFailureWithDelays() throws Exception + { + try + { + int copies = 0; + int total = 0; + String payload = hundredK; + int size = payload.getBytes().length + String.valueOf("0").getBytes().length; + while (true) + { + conn.put(queue, payload, 1); + copies++; + total += size; + log("put copy " + copies + " OK for total bytes: " + total); + Thread.sleep(200); + } + } + catch (JMSException e) + { + log("putJMSE:" + e.getMessage()); + } + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/MessageAgeAlert.java b/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/MessageAgeAlert.java new file mode 100644 index 0000000000..93810936d2 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/ManualTests/MessageAgeAlert.java @@ -0,0 +1,61 @@ +package org.apache.qpid.ManualTests; + +import junit.framework.TestCase; + +import org.apache.qpid.testutil.QpidClientConnection; +import org.apache.qpid.client.transport.TransportConnection; + +/** FT401: alert on message age + * Provided by customer + */ +public class MessageAgeAlert extends TestCase +{ + protected QpidClientConnection conn; + protected final String queue = "direct://amq.direct//queue"; + protected final String vhost = "/test"; + + protected String payload = "xyzzy"; + + protected Integer agePeriod = 30000; + + protected final String BROKER = "localhost"; + + protected void log(String msg) + { + System.out.println(msg); + } + + protected void setUp() throws Exception + { + super.setUp(); + +// TransportConnection.createVMBroker(1); + + conn = new QpidClientConnection(BROKER); + conn.setVirtualHost(vhost); + + conn.connect(); + } + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + conn.disconnect(); +// TransportConnection.killVMBroker(1); + } + + /** + * put a message and wait for age alert + * + * @throws Exception on error + */ + public void testSinglePutThenWait() throws Exception + { + conn.put(queue, payload, 1); + log("waiting ms: " + agePeriod); + Thread.sleep(agePeriod); + log("wait period over"); + conn.put(queue, payload, 1); + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java index 28bb2b614b..b792359906 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java @@ -239,7 +239,7 @@ public class ResetMessageListenerTest extends TestCase try { - _logger.error("Send additional messages"); + _logger.info("Send additional messages"); for (int msg = 0; msg < MSG_COUNT; msg++) { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index 67049765a5..86721d54c6 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -40,9 +40,14 @@ import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.testutil.QpidClientConnection; import org.apache.log4j.Logger; import org.apache.log4j.Level; +/** + * based on FT304 - Competing consumers + * provided by customer + */ public class MessageRequeueTest extends TestCase { @@ -67,7 +72,7 @@ public class MessageRequeueTest extends TestCase Logger session = Logger.getLogger("org.apache.qpid.client.AMQSession"); session.setLevel(Level.ERROR); - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); // clear queue @@ -77,7 +82,7 @@ public class MessageRequeueTest extends TestCase conn.put(queue, payload, numTestMessages); // close this connection conn.disconnect(); - } + } protected void tearDown() throws Exception { @@ -139,7 +144,7 @@ public class MessageRequeueTest extends TestCase try { _logger.info("consumer-" + id + ": starting"); - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); @@ -179,252 +184,4 @@ public class MessageRequeueTest extends TestCase return id; } } - - - public class QpidClientConnection implements ExceptionListener - { - private boolean transacted = true; - private int ackMode = Session.CLIENT_ACKNOWLEDGE; - private Connection connection; - - private String virtualHost; - private String brokerlist; - private int prefetch; - protected Session session; - protected boolean connected; - - public QpidClientConnection() - { - super(); - setVirtualHost("/test"); - setBrokerList(BROKER); - setPrefetch(5000); - } - - - public void connect() throws JMSException - { - if (!connected) - { - /* - * amqp://[user:pass@][clientid]/virtualhost? - * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' - * [&failover='method[?option='value'[&option='value']]'] - * [&option='value']" - */ - String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - try - { - AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); - _logger.info("connecting to Qpid :" + brokerUrl); - connection = factory.createConnection(); - - // register exception listener - connection.setExceptionListener(this); - - session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); - - - _logger.info("starting connection"); - connection.start(); - - connected = true; - } - catch (URLSyntaxException e) - { - throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); - } - } - } - - public void disconnect() throws JMSException - { - if (connected) - { - session.commit(); - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected"); - } - } - - public void disconnectWithoutCommit() throws JMSException - { - if (connected) - { - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected without commit"); - } - } - - public String getBrokerList() - { - return brokerlist; - } - - public void setBrokerList(String brokerlist) - { - this.brokerlist = brokerlist; - } - - public String getVirtualHost() - { - return virtualHost; - } - - public void setVirtualHost(String virtualHost) - { - this.virtualHost = virtualHost; - } - - public void setPrefetch(int prefetch) - { - this.prefetch = prefetch; - } - - - /** override as necessary */ - public void onException(JMSException exception) - { - _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); - } - - public boolean isConnected() - { - return connected; - } - - public Session getSession() - { - return session; - } - - /** - * Put a String as a text messages, repeat n times. A null payload will result in a null message. - * - * @param queueName The queue name to put to - * @param payload the content of the payload - * @param copies the number of messages to put - * - * @throws javax.jms.JMSException any exception that occurs - */ - public void put(String queueName, String payload, int copies) throws JMSException - { - if (!connected) - { - connect(); - } - - _logger.info("putting to queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageProducer sender = session.createProducer(queue); - - for (int i = 0; i < copies; i++) - { - Message m = session.createTextMessage(payload + i); - sender.send(m); - } - - session.commit(); - sender.close(); - _logger.info("put " + copies + " copies"); - } - - /** - * GET the top message on a queue. Consumes the message. Accepts timeout value. - * - * @param queueName The quename to get from - * @param readTimeout The timeout to use - * - * @return the content of the text message if any - * - * @throws javax.jms.JMSException any exception that occured - */ - public String getNextMessage(String queueName, long readTimeout) throws JMSException - { - if (!connected) - { - connect(); - } - - Queue queue = session.createQueue(queueName); - - final MessageConsumer consumer = session.createConsumer(queue); - - Message message = consumer.receive(readTimeout); - session.commit(); - consumer.close(); - - String result; - - // all messages we consume should be TextMessages - if (message instanceof TextMessage) - { - result = ((TextMessage) message).getText(); - } - else if (null == message) - { - result = null; - } - else - { - _logger.info("warning: received non-text message"); - result = message.toString(); - } - - return result; - } - - /** - * GET the top message on a queue. Consumes the message. - * - * @param queueName The Queuename to get from - * - * @return The string content of the text message, if any received - * - * @throws javax.jms.JMSException any exception that occurs - */ - public String getNextMessage(String queueName) throws JMSException - { - return getNextMessage(queueName, 0); - } - - /** - * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. - * - * @param queueName The Queue name to consume from - * @param readTimeout The timeout for each consume - * - * @throws javax.jms.JMSException Any exception that occurs during the consume - * @throws InterruptedException If the consume thread was interrupted during a consume. - */ - public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException - { - if (!connected) - { - connect(); - } - - _logger.info("consuming queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageConsumer consumer = session.createConsumer(queue); - int messagesReceived = 0; - - _logger.info("consuming..."); - while ((consumer.receive(readTimeout)) != null) - { - messagesReceived++; - } - - session.commit(); - consumer.close(); - _logger.info("consumed: " + messagesReceived); - } - } - }
\ No newline at end of file diff --git a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java new file mode 100644 index 0000000000..c6bbcc416b --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -0,0 +1,273 @@ +package org.apache.qpid.testutil; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Logger; + +import javax.jms.ExceptionListener; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.MessageProducer; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.TextMessage; + +/** + * This shouldn't be used for making production connections. + * As it has some unusual operations. + * + * Supplied by customer + */ +public class QpidClientConnection implements ExceptionListener +{ + private static final Logger _logger = Logger.getLogger(QpidClientConnection.class); + + + private boolean transacted = true; + private int ackMode = Session.CLIENT_ACKNOWLEDGE; + private Connection connection; + + private String virtualHost; + private String brokerlist; + private int prefetch; + protected Session session; + protected boolean connected; + + public QpidClientConnection(String BROKER) + { + super(); + setVirtualHost("/test"); + setBrokerList(BROKER); + setPrefetch(5000); + } + + + public void connect() throws JMSException + { + if (!connected) + { + /* + * amqp://[user:pass@][clientid]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + try + { + AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); + _logger.info("connecting to Qpid :" + brokerUrl); + connection = factory.createConnection(); + + // register exception listener + connection.setExceptionListener(this); + + session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); + + + _logger.info("starting connection"); + connection.start(); + + connected = true; + } + catch (URLSyntaxException e) + { + throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); + } + } + } + + public void disconnect() throws JMSException + { + if (connected) + { + session.commit(); + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected"); + } + } + + public void disconnectWithoutCommit() throws JMSException + { + if (connected) + { + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected without commit"); + } + } + + public String getBrokerList() + { + return brokerlist; + } + + public void setBrokerList(String brokerlist) + { + this.brokerlist = brokerlist; + } + + public String getVirtualHost() + { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) + { + this.virtualHost = virtualHost; + } + + public void setPrefetch(int prefetch) + { + this.prefetch = prefetch; + } + + + /** override as necessary */ + public void onException(JMSException exception) + { + _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); + } + + public boolean isConnected() + { + return connected; + } + + public Session getSession() + { + return session; + } + + /** + * Put a String as a text messages, repeat n times. A null payload will result in a null message. + * + * @param queueName The queue name to put to + * @param payload the content of the payload + * @param copies the number of messages to put + * + * @throws javax.jms.JMSException any exception that occurs + */ + public void put(String queueName, String payload, int copies) throws JMSException + { + if (!connected) + { + connect(); + } + + _logger.info("putting to queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageProducer sender = session.createProducer(queue); + + for (int i = 0; i < copies; i++) + { + Message m = session.createTextMessage(payload + i); + sender.send(m); + } + + session.commit(); + sender.close(); + _logger.info("put " + copies + " copies"); + } + + /** + * GET the top message on a queue. Consumes the message. Accepts timeout value. + * + * @param queueName The quename to get from + * @param readTimeout The timeout to use + * + * @return the content of the text message if any + * + * @throws javax.jms.JMSException any exception that occured + */ + public String getNextMessage(String queueName, long readTimeout) throws JMSException + { + if (!connected) + { + connect(); + } + + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(readTimeout); + session.commit(); + consumer.close(); + + String result; + + // all messages we consume should be TextMessages + if (message instanceof TextMessage) + { + result = ((TextMessage) message).getText(); + } + else if (null == message) + { + result = null; + } + else + { + _logger.info("warning: received non-text message"); + result = message.toString(); + } + + return result; + } + + /** + * GET the top message on a queue. Consumes the message. + * + * @param queueName The Queuename to get from + * + * @return The string content of the text message, if any received + * + * @throws javax.jms.JMSException any exception that occurs + */ + public String getNextMessage(String queueName) throws JMSException + { + return getNextMessage(queueName, 0); + } + + /** + * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. + * + * @param queueName The Queue name to consume from + * @param readTimeout The timeout for each consume + * + * @throws javax.jms.JMSException Any exception that occurs during the consume + * @throws InterruptedException If the consume thread was interrupted during a consume. + */ + public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException + { + if (!connected) + { + connect(); + } + + _logger.info("consuming queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + int messagesReceived = 0; + + _logger.info("consuming..."); + while ((consumer.receive(readTimeout)) != null) + { + messagesReceived++; + } + + session.commit(); + consumer.close(); + _logger.info("consumed: " + messagesReceived); + } + } + diff --git a/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java b/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java new file mode 100644 index 0000000000..9dea7a7d01 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid; + +import org.apache.qpid.protocol.AMQConstant; + +public class AMQTimeoutException extends AMQException +{ + public AMQTimeoutException(String message) + { + super(AMQConstant.REQUEST_TIMEOUT.getCode(), message); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java index a0d243ca30..52ebc0485c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -82,6 +82,8 @@ public final class AMQConstant public static final AMQConstant NOT_FOUND = new AMQConstant(404, "not found", true); + public static final AMQConstant REQUEST_TIMEOUT = new AMQConstant(408, "Request Timeout", true); + public static final AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true); public static final AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true); diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index fee091e3bd..978126fa39 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -26,13 +26,11 @@ import org.apache.qpid.framing.ContentHeaderBody; import javax.management.Notification; -/** - * This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters - */ +/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */ public class AMQQueueAlertTest extends TestCase { - private final static int MAX_MESSAGE_COUNT = 50; - private final static long MAX_MESSAGE_AGE = 2000; // 2 sec + private final static int MAX_MESSAGE_COUNT = 50; + private final static long MAX_MESSAGE_AGE = 250; // 0.25 sec private final static long MAX_MESSAGE_SIZE = 2000; // 2 KB private final static long MAX_QUEUE_DEPTH = 10000; // 10 KB private AMQQueue _queue; @@ -42,19 +40,20 @@ public class AMQQueueAlertTest extends TestCase /** * Tests if the alert gets thrown when message count increases the threshold limit + * * @throws Exception */ public void testMessageCountAlert() throws Exception { _queue = new AMQQueue("testQueue1", false, "AMQueueAlertTest", false, _queueRegistry); - _queueMBean = (AMQQueueMBean)_queue.getManagedObject(); + _queueMBean = (AMQQueueMBean) _queue.getManagedObject(); _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT); sendMessages(MAX_MESSAGE_COUNT, 256l); assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT); - Notification lastNotification= _queueMBean.getLastNotification(); + Notification lastNotification = _queueMBean.getLastNotification(); assertNotNull(lastNotification); String notificationMsg = lastNotification.getMessage(); @@ -63,19 +62,20 @@ public class AMQQueueAlertTest extends TestCase /** * Tests if the Message Size alert gets thrown when message of higher than threshold limit is sent + * * @throws Exception */ public void testMessageSizeAlert() throws Exception { _queue = new AMQQueue("testQueue2", false, "AMQueueAlertTest", false, _queueRegistry); - _queueMBean = (AMQQueueMBean)_queue.getManagedObject(); + _queueMBean = (AMQQueueMBean) _queue.getManagedObject(); _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT); _queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE); sendMessages(1, MAX_MESSAGE_SIZE * 2); assertTrue(_queueMBean.getMessageCount() == 1); - Notification lastNotification= _queueMBean.getLastNotification(); + Notification lastNotification = _queueMBean.getLastNotification(); assertNotNull(lastNotification); String notificationMsg = lastNotification.getMessage(); @@ -84,12 +84,13 @@ public class AMQQueueAlertTest extends TestCase /** * Tests if Queue Depth alert is thrown when queue depth reaches the threshold value + * * @throws Exception */ public void testQueueDepthAlert() throws Exception { _queue = new AMQQueue("testQueue3", false, "AMQueueAlertTest", false, _queueRegistry); - _queueMBean = (AMQQueueMBean)_queue.getManagedObject(); + _queueMBean = (AMQQueueMBean) _queue.getManagedObject(); _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT); _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH); @@ -98,7 +99,7 @@ public class AMQQueueAlertTest extends TestCase sendMessages(1, MAX_MESSAGE_SIZE); } - Notification lastNotification= _queueMBean.getLastNotification(); + Notification lastNotification = _queueMBean.getLastNotification(); assertNotNull(lastNotification); String notificationMsg = lastNotification.getMessage(); @@ -106,23 +107,27 @@ public class AMQQueueAlertTest extends TestCase } /** - * Tests if MESSAGE AGE alert is thrown, when a message is in the queue for time higher than - * threshold value of message age + * Tests if MESSAGE AGE alert is thrown, when a message is in the queue for time higher than threshold value of + * message age + * * @throws Exception */ public void testMessageAgeAlert() throws Exception { _queue = new AMQQueue("testQueue4", false, "AMQueueAlertTest", false, _queueRegistry); - _queueMBean = (AMQQueueMBean)_queue.getManagedObject(); + _queueMBean = (AMQQueueMBean) _queue.getManagedObject(); _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT); _queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE); sendMessages(1, MAX_MESSAGE_SIZE); - Thread.sleep(MAX_MESSAGE_AGE); + + // Ensure message sits on queue long enough to age. + Thread.sleep(MAX_MESSAGE_AGE * 2); + sendMessages(1, MAX_MESSAGE_SIZE); assertTrue(_queueMBean.getMessageCount() == 2); - Notification lastNotification= _queueMBean.getLastNotification(); + Notification lastNotification = _queueMBean.getLastNotification(); assertNotNull(lastNotification); String notificationMsg = lastNotification.getMessage(); @@ -133,7 +138,7 @@ public class AMQQueueAlertTest extends TestCase { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Establish some way to determine the version for the test. - BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0); + BasicPublishBody publish = new BasicPublishBody((byte) 8, (byte) 0); publish.immediate = immediate; ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.bodySize = size; // in bytes |