diff options
Diffstat (limited to 'java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java')
-rw-r--r-- | java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java | 27 |
1 files changed, 15 insertions, 12 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java index b1bc7b4c8c..911e855d4f 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java @@ -108,7 +108,6 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol public void messageSent(Object frame) throws AMQPException { - _ioSession.write(frame); } @@ -120,7 +119,7 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol public void sessionIdle(IoSession session, IdleStatus status) throws Exception { - _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + _logger.debug("Protocol Session for [ " + this + " : " + session + "] idle: " + status); if (IdleStatus.WRITER_IDLE.equals(status)) { @@ -148,9 +147,8 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol _logger.debug("Received heartbeat"); } else { - messageReceived(bodyFrame); - } - // _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); + messageReceived(frame); + } } public void messageSent(IoSession session, Object message) throws Exception @@ -162,18 +160,19 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol throws Exception { // Need to handle failover - sessionClosed(session); + _logger.info("Exception caught for [ " + this + " : Session " + System.identityHashCode(session) + "]",cause); + //sessionClosed(session); } public void sessionClosed(IoSession session) throws Exception { // Need to handle failover - _logger.info("Protocol Session [" + this + "] closed"); + _logger.info("Protocol Session for [ " + this + " : " + System.identityHashCode(session) + "] closed"); } public void sessionCreated(IoSession session) throws Exception { - _logger.debug("Protocol session created for session " + _logger.info("Protocol session created for " + this + " session : " + System.identityHashCode(session)); final ProtocolCodecFilter pcf = new ProtocolCodecFilter( @@ -184,7 +183,8 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol { session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf); - } else + } + else { session.getFilterChain().addLast("protocolFilter", pcf); } @@ -213,12 +213,13 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol e.printStackTrace(); } + _ioSession = session; doAMQPConnectionNegotiation(); } public void sessionOpened(IoSession session) throws Exception { - _logger.debug("Protocol session opened for session " + _logger.info("Protocol session opened for " + this + " : session " + System.identityHashCode(session)); } @@ -230,6 +231,7 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol private void doAMQPConnectionNegotiation() { int i = pv.length - 1; + _logger.debug("Engaging in connection negotiation"); writeFrame(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); } @@ -257,7 +259,8 @@ public class TransportPhase extends AbstractPhase implements IoHandler, Protocol } /** - * ----------------------------------------------------------- Failover - * section ----------------------------------------------------------- + * ----------------------------------------------------------- + * Failover section + * ----------------------------------------------------------- */ } |