diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 258 |
1 files changed, 148 insertions, 110 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 284954edba..eb5af119b2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,9 +20,7 @@ */ package org.apache.qpid.client.protocol; -import java.io.DataOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -30,8 +28,10 @@ import java.util.Iterator; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.apache.mina.filter.codec.ProtocolCodecException; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -46,7 +46,6 @@ import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; @@ -58,13 +57,16 @@ import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.pool.Job; +import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -162,22 +164,20 @@ public class AMQProtocolHandler implements ProtocolEngine private FailoverException _lastFailoverException; /** Defines the default timeout to use for synchronous protocol commands. */ - private final long DEFAULT_SYNC_TIMEOUT = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT, - Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT, - ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT)); + private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30); /** Object to lock on when changing the latch */ private Object _failoverLatchChange = new Object(); private AMQCodecFactory _codecFactory; - + private Job _readJob; + private Job _writeJob; + private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); + private NetworkDriver _networkDriver; private ProtocolVersion _suggestedProtocolVersion; private long _writtenBytes; private long _readBytes; - private NetworkConnection _network; - private Sender<ByteBuffer> _sender; - /** * Creates a new protocol handler, associated with the specified client connection instance. * @@ -189,10 +189,43 @@ public class AMQProtocolHandler implements ProtocolEngine _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); _codecFactory = new AMQCodecFactory(false, _protocolSession); + _poolReference.setThreadFactory(new ThreadFactory() + { + + public Thread newThread(final Runnable runnable) + { + try + { + return Threading.getThreadFactory().createThread(runnable); + } + catch (Exception e) + { + throw new RuntimeException("Failed to create thread", e); + } + } + }); + _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); + _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); + _poolReference.acquireExecutorService(); _failoverHandler = new FailoverHandler(this); } /** + * Called when we want to create a new IoTransport session + * @param brokerDetail + */ + public void createIoTransportSession(BrokerDetails brokerDetail) + { + _protocolSession = new AMQProtocolSession(this, _connection); + _stateManager.setProtocolSession(_protocolSession); + IoTransport.connect_0_9(getProtocolSession(), + brokerDetail.getHost(), + brokerDetail.getPort(), + brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL)); + _protocolSession.init(); + } + + /** * Called when the network connection is closed. This can happen, either because the client explicitly requested * that the connection be closed, in which case nothing is done, or because the connection died. In the case * where the connection died, an attempt to failover automatically to a new connection may be started. The failover @@ -282,7 +315,7 @@ public class AMQProtocolHandler implements ProtocolEngine // failover: HeartbeatDiagnostics.timeout(); _logger.warn("Timed out while waiting for heartbeat from peer."); - _network.close(); + _networkDriver.close(); } public void writerIdle() @@ -304,12 +337,22 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); // this will attempt failover - _network.close(); + _networkDriver.close(); closed(); } else { + + if (cause instanceof ProtocolCodecException) + { + _logger.info("Protocol Exception caught NOT going to attempt failover as " + + "cause isn't AMQConnectionClosedException: " + cause, cause); + + AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); + propagateExceptionToAllWaiters(amqe); + } _connection.exceptionReceived(cause); + } // FIXME Need to correctly handle other exceptions. Things like ... @@ -403,63 +446,76 @@ public class AMQProtocolHandler implements ProtocolEngine public void received(ByteBuffer msg) { - _readBytes += msg.remaining(); try { + _readBytes += msg.remaining(); final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - // Decode buffer - - for (AMQDataBlock message : dataBlocks) + Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() { - if (PROTOCOL_DEBUG) - { - _protocolLogger.info(String.format("RECV: [%s] %s", this, message)); - } + public void run() + { + // Decode buffer - if(message instanceof AMQFrame) + for (AMQDataBlock message : dataBlocks) { - final boolean debug = _logger.isDebugEnabled(); - final long msgNumber = ++_messageReceivedCount; - if (debug && ((msgNumber % 1000) == 0)) + try { - _logger.debug("Received " + _messageReceivedCount + " protocol messages"); + if (PROTOCOL_DEBUG) + { + _protocolLogger.info(String.format("RECV: [%s] %s", this, message)); + } + + if(message instanceof AMQFrame) + { + final boolean debug = _logger.isDebugEnabled(); + final long msgNumber = ++_messageReceivedCount; + + if (debug && ((msgNumber % 1000) == 0)) + { + _logger.debug("Received " + _messageReceivedCount + " protocol messages"); + } + + AMQFrame frame = (AMQFrame) message; + + final AMQBody bodyFrame = frame.getBodyFrame(); + + HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); + + bodyFrame.handle(frame.getChannel(), _protocolSession); + + _connection.bytesReceived(_readBytes); + } + else if (message instanceof ProtocolInitiation) + { + // We get here if the server sends a response to our initial protocol header + // suggesting an alternate ProtocolVersion; the server will then close the + // connection. + ProtocolInitiation protocolInit = (ProtocolInitiation) message; + _suggestedProtocolVersion = protocolInit.checkVersion(); + _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion); + + // get round a bug in old versions of qpid whereby the connection is not closed + _stateManager.changeState(AMQState.CONNECTION_CLOSED); + } + } + catch (Exception e) + { + _logger.error("Exception processing frame", e); + propagateExceptionToFrameListeners(e); + exception(e); } - - AMQFrame frame = (AMQFrame) message; - - final AMQBody bodyFrame = frame.getBodyFrame(); - - HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - - bodyFrame.handle(frame.getChannel(), _protocolSession); - - _connection.bytesReceived(_readBytes); - } - else if (message instanceof ProtocolInitiation) - { - // We get here if the server sends a response to our initial protocol header - // suggesting an alternate ProtocolVersion; the server will then close the - // connection. - ProtocolInitiation protocolInit = (ProtocolInitiation) message; - _suggestedProtocolVersion = protocolInit.checkVersion(); - _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion); - - // get round a bug in old versions of qpid whereby the connection is not closed - _stateManager.changeState(AMQState.CONNECTION_CLOSED); } } + }); } catch (Exception e) { - _logger.error("Exception processing frame", e); propagateExceptionToFrameListeners(e); exception(e); } - - } public void methodBodyReceived(final int channelId, final AMQBody bodyFrame) @@ -514,13 +570,28 @@ public class AMQProtocolHandler implements ProtocolEngine return getStateManager().createWaiter(states); } - public synchronized void writeFrame(AMQDataBlock frame) + /** + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). + * + * @param frame the frame to write + */ + public void writeFrame(AMQDataBlock frame) { - final ByteBuffer buf = asByteBuffer(frame); - _writtenBytes += buf.remaining(); - _sender.send(buf); - _sender.flush(); + writeFrame(frame, false); + } + public void writeFrame(AMQDataBlock frame, boolean wait) + { + final ByteBuffer buf = frame.toNioByteBuffer(); + _writtenBytes += buf.remaining(); + Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable() + { + public void run() + { + _networkDriver.send(buf); + } + }); if (PROTOCOL_DEBUG) { _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame)); @@ -537,41 +608,12 @@ public class AMQProtocolHandler implements ProtocolEngine _connection.bytesSent(_writtenBytes); - } - - private ByteBuffer asByteBuffer(AMQDataBlock block) - { - final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize()); - - try + if (wait) { - block.writePayload(new DataOutputStream(new OutputStream() - { - - - @Override - public void write(int b) throws IOException - { - buf.put((byte) b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException - { - buf.put(b, off, len); - } - })); - } - catch (IOException e) - { - throw new RuntimeException(e); + _networkDriver.flush(); } - - buf.flip(); - return buf; } - /** * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to * calling getProtocolSession().write() then waiting for the response. @@ -665,23 +707,24 @@ public class AMQProtocolHandler implements ProtocolEngine * <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed * anyway. * - * @param timeout The timeout to wait for an acknowledgment to the close request. + * @param timeout The timeout to wait for an acknowledgement to the close request. * * @throws AMQException If the close fails for any reason. */ public void closeConnection(long timeout) throws AMQException { + ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client is closing the connection."), 0, 0); + + final AMQFrame frame = body.generateFrame(0); + + //If the connection is already closed then don't do a syncWrite if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)) { - // Connection is already closed then don't do a syncWrite try { - final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection."), 0, 0); - final AMQFrame frame = body.generateFrame(0); - syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _network.close(); + _networkDriver.close(); closed(); } catch (AMQTimeoutException e) @@ -690,9 +733,10 @@ public class AMQProtocolHandler implements ProtocolEngine } catch (FailoverException e) { - _logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway."); + _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); } } + _poolReference.releaseExecutorService(); } /** @return the number of bytes read from this protocol session */ @@ -800,23 +844,17 @@ public class AMQProtocolHandler implements ProtocolEngine public SocketAddress getRemoteAddress() { - return _network.getRemoteAddress(); + return _networkDriver.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _network.getLocalAddress(); - } - - public void setNetworkConnection(NetworkConnection network) - { - setNetworkConnection(network, network.getSender()); + return _networkDriver.getLocalAddress(); } - public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender) + public void setNetworkDriver(NetworkDriver driver) { - _network = network; - _sender = sender; + _networkDriver = driver; } /** @param delay delay in seconds (not ms) */ @@ -824,15 +862,15 @@ public class AMQProtocolHandler implements ProtocolEngine { if (delay > 0) { - _network.setMaxWriteIdle(delay); - _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); + getNetworkDriver().setMaxWriteIdle(delay); + getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); } } - public NetworkConnection getNetworkConnection() + public NetworkDriver getNetworkDriver() { - return _network; + return _networkDriver; } public ProtocolVersion getSuggestedProtocolVersion() |