summaryrefslogtreecommitdiff
path: root/ace/RMCast/RMCast_Partial_Message.cpp
blob: 5de2e660a11dfb535921efcaa67e34c056683d7b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
// $Id$

#include "RMCast_Partial_Message.h"

#if !defined (__ACE_INLINE__)
#include "RMCast_Partial_Message.i"
#endif /* __ACE_INLINE__ */

ACE_RCSID(ace, RMCast_Partial_Message, "$Id$")

ACE_RMCast_Partial_Message::
ACE_RMCast_Partial_Message (ACE_UINT32 message_size)
  :  max_hole_count_ (ACE_RMCAST_DEFAULT_HOLE_COUNT),
     hole_count_ (1)
{
  ACE_NEW (this->hole_list_,
           ACE_RMCast_Partial_Message::Hole[this->max_hole_count_]);
  this->hole_list_[0].start = 0;
  this->hole_list_[0].end = message_size;

  this->message_body_.size (message_size);
  this->message_body_.wr_ptr (message_size);
}

ACE_RMCast_Partial_Message::
~ACE_RMCast_Partial_Message (void)
{
  delete[] this->hole_list_;
}

int
ACE_RMCast_Partial_Message::fragment_received (ACE_UINT32 message_size,
                                               ACE_UINT32 offset,
                                               ACE_Message_Block *mb)
{
  if (this->message_body_.length () != message_size)
    {
      //      ACE_DEBUG ((LM_DEBUG,
      //                  "Partial_Message::fragment_received - "
      //                  "invalid message length\n"));
      return -1;
    }

  // Just copy the data...
  char *rd_ptr = this->message_body_.rd_ptr () + offset;
  size_t total_length = 0;
  {
    for (const ACE_Message_Block *i = mb; i != 0; i = i->cont ())
      {
        if (rd_ptr + i->length () > this->message_body_.wr_ptr ())
          {
            //            ACE_DEBUG ((LM_DEBUG,
            //                        "Partial_Message::fragment_received - "
            //                        "invalid payload length\n"));
            return -1;
          }
        ACE_OS::memcpy (rd_ptr, i->rd_ptr (), i->length ());
        rd_ptr       += i->length ();
        total_length += i->length ();
      }
  }

  // The algorithm works like this:
  //
  //   For each hole we determine if there is an intersection between
  //   the hole and the incoming fragment.  If there is none we do
  //   nothing (actually since the holes are ordered we can stop the
  //   iteration if the

  ACE_UINT32 start = offset;
  ACE_UINT32 end   = offset + total_length;

  while (start != end && this->hole_count_ != 0)
    {
      for (size_t i = 0; i < this->hole_count_; ++i)
        {
          Hole& hole = this->hole_list_[i];

          // First check if the new data insersects the hole...
          if (end <= hole.start)
            return 0;
          if (start >= hole.end)
            {
              if (i == this->hole_count_ - 1)
                return 0;
              else
                continue;
            }

          // The hole and the new fragment intersect, we have to
          // update the hole list.
          //
          // There are only three cases for the <start> value:
          // start < hole.start
          // start == hole.start
          // hole.start < start < hole.end
          //
          // But the code for both start == hole.start and start <
          // hole.start is identical....

          if (start <= hole.start)
            {
              if (end < hole.end)
                {
                  // NOTE: hole.start < end, because of previous test

                  // In this case we shrink the hole, but it is not
                  // removed!
                  hole.start = end;
                  return 0;
                }
              else // end >= hole.end
                {
                  start = hole.end;
                  // We remove the hole, and continue the iteration...
                  if (this->remove_hole (i) == -1)
                    return -1;
                  break;
                }
            }
          else // hole.start < start < hole.end
            {
              if (end >= hole.end)
                {
                  // Just adjust the size of the hole...
                  ACE_UINT32 tmp = hole.end;
                  hole.end = start;
                  start = tmp;
                  break;
                }
              else // if (end < hole.end)
                {
                  // Nasty, we need to insert a new hole...
                  if (this->insert_hole (i, end, hole.end) == -1)
                    return -1;
                  // and change the old hole...
                  // NOTE: we have to refetch it because the array may
                  // have been reallocated!
                  this->hole_list_[i].end = start;
                  return 0;
                }
            }
        }
    }
  return 0;
  // @@ OLD COMMENTS, the algorithm has changed since!
  // There are several cases:
  //
  // 1) The fragment is completely contained in data already received,
  //    nothing changes in this case.
  //
  // 2) Part of the fragment is contained in data already received and
  //    part is new data:
  //    2.1) The new data closes a hole, remove it from the list
  //    2.2) The beginning of the new fragment is the new data, reduce
  //    the size of the hole
  //    2.3) The end of the new fragment is the new data, increment
  //    the size of the received block
  //
  // 3) The fragment is completely contained in a hole
  //    3.1) It closes the hole, remove it from the list
  //    3.2) It starts at the beginning of a hole, grow the received
  //    block
  //    3.3) It ends at the end of a hole, reduce the hole size
  //    3.4) It is in the middle of a hole, insert a new hole
  //
}

int
ACE_RMCast_Partial_Message::insert_hole (size_t i,
                                         ACE_UINT32 start,
                                         ACE_UINT32 end)
{
  //  ACE_DEBUG ((LM_DEBUG,
  //              "Partial_Message::insert_hole %d = [%d,%d]\n",
  //              i, start, end));
  if (this->hole_count_ + 1 > this->max_hole_count_)
    {
      this->max_hole_count_ *= 2;
      Hole *tmp;
      ACE_NEW_RETURN (tmp, Hole[this->max_hole_count_], -1);
      for (size_t j = 0; j != this->hole_count_; ++j)
        {
          tmp[j] = this->hole_list_[j];
        }
      delete[] this->hole_list_;
      this->hole_list_ = tmp;
    }
  if (this->hole_count_ != 0)
    {
      for (size_t j = this->hole_count_ - 1; j >= i + 1; --j)
        {
          this->hole_list_[j+1] = this->hole_list_[j];
        }
    }

  this->hole_list_[i + 1].start = start;
  this->hole_list_[i + 1].end   = end;
  this->hole_count_++;

  return 0;
}

int
ACE_RMCast_Partial_Message::remove_hole (size_t i)
{
  //  ACE_DEBUG ((LM_DEBUG,
  //              "Partial_Message::remove_hole %d\n",
  //              i));
  for (size_t j = i; j != this->hole_count_ - 1; ++j)
    this->hole_list_[j] = this->hole_list_[j + 1];

  this->hole_count_--;
  return 0;
}