summaryrefslogtreecommitdiff
path: root/qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs
blob: 4fd0419e9cb6fb58d46eba05b7b2b6554cf75e42 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System;
using System.Threading;
using log4net;
using Apache.Qpid.Messaging;
using Apache.Qpid.Client.Qms;

namespace Apache.Qpid.Client.Tests.interop
{
    /// <summary>
    /// Interop test client that publishes a number of test messages to a topic, asks the listening subscribers for
    /// a report, waits (up to <see cref="TIMEOUT"/> milliseconds) for the expected number of reports to arrive on a
    /// uniquely named response queue, and finally sends a termination request before closing its connection.
    /// </summary>
    public class TopicPublisher
    {
        private static readonly ILog log = LogManager.GetLogger(typeof(TopicPublisher));

        /// <summary> The default AMQ connection URL to use for tests. </summary>
        const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";

        /// <summary> Holds the time, in milliseconds, to wait for subscriber reports before giving up. </summary>
        const int TIMEOUT = 10000;

        /// <summary> Holds the routing key of the topic that test messages and control requests are published on. </summary>
        const string CONTROL_ROUTING_KEY = "topic_control";

        /// <summary> Holds the routing key that binds the response queue on which reports are received. </summary>
        const string RESPONSE_ROUTING_KEY = "response";

        /// <summary> Holds the number of messages to send in each test run. </summary>
        private readonly int numMessages;

        /// <summary>
        /// Holds the number of subscribers that have not yet replied with a report. Decremented by
        /// <see cref="OnMessage"/> and read by <see cref="DoTest"/>, so it is only accessed through
        /// <see cref="Interlocked"/> after construction.
        /// </summary>
        private int numSubscribers;

        /// <summary> Signalled by <see cref="OnMessage"/> once the outstanding report count reaches zero. </summary>
        private readonly AutoResetEvent allReportsReceivedEvt = new AutoResetEvent(false);

        /// <summary> Holds the connection to the broker. </summary>
        private readonly IConnection connection;

        /// <summary> Holds the channel for all test messages. </summary>
        private readonly IChannel channel;

        /// <summary> Holds the producer to send test messages on. </summary>
        private readonly IMessagePublisher publisher;

        /// <summary>
        /// Creates a topic publisher that will send the specified number of messages and expect the specified number
        /// of reports back from test subscribers. The connection is started before the constructor returns. If any
        /// part of the set-up fails, the connection opened so far is disposed and the exception is rethrown.
        /// </summary>
        ///
        /// <param name="connectionUri">The broker URL.</param>
        /// <param name="numMessages">The number of messages to send in each test.</param>
        /// <param name="numSubscribers">The number of subscribers that are expected to reply with a report.</param>
        private TopicPublisher(string connectionUri, int numMessages, int numSubscribers)
        {
            log.Debug("TopicPublisher(string connectionUri = " + connectionUri + ", int numMessages = "+ numMessages + 
                      ", int numSubscribers = " + numSubscribers + "): called");

            // Keep the test parameters. These are stored before the connection is started so that a report
            // delivered immediately after Start() already sees the expected count.
            this.numMessages = numMessages;
            this.numSubscribers = numSubscribers;

            try
            {
                // Create a connection to the broker.
                IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
                connection = new AMQConnection(connectionInfo);

                // Establish a session on the broker.
                channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);

                // Set up a queue to listen for reports on.
                string responseQueueName = channel.GenerateUniqueName();
                channel.DeclareQueue(responseQueueName, false, true, true);

                // Set this listener up to listen for reports on the response queue.
                channel.Bind(responseQueueName, ExchangeNameDefaults.DIRECT, RESPONSE_ROUTING_KEY);
                IMessageConsumer consumer = channel.CreateConsumerBuilder(responseQueueName)
                    .Create();
                consumer.OnMessage += new MessageReceivedDelegate(OnMessage);

                // Set up this listener with a producer to send the test messages and report requests on.
                publisher = channel.CreatePublisherBuilder()
                    .WithExchangeName(ExchangeNameDefaults.TOPIC)
                    .WithRoutingKey(CONTROL_ROUTING_KEY)
                    .Create();

                connection.Start();
            }
            catch
            {
                // Do not leak a half-initialised broker connection when set-up fails part way through.
                if (connection != null)
                {
                    connection.Dispose();
                }

                throw;
            }

            Console.WriteLine("Sending messages and waiting for reports...");
        }

