summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java137
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java45
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java29
4 files changed, 117 insertions, 100 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 2030876952..adf2a4bda2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -70,20 +70,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AtomicInteger _idFactory = new AtomicInteger(0);
/**
- * This is the "root" mutex that must be held when doing anything that could be impacted by failover.
- * This must be held by any child objects of this connection such as the session, producers and consumers.
+ * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
+ * held by any child objects of this connection such as the session, producers and consumers.
*/
private final Object _failoverMutex = new Object();
/**
- * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels
- * per session and we must prevent the client from opening too many. Zero means unlimited.
+ * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
+ * and we must prevent the client from opening too many. Zero means unlimited.
*/
private long _maximumChannelCount;
- /**
- * The maximum size of frame supported by the server
- */
+ /** The maximum size of frame supported by the server */
private long _maximumFrameSize;
/**
@@ -93,26 +91,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
private AMQProtocolHandler _protocolHandler;
- /**
- * Maps from session id (Integer) to AMQSession instance
- */
+ /** Maps from session id (Integer) to AMQSession instance */
private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
private String _clientName;
- /**
- * The user name to use for authentication
- */
+ /** The user name to use for authentication */
private String _username;
- /**
- * The password to use for authentication
- */
+ /** The password to use for authentication */
private String _password;
- /**
- * The virtual path to connect to on the AMQ server
- */
+ /** The virtual path to connect to on the AMQ server */
private String _virtualHost;
private ExceptionListener _exceptionListener;
@@ -122,14 +112,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private ConnectionURL _connectionURL;
/**
- * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for
- * message publication.
+ * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
+ * publication.
*/
private boolean _started;
- /**
- * Policy dictating how to failover
- */
+ /** Policy dictating how to failover */
private FailoverPolicy _failoverPolicy;
/*
@@ -148,9 +136,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
private QpidConnectionMetaData _connectionMetaData;
- /**
- * Configuration info for SSL
- */
+ /** Configuration info for SSL */
private SSLConfiguration _sslConfiguration;
private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
@@ -164,6 +150,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* @param password password
* @param clientName clientid
* @param virtualHost virtualhost
+ *
* @throws AMQException
* @throws URLSyntaxException
*/
@@ -182,6 +169,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* @param password password
* @param clientName clientid
* @param virtualHost virtualhost
+ *
* @throws AMQException
* @throws URLSyntaxException
*/
@@ -238,7 +226,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
- _logger.info("Connection:" + connectionURL);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Connection:" + connectionURL);
+ }
_sslConfiguration = sslConfig;
if (connectionURL == null)
{
@@ -297,11 +288,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
lastException = e;
- _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause());
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause());
+ }
}
}
- _logger.debug("Are we connected:" + _connected);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Are we connected:" + _connected);
+ }
if (!_connected)
{
@@ -402,7 +399,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
catch (Exception e)
{
- _logger.info("Unable to connect to broker at " + bd);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Unable to connect to broker at " + bd);
+ }
attemptReconnection();
}
return false;
@@ -421,11 +421,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (!(e instanceof AMQException))
{
- _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
+ }
}
else
{
- _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails());
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info(e.getMessage() + ":Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails());
+ }
}
}
}
@@ -437,8 +443,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/**
* Get the details of the currently active broker
*
- * @return null if no broker is active (i.e. no successful connection has been made, or
- * the BrokerDetail instance otherwise
+ * @return null if no broker is active (i.e. no successful connection has been made, or the BrokerDetail instance
+ * otherwise
*/
public BrokerDetails getActiveBrokerDetails()
{
@@ -593,12 +599,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions
- * where specified in the JMS spec
+ * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions where specified in
+ * the JMS spec
*
* @param transacted
* @param acknowledgeMode
+ *
* @return QueueSession
+ *
* @throws JMSException
*/
public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
@@ -607,12 +615,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions
- * where specified in the JMS spec
+ * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions where specified in
+ * the JMS spec
*
* @param transacted
* @param acknowledgeMode
+ *
* @return TopicSession
+ *
* @throws JMSException
*/
public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
@@ -718,11 +728,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Marks all sessions and their children as closed without sending any protocol messages. Useful when
- * you need to mark objects "visible" in userland as closed after failover or other significant event that
- * impacts the connection.
- * <p/>
- * The caller must hold the failover mutex before calling this method.
+ * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
+ * mark objects "visible" in userland as closed after failover or other significant event that impacts the
+ * connection. <p/> The caller must hold the failover mutex before calling this method.
*/
private void markAllSessionsClosed()
{
@@ -740,9 +748,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/**
* Close all the sessions, either due to normal connection closure or due to an error occurring.
*
- * @param cause if not null, the error that is causing this shutdown
- * <p/>
- * The caller must hold the failover mutex before calling this method.
+ * @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex
+ * before calling this method.
*/
private void closeAllSessions(Throwable cause, long timeout) throws JMSException
{
@@ -891,6 +898,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* Fire the preFailover event to the registered connection listener (if any)
*
* @param redirect true if this is the result of a redirect request rather than a connection error
+ *
* @return true if no listener or listener does not veto change
*/
public boolean firePreFailover(boolean redirect)
@@ -904,10 +912,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Fire the preResubscribe event to the registered connection listener (if any). If the listener
- * vetoes resubscription then all the sessions are closed.
+ * Fire the preResubscribe event to the registered connection listener (if any). If the listener vetoes
+ * resubscription then all the sessions are closed.
*
* @return true if no listener or listener does not veto resubscription.
+ *
* @throws JMSException
*/
public boolean firePreResubscribe() throws JMSException
@@ -927,9 +936,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- /**
- * Fires a failover complete event to the registered connection listener (if any).
- */
+ /** Fires a failover complete event to the registered connection listener (if any). */
public void fireFailoverComplete()
{
if (_connectionListener != null)
@@ -939,8 +946,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * In order to protect the consistency of the connection and its child sessions, consumers and producers,
- * the "failover mutex" must be held when doing any operations that could be corrupted during failover.
+ * In order to protect the consistency of the connection and its child sessions, consumers and producers, the
+ * "failover mutex" must be held when doing any operations that could be corrupted during failover.
*
* @return a mutex. Guaranteed never to change for the lifetime of this connection even if failover occurs.
*/
@@ -950,8 +957,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * If failover is taking place this will block until it has completed. If failover
- * is not taking place it will return immediately.
+ * If failover is taking place this will block until it has completed. If failover is not taking place it will
+ * return immediately.
*
* @throws InterruptedException
*/
@@ -961,18 +968,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Invoked by the AMQProtocolSession when a protocol session exception has occurred.
- * This method sends the exception to a JMS exception listener, if configured, and
- * propagates the exception to sessions, which in turn will propagate to consumers.
- * This allows synchronous consumers to have exceptions thrown to them.
+ * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
+ * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
+ * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
*
* @param cause the exception
*/
public void exceptionReceived(Throwable cause)
{
- _logger.debug("Connection Close done by:" + Thread.currentThread().getName());
- _logger.debug("exceptionReceived is ", cause);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause);
+ }
final JMSException je;
if (cause instanceof JMSException)
@@ -1012,7 +1020,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
try
{
- _logger.info("Closing AMQConnection due to :" + cause.getMessage());
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing AMQConnection due to :" + cause.getMessage());
+ }
_closed.set(true);
closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 7ab26f3b47..dc2ffc38c4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1629,9 +1629,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Message received in session with channel id " + _channelId);
+ _logger.debug("Message[" + (message.getDeliverBody() == null ?
+ "B:" + message.getBounceBody() : "D:" + message.getDeliverBody())
+ + "] received in session with channel id " + _channelId);
}
+ startDistpatcherIfNecessary();
+
_queue.add(message);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
index 57d987712a..9c8e9188ec 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
@@ -60,36 +60,41 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
AMQShortString reason = method.replyText;
- // TODO: check whether channel id of zero is appropriate
- // Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, method.getMajor(), method.getMinor()));
-
- if (errorCode != AMQConstant.REPLY_SUCCESS)
+ try
{
- if(errorCode == AMQConstant.NOT_ALLOWED)
+ // TODO: check whether channel id of zero is appropriate
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short) 0, method.getMajor(), method.getMinor()));
+
+ if (errorCode != AMQConstant.REPLY_SUCCESS)
{
- _logger.info("Authentication Error:"+Thread.currentThread().getName());
+ if (errorCode == AMQConstant.NOT_ALLOWED)
+ {
+ _logger.info("Authentication Error:" + Thread.currentThread().getName());
- protocolSession.closeProtocolSession();
+ protocolSession.closeProtocolSession();
- //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
- stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
+ //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
+ stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
- throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString());
- }
- else
- {
- _logger.info("Connection close received with error code " + errorCode);
+ throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString());
+ }
+ else
+ {
+ _logger.info("Connection close received with error code " + errorCode);
- throw new AMQConnectionClosedException(errorCode, "Error: " + reason);
+ throw new AMQConnectionClosedException(errorCode, "Error: " + reason);
+ }
}
}
+ finally
+ {
+ // this actually closes the connection in the case where it is not an error.
- // this actually closes the connection in the case where it is not an error.
-
- protocolSession.closeProtocolSession();
+ protocolSession.closeProtocolSession();
- stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 09dd6ecfff..825baf95d1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -59,25 +59,22 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
/**
- * The state manager is responsible for managing the state of the protocol session.
- * <p/>
- * For each AMQProtocolHandler there is a separate state manager.
+ * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
+ * there is a separate state manager.
*/
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
private AMQProtocolSession _protocolSession;
- /**
- * The current state
- */
+ /** The current state */
private AMQState _currentState;
/**
- * Maps from an AMQState instance to a Map from Class to StateTransitionHandler.
- * The class must be a subclass of AMQFrame.
+ * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
+ * AMQFrame.
*/
- private final Map _state2HandlersMap = new HashMap();
+ protected final Map _state2HandlersMap = new HashMap();
private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
private final Object _stateLock = new Object();
@@ -87,7 +84,7 @@ public class AMQStateManager implements AMQMethodListener
{
this(null);
}
-
+
public AMQStateManager(AMQProtocolSession protocolSession)
{
@@ -98,7 +95,7 @@ public class AMQStateManager implements AMQMethodListener
{
_protocolSession = protocolSession;
_currentState = state;
- if(register)
+ if (register)
{
registerListeners();
}
@@ -194,7 +191,7 @@ public class AMQStateManager implements AMQMethodListener
final Class clazz = frame.getClass();
if (_logger.isDebugEnabled())
{
- _logger.debug("Looking for state transition handler for frame " + clazz);
+ _logger.debug("Looking for state[" + currentState + "] transition handler for frame " + clazz);
}
final Map classToHandlerMap = (Map) _state2HandlersMap.get(currentState);
@@ -228,12 +225,12 @@ public class AMQStateManager implements AMQMethodListener
public void attainState(final AMQState s) throws AMQException
{
- synchronized(_stateLock)
+ synchronized (_stateLock)
{
final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
long waitTime = MAXIMUM_STATE_WAIT_TIME;
- while(_currentState != s && waitTime > 0)
+ while (_currentState != s && waitTime > 0)
{
try
{
@@ -243,12 +240,12 @@ public class AMQStateManager implements AMQMethodListener
{
_logger.warn("Thread interrupted");
}
- if(_currentState != s)
+ if (_currentState != s)
{
waitTime = waitUntilTime - System.currentTimeMillis();
}
}
- if(_currentState != s)
+ if (_currentState != s)
{
_logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s);
throw new AMQException("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s);