summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java52
1 files changed, 32 insertions, 20 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 34c6468629..eb5af119b2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -57,6 +57,7 @@ import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.Job;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
@@ -64,9 +65,8 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.NetworkTransport;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -172,13 +172,11 @@ public class AMQProtocolHandler implements ProtocolEngine
private Job _readJob;
private Job _writeJob;
private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+ private NetworkDriver _networkDriver;
private ProtocolVersion _suggestedProtocolVersion;
private long _writtenBytes;
private long _readBytes;
- private NetworkTransport _transport;
- private NetworkConnection _network;
- private Sender<ByteBuffer> _sender;
/**
* Creates a new protocol handler, associated with the specified client connection instance.
@@ -213,6 +211,21 @@ public class AMQProtocolHandler implements ProtocolEngine
}
/**
+ * Called when we want to create a new IoTransport session
+ * @param brokerDetail
+ */
+ public void createIoTransportSession(BrokerDetails brokerDetail)
+ {
+ _protocolSession = new AMQProtocolSession(this, _connection);
+ _stateManager.setProtocolSession(_protocolSession);
+ IoTransport.connect_0_9(getProtocolSession(),
+ brokerDetail.getHost(),
+ brokerDetail.getPort(),
+ brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL));
+ _protocolSession.init();
+ }
+
+ /**
* Called when the network connection is closed. This can happen, either because the client explicitly requested
* that the connection be closed, in which case nothing is done, or because the connection died. In the case
* where the connection died, an attempt to failover automatically to a new connection may be started. The failover
@@ -302,7 +315,7 @@ public class AMQProtocolHandler implements ProtocolEngine
// failover:
HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
- _network.close();
+ _networkDriver.close();
}
public void writerIdle()
@@ -324,7 +337,7 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
// this will attempt failover
- _network.close();
+ _networkDriver.close();
closed();
}
else
@@ -576,7 +589,7 @@ public class AMQProtocolHandler implements ProtocolEngine
{
public void run()
{
- _sender.send(buf);
+ _networkDriver.send(buf);
}
});
if (PROTOCOL_DEBUG)
@@ -597,7 +610,7 @@ public class AMQProtocolHandler implements ProtocolEngine
if (wait)
{
- _sender.flush();
+ _networkDriver.flush();
}
}
@@ -711,7 +724,7 @@ public class AMQProtocolHandler implements ProtocolEngine
try
{
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _network.close();
+ _networkDriver.close();
closed();
}
catch (AMQTimeoutException e)
@@ -831,18 +844,17 @@ public class AMQProtocolHandler implements ProtocolEngine
public SocketAddress getRemoteAddress()
{
- return _network.getRemoteAddress();
+ return _networkDriver.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _network.getLocalAddress();
+ return _networkDriver.getLocalAddress();
}
- public void setNetworkConnection(NetworkConnection network)
+ public void setNetworkDriver(NetworkDriver driver)
{
- _network = network;
- _sender = network.getSender();
+ _networkDriver = driver;
}
/** @param delay delay in seconds (not ms) */
@@ -850,15 +862,15 @@ public class AMQProtocolHandler implements ProtocolEngine
{
if (delay > 0)
{
- _network.setMaxWriteIdle(delay);
- _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+ getNetworkDriver().setMaxWriteIdle(delay);
+ getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
- public NetworkConnection getNetworkConnection()
+ public NetworkDriver getNetworkDriver()
{
- return _network;
+ return _networkDriver;
}
public ProtocolVersion getSuggestedProtocolVersion()