summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/PortableGroup/UIPMC_Transport_Recv_Packet.cpp
blob: c0d9b19aa45bb2749a4636baedf81b8099b13322 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#include "orbsvcs/Log_Macros.h"
#include "orbsvcs/PortableGroup/UIPMC_Transport_Recv_Packet.h"
#include "orbsvcs/PortableGroup/UIPMC_Mcast_Transport.h"

#include "ace/OS_NS_sys_time.h"

TAO_BEGIN_VERSIONED_NAMESPACE_DECL

namespace TAO_PG
{

  UIPMC_Recv_Packet_Cleanup_Guard::UIPMC_Recv_Packet_Cleanup_Guard (
    TAO_UIPMC_Mcast_Transport *transport
  )
    : transport_ (transport)
  {
  }

  UIPMC_Recv_Packet_Cleanup_Guard::~UIPMC_Recv_Packet_Cleanup_Guard ()
  {
    // Cleanup only expired packets.
    this->transport_->cleanup_packets (true);
  }

  UIPMC_Recv_Packet::UIPMC_Recv_Packet ()
    : last_fragment_id_ (0)
    , data_length_ (0)
    , started_ (ACE_OS::gettimeofday ())
  {
  }

  UIPMC_Recv_Packet::~UIPMC_Recv_Packet ()
  {
    for (Fragments_Map::iterator iter = this->fragments_.begin ();
         iter != this->fragments_.end ();
         ++iter)
      {
        delete [] (*iter).item ().buf;
      }
  }

  int
  UIPMC_Recv_Packet::add_fragment (char *data,
                                   CORBA::UShort len,
                                   CORBA::ULong id,
                                   bool is_last)
  {
    Fragment new_data;
    ACE_NEW_RETURN (new_data.buf,
                    char[len],
                    -1);
    ACE_OS::memcpy (new_data.buf, data, len);
    new_data.len = len;

    if (is_last)
      this->last_fragment_id_ = id;

    this->data_length_ += len;

    if (TAO_debug_level >= 10)
      ORBSVCS_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("TAO (%P|%t) - TAO_PG::UIPMC_Recv_Packet::")
                  ACE_TEXT ("add_fragment, adding fragment %d with %d out ")
                  ACE_TEXT ("of %d bytes\n"),
                  id,
                  len,
                  this->data_length_));

    if (this->fragments_.bind (id, new_data) != 0)
      {
        // We've failed to add a new fragment. It's an error no matter
        // what was the reason. Mark the packet as expired.
        this->started_ = ACE_Time_Value::zero;
        delete [] new_data.buf;
        return -1;
      }

    // We haven't encountered yet the last fragment.
    if (!is_last && this->last_fragment_id_ == 0)
      return 0;

    // We haven't encountered yet all the fragments but the last one is
    // already in.
    if (this->last_fragment_id_ + 1 != this->fragments_.current_size ())
      return 0;

    // Since fragments are enumerated from 0 to last_fragment_id_ this
    // is the heaviest but the most reliable check for packet completeness.
    for (CORBA::ULong id = 0; id <= this->last_fragment_id_; ++id)
      {
        if (this->fragments_.find (id) == -1)
          {
            // Mark the packet as if it timedout.
            this->started_ = ACE_Time_Value::zero;
            return 0;
          }
      }

    return 1;
  }

  ACE_Time_Value const &
  UIPMC_Recv_Packet::started () const
  {
    return this->started_;
  }

  CORBA::ULong
  UIPMC_Recv_Packet::data_length () const
  {
    return this->data_length_;
  }

  void
  UIPMC_Recv_Packet::copy_data (char *buf) const
  {
    for (CORBA::ULong id = 0; id <= this->last_fragment_id_; ++id)
      {
        Fragment f = { 0, 0 };
        this->fragments_.find (id, f);

        ACE_OS::memcpy (buf, f.buf, f.len);
        buf += f.len;
      }
  }

} // namespace TAO_PG

TAO_END_VERSIONED_NAMESPACE_DECL