summaryrefslogtreecommitdiff
path: root/qpid/cpp/bindings/qpid/dotnet/src/sessionreceiver/sessionreceiver.cs
blob: 680732068ffc70f1bb18822bb98c005e59f852af (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

using System;
using System.Collections.Generic;
using Org.Apache.Qpid.Messaging;

namespace Org.Apache.Qpid.Messaging.SessionReceiver
{
    /// <summary>
    /// ISessionReceiver interface defines the callback for users to supply.
    /// Once established this callback will receive all messages for all 
    /// receivers defined by the current session.
    /// Users are expected not to 'fetch' or 'get' messages by any other means.
    /// Users must acknowledge() the Session's messages either in the callback
    /// function or by some other scheme.
    /// </summary>

    public interface ISessionReceiver
    {
        void SessionReceiver(Receiver receiver, Message message);
    }

    
    /// <summary>
    /// EventEngine - wait for messages from the underlying C++ code.
    /// When available get them and deliver them via callback to our 
    /// client through the ISessionReceiver interface.
    /// This class consumes the thread that calls the Run() function.
    /// </summary>

    internal class EventEngine
    {
        private Session          session;
        private ISessionReceiver callback;
        private bool             keepRunning;

        public EventEngine(Session theSession, ISessionReceiver thecallback)
        {
            this.session  = theSession;
            this.callback = thecallback;
        }

        /// <summary>
        /// Function to call Session's nextReceiver, discover messages,
        /// and to deliver messages through the callback.
        /// </summary>
        public void Open()
        {
            Receiver rcvr;
            Message  msg;

            keepRunning = true;
            while (keepRunning)
            {
                rcvr = session.NextReceiver(DurationConstants.SECOND);

                if (null != rcvr)
                {
                    if (keepRunning)
                    {
                        msg = rcvr.Fetch(DurationConstants.SECOND);
                        this.callback.SessionReceiver(rcvr, msg);
                    }
                }
                //else
                //    receive timed out
                //    EventEngine exits the nextReceiver() function periodically
                //    in order to test the keepRunning flag
            }
            // Private thread is now exiting.
        }

        /// <summary>
        /// Function to stop the EventEngine. Private thread will exit within
        /// one second.
        /// </summary>
        public void Close()
        {
            keepRunning = false;
        }
    }


    /// <summary>
    /// server is the class that users instantiate to connect a SessionReceiver
    /// callback to the stream of received messages received on a Session.
    /// </summary>
    public class CallbackServer
    {
        private EventEngine ee;

        /// <summary>
        /// Constructor for the server.
        /// </summary>
        /// <param name="session">The Session whose messages are collected.</param>
        /// <param name="callback">The user function call with each message.</param>
        /// 
        public CallbackServer(Session session, ISessionReceiver callback)
        {
            ee = new EventEngine(session, callback);

            new System.Threading.Thread(
                new System.Threading.ThreadStart(ee.Open)).Start();
        }

        /// <summary>
        /// Function to stop the server.
        /// </summary>
        public void Close()
        {
            ee.Close();
        }
    }
}