summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/IncomingMessage.h
blob: 7aa8e33df2423f291d637d5e88b20d1de049981f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#ifndef _IncomingMessage_
#define _IncomingMessage_

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "qpid/sys/Monitor.h"
#include <map>
#include <queue>
#include <vector>
#include <boost/variant.hpp>

namespace qpid {
namespace client {

class Message;

/**
 * Manage incoming messages.
 *
 * Uses reference and destination concepts from 0-9 Messsage class.
 *
 * Basic messages use special destination and reference names to indicate
 * get-ok, return etc. messages.
 *
 */
class IncomingMessage {
  public:
    /** Accumulate data associated with a set of messages. */
    struct Reference {
        std::string data;
        std::vector<Message> messages;
    };

    /** Interface to a destination for messages. */
    class Destination {
      public:
        virtual ~Destination();

        /** Pass a message to the destination */
        virtual void message(const Message&) = 0;

        /** Notify destination of queue-empty contition */
        virtual void empty() = 0;
    };


    /** A destination that a thread can wait on till a message arrives. */
    class WaitableDestination : public Destination
    {
      public:
        WaitableDestination();
        void message(const Message& msg);        
        void empty();
        /** Wait till message() or empty() is called. True for message() */
        bool wait(Message& msgOut);
        void shutdown();

      private:
        struct Empty {};
        typedef boost::variant<Message,Empty> Item;
        sys::Monitor monitor;
        std::queue<Item> queue;
        bool shutdownFlag;
    };
 


    /** Add a reference. Throws if already open. */
    void openReference(const std::string& name);

    /** Get a reference. Throws if not already open. */
    void appendReference(const std::string& name,
                         const std::string& data);

    /** Create a message to destination associated with reference
     *@exception if destination or reference non-existent.
     */
    Message&  createMessage(const std::string& destination,
                            const std::string& reference);

    /** Get a reference.
     *@exception if non-existent.
     */
    Reference& getReference(const std::string& name);
    
    /** Close a reference and deliver all its messages.
     * Throws if not open or a message has an invalid destination.
     */
    void closeReference(const std::string& name);

    /** Add a destination.
     *@exception if a different Destination is already registered
     * under name.
     */
    void addDestination(std::string name, Destination&);

    /** Remove a destination. Throws if does not exist */
    void removeDestination(std::string name);

    /** Get a destination. Throws if does not exist */
    Destination& getDestination(const std::string& name);
  private:

    typedef std::map<std::string, Reference> ReferenceMap;
    typedef std::map<std::string, Destination*> DestinationMap;
    
    Reference& getRefUnlocked(const std::string& name);
    Destination& getDestUnlocked(const std::string& name);

    mutable sys::Mutex lock;
    ReferenceMap references;
    DestinationMap destinations;
};

}}


#endif