summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/LoadBalancing/Hash_ReplicaControl.cpp
blob: 22c9c9361c3b094e4aeb2ca88e9a2ffb5de3367b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// -*- C++ -*-

// $Id$

#include "Hash_ReplicaControl.h"
#include "tao/ORB_Core.h"
#include "ace/Reactor.h"

// Default constructor.
//
// Passes `this' to the timeout adapter (which forwards reactor timer
// callbacks back to this object) and to the replica member, stamps the
// start of the first measurement interval with the current time, and
// zeroes both the request counter and the load estimate.
Hash_ReplicaControl::Hash_ReplicaControl ()
  : adapter_ (this)
  , replica_ (this)
  , interval_start_ (ACE_OS::gettimeofday ())
  , request_count_ (0)
  , current_load_ (0)
{
}

// Register this replica with the load balancer and start the periodic
// load report.
//
// @param orb         ORB whose reactor drives the load-report timer.
// @param balancer    Load balancer that supplies the group identity
//                    and the proxy used to publish the load.
// @param ACE_TRY_ENV Environment used to propagate CORBA exceptions
//                    raised by the remote calls.
//
// The steps are order dependent: handle_timeout() publishes the load
// through proxy_, so the timer is armed only once connect() has
// succeeded.  Arming it first would allow a timeout to be dispatched
// while proxy_ is still unset, and would leave the timer running if
// any of the calls below failed.
void
Hash_ReplicaControl::init (CORBA::ORB_ptr orb,
                           LoadBalancing::LoadBalancer_ptr balancer,
                           CORBA::Environment &ACE_TRY_ENV)
{
  ACE_DEBUG ((LM_DEBUG,
              "Hash_ReplicaControl::init\n"));

  // Object reference for this control object.
  LoadBalancing::ReplicaControl_var control =
    this->_this (ACE_TRY_ENV);
  ACE_CHECK;

  // Object reference for the replica this object controls.
  CORBA::Object_var replica =
    this->replica_._this (ACE_TRY_ENV);
  ACE_CHECK;

  // Remember the group reference; request_rejected() forwards
  // rejected requests to it.
  this->group_ =
    balancer->group_identity (ACE_TRY_ENV);
  ACE_CHECK;

  this->proxy_ =
    balancer->connect (control.in (),
                       replica.in (),
                       ACE_TRY_ENV);
  ACE_CHECK;

  // Everything handle_timeout() needs is now in place, start
  // reporting: first expiration after one second, then once per
  // second.
  ACE_Time_Value interval (1, 0);
  ACE_Time_Value restart (1, 0);
  ACE_Reactor *reactor = orb->orb_core ()->reactor ();
  if (reactor->schedule_timer (&this->adapter_, 0, interval, restart) == -1)
    {
      // Without the timer the load is never reported; make that
      // visible instead of failing silently.
      ACE_DEBUG ((LM_DEBUG,
                  "Hash_ReplicaControl::init - "
                  "unable to schedule the load report timer\n"));
    }
}

int
Hash_ReplicaControl::handle_timeout (const ACE_Time_Value &,
                                     const void *)
{
  ACE_Time_Value elapsed_time =
    ACE_OS::gettimeofday () - this->interval_start_;
  this->interval_start_ = ACE_OS::gettimeofday ();

  CORBA::Float load =
    CORBA::Float(this->request_count_) / elapsed_time.msec ();
  this->request_count_ = 0;
  this->interval_start_ = ACE_OS::gettimeofday ();

  // @@ Ossama: here is the dampening algorithm that i implemented, it
  // is not rocket science, but helps...
  this->current_load_ =
    0.9F * this->current_load_ + 0.1F * load;

  ACE_TRY_NEW_ENV
    {
      this->proxy_->current_load (this->current_load_,
                                  ACE_TRY_ENV);
      ACE_TRY_CHECK;
      ACE_DEBUG ((LM_DEBUG, "Current_Load = %f\n", this->current_load_));
    }
  ACE_CATCHANY
    {
    }
  ACE_ENDTRY;
  return 0;
}

// Count one incoming request.  handle_timeout() turns the counter into
// a request rate and resets it at the end of every interval.
void
Hash_ReplicaControl::request_received ()
{
  ++this->request_count_;
}


// Called when a request has been rejected: tell the replica to stop
// rejecting and redirect the caller to the object group by raising
// PortableServer::ForwardRequest.
void
Hash_ReplicaControl::request_rejected (CORBA::Environment &ACE_TRY_ENV)
{
  // @@ Ossama: only a single request is shed per advisory, the flag
  // is cleared right away.  Maybe the advisory should say how many
  // requests we are supposed to shed?
  this->replica_.reject_requests (0);

  // NOTE(review): confirm whether the ForwardRequest constructor takes
  // ownership of this reference or duplicates it again; in the latter
  // case the extra _duplicate() here leaks a reference.
  CORBA::Object_ptr forward_to =
    CORBA::Object::_duplicate (this->group_.in ());
  ACE_THROW (PortableServer::ForwardRequest (forward_to));
}

// Advisory: this replica's load is high.  Switches the replica into
// its rejecting state and logs the transition.  The environment
// argument is unused.
void
Hash_ReplicaControl::high_load_advisory (CORBA::Environment & /* ACE_TRY_ENV */)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  // From now on the replica should turn requests away.
  this->replica_.reject_requests (1);

  ACE_DEBUG ((LM_DEBUG,
              "**** Load is high\n"));
}

// Advisory: this replica's load is back to nominal.  Clears the
// replica's rejecting state and logs the transition.  The environment
// argument is unused.
void
Hash_ReplicaControl::nominal_load_advisory (CORBA::Environment & /* ACE_TRY_ENV */)
  ACE_THROW_SPEC ((CORBA::SystemException))
{
  // The replica may accept requests again.
  this->replica_.reject_requests (0);

  ACE_DEBUG ((LM_DEBUG,
              "**** Load is nominal\n"));
}

// ****************************************************************

// Remember the Hash_ReplicaControl that timer expirations are
// forwarded to.  Only the pointer is stored.
Timeout_Adapter::Timeout_Adapter (Hash_ReplicaControl *adaptee)
  : adaptee_ (adaptee)
{
}

// Timer callback: hand the expiration straight to the adaptee and
// return whatever it returns.
int
Timeout_Adapter::handle_timeout (const ACE_Time_Value &current_time,
                                 const void *arg)
{
  const int result =
    this->adaptee_->handle_timeout (current_time, arg);
  return result;
}