        /// <summary>
        /// Starts a test publisher that sends one message and expects one subscriber report. The broker URL may be
        /// given as the first command line argument; when it is absent or empty <see cref="DEFAULT_URI"/> is used.
        /// </summary>
        ///
        /// <param name="argv">The command line arguments, broker URL first (optional).</param>
        public static void Main(String[] argv)
        {
            // Honour the documented "broker URL first" argument, falling back to the default for no-arg runs.
            string connectionUri = DEFAULT_URI;
            if (argv != null && argv.Length > 0 && !string.IsNullOrEmpty(argv[0]))
            {
                connectionUri = argv[0];
            }

            // Create an instance of this publisher with the chosen connection URL.
            TopicPublisher publisher = new TopicPublisher(connectionUri, 1, 1);

            // Publish the test messages.
            publisher.DoTest();
        }

        /// <summary>
        /// Sends the test messages followed by a report request, waits for all subscribers to reply with a report
        /// (or for <see cref="TIMEOUT"/> milliseconds to elapse), then sends a termination request. The connection
        /// is always shut down before this method returns, even when sending fails, so the instance cannot be
        /// reused afterwards.
        /// </summary>
        public void DoTest()
        {
            log.Debug("public void DoTest(): called");

            try
            {
                // Create a test message to send.
                IMessage testMessage = channel.CreateTextMessage("test");

                // Send the desired number of test messages.
                for (int i = 0; i < numMessages; i++)
                {
                    publisher.Send(testMessage);
                }

                log.Debug("Sent " + numMessages + " test messages.");

                // Send the report request.
                IMessage reportRequestMessage = channel.CreateTextMessage("Report request message.");
                reportRequestMessage.Headers["TYPE"] = "REPORT_REQUEST";

                // Attach one header of each supported primitive type to the request.
                reportRequestMessage.Headers.SetBoolean("BOOLEAN", false);
                reportRequestMessage.Headers.SetDouble("DOUBLE", 3.141);
                reportRequestMessage.Headers.SetFloat("FLOAT", 1.0f);
                reportRequestMessage.Headers.SetInt("INT", 1);
                reportRequestMessage.Headers.SetLong("LONG", 1);
                reportRequestMessage.Headers.SetString("STRING", "hello");
                reportRequestMessage.Headers.SetShort("SHORT", 2);

                publisher.Send(reportRequestMessage);

                log.Debug("Sent the report request message, waiting for all replies...");

                // Wait until all the reports come in. The event is only set when the count transitions to zero,
                // so when nothing is outstanding (e.g. zero subscribers were requested) waiting would just burn
                // the whole timeout. CompareExchange(ref x, 0, 0) is used purely as an atomic read.
                if (Interlocked.CompareExchange(ref numSubscribers, 0, 0) > 0)
                {
                    allReportsReceivedEvt.WaitOne(TIMEOUT, true);
                }

                // Check if all reports were really received or if the timeout occurred.
                int outstandingReports = Interlocked.CompareExchange(ref numSubscribers, 0, 0);
                if (outstandingReports <= 0)
                {
                    log.Debug("Got all reports.");
                }
                else
                {
                    log.Debug("Waiting for reports timed out, still waiting for " + outstandingReports + ".");
                }

                // Send the termination request.
                IMessage terminationRequestMessage = channel.CreateTextMessage("Termination request message.");
                terminationRequestMessage.Headers["TYPE"] =  "TERMINATION_REQUEST";
                publisher.Send(terminationRequestMessage);

                log.Debug("Sent the termination request message.");
            }
            finally
            {
                // Close all message producers and consumers and the connection to the broker.
                Shutdown();
            }
        }

        /// <summary>
        /// Handles all report messages from subscribers. This atomically decrements the count of subscribers that
        /// are still to reply, and when the count reaches zero signals the thread waiting in <see cref="DoTest"/>.
        /// </summary>
        ///
        /// <param name="message">The received report message.</param>
        public void OnMessage(IMessage message)
        {
            log.Debug("public void OnMessage(IMessage message = " + message + "): called");

            // This callback and DoTest touch the counter from different threads, so decrement atomically
            // and release the waiting thread when the count becomes zero.
            if (Interlocked.Decrement(ref numSubscribers) == 0)
            {
                log.Debug("Got reports from all subscribers.");
                allReportsReceivedEvt.Set();
            }
        }

        /// <summary>
        /// Stops the connection and disposes the publisher, channel and connection. The connection is disposed
        /// even if stopping it or disposing the publisher or channel throws.
        /// </summary>
        private void Shutdown()
        {
            try
            {
                connection.Stop();
                publisher.Dispose();
                channel.Dispose();
            }
            finally
            {
                connection.Dispose();
            }
        }
    }
}