1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Text;
using System.Threading;
using log4net;
using NUnit.Framework;
using Qpid.Messaging;
namespace Qpid.Client.Tests
{
/// <summary>
/// Publishes <see cref="MESSAGE_COUNT"/> text messages of increasing size to the
/// topic exchange and verifies that each of <see cref="CONSUMER_COUNT"/> consumers,
/// every one bound to its own uniquely named queue with the same routing key,
/// receives all of them.
/// </summary>
[TestFixture]
public class ProducerMultiConsumer : BaseMessagingTestFixture
{
    private static readonly ILog _logger = LogManager.GetLogger(typeof(ProducerMultiConsumer));

    // Despite the name, this value is used as the routing key for the publisher
    // and for every consumer queue binding on the topic exchange.
    private string _commandQueueName = "ServiceQ1";

    private const int CONSUMER_COUNT = 5;
    private const int MESSAGE_COUNT = 1000;
    private const string MESSAGE_DATA_BYTES = "****jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk ";

    // Upper bound on how long RunTest waits for all deliveries, so that a lost
    // message fails the test instead of hanging the whole test run.
    private const int RECEIVE_TIMEOUT_MILLIS = 5 * 60 * 1000;

    // Signalled by OnMessage once MESSAGE_COUNT * CONSUMER_COUNT deliveries were seen.
    private AutoResetEvent _finishedEvent = new AutoResetEvent(false);

    private IMessagePublisher _publisher;
    private IMessageConsumer[] _consumers = new IMessageConsumer[CONSUMER_COUNT];

    // Total deliveries across all consumers; only updated through Interlocked.
    private int _messageReceivedCount = 0;

    /// <summary>
    /// Builds a payload of exactly <paramref name="size"/> characters by repeating
    /// <see cref="MESSAGE_DATA_BYTES"/> and truncating the final repetition.
    /// </summary>
    /// <param name="size">Requested payload length in characters.</param>
    /// <returns>
    /// A string of length <paramref name="size"/>; the empty string when
    /// <paramref name="size"/> is zero or negative.
    /// </returns>
    private static String GetData(int size)
    {
        if (size <= 0)
        {
            return String.Empty;
        }

        StringBuilder buf = new StringBuilder(size);
        int chunkLength = MESSAGE_DATA_BYTES.Length;
        int count = 0;

        // Append whole chunks only while a complete chunk still fits.
        while (count + chunkLength <= size)
        {
            buf.Append(MESSAGE_DATA_BYTES);
            count += chunkLength;
        }

        // Top up with the leading part of one more chunk to hit the exact size.
        if (count < size)
        {
            buf.Append(MESSAGE_DATA_BYTES, 0, size - count);
        }
        return buf.ToString();
    }

    /// <summary>
    /// Runs the base fixture set-up, resets the per-test delivery state, creates a
    /// non-persistent publisher on the topic exchange, declares and binds one
    /// uniquely named queue per consumer, and starts the connection.
    /// </summary>
    [SetUp]
    public override void Init()
    {
        base.Init();

        // Reset state left over from a previous run on the same fixture instance.
        // No consumer is attached yet, so a plain assignment is safe here.
        _messageReceivedCount = 0;
        _finishedEvent.Reset();

        _publisher = _channel.CreatePublisherBuilder()
            .WithRoutingKey(_commandQueueName)
            .WithExchangeName(ExchangeNameDefaults.TOPIC)
            .Create();
        _publisher.DisableMessageTimestamp = true;
        _publisher.DeliveryMode = DeliveryMode.NonPersistent;

        for (int i = 0; i < CONSUMER_COUNT; i++)
        {
            string queueName = _channel.GenerateUniqueName();
            // NOTE(review): the three flags are presumably durable/exclusive/auto-delete;
            // confirm against the DeclareQueue signature.
            _channel.DeclareQueue(queueName, false, true, true);
            _channel.Bind(queueName, ExchangeNameDefaults.TOPIC, _commandQueueName);
            _consumers[i] = _channel.CreateConsumerBuilder(queueName)
                .WithPrefetchLow(100).Create();
            _consumers[i].OnMessage = new MessageReceivedDelegate(OnMessage);
        }
        _connection.Start();
    }

    /// <summary>
    /// Delivery callback shared by every consumer. Counts the delivery and signals
    /// the finished event when the expected total has been reached.
    /// </summary>
    /// <param name="m">The delivered message; cast to <c>ITextMessage</c> when sampled.</param>
    public void OnMessage(IMessage m)
    {
        int newCount = Interlocked.Increment(ref _messageReceivedCount);
        if (newCount % 1000 == 0)
        {
            _logger.Info("Received count=" + newCount);
        }
        if (newCount == (MESSAGE_COUNT * CONSUMER_COUNT))
        {
            _logger.Info("All messages received");
            _finishedEvent.Set();
        }
        // Dump every hundredth payload to the debug output as a sample.
        if (newCount % 100 == 0)
        {
            System.Diagnostics.Debug.WriteLine(((ITextMessage)m).Text);
        }
    }

    /// <summary>
    /// Sends <see cref="MESSAGE_COUNT"/> messages whose payload grows by 8 characters
    /// per message starting at 512, then waits for every consumer to receive them all.
    /// Fails if a message cannot be created or if the deliveries do not complete
    /// within <see cref="RECEIVE_TIMEOUT_MILLIS"/>.
    /// </summary>
    [Test]
    public void RunTest()
    {
        for (int i = 0; i < MESSAGE_COUNT; i++)
        {
            ITextMessage msg;
            try
            {
                msg = _channel.CreateTextMessage(GetData(512 + 8 * i));
            }
            catch (Exception e)
            {
                _logger.Error("Error creating message: " + e, e);
                // Rethrow so the test fails here; carrying on would wait for
                // deliveries of messages that were never sent.
                throw;
            }
            _publisher.Send(msg);
        }

        bool allReceived = _finishedEvent.WaitOne(RECEIVE_TIMEOUT_MILLIS, false);
        // CompareExchange with equal operands is an atomic read of the counter.
        int received = Interlocked.CompareExchange(ref _messageReceivedCount, 0, 0);
        Assert.IsTrue(allReceived,
            "Timed out waiting for messages: received " + received +
            " of " + (MESSAGE_COUNT * CONSUMER_COUNT));
    }
}
}
|