summaryrefslogtreecommitdiff
path: root/qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.callback.receiver/csharp.map.callback.receiver.cs
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.callback.receiver/csharp.map.callback.receiver.cs')
-rw-r--r--qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.callback.receiver/csharp.map.callback.receiver.cs286
1 files changed, 286 insertions, 0 deletions
diff --git a/qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.callback.receiver/csharp.map.callback.receiver.cs b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.callback.receiver/csharp.map.callback.receiver.cs
new file mode 100644
index 0000000000..b1ba949e07
--- /dev/null
+++ b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.callback.receiver/csharp.map.callback.receiver.cs
@@ -0,0 +1,286 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using Org.Apache.Qpid.Messaging;
+using Org.Apache.Qpid.Messaging.SessionReceiver;
+
+namespace Org.Apache.Qpid.Messaging.Examples
+{
+ /// <summary>
+ /// A class with functions to display structured messages.
+ /// </summary>
+ public static class MessageViewer
+ {
+ /// <summary>
+ /// A Function to display a amqp/map message packaged as a Dictionary.
+ /// </summary>
+ /// <param name="dict">The AMQP map</param>
+ /// <param name="level">Nested depth</param>
+ public static void ShowDictionary(Dictionary<string, object> dict, int level)
+ {
+ foreach (KeyValuePair<string, object> kvp in dict)
+ {
+ Console.Write(new string(' ', level * 4));
+
+ if (QpidTypeCheck.ObjectIsMap(kvp.Value))
+ {
+ Console.WriteLine("Key: {0}, Value: Dictionary", kvp.Key);
+ ShowDictionary((Dictionary<string, object>)kvp.Value, level + 1);
+ }
+ else if (QpidTypeCheck.ObjectIsList(kvp.Value))
+ {
+ Console.WriteLine("Key: {0}, Value: List", kvp.Key);
+ ShowList((Collection<object>)kvp.Value, level + 1);
+ }
+ else
+ Console.WriteLine("Key: {0}, Value: {1}, Type: {2}",
+ kvp.Key, kvp.Value, kvp.Value.GetType().ToString());
+ }
+ }
+
+ /// <summary>
+ /// A function to display a ampq/list message packaged as a List.
+ /// </summary>
+ /// <param name="list">The AMQP list</param>
+ /// <param name="level">Nested depth</param>
+ public static void ShowList(Collection<object> list, int level)
+ {
+ foreach (object obj in list)
+ {
+ Console.Write(new string(' ', level * 4));
+
+ if (QpidTypeCheck.ObjectIsMap(obj))
+ {
+ Console.WriteLine("Dictionary");
+ ShowDictionary((Dictionary<string, object>)obj, level + 1);
+ }
+ else if (QpidTypeCheck.ObjectIsList(obj))
+ {
+ Console.WriteLine("List");
+ ShowList((Collection<object>)obj, level + 1);
+ }
+ else
+ Console.WriteLine("Value: {0}, Type: {1}",
+ obj.ToString(), obj.GetType().ToString());
+ }
+ }
+
+ /// <summary>
+ /// A function to diplay a Message. The native Object type is
+ /// decomposed into AMQP types.
+ /// </summary>
+ /// <param name="message">The Message</param>
+ public static void ShowMessage(Message message)
+ {
+ if ("amqp/map" == message.ContentType)
+ {
+ Console.WriteLine("Received a Dictionary");
+ Dictionary<string, object> content = new Dictionary<string, object>();
+ message.GetContent(content);
+ ShowDictionary(content, 0);
+ }
+ else if ("amqp/list" == message.ContentType)
+ {
+ Console.WriteLine("Received a List");
+ Collection<object> content = new Collection<object>();
+ message.GetContent(content);
+ ShowList(content, 0);
+ }
+ else
+ {
+ Console.WriteLine("Received a String");
+ Console.WriteLine(message.GetContent());
+ }
+ }
+ }
+
+
+
+ /// <summary>
+ /// A model class to demonstrate how a user may use the Qpid Messaging
+ /// interface to receive Session messages using a callback.
+ /// </summary>
+ class ReceiverProcess : ISessionReceiver
+ {
+ UInt32 messagesReceived = 0;
+
+ /// <summary>
+ /// SessionReceiver implements the ISessionReceiver interface.
+ /// It is the callback function that receives all messages for a Session.
+ /// It may be called any time server is running.
+ /// It is always called on server's private thread.
+ /// </summary>
+ /// <param name="receiver">The Receiver associated with the message.</param>
+ /// <param name="message">The Message</param>
+ public void SessionReceiver(Receiver receiver, Message message)
+ {
+ //
+ // Indicate message reception
+ //
+ Console.WriteLine("--- Message {0}", ++messagesReceived);
+
+ //
+ // Display the received message
+ //
+ MessageViewer.ShowMessage(message);
+
+ //
+ // Acknowledge the receipt of all received messages.
+ //
+ receiver.Session.Acknowledge();
+ }
+
+
+ /// <summary>
+ /// Usage
+ /// </summary>
+ /// <param name="url">Connection target</param>
+ /// <param name="addr">Address: broker exchange + routing key</param>
+ /// <param name="nSec">n seconds to keep callback open</param>
+ static void usage(string url, string addr, int nSec)
+ {
+
+ Console.WriteLine("usage: {0} [url [addr [nSec]]]",
+ System.Diagnostics.Process.GetCurrentProcess().ProcessName);
+ Console.WriteLine();
+ Console.WriteLine("A program to connect to a broker and receive");
+ Console.WriteLine("messages from a named exchange with a routing key.");
+ Console.WriteLine("The receiver uses a session callback and keeps the callback");
+ Console.WriteLine("server open for so many seconds.");
+ Console.WriteLine("The details of the message body's types and values are shown.");
+ Console.WriteLine();
+ Console.WriteLine(" url = target address for 'new Connection(url)'");
+ Console.WriteLine(" addr = address for 'session.CreateReceiver(addr)'");
+ Console.WriteLine(" nSec = time in seconds to keep the receiver callback open");
+ Console.WriteLine();
+ Console.WriteLine("Default values:");
+ Console.WriteLine(" {0} {1} {2} {3}",
+ System.Diagnostics.Process.GetCurrentProcess().ProcessName,
+ url, addr, nSec);
+ }
+
+
+ /// <summary>
+ /// A function to illustrate how to open a Session callback and
+ /// receive messages.
+ /// </summary>
+ /// <param name="args">Main program arguments</param>
+ public int TestProgram(string[] args)
+ {
+ string url = "amqp:tcp:localhost:5672";
+ string addr = "amq.direct/map_example";
+ int nSec = 30;
+ string connectionOptions = "";
+
+ if (1 == args.Length)
+ {
+ if (args[0].Equals("-h") || args[0].Equals("-H") || args[0].Equals("/?"))
+ {
+ usage(url, addr, nSec);
+ return 1;
+ }
+ }
+
+ if (args.Length > 0)
+ url = args[0];
+ if (args.Length > 1)
+ addr = args[1];
+ if (args.Length > 2)
+ nSec = System.Convert.ToInt32(args[2]);
+ if (args.Length > 3)
+ connectionOptions = args[3];
+
+ //
+ // Create and open an AMQP connection to the broker URL
+ //
+ Connection connection = new Connection(url, connectionOptions);
+ connection.Open();
+
+ //
+ // Create a session.
+ //
+ Session session = connection.CreateSession();
+
+ //
+ // Receive through callback
+ //
+ // Create callback server and implicitly start it
+ //
+ SessionReceiver.CallbackServer cbServer =
+ new SessionReceiver.CallbackServer(session, this);
+
+ //
+ // The callback server is running and executing callbacks on a
+ // separate thread.
+ //
+
+ //
+ // Create a receiver for the direct exchange using the
+ // routing key "map_example".
+ //
+ Receiver receiver = session.CreateReceiver(addr);
+
+ //
+ // Establish a capacity
+ //
+ receiver.Capacity = 100;
+
+ //
+ // Wait so many seconds for messages to arrive.
+ //
+ System.Threading.Thread.Sleep(nSec * 1000); // in mS
+
+ //
+ // Stop the callback server.
+ //
+ cbServer.Close();
+
+ //
+ // Close the receiver and the connection.
+ //
+ receiver.Close();
+ connection.Close();
+ return 0;
+ }
+ }
+
+
+ class MapCallbackReceiverMain
+ {
+ /// <summary>
+ /// Main program
+ /// </summary>
+ /// <param name="args">Main prgram args</param>
+ static int Main(string[] args)
+ {
+ // Invoke 'TestProgram' as non-static class.
+ ReceiverProcess mainProc = new ReceiverProcess();
+
+ int result = mainProc.TestProgram(args);
+
+ return result;
+ }
+ }
+}
+