diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs')
-rw-r--r-- | qpid/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs | 397 |
1 files changed, 397 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs b/qpid/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs new file mode 100644 index 0000000000..142ac40b27 --- /dev/null +++ b/qpid/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs @@ -0,0 +1,397 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.InteropServices; +using System.Threading; +using log4net; +using NUnit.Framework; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client; +using Apache.Qpid.Messaging; + +namespace Apache.Qpid.Integration.Tests.interactive +{ + [TestFixture, Category("Interactive")] + public class FailoverTest : IConnectionListener + { + private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTest)); + + /// <summary>Specifies the number of times to run the test cycle.</summary> + const int NUM_MESSAGES = 10; + + /// <summary>Determines how many messages to send within each commit.</summary> + const int COMMIT_BATCH_SIZE = 1; + + /// <summary>Specifies the duration of the pause to place between each message sent in the test.</summary> + //const int SLEEP_MILLIS = 1; + + /// <summary>Specified the maximum time in milliseconds to wait for the test to complete.</summary> + const int TIMEOUT = 10000; + + /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary> + const int FAIL_POINT = 5; + + /// <summary>Specified the ack mode to use for the test.</summary> + AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge; + + /// <summary>Determines whether this test runs transactionally or not. </summary> + bool transacted = false; + + /// <summary>Holds the connection to run the test over.</summary> + AMQConnection _connection; + + /// <summary>Holds the channel for the test message publisher. </summary> + IChannel publishingChannel; + + /// <summary>Holds the test message publisher. </summary> + IMessagePublisher publisher; + + /// <summary>Used to keep count of the number of messages sent. </summary> + int messagesSent; + + /// <summary>Used to keep count of the number of messages received. </summary> + int messagesReceived; + + /// <summary>Used to wait for test completion on. </summary> + private static object testComplete = new Object(); + + /// <summary>Used to wait for failover completion on. </summary> + private static object failoverComplete = new Object(); + + bool failedOver=false; + + /// <summary>Used to record the extra message count (1) if the message sent right after failover actually made it to the new broker.</summary> + int _extraMessage = 0; + + /// <summary> + /// Creates the test connection with a fail-over set up, and a producer/consumer pair on that connection. + /// </summary> + /// [SetUp] + public void Init(IConnectionInfo connectionInfo) + { + //log4net.Config.BasicConfigurator.Configure(); + // Reset all counts. + messagesSent = 0; + messagesReceived = 0; + failedOver=false; + _extraMessage = 0; + + PromptAndWait("Ensure both brokers are running, then press Enter"); + + // Create a connection for the test. + _connection = new AMQConnection(connectionInfo); + _connection.ConnectionListener = this; + + // Create a consumer to receive the test messages. + IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode); + + string queueName = receivingChannel.GenerateUniqueName(); + receivingChannel.DeclareQueue(queueName, false, true, true); + receivingChannel.Bind(queueName, "amq.direct", queueName); + + IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName) + .WithPrefetchLow(30) + .WithPrefetchHigh(60).Create(); + + consumer.OnMessage = new MessageReceivedDelegate(OnMessage); + _connection.Start(); + + // Create a publisher to send the test messages. + publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge); + publisher = publishingChannel.CreatePublisherBuilder() + .WithRoutingKey(queueName) + .Create(); + + _log.Debug("connection = " + _connection); + _log.Debug("connectionInfo = " + connectionInfo); + _log.Debug("connection.AsUrl = " + _connection.toURL()); + _log.Debug("AcknowledgeMode is " + _acknowledgeMode); + } + + /// <summary> + /// Clean up the test connection. + /// </summary> + [TearDown] + public virtual void Shutdown() + { + if (!failedOver) + { + Assert.Fail("The failover callback never occured."); + } + + Console.WriteLine("Test done shutting down"); + Thread.Sleep(2000); + _connection.Close(); + } + + /// <summary> + /// Runs a failover test, building up the connection information from its component parts. In particular the brokers + /// to fail between are seperately added into the connection info. + /// </summary> + /*[Test] + public void TestWithBasicInfo() + { + _log.Debug("public void TestWithBasicInfo(): called"); + + // Manually create the connection parameters. + QpidConnectionInfo connectionInfo = new QpidConnectionInfo(); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false)); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false)); + + Init(connectionInfo); + DoFailoverTest(); + }*/ + + /// <summary> + /// Runs a failover test, with the failover configuration specified in the Qpid connection URL format. + /// </summary> + [Test] + public void TestWithUrl() + { + _log.Debug("public void runTestWithUrl(): called"); + + // Parse the connection parameters from a URL. + String clientId = "failover" + DateTime.Now.Ticks; + string defaultUrl = "amqp://guest:guest@" + clientId + "/test" + + "?brokerlist='tcp://localhost:9672;tcp://localhost:9673'&failover='roundrobin'"; + IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl); + + Init(connectionInfo); + DoFailoverTest(0); + } + + /// <summary> + /// Send the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all messages sent + /// are received within the test time limit. + /// </summary> + /// + /// <param name="connectionInfo">The connection parameters, specifying the brokers to fail between.</param> + void DoFailoverTest(int delay) + { + _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): called"); + + // Wait for all of the test messages to be received, checking that this occurs within the test time limit. + bool withinTimeout = false; + + for (int i = 1; i <= NUM_MESSAGES; ++i) + { + SendMessage(); + + // Prompt the user to cause a failure if at the fail point. + if (i == FAIL_POINT) + { + for( int min = delay ; min > 0 ; min--) + { + Console.WriteLine("Waiting for "+min+" minutes to test connection time bug."); + Thread.Sleep(60*1000); + } + + PromptAndWait("Cause a broker failure now, then press return."); + Console.WriteLine("NOTE: ensure that the delay between killing the broker and continuing here is less than 20 second"); + + Console.WriteLine("Sending a message to ensure send right after works"); + + SendMessage(); + + Console.WriteLine("Waiting for fail-over to complete before continuing..."); + + + lock(failoverComplete) + { + if (!failedOver) + { + withinTimeout = Monitor.Wait(failoverComplete, TIMEOUT); + } + else + { + withinTimeout=true; + } + } + + if (!withinTimeout) + { + PromptAndWait("Failover has not yet occured. Press enter to give up waiting."); + } + } + } + + lock(testComplete) + { + withinTimeout = Monitor.Wait(testComplete, TIMEOUT); + } + + if (!withinTimeout) + { + Assert.Fail("Test timed out, before all messages received."); + } + + _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): exiting"); + } + + [Test] + public void Test5MinuteWait() + { + String clientId = "failover" + DateTime.Now.Ticks; + + QpidConnectionInfo connectionInfo = new QpidConnectionInfo(); + connectionInfo.Username = "guest"; + connectionInfo.Password = "guest"; + connectionInfo.ClientName = clientId; + connectionInfo.VirtualHost = "/test"; + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 9672, false)); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 9673, false)); + + Init(connectionInfo); + DoFailoverTest(5); + } + + void SendMessage() + { + ITextMessage msg = publishingChannel.CreateTextMessage("message=" + messagesSent); + + publisher.Send(msg); + messagesSent++; + + if (transacted) + { + publishingChannel.Commit(); + } + + Console.WriteLine("messagesSent = " + messagesSent); + } + + /// <summary> + /// Receives all of the test messages. + /// </summary> + /// + /// <param name="message">The newly arrived test message.</param> + public void OnMessage(IMessage message) + { + try + { + if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) + { + message.Acknowledge(); + } + + messagesReceived++; + + _log.Debug("messagesReceived = " + messagesReceived); + + // Check if all of the messages in the test have been received, in which case notify the message producer that the test has + // succesfully completed. + if (messagesReceived == NUM_MESSAGES + _extraMessage) + { + lock (testComplete) + { + failedOver = true; + Monitor.Pulse(testComplete); + } + } + } + catch (QpidException e) + { + _log.Fatal("Exception received. About to stop.", e); + Stop(); + } + } + + /// <summary>Prompts the user on stdout and waits for a reply on stdin, using the specified prompt message.</summary> + /// + /// <param name="message">The message to prompt the user with.</param> + private void PromptAndWait(string message) + { + Console.WriteLine("\n" + message); + Console.ReadLine(); + } + + // <summary>Closes the test connection.</summary> + private void Stop() + { + _log.Debug("Stopping..."); + try + { + _connection.Close(); + } + catch (QpidException e) + { + _log.Debug("Failed to shutdown: ", e); + } + } + + /// <summary> + /// Called when bytes have been transmitted to the server + /// </summary> + /// + /// <param>count the number of bytes sent in total since the connection was opened</param> + public void BytesSent(long count) {} + + /// <summary> + /// Called when some bytes have been received on a connection + /// </summary> + /// + /// <param>count the number of bytes received in total since the connection was opened</param> + public void BytesReceived(long count) {} + + /// <summary> + /// Called after the infrastructure has detected that failover is required but before attempting failover. + /// </summary> + /// + /// <param>redirect true if the broker requested redirect. false if failover is occurring due to a connection error.</param> + /// + /// <return>true to continue failing over, false to veto failover and raise a connection exception</return> + public bool PreFailover(bool redirect) + { + _log.Debug("public bool PreFailover(bool redirect): called"); + return true; + } + + /// <summary> + /// Called after connection has been made to another broker after failover has been started but before + /// any resubscription has been done. + /// </summary> + /// + /// <return> true to continue with resubscription, false to prevent automatic resubscription. This is useful in + /// cases where the application wants to handle resubscription. Note that in the latter case all sessions, producers + /// and consumers are invalidated. + /// </return> + public bool PreResubscribe() + { + _log.Debug("public bool PreResubscribe(): called"); + return true; + } + + /// <summary> + /// Called once failover has completed successfully. This is called irrespective of whether the client has + /// vetoed automatic resubscription. + /// </summary> + public void FailoverComplete() + { + failedOver = true; + _log.Debug("public void FailoverComplete(): called"); + Console.WriteLine("public void FailoverComplete(): called"); + lock (failoverComplete) + { + Monitor.Pulse(failoverComplete); + } + } + } +} |