diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol')
6 files changed, 0 insertions, 1791 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java deleted file mode 100644 index eb5af119b2..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ /dev/null @@ -1,881 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -import org.apache.mina.filter.codec.ProtocolCodecException; -import org.apache.qpid.AMQConnectionClosedException; -import org.apache.qpid.AMQDisconnectedException; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQTimeoutException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.failover.FailoverHandler; -import org.apache.qpid.client.failover.FailoverState; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.StateWaiter; -import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.Job; -import org.apache.qpid.pool.ReferenceCountingExecutorService; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.protocol.AMQMethodListener; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.network.io.IoTransport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the - * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the - * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the - * event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP, - * expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in - * terms of "message received" and so on. - * - * <p/>There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is - * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public - * API calls through which an individual connection can be manipulated. This protocol handler talks to the network - * through MINA, in a behind the scenes role; it is not an exposed part of the client API. - * - * <p/>There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level, - * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in - * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol - * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions - * in the event of failover. See below for more information about this. - * - * <p/>Mina provides a session container that can be used to store/retrieve arbitrary objects as String named - * attributes. A more convenient, type-safe, container for session data is provided in the form of - * {@link AMQProtocolSession}. - * - * <p/>A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session - * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper - * as described above). This event handler is different, because dealing with failover complicates things. To the - * end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but - * behind the scenes a new transport connection, and MINA session will have been created. The MINA session object cannot - * be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old - * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection - * and the protocol session data is held outside of the MINA IOSession. - * - * <p/>This handler is responsible for setting up the filter chain to filter all events for this handler through. - * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from - * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work, - * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Maintain fail-over state. - * <tr><td> - * </table> - * - * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including - * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of - * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could - * be merged, although there is sense in keeping the session model separate. Will clarify things by having data - * held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so - * that lifecycles of the fields match lifecycles of their containing objects. - */ -public class AMQProtocolHandler implements ProtocolEngine -{ - /** Used for debugging. */ - private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class); - private static final Logger _protocolLogger = LoggerFactory.getLogger("qpid.protocol"); - private static final boolean PROTOCOL_DEBUG = (System.getProperty("amqj.protocol.logging.level") != null); - - private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000")); - - /** - * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection - * instances and protocol handler instances. - */ - private AMQConnection _connection; - - /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */ - private volatile AMQProtocolSession _protocolSession; - - /** Holds the state of the protocol session. */ - private AMQStateManager _stateManager; - - /** Holds the method listeners, */ - private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>(); - - /** - * We create the failover handler when the session is created since it needs a reference to the IoSession in order - * to be able to send errors during failover back to the client application. The session won't be available in the - * case where we failing over due to a Connection.Redirect message from the broker. - */ - private FailoverHandler _failoverHandler; - - /** - * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly - * attempting failover where it is failing. - */ - private FailoverState _failoverState = FailoverState.NOT_STARTED; - - /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */ - private CountDownLatch _failoverLatch; - - /** The last failover exception that occurred */ - private FailoverException _lastFailoverException; - - /** Defines the default timeout to use for synchronous protocol commands. */ - private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30); - - /** Object to lock on when changing the latch */ - private Object _failoverLatchChange = new Object(); - private AMQCodecFactory _codecFactory; - private Job _readJob; - private Job _writeJob; - private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); - private NetworkDriver _networkDriver; - private ProtocolVersion _suggestedProtocolVersion; - - private long _writtenBytes; - private long _readBytes; - - /** - * Creates a new protocol handler, associated with the specified client connection instance. - * - * @param con The client connection that this is the event handler for. - */ - public AMQProtocolHandler(AMQConnection con) - { - _connection = con; - _protocolSession = new AMQProtocolSession(this, _connection); - _stateManager = new AMQStateManager(_protocolSession); - _codecFactory = new AMQCodecFactory(false, _protocolSession); - _poolReference.setThreadFactory(new ThreadFactory() - { - - public Thread newThread(final Runnable runnable) - { - try - { - return Threading.getThreadFactory().createThread(runnable); - } - catch (Exception e) - { - throw new RuntimeException("Failed to create thread", e); - } - } - }); - _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); - _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); - _poolReference.acquireExecutorService(); - _failoverHandler = new FailoverHandler(this); - } - - /** - * Called when we want to create a new IoTransport session - * @param brokerDetail - */ - public void createIoTransportSession(BrokerDetails brokerDetail) - { - _protocolSession = new AMQProtocolSession(this, _connection); - _stateManager.setProtocolSession(_protocolSession); - IoTransport.connect_0_9(getProtocolSession(), - brokerDetail.getHost(), - brokerDetail.getPort(), - brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL)); - _protocolSession.init(); - } - - /** - * Called when the network connection is closed. This can happen, either because the client explicitly requested - * that the connection be closed, in which case nothing is done, or because the connection died. In the case - * where the connection died, an attempt to failover automatically to a new connection may be started. The failover - * process will be started, provided that it is the clients policy to allow failover, and provided that a failover - * has not already been started or failed. - * - * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and - * not otherwise? The above comment doesn't make that clear. - */ - public void closed() - { - if (_connection.isClosed()) - { - _logger.debug("Session closed called by client"); - } - else - { - _logger.debug("Session closed called with failover state currently " + _failoverState); - - // reconnetablility was introduced here so as not to disturb the client as they have made their intentions - // known through the policy settings. - - if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed()) - { - _logger.debug("FAILOVER STARTING"); - if (_failoverState == FailoverState.NOT_STARTED) - { - _failoverState = FailoverState.IN_PROGRESS; - startFailoverThread(); - } - else - { - _logger.debug("Not starting failover as state currently " + _failoverState); - } - } - else - { - _logger.debug("Failover not allowed by policy."); // or already in progress? - - if (_logger.isDebugEnabled()) - { - _logger.debug(_connection.getFailoverPolicy().toString()); - } - - if (_failoverState != FailoverState.IN_PROGRESS) - { - _logger.debug("sessionClose() not allowed to failover"); - _connection.exceptionReceived(new AMQDisconnectedException( - "Server closed connection and reconnection " + "not permitted.", - _stateManager.getLastException())); - } - else - { - _logger.debug("sessionClose() failover in progress"); - } - } - } - - _logger.debug("Protocol Session [" + this + "] closed"); - } - - /** See {@link FailoverHandler} to see rationale for separate thread. */ - private void startFailoverThread() - { - if(!_connection.isClosed()) - { - final Thread failoverThread; - try - { - failoverThread = Threading.getThreadFactory().createThread(_failoverHandler); - } - catch (Exception e) - { - throw new RuntimeException("Failed to create thread", e); - } - failoverThread.setName("Failover"); - // Do not inherit daemon-ness from current thread as this can be a daemon - // thread such as a AnonymousIoService thread. - failoverThread.setDaemon(false); - failoverThread.start(); - } - } - - public void readerIdle() - { - _logger.debug("Protocol Session [" + this + "] idle: reader"); - // failover: - HeartbeatDiagnostics.timeout(); - _logger.warn("Timed out while waiting for heartbeat from peer."); - _networkDriver.close(); - } - - public void writerIdle() - { - _logger.debug("Protocol Session [" + this + "] idle: reader"); - writeFrame(HeartbeatBody.FRAME); - HeartbeatDiagnostics.sent(); - } - - /** - * Invoked when any exception is thrown by the NetworkDriver - */ - public void exception(Throwable cause) - { - if (_failoverState == FailoverState.NOT_STARTED) - { - // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) - if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException) - { - _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); - // this will attempt failover - _networkDriver.close(); - closed(); - } - else - { - - if (cause instanceof ProtocolCodecException) - { - _logger.info("Protocol Exception caught NOT going to attempt failover as " + - "cause isn't AMQConnectionClosedException: " + cause, cause); - - AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); - propagateExceptionToAllWaiters(amqe); - } - _connection.exceptionReceived(cause); - - } - - // FIXME Need to correctly handle other exceptions. Things like ... - // if (cause instanceof AMQChannelClosedException) - // which will cause the JMSSession to end due to a channel close and so that Session needs - // to be removed from the map so we can correctly still call close without an exception when trying to close - // the server closed session. See also CloseChannelMethodHandler as the sessionClose is never called on exception - } - // we reach this point if failover was attempted and failed therefore we need to let the calling app - // know since we cannot recover the situation - else if (_failoverState == FailoverState.FAILED) - { - _logger.error("Exception caught by protocol handler: " + cause, cause); - - // we notify the state manager of the error in case we have any clients waiting on a state - // change. Those "waiters" will be interrupted and can handle the exception - AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); - propagateExceptionToAllWaiters(amqe); - _connection.exceptionReceived(cause); - } - } - - /** - * There are two cases where we have other threads potentially blocking for events to be handled by this class. - * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type - * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately. - * - * This should be called only when the exception is fatal for the connection. - * - * @param e the exception to propagate - * - * @see #propagateExceptionToFrameListeners - */ - public void propagateExceptionToAllWaiters(Exception e) - { - getStateManager().error(e); - - propagateExceptionToFrameListeners(e); - } - - /** - * This caters for the case where we only need to propagate an exception to the the frame listeners to interupt any - * protocol level waits. - * - * This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should - * stop waiting and relinquish the Failover lock {@see FailoverHandler}. - * - * Once the {@link FailoverHandler} has re-established the connection then the listeners will be able to re-attempt - * their protocol request and so listen again for the correct frame. - * - * @param e the exception to propagate - */ - public void propagateExceptionToFrameListeners(Exception e) - { - synchronized (_frameListeners) - { - if (!_frameListeners.isEmpty()) - { - final Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener ml = (AMQMethodListener) it.next(); - ml.error(e); - } - } - } - } - - public void notifyFailoverStarting() - { - // Set the last exception in the sync block to ensure the ordering with add. - // either this gets done and the add does the ml.error - // or the add completes first and the iterator below will do ml.error - synchronized (_frameListeners) - { - _lastFailoverException = new FailoverException("Failing over about to start"); - } - - //Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be - // interrupted unless failover cannot restore the state. - propagateExceptionToFrameListeners(_lastFailoverException); - } - - public void failoverInProgress() - { - _lastFailoverException = null; - } - - private static int _messageReceivedCount; - - - public void received(ByteBuffer msg) - { - try - { - _readBytes += msg.remaining(); - final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - - Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() - { - - public void run() - { - // Decode buffer - - for (AMQDataBlock message : dataBlocks) - { - - try - { - if (PROTOCOL_DEBUG) - { - _protocolLogger.info(String.format("RECV: [%s] %s", this, message)); - } - - if(message instanceof AMQFrame) - { - final boolean debug = _logger.isDebugEnabled(); - final long msgNumber = ++_messageReceivedCount; - - if (debug && ((msgNumber % 1000) == 0)) - { - _logger.debug("Received " + _messageReceivedCount + " protocol messages"); - } - - AMQFrame frame = (AMQFrame) message; - - final AMQBody bodyFrame = frame.getBodyFrame(); - - HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - - bodyFrame.handle(frame.getChannel(), _protocolSession); - - _connection.bytesReceived(_readBytes); - } - else if (message instanceof ProtocolInitiation) - { - // We get here if the server sends a response to our initial protocol header - // suggesting an alternate ProtocolVersion; the server will then close the - // connection. - ProtocolInitiation protocolInit = (ProtocolInitiation) message; - _suggestedProtocolVersion = protocolInit.checkVersion(); - _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion); - - // get round a bug in old versions of qpid whereby the connection is not closed - _stateManager.changeState(AMQState.CONNECTION_CLOSED); - } - } - catch (Exception e) - { - _logger.error("Exception processing frame", e); - propagateExceptionToFrameListeners(e); - exception(e); - } - } - } - }); - } - catch (Exception e) - { - propagateExceptionToFrameListeners(e); - exception(e); - } - } - - public void methodBodyReceived(final int channelId, final AMQBody bodyFrame) - throws AMQException - { - - if (_logger.isDebugEnabled()) - { - _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + bodyFrame); - } - - final AMQMethodEvent<AMQMethodBody> evt = - new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame); - - try - { - - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - synchronized (_frameListeners) - { - if (!_frameListeners.isEmpty()) - { - //This iterator is safe from the error state as the frame listeners always add before they send so their - // will be ready and waiting for this response. - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - } - } - if (!wasAnyoneInterested) - { - throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" - + _frameListeners, null); - } - } - catch (AMQException e) - { - propagateExceptionToFrameListeners(e); - - exception(e); - } - - } - - private static int _messagesOut; - - public StateWaiter createWaiter(Set<AMQState> states) throws AMQException - { - return getStateManager().createWaiter(states); - } - - /** - * Convenience method that writes a frame to the protocol session. Equivalent to calling - * getProtocolSession().write(). - * - * @param frame the frame to write - */ - public void writeFrame(AMQDataBlock frame) - { - writeFrame(frame, false); - } - - public void writeFrame(AMQDataBlock frame, boolean wait) - { - final ByteBuffer buf = frame.toNioByteBuffer(); - _writtenBytes += buf.remaining(); - Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable() - { - public void run() - { - _networkDriver.send(buf); - } - }); - if (PROTOCOL_DEBUG) - { - _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame)); - } - - final long sentMessages = _messagesOut++; - - final boolean debug = _logger.isDebugEnabled(); - - if (debug && ((sentMessages % 1000) == 0)) - { - _logger.debug("Sent " + _messagesOut + " protocol messages"); - } - - _connection.bytesSent(_writtenBytes); - - if (wait) - { - _networkDriver.flush(); - } - } - - /** - * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to - * calling getProtocolSession().write() then waiting for the response. - * - * @param frame - * @param listener the blocking listener. Note the calling thread will block. - */ - public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener) - throws AMQException, FailoverException - { - return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT); - } - - /** - * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to - * calling getProtocolSession().write() then waiting for the response. - * - * @param frame - * @param listener the blocking listener. Note the calling thread will block. - */ - public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener, - long timeout) throws AMQException, FailoverException - { - try - { - synchronized (_frameListeners) - { - if (_lastFailoverException != null) - { - throw _lastFailoverException; - } - - if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED || - _stateManager.getCurrentState() == AMQState.CONNECTION_CLOSING) - { - Exception e = _stateManager.getLastException(); - if (e != null) - { - if (e instanceof AMQException) - { - AMQException amqe = (AMQException) e; - - throw amqe.cloneForCurrentThread(); - } - else - { - throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e); - } - } - } - - _frameListeners.add(listener); - //FIXME: At this point here we should check or before add we should check _stateManager is in an open - // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255 - } - writeFrame(frame); - - return listener.blockForFrame(timeout); - // When control resumes before this line, a reply will have been received - // that matches the criteria defined in the blocking listener - } - finally - { - // If we don't removeKey the listener then no-one will - _frameListeners.remove(listener); - } - - } - - /** More convenient method to write a frame and wait for it's response. */ - public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException, FailoverException - { - return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT); - } - - /** More convenient method to write a frame and wait for it's response. */ - public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException, FailoverException - { - return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass), - timeout); - } - - public void closeSession(AMQSession session) throws AMQException - { - _protocolSession.closeSession(session); - } - - /** - * Closes the connection. - * - * <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed - * anyway. - * - * @param timeout The timeout to wait for an acknowledgement to the close request. - * - * @throws AMQException If the close fails for any reason. - */ - public void closeConnection(long timeout) throws AMQException - { - ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection."), 0, 0); - - final AMQFrame frame = body.generateFrame(0); - - //If the connection is already closed then don't do a syncWrite - if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)) - { - try - { - syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _networkDriver.close(); - closed(); - } - catch (AMQTimeoutException e) - { - closed(); - } - catch (FailoverException e) - { - _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); - } - } - _poolReference.releaseExecutorService(); - } - - /** @return the number of bytes read from this protocol session */ - public long getReadBytes() - { - return _readBytes; - } - - /** @return the number of bytes written to this protocol session */ - public long getWrittenBytes() - { - return _writtenBytes; - } - - public void failover(String host, int port) - { - _failoverHandler.setHost(host); - _failoverHandler.setPort(port); - // see javadoc for FailoverHandler to see rationale for separate thread - startFailoverThread(); - } - - public void blockUntilNotFailingOver() throws InterruptedException - { - synchronized(_failoverLatchChange) - { - if (_failoverLatch != null) - { - if(!_failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS)) - { - - } - } - } - } - - public AMQShortString generateQueueName() - { - return _protocolSession.generateQueueName(); - } - - public CountDownLatch getFailoverLatch() - { - return _failoverLatch; - } - - public void setFailoverLatch(CountDownLatch failoverLatch) - { - synchronized (_failoverLatchChange) - { - _failoverLatch = failoverLatch; - } - } - - public AMQConnection getConnection() - { - return _connection; - } - - public AMQStateManager getStateManager() - { - return _stateManager; - } - - public void setStateManager(AMQStateManager stateManager) - { - _stateManager = stateManager; - _stateManager.setProtocolSession(_protocolSession); - } - - public AMQProtocolSession getProtocolSession() - { - return _protocolSession; - } - - FailoverState getFailoverState() - { - return _failoverState; - } - - public void setFailoverState(FailoverState failoverState) - { - _failoverState = failoverState; - } - - public byte getProtocolMajorVersion() - { - return _protocolSession.getProtocolMajorVersion(); - } - - public byte getProtocolMinorVersion() - { - return _protocolSession.getProtocolMinorVersion(); - } - - public MethodRegistry getMethodRegistry() - { - return _protocolSession.getMethodRegistry(); - } - - public ProtocolVersion getProtocolVersion() - { - return _protocolSession.getProtocolVersion(); - } - - public SocketAddress getRemoteAddress() - { - return _networkDriver.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _networkDriver.getLocalAddress(); - } - - public void setNetworkDriver(NetworkDriver driver) - { - _networkDriver = driver; - } - - /** @param delay delay in seconds (not ms) */ - void initHeartbeats(int delay) - { - if (delay > 0) - { - getNetworkDriver().setMaxWriteIdle(delay); - getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); - HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); - } - } - - public NetworkDriver getNetworkDriver() - { - return _networkDriver; - } - - public ProtocolVersion getSuggestedProtocolVersion() - { - return _suggestedProtocolVersion; - } - -} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java deleted file mode 100644 index 7976760696..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ /dev/null @@ -1,477 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.JMSException; -import javax.security.sasl.SaslClient; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.ConnectionTuneParameters; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.message.UnprocessedMessage_0_8; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.framing.*; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.client.handler.ClientMethodDispatcherImpl; - -/** - * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol - * session is still available but clients should not use it to obtain session attributes. - */ -public class AMQProtocolSession implements AMQVersionAwareProtocolSession -{ - protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2; - - protected static final Logger _logger = LoggerFactory.getLogger(AMQProtocolSession.class); - - public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived"; - - //Usable channels are numbered 1 to <ChannelMax> - public static final int MAX_CHANNEL_MAX = 0xFFFF; - public static final int MIN_USABLE_CHANNEL_NUM = 1; - - protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters"; - - protected static final String AMQ_CONNECTION = "AMQConnection"; - - protected static final String SASL_CLIENT = "SASLClient"; - - /** - * The handler from which this session was created and which is used to handle protocol events. We send failover - * events to the handler. - */ - protected final AMQProtocolHandler _protocolHandler; - - /** Maps from the channel id to the AMQSession that it represents. */ - protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>(); - - protected ConcurrentMap _closingChannels = new ConcurrentHashMap(); - - /** - * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives - * first) with the subsequent content header and content bodies. - */ - private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>(); - private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16]; - - /** Counter to ensure unique queue names */ - protected int _queueId = 1; - protected final Object _queueIdLock = new Object(); - - private ProtocolVersion _protocolVersion; -// private VersionSpecificRegistry _registry = -// MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion()); - - private MethodRegistry _methodRegistry = - MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion()); - - private MethodDispatcher _methodDispatcher; - - protected final AMQConnection _connection; - - private ConnectionTuneParameters _connectionTuneParameters; - - private SaslClient _saslClient; - - private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; - - public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) - { - _protocolHandler = protocolHandler; - _protocolVersion = connection.getProtocolVersion(); - _logger.info("Using ProtocolVersion for Session:" + _protocolVersion); - _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), - this); - _connection = connection; - } - - public void init() - { - // start the process of setting up the connection. This is the first place that - // data is written to the server. - _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion())); - } - - public String getClientID() - { - try - { - return getAMQConnection().getClientID(); - } - catch (JMSException e) - { - // we never throw a JMSException here - return null; - } - } - - public void setClientID(String clientID) throws JMSException - { - getAMQConnection().setClientID(clientID); - } - - public AMQStateManager getStateManager() - { - return _protocolHandler.getStateManager(); - } - - public String getVirtualHost() - { - return getAMQConnection().getVirtualHost(); - } - - public String getUsername() - { - return getAMQConnection().getUsername(); - } - - public String getPassword() - { - return getAMQConnection().getPassword(); - } - - public SaslClient getSaslClient() - { - return _saslClient; - } - - /** - * Store the SASL client currently being used for the authentication handshake - * - * @param client if non-null, stores this in the session. if null clears any existing client being stored - */ - public void setSaslClient(SaslClient client) - { - _saslClient = client; - } - - public ConnectionTuneParameters getConnectionTuneParameters() - { - return _connectionTuneParameters; - } - - public void setConnectionTuneParameters(ConnectionTuneParameters params) - { - _connectionTuneParameters = params; - AMQConnection con = getAMQConnection(); - - con.setMaximumChannelCount(params.getChannelMax()); - con.setMaximumFrameSize(params.getFrameMax()); - _protocolHandler.initHeartbeats((int) params.getHeartbeat()); - } - - /** - * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA - * dispatcher thread. - * - * @param message - * - * @throws AMQException if this was not expected - */ - public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException - { - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - _channelId2UnprocessedMsgArray[channelId] = message; - } - else - { - _channelId2UnprocessedMsgMap.put(channelId, message); - } - } - - public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException - { - final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId] - : _channelId2UnprocessedMsgMap.get(channelId)); - - if (msg == null) - { - throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first on session:" + this, null); - } - - if (msg.getContentHeader() != null) - { - throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames on session:" + this, null); - } - - msg.setContentHeader(contentHeader); - if (contentHeader.bodySize == 0) - { - deliverMessageToAMQSession(channelId, msg); - } - } - - public void contentBodyReceived(final int channelId, ContentBody contentBody) throws AMQException - { - UnprocessedMessage_0_8 msg; - final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0; - if (fastAccess) - { - msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgArray[channelId]; - } - else - { - msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgMap.get(channelId); - } - - if (msg == null) - { - throw new AMQException(null, "Error: received content body without having received a JMSDeliver frame first", null); - } - - if (msg.getContentHeader() == null) - { - if (fastAccess) - { - _channelId2UnprocessedMsgArray[channelId] = null; - } - else - { - _channelId2UnprocessedMsgMap.remove(channelId); - } - throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first", null); - } - - msg.receiveBody(contentBody); - - if (msg.isAllBodyDataReceived()) - { - deliverMessageToAMQSession(channelId, msg); - } - } - - public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException - { - - } - - /** - * Deliver a message to the appropriate session, removing the unprocessed message from our map - * - * @param channelId the channel id the message should be delivered to - * @param msg the message - */ - private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg) - { - AMQSession session = getSession(channelId); - session.messageReceived(msg); - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - _channelId2UnprocessedMsgArray[channelId] = null; - } - else - { - _channelId2UnprocessedMsgMap.remove(channelId); - } - } - - protected AMQSession getSession(int channelId) - { - return _connection.getSession(channelId); - } - - /** - * Convenience method that writes a frame to the protocol session. Equivalent to calling - * getProtocolSession().write(). - * - * @param frame the frame to write - */ - public void writeFrame(AMQDataBlock frame) - { - _protocolHandler.writeFrame(frame); - } - - public void writeFrame(AMQDataBlock frame, boolean wait) - { - _protocolHandler.writeFrame(frame, wait); - } - - /** - * Starts the process of closing a session - * - * @param session the AMQSession being closed - */ - public void closeSession(AMQSession session) - { - _logger.debug("closeSession called on protocol session for session " + session.getChannelId()); - final int channelId = session.getChannelId(); - if (channelId <= 0) - { - throw new IllegalArgumentException("Attempt to close a channel with id < 0"); - } - // we need to know when a channel is closing so that we can respond - // with a channel.close frame when we receive any other type of frame - // on that channel - _closingChannels.putIfAbsent(channelId, session); - } - - /** - * Called from the ChannelClose handler when a channel close frame is received. This method decides whether this is - * a response or an initiation. The latter case causes the AMQSession to be closed and an exception to be thrown if - * appropriate. - * - * @param channelId the id of the channel (session) - * - * @return true if the client must respond to the server, i.e. if the server initiated the channel close, false if - * the channel close is just the server responding to the client's earlier request to close the channel. - */ - public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException - { - - // if this is not a response to an earlier request to close the channel - if (_closingChannels.remove(channelId) == null) - { - final AMQSession session = getSession(channelId); - try - { - session.closed(new AMQException(code, text, null)); - } - catch (JMSException e) - { - throw new AMQException(null, "JMSException received while closing session", e); - } - - return true; - } - else - { - return false; - } - } - - public AMQConnection getAMQConnection() - { - return _connection; - } - - public void closeProtocolSession() throws AMQException - { - _protocolHandler.closeConnection(0); - } - - public void failover(String host, int port) - { - _protocolHandler.failover(host, port); - } - - protected AMQShortString generateQueueName() - { - int id; - synchronized (_queueIdLock) - { - id = _queueId++; - } - // convert '.', '/', ':' and ';' to single '_', for spec compliance and readability - String localAddress = _protocolHandler.getLocalAddress().toString().replaceAll("[./:;]", "_"); - String queueName = "tmp_" + localAddress + "_" + id; - return new AMQShortString(queueName.replaceAll("_+", "_")); - } - - public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag) - { - final AMQSession session = getSession(channelId); - - session.confirmConsumerCancelled(consumerTag.toIntValue()); - } - - public void setProtocolVersion(final ProtocolVersion pv) - { - _logger.info("Setting ProtocolVersion to :" + pv); - _protocolVersion = pv; - _methodRegistry = MethodRegistry.getMethodRegistry(pv); - _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this); - } - - public byte getProtocolMinorVersion() - { - return _protocolVersion.getMinorVersion(); - } - - public byte getProtocolMajorVersion() - { - return _protocolVersion.getMajorVersion(); - } - - public ProtocolVersion getProtocolVersion() - { - return _protocolVersion; - } - - public MethodRegistry getMethodRegistry() - { - return _methodRegistry; - } - - public MethodDispatcher getMethodDispatcher() - { - return _methodDispatcher; - } - - public void setTicket(int ticket, int channelId) - { - final AMQSession session = getSession(channelId); - session.setTicket(ticket); - } - - public void setMethodDispatcher(MethodDispatcher methodDispatcher) - { - _methodDispatcher = methodDispatcher; - } - - public void setFlowControl(final int channelId, final boolean active) - { - final AMQSession session = getSession(channelId); - session.setFlowControl(active); - } - - public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException - { - _protocolHandler.methodBodyReceived(channel, amqMethodBody); - } - - public void notifyError(Exception error) - { - _protocolHandler.propagateExceptionToAllWaiters(error); - } - - public void setSender(Sender<java.nio.ByteBuffer> sender) - { - // No-op, interface munging - } - - - @Override - public String toString() - { - return "AMQProtocolSession[" + _connection + ']'; - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java deleted file mode 100644 index 2bc609ebf2..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQTimeoutException; -import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.util.BlockingWaiter; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.protocol.AMQMethodListener; - -/** - * BlockingMethodFrameListener is a 'rendezvous' which acts as a {@link AMQMethodListener} that delegates handling of - * incoming methods to a method listener implemented as a sub-class of this and hands off the processed method or - * error to a consumer. The producer of the event does not have to wait for the consumer to take the event, so this - * differs from a 'rendezvous' in that sense. - * - * <p/>BlockingMethodFrameListeners are used to coordinate waiting for replies to method calls that expect a response. - * They are always used in a 'one-shot' manner, that is, to recieve just one response. Usually the caller has to register - * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they - * have been completed. - * - * <p/>The {@link #processMethod} must return <tt>true</tt> on any incoming method that it handles. This indicates to - * this listeners that the method it is waiting for has arrived. Incoming methods are also filtered by channel prior to - * being passed to the {@link #processMethod} method, so responses are only received for a particular channel. The - * channel id must be passed to the constructor. - * - * <p/>Errors from the producer are rethrown to the consumer. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Accept notification of AMQP method events. <td> {@link AMQMethodEvent} - * <tr><td> Delegate handling of the method to another method listener. <td> {@link AMQMethodBody} - * <tr><td> Block until a method is handled by the delegated to handler. - * <tr><td> Propagate the most recent exception to the consumer. - * </table> - * - * @todo Might be neater if this method listener simply wrapped another that provided the method handling using a - * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations - * seem to use it. So wrapping the listeners is possible. - * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to - * check that SynchronousQueue has a non-blocking put method available. - */ -public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMethodEvent> implements AMQMethodListener -{ - - /** Holds the channel id for the channel upon which this listener is waiting for a response. */ - protected int _channelId; - - /** - * Creates a new method listener, that filters incoming method to just those that match the specified channel id. - * - * @param channelId The channel id to filter incoming methods with. - */ - public BlockingMethodFrameListener(int channelId) - { - _channelId = channelId; - } - - /** - * Delegates any additional handling of the incoming methods to another handler. - * - * @param channelId The channel id of the incoming method. - * @param frame The method body. - * - * @return <tt>true</tt> if the method was handled, <tt>false</tt> otherwise. - */ - public abstract boolean processMethod(int channelId, AMQMethodBody frame); - - public boolean process(AMQMethodEvent evt) - { - AMQMethodBody method = evt.getMethod(); - - return (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method); - } - - /** - * Informs this listener that an AMQP method has been received. - * - * @param evt The AMQP method. - * - * @return <tt>true</tt> if this listener has handled the method, <tt>false</tt> otherwise. - */ - public boolean methodReceived(AMQMethodEvent evt) - { - return received(evt); - } - - /** - * Blocks until a method is received that is handled by the delegated to method listener, or the specified timeout - * has passed. - * - * @param timeout The timeout in milliseconds. - * - * @return The AMQP method that was received. - * - * @throws AMQException - * @throws FailoverException - */ - public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException - { - try - { - return (AMQMethodEvent) block(timeout); - } - finally - { - //Prevent any more errors being notified to this waiter. - close(); - } - } - -} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java b/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java deleted file mode 100644 index 35ea44a331..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class HeartbeatConfig -{ - private static final Logger _logger = LoggerFactory.getLogger(HeartbeatConfig.class); - static final HeartbeatConfig CONFIG = new HeartbeatConfig(); - - /** - * The factor used to get the timeout from the delay between heartbeats. - */ - private float timeoutFactor = 2; - - HeartbeatConfig() - { - String property = System.getProperty("amqj.heartbeat.timeoutFactor"); - if (property != null) - { - try - { - timeoutFactor = Float.parseFloat(property); - } - catch (NumberFormatException e) - { - _logger.warn("Invalid timeout factor (amqj.heartbeat.timeoutFactor): " + property); - } - } - } - - float getTimeoutFactor() - { - return timeoutFactor; - } - - int getTimeout(int writeDelay) - { - return (int) (timeoutFactor * writeDelay); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java b/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java deleted file mode 100644 index d44faeab04..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -class HeartbeatDiagnostics -{ - private static final Diagnostics _impl = init(); - - private static Diagnostics init() - { - return Boolean.getBoolean("amqj.heartbeat.diagnostics") ? new On() : new Off(); - } - - static void sent() - { - _impl.sent(); - } - - static void timeout() - { - _impl.timeout(); - } - - static void received(boolean heartbeat) - { - _impl.received(heartbeat); - } - - static void init(int delay, int timeout) - { - _impl.init(delay, timeout); - } - - private static interface Diagnostics - { - void sent(); - void timeout(); - void received(boolean heartbeat); - void init(int delay, int timeout); - } - - private static class On implements Diagnostics - { - private final String[] messages = new String[50]; - private int i; - - private void save(String msg) - { - messages[i++] = msg; - if(i >= messages.length){ - i = 0;//i.e. a circular buffer - } - } - - public void sent() - { - save(System.currentTimeMillis() + ": sent heartbeat"); - } - - public void timeout() - { - for(int i = 0; i < messages.length; i++) - { - if(messages[i] != null) - { - System.out.println(messages[i]); - } - } - System.out.println(System.currentTimeMillis() + ": timed out"); - } - - public void received(boolean heartbeat) - { - save(System.currentTimeMillis() + ": received " + (heartbeat ? "heartbeat" : "data")); - } - - public void init(int delay, int timeout) - { - System.out.println(System.currentTimeMillis() + ": initialised delay=" + delay + ", timeout=" + timeout); - } - } - - private static class Off implements Diagnostics - { - public void sent() - { - - } - public void timeout() - { - - } - public void received(boolean heartbeat) - { - - } - - public void init(int delay, int timeout) - { - - } - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java b/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java deleted file mode 100644 index bbd0a7b144..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import org.apache.mina.common.IoFilterAdapter; -import org.apache.mina.common.IoSession; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A MINA filter that monitors the numbers of messages pending to be sent by MINA. It outputs a message - * when a threshold has been exceeded, and has a frequency configuration so that messages are not output - * too often. - * - */ -public class ProtocolBufferMonitorFilter extends IoFilterAdapter -{ - private static final Logger _logger = LoggerFactory.getLogger(ProtocolBufferMonitorFilter.class); - - public static final long DEFAULT_FREQUENCY = 5000; - - public static final int DEFAULT_THRESHOLD = 3000; - - private int _bufferedMessages = 0; - - private int _threshold; - - private long _lastMessageOutputTime; - - private long _outputFrequencyInMillis; - - public ProtocolBufferMonitorFilter() - { - _threshold = DEFAULT_THRESHOLD; - _outputFrequencyInMillis = DEFAULT_FREQUENCY; - } - - public ProtocolBufferMonitorFilter(int threshold, long frequency) - { - _threshold = threshold; - _outputFrequencyInMillis = frequency; - } - - public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception - { - _bufferedMessages++; - if (_bufferedMessages > _threshold) - { - long now = System.currentTimeMillis(); - if ((now - _lastMessageOutputTime) > _outputFrequencyInMillis) - { - _logger.warn("Protocol message buffer exceeded threshold of " + _threshold + ". Current backlog: " - + _bufferedMessages); - _lastMessageOutputTime = now; - } - } - - nextFilter.messageReceived(session, message); - } - - public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception - { - _bufferedMessages--; - nextFilter.messageSent(session, message); - } - - public int getBufferedMessages() - { - return _bufferedMessages; - } - - public int getThreshold() - { - return _threshold; - } - - public void setThreshold(int threshold) - { - _threshold = threshold; - } - - public long getOutputFrequencyInMillis() - { - return _outputFrequencyInMillis; - } - - public void setOutputFrequencyInMillis(long outputFrequencyInMillis) - { - _outputFrequencyInMillis = outputFrequencyInMillis; - } - - public long getLastMessageOutputTime() - { - return _lastMessageOutputTime; - } -} |