summaryrefslogtreecommitdiff
path: root/ACE/protocols/ace/RMCast/Reassemble.cpp
blob: e5aeab9251927af2e224cab5e1be2d6ecc2443e7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// file      : ace/RMCast/Reassemble.cpp
// author    : Boris Kolpackov <boris@kolpackov.net>
// cvs-id    : $Id$

#include "Reassemble.h"
#include "ace/OS_NS_stdlib.h"

/*
#include <iostream>
using std::cerr;
using std::endl;
*/

namespace ACE_RMCast
{
  // Construct the reassembly element, initializing params_ from the
  // supplied protocol parameters.
  //
  Reassemble::Reassemble (Parameters const& params)
    : params_ (params)
  {
  }

  // Reassemble fragmented Data messages on a per-sender basis and pass
  // the result up to in_.
  //
  // The message m is inspected for the From, Data, Part, To and NoData
  // profiles:
  //
  //   - Data with Part: the payload is accumulated in map_ under the
  //     sender's address. Once the last part (num == of) has been
  //     appended, a new message carrying To, From and the reassembled
  //     Data is passed to in_->recv ().
  //   - Data without Part: forwarded to in_->recv () unchanged.
  //   - NoData: any partial reassembly for the sender is discarded and
  //     the message is forwarded to in_->recv ().
  //   - A message with neither Data nor NoData is not forwarded.
  //
  // Inconsistent fragment sequences terminate the process via
  // ACE_OS::abort ().
  //
  // NOTE(review): the From profile (and, on completion, the To profile)
  // is dereferenced without a null check; presumably the layers below
  // guarantee they are present -- confirm.
  //
  void Reassemble::recv (Message_ptr m)
  {
    Map::ENTRY* e = 0;
    Address from (
      static_cast<From const*> (m->find (From::id))->address ());

    if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
    {
      if (Part const* part = static_cast<Part const*> (m->find (Part::id)))
      {
        if (map_.find (from, e) == -1)
        {
          // First part of the message.
          //
          if (part->num () != 1)
          {
            // We assume that we received NoData for one of the preceding
            // fragments. Ignore this one.
            //
            return;
          }

          // Copy the first fragment into a buffer sized for the whole
          // message (part->total_size ()).
          //
          Data_ptr new_data (new Data (data->buf (),
                                       static_cast<size_t> (data->size ()),
                                       static_cast<size_t> (part->total_size ())));

          map_.bind (from, new_data);
        }
        else
        {
          // Next part of the message.
          //
          if (part->num () == 1)
            ACE_OS::abort ();

          Data_ptr& new_data = e->int_id_;

          size_t const used = static_cast<size_t> (new_data->size ());
          size_t const chunk = static_cast<size_t> (data->size ());
          size_t const room = static_cast<size_t> (new_data->capacity ());

          // Validate before copying: a fragment that does not fit into
          // the remaining space would otherwise write past the end of
          // the reassembly buffer. Written as a subtraction so that the
          // check itself cannot overflow.
          //
          if (used > room || chunk > room - used)
            ACE_OS::abort ();

          ACE_OS::memcpy (new_data->buf () + used, data->buf (), chunk);

          new_data->size (used + chunk);

          if (part->num () == part->of ())
          {
            // Reassembly is complete.
            //
            if (part->total_size () != new_data->size ())
              ACE_OS::abort ();

            Message_ptr new_msg (new Message ());

            Address to (
              static_cast<To const*> (m->find (To::id))->address ());

            new_msg->add (Profile_ptr (new To (to)));
            new_msg->add (Profile_ptr (new From (from)));

            // Hand the reassembled data to new_msg before unbinding the
            // map entry: unbinding releases the map's reference to it,
            // and new_data (a reference into that entry) must not be
            // used afterwards.
            //
            new_msg->add (Profile_ptr (new_data));

            map_.unbind (from);

            in_->recv (new_msg);
          }
        }
      }
      else
      {
        // Non-fragmented message. Make sure we are in the consistent state
        // and forward it up.
        //
        if (map_.find (from, e) != -1)
          ACE_OS::abort ();

        in_->recv (m);
      }
    }
    else if (m->find (NoData::id) != 0)
    {
      if (map_.find (from, e) != -1)
      {
        // We already received some fragments. Clean everything up.
        //
        map_.unbind (from);
      }

      in_->recv (m);
    }
  }
}