diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 52 |
1 files changed, 32 insertions, 20 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 34c6468629..eb5af119b2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -57,6 +57,7 @@ import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.Job; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; @@ -64,9 +65,8 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.NetworkTransport; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -172,13 +172,11 @@ public class AMQProtocolHandler implements ProtocolEngine private Job _readJob; private Job _writeJob; private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); + private NetworkDriver _networkDriver; private ProtocolVersion _suggestedProtocolVersion; private long _writtenBytes; private long _readBytes; - private NetworkTransport _transport; - private NetworkConnection _network; - private Sender<ByteBuffer> _sender; /** * Creates a new protocol handler, associated with the specified client connection instance. @@ -213,6 +211,21 @@ public class AMQProtocolHandler implements ProtocolEngine } /** + * Called when we want to create a new IoTransport session + * @param brokerDetail + */ + public void createIoTransportSession(BrokerDetails brokerDetail) + { + _protocolSession = new AMQProtocolSession(this, _connection); + _stateManager.setProtocolSession(_protocolSession); + IoTransport.connect_0_9(getProtocolSession(), + brokerDetail.getHost(), + brokerDetail.getPort(), + brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL)); + _protocolSession.init(); + } + + /** * Called when the network connection is closed. This can happen, either because the client explicitly requested * that the connection be closed, in which case nothing is done, or because the connection died. In the case * where the connection died, an attempt to failover automatically to a new connection may be started. The failover @@ -302,7 +315,7 @@ public class AMQProtocolHandler implements ProtocolEngine // failover: HeartbeatDiagnostics.timeout(); _logger.warn("Timed out while waiting for heartbeat from peer."); - _network.close(); + _networkDriver.close(); } public void writerIdle() @@ -324,7 +337,7 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); // this will attempt failover - _network.close(); + _networkDriver.close(); closed(); } else @@ -576,7 +589,7 @@ public class AMQProtocolHandler implements ProtocolEngine { public void run() { - _sender.send(buf); + _networkDriver.send(buf); } }); if (PROTOCOL_DEBUG) @@ -597,7 +610,7 @@ public class AMQProtocolHandler implements ProtocolEngine if (wait) { - _sender.flush(); + _networkDriver.flush(); } } @@ -711,7 +724,7 @@ public class AMQProtocolHandler implements ProtocolEngine try { syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _network.close(); + _networkDriver.close(); closed(); } catch (AMQTimeoutException e) @@ -831,18 +844,17 @@ public class AMQProtocolHandler implements ProtocolEngine public SocketAddress getRemoteAddress() { - return _network.getRemoteAddress(); + return _networkDriver.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _network.getLocalAddress(); + return _networkDriver.getLocalAddress(); } - public void setNetworkConnection(NetworkConnection network) + public void setNetworkDriver(NetworkDriver driver) { - _network = network; - _sender = network.getSender(); + _networkDriver = driver; } /** @param delay delay in seconds (not ms) */ @@ -850,15 +862,15 @@ public class AMQProtocolHandler implements ProtocolEngine { if (delay > 0) { - _network.setMaxWriteIdle(delay); - _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); + getNetworkDriver().setMaxWriteIdle(delay); + getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); } } - public NetworkConnection getNetworkConnection() + public NetworkDriver getNetworkDriver() { - return _network; + return _networkDriver; } public ProtocolVersion getSuggestedProtocolVersion() |