diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid')
11 files changed, 156 insertions, 114 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java index 05ac3dca9e..6bae0166d1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java @@ -39,9 +39,4 @@ public class AMQAuthenticationException extends AMQException { super(error, msg, cause); } - public boolean isHardError() - { - return true; - } - } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 118be75705..2e3e417c95 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -65,6 +65,7 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -205,9 +206,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); - protected static final boolean DECLARE_QUEUES = + protected final boolean DECLARE_QUEUES = Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true")); - protected static final boolean DECLARE_EXCHANGES = + protected final boolean DECLARE_EXCHANGES = Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true")); /** System property to enable strict AMQP compliance. */ @@ -629,6 +630,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ public void close(long timeout) throws JMSException { + close(timeout, true); + } + + private void close(long timeout, boolean sendClose) throws JMSException + { if (_logger.isInfoEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); @@ -654,9 +660,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // If the connection is open or we are in the process // of closing the connection then send a cance // no point otherwise as the connection will be gone - if (!_connection.isClosed() || _connection.isClosing()) + if (!_connection.isClosed() || _connection.isClosing()) { - sendClose(timeout); + if (sendClose) + { + sendClose(timeout); + } } } catch (AMQException e) @@ -712,25 +721,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (!_closed.getAndSet(true)) { - synchronized (getFailoverMutex()) + synchronized (_messageDeliveryLock) { - synchronized (_messageDeliveryLock) + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + AMQException amqe; + if (e instanceof AMQException) { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - AMQException amqe; - if (e instanceof AMQException) - { - amqe = (AMQException) e; - } - else - { - amqe = new AMQException("Closing session forcibly", e); - } - - _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); } + + _connection.deregisterSession(_channelId); + closeProducersAndConsumers(amqe); } } } @@ -1737,6 +1743,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (AMQException e) { + if (e instanceof AMQChannelClosedException) + { + close(-1, false); + } + JMSException ex = new JMSException("Error registering consumer: " + e); ex.setLinkedException(e); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index fa4e08f62b..d7196c0abb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -33,6 +33,7 @@ import org.apache.qpid.client.message.*; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; +import org.apache.qpid.client.state.AMQState; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; @@ -116,12 +117,23 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B public void sendClose(long timeout) throws AMQException, FailoverException { - getProtocolHandler().closeSession(this); - getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), - new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId), - ChannelCloseOkBody.class, timeout); - // When control resumes at this point, a reply will have been received that - // indicates the broker has closed the channel successfully. + // we also need to check the state manager for 08/09 as the + // _connection variable may not be updated in time by the error receiving + // thread. + // We can't close the session if we are alreadying in the process of + // closing/closed the connection. + + if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED) + || getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSING))) + { + + getProtocolHandler().closeSession(this); + getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), + new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId), + ChannelCloseOkBody.class, timeout); + // When control resumes at this point, a reply will have been received that + // indicates the broker has closed the channel successfully. + } } public void sendCommit() throws AMQException, FailoverException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index 2b6745ebe4..2cf19bf391 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -32,7 +32,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.protocol.AMQConstant; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +47,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener<Chann } public void methodReceived(AMQProtocolSession session, ChannelCloseBody method, int channelId) - throws AMQException + throws AMQException { _logger.debug("ChannelClose method received"); @@ -59,52 +58,62 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener<Chann _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); } - - ChannelCloseOkBody body = session.getMethodRegistry().createChannelCloseOkBody(); AMQFrame frame = body.generateFrame(channelId); session.writeFrame(frame); - - if (errorCode != AMQConstant.REPLY_SUCCESS) + try { - if (_logger.isDebugEnabled()) - { - _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason); - } - - if (errorCode == AMQConstant.NO_CONSUMERS) - { - throw new AMQNoConsumersException("Error: " + reason, null, null); - } - else if (errorCode == AMQConstant.NO_ROUTE) - { - throw new AMQNoRouteException("Error: " + reason, null, null); - } - else if (errorCode == AMQConstant.INVALID_ARGUMENT) + if (errorCode != AMQConstant.REPLY_SUCCESS) { - _logger.debug("Broker responded with Invalid Argument."); + if (_logger.isDebugEnabled()) + { + _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason); + } + + if (errorCode == AMQConstant.NO_CONSUMERS) + { + throw new AMQNoConsumersException("Error: " + reason, null, null); + } + else if (errorCode == AMQConstant.NO_ROUTE) + { + throw new AMQNoRouteException("Error: " + reason, null, null); + } + else if (errorCode == AMQConstant.INVALID_ARGUMENT) + { + _logger.debug("Broker responded with Invalid Argument."); + + throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason), null); + } + else if (errorCode == AMQConstant.INVALID_ROUTING_KEY) + { + _logger.debug("Broker responded with Invalid Routing Key."); + + throw new AMQInvalidRoutingKeyException(String.valueOf(reason), null); + } + else + { + + throw new AMQChannelClosedException(errorCode, "Error: " + reason, null); + } - throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason), null); - } - else if (errorCode == AMQConstant.INVALID_ROUTING_KEY) - { - _logger.debug("Broker responded with Invalid Routing Key."); - - throw new AMQInvalidRoutingKeyException(String.valueOf(reason), null); - } - else - { - throw new AMQChannelClosedException(errorCode, "Error: " + reason, null); } - } - // fixme why is this only done when the close is expected... - // should the above forced closes not also cause a close? - // ---------- - // Closing the session only when it is expected allows the errors to be processed - // Calling this here will prevent failover. So we should do this for all exceptions - // that should never cause failover. Such as authentication errors. - - session.channelClosed(channelId, errorCode, String.valueOf(reason)); + finally + { + // fixme why is this only done when the close is expected... + // should the above forced closes not also cause a close? + // ---------- + // Closing the session only when it is expected allows the errors to be processed + // Calling this here will prevent failover. So we should do this for all exceptions + // that should never cause failover. Such as authentication errors. + // ---- + // 2009-09-07 - ritchiem + // calling channelClosed will only close this session and will not + // prevent failover. If we don't close the session here then we will + // have problems during the session close as it will attempt to + // close the session that the broker has closed, + + session.channelClosed(channelId, errorCode, String.valueOf(reason)); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java index e639a33450..e40cafd72f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java @@ -40,7 +40,6 @@ public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<C } public void methodReceived(AMQProtocolSession session, ConnectionOpenOkBody body, int channelId) - throws AMQException { session.getStateManager().changeState(AMQState.CONNECTION_OPEN); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index e4e58c317d..287b5957a1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -45,7 +45,6 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con { } public void methodReceived(AMQProtocolSession session, ConnectionTuneBody frame, int channelId) - throws AMQException { _logger.debug("ConnectionTune frame received"); final MethodRegistry methodRegistry = session.getMethodRegistry(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java index f2aca58deb..8782e00a12 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java @@ -52,7 +52,7 @@ public class AMQIoTransportProtocolSession extends AMQProtocolSession } @Override - public void closeProtocolSession(boolean waitLast) throws AMQException + public void closeProtocolSession(boolean waitLast) { _ioSender.close(); _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index e3a1a82dc4..ab3ff8ecb0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -259,7 +259,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** * Called when we want to create a new IoTransport session - * @param brokerDetail + * @param brokerDetail */ public void createIoTransportSession(BrokerDetails brokerDetail) { @@ -271,7 +271,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter brokerDetail.useSSL()); _protocolSession.init(); } - + /** * Called when the network connection is closed. This can happen, either because the client explicitly requested * that the connection be closed, in which case nothing is done, or because the connection died. In the case @@ -433,12 +433,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param e the exception to propagate * * @see #propagateExceptionToFrameListeners - * @see #propagateExceptionToStateWaiters */ public void propagateExceptionToAllWaiters(Exception e) { + getStateManager().error(e); propagateExceptionToFrameListeners(e); - propagateExceptionToStateWaiters(e); } /** @@ -469,22 +468,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter } } - /** - * This caters for the case where we only need to propogate an exception to the the state manager to interupt any - * thing waiting for a state change. - * - * Currently (2008-07-15) the state manager is only used during 0-8/0-9 Connection establishement. - * - * Normally the state manager would not need to be notified without notifiying the frame listeners so in normal - * cases {@link #propagateExceptionToAllWaiters} would be the correct choice. - * - * @param e the exception to propagate - */ - public void propagateExceptionToStateWaiters(Exception e) - { - getStateManager().error(e); - } - public void notifyFailoverStarting() { // Set the last exception in the sync block to ensure the ordering with add. @@ -601,7 +584,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter { _protocolLogger.debug(String.format("SEND: [%s] %s", this, message)); } - + final long sentMessages = _messagesOut++; final boolean debug = _logger.isDebugEnabled(); @@ -667,7 +650,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter throw _lastFailoverException; } - if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED) + if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED || + _stateManager.getCurrentState() == AMQState.CONNECTION_CLOSING) { Exception e = _stateManager.getLastException(); if (e != null) @@ -733,25 +717,31 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void closeConnection(long timeout) throws AMQException { - getStateManager().changeState(AMQState.CONNECTION_CLOSING); - ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode new AMQShortString("JMS client is closing the connection."), 0, 0); final AMQFrame frame = body.generateFrame(0); - try - { - syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _protocolSession.closeProtocolSession(); - } - catch (AMQTimeoutException e) + //If the connection is already closed then don't do a syncWrite + if (getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)) { _protocolSession.closeProtocolSession(false); } - catch (FailoverException e) + else { - _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); + try + { + syncWrite(frame, ConnectionCloseOkBody.class, timeout); + _protocolSession.closeProtocolSession(); + } + catch (AMQTimeoutException e) + { + _protocolSession.closeProtocolSession(false); + } + catch (FailoverException e) + { + _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 5e12a5e6f8..0e872170aa 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -410,12 +410,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION); } - public void closeProtocolSession() throws AMQException + public void closeProtocolSession() { closeProtocolSession(true); } - public void closeProtocolSession(boolean waitLast) throws AMQException + public void closeProtocolSession(boolean waitLast) { _logger.debug("Waiting for last write to join."); if (waitLast && (_lastWriteFuture != null)) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index f8645139f2..70d4697f2c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.util.Set; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.io.IOException; /** * The state manager is responsible for managing the state of the protocol session. <p/> @@ -86,7 +87,7 @@ public class AMQStateManager implements AMQMethodListener return _currentState; } - public void changeState(AMQState newState) throws AMQException + public void changeState(AMQState newState) { _logger.debug("State changing to " + newState + " from old state " + _currentState); @@ -136,6 +137,22 @@ public class AMQStateManager implements AMQMethodListener */ public void error(Exception error) { + if (error instanceof AMQException) + { + // AMQException should be being notified before closing the + // ProtocolSession. Which will change the State to CLOSED. + // if we have a hard error. + if (((AMQException)error).isHardError()) + { + changeState(AMQState.CONNECTION_CLOSING); + } + } + else + { + // Be on the safe side here and mark the connection closed + changeState(AMQState.CONNECTION_CLOSED); + } + if (_waiters.size() == 0) { _logger.error("No Waiters for error saving as last error:" + error.getMessage()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index bddbc329ab..ee7fc533a3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -22,10 +22,11 @@ package org.apache.qpid.client.util; import java.util.Iterator; import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the @@ -36,6 +37,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ public class FlowControllingBlockingQueue { + private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class); + /** This queue is bounded and is used to store messages before being dispatched to the consumer */ private final Queue _queue = new ConcurrentLinkedQueue(); @@ -46,6 +49,8 @@ public class FlowControllingBlockingQueue /** We require a separate count so we can track whether we have reached the threshold */ private int _count; + + private boolean disableFlowControl; public boolean isEmpty() { @@ -69,6 +74,10 @@ public class FlowControllingBlockingQueue _flowControlHighThreshold = highThreshold; _flowControlLowThreshold = lowThreshold; _listener = listener; + if (highThreshold == 0) + { + disableFlowControl = true; + } } public Object take() throws InterruptedException @@ -84,7 +93,7 @@ public class FlowControllingBlockingQueue } } } - if (_listener != null) + if (!disableFlowControl && _listener != null) { synchronized (_listener) { @@ -93,6 +102,7 @@ public class FlowControllingBlockingQueue _listener.underThreshold(_count); } } + } return o; @@ -106,7 +116,7 @@ public class FlowControllingBlockingQueue notifyAll(); } - if (_listener != null) + if (!disableFlowControl && _listener != null) { synchronized (_listener) { |