summaryrefslogtreecommitdiff
path: root/src/3rd_party-static/MessageBroker/include/mb_controller.hpp
blob: 2d196aced2cad8a5af7a00a3aa04a7e8027f9217 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
/**
 * \file mb_controller.hpp
 * \brief MessageBroker Controller.
 * \author AKara
 */

#ifndef MB_CONTROLLER_H 
#define MB_CONTROLLER_H 

#include <iostream>

#include "json/json.h"

#include "mb_tcpclient.hpp"
#include "utils/lock.h"
#include "utils/atomic_object.h"

#include <cstring> 

/**
 * \namespace NsMessageBroker
 * \brief MessageBroker related functions.
 */ 
namespace NsMessageBroker
{

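   /**
    * \class CMessageBrokerController
    * \brief MessageBroker Controller.
    *
    * Abstract class derived from TcpClient: subclasses must implement
    * processResponse(), processRequest() and processNotification().
    */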
   class CMessageBrokerController : public TcpClient
   {
   public:
      /**
       * \brief Constructor.
       * \param address remote network address or FQDN
       * \param port remote local port
       * \param name name of component
       */
      CMessageBrokerController(const std::string& address, uint16_t port, std::string name);


      /**
       * \brief Destructor.
       */
      ~CMessageBrokerController();

      /**
       * \brief Receive data from the network.
       * \param data if data is received it is put into this reference
       * \return number of bytes received or -1 on error
       * \note This method blocks until data arrives.
       */
      virtual ssize_t Recv(std::string& data);

      /**
       * \brief Send data.
       * \param data data to send
       * \return number of bytes sent or -1 on error
       */
      ssize_t Send(const std::string& data);

      /**
       * \brief Sends a JSON message.
       * \param message JSON message.
       */
      void sendJsonMessage(Json::Value& message);

      /**
       * \brief Generates a new message id from the range
       *        mControllersIdStart - (mControllersIdStart+999).
       * \return next id for message
       */
      int getNextMessageId();

      /**
       * \brief Prepares a new message with id and jsonrpc version.
       * \param root container for the prepared message
       */
      void prepareMessage(Json::Value& root);

      /**
       * \brief Prepares an error message from an error code and a text.
       * \note Doesn't change/add fields id and jsonrpc
       * \param errCode error code
       * \param errMessage text of the error message
       * \param error container for the prepared error message
       */
      void prepareErrorMessage(int errCode, std::string errMessage, Json::Value& error);

      /**
       * \brief Gets the destination component name.
       * \param root JSON message.
       * \return string destination component name.
       */
      std::string getDestinationComponentName(Json::Value& root);

      /**
       * \brief Gets the method name.
       * \param root JSON message.
       * \return string method name.
       */
      std::string getMethodName(Json::Value& root);

      /**
       * \brief Checks whether the message is a notification.
       * \param root JSON message.
       * \return true if notification.
       */
      bool isNotification(Json::Value& root);

      /**
       * \brief Checks whether the message is a response.
       * \param root JSON message.
       * \return true if response.
       */
      bool isResponse(Json::Value& root);

      /**
       * \brief Searches for a method by id in mWaitResponseQueue.
       * \param id id of incoming JSON message.
       * \return string method name or "" in case not found.
       */
      std::string findMethodById(std::string id);

      /**
       * \brief Registers the controller on the server.
       * \param id message id to use for the JSON message, because the id
       *        range hasn't been received yet (defaults to 0).
       */
      void registerController(int id = 0);

      /**
       * \brief Unregisters the controller on the server.
       */
      void unregisterController();

      /**
       * \brief Subscribes the controller to changes of a property.
       * \param property property name in format ComponentName.PropertyName.
       */
      void subscribeTo(std::string property);

      /**
       * \brief Unsubscribes the controller from changes of a property.
       * \param property property name in format ComponentName.PropertyName.
       */
      void unsubscribeFrom(std::string property);

      /**
       * \brief Pure virtual method to process a response.
       * \param method name of the method which has been called.
       * \param root JSON message.
       */
      virtual void processResponse(std::string method, Json::Value& root) = 0;

      /**
       * \brief Pure virtual method to process a request.
       * \param root JSON message.
       */
      virtual void processRequest(Json::Value& root) = 0;

      /**
       * \brief Pure virtual method to process a notification message.
       *
       * Notifies subscribers about a property change.
       * Expected notification format example:
       * \code
       * {"jsonrpc": "2.0", "method": "<ComponentName>.<NotificationName>", "params": <list of params>}
       * \endcode
       * \param root JSON message.
       */
      virtual void processNotification(Json::Value& root) = 0;

      /**
       * \brief Checks a message.
       * \param root JSON message.
       * \param error JSON message to fill in case of any errors.
       * \return true if the message is good.
       */
      bool checkMessage(Json::Value& root, Json::Value& error);
      
      /**
       * \brief Returns the name of the controller.
       * \return name of controller.
       */
      std::string getControllersName();

      /**
       * \brief Method for the receiving thread.
       * \param arg thread argument; its use is not visible in this header.
       * \return not documented here; see the implementation file.
       */
      void* MethodForReceiverThread(void * arg);

      /**
       * \brief Calls Close() and then sets the stop flag to true.
       * \note Presumably this makes the receiving thread terminate; confirm
       *       against the MethodForReceiverThread() implementation.
       */
      virtual void exitReceivingThread() {
        // Close() is not declared in this class; presumably inherited
        // from TcpClient (TODO confirm).
        Close();
        stop = true;
      }

   protected:
      /**
       * \brief Flag to stop the thread.
       */
      sync_primitives::atomic_bool stop;

   private:
      /**
       * \brief Method for receiving messages without tcp packeting.
       * \param message received data
       */
      void onMessageReceived(Json::Value message);
      /**
       * \brief Buffer for received data.
       * \note NOTE(review): the previous description was a copy of the one
       *       for mControllersIdStart; exact usage of this buffer is in the
       *       implementation file and should be confirmed there.
       */
      std::string m_receivingBuffer;

      /**
       * \brief Start value of the id range.
       */
      int mControllersIdStart;

      /**
       * \brief Current id value.
       */
      int mControllersIdCurrent;

      /**
       * \brief Methods of already sent messages, used to recognize
       *        responses: MessageId:MethodName.
       */
      std::map<std::string, std::string> mWaitResponseQueue;

      /**
       * \brief Name of component.
       */
      std::string mControllersName;

      /**
       * \brief JSON reader.
       */
     Json::Reader m_reader;

      /**
       * \brief JSON writer.
       */
     Json::FastWriter m_writer;

      /**
       * \brief Second JSON writer.
       * \note Presumably used on the receiving side (inferred from the
       *       name only); confirm in the implementation file.
       */
     Json::FastWriter m_receiverWriter;
      /**
       * \brief Lock for mWaitResponseQueue.
       */
     sync_primitives::Lock       queue_lock_;
   };
} /* namespace NsMessageBroker */
#endif /* MB_CONTROLLER_H */