summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRupert Smith <rupertlssmith@apache.org>2007-10-09 13:55:06 +0000
committerRupert Smith <rupertlssmith@apache.org>2007-10-09 13:55:06 +0000
commitb420d1140114b61adfe03aabaacc1a3f55eb63f9 (patch)
treef9b8fa56fda7de8aa90a099be56303afd2e2a0f7
parent3a07d83bc77eb4f188cf95cdbb918c0ebe237010 (diff)
downloadqpid-python-b420d1140114b61adfe03aabaacc1a3f55eb63f9.tar.gz
Merged revisions 583170 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1 ........ r583170 | rupertlssmith | 2007-10-09 14:49:32 +0100 (Tue, 09 Oct 2007) | 1 line QPID-256 FailoverTest restored to working order. IOExceptions on connections now trigger fail-over. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@583172 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--dotnet/Qpid.Client.Tests/default.build21
-rw-r--r--dotnet/Qpid.Client.Tests/failover/FailoverTest.cs390
-rw-r--r--dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs253
-rw-r--r--dotnet/Qpid.Client.Tests/log4net.config106
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs166
-rw-r--r--dotnet/Qpid.Client/Client/Transport/IoHandler.cs5
-rw-r--r--dotnet/Qpid.Client/qms/FailoverPolicy.cs2
-rw-r--r--dotnet/Qpid.Common/AMQConnectionClosedException.cs13
8 files changed, 393 insertions, 563 deletions
diff --git a/dotnet/Qpid.Client.Tests/default.build b/dotnet/Qpid.Client.Tests/default.build
index e7fb81dae5..d1b1496a8b 100644
--- a/dotnet/Qpid.Client.Tests/default.build
+++ b/dotnet/Qpid.Client.Tests/default.build
@@ -28,19 +28,22 @@
file="log4net.config"
/>
</target>
+
<target name="test" depends="build">
- <nunit2>
+ <nunit2>
<formatter type="${nant.formatter}" usefile="false" />
<test>
- <assemblies>
- <include name="${build.dir}/${project::get-name()}.tests.dll"/>
- </assemblies>
- <categories>
- <exclude name="Failover"/>
- <exclude name="SSL" if="${framework::get-target-framework() == 'mono-2.0'}"/>
- </categories>
+ <assemblies>
+ <include name="${build.dir}/${project::get-name()}.tests.dll"/>
+ </assemblies>
+ <categories>
+ <!-- The fail-over tests are interactive so should not be run as part of the build. -->
+ <exclude name="Failover"/>
+ <exclude name="SSL" if="${framework::get-target-framework() == 'mono-2.0'}"/>
+ </categories>
</test>
- </nunit2>
+ </nunit2>
</target>
+
</project>
diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs
index 1f1e2f726c..15e2eb6757 100644
--- a/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs
+++ b/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs
@@ -19,6 +19,7 @@
*
*/
using System;
+using System.Runtime.InteropServices;
using System.Threading;
using log4net;
using NUnit.Framework;
@@ -30,228 +31,289 @@ namespace Apache.Qpid.Client.Tests.failover
[TestFixture, Category("Failover")]
public class FailoverTest : IConnectionListener
{
- private static readonly ILog _logger = LogManager.GetLogger(typeof(FailoverTest));
+ private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTest));
- private IConnection _connection;
- private IChannel _channel;
- private IMessagePublisher _publisher;
- private int _count;
+ /// <summary>Specifies the number of times to run the test cycle.</summary>
+ const int NUM_MESSAGES = 10;
- private IMessageConsumer _consumerOfResponse;
+ /// <summary>Determines how many messages to send within each commit.</summary>
+ const int COMMIT_BATCH_SIZE = 1;
- void DoFailoverTest(IConnectionInfo info)
- {
- DoFailoverTest(new AMQConnection(info));
- }
+ /// <summary>Specifies the duration of the pause to place between each message sent in the test.</summary>
+ //const int SLEEP_MILLIS = 1;
- void DoFailoverTest(IConnection connection)
- {
- AMQConnection amqConnection = (AMQConnection)connection;
- amqConnection.ConnectionListener = this;
- //Console.WriteLine("connection.url = " + amqConnection.ToURL());
- _connection = connection;
- _connection.ExceptionListener = new ExceptionListenerDelegate(OnConnectionException);
- _channel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge);
+ /// <summary>Specified the maximum time in milliseconds to wait for the test to complete.</summary>
+ const int TIMEOUT = 10000;
- string exchangeName = ExchangeNameDefaults.TOPIC;
- string routingKey = "topic1";
+ /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary>
+ const int FAIL_POINT = 5;
- string queueName = DeclareAndBindTemporaryQueue(exchangeName, routingKey);
-
- new MsgListener(_connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge), queueName);
+ /// <summary>Specified the ack mode to use for the test.</summary>
+ AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge;
- IChannel channel = _channel;
+ /// <summary>Determines whether this test runs transactionally or not. </summary>
+ bool transacted = false;
- string tempQueueName = channel.GenerateUniqueName();
- channel.DeclareQueue(tempQueueName, false, true, true);
- _consumerOfResponse = channel.CreateConsumerBuilder(tempQueueName).Create();
- _consumerOfResponse.OnMessage = new MessageReceivedDelegate(OnMessage);
+ /// <summary>Holds the connection to run the test over.</summary>
+ AMQConnection _connection;
- _connection.Start();
+ /// <summary>Holds the channel for the test message publisher. </summary>
+ IChannel publishingChannel;
- IMessage msg = _channel.CreateTextMessage("Init");
- // FIXME: Leaving ReplyToExchangeName as default (i.e. the default exchange)
- // FIXME: but the implementation might not like this as it defaults to null rather than "".
- msg.ReplyToRoutingKey = tempQueueName;
-// msg.ReplyTo = new ReplyToDestination("" /* i.e. the default exchange */, tempQueueName);
- _logger.Info(String.Format("sending msg.Text={0}", ((ITextMessage)msg).Text));
-
-// _publisher = _channel.CreatePublisher(exchangeName, exchangeClass, routingKey);
- _publisher = _channel.CreatePublisherBuilder()
- .WithRoutingKey(routingKey)
- .WithExchangeName(exchangeName)
- .Create();
- _publisher.Send(msg);
- }
+ /// <summary>Holds the test message publisher. </summary>
+ IMessagePublisher publisher;
- public string DeclareAndBindTemporaryQueue(string exchangeName, string routingKey)
- {
- string queueName = _channel.GenerateUniqueName();
+ /// <summary>Used to keep count of the number of messages sent. </summary>
+ int messagesSent;
- // Queue.Declare
- _channel.DeclareQueue(queueName, false, true, true);
+ /// <summary>Used to keep count of the number of messages received. </summary>
+ int messagesReceived;
- // Queue.Bind
- _channel.Bind(queueName, exchangeName, routingKey);
- return queueName;
- }
+ /// <summary>Used to wait for test completion on. </summary>
+ private static object testComplete = new Object();
- private void OnConnectionException(Exception e)
+ /// <summary>
+ /// Creates the test connection with a fail-over set up, and a producer/consumer pair on that connection.
+ /// </summary>
+ /// [SetUp]
+ public void Init(IConnectionInfo connectionInfo)
{
- _logger.Error("Connection exception occurred", e);
- }
+ // Reset all counts.
+ messagesSent = 0;
+ messagesReceived = 0;
- public void OnMessage(IMessage message)
- {
- try
- {
- _logger.Info("received message on temp queue msg.Text=" + ((ITextMessage)message).Text);
- Thread.Sleep(1000);
- _publisher.Send(_channel.CreateTextMessage("Message" + (++_count)));
- }
- catch (QpidException e)
- {
- error(e);
- }
- }
+ // Create a connection for the test.
+ _connection = new AMQConnection(connectionInfo);
+ _connection.ConnectionListener = this;
- private void error(Exception e)
- {
- _logger.Error("exception received", e);
- stop();
- }
+ // Create a consumer to receive the test messages.
+ IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode);
- private void stop()
- {
- _logger.Info("Stopping...");
- try
- {
- _connection.Dispose();
- }
- catch (QpidException e)
- {
- _logger.Error("Failed to shutdown", e);
- }
- }
+ string queueName = receivingChannel.GenerateUniqueName();
+ receivingChannel.DeclareQueue(queueName, false, true, true);
+ receivingChannel.Bind(queueName, "amq.direct", queueName);
- public void BytesSent(long count)
- {
- }
+ IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName)
+ .WithPrefetchLow(30)
+ .WithPrefetchHigh(60).Create();
- public void BytesReceived(long count)
- {
+ consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+ _connection.Start();
+
+ // Create a publisher to send the test messages.
+ publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
+ publisher = publishingChannel.CreatePublisherBuilder()
+ .WithRoutingKey(queueName)
+ .Create();
+
+ _log.Debug("connection = " + _connection);
+ _log.Debug("connectionInfo = " + connectionInfo);
+ _log.Debug("connection.AsUrl = " + _connection.toURL());
+ _log.Debug("AcknowledgeMode is " + _acknowledgeMode);
}
- public bool PreFailover(bool redirect)
+ /// <summary>
+ /// Clean up the test connection.
+ /// </summary>
+ [TearDown]
+ public virtual void Shutdown()
{
- _logger.Info("preFailover(" + redirect + ") called");
- return true;
+ Thread.Sleep(2000);
+ _connection.Close();
}
- public bool PreResubscribe()
+ /// <summary>
+ /// Runs a failover test, building up the connection information from its component parts. In particular the brokers
+ /// to fail between are seperately added into the connection info.
+ /// </summary>
+ /*[Test]
+ public void TestWithBasicInfo()
{
- _logger.Info("preResubscribe() called");
- return true;
- }
+ _log.Debug("public void TestWithBasicInfo(): called");
+
+ // Manually create the connection parameters.
+ QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
+ connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
+ connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
- public void FailoverComplete()
+ Init(connectionInfo);
+ DoFailoverTest();
+ }*/
+
+ /// <summary>
+ /// Runs a failover test, with the failover configuration specified in the Qpid connection URL format.
+ /// </summary>
+ [Test]
+ public void TestWithUrl()
{
- _logger.Info("failoverComplete() called");
+ _log.Debug("public void runTestWithUrl(): called");
+
+ // Parse the connection parameters from a URL.
+ String clientId = "failover" + DateTime.Now.Ticks;
+ string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
+ "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";
+ IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl);
+
+ Init(connectionInfo);
+ DoFailoverTest();
}
- private class MsgListener
+ /// <summary>
+ /// Send the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all messages sent
+ /// are received within the test time limit.
+ /// </summary>
+ ///
+ /// <param name="connectionInfo">The connection parameters, specifying the brokers to fail between.</param>
+ void DoFailoverTest()
{
- private IChannel _session;
- private IMessagePublisher _publisher;
+ _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): called");
- internal MsgListener(IChannel session, string queueName)
+ for (int i = 1; i <= NUM_MESSAGES; ++i)
{
- _session = session;
- _session.CreateConsumerBuilder(queueName).Create().OnMessage =
- new MessageReceivedDelegate(OnMessage);
- }
+ ITextMessage msg = publishingChannel.CreateTextMessage("message=" + messagesSent);
+ //_log.Debug("sending message = " + msg.Text);
+ publisher.Send(msg);
+ messagesSent++;
- public void OnMessage(IMessage message)
- {
- try
+ _log.Debug("messagesSent = " + messagesSent);
+
+ if (transacted)
{
- _logger.Info("Received: msg.Text = " + ((ITextMessage) message).Text);
- if(_publisher == null)
- {
- _publisher = init(message);
- }
- reply(message);
+ publishingChannel.Commit();
}
- catch (QpidException e)
+
+ // Prompt the user to cause a failure if at the fail point.
+ if (i == FAIL_POINT)
{
-// Error(e);
- _logger.Error("yikes", e); // XXX
+ PromptAndWait("Cause a broker failure now, then press return...");
}
+
+ //Thread.Sleep(SLEEP_MILLIS);
}
- private void reply(IMessage message)
+ // Wait for all of the test messages to be received, checking that this occurs within the test time limit.
+ bool withinTimeout;
+
+ lock(testComplete)
+ {
+ withinTimeout = Monitor.Wait(testComplete, TIMEOUT);
+ }
+
+ if (!withinTimeout)
{
- string msg = ((ITextMessage) message).Text;
- _logger.Info("sending reply - " + msg);
- _publisher.Send(_session.CreateTextMessage(msg));
+ Assert.Fail("Test timed out, before all messages received.");
}
- private IMessagePublisher init(IMessage message)
+ _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): exiting");
+ }
+
+ /// <summary>
+ /// Receives all of the test messages.
+ /// </summary>
+ ///
+ /// <param name="message">The newly arrived test message.</param>
+ public void OnMessage(IMessage message)
+ {
+ try
{
- _logger.Info(string.Format("creating reply producer with dest = '{0}:{1}'",
- message.ReplyToExchangeName, message.ReplyToRoutingKey));
+ if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
+ {
+ message.Acknowledge();
+ }
+
+ messagesReceived++;
- string exchangeName = message.ReplyToExchangeName;
- string routingKey = message.ReplyToRoutingKey;
+ _log.Debug("messagesReceived = " + messagesReceived);
- //return _channel.CreatePublisher(exchangeName, exchangeClass, routingKey);
- return _session.CreatePublisherBuilder()
- .WithExchangeName(exchangeName)
- .WithRoutingKey(routingKey)
- .Create();
+ // Check if all of the messages in the test have been received, in which case notify the message producer that the test has
+ // succesfully completed.
+ if (messagesReceived == NUM_MESSAGES)
+ {
+ lock (testComplete)
+ {
+ Monitor.Pulse(testComplete);
+ }
+ }
+ }
+ catch (QpidException e)
+ {
+ _log.Fatal("Exception received. About to stop.", e);
+ Stop();
}
}
- [Test]
- public void TestFail()
+ /// <summary>Prompts the user on stdout and waits for a reply on stdin, using the specified prompt message.</summary>
+ ///
+ /// <param name="message">The message to prompt the user with.</param>
+ private void PromptAndWait(string message)
{
- Assert.Fail("Tests in this class do not pass, but hang forever, so commented out until can be fixed.");
+ Console.WriteLine("\n" + message);
+ Console.ReadLine();
}
- /*[Test]
- public void TestWithBasicInfo()
+ // <summary>Closes the test connection.</summary>
+ private void Stop()
{
- Console.WriteLine("TestWithBasicInfo");
+ _log.Debug("Stopping...");
try
{
- QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
- connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
- connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
- DoFailoverTest(connectionInfo);
- while (true)
- {
- Thread.Sleep(5000);
- }
+ _connection.Close();
}
- catch (Exception e)
+ catch (QpidException e)
{
- _logger.Error("Exception caught", e);
+ _log.Debug("Failed to shutdown: ", e);
}
- }*/
+ }
+
+ /// <summary>
+ /// Called when bytes have been transmitted to the server
+ /// </summary>
+ ///
+ /// <param>count the number of bytes sent in total since the connection was opened</param>
+ public void BytesSent(long count) {}
+
+ /// <summary>
+ /// Called when some bytes have been received on a connection
+ /// </summary>
+ ///
+ /// <param>count the number of bytes received in total since the connection was opened</param>
+ public void BytesReceived(long count) {}
+
+ /// <summary>
+ /// Called after the infrastructure has detected that failover is required but before attempting failover.
+ /// </summary>
+ ///
+ /// <param>redirect true if the broker requested redirect. false if failover is occurring due to a connection error.</param>
+ ///
+ /// <return>true to continue failing over, false to veto failover and raise a connection exception</return>
+ public bool PreFailover(bool redirect)
+ {
+ _log.Debug("public bool PreFailover(bool redirect): called");
+ return true;
+ }
-// [Test]
-// public void TestWithUrl()
-// {
-// String clientId = "failover" + DateTime.Now.Ticks;
-// String defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
-// "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";
-//
-// _logger.Info("url = [" + defaultUrl + "]");
-//
-// // _logger.Info("connection url = [" + new AMQConnectionURL(defaultUrl) + "]");
-//
-// String broker = defaultUrl;
-// //new FailoverTest(broker);
-// }
+ /// <summary>
+ /// Called after connection has been made to another broker after failover has been started but before
+ /// any resubscription has been done.
+ /// </summary>
+ ///
+ /// <return> true to continue with resubscription, false to prevent automatic resubscription. This is useful in
+ /// cases where the application wants to handle resubscription. Note that in the latter case all sessions, producers
+ /// and consumers are invalidated.
+ /// </return>
+ public bool PreResubscribe()
+ {
+ _log.Debug("public bool PreResubscribe(): called");
+ return true;
+ }
+
+ /// <summary>
+ /// Called once failover has completed successfully. This is called irrespective of whether the client has
+ /// vetoed automatic resubscription.
+ /// </summary>
+ public void FailoverComplete()
+ {
+ _log.Debug("public void FailoverComplete(): called");
+ }
}
}
diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
deleted file mode 100644
index a110891cfc..0000000000
--- a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Runtime.InteropServices;
-using System.Threading;
-using log4net;
-using NUnit.Framework;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Messaging;
-
-namespace Apache.Qpid.Client.Tests.failover
-{
- [TestFixture, Category("Failover")]
- public class FailoverTxTest : IConnectionListener
- {
- private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTxTest));
-
- const int NUM_ITERATIONS = 10;
- const int NUM_COMMITED_MESSAGES = 10;
- const int NUM_ROLLEDBACK_MESSAGES = 3;
- const int SLEEP_MILLIS = 50;
-
- // AutoAcknowledge, ClientAcknowledge, DupsOkAcknowledge, NoAcknowledge, PreAcknowledge
- AcknowledgeMode _acknowledgeMode = AcknowledgeMode.DupsOkAcknowledge;
- const bool _noWait = true; // use Receive or ReceiveNoWait
- AMQConnection _connection;
-
- public void OnMessage(IMessage message)
- {
- try
- {
- _log.Info("Received: " + ((ITextMessage) message).Text);
- if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
- {
- _log.Info("client acknowledging");
- message.Acknowledge();
- }
- }
- catch (QpidException e)
- {
- Error(e);
- }
- }
-
- class NoWaitConsumer
- {
- FailoverTxTest _failoverTxTest;
- IMessageConsumer _consumer;
- private bool _noWait;
-
- internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel, bool noWait)
- {
- _failoverTxTest = failoverTxTest;
- _consumer = channel;
- _noWait = noWait;
- }
-
- internal void Run()
- {
- int messages = 0;
- while (messages < NUM_COMMITED_MESSAGES)
- {
- IMessage msg;
- if (_noWait) msg = _consumer.ReceiveNoWait();
- else msg = _consumer.Receive();
- if (msg != null)
- {
- _log.Info("NoWait received message");
- ++messages;
- _failoverTxTest.OnMessage(msg);
- }
- else
- {
- Thread.Sleep(1);
- }
-
- }
-
- }
- }
-
- void DoFailoverTxTest(IConnectionInfo connectionInfo)
- {
- _connection = new AMQConnection(connectionInfo);
- _connection.ConnectionListener = this;
- _log.Info("connection = " + _connection);
- _log.Info("connectionInfo = " + connectionInfo);
- _log.Info("connection.AsUrl = " + _connection.toURL());
-
- _log.Info("AcknowledgeMode is " + _acknowledgeMode);
- IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode);
-
- string queueName = receivingChannel.GenerateUniqueName();
-
- // Queue.Declare
- receivingChannel.DeclareQueue(queueName, false, true, true);
-
- // No need to call Queue.Bind as automatically bound to default direct exchange.
- receivingChannel.Bind(queueName, "amq.direct", queueName);
-
- IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName)
- .WithPrefetchLow(30)
- .WithPrefetchHigh(60).Create();
- bool useThread = false;
- if (useThread)
- {
- NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer, _noWait);
- new Thread(new ThreadStart(noWaitConsumer.Run)).Start();
- }
- else
- {
- consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
- }
-
- _connection.Start();
-
- PublishInTx(queueName);
-
- Thread.Sleep(2000); // Wait a while for last messages.
-
- _connection.Close();
- _log.Info("FailoverTxText complete");
- }
-
- private void PublishInTx(string routingKey)
- {
- _log.Info("sendInTx");
- bool transacted = true;
- IChannel publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
- IMessagePublisher publisher = publishingChannel.CreatePublisherBuilder()
- .WithRoutingKey(routingKey)
- .Create();
-
- for (int i = 1; i <= NUM_ITERATIONS; ++i)
- {
- for (int j = 1; j <= NUM_ROLLEDBACK_MESSAGES; ++j)
- {
- ITextMessage msg = publishingChannel.CreateTextMessage("Tx=" + i + " rolledBackMsg=" + j);
- _log.Info("sending message = " + msg.Text);
- publisher.Send(msg);
- Thread.Sleep(SLEEP_MILLIS);
- }
- if (transacted) publishingChannel.Rollback();
-
- for (int j = 1; j <= NUM_COMMITED_MESSAGES; ++j)
- {
- ITextMessage msg = publishingChannel.CreateTextMessage("Tx=" + i + " commitedMsg=" + j);
- _log.Info("sending message = " + msg.Text);
- publisher.Send(msg);
- Thread.Sleep(SLEEP_MILLIS);
- }
- if (transacted) publishingChannel.Commit();
- }
- }
-
- private void Error(Exception e)
- {
- _log.Fatal("Exception received. About to stop.", e);
- Stop();
- }
-
- private void Stop()
- {
- _log.Info("Stopping...");
- try
- {
- _connection.Close();
- }
- catch (QpidException e)
- {
- _log.Info("Failed to shutdown: ", e);
- }
- }
-
- public void BytesSent(long count)
- {
- }
-
- public void BytesReceived(long count)
- {
- }
-
- public bool PreFailover(bool redirect)
- {
- _log.Info("preFailover(" + redirect + ") called");
- return true;
- }
-
- public bool PreResubscribe()
- {
- _log.Info("preResubscribe() called");
- return true;
- }
-
- public void FailoverComplete()
- {
- _log.Info("failoverComplete() called");
- }
-
- [Test]
- public void TestFail()
- {
- Assert.Fail("Tests in this class do not pass, but hang forever, so commented out until can be fixed.");
- }
-
- /*[Test]
- public void TestWithBasicInfo()
- {
- Console.WriteLine("TestWithBasicInfo");
- Console.WriteLine(".NET Framework version: " + RuntimeEnvironment.GetSystemVersion());
- QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
-
- connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
- connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
-
- DoFailoverTxTest(connectionInfo);
- }*/
-
- /*[Test]
- public void runTestWithUrl()
- {
- String clientId = "failover" + DateTime.Now.Ticks;
- string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
- "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";
-
- _log.Info("url = [" + defaultUrl + "]");
-
- IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl);
-
- _log.Info("connection url = [" + connectionInfo + "]");
-
- DoFailoverTxTest(connectionInfo);
- }*/
- }
-}
diff --git a/dotnet/Qpid.Client.Tests/log4net.config b/dotnet/Qpid.Client.Tests/log4net.config
index 4346e0eaeb..8753c9c431 100644
--- a/dotnet/Qpid.Client.Tests/log4net.config
+++ b/dotnet/Qpid.Client.Tests/log4net.config
@@ -1,62 +1,48 @@
<log4net>
- <appender name="console" type="log4net.Appender.ConsoleAppender" >
- <layout type="log4net.Layout.PatternLayout">
- <conversionPattern value="%d [%t] %-5p %c:%M(%L) - %m%n" />
- </layout>
- <threshold value="info"/>
- </appender>
-
- <appender name="filelog" type="log4net.Appender.FileAppender">
- <file value="qpid_client.log"/>
- <appendToFile value="false"/>
- <layout type="log4net.Layout.PatternLayout">
- <conversionPattern value="%d [%t] %-5p %c:%M(%L) - %m%n" />
- </layout>
- </appender>
-
- <appender name="protocolLog" type="log4net.Appender.FileAppender">
- <file value="protocol.log"/>
- <appendToFile value="false"/>
- <layout type="log4net.Layout.PatternLayout">
- <conversionPattern value="%date - %message%newline"/>
- </layout>
- </appender>
-
- <appender name="ioLog" type="log4net.Appender.FileAppender">
- <file value="io.log"/>
- <appendToFile value="false"/>
- <layout type="log4net.Layout.PatternLayout">
- <conversionPattern value="%date - %message%newline"/>
- </layout>
- <threshold value="info"/>
- </appender>
-
- <appender name="UdpAppender" type="log4net.Appender.UdpAppender">
- <remoteAddress value="127.0.0.1" />
- <remotePort value="4445" />
- <layout type="log4net.Layout.XmlLayoutSchemaLog4j">
- <locationInfo value="true" />
- </layout>
- <threshold value="debug"/>
- </appender>
-
- <logger name="Qpid.Client.ProtocolChannel.Tracing" additivity="false">
- <level value="info"/>
- <appender-ref ref="protocolLog"/>
- </logger>
-
- <logger name="Qpid.Client.ByteChannel.Tracing" additivity="false">
- <level value="info" />
- <appender-ref ref="ioLog"/>
- </logger>
- <logger name="Qpid.Framing.FieldTable" additivity="false">
- <level value="debug" />
- <appender-ref ref="console"/>
- </logger>
-
- <root>
- <appender-ref ref="console"/>
- <appender-ref ref="UdpAppender"/>
- <appender-ref ref="filelog"/>
- </root>
+
+ <!-- ============================== -->
+ <!-- Append messages to the console -->
+ <!-- ============================== -->
+
+ <appender name="console" type="log4net.Appender.ConsoleAppender" >
+ <layout type="log4net.Layout.PatternLayout">
+ <conversionPattern value="%m%n"/>
+ </layout>
+ <threshold value="info"/>
+ </appender>
+
+ <!-- ====================================== -->
+ <!-- Append messages to the socket appender -->
+ <!-- ====================================== -->
+
+ <appender name="UdpAppender" type="log4net.Appender.UdpAppender">
+ <remoteAddress value="127.0.0.1"/>
+ <remotePort value="4445"/>
+ <layout type="log4net.Layout.XmlLayoutSchemaLog4j">
+ <locationInfo value="true"/>
+ </layout>
+ <threshold value="debug"/>
+ </appender>
+
+ <!-- ================ -->
+ <!-- Limit categories -->
+ <!-- ================ -->
+
+ <logger name="Qpid">
+ <level value="debug"/>
+ </logger>
+
+ <logger name="CONSOLE">
+ <level value="info"/>
+ <appender-ref ref="console"/>
+ </logger>
+
+ <!-- ======================= -->
+ <!-- Setup the Root category -->
+ <!-- ======================= -->
+
+ <root>
+ <appender-ref ref="UdpAppender"/>
+ </root>
+
</log4net> \ No newline at end of file
diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs
index 326afbe613..3d31d6c2f3 100644
--- a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs
+++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs
@@ -29,21 +29,40 @@ using Apache.Qpid.Framing;
namespace Apache.Qpid.Client.Protocol
{
+ /// <summary>
+ /// AMQProtocolListener
+ ///
+ /// <p/>Fail-over state transition rules...
+ ///
+ /// <p/>The failover handler is created when the session is created since it needs a reference to the IoSession in order
+ /// to be able to send errors during failover back to the client application. The session won't be available in the case
+ /// when failing over due to a Connection.Redirect message from the broker.
+ ///
+ /// <p><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Track fail over state of a connection.
+ /// <tr><td> Manage method listeners. <td> IAMQMethodListener
+ /// <tr><td> Receive notification of all IO errors on a connection. <td> IoHandler
+ /// <tr><td> Inform method listeners of all method events on a connection. <td> IAMQMethodListener
+ /// <tr><td> Inform method listeners of all error events on a connection. <td> IAMQMethodListener
+ /// </table>
+ ///
+ /// <b>Todo:</b> The broker will close the connection with no warning if authentication fails. This may result in the fail-over process being
+ /// triggered, when it should not be.
+ ///
+ /// </summary>
public class AMQProtocolListener : IProtocolListener
{
+ /// <summary>Used for debugging.</summary>
private static readonly ILog _log = LogManager.GetLogger(typeof(AMQProtocolListener));
- /**
- * We create the failover handler when the session is created since it needs a reference to the IoSession in order
- * to be able to send errors during failover back to the client application. The session won't be available in the
- * case where we failing over due to a Connection.Redirect message from the broker.
- */
+ /// <summary>
+ /// Holds the failover handler for the connection. When a failure is detected, and the current failover state allows it,
+ /// the failover process is handed off to this handler.
+ /// </summary>
private FailoverHandler _failoverHandler;
- /**
- * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly
- * attempting failover where it is failing.
- */
+ /// <summary>Tracks the current fail-over state.</summary>
internal FailoverState _failoverState = FailoverState.NOT_STARTED;
internal FailoverState FailoverState
@@ -63,15 +82,14 @@ namespace Apache.Qpid.Client.Protocol
set { _stateManager = value; }
}
- //private readonly CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
private readonly ArrayList _frameListeners = ArrayList.Synchronized(new ArrayList());
- AMQProtocolSession _protocolSession = null; // FIXME
- public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } } // FIXME: can this be fixed?
-
+ AMQProtocolSession _protocolSession = null;
private readonly Object _lock = new Object();
+ public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } }
+
public AMQProtocolListener(AMQConnection connection, AMQStateManager stateManager)
{
_connection = connection;
@@ -138,88 +156,90 @@ namespace Apache.Qpid.Client.Protocol
{
_log.Debug("HeartBeat received");
}
- //_connection.BytesReceived(_protocolSession.Channel.ReadBytes); // XXX: is this really useful?
}
+ /// <summary>
+ /// Receives notification of any IO exceptions on the connection.
+ ///
+ /// <p/>Upon receipt of a connection closed exception, the fail-over process is attempted. If the fail-over fails, then all method listeners
+ /// and the application connection object are notified of the connection failure exception.
+ ///
+ /// <p/>This exception handler only deals with AMQConnectionClosedExceptions, any other exception types are thrown back to the caller.
+ /// </summary>
public void OnException(Exception cause)
{
- _log.Warn("Protocol Listener received exception", cause);
- lock (_lock)
+ _log.Warn("public void OnException(Exception cause = " + cause + "): called");
+
+ if (cause is AMQConnectionClosedException || cause is System.IO.IOException)
{
- if (_failoverState == FailoverState.NOT_STARTED)
+ // Ensure that the method listener set cannot be changed whilst this exception is propagated to all listeners. This also
+ // ensures that this exception is fully propagated to all listeners, before another one can be processed.
+ lock (_lock)
{
- if (cause is AMQConnectionClosedException)
+ // Try a fail-over because the connection has failed.
+ FailoverState failoverState = AttemptFailover();
+
+ // Check if the fail-over has failed, in which case notify all method listeners of the exception.
+ // The application connection object is also notified of the failure of the connection with the exception.
+ if (failoverState == FailoverState.FAILED)
{
- WhenClosed();
+ _log.Debug("Fail-over has failed. Notifying all method listeners of the exception.");
+
+ AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
+ PropagateExceptionToWaiters(amqe);
+ _connection.ExceptionReceived(cause);
}
}
- // We reach this point if failover was attempted and failed therefore we need to let the calling app
- // know since we cannot recover the situation.
- else if (_failoverState == FailoverState.FAILED)
- {
- // we notify the state manager of the error in case we have any clients waiting on a state
- // change. Those "waiters" will be interrupted and can handle the exception
- AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
- PropagateExceptionToWaiters(amqe);
- _connection.ExceptionReceived(cause);
- }
+ }
+ // Throw the exception back to the caller if it is not of a known type, to ensure unhandled runtimes are not swallowed.
+ else
+ {
+ throw cause;
}
}
- /**
- * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by
- * sessionClosed() depending on whether we were trying to send data at the time of failure.
- *
- * @param session
- * @throws Exception
- */
- void WhenClosed()
+ /// <summary>
+ /// Tries to fail-over the connection, if the connection policy will permit it, and the fail-over process has not yet been
+ /// started. If the connection does not allow fail-over then an exception will be raised. If a fail-over is already in progress
+ /// this method allows it to continue to run and will do nothing.
+ ///
+ /// <p/>This method should only be called when the connection has been remotely closed.
+ /// </summary>
+ ///
+ /// <returns>The fail-over state at the end of this attempt.</returns>
+ private FailoverState AttemptFailover()
{
+ _log.Debug("private void AttemptFailover(): called");
+ _log.Debug("_failoverState = " + _failoverState);
+
+ // Ensure that the connection stops sending heart beats, if it still is.
_connection.StopHeartBeatThread();
- // TODO: Server just closes session with no warning if auth fails.
- if (_connection.Closed)
+ // Check that the connection policy allows fail-over to be attempted.
+ if (!_connection.IsFailoverAllowed)
{
- _log.Info("Channel closed called by client");
+ _log.Debug("Connection does not allowed to failover");
+ _connection.ExceptionReceived(
+ new AMQDisconnectedException("Broker closed connection and reconnection is not permitted."));
}
- else
- {
- _log.Info("Channel closed called with failover state currently " + _failoverState);
- // Reconnectablility was introduced here so as not to disturb the client as they have made their intentions
- // known through the policy settings.
+ // Check if connection was closed deliberately by the application, in which case no fail-over is attempted.
+ if (_connection.Closed)
+ {
+ return _failoverState;
+ }
- if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.IsFailoverAllowed)
- {
- _log.Info("FAILOVER STARTING");
- if (_failoverState == FailoverState.NOT_STARTED)
- {
- _failoverState = FailoverState.IN_PROGRESS;
- StartFailoverThread();
- }
- else
- {
- _log.Info("Not starting failover as state currently " + _failoverState);
- }
- }
- else
- {
- _log.Info("Failover not allowed by policy.");
+ // If the connection allows fail-over and fail-over has not yet been started, then it is started and the fail-over state is
+ // advanced to 'in progress'
+ if (_failoverState == FailoverState.NOT_STARTED && _connection.IsFailoverAllowed)
+ {
+ _log.Info("Starting the fail-over process.");
- if (_failoverState != FailoverState.IN_PROGRESS)
- {
- _log.Info("sessionClose() not allowed to failover");
- _connection.ExceptionReceived(
- new AMQDisconnectedException("Server closed connection and reconnection not permitted."));
- }
- else
- {
- _log.Info("sessionClose() failover in progress");
- }
- }
+ _failoverState = FailoverState.IN_PROGRESS;
+ StartFailoverThread();
}
- _log.Info("Protocol Channel [" + this + "] closed");
+ return _failoverState;
}
/// <summary>
diff --git a/dotnet/Qpid.Client/Client/Transport/IoHandler.cs b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs
index 556f9631b3..9ac513069e 100644
--- a/dotnet/Qpid.Client/Client/Transport/IoHandler.cs
+++ b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs
@@ -143,9 +143,10 @@ namespace Apache.Qpid.Client.Transport
try
{
_topStream.Write(buffer.Array, buffer.Position, buffer.Limit); // FIXME
- } catch ( Exception e )
+ }
+ catch (Exception e)
{
- _log.Error("Write caused exception", e);
+ _log.Warn("Write caused exception", e);
_protocolListener.OnException(e);
}
}
diff --git a/dotnet/Qpid.Client/qms/FailoverPolicy.cs b/dotnet/Qpid.Client/qms/FailoverPolicy.cs
index d5d1e8d8ac..179a695bf9 100644
--- a/dotnet/Qpid.Client/qms/FailoverPolicy.cs
+++ b/dotnet/Qpid.Client/qms/FailoverPolicy.cs
@@ -46,7 +46,7 @@ namespace Apache.Qpid.Client.Qms
private long _lastMethodTime;
private long _lastFailTime;
-
+
public FailoverPolicy(IConnectionInfo connectionInfo)
{
IFailoverMethod method;
diff --git a/dotnet/Qpid.Common/AMQConnectionClosedException.cs b/dotnet/Qpid.Common/AMQConnectionClosedException.cs
index f3bc387a5c..136131144b 100644
--- a/dotnet/Qpid.Common/AMQConnectionClosedException.cs
+++ b/dotnet/Qpid.Common/AMQConnectionClosedException.cs
@@ -24,6 +24,17 @@ using System.Runtime.Serialization;
namespace Apache.Qpid
{
+ /// <summary>
+ /// AMQConnectionClosedException indicates that a connection has been closed.
+ ///
+ /// <p/>This exception is really used as an event, in order that the method handler that raises it creates an event
+ /// which is propagated to the io handler, in order to notify it of the connection closure.
+ ///
+ /// <p/><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Represents a the closure of a connection.
+ /// </table>
+ /// </summary>
[Serializable]
public class AMQConnectionClosedException : AMQException
{
@@ -33,7 +44,7 @@ namespace Apache.Qpid
}
protected AMQConnectionClosedException(SerializationInfo info, StreamingContext ctxt)
- : base(info, ctxt)
+ : base(info, ctxt)
{
}
}