summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-05-09 13:53:51 +0000
committerAidan Skinner <aidan@apache.org>2008-05-09 13:53:51 +0000
commit4cf2baa090c723b97a486bdb582ce07ff3a6c190 (patch)
tree6fc914c48a396cdc9cd6aad90736d3c760f58271
parentf66b4596946bbd8df7e6afbfd421197fe9856f32 (diff)
downloadqpid-python-4cf2baa090c723b97a486bdb582ce07ff3a6c190.tar.gz
Merged revisions 652388-653415,653417-654109 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x ........ r652388 | ritchiem | 2008-04-30 15:40:18 +0100 (Wed, 30 Apr 2008) | 2 lines QPID-889 : Removed _reapingStoreContext from CSDM replaced with local StoreContext()s so they are not reused by different threads. ........ r652389 | ritchiem | 2008-04-30 15:40:45 +0100 (Wed, 30 Apr 2008) | 1 line QPID-887 : Renamed QueueHouseKeeping threads so they can be identified in thread dump. Named Queue-housekeeping-<virtualhost name> ........ r652399 | ritchiem | 2008-04-30 16:32:42 +0100 (Wed, 30 Apr 2008) | 1 line QPID-888,QPID-886 : Fixed all management uses of _lock.lock / _lock.unlock so that they correctly call unlock from a finally block in the CSDM. There are two issues that cover that. QPID-888 - Fix the management ones and QPID-886 to fix the use in removeExpired. ........ r652567 | aidan | 2008-05-01 17:32:20 +0100 (Thu, 01 May 2008) | 1 line QPID-994 Dont wait for attain state as connection is closed by we get CloseOk ........ r652568 | aidan | 2008-05-01 17:35:09 +0100 (Thu, 01 May 2008) | 1 line QPID-1001 dont set the expiration time if TTL is 0 ........ r653447 | aidan | 2008-05-05 13:26:29 +0100 (Mon, 05 May 2008) | 1 line Check if consumer is closed and dont reclose it ........ r653451 | aidan | 2008-05-05 13:29:15 +0100 (Mon, 05 May 2008) | 1 line QPID-1022 Use synchronous writes to fix race conditions ........ r653452 | aidan | 2008-05-05 13:30:45 +0100 (Mon, 05 May 2008) | 1 line QPID-1023 increase some timeouts ........ r653760 | aidan | 2008-05-06 13:40:34 +0100 (Tue, 06 May 2008) | 3 lines QPID-1029: Generate temporary queue names using GUIDs to ensure uniqueness. ........ r654097 | aidan | 2008-05-07 14:25:38 +0100 (Wed, 07 May 2008) | 2 lines QPID-952, QPID-951, QPID-1032 Fix failover, ensure that it is properly detected, that frames are replayed approrpiately and that failover does not timeout. ........ r654104 | aidan | 2008-05-07 14:46:51 +0100 (Wed, 07 May 2008) | 1 line QPID-952 should have been part of previous commit ........ r654109 | aidan | 2008-05-07 14:56:09 +0100 (Wed, 07 May 2008) | 2 lines QPID-1036 increase timeouts to more reasonable levels, ensure that durable queues are deleted when no longer needed ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.x@654818 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs18
-rw-r--r--dotnet/Qpid.Client/Client/AMQConnection.cs37
-rw-r--r--dotnet/Qpid.Client/Client/AmqChannel.cs42
-rw-r--r--dotnet/Qpid.Client/Client/BasicMessageConsumer.cs6
-rw-r--r--dotnet/Qpid.Client/Client/BasicMessageProducer.cs5
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs14
-rw-r--r--dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs717
-rw-r--r--dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs17
-rw-r--r--dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs6
-rw-r--r--dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs35
-rw-r--r--dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs30
-rw-r--r--dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs2
-rw-r--r--dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs6
-rw-r--r--dotnet/Qpid.Integration.Tests/testcases/Qpid.Integration.Tests.csproj42
-rw-r--r--dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java178
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java74
17 files changed, 709 insertions, 522 deletions
diff --git a/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs b/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs
index badaa48111..b62b11a3db 100644
--- a/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs
+++ b/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs
@@ -53,6 +53,11 @@ namespace Apache.Qpid.Client.Transport.Socket.Blocking
{
_socket = new System.Net.Sockets.Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
+ /// For future note TCP Set NoDelay options may help, though with the blocking io not sure
+ /// The Don't linger may help with detecting disconnect but that hasn't been the case in testing.
+ /// _socket.SetSocketOption (SocketOptionLevel.Socket, SocketOptionName.NoDelay, 0);
+ /// _socket.SetSocketOption (SocketOptionLevel.Socket, SocketOptionName.DontLinger, 0);
+
IPHostEntry ipHostInfo = Dns.Resolve(_host); // Note: don't fix this warning. We do this for .NET 1.1 compatibility.
IPAddress ipAddress = ipHostInfo.AddressList[0];
@@ -77,6 +82,8 @@ namespace Apache.Qpid.Client.Transport.Socket.Blocking
{
_log.Error("Write caused exception", e);
_protocolListener.OnException(e);
+ // We should provide the error synchronously as we are doing blocking io.
+ throw e;
}
}
@@ -87,6 +94,17 @@ namespace Apache.Qpid.Client.Transport.Socket.Blocking
int numOctets = _networkStream.Read(bytes, 0, bytes.Length);
+ /// Read only returns 0 if the socket has been gracefully shutdown.
+ /// http://msdn2.microsoft.com/en-us/library/system.net.sockets.networkstream.read(VS.71).aspx
+ /// We can use this to block Send so the next Read will force an exception forcing failover.
+ /// Otherwise we need to wait ~20 seconds for the NetworkStream/Socket code to realise that
+ /// the socket has been closed.
+ if (numOctets == 0)
+ {
+ _socket.Shutdown(SocketShutdown.Send);
+ _socket.Close();
+ }
+
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
byteBuffer.limit(numOctets);
diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs
index d0bebf1170..41d4e089b6 100644
--- a/dotnet/Qpid.Client/Client/AMQConnection.cs
+++ b/dotnet/Qpid.Client/Client/AMQConnection.cs
@@ -69,7 +69,7 @@ namespace Apache.Qpid.Client
internal bool IsFailoverAllowed
{
- get { return _failoverPolicy.FailoverAllowed(); }
+ get { if(!_connected) return false; else return _failoverPolicy.FailoverAllowed(); }
}
/// <summary>
@@ -151,34 +151,22 @@ namespace Apache.Qpid.Client
_log.Error("Unable to connect to broker " + _failoverPolicy.GetCurrentBrokerInfo(), e);
// XXX: Should perhaps break out of the do/while here if not a SocketException...
}
- } while (_failoverPolicy.FailoverAllowed());
+ } while (!_connected && _failoverPolicy.FailoverAllowed());
_log.Debug("Are we connected:" + _connected);
-
- if (!_failoverPolicy.FailoverAllowed())
- {
- if ( lastException is AMQException )
- throw lastException;
- else
- throw new AMQConnectionException("Unable to connect", lastException);
- }
- // TODO: this needs to be redone so that we are not spinning.
- // A suitable object should be set that is then waited on
- // and only notified when a connection is made or when
- // the AMQConnection gets closed.
- while (!_connected && !Closed)
+ if (!_connected)
{
- _log.Debug("Sleeping.");
- Thread.Sleep(100);
- }
- if (!_failoverPolicy.FailoverAllowed() || _failoverPolicy.GetCurrentBrokerInfo() == null)
- {
- if (_lastAMQException != null)
- {
- throw _lastAMQException;
- }
+ if ( lastException is AMQException )
+ {
+ throw lastException;
+ }
+ else
+ {
+ throw new AMQConnectionException("Unable to connect", lastException);
+ }
}
+
}
/*private ITransport LoadTransportFromAssembly(string host, int port, String assemblyName, String transportType)
@@ -263,7 +251,6 @@ namespace Apache.Qpid.Client
_log.Debug("Blocking for connection close ok frame");
- _stateManager.AttainState(AMQState.CONNECTION_CLOSED);
Disconnect();
}
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs
index ce8e2ca2fe..86dc9a4681 100644
--- a/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -888,10 +888,14 @@ namespace Apache.Qpid.Client
/// <param name="consumer"></param>
private void RegisterConsumer(BasicMessageConsumer consumer)
{
+ // Need to generate a consumer tag on the client so we can exploit the nowait flag.
+ String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
+ consumer.ConsumerTag = tag;
+ _consumers.Add(tag, consumer);
+
String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal,
- consumer.Exclusive, consumer.AcknowledgeMode);
- consumer.ConsumerTag = consumerTag;
- _consumers.Add(consumerTag, consumer);
+ consumer.Exclusive, consumer.AcknowledgeMode, tag);
+
}
internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
@@ -902,19 +906,21 @@ namespace Apache.Qpid.Client
AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
queueName, exchangeName,
- routingKey, true, args);
- _replayFrames.Add(queueBind);
+ routingKey, false, args);
+
lock (_connection.FailoverMutex)
{
- _connection.ProtocolWriter.Write(queueBind);
+ _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody));
}
+ // AS FIXME: wasnae me
+ _replayFrames.Add(QueueBindBody.CreateAMQFrame(_channelId, 0,
+ queueName, exchangeName,
+ routingKey, true, args));
}
- private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode)
+ private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag)
{
- // Need to generate a consumer tag on the client so we can exploit the nowait flag.
- String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0,
queueName, tag, noLocal,
@@ -934,9 +940,7 @@ namespace Apache.Qpid.Client
_logger.Debug(string.Format("DeleteQueue name={0}", queueName));
AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait);
-
- _replayFrames.Add(queueDelete);
-
+
if (noWait)
{
_connection.ProtocolWriter.Write(queueDelete);
@@ -945,6 +949,8 @@ namespace Apache.Qpid.Client
{
_connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody));
}
+ // AS FIXME: wasnae me
+ _replayFrames.Add(QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, true));
}
catch (AMQException)
{
@@ -958,14 +964,16 @@ namespace Apache.Qpid.Client
queueName, isDurable, isExclusive, isAutoDelete));
AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
- isAutoDelete, true, null);
+ isAutoDelete, false, null);
- _replayFrames.Add(queueDeclare);
lock (_connection.FailoverMutex)
{
- _connection.ProtocolWriter.Write(queueDeclare);
+ _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody));
}
+ // AS FIXME: wasnae me
+ _replayFrames.Add(QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
+ isAutoDelete, true, null));
}
// AMQP-level method.
@@ -978,8 +986,6 @@ namespace Apache.Qpid.Client
AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive,
durable, autoDelete, xinternal, noWait, args);
-
- _replayFrames.Add(declareExchange);
if (noWait)
{
@@ -987,6 +993,8 @@ namespace Apache.Qpid.Client
{
_connection.ProtocolWriter.Write(declareExchange);
}
+ // AS FIXME: wasnae me
+ _replayFrames.Add(declareExchange);
}
else
{
diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
index e88cf8f04c..6fee316cb4 100644
--- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
+++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
@@ -266,7 +266,11 @@ namespace Apache.Qpid.Client
public override void Close()
{
- // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex
+ if (_closed == CLOSED)
+ {
+ return;
+ }
+ // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex
lock (_channel.Connection.FailoverMutex)
{
lock (_closingLock)
diff --git a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
index ca6d2abee5..f33afc452e 100644
--- a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
+++ b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
@@ -306,7 +306,10 @@ namespace Apache.Qpid.Client
if ( !_disableTimestamps )
{
message.Timestamp = DateTime.UtcNow.Ticks;
- message.Expiration = message.Timestamp + timeToLive;
+ if (timeToLive != 0)
+ {
+ message.Expiration = message.Timestamp + timeToLive;
+ }
} else
{
message.Expiration = 0;
diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
index 7ae086e35f..1fb3d407eb 100644
--- a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
+++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
@@ -35,12 +35,6 @@ namespace Apache.Qpid.Client.Protocol
private readonly IProtocolWriter _protocolWriter;
private readonly IConnectionCloser _connectionCloser;
- /**
- * Counter to ensure unique queue names
- */
- private int _queueId = 1;
- private readonly Object _queueIdLock = new Object();
-
/// <summary>
/// Maps from the channel id to the AmqChannel that it represents.
/// </summary>
@@ -267,13 +261,7 @@ namespace Apache.Qpid.Client.Protocol
internal string GenerateQueueName()
{
- int id;
- lock(_queueIdLock)
- {
- id = _queueId++;
- }
-
- return "tmp_" + _connection.Transport.LocalEndpoint + "_" + id;
+ return "ntmp_" + System.Guid.NewGuid();
}
}
}
diff --git a/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs b/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs
index fcaabfea3b..547266a7a5 100644
--- a/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs
+++ b/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs
@@ -1,320 +1,397 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Runtime.InteropServices;
-using System.Threading;
-using log4net;
-using NUnit.Framework;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client;
-using Apache.Qpid.Messaging;
-
-namespace Apache.Qpid.Integration.Tests.interactive
-{
- [TestFixture, Category("Interactive")]
- public class FailoverTest : IConnectionListener
- {
- private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTest));
-
- /// <summary>Specifies the number of times to run the test cycle.</summary>
- const int NUM_MESSAGES = 10;
-
- /// <summary>Determines how many messages to send within each commit.</summary>
- const int COMMIT_BATCH_SIZE = 1;
-
- /// <summary>Specifies the duration of the pause to place between each message sent in the test.</summary>
- //const int SLEEP_MILLIS = 1;
-
- /// <summary>Specified the maximum time in milliseconds to wait for the test to complete.</summary>
- const int TIMEOUT = 10000;
-
- /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary>
- const int FAIL_POINT = 5;
-
- /// <summary>Specified the ack mode to use for the test.</summary>
- AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge;
-
- /// <summary>Determines whether this test runs transactionally or not. </summary>
- bool transacted = false;
-
- /// <summary>Holds the connection to run the test over.</summary>
- AMQConnection _connection;
-
- /// <summary>Holds the channel for the test message publisher. </summary>
- IChannel publishingChannel;
-
- /// <summary>Holds the test message publisher. </summary>
- IMessagePublisher publisher;
-
- /// <summary>Used to keep count of the number of messages sent. </summary>
- int messagesSent;
-
- /// <summary>Used to keep count of the number of messages received. </summary>
- int messagesReceived;
-
- /// <summary>Used to wait for test completion on. </summary>
- private static object testComplete = new Object();
-
- /// <summary>
- /// Creates the test connection with a fail-over set up, and a producer/consumer pair on that connection.
- /// </summary>
- /// [SetUp]
- public void Init(IConnectionInfo connectionInfo)
- {
- // Reset all counts.
- messagesSent = 0;
- messagesReceived = 0;
-
- // Create a connection for the test.
- _connection = new AMQConnection(connectionInfo);
- _connection.ConnectionListener = this;
-
- // Create a consumer to receive the test messages.
- IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode);
-
- string queueName = receivingChannel.GenerateUniqueName();
- receivingChannel.DeclareQueue(queueName, false, true, true);
- receivingChannel.Bind(queueName, "amq.direct", queueName);
-
- IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName)
- .WithPrefetchLow(30)
- .WithPrefetchHigh(60).Create();
-
- consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
- _connection.Start();
-
- // Create a publisher to send the test messages.
- publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
- publisher = publishingChannel.CreatePublisherBuilder()
- .WithRoutingKey(queueName)
- .Create();
-
- _log.Debug("connection = " + _connection);
- _log.Debug("connectionInfo = " + connectionInfo);
- _log.Debug("connection.AsUrl = " + _connection.toURL());
- _log.Debug("AcknowledgeMode is " + _acknowledgeMode);
- }
-
- /// <summary>
- /// Clean up the test connection.
- /// </summary>
- [TearDown]
- public virtual void Shutdown()
- {
- Thread.Sleep(2000);
- _connection.Close();
- }
-
- /// <summary>
- /// Runs a failover test, building up the connection information from its component parts. In particular the brokers
- /// to fail between are seperately added into the connection info.
- /// </summary>
- /*[Test]
- public void TestWithBasicInfo()
- {
- _log.Debug("public void TestWithBasicInfo(): called");
-
- // Manually create the connection parameters.
- QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
- connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
- connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
-
- Init(connectionInfo);
- DoFailoverTest();
- }*/
-
- /// <summary>
- /// Runs a failover test, with the failover configuration specified in the Qpid connection URL format.
- /// </summary>
- [Test]
- public void TestWithUrl()
- {
- _log.Debug("public void runTestWithUrl(): called");
-
- // Parse the connection parameters from a URL.
- String clientId = "failover" + DateTime.Now.Ticks;
- string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
- "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";
- IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl);
-
- Init(connectionInfo);
- DoFailoverTest();
- }
-
- /// <summary>
- /// Send the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all messages sent
- /// are received within the test time limit.
- /// </summary>
- ///
- /// <param name="connectionInfo">The connection parameters, specifying the brokers to fail between.</param>
- void DoFailoverTest()
- {
- _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): called");
-
- for (int i = 1; i <= NUM_MESSAGES; ++i)
- {
- ITextMessage msg = publishingChannel.CreateTextMessage("message=" + messagesSent);
- //_log.Debug("sending message = " + msg.Text);
- publisher.Send(msg);
- messagesSent++;
-
- _log.Debug("messagesSent = " + messagesSent);
-
- if (transacted)
- {
- publishingChannel.Commit();
- }
-
- // Prompt the user to cause a failure if at the fail point.
- if (i == FAIL_POINT)
- {
- PromptAndWait("Cause a broker failure now, then press return...");
- }
-
- //Thread.Sleep(SLEEP_MILLIS);
- }
-
- // Wait for all of the test messages to be received, checking that this occurs within the test time limit.
- bool withinTimeout;
-
- lock(testComplete)
- {
- withinTimeout = Monitor.Wait(testComplete, TIMEOUT);
- }
-
- if (!withinTimeout)
- {
- Assert.Fail("Test timed out, before all messages received.");
- }
-
- _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): exiting");
- }
-
- /// <summary>
- /// Receives all of the test messages.
- /// </summary>
- ///
- /// <param name="message">The newly arrived test message.</param>
- public void OnMessage(IMessage message)
- {
- try
- {
- if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
- {
- message.Acknowledge();
- }
-
- messagesReceived++;
-
- _log.Debug("messagesReceived = " + messagesReceived);
-
- // Check if all of the messages in the test have been received, in which case notify the message producer that the test has
- // succesfully completed.
- if (messagesReceived == NUM_MESSAGES)
- {
- lock (testComplete)
- {
- Monitor.Pulse(testComplete);
- }
- }
- }
- catch (QpidException e)
- {
- _log.Fatal("Exception received. About to stop.", e);
- Stop();
- }
- }
-
- /// <summary>Prompts the user on stdout and waits for a reply on stdin, using the specified prompt message.</summary>
- ///
- /// <param name="message">The message to prompt the user with.</param>
- private void PromptAndWait(string message)
- {
- Console.WriteLine("\n" + message);
- Console.ReadLine();
- }
-
- // <summary>Closes the test connection.</summary>
- private void Stop()
- {
- _log.Debug("Stopping...");
- try
- {
- _connection.Close();
- }
- catch (QpidException e)
- {
- _log.Debug("Failed to shutdown: ", e);
- }
- }
-
- /// <summary>
- /// Called when bytes have been transmitted to the server
- /// </summary>
- ///
- /// <param>count the number of bytes sent in total since the connection was opened</param>
- public void BytesSent(long count) {}
-
- /// <summary>
- /// Called when some bytes have been received on a connection
- /// </summary>
- ///
- /// <param>count the number of bytes received in total since the connection was opened</param>
- public void BytesReceived(long count) {}
-
- /// <summary>
- /// Called after the infrastructure has detected that failover is required but before attempting failover.
- /// </summary>
- ///
- /// <param>redirect true if the broker requested redirect. false if failover is occurring due to a connection error.</param>
- ///
- /// <return>true to continue failing over, false to veto failover and raise a connection exception</return>
- public bool PreFailover(bool redirect)
- {
- _log.Debug("public bool PreFailover(bool redirect): called");
- return true;
- }
-
- /// <summary>
- /// Called after connection has been made to another broker after failover has been started but before
- /// any resubscription has been done.
- /// </summary>
- ///
- /// <return> true to continue with resubscription, false to prevent automatic resubscription. This is useful in
- /// cases where the application wants to handle resubscription. Note that in the latter case all sessions, producers
- /// and consumers are invalidated.
- /// </return>
- public bool PreResubscribe()
- {
- _log.Debug("public bool PreResubscribe(): called");
- return true;
- }
-
- /// <summary>
- /// Called once failover has completed successfully. This is called irrespective of whether the client has
- /// vetoed automatic resubscription.
- /// </summary>
- public void FailoverComplete()
- {
- _log.Debug("public void FailoverComplete(): called");
- }
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.InteropServices;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.interactive
+{
+ [TestFixture, Category("Interactive")]
+ public class FailoverTest : IConnectionListener
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTest));
+
+ /// <summary>Specifies the number of times to run the test cycle.</summary>
+ const int NUM_MESSAGES = 10;
+
+ /// <summary>Determines how many messages to send within each commit.</summary>
+ const int COMMIT_BATCH_SIZE = 1;
+
+ /// <summary>Specifies the duration of the pause to place between each message sent in the test.</summary>
+ //const int SLEEP_MILLIS = 1;
+
+ /// <summary>Specified the maximum time in milliseconds to wait for the test to complete.</summary>
+ const int TIMEOUT = 10000;
+
+ /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary>
+ const int FAIL_POINT = 5;
+
+ /// <summary>Specified the ack mode to use for the test.</summary>
+ AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge;
+
+ /// <summary>Determines whether this test runs transactionally or not. </summary>
+ bool transacted = false;
+
+ /// <summary>Holds the connection to run the test over.</summary>
+ AMQConnection _connection;
+
+ /// <summary>Holds the channel for the test message publisher. </summary>
+ IChannel publishingChannel;
+
+ /// <summary>Holds the test message publisher. </summary>
+ IMessagePublisher publisher;
+
+ /// <summary>Used to keep count of the number of messages sent. </summary>
+ int messagesSent;
+
+ /// <summary>Used to keep count of the number of messages received. </summary>
+ int messagesReceived;
+
+ /// <summary>Used to wait for test completion on. </summary>
+ private static object testComplete = new Object();
+
+ /// <summary>Used to wait for failover completion on. </summary>
+ private static object failoverComplete = new Object();
+
+ bool failedOver=false;
+
+ /// <summary>Used to record the extra message count (1) if the message sent right after failover actually made it to the new broker.</summary>
+ int _extraMessage = 0;
+
+ /// <summary>
+ /// Creates the test connection with a fail-over set up, and a producer/consumer pair on that connection.
+ /// </summary>
+ /// [SetUp]
+ public void Init(IConnectionInfo connectionInfo)
+ {
+ //log4net.Config.BasicConfigurator.Configure();
+ // Reset all counts.
+ messagesSent = 0;
+ messagesReceived = 0;
+ failedOver=false;
+ _extraMessage = 0;
+
+ PromptAndWait("Ensure both brokers are running, then press Enter");
+
+ // Create a connection for the test.
+ _connection = new AMQConnection(connectionInfo);
+ _connection.ConnectionListener = this;
+
+ // Create a consumer to receive the test messages.
+ IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode);
+
+ string queueName = receivingChannel.GenerateUniqueName();
+ receivingChannel.DeclareQueue(queueName, false, true, true);
+ receivingChannel.Bind(queueName, "amq.direct", queueName);
+
+ IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName)
+ .WithPrefetchLow(30)
+ .WithPrefetchHigh(60).Create();
+
+ consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+ _connection.Start();
+
+ // Create a publisher to send the test messages.
+ publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
+ publisher = publishingChannel.CreatePublisherBuilder()
+ .WithRoutingKey(queueName)
+ .Create();
+
+ _log.Debug("connection = " + _connection);
+ _log.Debug("connectionInfo = " + connectionInfo);
+ _log.Debug("connection.AsUrl = " + _connection.toURL());
+ _log.Debug("AcknowledgeMode is " + _acknowledgeMode);
+ }
+
+ /// <summary>
+ /// Clean up the test connection.
+ /// </summary>
+ [TearDown]
+ public virtual void Shutdown()
+ {
+ if (!failedOver)
+ {
+ Assert.Fail("The failover callback never occured.");
+ }
+
+ Console.WriteLine("Test done shutting down");
+ Thread.Sleep(2000);
+ _connection.Close();
+ }
+
+ /// <summary>
+ /// Runs a failover test, building up the connection information from its component parts. In particular the brokers
+ /// to fail between are seperately added into the connection info.
+ /// </summary>
+ /*[Test]
+ public void TestWithBasicInfo()
+ {
+ _log.Debug("public void TestWithBasicInfo(): called");
+
+ // Manually create the connection parameters.
+ QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
+ connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
+ connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
+
+ Init(connectionInfo);
+ DoFailoverTest();
+ }*/
+
+ /// <summary>
+ /// Runs a failover test, with the failover configuration specified in the Qpid connection URL format.
+ /// </summary>
+ [Test]
+ public void TestWithUrl()
+ {
+ _log.Debug("public void runTestWithUrl(): called");
+
+ // Parse the connection parameters from a URL.
+ String clientId = "failover" + DateTime.Now.Ticks;
+ string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
+ "?brokerlist='tcp://localhost:9672;tcp://localhost:9673'&failover='roundrobin'";
+ IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl);
+
+ Init(connectionInfo);
+ DoFailoverTest(0);
+ }
+
+ /// <summary>
+ /// Send the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all messages sent
+ /// are received within the test time limit.
+ /// </summary>
+ ///
+ /// <param name="connectionInfo">The connection parameters, specifying the brokers to fail between.</param>
+ void DoFailoverTest(int delay)
+ {
+ _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): called");
+
+ // Wait for all of the test messages to be received, checking that this occurs within the test time limit.
+ bool withinTimeout = false;
+
+ for (int i = 1; i <= NUM_MESSAGES; ++i)
+ {
+ SendMessage();
+
+ // Prompt the user to cause a failure if at the fail point.
+ if (i == FAIL_POINT)
+ {
+ for( int min = delay ; min > 0 ; min--)
+ {
+ Console.WriteLine("Waiting for "+min+" minutes to test connection time bug.");
+ Thread.Sleep(60*1000);
+ }
+
+ PromptAndWait("Cause a broker failure now, then press return.");
+ Console.WriteLine("NOTE: ensure that the delay between killing the broker and continuing here is less than 20 second");
+
+ Console.WriteLine("Sending a message to ensure send right after works");
+
+ SendMessage();
+
+ Console.WriteLine("Waiting for fail-over to complete before continuing...");
+
+
+ lock(failoverComplete)
+ {
+ if (!failedOver)
+ {
+ withinTimeout = Monitor.Wait(failoverComplete, TIMEOUT);
+ }
+ else
+ {
+ withinTimeout=true;
+ }
+ }
+
+ if (!withinTimeout)
+ {
+ PromptAndWait("Failover has not yet occured. Press enter to give up waiting.");
+ }
+ }
+ }
+
+ lock(testComplete)
+ {
+ withinTimeout = Monitor.Wait(testComplete, TIMEOUT);
+ }
+
+ if (!withinTimeout)
+ {
+ Assert.Fail("Test timed out, before all messages received.");
+ }
+
+ _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): exiting");
+ }
+
+ [Test]
+ public void Test5MinuteWait()
+ {
+ String clientId = "failover" + DateTime.Now.Ticks;
+
+ QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
+ connectionInfo.Username = "guest";
+ connectionInfo.Password = "guest";
+ connectionInfo.ClientName = clientId;
+ connectionInfo.VirtualHost = "/test";
+ connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 9672, false));
+ connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 9673, false));
+
+ Init(connectionInfo);
+ DoFailoverTest(5);
+ }
+
+ void SendMessage()
+ {
+ ITextMessage msg = publishingChannel.CreateTextMessage("message=" + messagesSent);
+
+ publisher.Send(msg);
+ messagesSent++;
+
+ if (transacted)
+ {
+ publishingChannel.Commit();
+ }
+
+ Console.WriteLine("messagesSent = " + messagesSent);
+ }
+
+ /// <summary>
+ /// Receives all of the test messages.
+ /// </summary>
+ ///
+ /// <param name="message">The newly arrived test message.</param>
+ public void OnMessage(IMessage message)
+ {
+ try
+ {
+ if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
+ {
+ message.Acknowledge();
+ }
+
+ messagesReceived++;
+
+ _log.Debug("messagesReceived = " + messagesReceived);
+
+ // Check if all of the messages in the test have been received, in which case notify the message producer that the test has
+ // succesfully completed.
+ if (messagesReceived == NUM_MESSAGES + _extraMessage)
+ {
+ lock (testComplete)
+ {
+ failedOver = true;
+ Monitor.Pulse(testComplete);
+ }
+ }
+ }
+ catch (QpidException e)
+ {
+ _log.Fatal("Exception received. About to stop.", e);
+ Stop();
+ }
+ }
+
+ /// <summary>Prompts the user on stdout and waits for a reply on stdin, using the specified prompt message.</summary>
+ ///
+ /// <param name="message">The message to prompt the user with.</param>
+ private void PromptAndWait(string message)
+ {
+ Console.WriteLine("\n" + message);
+ Console.ReadLine();
+ }
+
+ // <summary>Closes the test connection.</summary>
+ private void Stop()
+ {
+ _log.Debug("Stopping...");
+ try
+ {
+ _connection.Close();
+ }
+ catch (QpidException e)
+ {
+ _log.Debug("Failed to shutdown: ", e);
+ }
+ }
+
+ /// <summary>
+ /// Called when bytes have been transmitted to the server
+ /// </summary>
+ ///
+ /// <param>count the number of bytes sent in total since the connection was opened</param>
+ public void BytesSent(long count) {}
+
+ /// <summary>
+ /// Called when some bytes have been received on a connection
+ /// </summary>
+ ///
+ /// <param>count the number of bytes received in total since the connection was opened</param>
+ public void BytesReceived(long count) {}
+
+ /// <summary>
+ /// Called after the infrastructure has detected that failover is required but before attempting failover.
+ /// </summary>
+ ///
+ /// <param>redirect true if the broker requested redirect. false if failover is occurring due to a connection error.</param>
+ ///
+ /// <return>true to continue failing over, false to veto failover and raise a connection exception</return>
+ public bool PreFailover(bool redirect)
+ {
+ _log.Debug("public bool PreFailover(bool redirect): called");
+ return true;
+ }
+
+ /// <summary>
+ /// Called after connection has been made to another broker after failover has been started but before
+ /// any resubscription has been done.
+ /// </summary>
+ ///
+ /// <return> true to continue with resubscription, false to prevent automatic resubscription. This is useful in
+ /// cases where the application wants to handle resubscription. Note that in the latter case all sessions, producers
+ /// and consumers are invalidated.
+ /// </return>
+ public bool PreResubscribe()
+ {
+ _log.Debug("public bool PreResubscribe(): called");
+ return true;
+ }
+
+ /// <summary>
+ /// Called once failover has completed successfully. This is called irrespective of whether the client has
+ /// vetoed automatic resubscription.
+ /// </summary>
+ public void FailoverComplete()
+ {
+ failedOver = true;
+ _log.Debug("public void FailoverComplete(): called");
+ Console.WriteLine("public void FailoverComplete(): called");
+ lock (failoverComplete)
+ {
+ Monitor.Pulse(failoverComplete);
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs b/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
index f6d511034f..d4b61a2788 100644
--- a/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
+++ b/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
@@ -41,7 +41,7 @@ namespace Apache.Qpid.Integration.Tests.testcases
private const string MESSAGE_DATA_BYTES = "-- Test Message -- Test Message -- Test Message -- Test Message -- Test Message ";
/// <summary> The default timeout in milliseconds to use on receives. </summary>
- private const long RECEIVE_WAIT = 500;
+ private const long RECEIVE_WAIT = 2000;
/// <summary> The default AMQ connection URL to use for tests. </summary>
public const string connectionUri = "amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672'";
@@ -55,6 +55,9 @@ namespace Apache.Qpid.Integration.Tests.testcases
/// <summary> Holds an array of channels for building mutiple test end-points. </summary>
protected IChannel[] testChannel = new IChannel[10];
+ /// <summary> Holds an array of queues for building mutiple test end-points. </summary>
+ protected String[] testQueue = new String[10];
+
/// <summary> Holds an array of producers for building mutiple test end-points. </summary>
protected IMessagePublisher[] testProducer = new IMessagePublisher[10];
@@ -65,7 +68,7 @@ namespace Apache.Qpid.Integration.Tests.testcases
private static int uniqueId = 0;
/// <summary> Used to hold unique ids per test. </summary>
- protected int testId;
+ protected Guid testId;
/// <summary> Creates the test connection and channel. </summary>
[SetUp]
@@ -74,7 +77,7 @@ namespace Apache.Qpid.Integration.Tests.testcases
log.Debug("public virtual void Init(): called");
// Set up a unique id for this test.
- testId = uniqueId++;
+ testId = System.Guid.NewGuid();
}
/// <summary>
@@ -144,6 +147,10 @@ namespace Apache.Qpid.Integration.Tests.testcases
if (declareBind)
{
+ if (durable)
+ {
+ testQueue[n] = queueName;
+ }
testChannel[n].DeclareQueue(queueName, durable, true, true);
testChannel[n].Bind(queueName, exchangeName, routingKey);
}
@@ -167,6 +174,10 @@ namespace Apache.Qpid.Integration.Tests.testcases
if (testConsumer[n] != null)
{
+ if (testQueue[n] != null)
+ {
+ testChannel[n].DeleteQueue(testQueue[n], false, false, true);
+ }
testConsumer[n].Close();
testConsumer[n].Dispose();
testConsumer[n] = null;
diff --git a/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs b/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs
index 117dc200d3..e34864aefd 100644
--- a/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs
+++ b/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs
@@ -127,7 +127,7 @@ namespace Apache.Qpid.Integration.Tests.testcases
.WithRoutingKey(_routingKey)
.Create();
_logger.Info("Publisher created...");
- SendTestMessage("Message 1");
+ SendTestMessage("DeleteNonEmptyQueue Message 1");
try
{
@@ -165,8 +165,8 @@ namespace Apache.Qpid.Integration.Tests.testcases
.Create();
_logger.Info("Publisher created...");
- SendTestMessage("Message 1");
- SendTestMessage("Message 2");
+ SendTestMessage("DeleteQueueWithResponse Message 1");
+ SendTestMessage("DeleteQueueWithResponse Message 2");
// delete the queue, the server must respond
_channel.DeleteQueue(_queueName, false, false, false);
diff --git a/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs b/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs
index aab279285e..1951a8a171 100644
--- a/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs
+++ b/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs
@@ -94,11 +94,11 @@ namespace Apache.Qpid.Integration.Tests.testcases
public void TestCommittedSendReceived()
{
// Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("B"));
testChannel[0].Commit();
// Try to receive messages.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(1, "B", testConsumer[1]);
testChannel[1].Commit();
}
@@ -107,11 +107,11 @@ namespace Apache.Qpid.Integration.Tests.testcases
public void TestRolledBackSendNotReceived()
{
// Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("B"));
testChannel[0].Rollback();
// Try to receive messages.
- ConsumeNMessagesOnly(0, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(0, "B", testConsumer[1]);
testChannel[1].Commit();
}
@@ -124,17 +124,17 @@ namespace Apache.Qpid.Integration.Tests.testcases
true, false, null);
// Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("C"));
testChannel[0].Commit();
// Try to receive messages.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(1, "C", testConsumer[1]);
// Close end-point 1 without committing the message, then re-open to consume again.
CloseEndPoint(1);
// Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
- ConsumeNMessagesOnly(1, "A", testConsumer[2]);
+ ConsumeNMessagesOnly(1, "C", testConsumer[2]);
CloseEndPoint(2);
}
@@ -144,38 +144,33 @@ namespace Apache.Qpid.Integration.Tests.testcases
public void TestCommittedReceiveNotRereceived()
{
// Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("D"));
testChannel[0].Commit();
// Try to receive messages.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(1, "D", testConsumer[1]);
testChannel[1].Commit();
// Try to receive messages.
- ConsumeNMessagesOnly(0, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(0, "D", testConsumer[1]);
}
/// <summary> Check that a rolled back receive can be re-received. </summary>
[Test]
public void TestRolledBackReceiveCanBeRereceived()
{
- // Create a third end-point as an alternative delivery route for the message.
- SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
- true, false, null);
-
// Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("E"));
testChannel[0].Commit();
// Try to receive messages.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(1, "E", testConsumer[1]);
testChannel[1].Rollback();
// Try to receive messages.
- ConsumeNMessagesOnly(1, "A", testConsumer[2]);
-
- CloseEndPoint(2);
+ ConsumeNMessagesOnly(1, "E", testConsumer[1]);
+
}
}
-} \ No newline at end of file
+}
diff --git a/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs b/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
index ceda19af3e..ac975100b1 100644
--- a/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
+++ b/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
@@ -99,7 +99,7 @@ namespace Apache.Qpid.Integration.Tests.testcases
true, "TestSubscription" + testId);
ConsumeNMessagesOnly(1, "B", testConsumer[2]);
-
+
// Clean up any open consumers at the end of the test.
CloseEndPoint(2);
CloseEndPoint(1);
@@ -113,23 +113,23 @@ namespace Apache.Qpid.Integration.Tests.testcases
SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
true, false, null);
SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
- true, false, null);
+ true, true, "foo"+testId);
// Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("C"));
testChannel[0].Commit();
// Try to receive messages, but don't commit them.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(1, "C", testConsumer[1]);
// Close end-point 1 without committing the message, then re-open the subscription to consume again.
CloseEndPoint(1);
- SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
- true, false, null);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
+ true, true, "foo"+testId);
// Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
-
+ ConsumeNMessagesOnly(1, "C", testConsumer[1]);
+ testChannel[1].Commit();
CloseEndPoint(1);
CloseEndPoint(0);
}
@@ -141,24 +141,24 @@ namespace Apache.Qpid.Integration.Tests.testcases
SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
true, false, null);
SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
- true, false, null);
+ true, true, "foo"+testId);
// Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("D"));
testChannel[0].Commit();
// Try to receive messages, but roll them back.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(1, "D", testConsumer[1]);
testChannel[1].Rollback();
// Close end-point 1 without committing the message, then re-open the subscription to consume again.
CloseEndPoint(1);
- SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
- true, false, null);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
+ true, true, "foo"+testId);
// Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
-
+ ConsumeNMessagesOnly(1, "D", testConsumer[1]);
+ testChannel[1].Commit();
CloseEndPoint(1);
CloseEndPoint(0);
}
diff --git a/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs b/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs
index 1ab8d79250..5e17cf1d2d 100644
--- a/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs
+++ b/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs
@@ -50,7 +50,7 @@ namespace Apache.Qpid.Integration.Tests.testcases
private static ILog _logger = LogManager.GetLogger(typeof(HeadersExchangeTest));
/// <summary> Holds the default test timeout for broker communications before tests give up. </summary>
- private static readonly int TIMEOUT = 1000;
+ private static readonly int TIMEOUT = 2000;
/// <summary> Holds the name of the headers exchange to create to send test messages on. </summary>
private string _exchangeName = "ServiceQ1";
diff --git a/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs b/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs
index 7a4bf29cf6..876e7c7bf7 100644
--- a/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs
+++ b/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs
@@ -117,7 +117,7 @@ namespace Apache.Qpid.Integration.Tests.testcases
testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
}
- _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 10), false);
+ _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false);
// Check that all messages really were received.
Assert.IsTrue(allReceived, "All messages were not received, only got " + _messageReceivedCount + " but wanted " + expectedMessageCount);
@@ -139,14 +139,14 @@ namespace Apache.Qpid.Integration.Tests.testcases
SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.DIRECT,
true, false, null);
- expectedMessageCount = MESSAGE_COUNT;
+ expectedMessageCount = (MESSAGE_COUNT * CONSUMER_COUNT);
for (int i = 0; i < MESSAGE_COUNT; i++)
{
testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
}
- _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 10), false);
+ _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false);
// Check that all messages really were received.
Assert.IsTrue(allReceived, "All messages were not received, only got: " + _messageReceivedCount + " but wanted " + expectedMessageCount);
diff --git a/dotnet/Qpid.Integration.Tests/testcases/Qpid.Integration.Tests.csproj b/dotnet/Qpid.Integration.Tests/testcases/Qpid.Integration.Tests.csproj
new file mode 100644
index 0000000000..8b27526da4
--- /dev/null
+++ b/dotnet/Qpid.Integration.Tests/testcases/Qpid.Integration.Tests.csproj
@@ -0,0 +1,42 @@
+<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <ProjectGuid>{EFEB9E41-B66E-4674-85F7-18FAD056AD67}</ProjectGuid>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <OutputType>Exe</OutputType>
+ <RootNamespace>Qpid.Integration.Tests</RootNamespace>
+ <AssemblyName>Qpid.Integration.Tests</AssemblyName>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
+ <OutputPath>bin\Debug\</OutputPath>
+ <DebugSymbols>True</DebugSymbols>
+ <DebugType>Full</DebugType>
+ <Optimize>False</Optimize>
+ <CheckForOverflowUnderflow>True</CheckForOverflowUnderflow>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
+ <OutputPath>bin\Release\</OutputPath>
+ <DebugSymbols>False</DebugSymbols>
+ <DebugType>None</DebugType>
+ <Optimize>True</Optimize>
+ <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow>
+ <DefineConstants>TRACE</DefineConstants>
+ </PropertyGroup>
+ <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.Targets" />
+ <ItemGroup>
+ <Reference Include="System" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="BaseMessagingTestFixture.cs" />
+ <Compile Include="ChannelQueueTest.cs" />
+ <Compile Include="CommitRollbackTest.cs" />
+ <Compile Include="ConnectionTest.cs" />
+ <Compile Include="DurableSubscriptionTest.cs" />
+ <Compile Include="HeadersExchangeTest.cs" />
+ <Compile Include="MandatoryMessageTest.cs" />
+ <Compile Include="ProducerMultiConsumerTest.cs" />
+ <Compile Include="SslConnectionTest.cs" />
+ </ItemGroup>
+</Project> \ No newline at end of file
diff --git a/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs b/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs
index 950acbed9c..1c104d1451 100644
--- a/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs
+++ b/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs
@@ -39,7 +39,7 @@ namespace Apache.Qpid.Integration.Tests.testcases
/// Make a test TLS connection to the broker
/// without using client-certificates
/// </summary>
- [Test]
+ //[Test]
public void DoSslConnection()
{
// because for tests we don't usually trust the server certificate
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 7dfcae95c3..cf607548f8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -87,10 +87,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
private final Object _queueHeadLock = new Object();
private String _processingThreadName = "";
-
- /** Used by any reaping thread to purge messages */
- private StoreContext _reapingStoreContext = new StoreContext();
-
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -218,22 +214,32 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void removeExpired() throws AMQException
{
_lock.lock();
-
-
- for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();)
+ try
{
- QueueEntry entry = iter.next();
- if(entry.expired())
+ // New Context to for dealing with the MessageStore.
+ StoreContext context = new StoreContext();
+
+ for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();)
{
- // fixme: Currently we have to update the total byte size here for the data in the queue
- _totalMessageSize.addAndGet(-entry.getSize());
- _queue.dequeue(_reapingStoreContext,entry);
- iter.remove();
- }
- }
+ QueueEntry entry = iter.next();
+ if(entry.expired())
+ {
+ // fixme: Currently we have to update the total byte size here for the data in the queue
+ _totalMessageSize.addAndGet(-entry.getSize());
+ // Remove the message from the queue in the MessageStore
+ _queue.dequeue(context,entry);
- _lock.unlock();
+ // This queue nolonger needs a reference to this message
+ entry.getMessage().decrementReference(context);
+ iter.remove();
+ }
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
/** @return the state of the async processor. */
@@ -249,14 +255,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
*/
public List<QueueEntry> getMessages()
{
- _lock.lock();
- List<QueueEntry> list = new ArrayList<QueueEntry>();
+ List<QueueEntry> list = new ArrayList<QueueEntry>();
- for (QueueEntry entry : _messages)
+ _lock.lock();
+ try
{
- list.add(entry);
+ for (QueueEntry entry : _messages)
+ {
+ list.add(entry);
+ }
+ }
+ finally
+ {
+ _lock.unlock();
}
- _lock.unlock();
return list;
}
@@ -278,24 +290,28 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
long maxMessageCount = toMessageId - fromMessageId + 1;
- _lock.lock();
-
List<QueueEntry> foundMessagesList = new ArrayList<QueueEntry>();
-
- for (QueueEntry entry : _messages)
+ _lock.lock();
+ try
{
- long msgId = entry.getMessage().getMessageId();
- if (msgId >= fromMessageId && msgId <= toMessageId)
+ for (QueueEntry entry : _messages)
{
- foundMessagesList.add(entry);
- }
- // break if the no of messages are found
- if (foundMessagesList.size() == maxMessageCount)
- {
- break;
+ long msgId = entry.getMessage().getMessageId();
+ if (msgId >= fromMessageId && msgId <= toMessageId)
+ {
+ foundMessagesList.add(entry);
+ }
+ // break if the no of messages are found
+ if (foundMessagesList.size() == maxMessageCount)
+ {
+ break;
+ }
}
}
- _lock.unlock();
+ finally
+ {
+ _lock.unlock();
+ }
return foundMessagesList;
}
@@ -445,45 +461,62 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
_lock.lock();
- QueueEntry entry = _messages.poll();
-
- if (entry != null)
+ try
{
- queue.dequeue(storeContext, entry);
+ QueueEntry entry = _messages.poll();
- _totalMessageSize.addAndGet(-entry.getSize());
+ if (entry != null)
+ {
+ queue.dequeue(storeContext, entry);
- //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
- entry.getMessage().decrementReference(storeContext);
+ _totalMessageSize.addAndGet(-entry.getSize());
- }
+ //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
+ entry.getMessage().decrementReference(storeContext);
- _lock.unlock();
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
public long clearAllMessages(StoreContext storeContext) throws AMQException
{
long count = 0;
- _lock.lock();
- synchronized (_queueHeadLock)
+ _lock.lock();
+ try
{
- QueueEntry entry = getNextMessage();
- while (entry != null)
+ synchronized (_queueHeadLock)
{
- //and remove it
- _messages.poll();
+ QueueEntry entry = getNextMessage();
- _queue.dequeue(storeContext, entry);
+ // todo: note: why do we need this? Why not reuse the passed 'storeContext'
+ //Create a new StoreContext for decrementing the References
+ StoreContext context = new StoreContext();
+
+ while (entry != null)
+ {
+ //and remove it
+ _messages.poll();
- entry.getMessage().decrementReference(_reapingStoreContext);
+ // todo: NOTE: Why is this a different context to the new local 'context'?
+ _queue.dequeue(storeContext, entry);
- entry = getNextMessage();
- count++;
+ entry.getMessage().decrementReference(context);
+
+ entry = getNextMessage();
+ count++;
+ }
+ _totalMessageSize.set(0L);
}
- _totalMessageSize.set(0L);
}
- _lock.unlock();
+ finally
+ {
+ _lock.unlock();
+ }
return count;
}
@@ -518,10 +551,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
_totalMessageSize.addAndGet(-entry.getSize());
+ // New Store Context for removing expired messages
+ StoreContext storeContext = new StoreContext();
+
// Use the reapingStoreContext as any sub(if we have one) may be in a tx.
- _queue.dequeue(_reapingStoreContext, entry);
+ _queue.dequeue(storeContext, entry);
- message.decrementReference(_reapingStoreContext);
+ message.decrementReference(storeContext);
if (_log.isInfoEnabled())
{
@@ -760,24 +796,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> movedMessageList)
{
_lock.lock();
- for (QueueEntry entry : movedMessageList)
- {
- addMessageToQueue(entry, false);
- }
-
- // enqueue on the pre delivery queues
- for (Subscription sub : _subscriptions.getSubscriptions())
+ try
{
for (QueueEntry entry : movedMessageList)
{
- // Only give the message to those that want them.
- if (sub.hasInterest(entry))
+ addMessageToQueue(entry, false);
+ }
+
+ // enqueue on the pre delivery queues
+ for (Subscription sub : _subscriptions.getSubscriptions())
+ {
+ for (QueueEntry entry : movedMessageList)
{
- sub.enqueueForPreDelivery(entry, true);
+ // Only give the message to those that want them.
+ if (sub.hasInterest(entry))
+ {
+ sub.enqueueForPreDelivery(entry, true);
+ }
}
}
}
- _lock.unlock();
+ finally
+ {
+ _lock.unlock();
+ }
}
/**
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 3ff9b8c356..f663cffee1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -71,7 +71,7 @@ public class VirtualHost implements Accessable
private ACLPlugin _accessManager;
- private final Timer _houseKeepingTimer = new Timer("Queue-housekeeping", true);
+ private final Timer _houseKeepingTimer;
private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
@@ -172,41 +172,53 @@ public class VirtualHost implements Accessable
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
+ _houseKeepingTimer = new Timer("Queue-housekeeping-" + _name, true);
initialiseHouseKeeping(hostConfig);
}
private void initialiseHouseKeeping(final Configuration hostConfig)
{
-
- long period = hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD);
-
- /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
- if(period != 0L)
- {
- class RemoveExpiredMessagesTask extends TimerTask
- {
- public void run()
- {
- for(AMQQueue q : _queueRegistry.getQueues())
- {
-
- try
- {
- q.removeExpiredIfNoSubscribers();
- }
- catch (AMQException e)
- {
- _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e);
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
- period/2,
- period);
- }
+
+ long period = hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD);
+
+ /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
+ if (period != 0L)
+ {
+ class RemoveExpiredMessagesTask extends TimerTask
+ {
+ public void run()
+ {
+ try
+ {
+ _logger.info("Start Run");
+ for (AMQQueue q : _queueRegistry.getQueues())
+ {
+
+ try
+ {
+ q.removeExpiredIfNoSubscribers();
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Exception in housekeeping for queue: " + q.getName().toString(), e);
+ throw new RuntimeException(e);
+ }
+ }
+ _logger.info("Stop Run");
+ }
+ catch (Exception fatal)
+ {
+ System.err.println(Thread.currentThread().getName()+" Exception in housekeeping "+fatal);
+ fatal.printStackTrace();
+ System.exit(-1);
+ }
+ }
+ }
+
+ _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
+ period / 2,
+ period);
+ }
}
private void initialiseMessageStore(Configuration config) throws Exception