summaryrefslogtreecommitdiff
path: root/ACE/protocols/ace/RMCast/Flow.cpp
blob: ddb8bf7b661cd4fdf9eae2617ad10bb2a781bf3b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// author    : Boris Kolpackov <boris@kolpackov.net>
// $Id$

#include "Flow.h"

#include "ace/ACE.h"
#include "ace/OS_NS_unistd.h"   // sleep
#include "ace/OS_NS_sys_time.h" // gettimeofday

#include "ace/os_include/os_math.h" // exp

namespace ACE_RMCast
{
  Flow::
  Flow (Parameters const& )
    : //params_ (params),
        nak_time_ (0, 0),
        sample_start_time_ (0, 0),
        sample_bytes_ (0),
        current_tput_ (0.0),
        cap_tput_ (0.0)
  {
  }

  void Flow::send (Message_ptr m)
  {
    if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
    {
      ACE_Time_Value now_time (ACE_OS::gettimeofday ());

      Lock l (mutex_);
      sample_bytes_ += data->size ();

      if (sample_start_time_ == ACE_Time_Value (0, 0))
      {
        sample_start_time_ = now_time;
      }
      else
      {
        ACE_Time_Value delta (now_time - sample_start_time_);

        if (delta > ACE_Time_Value (0, 2000))
        {
          current_tput_ =
            double (sample_bytes_) / (delta.sec () * 1000000 + delta.usec ());

          // cerr << "tput: " << current_tput_ << " bytes/usec" << endl;

          sample_bytes_ = 0;
          sample_start_time_ = ACE_Time_Value (0, 0);
        }
      }

      if (!ACE::is_equal (cap_tput_, 0.0) &&
          !ACE::is_equal (current_tput_, 0.0) &&
          current_tput_ > cap_tput_)
      {
        double dev = (current_tput_ - cap_tput_) / current_tput_;

        // cerr << "deviation: " << dev << endl;

        // Cap decay algorithm.
        //
        {
          ACE_Time_Value delta (now_time - nak_time_);

          unsigned long msec = delta.msec ();

          double x = msec / -16000.0;
          double y = 1.0 * exp (x);
          cap_tput_ = cap_tput_ / y;

          // cerr << "cap decay: " << cap_tput_ << " bytes/usec" << endl;
        }

        l.release ();


        timespec time;
        time.tv_sec = 0;
        time.tv_nsec = static_cast<unsigned long> (dev * 500000.0);

        // Don't bother to sleep if the time is less than 10 usec.
        //
        if (time.tv_nsec > 10000)
          ACE_OS::sleep (ACE_Time_Value (time));
      }
    }

    out_->send (m);
  }

  void Flow::recv (Message_ptr m)
  {
    if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
    {
      Address to (static_cast<To const*> (m->find (To::id))->address ());

      if (nak->address () == to)
      {
        // This one is for us.
        //

        //cerr << "NAK from "
        //     << static_cast<From const*> (m->find (From::id))->address ()
        //     << " for " << nak->count () << " sns." << endl;


        ACE_Time_Value nak_time (ACE_OS::gettimeofday ());

        Lock l (mutex_);

        nak_time_ = nak_time;

        if (ACE::is_equal (cap_tput_, 0.0))
          cap_tput_ = current_tput_;

        if (!ACE::is_equal (cap_tput_, 0.0))
        {
          cap_tput_ = cap_tput_ - cap_tput_ / 6.0;

          // cerr << "cap: " << cap_tput_ << " bytes/usec" << endl;
        }
      }
    }

    in_->recv (m);
  }
}