summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java258
1 files changed, 148 insertions, 110 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 284954edba..eb5af119b2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.client.protocol;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -30,8 +28,10 @@ import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -46,7 +46,6 @@ import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
@@ -58,13 +57,16 @@ import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -162,22 +164,20 @@ public class AMQProtocolHandler implements ProtocolEngine
private FailoverException _lastFailoverException;
/** Defines the default timeout to use for synchronous protocol commands. */
- private final long DEFAULT_SYNC_TIMEOUT = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
- Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
- ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
+ private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30);
/** Object to lock on when changing the latch */
private Object _failoverLatchChange = new Object();
private AMQCodecFactory _codecFactory;
-
+ private Job _readJob;
+ private Job _writeJob;
+ private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+ private NetworkDriver _networkDriver;
private ProtocolVersion _suggestedProtocolVersion;
private long _writtenBytes;
private long _readBytes;
- private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
-
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -189,10 +189,43 @@ public class AMQProtocolHandler implements ProtocolEngine
_protocolSession = new AMQProtocolSession(this, _connection);
_stateManager = new AMQStateManager(_protocolSession);
_codecFactory = new AMQCodecFactory(false, _protocolSession);
+ _poolReference.setThreadFactory(new ThreadFactory()
+ {
+
+ public Thread newThread(final Runnable runnable)
+ {
+ try
+ {
+ return Threading.getThreadFactory().createThread(runnable);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to create thread", e);
+ }
+ }
+ });
+ _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
+ _poolReference.acquireExecutorService();
_failoverHandler = new FailoverHandler(this);
}
/**
+ * Called when we want to create a new IoTransport session
+ * @param brokerDetail
+ */
+ public void createIoTransportSession(BrokerDetails brokerDetail)
+ {
+ _protocolSession = new AMQProtocolSession(this, _connection);
+ _stateManager.setProtocolSession(_protocolSession);
+ IoTransport.connect_0_9(getProtocolSession(),
+ brokerDetail.getHost(),
+ brokerDetail.getPort(),
+ brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL));
+ _protocolSession.init();
+ }
+
+ /**
* Called when the network connection is closed. This can happen, either because the client explicitly requested
* that the connection be closed, in which case nothing is done, or because the connection died. In the case
* where the connection died, an attempt to failover automatically to a new connection may be started. The failover
@@ -282,7 +315,7 @@ public class AMQProtocolHandler implements ProtocolEngine
// failover:
HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
- _network.close();
+ _networkDriver.close();
}
public void writerIdle()
@@ -304,12 +337,22 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
// this will attempt failover
- _network.close();
+ _networkDriver.close();
closed();
}
else
{
+
+ if (cause instanceof ProtocolCodecException)
+ {
+ _logger.info("Protocol Exception caught NOT going to attempt failover as " +
+ "cause isn't AMQConnectionClosedException: " + cause, cause);
+
+ AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
+ propagateExceptionToAllWaiters(amqe);
+ }
_connection.exceptionReceived(cause);
+
}
// FIXME Need to correctly handle other exceptions. Things like ...
@@ -403,63 +446,76 @@ public class AMQProtocolHandler implements ProtocolEngine
public void received(ByteBuffer msg)
{
- _readBytes += msg.remaining();
try
{
+ _readBytes += msg.remaining();
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- // Decode buffer
-
- for (AMQDataBlock message : dataBlocks)
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
{
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
- }
+ public void run()
+ {
+ // Decode buffer
- if(message instanceof AMQFrame)
+ for (AMQDataBlock message : dataBlocks)
{
- final boolean debug = _logger.isDebugEnabled();
- final long msgNumber = ++_messageReceivedCount;
- if (debug && ((msgNumber % 1000) == 0))
+ try
{
- _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
+ }
+
+ if(message instanceof AMQFrame)
+ {
+ final boolean debug = _logger.isDebugEnabled();
+ final long msgNumber = ++_messageReceivedCount;
+
+ if (debug && ((msgNumber % 1000) == 0))
+ {
+ _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+ }
+
+ AMQFrame frame = (AMQFrame) message;
+
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+
+ bodyFrame.handle(frame.getChannel(), _protocolSession);
+
+ _connection.bytesReceived(_readBytes);
+ }
+ else if (message instanceof ProtocolInitiation)
+ {
+ // We get here if the server sends a response to our initial protocol header
+ // suggesting an alternate ProtocolVersion; the server will then close the
+ // connection.
+ ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+ _suggestedProtocolVersion = protocolInit.checkVersion();
+ _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
+
+ // get round a bug in old versions of qpid whereby the connection is not closed
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("Exception processing frame", e);
+ propagateExceptionToFrameListeners(e);
+ exception(e);
}
-
- AMQFrame frame = (AMQFrame) message;
-
- final AMQBody bodyFrame = frame.getBodyFrame();
-
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
- bodyFrame.handle(frame.getChannel(), _protocolSession);
-
- _connection.bytesReceived(_readBytes);
- }
- else if (message instanceof ProtocolInitiation)
- {
- // We get here if the server sends a response to our initial protocol header
- // suggesting an alternate ProtocolVersion; the server will then close the
- // connection.
- ProtocolInitiation protocolInit = (ProtocolInitiation) message;
- _suggestedProtocolVersion = protocolInit.checkVersion();
- _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
-
- // get round a bug in old versions of qpid whereby the connection is not closed
- _stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
}
+ });
}
catch (Exception e)
{
- _logger.error("Exception processing frame", e);
propagateExceptionToFrameListeners(e);
exception(e);
}
-
-
}
public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
@@ -514,13 +570,28 @@ public class AMQProtocolHandler implements ProtocolEngine
return getStateManager().createWaiter(states);
}
- public synchronized void writeFrame(AMQDataBlock frame)
+ /**
+ * Convenience method that writes a frame to the protocol session. Equivalent to calling
+ * getProtocolSession().write().
+ *
+ * @param frame the frame to write
+ */
+ public void writeFrame(AMQDataBlock frame)
{
- final ByteBuffer buf = asByteBuffer(frame);
- _writtenBytes += buf.remaining();
- _sender.send(buf);
- _sender.flush();
+ writeFrame(frame, false);
+ }
+ public void writeFrame(AMQDataBlock frame, boolean wait)
+ {
+ final ByteBuffer buf = frame.toNioByteBuffer();
+ _writtenBytes += buf.remaining();
+ Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
+ {
+ public void run()
+ {
+ _networkDriver.send(buf);
+ }
+ });
if (PROTOCOL_DEBUG)
{
_protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
@@ -537,41 +608,12 @@ public class AMQProtocolHandler implements ProtocolEngine
_connection.bytesSent(_writtenBytes);
- }
-
- private ByteBuffer asByteBuffer(AMQDataBlock block)
- {
- final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
-
- try
+ if (wait)
{
- block.writePayload(new DataOutputStream(new OutputStream()
- {
-
-
- @Override
- public void write(int b) throws IOException
- {
- buf.put((byte) b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException
- {
- buf.put(b, off, len);
- }
- }));
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
+ _networkDriver.flush();
}
-
- buf.flip();
- return buf;
}
-
/**
* Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
* calling getProtocolSession().write() then waiting for the response.
@@ -665,23 +707,24 @@ public class AMQProtocolHandler implements ProtocolEngine
* <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed
* anyway.
*
- * @param timeout The timeout to wait for an acknowledgment to the close request.
+ * @param timeout The timeout to wait for an acknowledgement to the close request.
*
* @throws AMQException If the close fails for any reason.
*/
public void closeConnection(long timeout) throws AMQException
{
+ ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client is closing the connection."), 0, 0);
+
+ final AMQFrame frame = body.generateFrame(0);
+
+ //If the connection is already closed then don't do a syncWrite
if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
{
- // Connection is already closed then don't do a syncWrite
try
{
- final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection."), 0, 0);
- final AMQFrame frame = body.generateFrame(0);
-
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _network.close();
+ _networkDriver.close();
closed();
}
catch (AMQTimeoutException e)
@@ -690,9 +733,10 @@ public class AMQProtocolHandler implements ProtocolEngine
}
catch (FailoverException e)
{
- _logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway.");
+ _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
}
}
+ _poolReference.releaseExecutorService();
}
/** @return the number of bytes read from this protocol session */
@@ -800,23 +844,17 @@ public class AMQProtocolHandler implements ProtocolEngine
public SocketAddress getRemoteAddress()
{
- return _network.getRemoteAddress();
+ return _networkDriver.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _network.getLocalAddress();
- }
-
- public void setNetworkConnection(NetworkConnection network)
- {
- setNetworkConnection(network, network.getSender());
+ return _networkDriver.getLocalAddress();
}
- public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ public void setNetworkDriver(NetworkDriver driver)
{
- _network = network;
- _sender = sender;
+ _networkDriver = driver;
}
/** @param delay delay in seconds (not ms) */
@@ -824,15 +862,15 @@ public class AMQProtocolHandler implements ProtocolEngine
{
if (delay > 0)
{
- _network.setMaxWriteIdle(delay);
- _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+ getNetworkDriver().setMaxWriteIdle(delay);
+ getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
- public NetworkConnection getNetworkConnection()
+ public NetworkDriver getNetworkDriver()
{
- return _network;
+ return _networkDriver;
}
public ProtocolVersion getSuggestedProtocolVersion()