summaryrefslogtreecommitdiff
path: root/ACE/ace/PIP_Invocation_Manager.cpp
blob: d18d8789e82e53a6980f16784e27825bb6aa910f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316

#include "ace/PIP_Invocation_Manager.h"
#include "ace/PIP_IO_Handler.h"
#include <ace/PIP_Message_Handler.h>

#include <iostream>
ACE_PIP_Invocation_Manager* ACE_PIP_Invocation_Manager::invocation_manager_ = 0;
ACE_Mutex                   ACE_PIP_Invocation_Manager::instance_lock_;
bool                        ACE_PIP_Invocation_Manager::delete_manager_ = false;
ACE_UINT64                  ACE_PIP_Invocation_Manager::message_id_base_ = 0;
ACE_UINT32                  ACE_PIP_Invocation_Manager::site_id_ = 0;

/// Constructor
ACE_PIP_Invocation_Manager::ACE_PIP_Invocation_Manager()
   : message_counter_(0)
{
}

/// Destructor
ACE_PIP_Invocation_Manager::~ACE_PIP_Invocation_Manager()
{

}

/// Processes requests received from I/O handler
void ACE_PIP_Invocation_Manager::process_inbound_request(ACE_PIP_Protocol_Message* message)
{
  ACE_PIP_Data_Message* payload = 
    static_cast<ACE_PIP_Data_Message*>(message->release_next());
  
  ACE_UINT32 handler_id = payload->destination_handler_ID();  
  ACE_PIP_Message_Handler* handler(0);
  
  big_lock_.acquire();
  ACE_UINT64 message_id = message->message_id();
  if (object_id_handler_map_.find(handler_id, handler) == 0)
  {
     // look to see if there are any accelerations. If so, accelerate.
     // Map the message ID to a list of outgoing messages
     in_out_id_map_.bind(message_id,
                         std::list<ACE_UINT64>());

     // Keep a record of the message and its priority so
     // it can be accelerated if necessary
     Invocation_Data invocation_data;
     invocation_data.site_id = site_id_;
     invocation_data.priority = payload->message_priority();
     invocation_data_map_.bind(message_id, invocation_data);

     big_lock_.release();
     
     // Pass the message to the message handler, deleting
     // the corresponding struct
     handler->process_incoming_message(payload->release_block(), message_id);
     delete payload;
     delete message;
     
     // Once message processing has completed,
     // clean-up any message residue
     big_lock_.acquire();
     in_out_id_map_.unbind(message_id);   
  }
  
  big_lock_.release();
}

/// Processes request to be forwarded to another handler
int ACE_PIP_Invocation_Manager::process_outbound_request(ACE_Message_Block* message, 
                                                          ACE_UINT64 token,
                                                          ACE_Future<ACE_Message_Block*>*& response_holder)
{
  // Create a protocol message from the data block
   ACE_PIP_Protocol_Message* protocol_message(0);

   ACE_NEW_RETURN(protocol_message, ACE_PIP_Protocol_Message, -1);
   protocol_message->message_type(ACE_PIP_Protocol_Message::REQUEST);
   protocol_message->process_message_payload(message);
      
   ACE_PIP_Data_Message* data_message =
     static_cast<ACE_PIP_Data_Message*>(protocol_message->next());

   // determine if the message expects a reply. If so, create a future for it.
   if (data_message->reply_expected())
     {
       // create and store future
        ACE_NEW_RETURN(response_holder, ACE_Future<ACE_Message_Block*>, -1);
     }
   
   Invocation_Data remote_info;
   remote_info.response_holder = response_holder;
   remote_info.site_id = data_message->destination_site_ID();
   remote_info.priority = data_message->message_priority();
   
   // Associate this message with token. This enables acceleration forwarding. Only
   // Token == -1 indicates this is the start of a call chain
   
   big_lock_.acquire();
   ACE_UINT64 message_id = generate_message_id();
   invocation_data_map_.bind(message_id, remote_info);
   protocol_message->message_id(message_id);

   if (token != 0)
     {
       ACE_Hash_Map_Entry<ACE_UINT64, std::list<ACE_UINT64> >* entry(0);
       // map the originating message to the outgoing message
       // so that accelerations can be forwarded appropriately
       in_out_id_map_.find(token, entry);
       if (entry)
         {
           entry->item().push_back(message_id);
         }
     }

   // use the priority and address to determine which I/O handler to send to 
   // pass the message to the I/O handler
   PRIORITY_TO_IO_HANDLER_MAP* handler_map(0);
   site_to_handlers_map_.find(data_message->destination_site_ID(),
                              handler_map);
   
   ACE_PIP_IO_Handler* IO_handler(0);
   handler_map->find(data_message->message_priority(),
                     IO_handler);

   big_lock_.release();
   
   if (IO_handler)
   {
      IO_handler->put_message(protocol_message);
   }

   return 0;
}

ACE_PIP_Invocation_Manager* ACE_PIP_Invocation_Manager::instance()
{
  if (ACE_PIP_Invocation_Manager::invocation_manager_ == 0)
    {
      instance_lock_.acquire();
      
      if (ACE_PIP_Invocation_Manager::invocation_manager_ == 0)
        {
          ACE_NEW_RETURN (ACE_PIP_Invocation_Manager::invocation_manager_,
                          ACE_PIP_Invocation_Manager,
                          0);

          delete_manager_ = true;
        }

      instance_lock_.release();
    }
  
  return invocation_manager_;
}

