diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 63 |
1 files changed, 26 insertions, 37 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index eb5af119b2..bf520e64ba 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -63,10 +63,11 @@ import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; -import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.network.io.IoTransport; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.NetworkTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,7 +121,7 @@ import org.slf4j.LoggerFactory; * held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so * that lifecycles of the fields match lifecycles of their containing objects. */ -public class AMQProtocolHandler implements ProtocolEngine +public class AMQProtocolHandler implements Receiver<java.nio.ByteBuffer> { /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class); @@ -172,7 +173,9 @@ public class AMQProtocolHandler implements ProtocolEngine private Job _readJob; private Job _writeJob; private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); - private NetworkDriver _networkDriver; + private Sender<ByteBuffer> _sender; + private NetworkConnection _network; + private NetworkTransport _transport; private ProtocolVersion _suggestedProtocolVersion; private long _writtenBytes; @@ -211,21 +214,6 @@ public class AMQProtocolHandler implements ProtocolEngine } /** - * Called when we want to create a new IoTransport session - * @param brokerDetail - */ - public void createIoTransportSession(BrokerDetails brokerDetail) - { - _protocolSession = new AMQProtocolSession(this, _connection); - _stateManager.setProtocolSession(_protocolSession); - IoTransport.connect_0_9(getProtocolSession(), - brokerDetail.getHost(), - brokerDetail.getPort(), - brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL)); - _protocolSession.init(); - } - - /** * Called when the network connection is closed. This can happen, either because the client explicitly requested * that the connection be closed, in which case nothing is done, or because the connection died. In the case * where the connection died, an attempt to failover automatically to a new connection may be started. The failover @@ -315,7 +303,7 @@ public class AMQProtocolHandler implements ProtocolEngine // failover: HeartbeatDiagnostics.timeout(); _logger.warn("Timed out while waiting for heartbeat from peer."); - _networkDriver.close(); + _sender.close(); } public void writerIdle() @@ -337,7 +325,7 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); // this will attempt failover - _networkDriver.close(); + _sender.close(); closed(); } else @@ -589,7 +577,7 @@ public class AMQProtocolHandler implements ProtocolEngine { public void run() { - _networkDriver.send(buf); + _sender.send(buf); } }); if (PROTOCOL_DEBUG) @@ -610,7 +598,7 @@ public class AMQProtocolHandler implements ProtocolEngine if (wait) { - _networkDriver.flush(); + _sender.flush(); } } @@ -724,7 +712,7 @@ public class AMQProtocolHandler implements ProtocolEngine try { syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _networkDriver.close(); + _sender.close(); closed(); } catch (AMQTimeoutException e) @@ -735,6 +723,10 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); } + finally + { + _network.close(); + } } _poolReference.releaseExecutorService(); } @@ -844,17 +836,19 @@ public class AMQProtocolHandler implements ProtocolEngine public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } - public void setNetworkDriver(NetworkDriver driver) + public void connect(NetworkTransport transport, NetworkConnection network) { - _networkDriver = driver; + _transport = transport; + _network = network; + _sender = network.getSender(); } /** @param delay delay in seconds (not ms) */ @@ -862,20 +856,15 @@ public class AMQProtocolHandler implements ProtocolEngine { if (delay > 0) { - getNetworkDriver().setMaxWriteIdle(delay); - getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); +// FIXME +// _sender.setMaxWriteIdle(delay); +// _sender.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); } } - public NetworkDriver getNetworkDriver() - { - return _networkDriver; - } - public ProtocolVersion getSuggestedProtocolVersion() { return _suggestedProtocolVersion; } - } |