summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java881
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java477
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java136
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java61
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java121
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java115
6 files changed, 0 insertions, 1791 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
deleted file mode 100644
index eb5af119b2..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ /dev/null
@@ -1,881 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.mina.filter.codec.ProtocolCodecException;
-import org.apache.qpid.AMQConnectionClosedException;
-import org.apache.qpid.AMQDisconnectedException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQTimeoutException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.failover.FailoverHandler;
-import org.apache.qpid.client.failover.FailoverState;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.state.StateWaiter;
-import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.network.io.IoTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
- * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
- * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the
- * event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP,
- * expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in
- * terms of "message received" and so on.
- *
- * <p/>There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is
- * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public
- * API calls through which an individual connection can be manipulated. This protocol handler talks to the network
- * through MINA, in a behind the scenes role; it is not an exposed part of the client API.
- *
- * <p/>There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level,
- * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in
- * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol
- * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions
- * in the event of failover. See below for more information about this.
- *
- * <p/>Mina provides a session container that can be used to store/retrieve arbitrary objects as String named
- * attributes. A more convenient, type-safe, container for session data is provided in the form of
- * {@link AMQProtocolSession}.
- *
- * <p/>A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session
- * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper
- * as described above). This event handler is different, because dealing with failover complicates things. To the
- * end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but
- * behind the scenes a new transport connection, and MINA session will have been created. The MINA session object cannot
- * be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old
- * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection
- * and the protocol session data is held outside of the MINA IOSession.
- *
- * <p/>This handler is responsible for setting up the filter chain to filter all events for this handler through.
- * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from
- * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work,
- * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Maintain fail-over state.
- * <tr><td>
- * </table>
- *
- * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
- * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
- * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
- * be merged, although there is sense in keeping the session model separate. Will clarify things by having data
- * held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so
- * that lifecycles of the fields match lifecycles of their containing objects.
- */
-public class AMQProtocolHandler implements ProtocolEngine
-{
- /** Used for debugging. */
- private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
- private static final Logger _protocolLogger = LoggerFactory.getLogger("qpid.protocol");
- private static final boolean PROTOCOL_DEBUG = (System.getProperty("amqj.protocol.logging.level") != null);
-
- private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
-
- /**
- * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection
- * instances and protocol handler instances.
- */
- private AMQConnection _connection;
-
- /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */
- private volatile AMQProtocolSession _protocolSession;
-
- /** Holds the state of the protocol session. */
- private AMQStateManager _stateManager;
-
- /** Holds the method listeners, */
- private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
-
- /**
- * We create the failover handler when the session is created since it needs a reference to the IoSession in order
- * to be able to send errors during failover back to the client application. The session won't be available in the
- * case where we failing over due to a Connection.Redirect message from the broker.
- */
- private FailoverHandler _failoverHandler;
-
- /**
- * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly
- * attempting failover where it is failing.
- */
- private FailoverState _failoverState = FailoverState.NOT_STARTED;
-
- /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
- private CountDownLatch _failoverLatch;
-
- /** The last failover exception that occurred */
- private FailoverException _lastFailoverException;
-
- /** Defines the default timeout to use for synchronous protocol commands. */
- private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30);
-
- /** Object to lock on when changing the latch */
- private Object _failoverLatchChange = new Object();
- private AMQCodecFactory _codecFactory;
- private Job _readJob;
- private Job _writeJob;
- private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
- private NetworkDriver _networkDriver;
- private ProtocolVersion _suggestedProtocolVersion;
-
- private long _writtenBytes;
- private long _readBytes;
-
- /**
- * Creates a new protocol handler, associated with the specified client connection instance.
- *
- * @param con The client connection that this is the event handler for.
- */
- public AMQProtocolHandler(AMQConnection con)
- {
- _connection = con;
- _protocolSession = new AMQProtocolSession(this, _connection);
- _stateManager = new AMQStateManager(_protocolSession);
- _codecFactory = new AMQCodecFactory(false, _protocolSession);
- _poolReference.setThreadFactory(new ThreadFactory()
- {
-
- public Thread newThread(final Runnable runnable)
- {
- try
- {
- return Threading.getThreadFactory().createThread(runnable);
- }
- catch (Exception e)
- {
- throw new RuntimeException("Failed to create thread", e);
- }
- }
- });
- _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
- _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
- _poolReference.acquireExecutorService();
- _failoverHandler = new FailoverHandler(this);
- }
-
- /**
- * Called when we want to create a new IoTransport session
- * @param brokerDetail
- */
- public void createIoTransportSession(BrokerDetails brokerDetail)
- {
- _protocolSession = new AMQProtocolSession(this, _connection);
- _stateManager.setProtocolSession(_protocolSession);
- IoTransport.connect_0_9(getProtocolSession(),
- brokerDetail.getHost(),
- brokerDetail.getPort(),
- brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL));
- _protocolSession.init();
- }
-
- /**
- * Called when the network connection is closed. This can happen, either because the client explicitly requested
- * that the connection be closed, in which case nothing is done, or because the connection died. In the case
- * where the connection died, an attempt to failover automatically to a new connection may be started. The failover
- * process will be started, provided that it is the clients policy to allow failover, and provided that a failover
- * has not already been started or failed.
- *
- * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
- * not otherwise? The above comment doesn't make that clear.
- */
- public void closed()
- {
- if (_connection.isClosed())
- {
- _logger.debug("Session closed called by client");
- }
- else
- {
- _logger.debug("Session closed called with failover state currently " + _failoverState);
-
- // reconnetablility was introduced here so as not to disturb the client as they have made their intentions
- // known through the policy settings.
-
- if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed())
- {
- _logger.debug("FAILOVER STARTING");
- if (_failoverState == FailoverState.NOT_STARTED)
- {
- _failoverState = FailoverState.IN_PROGRESS;
- startFailoverThread();
- }
- else
- {
- _logger.debug("Not starting failover as state currently " + _failoverState);
- }
- }
- else
- {
- _logger.debug("Failover not allowed by policy."); // or already in progress?
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug(_connection.getFailoverPolicy().toString());
- }
-
- if (_failoverState != FailoverState.IN_PROGRESS)
- {
- _logger.debug("sessionClose() not allowed to failover");
- _connection.exceptionReceived(new AMQDisconnectedException(
- "Server closed connection and reconnection " + "not permitted.",
- _stateManager.getLastException()));
- }
- else
- {
- _logger.debug("sessionClose() failover in progress");
- }
- }
- }
-
- _logger.debug("Protocol Session [" + this + "] closed");
- }
-
- /** See {@link FailoverHandler} to see rationale for separate thread. */
- private void startFailoverThread()
- {
- if(!_connection.isClosed())
- {
- final Thread failoverThread;
- try
- {
- failoverThread = Threading.getThreadFactory().createThread(_failoverHandler);
- }
- catch (Exception e)
- {
- throw new RuntimeException("Failed to create thread", e);
- }
- failoverThread.setName("Failover");
- // Do not inherit daemon-ness from current thread as this can be a daemon
- // thread such as a AnonymousIoService thread.
- failoverThread.setDaemon(false);
- failoverThread.start();
- }
- }
-
- public void readerIdle()
- {
- _logger.debug("Protocol Session [" + this + "] idle: reader");
- // failover:
- HeartbeatDiagnostics.timeout();
- _logger.warn("Timed out while waiting for heartbeat from peer.");
- _networkDriver.close();
- }
-
- public void writerIdle()
- {
- _logger.debug("Protocol Session [" + this + "] idle: reader");
- writeFrame(HeartbeatBody.FRAME);
- HeartbeatDiagnostics.sent();
- }
-
- /**
- * Invoked when any exception is thrown by the NetworkDriver
- */
- public void exception(Throwable cause)
- {
- if (_failoverState == FailoverState.NOT_STARTED)
- {
- // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
- if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
- {
- _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
- // this will attempt failover
- _networkDriver.close();
- closed();
- }
- else
- {
-
- if (cause instanceof ProtocolCodecException)
- {
- _logger.info("Protocol Exception caught NOT going to attempt failover as " +
- "cause isn't AMQConnectionClosedException: " + cause, cause);
-
- AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
- propagateExceptionToAllWaiters(amqe);
- }
- _connection.exceptionReceived(cause);
-
- }
-
- // FIXME Need to correctly handle other exceptions. Things like ...
- // if (cause instanceof AMQChannelClosedException)
- // which will cause the JMSSession to end due to a channel close and so that Session needs
- // to be removed from the map so we can correctly still call close without an exception when trying to close
- // the server closed session. See also CloseChannelMethodHandler as the sessionClose is never called on exception
- }
- // we reach this point if failover was attempted and failed therefore we need to let the calling app
- // know since we cannot recover the situation
- else if (_failoverState == FailoverState.FAILED)
- {
- _logger.error("Exception caught by protocol handler: " + cause, cause);
-
- // we notify the state manager of the error in case we have any clients waiting on a state
- // change. Those "waiters" will be interrupted and can handle the exception
- AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
- propagateExceptionToAllWaiters(amqe);
- _connection.exceptionReceived(cause);
- }
- }
-
- /**
- * There are two cases where we have other threads potentially blocking for events to be handled by this class.
- * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type
- * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately.
- *
- * This should be called only when the exception is fatal for the connection.
- *
- * @param e the exception to propagate
- *
- * @see #propagateExceptionToFrameListeners
- */
- public void propagateExceptionToAllWaiters(Exception e)
- {
- getStateManager().error(e);
-
- propagateExceptionToFrameListeners(e);
- }
-
- /**
- * This caters for the case where we only need to propagate an exception to the the frame listeners to interupt any
- * protocol level waits.
- *
- * This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should
- * stop waiting and relinquish the Failover lock {@see FailoverHandler}.
- *
- * Once the {@link FailoverHandler} has re-established the connection then the listeners will be able to re-attempt
- * their protocol request and so listen again for the correct frame.
- *
- * @param e the exception to propagate
- */
- public void propagateExceptionToFrameListeners(Exception e)
- {
- synchronized (_frameListeners)
- {
- if (!_frameListeners.isEmpty())
- {
- final Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener ml = (AMQMethodListener) it.next();
- ml.error(e);
- }
- }
- }
- }
-
- public void notifyFailoverStarting()
- {
- // Set the last exception in the sync block to ensure the ordering with add.
- // either this gets done and the add does the ml.error
- // or the add completes first and the iterator below will do ml.error
- synchronized (_frameListeners)
- {
- _lastFailoverException = new FailoverException("Failing over about to start");
- }
-
- //Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be
- // interrupted unless failover cannot restore the state.
- propagateExceptionToFrameListeners(_lastFailoverException);
- }
-
- public void failoverInProgress()
- {
- _lastFailoverException = null;
- }
-
- private static int _messageReceivedCount;
-
-
- public void received(ByteBuffer msg)
- {
- try
- {
- _readBytes += msg.remaining();
- final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
-
- Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
- {
-
- public void run()
- {
- // Decode buffer
-
- for (AMQDataBlock message : dataBlocks)
- {
-
- try
- {
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
- }
-
- if(message instanceof AMQFrame)
- {
- final boolean debug = _logger.isDebugEnabled();
- final long msgNumber = ++_messageReceivedCount;
-
- if (debug && ((msgNumber % 1000) == 0))
- {
- _logger.debug("Received " + _messageReceivedCount + " protocol messages");
- }
-
- AMQFrame frame = (AMQFrame) message;
-
- final AMQBody bodyFrame = frame.getBodyFrame();
-
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
- bodyFrame.handle(frame.getChannel(), _protocolSession);
-
- _connection.bytesReceived(_readBytes);
- }
- else if (message instanceof ProtocolInitiation)
- {
- // We get here if the server sends a response to our initial protocol header
- // suggesting an alternate ProtocolVersion; the server will then close the
- // connection.
- ProtocolInitiation protocolInit = (ProtocolInitiation) message;
- _suggestedProtocolVersion = protocolInit.checkVersion();
- _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
-
- // get round a bug in old versions of qpid whereby the connection is not closed
- _stateManager.changeState(AMQState.CONNECTION_CLOSED);
- }
- }
- catch (Exception e)
- {
- _logger.error("Exception processing frame", e);
- propagateExceptionToFrameListeners(e);
- exception(e);
- }
- }
- }
- });
- }
- catch (Exception e)
- {
- propagateExceptionToFrameListeners(e);
- exception(e);
- }
- }
-
- public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
- throws AMQException
- {
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + bodyFrame);
- }
-
- final AMQMethodEvent<AMQMethodBody> evt =
- new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame);
-
- try
- {
-
- boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
- synchronized (_frameListeners)
- {
- if (!_frameListeners.isEmpty())
- {
- //This iterator is safe from the error state as the frame listeners always add before they send so their
- // will be ready and waiting for this response.
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
- }
- }
- }
- if (!wasAnyoneInterested)
- {
- throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
- + _frameListeners, null);
- }
- }
- catch (AMQException e)
- {
- propagateExceptionToFrameListeners(e);
-
- exception(e);
- }
-
- }
-
- private static int _messagesOut;
-
- public StateWaiter createWaiter(Set<AMQState> states) throws AMQException
- {
- return getStateManager().createWaiter(states);
- }
-
- /**
- * Convenience method that writes a frame to the protocol session. Equivalent to calling
- * getProtocolSession().write().
- *
- * @param frame the frame to write
- */
- public void writeFrame(AMQDataBlock frame)
- {
- writeFrame(frame, false);
- }
-
- public void writeFrame(AMQDataBlock frame, boolean wait)
- {
- final ByteBuffer buf = frame.toNioByteBuffer();
- _writtenBytes += buf.remaining();
- Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
- {
- public void run()
- {
- _networkDriver.send(buf);
- }
- });
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
- }
-
- final long sentMessages = _messagesOut++;
-
- final boolean debug = _logger.isDebugEnabled();
-
- if (debug && ((sentMessages % 1000) == 0))
- {
- _logger.debug("Sent " + _messagesOut + " protocol messages");
- }
-
- _connection.bytesSent(_writtenBytes);
-
- if (wait)
- {
- _networkDriver.flush();
- }
- }
-
- /**
- * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
- * calling getProtocolSession().write() then waiting for the response.
- *
- * @param frame
- * @param listener the blocking listener. Note the calling thread will block.
- */
- public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener)
- throws AMQException, FailoverException
- {
- return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
- }
-
- /**
- * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
- * calling getProtocolSession().write() then waiting for the response.
- *
- * @param frame
- * @param listener the blocking listener. Note the calling thread will block.
- */
- public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener,
- long timeout) throws AMQException, FailoverException
- {
- try
- {
- synchronized (_frameListeners)
- {
- if (_lastFailoverException != null)
- {
- throw _lastFailoverException;
- }
-
- if(_stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED ||
- _stateManager.getCurrentState() == AMQState.CONNECTION_CLOSING)
- {
- Exception e = _stateManager.getLastException();
- if (e != null)
- {
- if (e instanceof AMQException)
- {
- AMQException amqe = (AMQException) e;
-
- throw amqe.cloneForCurrentThread();
- }
- else
- {
- throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
- }
- }
- }
-
- _frameListeners.add(listener);
- //FIXME: At this point here we should check or before add we should check _stateManager is in an open
- // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255
- }
- writeFrame(frame);
-
- return listener.blockForFrame(timeout);
- // When control resumes before this line, a reply will have been received
- // that matches the criteria defined in the blocking listener
- }
- finally
- {
- // If we don't removeKey the listener then no-one will
- _frameListeners.remove(listener);
- }
-
- }
-
- /** More convenient method to write a frame and wait for it's response. */
- public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException, FailoverException
- {
- return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
- }
-
- /** More convenient method to write a frame and wait for it's response. */
- public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException, FailoverException
- {
- return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass),
- timeout);
- }
-
- public void closeSession(AMQSession session) throws AMQException
- {
- _protocolSession.closeSession(session);
- }
-
- /**
- * Closes the connection.
- *
- * <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed
- * anyway.
- *
- * @param timeout The timeout to wait for an acknowledgement to the close request.
- *
- * @throws AMQException If the close fails for any reason.
- */
- public void closeConnection(long timeout) throws AMQException
- {
- ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection."), 0, 0);
-
- final AMQFrame frame = body.generateFrame(0);
-
- //If the connection is already closed then don't do a syncWrite
- if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
- {
- try
- {
- syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _networkDriver.close();
- closed();
- }
- catch (AMQTimeoutException e)
- {
- closed();
- }
- catch (FailoverException e)
- {
- _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
- }
- }
- _poolReference.releaseExecutorService();
- }
-
- /** @return the number of bytes read from this protocol session */
- public long getReadBytes()
- {
- return _readBytes;
- }
-
- /** @return the number of bytes written to this protocol session */
- public long getWrittenBytes()
- {
- return _writtenBytes;
- }
-
- public void failover(String host, int port)
- {
- _failoverHandler.setHost(host);
- _failoverHandler.setPort(port);
- // see javadoc for FailoverHandler to see rationale for separate thread
- startFailoverThread();
- }
-
- public void blockUntilNotFailingOver() throws InterruptedException
- {
- synchronized(_failoverLatchChange)
- {
- if (_failoverLatch != null)
- {
- if(!_failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS))
- {
-
- }
- }
- }
- }
-
- public AMQShortString generateQueueName()
- {
- return _protocolSession.generateQueueName();
- }
-
- public CountDownLatch getFailoverLatch()
- {
- return _failoverLatch;
- }
-
- public void setFailoverLatch(CountDownLatch failoverLatch)
- {
- synchronized (_failoverLatchChange)
- {
- _failoverLatch = failoverLatch;
- }
- }
-
- public AMQConnection getConnection()
- {
- return _connection;
- }
-
- public AMQStateManager getStateManager()
- {
- return _stateManager;
- }
-
- public void setStateManager(AMQStateManager stateManager)
- {
- _stateManager = stateManager;
- _stateManager.setProtocolSession(_protocolSession);
- }
-
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
-
- FailoverState getFailoverState()
- {
- return _failoverState;
- }
-
- public void setFailoverState(FailoverState failoverState)
- {
- _failoverState = failoverState;
- }
-
- public byte getProtocolMajorVersion()
- {
- return _protocolSession.getProtocolMajorVersion();
- }
-
- public byte getProtocolMinorVersion()
- {
- return _protocolSession.getProtocolMinorVersion();
- }
-
- public MethodRegistry getMethodRegistry()
- {
- return _protocolSession.getMethodRegistry();
- }
-
- public ProtocolVersion getProtocolVersion()
- {
- return _protocolSession.getProtocolVersion();
- }
-
- public SocketAddress getRemoteAddress()
- {
- return _networkDriver.getRemoteAddress();
- }
-
- public SocketAddress getLocalAddress()
- {
- return _networkDriver.getLocalAddress();
- }
-
- public void setNetworkDriver(NetworkDriver driver)
- {
- _networkDriver = driver;
- }
-
- /** @param delay delay in seconds (not ms) */
- void initHeartbeats(int delay)
- {
- if (delay > 0)
- {
- getNetworkDriver().setMaxWriteIdle(delay);
- getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
- }
- }
-
- public NetworkDriver getNetworkDriver()
- {
- return _networkDriver;
- }
-
- public ProtocolVersion getSuggestedProtocolVersion()
- {
- return _suggestedProtocolVersion;
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
deleted file mode 100644
index 7976760696..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ /dev/null
@@ -1,477 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.JMSException;
-import javax.security.sasl.SaslClient;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.ConnectionTuneParameters;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.UnprocessedMessage_0_8;
-import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
-
-/**
- * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
- * session is still available but clients should not use it to obtain session attributes.
- */
-public class AMQProtocolSession implements AMQVersionAwareProtocolSession
-{
- protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
-
- protected static final Logger _logger = LoggerFactory.getLogger(AMQProtocolSession.class);
-
- public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived";
-
- //Usable channels are numbered 1 to <ChannelMax>
- public static final int MAX_CHANNEL_MAX = 0xFFFF;
- public static final int MIN_USABLE_CHANNEL_NUM = 1;
-
- protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters";
-
- protected static final String AMQ_CONNECTION = "AMQConnection";
-
- protected static final String SASL_CLIENT = "SASLClient";
-
- /**
- * The handler from which this session was created and which is used to handle protocol events. We send failover
- * events to the handler.
- */
- protected final AMQProtocolHandler _protocolHandler;
-
- /** Maps from the channel id to the AMQSession that it represents. */
- protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
-
- protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
-
- /**
- * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
- * first) with the subsequent content header and content bodies.
- */
- private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>();
- private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
-
- /** Counter to ensure unique queue names */
- protected int _queueId = 1;
- protected final Object _queueIdLock = new Object();
-
- private ProtocolVersion _protocolVersion;
-// private VersionSpecificRegistry _registry =
-// MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
-
- private MethodRegistry _methodRegistry =
- MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
-
- private MethodDispatcher _methodDispatcher;
-
- protected final AMQConnection _connection;
-
- private ConnectionTuneParameters _connectionTuneParameters;
-
- private SaslClient _saslClient;
-
- private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
-
- public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
- {
- _protocolHandler = protocolHandler;
- _protocolVersion = connection.getProtocolVersion();
- _logger.info("Using ProtocolVersion for Session:" + _protocolVersion);
- _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
- this);
- _connection = connection;
- }
-
- public void init()
- {
- // start the process of setting up the connection. This is the first place that
- // data is written to the server.
- _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion()));
- }
-
- public String getClientID()
- {
- try
- {
- return getAMQConnection().getClientID();
- }
- catch (JMSException e)
- {
- // we never throw a JMSException here
- return null;
- }
- }
-
- public void setClientID(String clientID) throws JMSException
- {
- getAMQConnection().setClientID(clientID);
- }
-
- public AMQStateManager getStateManager()
- {
- return _protocolHandler.getStateManager();
- }
-
- public String getVirtualHost()
- {
- return getAMQConnection().getVirtualHost();
- }
-
- public String getUsername()
- {
- return getAMQConnection().getUsername();
- }
-
- public String getPassword()
- {
- return getAMQConnection().getPassword();
- }
-
- public SaslClient getSaslClient()
- {
- return _saslClient;
- }
-
- /**
- * Store the SASL client currently being used for the authentication handshake
- *
- * @param client if non-null, stores this in the session. if null clears any existing client being stored
- */
- public void setSaslClient(SaslClient client)
- {
- _saslClient = client;
- }
-
- public ConnectionTuneParameters getConnectionTuneParameters()
- {
- return _connectionTuneParameters;
- }
-
- public void setConnectionTuneParameters(ConnectionTuneParameters params)
- {
- _connectionTuneParameters = params;
- AMQConnection con = getAMQConnection();
-
- con.setMaximumChannelCount(params.getChannelMax());
- con.setMaximumFrameSize(params.getFrameMax());
- _protocolHandler.initHeartbeats((int) params.getHeartbeat());
- }
-
- /**
- * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA
- * dispatcher thread.
- *
- * @param message
- *
- * @throws AMQException if this was not expected
- */
- public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException
- {
- if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- _channelId2UnprocessedMsgArray[channelId] = message;
- }
- else
- {
- _channelId2UnprocessedMsgMap.put(channelId, message);
- }
- }
-
- public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
- {
- final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
- : _channelId2UnprocessedMsgMap.get(channelId));
-
- if (msg == null)
- {
- throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first on session:" + this, null);
- }
-
- if (msg.getContentHeader() != null)
- {
- throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames on session:" + this, null);
- }
-
- msg.setContentHeader(contentHeader);
- if (contentHeader.bodySize == 0)
- {
- deliverMessageToAMQSession(channelId, msg);
- }
- }
-
- public void contentBodyReceived(final int channelId, ContentBody contentBody) throws AMQException
- {
- UnprocessedMessage_0_8 msg;
- final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
- if (fastAccess)
- {
- msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgArray[channelId];
- }
- else
- {
- msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgMap.get(channelId);
- }
-
- if (msg == null)
- {
- throw new AMQException(null, "Error: received content body without having received a JMSDeliver frame first", null);
- }
-
- if (msg.getContentHeader() == null)
- {
- if (fastAccess)
- {
- _channelId2UnprocessedMsgArray[channelId] = null;
- }
- else
- {
- _channelId2UnprocessedMsgMap.remove(channelId);
- }
- throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first", null);
- }
-
- msg.receiveBody(contentBody);
-
- if (msg.isAllBodyDataReceived())
- {
- deliverMessageToAMQSession(channelId, msg);
- }
- }
-
- public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
- {
-
- }
-
- /**
- * Deliver a message to the appropriate session, removing the unprocessed message from our map
- *
- * @param channelId the channel id the message should be delivered to
- * @param msg the message
- */
- private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
- {
- AMQSession session = getSession(channelId);
- session.messageReceived(msg);
- if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- _channelId2UnprocessedMsgArray[channelId] = null;
- }
- else
- {
- _channelId2UnprocessedMsgMap.remove(channelId);
- }
- }
-
- protected AMQSession getSession(int channelId)
- {
- return _connection.getSession(channelId);
- }
-
- /**
- * Convenience method that writes a frame to the protocol session. Equivalent to calling
- * getProtocolSession().write().
- *
- * @param frame the frame to write
- */
- public void writeFrame(AMQDataBlock frame)
- {
- _protocolHandler.writeFrame(frame);
- }
-
- public void writeFrame(AMQDataBlock frame, boolean wait)
- {
- _protocolHandler.writeFrame(frame, wait);
- }
-
- /**
- * Starts the process of closing a session
- *
- * @param session the AMQSession being closed
- */
- public void closeSession(AMQSession session)
- {
- _logger.debug("closeSession called on protocol session for session " + session.getChannelId());
- final int channelId = session.getChannelId();
- if (channelId <= 0)
- {
- throw new IllegalArgumentException("Attempt to close a channel with id < 0");
- }
- // we need to know when a channel is closing so that we can respond
- // with a channel.close frame when we receive any other type of frame
- // on that channel
- _closingChannels.putIfAbsent(channelId, session);
- }
-
- /**
- * Called from the ChannelClose handler when a channel close frame is received. This method decides whether this is
- * a response or an initiation. The latter case causes the AMQSession to be closed and an exception to be thrown if
- * appropriate.
- *
- * @param channelId the id of the channel (session)
- *
- * @return true if the client must respond to the server, i.e. if the server initiated the channel close, false if
- * the channel close is just the server responding to the client's earlier request to close the channel.
- */
- public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException
- {
-
- // if this is not a response to an earlier request to close the channel
- if (_closingChannels.remove(channelId) == null)
- {
- final AMQSession session = getSession(channelId);
- try
- {
- session.closed(new AMQException(code, text, null));
- }
- catch (JMSException e)
- {
- throw new AMQException(null, "JMSException received while closing session", e);
- }
-
- return true;
- }
- else
- {
- return false;
- }
- }
-
- public AMQConnection getAMQConnection()
- {
- return _connection;
- }
-
- public void closeProtocolSession() throws AMQException
- {
- _protocolHandler.closeConnection(0);
- }
-
- public void failover(String host, int port)
- {
- _protocolHandler.failover(host, port);
- }
-
- protected AMQShortString generateQueueName()
- {
- int id;
- synchronized (_queueIdLock)
- {
- id = _queueId++;
- }
- // convert '.', '/', ':' and ';' to single '_', for spec compliance and readability
- String localAddress = _protocolHandler.getLocalAddress().toString().replaceAll("[./:;]", "_");
- String queueName = "tmp_" + localAddress + "_" + id;
- return new AMQShortString(queueName.replaceAll("_+", "_"));
- }
-
- public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
- {
- final AMQSession session = getSession(channelId);
-
- session.confirmConsumerCancelled(consumerTag.toIntValue());
- }
-
- public void setProtocolVersion(final ProtocolVersion pv)
- {
- _logger.info("Setting ProtocolVersion to :" + pv);
- _protocolVersion = pv;
- _methodRegistry = MethodRegistry.getMethodRegistry(pv);
- _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
- }
-
- public byte getProtocolMinorVersion()
- {
- return _protocolVersion.getMinorVersion();
- }
-
- public byte getProtocolMajorVersion()
- {
- return _protocolVersion.getMajorVersion();
- }
-
- public ProtocolVersion getProtocolVersion()
- {
- return _protocolVersion;
- }
-
- public MethodRegistry getMethodRegistry()
- {
- return _methodRegistry;
- }
-
- public MethodDispatcher getMethodDispatcher()
- {
- return _methodDispatcher;
- }
-
- public void setTicket(int ticket, int channelId)
- {
- final AMQSession session = getSession(channelId);
- session.setTicket(ticket);
- }
-
- public void setMethodDispatcher(MethodDispatcher methodDispatcher)
- {
- _methodDispatcher = methodDispatcher;
- }
-
- public void setFlowControl(final int channelId, final boolean active)
- {
- final AMQSession session = getSession(channelId);
- session.setFlowControl(active);
- }
-
- public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
- {
- _protocolHandler.methodBodyReceived(channel, amqMethodBody);
- }
-
- public void notifyError(Exception error)
- {
- _protocolHandler.propagateExceptionToAllWaiters(error);
- }
-
- public void setSender(Sender<java.nio.ByteBuffer> sender)
- {
- // No-op, interface munging
- }
-
-
- @Override
- public String toString()
- {
- return "AMQProtocolSession[" + _connection + ']';
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
deleted file mode 100644
index 2bc609ebf2..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQTimeoutException;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.util.BlockingWaiter;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
-
-/**
- * BlockingMethodFrameListener is a 'rendezvous' which acts as a {@link AMQMethodListener} that delegates handling of
- * incoming methods to a method listener implemented as a sub-class of this and hands off the processed method or
- * error to a consumer. The producer of the event does not have to wait for the consumer to take the event, so this
- * differs from a 'rendezvous' in that sense.
- *
- * <p/>BlockingMethodFrameListeners are used to coordinate waiting for replies to method calls that expect a response.
- * They are always used in a 'one-shot' manner, that is, to recieve just one response. Usually the caller has to register
- * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they
- * have been completed.
- *
- * <p/>The {@link #processMethod} must return <tt>true</tt> on any incoming method that it handles. This indicates to
- * this listeners that the method it is waiting for has arrived. Incoming methods are also filtered by channel prior to
- * being passed to the {@link #processMethod} method, so responses are only received for a particular channel. The
- * channel id must be passed to the constructor.
- *
- * <p/>Errors from the producer are rethrown to the consumer.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Accept notification of AMQP method events. <td> {@link AMQMethodEvent}
- * <tr><td> Delegate handling of the method to another method listener. <td> {@link AMQMethodBody}
- * <tr><td> Block until a method is handled by the delegated to handler.
- * <tr><td> Propagate the most recent exception to the consumer.
- * </table>
- *
- * @todo Might be neater if this method listener simply wrapped another that provided the method handling using a
- * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations
- * seem to use it. So wrapping the listeners is possible.
- * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to
- * check that SynchronousQueue has a non-blocking put method available.
- */
-public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMethodEvent> implements AMQMethodListener
-{
-
- /** Holds the channel id for the channel upon which this listener is waiting for a response. */
- protected int _channelId;
-
- /**
- * Creates a new method listener, that filters incoming method to just those that match the specified channel id.
- *
- * @param channelId The channel id to filter incoming methods with.
- */
- public BlockingMethodFrameListener(int channelId)
- {
- _channelId = channelId;
- }
-
- /**
- * Delegates any additional handling of the incoming methods to another handler.
- *
- * @param channelId The channel id of the incoming method.
- * @param frame The method body.
- *
- * @return <tt>true</tt> if the method was handled, <tt>false</tt> otherwise.
- */
- public abstract boolean processMethod(int channelId, AMQMethodBody frame);
-
- public boolean process(AMQMethodEvent evt)
- {
- AMQMethodBody method = evt.getMethod();
-
- return (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
- }
-
- /**
- * Informs this listener that an AMQP method has been received.
- *
- * @param evt The AMQP method.
- *
- * @return <tt>true</tt> if this listener has handled the method, <tt>false</tt> otherwise.
- */
- public boolean methodReceived(AMQMethodEvent evt)
- {
- return received(evt);
- }
-
- /**
- * Blocks until a method is received that is handled by the delegated to method listener, or the specified timeout
- * has passed.
- *
- * @param timeout The timeout in milliseconds.
- *
- * @return The AMQP method that was received.
- *
- * @throws AMQException
- * @throws FailoverException
- */
- public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException
- {
- try
- {
- return (AMQMethodEvent) block(timeout);
- }
- finally
- {
- //Prevent any more errors being notified to this waiter.
- close();
- }
- }
-
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java b/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java
deleted file mode 100644
index 35ea44a331..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class HeartbeatConfig
-{
- private static final Logger _logger = LoggerFactory.getLogger(HeartbeatConfig.class);
- static final HeartbeatConfig CONFIG = new HeartbeatConfig();
-
- /**
- * The factor used to get the timeout from the delay between heartbeats.
- */
- private float timeoutFactor = 2;
-
- HeartbeatConfig()
- {
- String property = System.getProperty("amqj.heartbeat.timeoutFactor");
- if (property != null)
- {
- try
- {
- timeoutFactor = Float.parseFloat(property);
- }
- catch (NumberFormatException e)
- {
- _logger.warn("Invalid timeout factor (amqj.heartbeat.timeoutFactor): " + property);
- }
- }
- }
-
- float getTimeoutFactor()
- {
- return timeoutFactor;
- }
-
- int getTimeout(int writeDelay)
- {
- return (int) (timeoutFactor * writeDelay);
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java b/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
deleted file mode 100644
index d44faeab04..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-class HeartbeatDiagnostics
-{
- private static final Diagnostics _impl = init();
-
- private static Diagnostics init()
- {
- return Boolean.getBoolean("amqj.heartbeat.diagnostics") ? new On() : new Off();
- }
-
- static void sent()
- {
- _impl.sent();
- }
-
- static void timeout()
- {
- _impl.timeout();
- }
-
- static void received(boolean heartbeat)
- {
- _impl.received(heartbeat);
- }
-
- static void init(int delay, int timeout)
- {
- _impl.init(delay, timeout);
- }
-
- private static interface Diagnostics
- {
- void sent();
- void timeout();
- void received(boolean heartbeat);
- void init(int delay, int timeout);
- }
-
- private static class On implements Diagnostics
- {
- private final String[] messages = new String[50];
- private int i;
-
- private void save(String msg)
- {
- messages[i++] = msg;
- if(i >= messages.length){
- i = 0;//i.e. a circular buffer
- }
- }
-
- public void sent()
- {
- save(System.currentTimeMillis() + ": sent heartbeat");
- }
-
- public void timeout()
- {
- for(int i = 0; i < messages.length; i++)
- {
- if(messages[i] != null)
- {
- System.out.println(messages[i]);
- }
- }
- System.out.println(System.currentTimeMillis() + ": timed out");
- }
-
- public void received(boolean heartbeat)
- {
- save(System.currentTimeMillis() + ": received " + (heartbeat ? "heartbeat" : "data"));
- }
-
- public void init(int delay, int timeout)
- {
- System.out.println(System.currentTimeMillis() + ": initialised delay=" + delay + ", timeout=" + timeout);
- }
- }
-
- private static class Off implements Diagnostics
- {
- public void sent()
- {
-
- }
- public void timeout()
- {
-
- }
- public void received(boolean heartbeat)
- {
-
- }
-
- public void init(int delay, int timeout)
- {
-
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java b/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java
deleted file mode 100644
index bbd0a7b144..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolBufferMonitorFilter.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-import org.apache.mina.common.IoFilterAdapter;
-import org.apache.mina.common.IoSession;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A MINA filter that monitors the numbers of messages pending to be sent by MINA. It outputs a message
- * when a threshold has been exceeded, and has a frequency configuration so that messages are not output
- * too often.
- *
- */
-public class ProtocolBufferMonitorFilter extends IoFilterAdapter
-{
- private static final Logger _logger = LoggerFactory.getLogger(ProtocolBufferMonitorFilter.class);
-
- public static final long DEFAULT_FREQUENCY = 5000;
-
- public static final int DEFAULT_THRESHOLD = 3000;
-
- private int _bufferedMessages = 0;
-
- private int _threshold;
-
- private long _lastMessageOutputTime;
-
- private long _outputFrequencyInMillis;
-
- public ProtocolBufferMonitorFilter()
- {
- _threshold = DEFAULT_THRESHOLD;
- _outputFrequencyInMillis = DEFAULT_FREQUENCY;
- }
-
- public ProtocolBufferMonitorFilter(int threshold, long frequency)
- {
- _threshold = threshold;
- _outputFrequencyInMillis = frequency;
- }
-
- public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception
- {
- _bufferedMessages++;
- if (_bufferedMessages > _threshold)
- {
- long now = System.currentTimeMillis();
- if ((now - _lastMessageOutputTime) > _outputFrequencyInMillis)
- {
- _logger.warn("Protocol message buffer exceeded threshold of " + _threshold + ". Current backlog: "
- + _bufferedMessages);
- _lastMessageOutputTime = now;
- }
- }
-
- nextFilter.messageReceived(session, message);
- }
-
- public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception
- {
- _bufferedMessages--;
- nextFilter.messageSent(session, message);
- }
-
- public int getBufferedMessages()
- {
- return _bufferedMessages;
- }
-
- public int getThreshold()
- {
- return _threshold;
- }
-
- public void setThreshold(int threshold)
- {
- _threshold = threshold;
- }
-
- public long getOutputFrequencyInMillis()
- {
- return _outputFrequencyInMillis;
- }
-
- public void setOutputFrequencyInMillis(long outputFrequencyInMillis)
- {
- _outputFrequencyInMillis = outputFrequencyInMillis;
- }
-
- public long getLastMessageOutputTime()
- {
- return _lastMessageOutputTime;
- }
-}