summaryrefslogtreecommitdiff
path: root/qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
commit66765100f4257159622cefe57bed50125a5ad017 (patch)
treea88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs
parent1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff)
parent88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff)
downloadqpid-python-66765100f4257159622cefe57bed50125a5ad017.tar.gz
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs')
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs208
1 files changed, 208 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs b/qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs
new file mode 100644
index 0000000000..4fd0419e9c
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+
+namespace Apache.Qpid.Client.Tests.interop
+{
+ public class TopicPublisher
+ {
+ private static ILog log = LogManager.GetLogger(typeof(TopicPublisher));
+
+ /// <summary> The default AMQ connection URL to use for tests. </summary>
+ const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
+
+ /// <summary> Holds the default test timeout for broker communications before tests give up. </summary>
+ const int TIMEOUT = 10000;
+
+ /// <summary> Holds the routing key for the topic to receive test messages on. </summary>
+ const string CONTROL_ROUTING_KEY = "topic_control";
+
+ /// <summary> Holds the routing key for the queue to send reports to. </summary>
+ const string RESPONSE_ROUTING_KEY = "response";
+
+ /// <summary> Holds the number of messages to send in each test run. </summary>
+ private int numMessages;
+
+ /// <summary> Holds the number of subscribers listening to the messsages. </summary>
+ private int numSubscribers;
+
+ /// <summary> A monitor used to wait for all reports to arrive back from consumers on. </summary>
+ private AutoResetEvent allReportsReceivedEvt = new AutoResetEvent(false);
+
+ /// <summary> Holds the connection to listen on. </summary>
+ private IConnection connection;
+
+ /// <summary> Holds the channel for all test messages.</summary>
+ private IChannel channel;
+
+ /// <summary> Holds the producer to send test messages on. </summary>
+ private IMessagePublisher publisher;
+
+ /// <summary>
+ /// Creates a topic publisher that will send the specifed number of messages and expect the specifed number of report back from test
+ /// subscribers.
+ /// </summary>
+ ///
+ /// <param name="connectionUri">The broker URL.</param>
+ /// <param name="numMessages">The number of messages to send in each test.</param>
+ /// <param name="numSubscribers">The number of subscribes that are expected to reply with a report.</param>
+ TopicPublisher(string connectionUri, int numMessages, int numSubscribers)
+ {
+ log.Debug("TopicPublisher(string connectionUri = " + connectionUri + ", int numMessages = "+ numMessages +
+ ", int numSubscribers = " + numSubscribers + "): called");
+
+ // Create a connection to the broker.
+ IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
+ connection = new AMQConnection(connectionInfo);
+
+ // Establish a session on the broker.
+ channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
+
+ // Set up a queue to listen for reports on.
+ string responseQueueName = channel.GenerateUniqueName();
+ channel.DeclareQueue(responseQueueName, false, true, true);
+
+ // Set this listener up to listen for reports on the response queue.
+ channel.Bind(responseQueueName, ExchangeNameDefaults.DIRECT, RESPONSE_ROUTING_KEY);
+ //channel.Bind(responseQueueName, "<<default>>", RESPONSE_ROUTING_KEY);
+ IMessageConsumer consumer = channel.CreateConsumerBuilder(responseQueueName)
+ .Create();
+ consumer.OnMessage += new MessageReceivedDelegate(OnMessage);
+
+ // Set up this listener with a producer to send the test messages and report requests on.
+ publisher = channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(CONTROL_ROUTING_KEY)
+ .Create();
+
+ // Keep the test parameters.
+ this.numMessages = numMessages;
+ this.numSubscribers = numSubscribers;
+
+ connection.Start();
+ Console.WriteLine("Sending messages and waiting for reports...");
+ }
+
+ /// <summary>
+ /// Start a test subscriber. The broker URL must be specified as the first command line argument.
+ /// </summary>
+ ///
+ /// <param name="argv">The command line arguments, broker URL first.</param>
+ public static void Main(String[] argv)
+ {
+ // Create an instance of this publisher with the command line parameters.
+ TopicPublisher publisher = new TopicPublisher(DEFAULT_URI, 1, 1);
+
+ // Publish the test messages.
+ publisher.DoTest();
+ }
+
+ /// <summary>
+ /// Sends the test messages and waits for all subscribers to reply with a report.
+ /// </summary>
+ public void DoTest()
+ {
+ log.Debug("public void DoTest(): called");
+
+ // Create a test message to send.
+ IMessage testMessage = channel.CreateTextMessage("test");
+
+ // Send the desired number of test messages.
+ for (int i = 0; i < numMessages; i++)
+ {
+ publisher.Send(testMessage);
+ }
+
+ log.Debug("Sent " + numMessages + " test messages.");
+
+ // Send the report request.
+ IMessage reportRequestMessage = channel.CreateTextMessage("Report request message.");
+ reportRequestMessage.Headers["TYPE"] = "REPORT_REQUEST";
+
+ reportRequestMessage.Headers.SetBoolean("BOOLEAN", false);
+ //reportRequestMessage.Headers.SetByte("BYTE", 5);
+ reportRequestMessage.Headers.SetDouble("DOUBLE", 3.141);
+ reportRequestMessage.Headers.SetFloat("FLOAT", 1.0f);
+ reportRequestMessage.Headers.SetInt("INT", 1);
+ reportRequestMessage.Headers.SetLong("LONG", 1);
+ reportRequestMessage.Headers.SetString("STRING", "hello");
+ reportRequestMessage.Headers.SetShort("SHORT", 2);
+
+ publisher.Send(reportRequestMessage);
+
+ log.Debug("Sent the report request message, waiting for all replies...");
+
+ // Wait until all the reports come in.
+ allReportsReceivedEvt.WaitOne(TIMEOUT, true);
+
+ // Check if all reports were really received or if the timeout occurred.
+ if (numSubscribers == 0)
+ {
+ log.Debug("Got all reports.");
+ }
+ else
+ {
+ log.Debug("Waiting for reports timed out, still waiting for " + numSubscribers + ".");
+ }
+
+ // Send the termination request.
+ IMessage terminationRequestMessage = channel.CreateTextMessage("Termination request message.");
+ terminationRequestMessage.Headers["TYPE"] = "TERMINATION_REQUEST";
+ publisher.Send(terminationRequestMessage);
+
+ log.Debug("Sent the termination request message.");
+
+ // Close all message producers and consumers and the connection to the broker.
+ Shutdown();
+ }
+
+ /// <summary>
+ /// Handles all report messages from subscribers. This decrements the count of subscribers that are still to reply, until this becomes
+ /// zero, at which time waiting threads are notified of this event.
+ /// </summary>
+ ///
+ /// <param name="message">The received report message.</param>
+ public void OnMessage(IMessage message)
+ {
+ log.Debug("public void OnMessage(IMessage message = " + message + "): called");
+
+ // Decrement the count of expected messages and release the wait monitor when this becomes zero.
+ if (--numSubscribers == 0)
+ {
+ log.Debug("Got reports from all subscribers.");
+ allReportsReceivedEvt.Set();
+ }
+ }
+
+ /// <summary> Stops the message consumers and closes the connection. </summary>
+ private void Shutdown()
+ {
+ connection.Stop();
+ publisher.Dispose();
+ channel.Dispose();
+ connection.Dispose();
+ }
+ }
+}