summaryrefslogtreecommitdiff
path: root/ace/RMCast/Socket.cpp
blob: 640a1a7d6949cf2abe1751d61f495625171a13fd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// file      : ace/RMCast/Socket.cpp
// author    : Boris Kolpackov <boris@kolpackov.net>
// cvs-id    : $Id$

#include <ace/OS.h>

#include <ace/RMCast/Socket.h>

namespace ACE_RMCast
{
  // Build the protocol stack for this socket and start it.
  //
  // a    - address passed to the Link element's constructor.
  // loop - stored in loop_; when false, recv (Message_ptr) drops data
  //        messages whose To and From addresses compare equal.
  //
  // The sequence counter sn_ starts at 1 and cond_ is constructed
  // from mutex_.
  //
  Socket::
  Socket (Address const& a, bool loop)
      : loop_ (loop), sn_ (1), cond_ (mutex_)
  {
    // Allocate the four stack elements and store them in the
    // corresponding members. Only Link takes the address.
    //
    acknowledge_ = auto_ptr<Acknowledge> (new Acknowledge ());
    retransmit_ = auto_ptr<Retransmit> (new Retransmit ());
    simulator_ = auto_ptr<Simulator> (new Simulator ());
    link_ = auto_ptr<Link> (new Link (a));

    // Start IN stack from top to bottom. Each element is handed the
    // element directly above it; the socket is the top and gets 0.
    //
    in_start (0);
    acknowledge_->in_start (this);
    retransmit_->in_start (acknowledge_.get ());
    simulator_->in_start (retransmit_.get ());
    link_->in_start (simulator_.get ());

    // Start OUT stack from bottom up. Each element is handed the
    // element directly below it; the link is the bottom and gets 0.
    //
    link_->out_start (0);
    simulator_->out_start (link_.get ());
    retransmit_->out_start (simulator_.get ());
    acknowledge_->out_start (retransmit_.get ());
    out_start (acknowledge_.get ());
  }

  // Shut the protocol stack down.
  //
  // The OUT side is stopped first, walking from the socket down to the
  // link; the IN side is stopped afterwards, walking from the link back
  // up to the socket. The elements are not deleted explicitly here;
  // presumably the owning members release them - confirm in the header.
  //
  Socket::
  ~Socket ()
  {
    // Stop OUT stack from top to bottom.
    //
    out_stop ();
    acknowledge_->out_stop ();
    retransmit_->out_stop ();
    simulator_->out_stop ();
    link_->out_stop ();

    // Stop IN stack from bottom up.
    //
    link_->in_stop ();
    simulator_->in_stop ();
    retransmit_->in_stop ();
    acknowledge_->in_stop ();
    in_stop ();
  }


  void Socket::
  send (void const* buf, size_t s)
  {
    Message_ptr m (new Message);

    m->add (Profile_ptr (new SN (sn_++)));
    m->add (Profile_ptr (new Data (buf, s)));

    send (m);
  }

  size_t Socket::
  recv (void* buf, size_t s)
  {
    Lock l (mutex_);

    while (queue_.is_empty ()) cond_.wait ();

    Message_ptr m;
    if (queue_.dequeue_head (m) == -1) abort ();

    Data const* d (static_cast<Data const*>(m->find (Data::id)));

    size_t r (d->size () < s ? d->size () : s);

    ACE_OS::memcpy (buf, d->buf (), r);

    return r;
  }

  // IN-side entry point: accept a message coming up the stack.
  //
  // Messages without a Data profile are ignored. When loop_ is false,
  // a data message whose To and From addresses compare equal is
  // dropped as well (presumably this socket's own traffic looped
  // back). Every other message is appended to queue_ under mutex_ and
  // a reader blocked in recv (void*, size_t) is woken.
  //
  void Socket::
  recv (Message_ptr m)
  {
    if (m->find (Data::id) == 0) return;

    if (!loop_)
    {
      // NOTE(review): the results of find (To::id) / find (From::id)
      // are dereferenced unchecked - this assumes every data message
      // carries both profiles; confirm against the element that
      // attaches them.
      //
      Address to (static_cast<To const*> (m->find (To::id))->address ());

      Address from (
        static_cast<From const*> (m->find (From::id))->address ());

      if (to == from) return;
    }

    Lock l (mutex_);

    queue_.enqueue_tail (m);

    // Signal on every enqueue rather than only on the empty ->
    // non-empty transition. With several readers blocked, a second
    // message that arrives before the first woken reader has drained
    // the queue would otherwise produce no signal and leave a reader
    // asleep while data is queued. A signal with no waiter is harmless.
    //
    cond_.signal ();
  }
}