summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java63
1 files changed, 26 insertions, 37 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index eb5af119b2..bf520e64ba 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -63,10 +63,11 @@ import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.network.io.IoTransport;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.NetworkTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,7 +121,7 @@ import org.slf4j.LoggerFactory;
* held per protocol handler, per protocol session, per network connection, per channel, in separate classes, so
* that lifecycles of the fields match lifecycles of their containing objects.
*/
-public class AMQProtocolHandler implements ProtocolEngine
+public class AMQProtocolHandler implements Receiver<java.nio.ByteBuffer>
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
@@ -172,7 +173,9 @@ public class AMQProtocolHandler implements ProtocolEngine
private Job _readJob;
private Job _writeJob;
private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
- private NetworkDriver _networkDriver;
+ private Sender<ByteBuffer> _sender;
+ private NetworkConnection _network;
+ private NetworkTransport _transport;
private ProtocolVersion _suggestedProtocolVersion;
private long _writtenBytes;
@@ -211,21 +214,6 @@ public class AMQProtocolHandler implements ProtocolEngine
}
/**
- * Called when we want to create a new IoTransport session
- * @param brokerDetail
- */
- public void createIoTransportSession(BrokerDetails brokerDetail)
- {
- _protocolSession = new AMQProtocolSession(this, _connection);
- _stateManager.setProtocolSession(_protocolSession);
- IoTransport.connect_0_9(getProtocolSession(),
- brokerDetail.getHost(),
- brokerDetail.getPort(),
- brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL));
- _protocolSession.init();
- }
-
- /**
* Called when the network connection is closed. This can happen, either because the client explicitly requested
* that the connection be closed, in which case nothing is done, or because the connection died. In the case
* where the connection died, an attempt to failover automatically to a new connection may be started. The failover
@@ -315,7 +303,7 @@ public class AMQProtocolHandler implements ProtocolEngine
// failover:
HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
- _networkDriver.close();
+ _sender.close();
}
public void writerIdle()
@@ -337,7 +325,7 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
// this will attempt failover
- _networkDriver.close();
+ _sender.close();
closed();
}
else
@@ -589,7 +577,7 @@ public class AMQProtocolHandler implements ProtocolEngine
{
public void run()
{
- _networkDriver.send(buf);
+ _sender.send(buf);
}
});
if (PROTOCOL_DEBUG)
@@ -610,7 +598,7 @@ public class AMQProtocolHandler implements ProtocolEngine
if (wait)
{
- _networkDriver.flush();
+ _sender.flush();
}
}
@@ -724,7 +712,7 @@ public class AMQProtocolHandler implements ProtocolEngine
try
{
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _networkDriver.close();
+ _sender.close();
closed();
}
catch (AMQTimeoutException e)
@@ -735,6 +723,10 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
}
+ finally
+ {
+ _network.close();
+ }
}
_poolReference.releaseExecutorService();
}
@@ -844,17 +836,19 @@ public class AMQProtocolHandler implements ProtocolEngine
public SocketAddress getRemoteAddress()
{
- return _networkDriver.getRemoteAddress();
+ return _network.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _networkDriver.getLocalAddress();
+ return _network.getLocalAddress();
}
- public void setNetworkDriver(NetworkDriver driver)
+ public void connect(NetworkTransport transport, NetworkConnection network)
{
- _networkDriver = driver;
+ _transport = transport;
+ _network = network;
+ _sender = network.getSender();
}
/** @param delay delay in seconds (not ms) */
@@ -862,20 +856,15 @@ public class AMQProtocolHandler implements ProtocolEngine
{
if (delay > 0)
{
- getNetworkDriver().setMaxWriteIdle(delay);
- getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+// FIXME
+// _sender.setMaxWriteIdle(delay);
+// _sender.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
- public NetworkDriver getNetworkDriver()
- {
- return _networkDriver;
- }
-
public ProtocolVersion getSuggestedProtocolVersion()
{
return _suggestedProtocolVersion;
}
-
}