diff options
author | Rupert Smith <rupertlssmith@apache.org> | 2007-10-09 13:55:06 +0000 |
---|---|---|
committer | Rupert Smith <rupertlssmith@apache.org> | 2007-10-09 13:55:06 +0000 |
commit | b420d1140114b61adfe03aabaacc1a3f55eb63f9 (patch) | |
tree | f9b8fa56fda7de8aa90a099be56303afd2e2a0f7 | |
parent | 3a07d83bc77eb4f188cf95cdbb918c0ebe237010 (diff) | |
download | qpid-python-b420d1140114b61adfe03aabaacc1a3f55eb63f9.tar.gz |
Merged revisions 583170 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1
........
r583170 | rupertlssmith | 2007-10-09 14:49:32 +0100 (Tue, 09 Oct 2007) | 1 line
QPID-256 FailoverTest restored to working order. IOExceptions on connections now trigger fail-over.
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@583172 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | dotnet/Qpid.Client.Tests/default.build | 21 | ||||
-rw-r--r-- | dotnet/Qpid.Client.Tests/failover/FailoverTest.cs | 390 | ||||
-rw-r--r-- | dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs | 253 | ||||
-rw-r--r-- | dotnet/Qpid.Client.Tests/log4net.config | 106 | ||||
-rw-r--r-- | dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs | 166 | ||||
-rw-r--r-- | dotnet/Qpid.Client/Client/Transport/IoHandler.cs | 5 | ||||
-rw-r--r-- | dotnet/Qpid.Client/qms/FailoverPolicy.cs | 2 | ||||
-rw-r--r-- | dotnet/Qpid.Common/AMQConnectionClosedException.cs | 13 |
8 files changed, 393 insertions, 563 deletions
diff --git a/dotnet/Qpid.Client.Tests/default.build b/dotnet/Qpid.Client.Tests/default.build index e7fb81dae5..d1b1496a8b 100644 --- a/dotnet/Qpid.Client.Tests/default.build +++ b/dotnet/Qpid.Client.Tests/default.build @@ -28,19 +28,22 @@ file="log4net.config" /> </target> + <target name="test" depends="build"> - <nunit2> + <nunit2> <formatter type="${nant.formatter}" usefile="false" /> <test> - <assemblies> - <include name="${build.dir}/${project::get-name()}.tests.dll"/> - </assemblies> - <categories> - <exclude name="Failover"/> - <exclude name="SSL" if="${framework::get-target-framework() == 'mono-2.0'}"/> - </categories> + <assemblies> + <include name="${build.dir}/${project::get-name()}.tests.dll"/> + </assemblies> + <categories> + <!-- The fail-over tests are interactive so should not be run as part of the build. --> + <exclude name="Failover"/> + <exclude name="SSL" if="${framework::get-target-framework() == 'mono-2.0'}"/> + </categories> </test> - </nunit2> + </nunit2> </target> + </project> diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs index 1f1e2f726c..15e2eb6757 100644 --- a/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs +++ b/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs @@ -19,6 +19,7 @@ * */ using System; +using System.Runtime.InteropServices; using System.Threading; using log4net; using NUnit.Framework; @@ -30,228 +31,289 @@ namespace Apache.Qpid.Client.Tests.failover [TestFixture, Category("Failover")] public class FailoverTest : IConnectionListener { - private static readonly ILog _logger = LogManager.GetLogger(typeof(FailoverTest)); + private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTest)); - private IConnection _connection; - private IChannel _channel; - private IMessagePublisher _publisher; - private int _count; + /// <summary>Specifies the number of times to run the test cycle.</summary> + const int NUM_MESSAGES = 10; - private IMessageConsumer _consumerOfResponse; + /// <summary>Determines how many messages to send within each commit.</summary> + const int COMMIT_BATCH_SIZE = 1; - void DoFailoverTest(IConnectionInfo info) - { - DoFailoverTest(new AMQConnection(info)); - } + /// <summary>Specifies the duration of the pause to place between each message sent in the test.</summary> + //const int SLEEP_MILLIS = 1; - void DoFailoverTest(IConnection connection) - { - AMQConnection amqConnection = (AMQConnection)connection; - amqConnection.ConnectionListener = this; - //Console.WriteLine("connection.url = " + amqConnection.ToURL()); - _connection = connection; - _connection.ExceptionListener = new ExceptionListenerDelegate(OnConnectionException); - _channel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge); + /// <summary>Specified the maximum time in milliseconds to wait for the test to complete.</summary> + const int TIMEOUT = 10000; - string exchangeName = ExchangeNameDefaults.TOPIC; - string routingKey = "topic1"; + /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary> + const int FAIL_POINT = 5; - string queueName = DeclareAndBindTemporaryQueue(exchangeName, routingKey); - - new MsgListener(_connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge), queueName); + /// <summary>Specified the ack mode to use for the test.</summary> + AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge; - IChannel channel = _channel; + /// <summary>Determines whether this test runs transactionally or not. </summary> + bool transacted = false; - string tempQueueName = channel.GenerateUniqueName(); - channel.DeclareQueue(tempQueueName, false, true, true); - _consumerOfResponse = channel.CreateConsumerBuilder(tempQueueName).Create(); - _consumerOfResponse.OnMessage = new MessageReceivedDelegate(OnMessage); + /// <summary>Holds the connection to run the test over.</summary> + AMQConnection _connection; - _connection.Start(); + /// <summary>Holds the channel for the test message publisher. </summary> + IChannel publishingChannel; - IMessage msg = _channel.CreateTextMessage("Init"); - // FIXME: Leaving ReplyToExchangeName as default (i.e. the default exchange) - // FIXME: but the implementation might not like this as it defaults to null rather than "". - msg.ReplyToRoutingKey = tempQueueName; -// msg.ReplyTo = new ReplyToDestination("" /* i.e. the default exchange */, tempQueueName); - _logger.Info(String.Format("sending msg.Text={0}", ((ITextMessage)msg).Text)); - -// _publisher = _channel.CreatePublisher(exchangeName, exchangeClass, routingKey); - _publisher = _channel.CreatePublisherBuilder() - .WithRoutingKey(routingKey) - .WithExchangeName(exchangeName) - .Create(); - _publisher.Send(msg); - } + /// <summary>Holds the test message publisher. </summary> + IMessagePublisher publisher; - public string DeclareAndBindTemporaryQueue(string exchangeName, string routingKey) - { - string queueName = _channel.GenerateUniqueName(); + /// <summary>Used to keep count of the number of messages sent. </summary> + int messagesSent; - // Queue.Declare - _channel.DeclareQueue(queueName, false, true, true); + /// <summary>Used to keep count of the number of messages received. </summary> + int messagesReceived; - // Queue.Bind - _channel.Bind(queueName, exchangeName, routingKey); - return queueName; - } + /// <summary>Used to wait for test completion on. </summary> + private static object testComplete = new Object(); - private void OnConnectionException(Exception e) + /// <summary> + /// Creates the test connection with a fail-over set up, and a producer/consumer pair on that connection. + /// </summary> + /// [SetUp] + public void Init(IConnectionInfo connectionInfo) { - _logger.Error("Connection exception occurred", e); - } + // Reset all counts. + messagesSent = 0; + messagesReceived = 0; - public void OnMessage(IMessage message) - { - try - { - _logger.Info("received message on temp queue msg.Text=" + ((ITextMessage)message).Text); - Thread.Sleep(1000); - _publisher.Send(_channel.CreateTextMessage("Message" + (++_count))); - } - catch (QpidException e) - { - error(e); - } - } + // Create a connection for the test. + _connection = new AMQConnection(connectionInfo); + _connection.ConnectionListener = this; - private void error(Exception e) - { - _logger.Error("exception received", e); - stop(); - } + // Create a consumer to receive the test messages. + IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode); - private void stop() - { - _logger.Info("Stopping..."); - try - { - _connection.Dispose(); - } - catch (QpidException e) - { - _logger.Error("Failed to shutdown", e); - } - } + string queueName = receivingChannel.GenerateUniqueName(); + receivingChannel.DeclareQueue(queueName, false, true, true); + receivingChannel.Bind(queueName, "amq.direct", queueName); - public void BytesSent(long count) - { - } + IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName) + .WithPrefetchLow(30) + .WithPrefetchHigh(60).Create(); - public void BytesReceived(long count) - { + consumer.OnMessage = new MessageReceivedDelegate(OnMessage); + _connection.Start(); + + // Create a publisher to send the test messages. + publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge); + publisher = publishingChannel.CreatePublisherBuilder() + .WithRoutingKey(queueName) + .Create(); + + _log.Debug("connection = " + _connection); + _log.Debug("connectionInfo = " + connectionInfo); + _log.Debug("connection.AsUrl = " + _connection.toURL()); + _log.Debug("AcknowledgeMode is " + _acknowledgeMode); } - public bool PreFailover(bool redirect) + /// <summary> + /// Clean up the test connection. + /// </summary> + [TearDown] + public virtual void Shutdown() { - _logger.Info("preFailover(" + redirect + ") called"); - return true; + Thread.Sleep(2000); + _connection.Close(); } - public bool PreResubscribe() + /// <summary> + /// Runs a failover test, building up the connection information from its component parts. In particular the brokers + /// to fail between are seperately added into the connection info. + /// </summary> + /*[Test] + public void TestWithBasicInfo() { - _logger.Info("preResubscribe() called"); - return true; - } + _log.Debug("public void TestWithBasicInfo(): called"); + + // Manually create the connection parameters. + QpidConnectionInfo connectionInfo = new QpidConnectionInfo(); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false)); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false)); - public void FailoverComplete() + Init(connectionInfo); + DoFailoverTest(); + }*/ + + /// <summary> + /// Runs a failover test, with the failover configuration specified in the Qpid connection URL format. + /// </summary> + [Test] + public void TestWithUrl() { - _logger.Info("failoverComplete() called"); + _log.Debug("public void runTestWithUrl(): called"); + + // Parse the connection parameters from a URL. + String clientId = "failover" + DateTime.Now.Ticks; + string defaultUrl = "amqp://guest:guest@" + clientId + "/test" + + "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'"; + IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl); + + Init(connectionInfo); + DoFailoverTest(); } - private class MsgListener + /// <summary> + /// Send the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all messages sent + /// are received within the test time limit. + /// </summary> + /// + /// <param name="connectionInfo">The connection parameters, specifying the brokers to fail between.</param> + void DoFailoverTest() { - private IChannel _session; - private IMessagePublisher _publisher; + _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): called"); - internal MsgListener(IChannel session, string queueName) + for (int i = 1; i <= NUM_MESSAGES; ++i) { - _session = session; - _session.CreateConsumerBuilder(queueName).Create().OnMessage = - new MessageReceivedDelegate(OnMessage); - } + ITextMessage msg = publishingChannel.CreateTextMessage("message=" + messagesSent); + //_log.Debug("sending message = " + msg.Text); + publisher.Send(msg); + messagesSent++; - public void OnMessage(IMessage message) - { - try + _log.Debug("messagesSent = " + messagesSent); + + if (transacted) { - _logger.Info("Received: msg.Text = " + ((ITextMessage) message).Text); - if(_publisher == null) - { - _publisher = init(message); - } - reply(message); + publishingChannel.Commit(); } - catch (QpidException e) + + // Prompt the user to cause a failure if at the fail point. + if (i == FAIL_POINT) { -// Error(e); - _logger.Error("yikes", e); // XXX + PromptAndWait("Cause a broker failure now, then press return..."); } + + //Thread.Sleep(SLEEP_MILLIS); } - private void reply(IMessage message) + // Wait for all of the test messages to be received, checking that this occurs within the test time limit. + bool withinTimeout; + + lock(testComplete) + { + withinTimeout = Monitor.Wait(testComplete, TIMEOUT); + } + + if (!withinTimeout) { - string msg = ((ITextMessage) message).Text; - _logger.Info("sending reply - " + msg); - _publisher.Send(_session.CreateTextMessage(msg)); + Assert.Fail("Test timed out, before all messages received."); } - private IMessagePublisher init(IMessage message) + _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): exiting"); + } + + /// <summary> + /// Receives all of the test messages. + /// </summary> + /// + /// <param name="message">The newly arrived test message.</param> + public void OnMessage(IMessage message) + { + try { - _logger.Info(string.Format("creating reply producer with dest = '{0}:{1}'", - message.ReplyToExchangeName, message.ReplyToRoutingKey)); + if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) + { + message.Acknowledge(); + } + + messagesReceived++; - string exchangeName = message.ReplyToExchangeName; - string routingKey = message.ReplyToRoutingKey; + _log.Debug("messagesReceived = " + messagesReceived); - //return _channel.CreatePublisher(exchangeName, exchangeClass, routingKey); - return _session.CreatePublisherBuilder() - .WithExchangeName(exchangeName) - .WithRoutingKey(routingKey) - .Create(); + // Check if all of the messages in the test have been received, in which case notify the message producer that the test has + // succesfully completed. + if (messagesReceived == NUM_MESSAGES) + { + lock (testComplete) + { + Monitor.Pulse(testComplete); + } + } + } + catch (QpidException e) + { + _log.Fatal("Exception received. About to stop.", e); + Stop(); } } - [Test] - public void TestFail() + /// <summary>Prompts the user on stdout and waits for a reply on stdin, using the specified prompt message.</summary> + /// + /// <param name="message">The message to prompt the user with.</param> + private void PromptAndWait(string message) { - Assert.Fail("Tests in this class do not pass, but hang forever, so commented out until can be fixed."); + Console.WriteLine("\n" + message); + Console.ReadLine(); } - /*[Test] - public void TestWithBasicInfo() + // <summary>Closes the test connection.</summary> + private void Stop() { - Console.WriteLine("TestWithBasicInfo"); + _log.Debug("Stopping..."); try { - QpidConnectionInfo connectionInfo = new QpidConnectionInfo(); - connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false)); - connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false)); - DoFailoverTest(connectionInfo); - while (true) - { - Thread.Sleep(5000); - } + _connection.Close(); } - catch (Exception e) + catch (QpidException e) { - _logger.Error("Exception caught", e); + _log.Debug("Failed to shutdown: ", e); } - }*/ + } + + /// <summary> + /// Called when bytes have been transmitted to the server + /// </summary> + /// + /// <param>count the number of bytes sent in total since the connection was opened</param> + public void BytesSent(long count) {} + + /// <summary> + /// Called when some bytes have been received on a connection + /// </summary> + /// + /// <param>count the number of bytes received in total since the connection was opened</param> + public void BytesReceived(long count) {} + + /// <summary> + /// Called after the infrastructure has detected that failover is required but before attempting failover. + /// </summary> + /// + /// <param>redirect true if the broker requested redirect. false if failover is occurring due to a connection error.</param> + /// + /// <return>true to continue failing over, false to veto failover and raise a connection exception</return> + public bool PreFailover(bool redirect) + { + _log.Debug("public bool PreFailover(bool redirect): called"); + return true; + } -// [Test] -// public void TestWithUrl() -// { -// String clientId = "failover" + DateTime.Now.Ticks; -// String defaultUrl = "amqp://guest:guest@" + clientId + "/test" + -// "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'"; -// -// _logger.Info("url = [" + defaultUrl + "]"); -// -// // _logger.Info("connection url = [" + new AMQConnectionURL(defaultUrl) + "]"); -// -// String broker = defaultUrl; -// //new FailoverTest(broker); -// } + /// <summary> + /// Called after connection has been made to another broker after failover has been started but before + /// any resubscription has been done. + /// </summary> + /// + /// <return> true to continue with resubscription, false to prevent automatic resubscription. This is useful in + /// cases where the application wants to handle resubscription. Note that in the latter case all sessions, producers + /// and consumers are invalidated. + /// </return> + public bool PreResubscribe() + { + _log.Debug("public bool PreResubscribe(): called"); + return true; + } + + /// <summary> + /// Called once failover has completed successfully. This is called irrespective of whether the client has + /// vetoed automatic resubscription. + /// </summary> + public void FailoverComplete() + { + _log.Debug("public void FailoverComplete(): called"); + } } } diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs deleted file mode 100644 index a110891cfc..0000000000 --- a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs +++ /dev/null @@ -1,253 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Runtime.InteropServices; -using System.Threading; -using log4net; -using NUnit.Framework; -using Apache.Qpid.Client.Qms; -using Apache.Qpid.Messaging; - -namespace Apache.Qpid.Client.Tests.failover -{ - [TestFixture, Category("Failover")] - public class FailoverTxTest : IConnectionListener - { - private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTxTest)); - - const int NUM_ITERATIONS = 10; - const int NUM_COMMITED_MESSAGES = 10; - const int NUM_ROLLEDBACK_MESSAGES = 3; - const int SLEEP_MILLIS = 50; - - // AutoAcknowledge, ClientAcknowledge, DupsOkAcknowledge, NoAcknowledge, PreAcknowledge - AcknowledgeMode _acknowledgeMode = AcknowledgeMode.DupsOkAcknowledge; - const bool _noWait = true; // use Receive or ReceiveNoWait - AMQConnection _connection; - - public void OnMessage(IMessage message) - { - try - { - _log.Info("Received: " + ((ITextMessage) message).Text); - if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) - { - _log.Info("client acknowledging"); - message.Acknowledge(); - } - } - catch (QpidException e) - { - Error(e); - } - } - - class NoWaitConsumer - { - FailoverTxTest _failoverTxTest; - IMessageConsumer _consumer; - private bool _noWait; - - internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel, bool noWait) - { - _failoverTxTest = failoverTxTest; - _consumer = channel; - _noWait = noWait; - } - - internal void Run() - { - int messages = 0; - while (messages < NUM_COMMITED_MESSAGES) - { - IMessage msg; - if (_noWait) msg = _consumer.ReceiveNoWait(); - else msg = _consumer.Receive(); - if (msg != null) - { - _log.Info("NoWait received message"); - ++messages; - _failoverTxTest.OnMessage(msg); - } - else - { - Thread.Sleep(1); - } - - } - - } - } - - void DoFailoverTxTest(IConnectionInfo connectionInfo) - { - _connection = new AMQConnection(connectionInfo); - _connection.ConnectionListener = this; - _log.Info("connection = " + _connection); - _log.Info("connectionInfo = " + connectionInfo); - _log.Info("connection.AsUrl = " + _connection.toURL()); - - _log.Info("AcknowledgeMode is " + _acknowledgeMode); - IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode); - - string queueName = receivingChannel.GenerateUniqueName(); - - // Queue.Declare - receivingChannel.DeclareQueue(queueName, false, true, true); - - // No need to call Queue.Bind as automatically bound to default direct exchange. - receivingChannel.Bind(queueName, "amq.direct", queueName); - - IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName) - .WithPrefetchLow(30) - .WithPrefetchHigh(60).Create(); - bool useThread = false; - if (useThread) - { - NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer, _noWait); - new Thread(new ThreadStart(noWaitConsumer.Run)).Start(); - } - else - { - consumer.OnMessage = new MessageReceivedDelegate(OnMessage); - } - - _connection.Start(); - - PublishInTx(queueName); - - Thread.Sleep(2000); // Wait a while for last messages. - - _connection.Close(); - _log.Info("FailoverTxText complete"); - } - - private void PublishInTx(string routingKey) - { - _log.Info("sendInTx"); - bool transacted = true; - IChannel publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge); - IMessagePublisher publisher = publishingChannel.CreatePublisherBuilder() - .WithRoutingKey(routingKey) - .Create(); - - for (int i = 1; i <= NUM_ITERATIONS; ++i) - { - for (int j = 1; j <= NUM_ROLLEDBACK_MESSAGES; ++j) - { - ITextMessage msg = publishingChannel.CreateTextMessage("Tx=" + i + " rolledBackMsg=" + j); - _log.Info("sending message = " + msg.Text); - publisher.Send(msg); - Thread.Sleep(SLEEP_MILLIS); - } - if (transacted) publishingChannel.Rollback(); - - for (int j = 1; j <= NUM_COMMITED_MESSAGES; ++j) - { - ITextMessage msg = publishingChannel.CreateTextMessage("Tx=" + i + " commitedMsg=" + j); - _log.Info("sending message = " + msg.Text); - publisher.Send(msg); - Thread.Sleep(SLEEP_MILLIS); - } - if (transacted) publishingChannel.Commit(); - } - } - - private void Error(Exception e) - { - _log.Fatal("Exception received. About to stop.", e); - Stop(); - } - - private void Stop() - { - _log.Info("Stopping..."); - try - { - _connection.Close(); - } - catch (QpidException e) - { - _log.Info("Failed to shutdown: ", e); - } - } - - public void BytesSent(long count) - { - } - - public void BytesReceived(long count) - { - } - - public bool PreFailover(bool redirect) - { - _log.Info("preFailover(" + redirect + ") called"); - return true; - } - - public bool PreResubscribe() - { - _log.Info("preResubscribe() called"); - return true; - } - - public void FailoverComplete() - { - _log.Info("failoverComplete() called"); - } - - [Test] - public void TestFail() - { - Assert.Fail("Tests in this class do not pass, but hang forever, so commented out until can be fixed."); - } - - /*[Test] - public void TestWithBasicInfo() - { - Console.WriteLine("TestWithBasicInfo"); - Console.WriteLine(".NET Framework version: " + RuntimeEnvironment.GetSystemVersion()); - QpidConnectionInfo connectionInfo = new QpidConnectionInfo(); - - connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false)); - connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false)); - - DoFailoverTxTest(connectionInfo); - }*/ - - /*[Test] - public void runTestWithUrl() - { - String clientId = "failover" + DateTime.Now.Ticks; - string defaultUrl = "amqp://guest:guest@" + clientId + "/test" + - "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'"; - - _log.Info("url = [" + defaultUrl + "]"); - - IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl); - - _log.Info("connection url = [" + connectionInfo + "]"); - - DoFailoverTxTest(connectionInfo); - }*/ - } -} diff --git a/dotnet/Qpid.Client.Tests/log4net.config b/dotnet/Qpid.Client.Tests/log4net.config index 4346e0eaeb..8753c9c431 100644 --- a/dotnet/Qpid.Client.Tests/log4net.config +++ b/dotnet/Qpid.Client.Tests/log4net.config @@ -1,62 +1,48 @@ <log4net> - <appender name="console" type="log4net.Appender.ConsoleAppender" > - <layout type="log4net.Layout.PatternLayout"> - <conversionPattern value="%d [%t] %-5p %c:%M(%L) - %m%n" /> - </layout> - <threshold value="info"/> - </appender> - - <appender name="filelog" type="log4net.Appender.FileAppender"> - <file value="qpid_client.log"/> - <appendToFile value="false"/> - <layout type="log4net.Layout.PatternLayout"> - <conversionPattern value="%d [%t] %-5p %c:%M(%L) - %m%n" /> - </layout> - </appender> - - <appender name="protocolLog" type="log4net.Appender.FileAppender"> - <file value="protocol.log"/> - <appendToFile value="false"/> - <layout type="log4net.Layout.PatternLayout"> - <conversionPattern value="%date - %message%newline"/> - </layout> - </appender> - - <appender name="ioLog" type="log4net.Appender.FileAppender"> - <file value="io.log"/> - <appendToFile value="false"/> - <layout type="log4net.Layout.PatternLayout"> - <conversionPattern value="%date - %message%newline"/> - </layout> - <threshold value="info"/> - </appender> - - <appender name="UdpAppender" type="log4net.Appender.UdpAppender"> - <remoteAddress value="127.0.0.1" /> - <remotePort value="4445" /> - <layout type="log4net.Layout.XmlLayoutSchemaLog4j"> - <locationInfo value="true" /> - </layout> - <threshold value="debug"/> - </appender> - - <logger name="Qpid.Client.ProtocolChannel.Tracing" additivity="false"> - <level value="info"/> - <appender-ref ref="protocolLog"/> - </logger> - - <logger name="Qpid.Client.ByteChannel.Tracing" additivity="false"> - <level value="info" /> - <appender-ref ref="ioLog"/> - </logger> - <logger name="Qpid.Framing.FieldTable" additivity="false"> - <level value="debug" /> - <appender-ref ref="console"/> - </logger> - - <root> - <appender-ref ref="console"/> - <appender-ref ref="UdpAppender"/> - <appender-ref ref="filelog"/> - </root> + + <!-- ============================== --> + <!-- Append messages to the console --> + <!-- ============================== --> + + <appender name="console" type="log4net.Appender.ConsoleAppender" > + <layout type="log4net.Layout.PatternLayout"> + <conversionPattern value="%m%n"/> + </layout> + <threshold value="info"/> + </appender> + + <!-- ====================================== --> + <!-- Append messages to the socket appender --> + <!-- ====================================== --> + + <appender name="UdpAppender" type="log4net.Appender.UdpAppender"> + <remoteAddress value="127.0.0.1"/> + <remotePort value="4445"/> + <layout type="log4net.Layout.XmlLayoutSchemaLog4j"> + <locationInfo value="true"/> + </layout> + <threshold value="debug"/> + </appender> + + <!-- ================ --> + <!-- Limit categories --> + <!-- ================ --> + + <logger name="Qpid"> + <level value="debug"/> + </logger> + + <logger name="CONSOLE"> + <level value="info"/> + <appender-ref ref="console"/> + </logger> + + <!-- ======================= --> + <!-- Setup the Root category --> + <!-- ======================= --> + + <root> + <appender-ref ref="UdpAppender"/> + </root> + </log4net>
\ No newline at end of file diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs index 326afbe613..3d31d6c2f3 100644 --- a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs +++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs @@ -29,21 +29,40 @@ using Apache.Qpid.Framing; namespace Apache.Qpid.Client.Protocol { + /// <summary> + /// AMQProtocolListener + /// + /// <p/>Fail-over state transition rules... + /// + /// <p/>The failover handler is created when the session is created since it needs a reference to the IoSession in order + /// to be able to send errors during failover back to the client application. The session won't be available in the case + /// when failing over due to a Connection.Redirect message from the broker. + /// + /// <p><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Track fail over state of a connection. + /// <tr><td> Manage method listeners. <td> IAMQMethodListener + /// <tr><td> Receive notification of all IO errors on a connection. <td> IoHandler + /// <tr><td> Inform method listeners of all method events on a connection. <td> IAMQMethodListener + /// <tr><td> Inform method listeners of all error events on a connection. <td> IAMQMethodListener + /// </table> + /// + /// <b>Todo:</b> The broker will close the connection with no warning if authentication fails. This may result in the fail-over process being + /// triggered, when it should not be. + /// + /// </summary> public class AMQProtocolListener : IProtocolListener { + /// <summary>Used for debugging.</summary> private static readonly ILog _log = LogManager.GetLogger(typeof(AMQProtocolListener)); - /** - * We create the failover handler when the session is created since it needs a reference to the IoSession in order - * to be able to send errors during failover back to the client application. The session won't be available in the - * case where we failing over due to a Connection.Redirect message from the broker. - */ + /// <summary> + /// Holds the failover handler for the connection. When a failure is detected, and the current failover state allows it, + /// the failover process is handed off to this handler. + /// </summary> private FailoverHandler _failoverHandler; - /** - * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly - * attempting failover where it is failing. - */ + /// <summary>Tracks the current fail-over state.</summary> internal FailoverState _failoverState = FailoverState.NOT_STARTED; internal FailoverState FailoverState @@ -63,15 +82,14 @@ namespace Apache.Qpid.Client.Protocol set { _stateManager = value; } } - //private readonly CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); private readonly ArrayList _frameListeners = ArrayList.Synchronized(new ArrayList()); - AMQProtocolSession _protocolSession = null; // FIXME - public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } } // FIXME: can this be fixed? - + AMQProtocolSession _protocolSession = null; private readonly Object _lock = new Object(); + public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } } + public AMQProtocolListener(AMQConnection connection, AMQStateManager stateManager) { _connection = connection; @@ -138,88 +156,90 @@ namespace Apache.Qpid.Client.Protocol { _log.Debug("HeartBeat received"); } - //_connection.BytesReceived(_protocolSession.Channel.ReadBytes); // XXX: is this really useful? } + /// <summary> + /// Receives notification of any IO exceptions on the connection. + /// + /// <p/>Upon receipt of a connection closed exception, the fail-over process is attempted. If the fail-over fails, then all method listeners + /// and the application connection object are notified of the connection failure exception. + /// + /// <p/>This exception handler only deals with AMQConnectionClosedExceptions, any other exception types are thrown back to the caller. + /// </summary> public void OnException(Exception cause) { - _log.Warn("Protocol Listener received exception", cause); - lock (_lock) + _log.Warn("public void OnException(Exception cause = " + cause + "): called"); + + if (cause is AMQConnectionClosedException || cause is System.IO.IOException) { - if (_failoverState == FailoverState.NOT_STARTED) + // Ensure that the method listener set cannot be changed whilst this exception is propagated to all listeners. This also + // ensures that this exception is fully propagated to all listeners, before another one can be processed. + lock (_lock) { - if (cause is AMQConnectionClosedException) + // Try a fail-over because the connection has failed. + FailoverState failoverState = AttemptFailover(); + + // Check if the fail-over has failed, in which case notify all method listeners of the exception. + // The application connection object is also notified of the failure of the connection with the exception. + if (failoverState == FailoverState.FAILED) { - WhenClosed(); + _log.Debug("Fail-over has failed. Notifying all method listeners of the exception."); + + AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); + PropagateExceptionToWaiters(amqe); + _connection.ExceptionReceived(cause); } } - // We reach this point if failover was attempted and failed therefore we need to let the calling app - // know since we cannot recover the situation. - else if (_failoverState == FailoverState.FAILED) - { - // we notify the state manager of the error in case we have any clients waiting on a state - // change. Those "waiters" will be interrupted and can handle the exception - AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); - PropagateExceptionToWaiters(amqe); - _connection.ExceptionReceived(cause); - } + } + // Throw the exception back to the caller if it is not of a known type, to ensure unhandled runtimes are not swallowed. + else + { + throw cause; } } - /** - * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by - * sessionClosed() depending on whether we were trying to send data at the time of failure. - * - * @param session - * @throws Exception - */ - void WhenClosed() + /// <summary> + /// Tries to fail-over the connection, if the connection policy will permit it, and the fail-over process has not yet been + /// started. If the connection does not allow fail-over then an exception will be raised. If a fail-over is already in progress + /// this method allows it to continue to run and will do nothing. + /// + /// <p/>This method should only be called when the connection has been remotely closed. + /// </summary> + /// + /// <returns>The fail-over state at the end of this attempt.</returns> + private FailoverState AttemptFailover() { + _log.Debug("private void AttemptFailover(): called"); + _log.Debug("_failoverState = " + _failoverState); + + // Ensure that the connection stops sending heart beats, if it still is. _connection.StopHeartBeatThread(); - // TODO: Server just closes session with no warning if auth fails. - if (_connection.Closed) + // Check that the connection policy allows fail-over to be attempted. + if (!_connection.IsFailoverAllowed) { - _log.Info("Channel closed called by client"); + _log.Debug("Connection does not allowed to failover"); + _connection.ExceptionReceived( + new AMQDisconnectedException("Broker closed connection and reconnection is not permitted.")); } - else - { - _log.Info("Channel closed called with failover state currently " + _failoverState); - // Reconnectablility was introduced here so as not to disturb the client as they have made their intentions - // known through the policy settings. + // Check if connection was closed deliberately by the application, in which case no fail-over is attempted. + if (_connection.Closed) + { + return _failoverState; + } - if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.IsFailoverAllowed) - { - _log.Info("FAILOVER STARTING"); - if (_failoverState == FailoverState.NOT_STARTED) - { - _failoverState = FailoverState.IN_PROGRESS; - StartFailoverThread(); - } - else - { - _log.Info("Not starting failover as state currently " + _failoverState); - } - } - else - { - _log.Info("Failover not allowed by policy."); + // If the connection allows fail-over and fail-over has not yet been started, then it is started and the fail-over state is + // advanced to 'in progress' + if (_failoverState == FailoverState.NOT_STARTED && _connection.IsFailoverAllowed) + { + _log.Info("Starting the fail-over process."); - if (_failoverState != FailoverState.IN_PROGRESS) - { - _log.Info("sessionClose() not allowed to failover"); - _connection.ExceptionReceived( - new AMQDisconnectedException("Server closed connection and reconnection not permitted.")); - } - else - { - _log.Info("sessionClose() failover in progress"); - } - } + _failoverState = FailoverState.IN_PROGRESS; + StartFailoverThread(); } - _log.Info("Protocol Channel [" + this + "] closed"); + return _failoverState; } /// <summary> diff --git a/dotnet/Qpid.Client/Client/Transport/IoHandler.cs b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs index 556f9631b3..9ac513069e 100644 --- a/dotnet/Qpid.Client/Client/Transport/IoHandler.cs +++ b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs @@ -143,9 +143,10 @@ namespace Apache.Qpid.Client.Transport try
{
_topStream.Write(buffer.Array, buffer.Position, buffer.Limit); // FIXME
- } catch ( Exception e )
+ }
+ catch (Exception e)
{
- _log.Error("Write caused exception", e);
+ _log.Warn("Write caused exception", e);
_protocolListener.OnException(e);
}
}
diff --git a/dotnet/Qpid.Client/qms/FailoverPolicy.cs b/dotnet/Qpid.Client/qms/FailoverPolicy.cs index d5d1e8d8ac..179a695bf9 100644 --- a/dotnet/Qpid.Client/qms/FailoverPolicy.cs +++ b/dotnet/Qpid.Client/qms/FailoverPolicy.cs @@ -46,7 +46,7 @@ namespace Apache.Qpid.Client.Qms private long _lastMethodTime; private long _lastFailTime; - + public FailoverPolicy(IConnectionInfo connectionInfo) { IFailoverMethod method; diff --git a/dotnet/Qpid.Common/AMQConnectionClosedException.cs b/dotnet/Qpid.Common/AMQConnectionClosedException.cs index f3bc387a5c..136131144b 100644 --- a/dotnet/Qpid.Common/AMQConnectionClosedException.cs +++ b/dotnet/Qpid.Common/AMQConnectionClosedException.cs @@ -24,6 +24,17 @@ using System.Runtime.Serialization; namespace Apache.Qpid { + /// <summary> + /// AMQConnectionClosedException indicates that a connection has been closed. + /// + /// <p/>This exception is really used as an event, in order that the method handler that raises it creates an event + /// which is propagated to the io handler, in order to notify it of the connection closure. + /// + /// <p/><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Represents a the closure of a connection. + /// </table> + /// </summary> [Serializable] public class AMQConnectionClosedException : AMQException { @@ -33,7 +44,7 @@ namespace Apache.Qpid } protected AMQConnectionClosedException(SerializationInfo info, StreamingContext ctxt) - : base(info, ctxt) + : base(info, ctxt) { } } |