diff options
Diffstat (limited to 'Final/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs')
-rw-r--r-- | Final/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/Final/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs b/Final/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs new file mode 100644 index 0000000000..f299812989 --- /dev/null +++ b/Final/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs @@ -0,0 +1,127 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Client.Tests
+{
+ [TestFixture]
+ public class TestSyncConsumer : BaseMessagingTestFixture
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(TestSyncConsumer));
+
+ private string _commandQueueName = "ServiceQ1";
+ private const int MESSAGE_COUNT = 1000;
+ private const string MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk ";
+
+ private static String GetData(int size)
+ {
+ StringBuilder buf = new StringBuilder(size);
+ int count = 0;
+ while ( count < size + MESSAGE_DATA_BYTES.Length )
+ {
+ buf.Append(MESSAGE_DATA_BYTES);
+ count += MESSAGE_DATA_BYTES.Length;
+ }
+ if ( count < size )
+ {
+ buf.Append(MESSAGE_DATA_BYTES, 0, size - count);
+ }
+
+ return buf.ToString();
+ }
+
+ private IMessageConsumer _consumer;
+ private IMessagePublisher _publisher;
+
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithRoutingKey(_commandQueueName)
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .Create();
+
+ _publisher.DisableMessageTimestamp = true;
+ _publisher.DeliveryMode = DeliveryMode.NonPersistent;
+
+ string queueName = _channel.GenerateUniqueName();
+ _channel.DeclareQueue(queueName, false, true, true);
+
+ _channel.Bind(queueName, ExchangeNameDefaults.TOPIC, _commandQueueName);
+
+ _consumer = _channel.CreateConsumerBuilder(queueName)
+ .WithPrefetchLow(100).Create();
+ _connection.Start();
+ }
+
+ [Test]
+ public void ReceiveWithInfiniteWait()
+ {
+ // send all messages
+ for ( int i = 0; i < MESSAGE_COUNT; i++ )
+ {
+ ITextMessage msg;
+ try
+ {
+ msg = _channel.CreateTextMessage(GetData(512 + 8 * i));
+ } catch ( Exception e )
+ {
+ _logger.Error("Error creating message: " + e, e);
+ break;
+ }
+ _publisher.Send(msg);
+ }
+
+ _logger.Debug("All messages sent");
+ // receive all messages
+ for ( int i = 0; i < MESSAGE_COUNT; i++ )
+ {
+ try
+ {
+ IMessage msg = _consumer.Receive();
+ Assert.IsNotNull(msg);
+ } catch ( Exception e )
+ {
+ _logger.Error("Error receiving message: " + e, e);
+ Assert.Fail(e.ToString());
+ }
+ }
+ }
+
+ [Test]
+ public void ReceiveWithTimeout()
+ {
+ ITextMessage msg = _channel.CreateTextMessage(GetData(512 + 8));
+ _publisher.Send(msg);
+
+ IMessage recvMsg = _consumer.Receive();
+ Assert.IsNotNull(recvMsg);
+ // empty queue, should timeout
+ Assert.IsNull(_consumer.Receive(1000));
+ }
+ }
+}
|