diff options
| author | Rupert Smith <rupertlssmith@apache.org> | 2007-06-01 14:33:07 +0000 |
|---|---|---|
| committer | Rupert Smith <rupertlssmith@apache.org> | 2007-06-01 14:33:07 +0000 |
| commit | 3b5d4734b777b54b52ce2710f404143aca8c5c6e (patch) | |
| tree | d436e7a5239ec6be725852c12e7ccae975892745 /java/client/src/test | |
| parent | 566e08caa331629a15bedca1d8cfc896886b0497 (diff) | |
| download | qpid-python-3b5d4734b777b54b52ce2710f404143aca8c5c6e.tar.gz | |
QPID-402: FailoverException falling through to client. All blocking operations now wrapped in failover support wrappers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@543496 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/test')
4 files changed, 126 insertions, 143 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java index 51bbe7d0e6..c201e88104 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -14,42 +14,43 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.test.unit.client.channelclose; import junit.framework.TestCase; -import javax.jms.Connection; -import javax.jms.Session; - -import javax.jms.JMSException; -import javax.jms.ExceptionListener; -import javax.jms.MessageProducer; -import javax.jms.MessageConsumer; -import javax.jms.Message; -import javax.jms.TextMessage; -import javax.jms.Queue; +import org.apache.log4j.Logger; -import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.ExchangeDeclareOkBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQTimeoutException; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.jms.ConnectionListener; -import org.apache.log4j.Logger; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; public class ChannelCloseTest extends TestCase implements ExceptionListener, ConnectionListener { @@ -73,15 +74,14 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con TransportConnection.killAllVMBrokers(); } - /* close channel, use chanel with same id ensure error. - */ - public void testReusingChannelAfterFullClosure() + */ + public void testReusingChannelAfterFullClosure() throws Exception { _connection = newConnection(); - //Create Producer + // Create Producer try { _connection.start(); @@ -113,6 +113,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con { _logger.info("Exception occured was:" + e.getErrorCode()); } + assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode()); _connection = newConnection(); @@ -134,29 +135,27 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con /* close channel and send guff then send ok no errors */ - public void testSendingMethodsAfterClose() + public void testSendingMethodsAfterClose() throws Exception { try { - _connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" - + _brokerlist + "'"); + _connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'"); ((AMQConnection) _connection).setConnectionListener(this); - _connection.setExceptionListener(this); - //Change the StateManager for one that doesn't respond with Close-OKs + // Change the StateManager for one that doesn't respond with Close-OKs AMQStateManager oldStateManager = ((AMQConnection) _connection).getProtocolHandler().getStateManager(); _session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); _connection.start(); - //Test connection + // Test connection checkSendingMessage(); - //Set StateManager to manager that ignores Close-oks + // Set StateManager to manager that ignores Close-oks AMQProtocolSession protocolSession = ((AMQConnection) _connection).getProtocolHandler().getProtocolSession(); AMQStateManager newStateManager = new NoCloseOKStateManager(protocolSession); newStateManager.changeState(oldStateManager.getCurrentState()); @@ -214,7 +213,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con createChannelAndTest(TEST_CHANNEL); - //Test connection is still ok + // Test connection is still ok checkSendingMessage(); @@ -248,9 +247,9 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } } - private void createChannelAndTest(int channel) + private void createChannelAndTest(int channel) throws FailoverException { - //Create A channel + // Create A channel try { createChannel(channel); @@ -274,14 +273,14 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con private void sendClose(int channel) { - AMQFrame frame = ChannelCloseOkBody.createAMQFrame(channel, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion()); + AMQFrame frame = + ChannelCloseOkBody.createAMQFrame(channel, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion()); ((AMQConnection) _connection).getProtocolHandler().writeFrame(frame); } - private void checkSendingMessage() throws JMSException { TEST++; @@ -307,8 +306,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con AMQConnection connection = null; try { - connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" - + _brokerlist + "'"); + connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'"); connection.setConnectionListener(this); @@ -330,24 +328,24 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con fail("Creating new connection when:" + e.getMessage()); } - return connection; } - private void declareExchange(int channelId, String _type, String _name, boolean nowait) throws AMQException + private void declareExchange(int channelId, String _type, String _name, boolean nowait) + throws AMQException, FailoverException { - AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), - null, // arguments - false, // autoDelete - false, // durable - new AMQShortString(_name), // exchange - false, // internal - nowait, // nowait - true, // passive - 0, // ticket - new AMQShortString(_type)); // type + AMQFrame exchangeDeclare = + ExchangeDeclareBody.createAMQFrame(channelId, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null, // arguments + false, // autoDelete + false, // durable + new AMQShortString(_name), // exchange + false, // internal + nowait, // nowait + true, // passive + 0, // ticket + new AMQShortString(_type)); // type if (nowait) { @@ -355,36 +353,31 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } else { - ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT); + ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, + SYNC_TIMEOUT); } } - private void createChannel(int channelId) throws AMQException + private void createChannel(int channelId) throws AMQException, FailoverException { - ((AMQConnection) _connection).getProtocolHandler().syncWrite( - ChannelOpenBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), - null), // outOfBand - ChannelOpenOkBody.class); + ((AMQConnection) _connection).getProtocolHandler().syncWrite(ChannelOpenBody.createAMQFrame(channelId, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null), // outOfBand + ChannelOpenOkBody.class); } - public void onException(JMSException jmsException) { - //_logger.info("CCT" + jmsException); + // _logger.info("CCT" + jmsException); fail(jmsException.getMessage()); } public void bytesSent(long count) - { - } + { } public void bytesReceived(long count) - { - - } + { } public boolean preFailover(boolean redirect) { @@ -397,6 +390,5 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } public void failoverComplete() - { - } + { } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index d52707d965..58ac8294f2 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -22,29 +22,29 @@ package org.apache.qpid.test.unit.close; import junit.framework.TestCase; -import java.util.concurrent.atomic.AtomicInteger; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.testutil.QpidClientConnection; +import org.apache.qpid.url.URLSyntaxException; -import javax.jms.ExceptionListener; -import javax.jms.Session; import javax.jms.Connection; +import javax.jms.ExceptionListener; import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.MessageProducer; import javax.jms.Message; -import javax.jms.TextMessage; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.AMQException; -import org.apache.qpid.testutil.QpidClientConnection; -import org.apache.log4j.Logger; -import org.apache.log4j.Level; +import java.util.concurrent.atomic.AtomicInteger; public class MessageRequeueTest extends TestCase { @@ -86,7 +86,7 @@ public class MessageRequeueTest extends TestCase { super.tearDown(); - if (!passed) // clean up + if (!passed) // clean up { QpidClientConnection conn = new QpidClientConnection(BROKER); @@ -96,6 +96,7 @@ public class MessageRequeueTest extends TestCase conn.disconnect(); } + TransportConnection.killVMBroker(1); } @@ -117,7 +118,7 @@ public class MessageRequeueTest extends TestCase final MessageConsumer consumer = conn.getSession().createConsumer(q); int messagesReceived = 0; - long messageLog[] = new long[numTestMessages + 1]; + long[] messageLog = new long[numTestMessages + 1]; _logger.info("consuming..."); Message msg = consumer.receive(1000); @@ -130,15 +131,13 @@ public class MessageRequeueTest extends TestCase int msgindex = msg.getIntProperty("index"); if (messageLog[msgindex] != 0) { - _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() + - ") more than once."); + _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() + + ") more than once."); } if (_logger.isInfoEnabled()) { - _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + - "DT:" + dt + - "IN:" + msgindex); + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + "IN:" + msgindex); } if (dt == 0) @@ -148,7 +147,7 @@ public class MessageRequeueTest extends TestCase messageLog[msgindex] = dt; - //get Next message + // get Next message msg = consumer.receive(1000); } @@ -163,7 +162,7 @@ public class MessageRequeueTest extends TestCase for (long b : messageLog) { - if (b == 0 && index != 0) //delivery tag of zero shouldn't exist + if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist { _logger.error("Index: " + index + " was not received."); list.append(" "); @@ -175,6 +174,7 @@ public class MessageRequeueTest extends TestCase index++; } + assertEquals(list.toString(), 0, failed); _logger.info("consumed: " + messagesReceived); conn.disconnect(); @@ -199,7 +199,7 @@ public class MessageRequeueTest extends TestCase t1.start(); t2.start(); t3.start(); -// t4.start(); + // t4.start(); try { @@ -228,7 +228,7 @@ public class MessageRequeueTest extends TestCase for (long b : receieved) { - if (b == 0 && index != 0) //delivery tag of zero shouldn't exist (and we don't have msg 0) + if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist (and we don't have msg 0) { _logger.error("Index: " + index + " was not received."); list.append(" "); @@ -237,8 +237,10 @@ public class MessageRequeueTest extends TestCase list.append(b); failed++; } + index++; } + assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed); passed = true; @@ -278,15 +280,14 @@ public class MessageRequeueTest extends TestCase int msgindex = result.getIntProperty("index"); if (receieved[msgindex] != 0) { - _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) result).getDeliveryTag() + - ") more than once."); + _logger.error("Received Message(" + msgindex + ":" + + ((AbstractJMSMessage) result).getDeliveryTag() + ") more than once."); } if (_logger.isInfoEnabled()) { - _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + - "DT:" + dt + - "IN:" + msgindex); + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + + "IN:" + msgindex); } if (dt == 0) @@ -297,9 +298,8 @@ public class MessageRequeueTest extends TestCase receieved[msgindex] = dt; } - count++; - if (count % 100 == 0) + if ((count % 100) == 0) { _logger.info("consumer-" + id + ": got " + result + ", new count is " + count); } @@ -328,11 +328,10 @@ public class MessageRequeueTest extends TestCase } } - public void testRequeue() throws JMSException, AMQException, URLSyntaxException { int run = 0; -// while (run < 10) + // while (run < 10) { run++; @@ -359,7 +358,6 @@ public class MessageRequeueTest extends TestCase assertNotNull("Message should not be null", msg); - // As we have not ack'd message will be requeued. _logger.debug("Close Consumer"); consumer.close(); @@ -369,4 +367,4 @@ public class MessageRequeueTest extends TestCase } } -}
\ No newline at end of file +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 0e718da19b..8d96977df2 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -21,18 +21,20 @@ package org.apache.qpid.test.unit.transacted; import junit.framework.TestCase; + +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.AMQException; import org.apache.qpid.url.URLSyntaxException; -import org.apache.log4j.Logger; -import javax.jms.Session; -import javax.jms.MessageProducer; -import javax.jms.MessageConsumer; -import javax.jms.Queue; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; import javax.jms.TextMessage; /** @@ -62,10 +64,10 @@ public class CommitRollbackTest extends TestCase { TransportConnection.createVMBroker(1); } + testMethod++; queue += testMethod; - newConnection(); } @@ -106,7 +108,6 @@ public class CommitRollbackTest extends TestCase assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending test message"); String MESSAGE_TEXT = "testPutThenDisconnect"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); @@ -119,7 +120,7 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); Message result = _consumer.receive(1000); - //commit to ensure message is removed from queue + // commit to ensure message is removed from queue _session.commit(); assertNull("test message was put and disconnected before commit, but is still present", result); @@ -135,7 +136,6 @@ public class CommitRollbackTest extends TestCase assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending test message"); String MESSAGE_TEXT = "testPutThenDisconnect"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); @@ -151,7 +151,7 @@ public class CommitRollbackTest extends TestCase _logger.info("receiving result"); Message result = _consumer.receive(1000); - //commit to ensure message is removed from queue + // commit to ensure message is removed from queue _session.commit(); assertNull("test message was put and disconnected before commit, but is still present", result); @@ -168,7 +168,6 @@ public class CommitRollbackTest extends TestCase assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending test message"); String MESSAGE_TEXT = "testPutThenRollback"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); @@ -335,13 +334,12 @@ public class CommitRollbackTest extends TestCase assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); } - /** * Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order * * @throws Exception On error */ - public void testSend2ThenRollback() throws Exception + /*public void testSend2ThenRollback() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); @@ -391,7 +389,7 @@ public class CommitRollbackTest extends TestCase } assertNull("test message should be null", result); - } + }*/ public void testSend2ThenCloseAfter1andTryAgain() throws Exception { @@ -428,7 +426,7 @@ public class CommitRollbackTest extends TestCase { assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); } - else // or it will be msg 2 arriving the first time due to latency. + else // or it will be msg 2 arriving the first time due to latency. { _logger.info("Message 2 wasn't prefetched so wasn't rejected"); assertEquals("2", ((TextMessage) result).getText()); @@ -445,7 +443,6 @@ public class CommitRollbackTest extends TestCase } - public void testPutThenRollbackThenGet() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java index 195ed79dab..d52da06f76 100644 --- a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java +++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -1,25 +1,25 @@ package org.apache.qpid.testutil; +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.JMSAMQException; import org.apache.qpid.url.URLSyntaxException; -import org.apache.log4j.Logger; -import javax.jms.ExceptionListener; -import javax.jms.Session; import javax.jms.Connection; +import javax.jms.ExceptionListener; import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.MessageProducer; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; import javax.jms.TextMessage; public class QpidClientConnection implements ExceptionListener { - private static final Logger _logger = Logger.getLogger(QpidClientConnection.class); private boolean transacted = true; @@ -40,17 +40,16 @@ public class QpidClientConnection implements ExceptionListener setPrefetch(5000); } - public void connect() throws JMSException { if (!connected) { /* - * amqp://[user:pass@][clientid]/virtualhost? - * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' - * [&failover='method[?option='value'[&option='value']]'] - * [&option='value']" - */ + * amqp://[user:pass@][clientid]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; try { @@ -63,7 +62,6 @@ public class QpidClientConnection implements ExceptionListener session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); - _logger.info("starting connection"); connection.start(); @@ -124,7 +122,6 @@ public class QpidClientConnection implements ExceptionListener this.prefetch = prefetch; } - /** override as necessary */ public void onException(JMSException exception) { @@ -266,4 +263,3 @@ public class QpidClientConnection implements ExceptionListener _logger.info("consumed: " + messagesReceived); } } - |
