diff options
author | Aidan Skinner <aidan@apache.org> | 2008-05-07 14:09:16 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-05-07 14:09:16 +0000 |
commit | 9003422eef0d6b53e0448bc8f4c1f445094a43d9 (patch) | |
tree | e00c460ce73539c9715c7c27f4250d5b4847829e /dotnet | |
parent | 372720db97414ef06e7830f6fc7621e08fe17a67 (diff) | |
download | qpid-python-9003422eef0d6b53e0448bc8f4c1f445094a43d9.tar.gz |
Merged revisions 653420-654109 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x
........
r653447 | aidan | 2008-05-05 13:26:29 +0100 (Mon, 05 May 2008) | 1 line
Check if consumer is closed and dont reclose it
........
r653451 | aidan | 2008-05-05 13:29:15 +0100 (Mon, 05 May 2008) | 1 line
QPID-1022 Use synchronous writes to fix race conditions
........
r653452 | aidan | 2008-05-05 13:30:45 +0100 (Mon, 05 May 2008) | 1 line
QPID-1023 increase some timeouts
........
r653760 | aidan | 2008-05-06 13:40:34 +0100 (Tue, 06 May 2008) | 3 lines
QPID-1029: Generate temporary queue names using GUIDs to ensure uniqueness.
........
r654097 | aidan | 2008-05-07 14:25:38 +0100 (Wed, 07 May 2008) | 2 lines
QPID-952, QPID-951, QPID-1032 Fix failover, ensure that it is properly detected, that frames are replayed approrpiately and that failover does not timeout.
........
r654104 | aidan | 2008-05-07 14:46:51 +0100 (Wed, 07 May 2008) | 1 line
QPID-952 should have been part of previous commit
........
r654109 | aidan | 2008-05-07 14:56:09 +0100 (Wed, 07 May 2008) | 2 lines
QPID-1036 increase timeouts to more reasonable levels, ensure that durable queues are deleted when no longer needed
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@654113 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet')
13 files changed, 510 insertions, 421 deletions
diff --git a/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs b/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs index badaa48111..b62b11a3db 100644 --- a/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs +++ b/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs @@ -53,6 +53,11 @@ namespace Apache.Qpid.Client.Transport.Socket.Blocking { _socket = new System.Net.Sockets.Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + /// For future note TCP Set NoDelay options may help, though with the blocking io not sure + /// The Don't linger may help with detecting disconnect but that hasn't been the case in testing. + /// _socket.SetSocketOption (SocketOptionLevel.Socket, SocketOptionName.NoDelay, 0); + /// _socket.SetSocketOption (SocketOptionLevel.Socket, SocketOptionName.DontLinger, 0); + IPHostEntry ipHostInfo = Dns.Resolve(_host); // Note: don't fix this warning. We do this for .NET 1.1 compatibility. IPAddress ipAddress = ipHostInfo.AddressList[0]; @@ -77,6 +82,8 @@ namespace Apache.Qpid.Client.Transport.Socket.Blocking { _log.Error("Write caused exception", e); _protocolListener.OnException(e); + // We should provide the error synchronously as we are doing blocking io. + throw e; } } @@ -87,6 +94,17 @@ namespace Apache.Qpid.Client.Transport.Socket.Blocking int numOctets = _networkStream.Read(bytes, 0, bytes.Length); + /// Read only returns 0 if the socket has been gracefully shutdown. + /// http://msdn2.microsoft.com/en-us/library/system.net.sockets.networkstream.read(VS.71).aspx + /// We can use this to block Send so the next Read will force an exception forcing failover. + /// Otherwise we need to wait ~20 seconds for the NetworkStream/Socket code to realise that + /// the socket has been closed. + if (numOctets == 0) + { + _socket.Shutdown(SocketShutdown.Send); + _socket.Close(); + } + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); byteBuffer.limit(numOctets); diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs index 8e9ebdbdfb..41d4e089b6 100644 --- a/dotnet/Qpid.Client/Client/AMQConnection.cs +++ b/dotnet/Qpid.Client/Client/AMQConnection.cs @@ -69,7 +69,7 @@ namespace Apache.Qpid.Client internal bool IsFailoverAllowed { - get { return _failoverPolicy.FailoverAllowed(); } + get { if(!_connected) return false; else return _failoverPolicy.FailoverAllowed(); } } /// <summary> @@ -151,34 +151,22 @@ namespace Apache.Qpid.Client _log.Error("Unable to connect to broker " + _failoverPolicy.GetCurrentBrokerInfo(), e); // XXX: Should perhaps break out of the do/while here if not a SocketException... } - } while (_failoverPolicy.FailoverAllowed()); + } while (!_connected && _failoverPolicy.FailoverAllowed()); _log.Debug("Are we connected:" + _connected); - - if (!_failoverPolicy.FailoverAllowed()) - { - if ( lastException is AMQException ) - throw lastException; - else - throw new AMQConnectionException("Unable to connect", lastException); - } - // TODO: this needs to be redone so that we are not spinning. - // A suitable object should be set that is then waited on - // and only notified when a connection is made or when - // the AMQConnection gets closed. - while (!_connected && !Closed) + if (!_connected) { - _log.Debug("Sleeping."); - Thread.Sleep(100); - } - if (!_failoverPolicy.FailoverAllowed() || _failoverPolicy.GetCurrentBrokerInfo() == null) - { - if (_lastAMQException != null) - { - throw _lastAMQException; - } + if ( lastException is AMQException ) + { + throw lastException; + } + else + { + throw new AMQConnectionException("Unable to connect", lastException); + } } + } /*private ITransport LoadTransportFromAssembly(string host, int port, String assemblyName, String transportType) diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index ce8e2ca2fe..86dc9a4681 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -888,10 +888,14 @@ namespace Apache.Qpid.Client /// <param name="consumer"></param> private void RegisterConsumer(BasicMessageConsumer consumer) { + // Need to generate a consumer tag on the client so we can exploit the nowait flag. + String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); + consumer.ConsumerTag = tag; + _consumers.Add(tag, consumer); + String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, - consumer.Exclusive, consumer.AcknowledgeMode); - consumer.ConsumerTag = consumerTag; - _consumers.Add(consumerTag, consumer); + consumer.Exclusive, consumer.AcknowledgeMode, tag); + } internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args) @@ -902,19 +906,21 @@ namespace Apache.Qpid.Client AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0, queueName, exchangeName, - routingKey, true, args); - _replayFrames.Add(queueBind); + routingKey, false, args); + lock (_connection.FailoverMutex) { - _connection.ProtocolWriter.Write(queueBind); + _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody)); } + // AS FIXME: wasnae me + _replayFrames.Add(QueueBindBody.CreateAMQFrame(_channelId, 0, + queueName, exchangeName, + routingKey, true, args)); } - private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode) + private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag) { - // Need to generate a consumer tag on the client so we can exploit the nowait flag. - String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0, queueName, tag, noLocal, @@ -934,9 +940,7 @@ namespace Apache.Qpid.Client _logger.Debug(string.Format("DeleteQueue name={0}", queueName)); AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait); - - _replayFrames.Add(queueDelete); - + if (noWait) { _connection.ProtocolWriter.Write(queueDelete); @@ -945,6 +949,8 @@ namespace Apache.Qpid.Client { _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody)); } + // AS FIXME: wasnae me + _replayFrames.Add(QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, true)); } catch (AMQException) { @@ -958,14 +964,16 @@ namespace Apache.Qpid.Client queueName, isDurable, isExclusive, isAutoDelete)); AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, - isAutoDelete, true, null); + isAutoDelete, false, null); - _replayFrames.Add(queueDeclare); lock (_connection.FailoverMutex) { - _connection.ProtocolWriter.Write(queueDeclare); + _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody)); } + // AS FIXME: wasnae me + _replayFrames.Add(QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, + isAutoDelete, true, null)); } // AMQP-level method. @@ -978,8 +986,6 @@ namespace Apache.Qpid.Client AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive, durable, autoDelete, xinternal, noWait, args); - - _replayFrames.Add(declareExchange); if (noWait) { @@ -987,6 +993,8 @@ namespace Apache.Qpid.Client { _connection.ProtocolWriter.Write(declareExchange); } + // AS FIXME: wasnae me + _replayFrames.Add(declareExchange); } else { diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index e88cf8f04c..6fee316cb4 100644 --- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -266,7 +266,11 @@ namespace Apache.Qpid.Client public override void Close() { - // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex + if (_closed == CLOSED) + { + return; + } + // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex lock (_channel.Connection.FailoverMutex) { lock (_closingLock) diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs index 7ae086e35f..1fb3d407eb 100644 --- a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs +++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs @@ -35,12 +35,6 @@ namespace Apache.Qpid.Client.Protocol private readonly IProtocolWriter _protocolWriter; private readonly IConnectionCloser _connectionCloser; - /** - * Counter to ensure unique queue names - */ - private int _queueId = 1; - private readonly Object _queueIdLock = new Object(); - /// <summary> /// Maps from the channel id to the AmqChannel that it represents. /// </summary> @@ -267,13 +261,7 @@ namespace Apache.Qpid.Client.Protocol internal string GenerateQueueName() { - int id; - lock(_queueIdLock) - { - id = _queueId++; - } - - return "tmp_" + _connection.Transport.LocalEndpoint + "_" + id; + return "ntmp_" + System.Guid.NewGuid(); } } } diff --git a/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs b/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs index fcaabfea3b..547266a7a5 100644 --- a/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs +++ b/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs @@ -1,320 +1,397 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Runtime.InteropServices;
-using System.Threading;
-using log4net;
-using NUnit.Framework;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client;
-using Apache.Qpid.Messaging;
-
-namespace Apache.Qpid.Integration.Tests.interactive
-{
- [TestFixture, Category("Interactive")]
- public class FailoverTest : IConnectionListener
- {
- private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTest));
-
- /// <summary>Specifies the number of times to run the test cycle.</summary>
- const int NUM_MESSAGES = 10;
-
- /// <summary>Determines how many messages to send within each commit.</summary>
- const int COMMIT_BATCH_SIZE = 1;
-
- /// <summary>Specifies the duration of the pause to place between each message sent in the test.</summary>
- //const int SLEEP_MILLIS = 1;
-
- /// <summary>Specified the maximum time in milliseconds to wait for the test to complete.</summary>
- const int TIMEOUT = 10000;
-
- /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary>
- const int FAIL_POINT = 5;
-
- /// <summary>Specified the ack mode to use for the test.</summary>
- AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge;
-
- /// <summary>Determines whether this test runs transactionally or not. </summary>
- bool transacted = false;
-
- /// <summary>Holds the connection to run the test over.</summary>
- AMQConnection _connection;
-
- /// <summary>Holds the channel for the test message publisher. </summary>
- IChannel publishingChannel;
-
- /// <summary>Holds the test message publisher. </summary>
- IMessagePublisher publisher;
-
- /// <summary>Used to keep count of the number of messages sent. </summary>
- int messagesSent;
-
- /// <summary>Used to keep count of the number of messages received. </summary>
- int messagesReceived;
-
- /// <summary>Used to wait for test completion on. </summary>
- private static object testComplete = new Object();
-
- /// <summary>
- /// Creates the test connection with a fail-over set up, and a producer/consumer pair on that connection.
- /// </summary>
- /// [SetUp]
- public void Init(IConnectionInfo connectionInfo)
- {
- // Reset all counts.
- messagesSent = 0;
- messagesReceived = 0;
-
- // Create a connection for the test.
- _connection = new AMQConnection(connectionInfo);
- _connection.ConnectionListener = this;
-
- // Create a consumer to receive the test messages.
- IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode);
-
- string queueName = receivingChannel.GenerateUniqueName();
- receivingChannel.DeclareQueue(queueName, false, true, true);
- receivingChannel.Bind(queueName, "amq.direct", queueName);
-
- IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName)
- .WithPrefetchLow(30)
- .WithPrefetchHigh(60).Create();
-
- consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
- _connection.Start();
-
- // Create a publisher to send the test messages.
- publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
- publisher = publishingChannel.CreatePublisherBuilder()
- .WithRoutingKey(queueName)
- .Create();
-
- _log.Debug("connection = " + _connection);
- _log.Debug("connectionInfo = " + connectionInfo);
- _log.Debug("connection.AsUrl = " + _connection.toURL());
- _log.Debug("AcknowledgeMode is " + _acknowledgeMode);
- }
-
- /// <summary>
- /// Clean up the test connection.
- /// </summary>
- [TearDown]
- public virtual void Shutdown()
- {
- Thread.Sleep(2000);
- _connection.Close();
- }
-
- /// <summary>
- /// Runs a failover test, building up the connection information from its component parts. In particular the brokers
- /// to fail between are seperately added into the connection info.
- /// </summary>
- /*[Test]
- public void TestWithBasicInfo()
- {
- _log.Debug("public void TestWithBasicInfo(): called");
-
- // Manually create the connection parameters.
- QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
- connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
- connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
-
- Init(connectionInfo);
- DoFailoverTest();
- }*/
-
- /// <summary>
- /// Runs a failover test, with the failover configuration specified in the Qpid connection URL format.
- /// </summary>
- [Test]
- public void TestWithUrl()
- {
- _log.Debug("public void runTestWithUrl(): called");
-
- // Parse the connection parameters from a URL.
- String clientId = "failover" + DateTime.Now.Ticks;
- string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
- "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";
- IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl);
-
- Init(connectionInfo);
- DoFailoverTest();
- }
-
- /// <summary>
- /// Send the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all messages sent
- /// are received within the test time limit.
- /// </summary>
- ///
- /// <param name="connectionInfo">The connection parameters, specifying the brokers to fail between.</param>
- void DoFailoverTest()
- {
- _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): called");
-
- for (int i = 1; i <= NUM_MESSAGES; ++i)
- {
- ITextMessage msg = publishingChannel.CreateTextMessage("message=" + messagesSent);
- //_log.Debug("sending message = " + msg.Text);
- publisher.Send(msg);
- messagesSent++;
-
- _log.Debug("messagesSent = " + messagesSent);
-
- if (transacted)
- {
- publishingChannel.Commit();
- }
-
- // Prompt the user to cause a failure if at the fail point.
- if (i == FAIL_POINT)
- {
- PromptAndWait("Cause a broker failure now, then press return...");
- }
-
- //Thread.Sleep(SLEEP_MILLIS);
- }
-
- // Wait for all of the test messages to be received, checking that this occurs within the test time limit.
- bool withinTimeout;
-
- lock(testComplete)
- {
- withinTimeout = Monitor.Wait(testComplete, TIMEOUT);
- }
-
- if (!withinTimeout)
- {
- Assert.Fail("Test timed out, before all messages received.");
- }
-
- _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): exiting");
- }
-
- /// <summary>
- /// Receives all of the test messages.
- /// </summary>
- ///
- /// <param name="message">The newly arrived test message.</param>
- public void OnMessage(IMessage message)
- {
- try
- {
- if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
- {
- message.Acknowledge();
- }
-
- messagesReceived++;
-
- _log.Debug("messagesReceived = " + messagesReceived);
-
- // Check if all of the messages in the test have been received, in which case notify the message producer that the test has
- // succesfully completed.
- if (messagesReceived == NUM_MESSAGES)
- {
- lock (testComplete)
- {
- Monitor.Pulse(testComplete);
- }
- }
- }
- catch (QpidException e)
- {
- _log.Fatal("Exception received. About to stop.", e);
- Stop();
- }
- }
-
- /// <summary>Prompts the user on stdout and waits for a reply on stdin, using the specified prompt message.</summary>
- ///
- /// <param name="message">The message to prompt the user with.</param>
- private void PromptAndWait(string message)
- {
- Console.WriteLine("\n" + message);
- Console.ReadLine();
- }
-
- // <summary>Closes the test connection.</summary>
- private void Stop()
- {
- _log.Debug("Stopping...");
- try
- {
- _connection.Close();
- }
- catch (QpidException e)
- {
- _log.Debug("Failed to shutdown: ", e);
- }
- }
-
- /// <summary>
- /// Called when bytes have been transmitted to the server
- /// </summary>
- ///
- /// <param>count the number of bytes sent in total since the connection was opened</param>
- public void BytesSent(long count) {}
-
- /// <summary>
- /// Called when some bytes have been received on a connection
- /// </summary>
- ///
- /// <param>count the number of bytes received in total since the connection was opened</param>
- public void BytesReceived(long count) {}
-
- /// <summary>
- /// Called after the infrastructure has detected that failover is required but before attempting failover.
- /// </summary>
- ///
- /// <param>redirect true if the broker requested redirect. false if failover is occurring due to a connection error.</param>
- ///
- /// <return>true to continue failing over, false to veto failover and raise a connection exception</return>
- public bool PreFailover(bool redirect)
- {
- _log.Debug("public bool PreFailover(bool redirect): called");
- return true;
- }
-
- /// <summary>
- /// Called after connection has been made to another broker after failover has been started but before
- /// any resubscription has been done.
- /// </summary>
- ///
- /// <return> true to continue with resubscription, false to prevent automatic resubscription. This is useful in
- /// cases where the application wants to handle resubscription. Note that in the latter case all sessions, producers
- /// and consumers are invalidated.
- /// </return>
- public bool PreResubscribe()
- {
- _log.Debug("public bool PreResubscribe(): called");
- return true;
- }
-
- /// <summary>
- /// Called once failover has completed successfully. This is called irrespective of whether the client has
- /// vetoed automatic resubscription.
- /// </summary>
- public void FailoverComplete()
- {
- _log.Debug("public void FailoverComplete(): called");
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.InteropServices; +using System.Threading; +using log4net; +using NUnit.Framework; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client; +using Apache.Qpid.Messaging; + +namespace Apache.Qpid.Integration.Tests.interactive +{ + [TestFixture, Category("Interactive")] + public class FailoverTest : IConnectionListener + { + private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTest)); + + /// <summary>Specifies the number of times to run the test cycle.</summary> + const int NUM_MESSAGES = 10; + + /// <summary>Determines how many messages to send within each commit.</summary> + const int COMMIT_BATCH_SIZE = 1; + + /// <summary>Specifies the duration of the pause to place between each message sent in the test.</summary> + //const int SLEEP_MILLIS = 1; + + /// <summary>Specified the maximum time in milliseconds to wait for the test to complete.</summary> + const int TIMEOUT = 10000; + + /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary> + const int FAIL_POINT = 5; + + /// <summary>Specified the ack mode to use for the test.</summary> + AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge; + + /// <summary>Determines whether this test runs transactionally or not. </summary> + bool transacted = false; + + /// <summary>Holds the connection to run the test over.</summary> + AMQConnection _connection; + + /// <summary>Holds the channel for the test message publisher. </summary> + IChannel publishingChannel; + + /// <summary>Holds the test message publisher. </summary> + IMessagePublisher publisher; + + /// <summary>Used to keep count of the number of messages sent. </summary> + int messagesSent; + + /// <summary>Used to keep count of the number of messages received. </summary> + int messagesReceived; + + /// <summary>Used to wait for test completion on. </summary> + private static object testComplete = new Object(); + + /// <summary>Used to wait for failover completion on. </summary>
+ private static object failoverComplete = new Object(); + + bool failedOver=false; + + /// <summary>Used to record the extra message count (1) if the message sent right after failover actually made it to the new broker.</summary>
+ int _extraMessage = 0; + + /// <summary> + /// Creates the test connection with a fail-over set up, and a producer/consumer pair on that connection. + /// </summary> + /// [SetUp] + public void Init(IConnectionInfo connectionInfo) + { + //log4net.Config.BasicConfigurator.Configure(); + // Reset all counts. + messagesSent = 0; + messagesReceived = 0; + failedOver=false; + _extraMessage = 0; + + PromptAndWait("Ensure both brokers are running, then press Enter"); + + // Create a connection for the test. + _connection = new AMQConnection(connectionInfo); + _connection.ConnectionListener = this; + + // Create a consumer to receive the test messages. + IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode); + + string queueName = receivingChannel.GenerateUniqueName(); + receivingChannel.DeclareQueue(queueName, false, true, true); + receivingChannel.Bind(queueName, "amq.direct", queueName); + + IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName) + .WithPrefetchLow(30) + .WithPrefetchHigh(60).Create(); + + consumer.OnMessage = new MessageReceivedDelegate(OnMessage); + _connection.Start(); + + // Create a publisher to send the test messages. + publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge); + publisher = publishingChannel.CreatePublisherBuilder() + .WithRoutingKey(queueName) + .Create(); + + _log.Debug("connection = " + _connection); + _log.Debug("connectionInfo = " + connectionInfo); + _log.Debug("connection.AsUrl = " + _connection.toURL()); + _log.Debug("AcknowledgeMode is " + _acknowledgeMode); + } + + /// <summary> + /// Clean up the test connection. + /// </summary> + [TearDown] + public virtual void Shutdown() + { + if (!failedOver) + { + Assert.Fail("The failover callback never occured."); + } + + Console.WriteLine("Test done shutting down"); + Thread.Sleep(2000); + _connection.Close(); + } + + /// <summary> + /// Runs a failover test, building up the connection information from its component parts. In particular the brokers + /// to fail between are seperately added into the connection info. + /// </summary> + /*[Test] + public void TestWithBasicInfo() + { + _log.Debug("public void TestWithBasicInfo(): called"); + + // Manually create the connection parameters. + QpidConnectionInfo connectionInfo = new QpidConnectionInfo(); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false)); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false)); + + Init(connectionInfo); + DoFailoverTest(); + }*/ + + /// <summary> + /// Runs a failover test, with the failover configuration specified in the Qpid connection URL format. + /// </summary> + [Test] + public void TestWithUrl() + { + _log.Debug("public void runTestWithUrl(): called"); + + // Parse the connection parameters from a URL. + String clientId = "failover" + DateTime.Now.Ticks; + string defaultUrl = "amqp://guest:guest@" + clientId + "/test" + + "?brokerlist='tcp://localhost:9672;tcp://localhost:9673'&failover='roundrobin'"; + IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl); + + Init(connectionInfo); + DoFailoverTest(0); + } + + /// <summary> + /// Send the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all messages sent + /// are received within the test time limit. + /// </summary> + /// + /// <param name="connectionInfo">The connection parameters, specifying the brokers to fail between.</param> + void DoFailoverTest(int delay) + { + _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): called"); + + // Wait for all of the test messages to be received, checking that this occurs within the test time limit. + bool withinTimeout = false; + + for (int i = 1; i <= NUM_MESSAGES; ++i) + { + SendMessage(); + + // Prompt the user to cause a failure if at the fail point. + if (i == FAIL_POINT) + { + for( int min = delay ; min > 0 ; min--) + { + Console.WriteLine("Waiting for "+min+" minutes to test connection time bug."); + Thread.Sleep(60*1000); + } + + PromptAndWait("Cause a broker failure now, then press return."); + Console.WriteLine("NOTE: ensure that the delay between killing the broker and continuing here is less than 20 second"); + + Console.WriteLine("Sending a message to ensure send right after works"); + + SendMessage(); + + Console.WriteLine("Waiting for fail-over to complete before continuing..."); + + + lock(failoverComplete) + { + if (!failedOver) + { + withinTimeout = Monitor.Wait(failoverComplete, TIMEOUT); + } + else + { + withinTimeout=true; + } + } + + if (!withinTimeout) + { + PromptAndWait("Failover has not yet occured. Press enter to give up waiting."); + } + } + } + + lock(testComplete) + { + withinTimeout = Monitor.Wait(testComplete, TIMEOUT); + } + + if (!withinTimeout) + { + Assert.Fail("Test timed out, before all messages received."); + } + + _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): exiting"); + } + + [Test] + public void Test5MinuteWait() + { + String clientId = "failover" + DateTime.Now.Ticks; + + QpidConnectionInfo connectionInfo = new QpidConnectionInfo(); + connectionInfo.Username = "guest"; + connectionInfo.Password = "guest"; + connectionInfo.ClientName = clientId; + connectionInfo.VirtualHost = "/test"; + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 9672, false)); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 9673, false)); + + Init(connectionInfo); + DoFailoverTest(5); + } + + void SendMessage() + { + ITextMessage msg = publishingChannel.CreateTextMessage("message=" + messagesSent); + + publisher.Send(msg); + messagesSent++; + + if (transacted) + { + publishingChannel.Commit(); + } + + Console.WriteLine("messagesSent = " + messagesSent); + } + + /// <summary> + /// Receives all of the test messages. + /// </summary> + /// + /// <param name="message">The newly arrived test message.</param> + public void OnMessage(IMessage message) + { + try + { + if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) + { + message.Acknowledge(); + } + + messagesReceived++; + + _log.Debug("messagesReceived = " + messagesReceived); + + // Check if all of the messages in the test have been received, in which case notify the message producer that the test has + // succesfully completed. + if (messagesReceived == NUM_MESSAGES + _extraMessage) + { + lock (testComplete) + { + failedOver = true; + Monitor.Pulse(testComplete); + } + } + } + catch (QpidException e) + { + _log.Fatal("Exception received. About to stop.", e); + Stop(); + } + } + + /// <summary>Prompts the user on stdout and waits for a reply on stdin, using the specified prompt message.</summary> + /// + /// <param name="message">The message to prompt the user with.</param> + private void PromptAndWait(string message) + { + Console.WriteLine("\n" + message); + Console.ReadLine(); + } + + // <summary>Closes the test connection.</summary> + private void Stop() + { + _log.Debug("Stopping..."); + try + { + _connection.Close(); + } + catch (QpidException e) + { + _log.Debug("Failed to shutdown: ", e); + } + } + + /// <summary> + /// Called when bytes have been transmitted to the server + /// </summary> + /// + /// <param>count the number of bytes sent in total since the connection was opened</param> + public void BytesSent(long count) {} + + /// <summary> + /// Called when some bytes have been received on a connection + /// </summary> + /// + /// <param>count the number of bytes received in total since the connection was opened</param> + public void BytesReceived(long count) {} + + /// <summary> + /// Called after the infrastructure has detected that failover is required but before attempting failover. + /// </summary> + /// + /// <param>redirect true if the broker requested redirect. false if failover is occurring due to a connection error.</param> + /// + /// <return>true to continue failing over, false to veto failover and raise a connection exception</return> + public bool PreFailover(bool redirect) + { + _log.Debug("public bool PreFailover(bool redirect): called"); + return true; + } + + /// <summary> + /// Called after connection has been made to another broker after failover has been started but before + /// any resubscription has been done. + /// </summary> + /// + /// <return> true to continue with resubscription, false to prevent automatic resubscription. This is useful in + /// cases where the application wants to handle resubscription. Note that in the latter case all sessions, producers + /// and consumers are invalidated. + /// </return> + public bool PreResubscribe() + { + _log.Debug("public bool PreResubscribe(): called"); + return true; + } + + /// <summary> + /// Called once failover has completed successfully. This is called irrespective of whether the client has + /// vetoed automatic resubscription. + /// </summary> + public void FailoverComplete() + { + failedOver = true; + _log.Debug("public void FailoverComplete(): called"); + Console.WriteLine("public void FailoverComplete(): called"); + lock (failoverComplete) + { + Monitor.Pulse(failoverComplete); + } + } + } +} diff --git a/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs b/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs index f6d511034f..d4b61a2788 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs @@ -41,7 +41,7 @@ namespace Apache.Qpid.Integration.Tests.testcases private const string MESSAGE_DATA_BYTES = "-- Test Message -- Test Message -- Test Message -- Test Message -- Test Message ";
/// <summary> The default timeout in milliseconds to use on receives. </summary>
- private const long RECEIVE_WAIT = 500;
+ private const long RECEIVE_WAIT = 2000;
/// <summary> The default AMQ connection URL to use for tests. </summary>
public const string connectionUri = "amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672'";
@@ -55,6 +55,9 @@ namespace Apache.Qpid.Integration.Tests.testcases /// <summary> Holds an array of channels for building mutiple test end-points. </summary>
protected IChannel[] testChannel = new IChannel[10];
+ /// <summary> Holds an array of queues for building mutiple test end-points. </summary>
+ protected String[] testQueue = new String[10];
+
/// <summary> Holds an array of producers for building mutiple test end-points. </summary>
protected IMessagePublisher[] testProducer = new IMessagePublisher[10];
@@ -65,7 +68,7 @@ namespace Apache.Qpid.Integration.Tests.testcases private static int uniqueId = 0;
/// <summary> Used to hold unique ids per test. </summary>
- protected int testId;
+ protected Guid testId;
/// <summary> Creates the test connection and channel. </summary>
[SetUp]
@@ -74,7 +77,7 @@ namespace Apache.Qpid.Integration.Tests.testcases log.Debug("public virtual void Init(): called");
// Set up a unique id for this test.
- testId = uniqueId++;
+ testId = System.Guid.NewGuid();
}
/// <summary>
@@ -144,6 +147,10 @@ namespace Apache.Qpid.Integration.Tests.testcases if (declareBind)
{
+ if (durable)
+ {
+ testQueue[n] = queueName;
+ }
testChannel[n].DeclareQueue(queueName, durable, true, true);
testChannel[n].Bind(queueName, exchangeName, routingKey);
}
@@ -167,6 +174,10 @@ namespace Apache.Qpid.Integration.Tests.testcases if (testConsumer[n] != null)
{
+ if (testQueue[n] != null)
+ {
+ testChannel[n].DeleteQueue(testQueue[n], false, false, true);
+ }
testConsumer[n].Close();
testConsumer[n].Dispose();
testConsumer[n] = null;
diff --git a/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs b/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs index 117dc200d3..e34864aefd 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs @@ -127,7 +127,7 @@ namespace Apache.Qpid.Integration.Tests.testcases .WithRoutingKey(_routingKey)
.Create();
_logger.Info("Publisher created...");
- SendTestMessage("Message 1");
+ SendTestMessage("DeleteNonEmptyQueue Message 1");
try
{
@@ -165,8 +165,8 @@ namespace Apache.Qpid.Integration.Tests.testcases .Create();
_logger.Info("Publisher created...");
- SendTestMessage("Message 1");
- SendTestMessage("Message 2");
+ SendTestMessage("DeleteQueueWithResponse Message 1");
+ SendTestMessage("DeleteQueueWithResponse Message 2");
// delete the queue, the server must respond
_channel.DeleteQueue(_queueName, false, false, false);
diff --git a/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs b/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs index aab279285e..1951a8a171 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs @@ -94,11 +94,11 @@ namespace Apache.Qpid.Integration.Tests.testcases public void TestCommittedSendReceived()
{
// Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("B"));
testChannel[0].Commit();
// Try to receive messages.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(1, "B", testConsumer[1]);
testChannel[1].Commit();
}
@@ -107,11 +107,11 @@ namespace Apache.Qpid.Integration.Tests.testcases public void TestRolledBackSendNotReceived()
{
// Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("B"));
testChannel[0].Rollback();
// Try to receive messages.
- ConsumeNMessagesOnly(0, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(0, "B", testConsumer[1]);
testChannel[1].Commit();
}
@@ -124,17 +124,17 @@ namespace Apache.Qpid.Integration.Tests.testcases true, false, null);
// Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("C"));
testChannel[0].Commit();
// Try to receive messages.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(1, "C", testConsumer[1]);
// Close end-point 1 without committing the message, then re-open to consume again.
CloseEndPoint(1);
// Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
- ConsumeNMessagesOnly(1, "A", testConsumer[2]);
+ ConsumeNMessagesOnly(1, "C", testConsumer[2]);
CloseEndPoint(2);
}
@@ -144,38 +144,33 @@ namespace Apache.Qpid.Integration.Tests.testcases public void TestCommittedReceiveNotRereceived()
{
// Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("D"));
testChannel[0].Commit();
// Try to receive messages.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(1, "D", testConsumer[1]);
testChannel[1].Commit();
// Try to receive messages.
- ConsumeNMessagesOnly(0, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(0, "D", testConsumer[1]);
}
/// <summary> Check that a rolled back receive can be re-received. </summary>
[Test]
public void TestRolledBackReceiveCanBeRereceived()
{
- // Create a third end-point as an alternative delivery route for the message.
- SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
- true, false, null);
-
// Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("E"));
testChannel[0].Commit();
// Try to receive messages.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(1, "E", testConsumer[1]);
testChannel[1].Rollback();
// Try to receive messages.
- ConsumeNMessagesOnly(1, "A", testConsumer[2]);
-
- CloseEndPoint(2);
+ ConsumeNMessagesOnly(1, "E", testConsumer[1]);
+
}
}
-}
\ No newline at end of file +}
diff --git a/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs b/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs index ceda19af3e..ac975100b1 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs @@ -99,7 +99,7 @@ namespace Apache.Qpid.Integration.Tests.testcases true, "TestSubscription" + testId);
ConsumeNMessagesOnly(1, "B", testConsumer[2]);
-
+
// Clean up any open consumers at the end of the test.
CloseEndPoint(2);
CloseEndPoint(1);
@@ -113,23 +113,23 @@ namespace Apache.Qpid.Integration.Tests.testcases SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
true, false, null);
SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
- true, false, null);
+ true, true, "foo"+testId);
// Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("C"));
testChannel[0].Commit();
// Try to receive messages, but don't commit them.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(1, "C", testConsumer[1]);
// Close end-point 1 without committing the message, then re-open the subscription to consume again.
CloseEndPoint(1);
- SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
- true, false, null);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
+ true, true, "foo"+testId);
// Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
-
+ ConsumeNMessagesOnly(1, "C", testConsumer[1]);
+ testChannel[1].Commit();
CloseEndPoint(1);
CloseEndPoint(0);
}
@@ -141,24 +141,24 @@ namespace Apache.Qpid.Integration.Tests.testcases SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
true, false, null);
SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
- true, false, null);
+ true, true, "foo"+testId);
// Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("D"));
testChannel[0].Commit();
// Try to receive messages, but roll them back.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(1, "D", testConsumer[1]);
testChannel[1].Rollback();
// Close end-point 1 without committing the message, then re-open the subscription to consume again.
CloseEndPoint(1);
- SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
- true, false, null);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
+ true, true, "foo"+testId);
// Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
-
+ ConsumeNMessagesOnly(1, "D", testConsumer[1]);
+ testChannel[1].Commit();
CloseEndPoint(1);
CloseEndPoint(0);
}
diff --git a/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs b/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs index 1ab8d79250..5e17cf1d2d 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs @@ -50,7 +50,7 @@ namespace Apache.Qpid.Integration.Tests.testcases private static ILog _logger = LogManager.GetLogger(typeof(HeadersExchangeTest));
/// <summary> Holds the default test timeout for broker communications before tests give up. </summary>
- private static readonly int TIMEOUT = 1000;
+ private static readonly int TIMEOUT = 2000;
/// <summary> Holds the name of the headers exchange to create to send test messages on. </summary>
private string _exchangeName = "ServiceQ1";
diff --git a/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs b/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs index 7a4bf29cf6..876e7c7bf7 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs @@ -117,7 +117,7 @@ namespace Apache.Qpid.Integration.Tests.testcases testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
}
- _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 10), false);
+ _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false);
// Check that all messages really were received.
Assert.IsTrue(allReceived, "All messages were not received, only got " + _messageReceivedCount + " but wanted " + expectedMessageCount);
@@ -139,14 +139,14 @@ namespace Apache.Qpid.Integration.Tests.testcases SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.DIRECT,
true, false, null);
- expectedMessageCount = MESSAGE_COUNT;
+ expectedMessageCount = (MESSAGE_COUNT * CONSUMER_COUNT);
for (int i = 0; i < MESSAGE_COUNT; i++)
{
testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
}
- _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 10), false);
+ _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false);
// Check that all messages really were received.
Assert.IsTrue(allReceived, "All messages were not received, only got: " + _messageReceivedCount + " but wanted " + expectedMessageCount);
diff --git a/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs b/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs index 950acbed9c..1c104d1451 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs @@ -39,7 +39,7 @@ namespace Apache.Qpid.Integration.Tests.testcases /// Make a test TLS connection to the broker
/// without using client-certificates
/// </summary>
- [Test]
+ //[Test]
public void DoSslConnection()
{
// because for tests we don't usually trust the server certificate
|