diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 881 |
1 files changed, 881 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java new file mode 100644 index 0000000000..eb5af119b2 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -0,0 +1,881 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.protocol; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.mina.filter.codec.ProtocolCodecException; +import org.apache.qpid.AMQConnectionClosedException; +import org.apache.qpid.AMQDisconnectedException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverHandler; +import org.apache.qpid.client.failover.FailoverState; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateWaiter; +import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.pool.Job; +import org.apache.qpid.pool.ReferenceCountingExecutorService; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.network.io.IoTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the + * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the + * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the + * event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP, + * expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in + * terms of "message received" and so on. + * + * <p/>There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is + * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public + * API calls through which an individual connection can be manipulated. This protocol handler talks to the network + * through MINA, in a behind the scenes role; it is not an exposed part of the client API. + * + * <p/>There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level, + * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in + * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol + * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions + * in the event of failover. See below for more information about this. + * + * <p/>Mina provides a session container that can be used to store/retrieve arbitrary objects as String named + * attributes. A more convenient, type-safe, container for session data is provided in the form of + * {@link AMQProtocolSession}. + * + * <p/>A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session + * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper + * as described above). This event handler is different, because dealing with failover complicates things. To the + * end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but + * behind the scenes a new transport connection, and MINA session will have been created. The MINA session object cannot + * be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old + * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection + * and the protocol session data is held outside of the MINA IOSession. + * + * <p/>This handler is responsible for setting up the filter chain to filter all events for this handler through. + * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from + * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, + * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Maintain fail-over state. + * <tr><td> + * </table> + * + * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including + * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of + * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could + * be merged, although there is sense in keeping the session model separate. Will clarify things by having data + * held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so + * that lifecycles of the fields match lifecycles of their containing objects. + */ +public class AMQProtocolHandler implements ProtocolEngine +{ + /** Used for debugging. */ + private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class); + private static final Logger _protocolLogger = LoggerFactory.getLogger("qpid.protocol"); + private static final boolean PROTOCOL_DEBUG = (System.getProperty("amqj.protocol.logging.level") != null); + + private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000")); + + /** + * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection + * instances and protocol handler instances. + */ + private AMQConnection _connection; + + /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */ + private volatile AMQProtocolSession _protocolSession; + + /** Holds the state of the protocol session. */ + private AMQStateManager _stateManager; + + /** Holds the method listeners, */ + private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>(); + + /** + * We create the failover handler when the session is created since it needs a reference to the IoSession in order + * to be able to send errors during failover back to the client application. The session won't be available in the + * case where we failing over due to a Connection.Redirect message from the broker. + */ + private FailoverHandler _failoverHandler; + + /** + * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly + * attempting failover where it is failing. + */ + private FailoverState _failoverState = FailoverState.NOT_STARTED; + + /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */ + private CountDownLatch _failoverLatch; + + /** The last failover exception that occurred */ + private FailoverException _lastFailoverException; + + /** Defines the default timeout to use for synchronous protocol commands. */ + private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30); + + /** Object to lock on when changing the latch */ + private Object _failoverLatchChange = new Object(); + private AMQCodecFactory _codecFactory; + private Job _readJob; + private Job _writeJob; + private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); + private NetworkDriver _networkDriver; + private ProtocolVersion _suggestedProtocolVersion; + + private long _writtenBytes; + private long _readBytes; + + /** + * Creates a new protocol handler, associated with the specified client connection instance. + * + * @param con The client connection that this is the event handler for. + */ + public AMQProtocolHandler(AMQConnection con) + { + _connection = con; + _protocolSession = new AMQProtocolSession(this, _connection); + _stateManager = new AMQStateManager(_protocolSession); + _codecFactory = new AMQCodecFactory(false, _protocolSession); + _poolReference.setThreadFactory(new ThreadFactory() + { + + public Thread newThread(final Runnable runnable) + { + try + { + return Threading.getThreadFactory().createThread(runnable); + } + catch (Exception e) + { + throw new RuntimeException("Failed to create thread", e); + } + } + }); + _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); + _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); + _poolReference.acquireExecutorService(); + _failoverHandler = new FailoverHandler(this); + } + + /** + * Called when we want to create a new IoTransport session + * @param brokerDetail + */ + public void createIoTransportSession(BrokerDetails brokerDetail) + { + _protocolSession = new AMQProtocolSession(this, _connection); + _stateManager.setProtocolSession(_protocolSession); + IoTransport.connect_0_9(getProtocolSession(), + brokerDetail.getHost(), + brokerDetail.getPort(), + brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL)); + _protocolSession.init(); + } + + /** + * Called when the network connection is closed. This can happen, either because the client explicitly requested + * that the connection be closed, in which case nothing is done, or because the connection died. In the case + * where the connection died, an attempt to failover automatically to a new connection may be started. The failover + * process will be started, provided that it is the clients policy to allow failover, and provided that a failover + * has not already been started or failed. + * + * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and + * not otherwise? The above comment doesn't make that clear. + */ + public void closed() + { + if (_connection.isClosed()) + { + _logger.debug("Session closed called by client"); + } + else + { + _logger.debug("Session closed called with failover state currently " + _failoverState); + + // reconnetablility was introduced here so as not to disturb the client as they have made their intentions + // known through the policy settings. + + if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed()) + { + _logger.debug("FAILOVER STARTING"); + if (_failoverState == FailoverState.NOT_STARTED) + { + _failoverState = FailoverState.IN_PROGRESS; + startFailoverThread(); + } + else + { + _logger.debug("Not starting failover as state currently " + _failoverState); + } + } + else + { + _logger.debug("Failover not allowed by policy."); // or already in progress? + + if (_logger.isDebugEnabled()) + { + _logger.debug(_connection.getFailoverPolicy().toString()); + } + + if (_failoverState != FailoverState.IN_PROGRESS) + { + _logger.debug("sessionClose() not allowed to failover"); + _connection.exceptionReceived(new AMQDisconnectedException( + "Server closed connection and reconnection " + "not permitted.", + _stateManager.getLastException())); + } + else + { + _logger.debug("sessionClose() failover in progress"); + } + } + } + + _logger.debug("Protocol Session [" + this + "] closed"); + } + + /** See {@link FailoverHandler} to see rationale for separate thread. */ + private void startFailoverThread() + { + if(!_connection.isClosed()) + { + final Thread failoverThread; + try + { + failoverThread = Threading.getThreadFactory().createThread(_failoverHandler); + } + catch (Exception e) + { + throw new RuntimeException("Failed to create thread", e); + } + failoverThread.setName("Failover"); + // Do not inherit daemon-ness from current thread as this can be a daemon + // thread such as a AnonymousIoService thread. + failoverThread.setDaemon(false); + failoverThread.start(); + } + } + + public void readerIdle() + { + _logger.debug("Protocol Session [" + this + "] idle: reader"); + // failover: + HeartbeatDiagnostics.timeout(); + _logger.warn("Timed out while waiting for heartbeat from peer."); + _networkDriver.close(); + } + + public void writerIdle() + { + _logger.debug("Protocol Session [" + this + "] idle: reader"); + writeFrame(HeartbeatBody.FRAME); + HeartbeatDiagnostics.sent(); + } + + /** + * Invoked when any exception is thrown by the NetworkDriver + */ + public void exception(Throwable cause) + { + if (_failoverState == FailoverState.NOT_STARTED) + { + // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) + if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException) + { + _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); + // this will attempt failover + _networkDriver.close(); + closed(); + } + else + { + + if (cause instanceof ProtocolCodecException) + { + _logger.info("Protocol Exception caught NOT going to attempt failover as " + + "cause isn't AMQConnectionClosedException: " + cause, cause); + + AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); + propagateExceptionToAllWaiters(amqe); + } + _connection.exceptionReceived(cause); + + } + + // FIXME Need to correctly handle other exceptions. Things like ... + // if (cause instanceof AMQChannelClosedException) + // which will cause the JMSSession to end due to a channel close and so that Session needs + // to be removed from the map so we can correctly still call close without an exception when trying to close + // the server closed session. See also CloseChannelMethodHandler as the sessionClose is never called on exception + } + // we reach this point if failover was attempted and failed therefore we need to let the calling app + // know since we cannot recover the situation + else if (_failoverState == FailoverState.FAILED) + { + _logger.error("Exception caught by protocol handler: " + cause, cause); + + // we notify the state manager of the error in case we have any clients waiting on a state + // change. Those "waiters" will be interrupted and can handle the exception + AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); + propagateExceptionToAllWaiters(amqe); + _connection.exceptionReceived(cause); + } + } + + /** + * There are two cases where we have other threads potentially blocking for events to be handled by this class. + * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type + * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately. + * + * This should be called only when the exception is fatal for the connection. + * + * @param e the exception to propagate + * + * @see #propagateExceptionToFrameListeners + */ + public void propagateExceptionToAllWaiters(Exception e) + { + getStateManager().error(e); + + propagateExceptionToFrameListeners(e); + } + + /** + * This caters for the case where we only need to propagate an exception to the the frame listeners to interupt any + * protocol level waits. + * + * This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should + * stop waiting and relinquish the Failover lock {@see FailoverHandler}. + * + * Once the {@link FailoverHandler} has re-established the connection then the listeners will be able to re-attempt + * their protocol request and so listen again for the correct frame. + * + * @param e the exception to propagate + */ + public void propagateExceptionToFrameListeners(Exception e) + { + synchronized (_frameListeners) + { + if (!_frameListeners.isEmpty()) + { + final Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener ml = (AMQMethodListener) it.next(); + ml.error(e); + } + } + } + } + + public void notifyFailoverStarting() + { + // Set the last exception in the sync block to ensure the ordering with add. + // either this gets done and the add does the ml.error + // or the add completes first and the iterator below will do ml.error + synchronized (_frameListeners) + { + _lastFailoverException = new FailoverException("Failing over about to start"); + } + + //Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be + // interrupted unless failover cannot restore the state. + propagateExceptionToFrameListeners(_lastFailoverException); + } + + public void failoverInProgress() + { + _lastFailoverException = null; + } + + private static int _messageReceivedCount; + + + public void received(ByteBuffer msg) + { + try + { + _readBytes += msg.remaining(); + final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); + + Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() + { + + public void run() + { + // Decode buffer + + for (AMQDataBlock message : dataBlocks) + { + + try + { + if (PROTOCOL_DEBUG) + { + _protocolLogger.info(String.format("RECV: [%s] %s", this, message)); + } + + if(message instanceof AMQFrame) + { + final boolean debug = _logger.isDebugEnabled(); + final long msgNumber = ++_messageReceivedCount; + + if (debug && ((msgNumber % 1000) == 0)) + { + _logger.debug("Received " + _messageReceivedCount + " protocol messages"); + } + + AMQFrame frame = (AMQFrame) message; + + final AMQBody bodyFrame = frame.getBodyFrame(); + + HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); + + bodyFrame.handle(frame.getChannel(), _protocolSession); + + _connection.bytesReceived(_readBytes); + } + else if (message instanceof ProtocolInitiation) + { + // We get here if the server sends a response to our initial protocol header + // suggesting an alternate ProtocolVersion; the server will then close the + // connection. + ProtocolInitiation protocolInit = (ProtocolInitiation) message; + _suggestedProtocolVersion = protocolInit.checkVersion(); + _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion); + + // get round a bug in old versions of qpid whereby the connection is not closed + _stateManager.changeState(AMQState.CONNECTION_CLOSED); + } + } + catch (Exception e) + { + _logger.error("Exception processing frame", e); + propagateExceptionToFrameListeners(e); + exception(e); + } + } + } + }); + } + catch (Exception e) + { + propagateExceptionToFrameListeners(e); + exception(e); + } + } + + public void methodBodyReceived(final int channelId, final AMQBody bodyFrame) + throws AMQException + { + + if (_logger.isDebugEnabled()) + { + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + bodyFrame); + } + + final AMQMethodEvent<AMQMethodBody> evt = + new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame); + + try + { + + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + synchronized (_frameListeners) + { + if (!_frameListeners.isEmpty()) + { + //This iterator is safe from the error state as the frame listeners always add before they send so their + // will be ready and waiting for this response. + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + } + } + } + if (!wasAnyoneInterested) + { + throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + + _frameListeners, null); + } + } + catch (AMQException e) + { + propagateExceptionToFrameListeners(e); + + exception(e); + } + + } + + private static int _messagesOut; + + public StateWaiter createWaiter(Set<AMQState> states) throws AMQException + { + return getStateManager().createWaiter(states); + } + + /** + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). + * + * @param frame the frame to write + */ + public void writeFrame(AMQDataBlock frame) + { + writeFrame(frame, false); + } + + public void writeFrame(AMQDataBlock frame, boolean wait) + { + final ByteBuffer buf = frame.toNioByteBuffer(); + _writtenBytes += buf.remaining(); + Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable() + { + public void run() + { + _networkDriver.send(buf); + } + }); + if (PROTOCOL_DEBUG) + { + _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame)); + } + + final long sentMessages = _messagesOut++; + + final boolean debug = _logger.isDebugEnabled(); + + if (debug && ((sentMessages % 1000) == 0)) + { + _logger.debug("Sent " + _messagesOut + " protocol messages"); + } + + _connection.bytesSent(_writtenBytes); + + if (wait) + { + _networkDriver.flush(); + } + } + + /** + * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to + * calling getProtocolSession().write() then waiting for the response. + * + * @param frame + * @param listener the blocking listener. Note the calling thread will block. + */ + public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener) + throws AMQException, FailoverException + { + return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT); + } + + /** + * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to + * calling getProtocolSession().write() then waiting for the response. + * + * @param frame + * @param listener the blocking listener. Note the calling thread will block. + */ + public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener, + long timeout) throws AMQException, FailoverException + { + try + { + synchronized (_frameListeners) + { + if (_lastFailoverException != null) + { + throw _lastFailoverException; + } + + if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED || + _stateManager.getCurrentState() == AMQState.CONNECTION_CLOSING) + { + Exception e = _stateManager.getLastException(); + if (e != null) + { + if (e instanceof AMQException) + { + AMQException amqe = (AMQException) e; + + throw amqe.cloneForCurrentThread(); + } + else + { + throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e); + } + } + } + + _frameListeners.add(listener); + //FIXME: At this point here we should check or before add we should check _stateManager is in an open + // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255 + } + writeFrame(frame); + + return listener.blockForFrame(timeout); + // When control resumes before this line, a reply will have been received + // that matches the criteria defined in the blocking listener + } + finally + { + // If we don't removeKey the listener then no-one will + _frameListeners.remove(listener); + } + + } + + /** More convenient method to write a frame and wait for it's response. */ + public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException, FailoverException + { + return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT); + } + + /** More convenient method to write a frame and wait for it's response. */ + public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException, FailoverException + { + return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass), + timeout); + } + + public void closeSession(AMQSession session) throws AMQException + { + _protocolSession.closeSession(session); + } + + /** + * Closes the connection. + * + * <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed + * anyway. + * + * @param timeout The timeout to wait for an acknowledgement to the close request. + * + * @throws AMQException If the close fails for any reason. + */ + public void closeConnection(long timeout) throws AMQException + { + ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client is closing the connection."), 0, 0); + + final AMQFrame frame = body.generateFrame(0); + + //If the connection is already closed then don't do a syncWrite + if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)) + { + try + { + syncWrite(frame, ConnectionCloseOkBody.class, timeout); + _networkDriver.close(); + closed(); + } + catch (AMQTimeoutException e) + { + closed(); + } + catch (FailoverException e) + { + _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); + } + } + _poolReference.releaseExecutorService(); + } + + /** @return the number of bytes read from this protocol session */ + public long getReadBytes() + { + return _readBytes; + } + + /** @return the number of bytes written to this protocol session */ + public long getWrittenBytes() + { + return _writtenBytes; + } + + public void failover(String host, int port) + { + _failoverHandler.setHost(host); + _failoverHandler.setPort(port); + // see javadoc for FailoverHandler to see rationale for separate thread + startFailoverThread(); + } + + public void blockUntilNotFailingOver() throws InterruptedException + { + synchronized(_failoverLatchChange) + { + if (_failoverLatch != null) + { + if(!_failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS)) + { + + } + } + } + } + + public AMQShortString generateQueueName() + { + return _protocolSession.generateQueueName(); + } + + public CountDownLatch getFailoverLatch() + { + return _failoverLatch; + } + + public void setFailoverLatch(CountDownLatch failoverLatch) + { + synchronized (_failoverLatchChange) + { + _failoverLatch = failoverLatch; + } + } + + public AMQConnection getConnection() + { + return _connection; + } + + public AMQStateManager getStateManager() + { + return _stateManager; + } + + public void setStateManager(AMQStateManager stateManager) + { + _stateManager = stateManager; + _stateManager.setProtocolSession(_protocolSession); + } + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; + } + + FailoverState getFailoverState() + { + return _failoverState; + } + + public void setFailoverState(FailoverState failoverState) + { + _failoverState = failoverState; + } + + public byte getProtocolMajorVersion() + { + return _protocolSession.getProtocolMajorVersion(); + } + + public byte getProtocolMinorVersion() + { + return _protocolSession.getProtocolMinorVersion(); + } + + public MethodRegistry getMethodRegistry() + { + return _protocolSession.getMethodRegistry(); + } + + public ProtocolVersion getProtocolVersion() + { + return _protocolSession.getProtocolVersion(); + } + + public SocketAddress getRemoteAddress() + { + return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); + } + + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + + /** @param delay delay in seconds (not ms) */ + void initHeartbeats(int delay) + { + if (delay > 0) + { + getNetworkDriver().setMaxWriteIdle(delay); + getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); + HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); + } + } + + public NetworkDriver getNetworkDriver() + { + return _networkDriver; + } + + public ProtocolVersion getSuggestedProtocolVersion() + { + return _suggestedProtocolVersion; + } + +} |