diff options
Diffstat (limited to 'java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java')
-rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java | 77 |
1 files changed, 47 insertions, 30 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java index 87984e8c49..f1099ca5ab 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -26,7 +26,10 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.framing.AMQFrame; @@ -60,7 +63,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con Connection _connection; private String _brokerlist = "vm://:1"; private Session _session; - private static final long SYNC_TIMEOUT = 500; + private static final long SYNC_TIMEOUT = 5000; private int TEST = 0; protected void setUp() throws Exception @@ -287,7 +290,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con TEST++; _logger.info("Test creating producer which will use channel id 1"); - Queue queue = _session.createQueue("CCT_test_validation_queue" + TEST); + Queue queue = _session.createTemporaryQueue(); MessageConsumer consumer = _session.createConsumer(queue); @@ -311,7 +314,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con connection.setConnectionListener(this); - _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + _session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); @@ -332,31 +335,42 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con return connection; } - private void declareExchange(int channelId, String _type, String _name, boolean nowait) - throws AMQException, FailoverException + private void declareExchange(final int channelId, final String _type, final String _name, final boolean nowait) + throws AMQException, FailoverException { - AMQFrame exchangeDeclare = - ExchangeDeclareBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null, // arguments - false, // autoDelete - false, // durable - new AMQShortString(_name), // exchange - false, // internal - nowait, // nowait - true, // passive - 0, // ticket - new AMQShortString(_type)); // type - - if (nowait) - { - ((AMQConnection) _connection).getProtocolHandler().writeFrame(exchangeDeclare); - } - else - { - ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, - SYNC_TIMEOUT); - } +// new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() +// { +// public Object execute() throws AMQException, FailoverException +// { + + AMQProtocolHandler protocolHandler = ((AMQConnection) _connection).getProtocolHandler(); + + AMQFrame exchangeDeclare = + ExchangeDeclareBody.createAMQFrame(channelId, + protocolHandler.getProtocolMajorVersion(), + protocolHandler.getProtocolMinorVersion(), null, // arguments + false, // autoDelete + false, // durable + new AMQShortString(_name), // exchange + false, // internal + nowait, // nowait + true, // passive + 0, // ticket + new AMQShortString(_type)); // type + + if (nowait) + { + protocolHandler.writeFrame(exchangeDeclare); + } + else + { + protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT); + } + +// return null; +// } +// }, (AMQConnection)_connection).execute(); + } private void createChannel(int channelId) throws AMQException, FailoverException @@ -375,10 +389,12 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } public void bytesSent(long count) - { } + { + } public void bytesReceived(long count) - { } + { + } public boolean preFailover(boolean redirect) { @@ -391,5 +407,6 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } public void failoverComplete() - { } + { + } } |