diff options
Diffstat (limited to 'java/client/src/main')
4 files changed, 117 insertions, 100 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 2030876952..adf2a4bda2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -70,20 +70,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AtomicInteger _idFactory = new AtomicInteger(0); /** - * This is the "root" mutex that must be held when doing anything that could be impacted by failover. - * This must be held by any child objects of this connection such as the session, producers and consumers. + * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be + * held by any child objects of this connection such as the session, producers and consumers. */ private final Object _failoverMutex = new Object(); /** - * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels - * per session and we must prevent the client from opening too many. Zero means unlimited. + * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session + * and we must prevent the client from opening too many. Zero means unlimited. */ private long _maximumChannelCount; - /** - * The maximum size of frame supported by the server - */ + /** The maximum size of frame supported by the server */ private long _maximumFrameSize; /** @@ -93,26 +91,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ private AMQProtocolHandler _protocolHandler; - /** - * Maps from session id (Integer) to AMQSession instance - */ + /** Maps from session id (Integer) to AMQSession instance */ private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap private String _clientName; - /** - * The user name to use for authentication - */ + /** The user name to use for authentication */ private String _username; - /** - * The password to use for authentication - */ + /** The password to use for authentication */ private String _password; - /** - * The virtual path to connect to on the AMQ server - */ + /** The virtual path to connect to on the AMQ server */ private String _virtualHost; private ExceptionListener _exceptionListener; @@ -122,14 +112,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private ConnectionURL _connectionURL; /** - * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for - * message publication. + * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message + * publication. */ private boolean _started; - /** - * Policy dictating how to failover - */ + /** Policy dictating how to failover */ private FailoverPolicy _failoverPolicy; /* @@ -148,9 +136,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ private QpidConnectionMetaData _connectionMetaData; - /** - * Configuration info for SSL - */ + /** Configuration info for SSL */ private SSLConfiguration _sslConfiguration; private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; @@ -164,6 +150,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * @param password password * @param clientName clientid * @param virtualHost virtualhost + * * @throws AMQException * @throws URLSyntaxException */ @@ -182,6 +169,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * @param password password * @param clientName clientid * @param virtualHost virtualhost + * * @throws AMQException * @throws URLSyntaxException */ @@ -238,7 +226,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { - _logger.info("Connection:" + connectionURL); + if (_logger.isInfoEnabled()) + { + _logger.info("Connection:" + connectionURL); + } _sslConfiguration = sslConfig; if (connectionURL == null) { @@ -297,11 +288,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { lastException = e; - _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause()); + if (_logger.isInfoEnabled()) + { + _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause()); + } } } - _logger.debug("Are we connected:" + _connected); + if (_logger.isDebugEnabled()) + { + _logger.debug("Are we connected:" + _connected); + } if (!_connected) { @@ -402,7 +399,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } catch (Exception e) { - _logger.info("Unable to connect to broker at " + bd); + if (_logger.isInfoEnabled()) + { + _logger.info("Unable to connect to broker at " + bd); + } attemptReconnection(); } return false; @@ -421,11 +421,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (!(e instanceof AMQException)) { - _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); + if (_logger.isInfoEnabled()) + { + _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); + } } else { - _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails()); + if (_logger.isInfoEnabled()) + { + _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails()); + } } } } @@ -437,8 +443,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * Get the details of the currently active broker * - * @return null if no broker is active (i.e. no successful connection has been made, or - * the BrokerDetail instance otherwise + * @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance + * otherwise */ public BrokerDetails getActiveBrokerDetails() { @@ -593,12 +599,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions - * where specified in the JMS spec + * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in + * the JMS spec * * @param transacted * @param acknowledgeMode + * * @return QueueSession + * * @throws JMSException */ public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException @@ -607,12 +615,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions - * where specified in the JMS spec + * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in + * the JMS spec * * @param transacted * @param acknowledgeMode + * * @return TopicSession + * * @throws JMSException */ public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException @@ -718,11 +728,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * Marks all sessions and their children as closed without sending any protocol messages. Useful when - * you need to mark objects "visible" in userland as closed after failover or other significant event that - * impacts the connection. - * <p/> - * The caller must hold the failover mutex before calling this method. + * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to + * mark objects "visible" in userland as closed after failover or other significant event that impacts the + * connection. <p/> The caller must hold the failover mutex before calling this method. */ private void markAllSessionsClosed() { @@ -740,9 +748,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * Close all the sessions, either due to normal connection closure or due to an error occurring. * - * @param cause if not null, the error that is causing this shutdown - * <p/> - * The caller must hold the failover mutex before calling this method. + * @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex + * before calling this method. */ private void closeAllSessions(Throwable cause, long timeout) throws JMSException { @@ -891,6 +898,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * Fire the preFailover event to the registered connection listener (if any) * * @param redirect true if this is the result of a redirect request rather than a connection error + * * @return true if no listener or listener does not veto change */ public boolean firePreFailover(boolean redirect) @@ -904,10 +912,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * Fire the preResubscribe event to the registered connection listener (if any). If the listener - * vetoes resubscription then all the sessions are closed. + * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes + * resubscription then all the sessions are closed. * * @return true if no listener or listener does not veto resubscription. + * * @throws JMSException */ public boolean firePreResubscribe() throws JMSException @@ -927,9 +936,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - /** - * Fires a failover complete event to the registered connection listener (if any). - */ + /** Fires a failover complete event to the registered connection listener (if any). */ public void fireFailoverComplete() { if (_connectionListener != null) @@ -939,8 +946,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * In order to protect the consistency of the connection and its child sessions, consumers and producers, - * the "failover mutex" must be held when doing any operations that could be corrupted during failover. + * In order to protect the consistency of the connection and its child sessions, consumers and producers, the + * "failover mutex" must be held when doing any operations that could be corrupted during failover. * * @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs. */ @@ -950,8 +957,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * If failover is taking place this will block until it has completed. If failover - * is not taking place it will return immediately. + * If failover is taking place this will block until it has completed. If failover is not taking place it will + * return immediately. * * @throws InterruptedException */ @@ -961,18 +968,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } /** - * Invoked by the AMQProtocolSession when a protocol session exception has occurred. - * This method sends the exception to a JMS exception listener, if configured, and - * propagates the exception to sessions, which in turn will propagate to consumers. - * This allows synchronous consumers to have exceptions thrown to them. + * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception + * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will + * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them. * * @param cause the exception */ public void exceptionReceived(Throwable cause) { - _logger.debug("Connection Close done by:" + Thread.currentThread().getName()); - _logger.debug("exceptionReceived is ", cause); + if (_logger.isDebugEnabled()) + { + _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause); + } final JMSException je; if (cause instanceof JMSException) @@ -1012,7 +1020,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - _logger.info("Closing AMQConnection due to :" + cause.getMessage()); + if (_logger.isInfoEnabled()) + { + _logger.info("Closing AMQConnection due to :" + cause.getMessage()); + } _closed.set(true); closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 7ab26f3b47..dc2ffc38c4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1629,9 +1629,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_logger.isDebugEnabled()) { - _logger.debug("Message received in session with channel id " + _channelId); + _logger.debug("Message[" + (message.getDeliverBody() == null ? + "B:" + message.getBounceBody() : "D:" + message.getDeliverBody()) + + "] received in session with channel id " + _channelId); } + startDistpatcherIfNecessary(); + _queue.add(message); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index 57d987712a..9c8e9188ec 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -60,36 +60,41 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener AMQConstant errorCode = AMQConstant.getConstant(method.replyCode); AMQShortString reason = method.replyText; - // TODO: check whether channel id of zero is appropriate - // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, method.getMajor(), method.getMinor())); - - if (errorCode != AMQConstant.REPLY_SUCCESS) + try { - if(errorCode == AMQConstant.NOT_ALLOWED) + // TODO: check whether channel id of zero is appropriate + // Be aware of possible changes to parameter order as versions change. + protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short) 0, method.getMajor(), method.getMinor())); + + if (errorCode != AMQConstant.REPLY_SUCCESS) { - _logger.info("Authentication Error:"+Thread.currentThread().getName()); + if (errorCode == AMQConstant.NOT_ALLOWED) + { + _logger.info("Authentication Error:" + Thread.currentThread().getName()); - protocolSession.closeProtocolSession(); + protocolSession.closeProtocolSession(); - //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state. - stateManager.changeState(AMQState.CONNECTION_NOT_STARTED); + //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state. + stateManager.changeState(AMQState.CONNECTION_NOT_STARTED); - throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString()); - } - else - { - _logger.info("Connection close received with error code " + errorCode); + throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString()); + } + else + { + _logger.info("Connection close received with error code " + errorCode); - throw new AMQConnectionClosedException(errorCode, "Error: " + reason); + throw new AMQConnectionClosedException(errorCode, "Error: " + reason); + } } } + finally + { + // this actually closes the connection in the case where it is not an error. - // this actually closes the connection in the case where it is not an error. - - protocolSession.closeProtocolSession(); + protocolSession.closeProtocolSession(); - stateManager.changeState(AMQState.CONNECTION_CLOSED); + stateManager.changeState(AMQState.CONNECTION_CLOSED); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 09dd6ecfff..825baf95d1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -59,25 +59,22 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; /** - * The state manager is responsible for managing the state of the protocol session. - * <p/> - * For each AMQProtocolHandler there is a separate state manager. + * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler + * there is a separate state manager. */ public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); private AMQProtocolSession _protocolSession; - /** - * The current state - */ + /** The current state */ private AMQState _currentState; /** - * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. - * The class must be a subclass of AMQFrame. + * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of + * AMQFrame. */ - private final Map _state2HandlersMap = new HashMap(); + protected final Map _state2HandlersMap = new HashMap(); private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet(); private final Object _stateLock = new Object(); @@ -87,7 +84,7 @@ public class AMQStateManager implements AMQMethodListener { this(null); } - + public AMQStateManager(AMQProtocolSession protocolSession) { @@ -98,7 +95,7 @@ public class AMQStateManager implements AMQMethodListener { _protocolSession = protocolSession; _currentState = state; - if(register) + if (register) { registerListeners(); } @@ -194,7 +191,7 @@ public class AMQStateManager implements AMQMethodListener final Class clazz = frame.getClass(); if (_logger.isDebugEnabled()) { - _logger.debug("Looking for state transition handler for frame " + clazz); + _logger.debug("Looking for state[" + currentState + "] transition handler for frame " + clazz); } final Map classToHandlerMap = (Map) _state2HandlersMap.get(currentState); @@ -228,12 +225,12 @@ public class AMQStateManager implements AMQMethodListener public void attainState(final AMQState s) throws AMQException { - synchronized(_stateLock) + synchronized (_stateLock) { final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME; long waitTime = MAXIMUM_STATE_WAIT_TIME; - while(_currentState != s && waitTime > 0) + while (_currentState != s && waitTime > 0) { try { @@ -243,12 +240,12 @@ public class AMQStateManager implements AMQMethodListener { _logger.warn("Thread interrupted"); } - if(_currentState != s) + if (_currentState != s) { waitTime = waitUntilTime - System.currentTimeMillis(); } } - if(_currentState != s) + if (_currentState != s) { _logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s); throw new AMQException("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s); |
