summaryrefslogtreecommitdiff
path: root/examples/RMCast/Send_File/Receiver.cpp
blob: e905373b4b3a6df9b2318fb4fe37303c258c57e1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// $Id$

#include "ace/RMCast/RMCast_UDP_Reliable_Receiver.h"
#include "ace/INET_Addr.h"
#include "ace/FILE_IO.h"
#include "ace/Message_Block.h"
#include "ace/Reactor.h"

ACE_RCSID(tests, RMCast_Examples_Receiver, "$Id$")

class File_Module : public ACE_RMCast_Module
{
public:
  File_Module (void);

  /// Return 1 if all the data has been received
  int status (void) const;

  /// Initialize the module
  int init (const ACE_TCHAR *filename);

  int close (void);
  int data (ACE_RMCast::Data &data);
  int ack_join (ACE_RMCast::Ack_Join &ack_join);
  int ack_leave (ACE_RMCast::Ack_Leave &ack_leave);

private:
  /// Set to 1 when the last block is received
  int status_;

  /// Used to dump the received data into a file
  ACE_FILE_IO file_io_;
};

int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  if (argc != 3)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Usage: %s <filename> <mcastgroup:port>\n",
                         argv[0]),
                        1);
    }

  const ACE_TCHAR *filename = argv[1];

  File_Module file_module;
  if (file_module.init (filename) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Cannot init file module\n"),
                        1);
    }

  ACE_RMCast_UDP_Reliable_Receiver receiver (&file_module);

  ACE_INET_Addr mcast_group;
  if (mcast_group.set (argv[2]) != 0)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Cannot setup multicast group <%s>\n",
                         argv[2]),
                        1);
    }

  if (receiver.init (mcast_group) == -1)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         "Cannot init UDP I/O at <%s:%d> %p\n",
                         mcast_group.get_host_name (),
                         mcast_group.get_port_number (),
                         ""),
                        1);
    }

  // Use the Reactor to demultiplex all the messages
  ACE_Reactor *reactor = ACE_Reactor::instance ();
  receiver.reactive_incoming_messages (reactor);

  // Wait until all the messages are successfully delivered
  do
    {
      // Try for 50 milliseconds...
      ACE_Time_Value tv (5, 0); // 0, 50000);
      int r = reactor->handle_events (&tv);
      if (r == -1)
        break;
    }
  while (file_module.status () != 2);

  ACE_DEBUG ((LM_DEBUG, "event loop completed\n"));

  return 0;
}

// ****************************************************************

File_Module::File_Module (void)
  :  status_ (0)
{
}

int
File_Module::status (void) const
{
  return this->status_;
}

int
File_Module::init (const ACE_TCHAR * filename)
{
  ACE_HANDLE handle = ACE_OS::open (filename,
                                    O_WRONLY|O_BINARY|O_CREAT,
                                    ACE_DEFAULT_FILE_PERMS);
  if (handle == ACE_INVALID_HANDLE)
    ACE_ERROR_RETURN ((LM_ERROR,
                       "Cannot open file <%s> %p\n", filename, ""),
                      -1);
  this->file_io_.set_handle (handle);
  return 0;
}

int
File_Module::close (void)
{
  ACE_DEBUG ((LM_DEBUG, "File_Module closed\n"));
  (void) this->file_io_.close ();
  return 0;
}

int
File_Module::data (ACE_RMCast::Data &data)
{
  if (this->status_ == 1)
    return -1;

  size_t length = data.payload->length () - 1;
  (void) this->file_io_.send (data.payload->rd_ptr () + 1, length);

  if (*(data.payload->rd_ptr ()) == 'E')
    {
      this->status_ = 1;
      return -1;
    }

  return 0;
}

int
File_Module::ack_join (ACE_RMCast::Ack_Join &)
{
  ACE_DEBUG ((LM_DEBUG,
              "File_Module::ack_join\n"));
  return 0;
}

int
File_Module::ack_leave (ACE_RMCast::Ack_Leave &)
{
  ACE_DEBUG ((LM_DEBUG,
              "File_Module::ack_leave\n"));
  this->status_ = 2;
  return 0;
}