summaryrefslogtreecommitdiff
path: root/DAnCE/dance/LocalityManager/Handler/LocalityActivator_Impl.h
blob: af4a5def9ae602d139d91d8291865a759bb3963e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
// $Id$
/**
 * @file LocalityActivator_Impl.h
 * @author William R. Otte
 */

#ifndef DAnCE_LocalityActivator_H_
#define DAnCE_LocalityActivator_H_

#include "ace/Process_Manager.h"
#include "ace/Event_Handler.h"
#include "ace/Condition_T.h"
#include "ace/OS_NS_sys_wait.h"
#include "ace/Refcounted_Auto_Ptr.h"
#include "ace/Unbounded_Set_Ex.h"
#include "ace/Synch_Traits.h"
#include "ace/Condition_T.h"
#include "tao/PortableServer/PortableServer.h"
#include "dance/DAnCE_LocalityManagerS.h"
#include "dance/DAnCE_Utility.h"

#include <set>

#if !defined (ACE_LACKS_PRAGMA_ONCE)
#pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

namespace DAnCE
{
  /**
   * @author William R. Otte <wotte@dre.vanderbilt.edu>
   * @brief Default server activator for CIAO component servers.
   *
   * Implements the default component server activation strategy
   * which is to spawn new processes.  This is not thread-safe,
   * nor is it intended to be.  Containers are reated serially,
   * so there will be only one actor *modifying* data at a particular
   * point in time.
   */
  class DAnCE_LocalityActivator_i
    : public virtual ::POA_DAnCE::LocalityManagerActivator
  {
  public:
    /// Constructor
    DAnCE_LocalityActivator_i (CORBA::ULong def_spawn_delay,
                               const char * default_cs_path,
                               const char * cs_args,
                               bool multithreaded,
                               CORBA::ORB_ptr orb,
                               PortableServer::POA_ptr poa_);

    /// Destructor
    virtual ~DAnCE_LocalityActivator_i (void);

    virtual
      void locality_manager_callback (::DAnCE::LocalityManager_ptr serverref,
                                      const char * server_UUID,
                                      ::Deployment::Properties_out config);

    virtual void configuration_complete (const char *server_UUID);

    ::DAnCE::LocalityManager_ptr
    create_locality_manager (const ::Deployment::DeploymentPlan &plan,
                             CORBA::ULong instanceRef,
                             const ::Deployment::Properties & config);

    void remove_locality_manager (
                                  ::DAnCE::LocalityManager_ptr server);

    ::DAnCE::LocalityManager * get_locality_managers (void);

  private:
    struct Server_Info;
    class Server_Child_Handler;

    /// Builds command line options based on configuration information.
    /// May modify the uuid of the component server.
    ACE_CString construct_command_line (Server_Info &si);

    /// Spawns the component server process, but does not wait for it
    /// to call back.
    pid_t spawn_locality_manager (Server_Child_Handler* exit_handler,
                                  const ACE_CString &cmd_line);

    /// This method is only applicable when our program is configured as
    /// singled threaded . Internally it uses a @c perform_work blocking
    /// call to wait for NA object to call back
    void single_threaded_wait_for_callback (const Server_Info &si,
                                            ACE_Time_Value &timeout);

    /// This method is only applicable when our program is configured as
    /// multiple threaded. Internally it waits on a conditional variable
    /// that could be modified by the callback servant which runs in
    /// another thread
    void multi_threaded_wait_for_callback (Server_Info &si,
                                           ACE_Time_Value &timeout);

    void create_properties (const Server_Info &info,
                            Deployment::Properties_out config);

    struct Server_Info
    {
      enum ProcessStatus
        {
          ACTIVE,
          TERMINATE_REQUESTED,
          TERMINATED,
          INACTIVE
        };

      Server_Info (const ::Deployment::DeploymentPlan &plan,
                   CORBA::ULong instanceRef,
                   size_t cmap_size_hint = 128)
        : cmap_ (new DAnCE::Utility::PROPERTY_MAP (cmap_size_hint)),
          ref_ (DAnCE::LocalityManager::_nil ()),
          pid_ (ACE_INVALID_PID),
          status_ (INACTIVE),
          mutex_ (),
          condition_ (mutex_),
          plan_ (plan),
          instanceRef_ (instanceRef)
      {}

      typedef ACE_Refcounted_Auto_Ptr <DAnCE::Utility::PROPERTY_MAP,
                                       ACE_Null_Mutex> PROPERTY_MAP_PTR;

      ACE_CString uuid_;
      PROPERTY_MAP_PTR cmap_;
      DAnCE::LocalityManager_var ref_;
      pid_t pid_;
      ProcessStatus status_;
      TAO_SYNCH_MUTEX mutex_;
      ACE_Condition<TAO_SYNCH_MUTEX> condition_;
      const ::Deployment::DeploymentPlan &plan_;
      CORBA::ULong instanceRef_;
    };

    typedef ACE_Refcounted_Auto_Ptr<Server_Info, ACE_Null_Mutex> Safe_Server_Info;

    /**
    * @brief The exit handler class for the locality manager child process
    * to detect and report process exit
    */
    class Server_Child_Handler : public virtual ACE_Event_Handler
    {
    public:
      Server_Child_Handler (const Safe_Server_Info&  si);
      virtual ~Server_Child_Handler ();

      virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);

      virtual int handle_exit (ACE_Process *proc);

      const Server_Info& server_info () const
      {
        return *this->server_info_;
      }
    private:
      Safe_Server_Info server_info_;
    };

    struct _server_info
    {
      bool operator() (const Safe_Server_Info &a, const Safe_Server_Info &b) const
      {
        return a->uuid_ < b->uuid_;
      }
    };

    // Presumably, there won't be too many component servers per node application
    typedef std::set <Safe_Server_Info, _server_info> SERVER_INFOS;

    /// Default args to pass to all componentservers.
    ACE_CString default_args_;

    TAO_SYNCH_MUTEX container_mutex_;

    SERVER_INFOS server_infos_;

    ACE_Process_Manager process_manager_;

    CORBA::ULong spawn_delay_;

    /////*******NEW
    bool multithreaded_;

    CORBA::ORB_var orb_;

    PortableServer::POA_var poa_;

    ACE_CString cs_path_;

    ACE_CString cs_args_;

    TAO_SYNCH_MUTEX mutex_;

    ACE_Condition<TAO_SYNCH_MUTEX> condition_;
  };


}
#endif /* DAnCE_LocalityActivator_H_ */