/// Process response received from a handler
void ACE_PIP_Invocation_Manager::process_inbound_response(ACE_PIP_Protocol_Message* message)
{
   Invocation_Data remote_info;
  
   ACE_Guard<ACE_Mutex> guard(big_lock_);
   
   // Remove the child ID
   if (invocation_data_map_.unbind(message->message_id(),
                                   remote_info) != -1)
   {
      // Pass the received response to the message handler
      // via a Future
      remote_info.response_holder->set(message->next()->block());
      remote_info.response_holder = 0;
   }
}

/// Process response received from a handler
void ACE_PIP_Invocation_Manager::process_outbound_response(ACE_Message_Block* message, ACE_UINT64 token)
{
  // Parse the message
  ACE_PIP_Protocol_Message* response = new ACE_PIP_Protocol_Message;
  response->process_message_payload(message);
  response->message_type(ACE_PIP_Protocol_Message::RESPONSE);
  
  // Lookup the appropriate IO handler, and pass down the message
  ACE_PIP_Data_Message* data_message =
    static_cast<ACE_PIP_Data_Message*>(response->next());
  
  ACE_Guard<ACE_Mutex> guard(big_lock_);

  response->message_id(token);
  PRIORITY_TO_IO_HANDLER_MAP* handler_map(0);
  
  if (site_to_handlers_map_.find
      (data_message->destination_site_ID(),
       handler_map) != -1)
    {
      ACE_PIP_IO_Handler* handler(0);
      if (handler_map->find(data_message->message_priority(),
                            handler) != -1)
        {
          handler->put_message(response);
        }
    }
}

/// Process request to accelerate the priority of a process
void ACE_PIP_Invocation_Manager::process_acceleration(ACE_PIP_Protocol_Message* message)
{
   ACE_PIP_Accel_Message* accel_message =
      static_cast<ACE_PIP_Accel_Message*>(message->next());
 
   ACE_Guard<ACE_Mutex> guard(big_lock_);
  
   // Update the stored priority of the original message. This will enable subsequent upcalls
   // to adjust their priority appropriately
   Invocation_Data invocation_data;
   if (invocation_data_map_.unbind(message->message_id(), invocation_data) == 0)
   {
      if (invocation_data.priority < accel_message->new_priority())
      {
         invocation_data.priority = accel_message->new_priority();
      }
      
      invocation_data_map_.bind(message->message_id(), invocation_data);

      // Generate acceleration messages for each outgoing invocation
      // resulting from processing of incoming request. Do so
      // only if their priority is lower than the accelerated priority
      ACE_Hash_Map_Entry<ACE_UINT64, std::list<ACE_UINT64> >* child_entry(0);
      if (in_out_id_map_.find(message->message_id(), child_entry) == 0)
      {
         std::list<ACE_UINT64>::iterator child_iter = child_entry->item().begin();
         for (; child_iter != child_entry->item().end(); ++child_iter)
         {
            if (invocation_data_map_.unbind(*child_iter, invocation_data) == 0)
            {
               if (invocation_data.priority < accel_message->new_priority())
               {
                  invocation_data.priority = accel_message->new_priority();
                  invocation_data_map_.bind(*child_iter, invocation_data);
                  // Generate new message and send it to the appropriate site
                  PRIORITY_TO_IO_HANDLER_MAP* handler_map(0);
                  if (site_to_handlers_map_.find(invocation_data.site_id,
                                                 handler_map) == 0)
                  {
                     ACE_PIP_IO_Handler* handler(0);
                     if (handler_map->find(ACE_Event_Handler::HI_PRIORITY, handler) == 0)
                     {
                        ACE_PIP_Accel_Message* accel_copy = accel_message->copy();
                        ACE_PIP_Protocol_Message* proto_copy = message->copy();
                        proto_copy->next(accel_copy);
                        handler->put_message(proto_copy);
                     }
                  }
               }

               invocation_data_map_.bind(*child_iter, invocation_data);
            }
         }
      }
   }

   // delete the acceleration message here
}

/// Register an IO handler that can send messages on invocation
/// manager's behalf
int ACE_PIP_Invocation_Manager::register_IO_handler(ACE_PIP_IO_Handler* handler)
{
   // Extract the priority and remote address of handler
   ACE_UINT32 priority = handler->priority();
   ACE_Guard<ACE_Mutex> guard(big_lock_);
   
   // Map the destination site ID and priority to this handler
   PRIORITY_TO_IO_HANDLER_MAP* handler_map(0);
   if (site_to_handlers_map_.find(handler->destination_site_id(),
                                  handler_map) == -1)
   {
      ACE_NEW_RETURN(handler_map, PRIORITY_TO_IO_HANDLER_MAP, -1);
   }
  
   handler_map->bind(priority, handler);
   site_to_handlers_map_.bind(handler->destination_site_id(), handler_map);

   return 0;
}

void ACE_PIP_Invocation_Manager::unregister_IO_handler(ACE_PIP_IO_Handler* handler)
{
   ACE_UINT32 priority = handler->priority();
   
   ACE_Guard<ACE_Mutex> guard(big_lock_);
   
   // unbind the handler
   PRIORITY_TO_IO_HANDLER_MAP* handler_map(0);
   if (site_to_handlers_map_.find(handler->destination_site_id(),
                                  handler_map) != -1)
   {
      handler_map->unbind(priority, handler);
   }   
}

void ACE_PIP_Invocation_Manager::register_message_handler(ACE_PIP_Message_Handler* handler)
{
  // extract the object id from the handler
  // map the object id to the handler
  ACE_Guard<ACE_Mutex> guard(big_lock_);
  object_id_handler_map_.bind(handler->handler_id(), handler);
}

ACE_UINT64 ACE_PIP_Invocation_Manager::generate_message_id()
{
  return (((ACE_UINT64)site_id_) << 32) + message_counter_++;
}

void ACE_PIP_Invocation_Manager::site_id(ACE_UINT64 site_id)
{
  site_id_ = site_id;
}