summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java363
1 files changed, 184 insertions, 179 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index ab3ff8ecb0..c7e2493025 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,10 +20,6 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
@@ -48,16 +44,25 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.pool.Event;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.PoolingFilter;
import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -120,7 +125,7 @@ import java.util.concurrent.CountDownLatch;
* held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
* that lifecycles of the fields match lifecycles of their containing objects.
*/
-public class AMQProtocolHandler extends IoHandlerAdapter
+public class AMQProtocolHandler implements ProtocolEngine
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
@@ -137,7 +142,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private volatile AMQProtocolSession _protocolSession;
/** Holds the state of the protocol session. */
- private AMQStateManager _stateManager = new AMQStateManager();
+ private AMQStateManager _stateManager;
/** Holds the method listeners, */
private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
@@ -166,7 +171,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/** Object to lock on when changing the latch */
private Object _failoverLatchChange = new Object();
-
+ private AMQCodecFactory _codecFactory;
+ private Job _readJob;
+ private Job _writeJob;
+ private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+ private NetworkDriver _networkDriver;
+
+ private long _writtenBytes;
+ private long _readBytes;
+
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -175,86 +188,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public AMQProtocolHandler(AMQConnection con)
{
_connection = con;
- }
-
- /**
- * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
- * session, which filters the events handled by this handler. The filter chain consists of, handing off events
- * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP.
- *
- * @param session The MINA session.
- *
- * @throws Exception Any underlying exceptions are allowed to fall through to MINA.
- */
- public void sessionCreated(IoSession session) throws Exception
- {
- _logger.debug("Protocol session created for session " + System.identityHashCode(session));
- _failoverHandler = new FailoverHandler(this, session);
-
- final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false, _protocolSession));
-
- if (Boolean.getBoolean("amqj.shared_read_write_pool"))
- {
- session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
- }
- else
- {
- session.getFilterChain().addLast("protocolFilter", pcf);
- }
- // we only add the SSL filter where we have an SSL connection
- if (_connection.getSSLConfiguration() != null)
- {
- SSLConfiguration sslConfig = _connection.getSSLConfiguration();
- SSLContextFactory sslFactory =
- new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
- SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
- sslFilter.setUseClientMode(true);
- session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
- }
-
- try
- {
- ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
- threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
- threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
- }
- catch (RuntimeException e)
- {
- _logger.error(e.getMessage(), e);
- }
-
- if (Boolean.getBoolean(ClientProperties.PROTECTIO_PROP_NAME))
- {
- try
- {
- //Add IO Protection Filters
- IoFilterChain chain = session.getFilterChain();
-
- session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
-
- ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
- ClientProperties.READ_BUFFER_LIMIT_PROP_NAME, ClientProperties.READ_BUFFER_LIMIT_DEFAULT)));
- readfilter.attach(chain);
-
- WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
- ClientProperties.WRITE_BUFFER_LIMIT_PROP_NAME, ClientProperties.WRITE_BUFFER_LIMIT_DEFAULT)));
- writefilter.attach(chain);
- session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
-
- _logger.info("Using IO Read/Write Filter Protection");
- }
- catch (Exception e)
- {
- _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
- }
- }
- _protocolSession = new AMQProtocolSession(this, session, _connection);
-
- _stateManager.setProtocolSession(_protocolSession);
-
- _protocolSession.init();
+ _protocolSession = new AMQProtocolSession(this, _connection);
+ _stateManager = new AMQStateManager(_protocolSession);
+ _codecFactory = new AMQCodecFactory(false, _protocolSession);
+ ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+ _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false);
+ _poolReference.acquireExecutorService();
+ _failoverHandler = new FailoverHandler(this);
}
/**
@@ -283,12 +224,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* may be called first followed by this method. This depends on whether the client was trying to send data at the
* time of the failure.
*
- * @param session The MINA session.
- *
* @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
* not otherwise? The above comment doesn't make that clear.
*/
- public void sessionClosed(IoSession session)
+ public void closed()
{
if (_connection.isClosed())
{
@@ -327,7 +266,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
_logger.debug("sessionClose() not allowed to failover");
_connection.exceptionReceived(new AMQDisconnectedException(
- "Server closed connection and reconnection " + "not permitted.", null));
+ "Server closed connection and reconnection " + "not permitted.",
+ _stateManager.getLastException()));
}
else
{
@@ -350,43 +290,39 @@ public class AMQProtocolHandler extends IoHandlerAdapter
failoverThread.start();
}
- public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+ @Override
+ public void readerIdle()
{
- _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status);
- if (IdleStatus.WRITER_IDLE.equals(status))
- {
- // write heartbeat frame:
- _logger.debug("Sent heartbeat");
- session.write(HeartbeatBody.FRAME);
- HeartbeatDiagnostics.sent();
- }
- else if (IdleStatus.READER_IDLE.equals(status))
- {
- // failover:
- HeartbeatDiagnostics.timeout();
- _logger.warn("Timed out while waiting for heartbeat from peer.");
- session.close();
- }
+ _logger.debug("Protocol Session [" + this + "] idle: reader");
+ // failover:
+ HeartbeatDiagnostics.timeout();
+ _logger.warn("Timed out while waiting for heartbeat from peer.");
+ _networkDriver.close();
+ }
+
+ @Override
+ public void writerIdle()
+ {
+ _logger.debug("Protocol Session [" + this + "] idle: reader");
+ writeFrame(HeartbeatBody.FRAME);
+ HeartbeatDiagnostics.sent();
}
/**
- * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an
- * IOException, MINA will close the connection automatically.
- *
- * @param session The MINA session.
- * @param cause The exception that triggered this event.
+ * Invoked when any exception is thrown by the NetworkDriver
*/
- public void exceptionCaught(IoSession session, Throwable cause)
+ public void exception(Throwable cause)
{
+ _logger.info("AS: HELLO");
if (_failoverState == FailoverState.NOT_STARTED)
{
// if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
- // this will attemp failover
-
- sessionClosed(session);
+ // this will attempt failover
+ _networkDriver.close();
+ closed();
}
else
{
@@ -437,6 +373,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void propagateExceptionToAllWaiters(Exception e)
{
getStateManager().error(e);
+
propagateExceptionToFrameListeners(e);
}
@@ -490,48 +427,84 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private static int _messageReceivedCount;
- public void messageReceived(IoSession session, Object message) throws Exception
- {
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
- }
- if(message instanceof AMQFrame)
+ @Override
+ public void received(ByteBuffer msg)
+ {
+ try
{
- final boolean debug = _logger.isDebugEnabled();
- final long msgNumber = ++_messageReceivedCount;
+ _readBytes += msg.remaining();
+ final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- if (debug && ((msgNumber % 1000) == 0))
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable()
{
- _logger.debug("Received " + _messageReceivedCount + " protocol messages");
- }
-
- AMQFrame frame = (AMQFrame) message;
-
- final AMQBody bodyFrame = frame.getBodyFrame();
-
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+ @Override
+ public void run()
+ {
+ // Decode buffer
- bodyFrame.handle(frame.getChannel(), _protocolSession);
+ for (AMQDataBlock message : dataBlocks)
+ {
- _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+ try
+ {
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
+ }
+
+ if(message instanceof AMQFrame)
+ {
+ final boolean debug = _logger.isDebugEnabled();
+ final long msgNumber = ++_messageReceivedCount;
+
+ if (debug && ((msgNumber % 1000) == 0))
+ {
+ _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+ }
+
+ AMQFrame frame = (AMQFrame) message;
+
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+
+ bodyFrame.handle(frame.getChannel(), _protocolSession);
+
+ _connection.bytesReceived(_readBytes);
+ }
+ else if (message instanceof ProtocolInitiation)
+ {
+ // We get here if the server sends a response to our initial protocol header
+ // suggesting an alternate ProtocolVersion; the server will then close the
+ // connection.
+ ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+ ProtocolVersion pv = protocolInit.checkVersion();
+ getConnection().setProtocolVersion(pv);
+
+ // get round a bug in old versions of qpid whereby the connection is not closed
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ _logger.error("Exception processing frame", e);
+ propagateExceptionToFrameListeners(e);
+ exception(e);
+ }
+ }
+ }
+ }));
}
- else if (message instanceof ProtocolInitiation)
+ catch (Exception e)
{
- // We get here if the server sends a response to our initial protocol header
- // suggesting an alternate ProtocolVersion; the server will then close the
- // connection.
- ProtocolInitiation protocolInit = (ProtocolInitiation) message;
- ProtocolVersion pv = protocolInit.checkVersion();
- getConnection().setProtocolVersion(pv);
-
- // get round a bug in old versions of qpid whereby the connection is not closed
- _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ propagateExceptionToFrameListeners(e);
+ exception(e);
}
}
- public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session)
+ public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
throws AMQException
{
@@ -571,32 +544,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
propagateExceptionToFrameListeners(e);
- exceptionCaught(session, e);
+ exception(e);
}
}
private static int _messagesOut;
- public void messageSent(IoSession session, Object message) throws Exception
- {
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.debug(String.format("SEND: [%s] %s", this, message));
- }
-
- final long sentMessages = _messagesOut++;
-
- final boolean debug = _logger.isDebugEnabled();
-
- if (debug && ((sentMessages % 1000) == 0))
- {
- _logger.debug("Sent " + _messagesOut + " protocol messages");
- }
-
- _connection.bytesSent(session.getWrittenBytes());
- }
-
public StateWaiter createWaiter(Set<AMQState> states) throws AMQException
{
return getStateManager().createWaiter(states);
@@ -610,12 +564,34 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void writeFrame(AMQDataBlock frame)
{
- _protocolSession.writeFrame(frame);
+ writeFrame(frame, false);
}
public void writeFrame(AMQDataBlock frame, boolean wait)
{
- _protocolSession.writeFrame(frame, wait);
+ ByteBuffer buf = frame.toNioByteBuffer();
+ _writtenBytes += buf.remaining();
+ _networkDriver.send(buf);
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
+ }
+
+ final long sentMessages = _messagesOut++;
+
+ final boolean debug = _logger.isDebugEnabled();
+
+ if (debug && ((sentMessages % 1000) == 0))
+ {
+ _logger.debug("Sent " + _messagesOut + " protocol messages");
+ }
+
+ _connection.bytesSent(_writtenBytes);
+
+ if (wait)
+ {
+ _networkDriver.flush();
+ }
}
/**
@@ -673,7 +649,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
//FIXME: At this point here we should check or before add we should check _stateManager is in an open
// state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255
}
- _protocolSession.writeFrame(frame);
+ writeFrame(frame);
return listener.blockForFrame(timeout);
// When control resumes before this line, a reply will have been received
@@ -723,20 +699,17 @@ public class AMQProtocolHandler extends IoHandlerAdapter
final AMQFrame frame = body.generateFrame(0);
//If the connection is already closed then don't do a syncWrite
- if (getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
- {
- _protocolSession.closeProtocolSession(false);
- }
- else
+ if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
{
try
{
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _protocolSession.closeProtocolSession();
+ _networkDriver.close();
+ closed();
}
catch (AMQTimeoutException e)
{
- _protocolSession.closeProtocolSession(false);
+ closed();
}
catch (FailoverException e)
{
@@ -748,13 +721,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/** @return the number of bytes read from this protocol session */
public long getReadBytes()
{
- return _protocolSession.getIoSession().getReadBytes();
+ return _readBytes;
}
/** @return the number of bytes written to this protocol session */
public long getWrittenBytes()
{
- return _protocolSession.getIoSession().getWrittenBytes();
+ return _writtenBytes;
}
public void failover(String host, int port)
@@ -807,6 +780,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void setStateManager(AMQStateManager stateManager)
{
_stateManager = stateManager;
+ _stateManager.setProtocolSession(_protocolSession);
}
public AMQProtocolSession getProtocolSession()
@@ -843,4 +817,35 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
return _protocolSession.getProtocolVersion();
}
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
+ /** @param delay delay in seconds (not ms) */
+ void initHeartbeats(int delay)
+ {
+ if (delay > 0)
+ {
+ getNetworkDriver().setMaxWriteIdle(delay);
+ getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+ HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
+ }
+ }
+
+ public NetworkDriver getNetworkDriver()
+ {
+ return _networkDriver;
+ }
}