/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Collections.Generic; using Org.Apache.Qpid.Messaging; namespace Org.Apache.Qpid.Messaging.SessionReceiver { /// /// ISessionReceiver interface defines the callback for users to supply. /// Once established this callback will receive all messages for all /// receivers defined by the current session. /// Users are expected not to 'fetch' or 'get' messages by any other means. /// Users must acknowledge() the Session's messages either in the callback /// function or by some other scheme. /// public interface ISessionReceiver { void SessionReceiver(Receiver receiver, Message message); } /// /// EventEngine - wait for messages from the underlying C++ code. /// When available get them and deliver them via callback to our /// client through the ISessionReceiver interface. /// This class consumes the thread that calls the Run() function. /// internal class EventEngine { private Session session; private ISessionReceiver callback; private bool keepRunning; public EventEngine(Session theSession, ISessionReceiver thecallback) { this.session = theSession; this.callback = thecallback; } /// /// Function to call Session's nextReceiver, discover messages, /// and to deliver messages through the callback. /// public void Open() { Receiver rcvr; Message msg; keepRunning = true; while (keepRunning) { rcvr = session.NextReceiver(DurationConstants.SECOND); if (null != rcvr) { if (keepRunning) { msg = rcvr.Fetch(DurationConstants.SECOND); this.callback.SessionReceiver(rcvr, msg); } } //else // receive timed out // EventEngine exits the nextReceiver() function periodically // in order to test the keepRunning flag } // Private thread is now exiting. } /// /// Function to stop the EventEngine. Private thread will exit within /// one second. /// public void Close() { keepRunning = false; } } /// /// server is the class that users instantiate to connect a SessionReceiver /// callback to the stream of received messages received on a Session. /// public class CallbackServer { private EventEngine ee; /// /// Constructor for the server. /// /// The Session whose messages are collected. /// The user function call with each message. /// public CallbackServer(Session session, ISessionReceiver callback) { ee = new EventEngine(session, callback); new System.Threading.Thread( new System.Threading.ThreadStart(ee.Open)).Start(); } /// /// Function to stop the server. /// public void Close() { ee.Close(); } } }