summaryrefslogtreecommitdiff
path: root/qpid/dotnet/Qpid.Integration.Tests/old/ServiceRequestingClient.tmp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/dotnet/Qpid.Integration.Tests/old/ServiceRequestingClient.tmp')
-rw-r--r--qpid/dotnet/Qpid.Integration.Tests/old/ServiceRequestingClient.tmp182
1 files changed, 182 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Integration.Tests/old/ServiceRequestingClient.tmp b/qpid/dotnet/Qpid.Integration.Tests/old/ServiceRequestingClient.tmp
new file mode 100644
index 0000000000..da0f764bcd
--- /dev/null
+++ b/qpid/dotnet/Qpid.Integration.Tests/old/ServiceRequestingClient.tmp
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ public class ServiceRequestingClient : BaseMessagingTestFixture
+ {
+ private const int MESSAGE_SIZE = 1024;
+ private static string MESSAGE_DATA = new string('x', MESSAGE_SIZE);
+
+ private const int PACK = 100;
+ private const int NUM_MESSAGES = PACK*10; // increase when in standalone
+
+ private static ILog _log = LogManager.GetLogger(typeof(ServiceRequestingClient));
+
+ ManualResetEvent _finishedEvent = new ManualResetEvent(false);
+
+ private int _expectedMessageCount = NUM_MESSAGES;
+
+ private long _startTime = 0;
+
+ private string _commandQueueName = "ServiceQ1";
+
+ private IMessagePublisher _publisher;
+
+ Avergager averager = new Avergager();
+
+ private void InitialiseProducer()
+ {
+ try
+ {
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithRoutingKey(_commandQueueName)
+ .WithDeliveryMode(DeliveryMode.NonPersistent)
+ .Create();
+ _publisher.DisableMessageTimestamp = true; // XXX: need a "with" for this in builder?
+ }
+ catch (QpidException e)
+ {
+ _log.Error("Error: " + e, e);
+ }
+ }
+
+ [Test]
+ public void SendMessages()
+ {
+ InitialiseProducer();
+
+ string replyQueueName = _channel.GenerateUniqueName();
+
+ _channel.DeclareQueue(replyQueueName, false, true, true);
+
+ IMessageConsumer messageConsumer = _channel.CreateConsumerBuilder(replyQueueName)
+ .WithPrefetchLow(100)
+ .WithPrefetchHigh(200)
+ .WithNoLocal(true)
+ .WithExclusive(true).Create();
+
+ _startTime = DateTime.Now.Ticks;
+
+ messageConsumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+ _connection.Start();
+ for (int i = 0; i < _expectedMessageCount; i++)
+ {
+ ITextMessage msg;
+ try
+ {
+ msg = _channel.CreateTextMessage(MESSAGE_DATA + i);
+ }
+ catch (Exception e)
+ {
+ _log.Error("Error creating message: " + e, e);
+ break;
+ }
+ msg.ReplyToRoutingKey = replyQueueName;
+
+ // Added timestamp.
+ long timeNow = DateTime.Now.Ticks;
+ string timeSentString = String.Format("{0:G}", timeNow);
+ msg.Headers.SetLong("timeSent", timeNow);
+
+ _publisher.Send(msg);
+ }
+
+ // Assert that the test finishes within a reasonable amount of time.
+ const int waitSeconds = 40;
+ const int waitMilliseconds = waitSeconds * 1000;
+ _log.Info("Finished sending " + _expectedMessageCount + " messages");
+ _log.Info(String.Format("Waiting {0} seconds to receive last message...", waitSeconds));
+ Assert.IsTrue(_finishedEvent.WaitOne(waitMilliseconds, false),
+ String.Format("Expected to finish in {0} seconds", waitSeconds));
+ }
+
+ public void OnMessage(IMessage m)
+ {
+ if (_log.IsDebugEnabled)
+ {
+ _log.Debug("Message received: " + m);
+ }
+
+ if (!m.Headers.Contains("timeSent"))
+ {
+ throw new Exception("Set timeSent!");
+ }
+
+ long sentAt = m.Headers.GetLong("timeSent");
+ long now = DateTime.Now.Ticks;
+ long latencyTicks = now - sentAt;
+ long latencyMilliseconds = latencyTicks / TimeSpan.TicksPerMillisecond;
+
+ averager.Add(latencyMilliseconds);
+
+ if (averager.Num % PACK == 0)
+ {
+ _log.Info("Ticks per millisecond = " + TimeSpan.TicksPerMillisecond);
+ _log.Info(String.Format("Average latency (ms) = {0}", averager));
+ _log.Info("Received message count: " + averager.Num);
+ }
+
+ if (averager.Num == _expectedMessageCount)
+ {
+ _log.Info(String.Format("Final average latency (ms) = {0}", averager));
+
+ double timeTakenSeconds = (DateTime.Now.Ticks - _startTime) * 1.0 / (TimeSpan.TicksPerMillisecond * 1000);
+ _log.Info("Total time taken to receive " + _expectedMessageCount + " messages was " +
+ timeTakenSeconds + "s, equivalent to " +
+ (_expectedMessageCount/timeTakenSeconds) + " messages per second");
+
+ _finishedEvent.Set(); // Notify main thread to quit.
+ }
+ }
+ }
+
+ class Avergager
+ {
+ long num = 0;
+ long sum = 0;
+
+ long min = long.MaxValue;
+ long max = long.MinValue;
+
+ public void Add(long item)
+ {
+ ++num;
+ sum += item;
+ if (item < min) min = item;
+ if (item > max) max = item;
+ }
+
+ public long Average { get { return sum/num; }}
+
+ public long Num { get { return num; } }
+
+ public override string ToString()
+ {
+ return String.Format("average={0} min={1} max={2}", Average, min, max);
+ }
+ }
+}