// -*- C++ -*-
//
// path: root/TAO/orbsvcs/tests/LoadBalancing/Hash_ReplicaControl.cpp
// blob: 866f8d434297e91f515117bff941f5da28fb67f1 (plain)

// $Id$

#include "Hash_ReplicaControl.h"

// Default constructor.
//
// Hands a pointer to this object to both the timeout adapter and the
// replica member, records the current time as the start of the first
// measurement interval, and zeroes the request counter and the
// load estimate.
Hash_ReplicaControl::Hash_ReplicaControl (void)
  : adapter_ (this)
  , replica_ (this)
  , interval_start_ (ACE_OS::gettimeofday ())
  , request_count_ (0)
  , current_load_ (0)
{
  // Nothing else to do: all state is set up in the initializer list.
}

// Register this replica (and its control object) with the load
// balancer and start the periodic load-report timer.
//
// @param orb         ORB whose reactor drives the periodic timer.
// @param balancer    Load balancer to query for the group identity
//                    and to connect to.
// @param ACE_TRY_ENV Environment used to report CORBA exceptions
//                    raised by the calls below.
//
// The timer is armed only after the proxy has been obtained from the
// load balancer: handle_timeout() invokes an operation through
// proxy_, so if any of the CORBA calls below fails we must not leave
// behind a timer that would fire against an unset proxy.
void
Hash_ReplicaControl::init (CORBA::ORB_ptr orb,
                           LoadBalancing::LoadBalancer_ptr balancer,
                           CORBA::Environment &ACE_TRY_ENV)
{
  ACE_DEBUG ((LM_DEBUG,
              "Hash_ReplicaControl::init\n"));

  // Object reference for this control servant.
  LoadBalancing::ReplicaControl_var control =
    this->_this (ACE_TRY_ENV);
  ACE_CHECK;

  // Object reference for the replica servant.
  CORBA::Object_var replica =
    this->replica_._this (ACE_TRY_ENV);
  ACE_CHECK;

  // Remember the group reference; request_rejected() forwards
  // rejected requests to it.
  this->group_ =
    balancer->group_identity (ACE_TRY_ENV);
  ACE_CHECK;

  // Connect to the load balancer; the returned proxy is used by
  // handle_timeout() to report the current load.
  this->proxy_ =
    balancer->connect (control.in (),
                       replica.in (),
                       ACE_TRY_ENV);
  ACE_CHECK;

  // Everything is in place: fire after one second, and then once
  // every second.
  ACE_Time_Value interval (1, 0);
  ACE_Time_Value restart (1, 0);
  ACE_Reactor *reactor = orb->orb_core ()->reactor ();
  if (reactor->schedule_timer (&this->adapter_, 0, interval, restart) == -1)
    {
      // Without the timer no load is ever reported; make that visible.
      ACE_DEBUG ((LM_DEBUG,
                  "Hash_ReplicaControl::init - "
                  "unable to schedule the load report timer\n"));
    }
}

int
Hash_ReplicaControl::handle_timeout (const ACE_Time_Value &,
                                     const void *)
{
  ACE_Time_Value elapsed_time =
    ACE_OS::gettimeofday () - this->interval_start_;
  this->interval_start_ = ACE_OS::gettimeofday ();

  CORBA::Float load =
    CORBA::Float(this->request_count_) / elapsed_time.msec ();
  this->request_count_ = 0;
  this->interval_start_ = ACE_OS::gettimeofday ();

  this->current_load_ =
    0.9F * this->current_load_ + 0.1F * load;

  ACE_TRY_NEW_ENV
    {
      this->proxy_->current_load (this->current_load_,
                                  ACE_TRY_ENV);
      ACE_TRY_CHECK;
      ACE_DEBUG ((LM_DEBUG, "Current_Load = %f\n", this->current_load_));
    }
  ACE_CATCHANY
    {
    }
  ACE_ENDTRY;
  return 0;
}

// Count one more request for the current measurement interval.
// handle_timeout() reads and resets this counter when it computes
// the load.
void
Hash_ReplicaControl::request_received (void)
{
  ++this->request_count_;
}


// Called when a request is being rejected: switch the replica back to
// accepting requests and raise PortableServer::ForwardRequest carrying
// a duplicate of the group reference.
//
// @param ACE_TRY_ENV Environment through which the exception is raised.
void
Hash_ReplicaControl::request_rejected (CORBA::Environment &ACE_TRY_ENV)
{
  // Clear the reject flag before raising the exception.
  this->replica_.reject_requests (0);

  CORBA::Object_ptr forward_to =
    CORBA::Object::_duplicate (this->group_.in ());
  ACE_THROW (PortableServer::ForwardRequest (forward_to));
}

// High-load advisory: tell the replica to start rejecting requests
// and log the event.  The environment argument is unused.
void
Hash_ReplicaControl::high_load_advisory (CORBA::Environment & /* ACE_TRY_ENV */)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  // From now on the replica should reject all requests.
  this->replica_.reject_requests (1);

  ACE_DEBUG ((LM_DEBUG,
              "**** Load is high\n"));
}

// Nominal-load advisory: tell the replica to accept requests again
// and log the event.  The environment argument is unused.
void
Hash_ReplicaControl::nominal_load_advisory (CORBA::Environment & /* ACE_TRY_ENV */)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  // The replica may once again accept requests.
  this->replica_.reject_requests (0);

  ACE_DEBUG ((LM_DEBUG,
              "**** Load is nominal\n"));
}

// ****************************************************************

// Remember the Hash_ReplicaControl to which timeouts are forwarded.
// The pointer is stored as given; this constructor does nothing else.
Timeout_Adapter::Timeout_Adapter (Hash_ReplicaControl *adaptee)
  : adaptee_ (adaptee)
{
  // No further setup required.
}

// Forward the timeout, arguments unchanged, to the adapted
// Hash_ReplicaControl and hand its result back to the caller.
int
Timeout_Adapter::handle_timeout (const ACE_Time_Value &current_time,
                                 const void *arg)
{
  const int result =
    this->adaptee_->handle_timeout (current_time, arg);
  return result;
}