summaryrefslogtreecommitdiff
path: root/qpid/dotnet/Qpid.Client.Tests/interop/TopicListener.cs
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/dotnet/Qpid.Client.Tests/interop/TopicListener.cs')
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/interop/TopicListener.cs211
1 files changed, 211 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Client.Tests/interop/TopicListener.cs b/qpid/dotnet/Qpid.Client.Tests/interop/TopicListener.cs
new file mode 100644
index 0000000000..b355abb28d
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client.Tests/interop/TopicListener.cs
@@ -0,0 +1,211 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using log4net;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+
+namespace Apache.Qpid.Client.Tests.interop
+{
+ public class TopicListener
+ {
+ private static ILog log = LogManager.GetLogger(typeof(TopicListener));
+
+ /// <summary> The default AMQ connection URL to use for tests. </summary>
+ const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
+
+ /// <summary> Holds the routing key for the topic to receive test messages on. </summary>
+ public static string CONTROL_ROUTING_KEY = "topic_control";
+
+ /// <summary> Holds the routing key for the queue to send reports to. </summary>
+ public static string RESPONSE_ROUTING_KEY = "response";
+
+ /// <summary> Holds the connection to listen on. </summary>
+ private IConnection connection;
+
+ /// <summary> Holds the channel for all test messages.</summary>
+ private IChannel channel;
+
+ /// <summary> Holds the producer to send report messages on. </summary>
+ private IMessagePublisher publisher;
+
+ /// <summary> Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. </summary> */
+ private bool init;
+
+ /// <summary> Holds the count of messages received by this listener. </summary> */
+ private int count;
+
+ /// <summary> Creates a topic listener using the specified broker URL. </summary>
+ ///
+ /// <param name="connectionUri">The broker URL to listen on.</param>
+ TopicListener(string connectionUri)
+ {
+ log.Debug("TopicListener(string connectionUri = " + connectionUri + "): called");
+
+ // Create a connection to the broker.
+ IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
+ connection = new AMQConnection(connectionInfo);
+
+ // Establish a session on the broker.
+ channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
+
+ // Set up a queue to listen for test messages on.
+ string topicQueueName = channel.GenerateUniqueName();
+ channel.DeclareQueue(topicQueueName, false, true, true);
+
+ // Set this listener up to listen for incoming messages on the test topic queue.
+ channel.Bind(topicQueueName, ExchangeNameDefaults.TOPIC, CONTROL_ROUTING_KEY);
+ IMessageConsumer consumer = channel.CreateConsumerBuilder(topicQueueName)
+ .Create();
+ consumer.OnMessage += new MessageReceivedDelegate(OnMessage);
+
+ // Set up this listener with a producer to send the reports on.
+ publisher = channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.DIRECT)
+ .WithRoutingKey(RESPONSE_ROUTING_KEY)
+ .Create();
+
+ connection.Start();
+ Console.WriteLine("Waiting for messages...");
+ }
+
+ public static void Main(String[] argv)
+ {
+ // Create an instance of this listener with the command line parameters.
+ new TopicListener(DEFAULT_URI);
+ }
+
+ /// <summary>
+ /// Handles all message received by this listener. Test messages are counted, report messages result in a report being sent and
+ /// shutdown messages result in this listener being terminated.
+ /// </summary>
+ ///
+ /// <param name="message">The received message.</param>
+ public void OnMessage(IMessage message)
+ {
+ log.Debug("public void onMessage(Message message = " + message + "): called");
+
+ // Take the start time of the first message if this is the first message.
+ if (!init)
+ {
+ count = 0;
+ init = true;
+ }
+
+ // Check if the message is a control message telling this listener to shut down.
+ if (IsShutdown(message))
+ {
+ log.Debug("Got a shutdown message.");
+ Shutdown();
+ }
+ // Check if the message is a report request message asking this listener to respond with the message count.
+ else if (IsReport(message))
+ {
+ log.Debug("Got a report request message.");
+
+ // Send the message count report.
+ SendReport();
+
+ // Reset the initialization flag so that the next message is considered to be the first.
+ init = false;
+ }
+ // Otherwise it is an ordinary test message, so increment the message count.
+ else
+ {
+ count++;
+ }
+ }
+
+ /// <summary> Checks a message to see if it is a shutdown control message. </summary>
+ ///
+ /// <param name="m">The message to check.</param>
+ ///
+ /// <returns><tt>true</tt> if it is a shutdown control message, <tt>false</tt> otherwise.</returns>
+ private bool IsShutdown(IMessage m)
+ {
+ bool result = CheckTextField(m, "TYPE", "TERMINATION_REQUEST");
+
+ //log.Debug("isShutdown = " + result);
+
+ return result;
+ }
+
+ /// <summary> Checks a message to see if it is a report request control message. </summary>
+ ///
+ /// <param name="m">The message to check.</param>
+ ///
+ /// <returns><tt>true</tt> if it is a report request control message, <tt>false</tt> otherwise.</returns>
+ private bool IsReport(IMessage m)
+ {
+ bool result = CheckTextField(m, "TYPE", "REPORT_REQUEST");
+
+ //log.Debug("isReport = " + result);
+
+ return result;
+ }
+
+ /// <summary> Checks whether or not a text field on a message has the specified value. </summary>
+ ///
+ /// <param name="e">The message to check.</param>
+ /// <param name="e">The name of the field to check.</param>
+ /// <param name="e">The expected value of the field to compare with.</param>
+ ///
+ /// <returns> <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise. </returns>
+ private static bool CheckTextField(IMessage m, string fieldName, string value)
+ {
+ /*log.Debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName
+ + ", String value = " + value + "): called");*/
+
+ string comp = m.Headers.GetString(fieldName);
+
+ return (comp != null) && comp == value;
+ }
+
+ /// <summary> Stops the message consumer and closes the connection. </summary>
+ private void Shutdown()
+ {
+ connection.Stop();
+ channel.Dispose();
+ connection.Dispose();
+ }
+
+ /// <summary> Sends the report message to the response location. </summary>
+ private void SendReport()
+ {
+ string report = "Received " + count + ".";
+
+ IMessage reportMessage = channel.CreateTextMessage(report);
+
+ reportMessage.Headers.SetBoolean("BOOLEAN", false);
+ //reportMessage.Headers.SetByte("BYTE", 5);
+ reportMessage.Headers.SetDouble("DOUBLE", 3.141);
+ reportMessage.Headers.SetFloat("FLOAT", 1.0f);
+ reportMessage.Headers.SetInt("INT", 1);
+ reportMessage.Headers.SetLong("LONG", 1);
+ reportMessage.Headers.SetString("STRING", "hello");
+ reportMessage.Headers.SetShort("SHORT", 2);
+
+ publisher.Send(reportMessage);
+
+ Console.WriteLine("Sent report: " + report);
+ }
+ }
+}