diff options
author | Ossama Othman <ossama-othman@users.noreply.github.com> | 2005-02-11 16:44:28 +0000 |
---|---|---|
committer | Ossama Othman <ossama-othman@users.noreply.github.com> | 2005-02-11 16:44:28 +0000 |
commit | b7574d650a11bcac003e4bd7c09b90c97a0f4cf7 (patch) | |
tree | 06681716b6e03e1ec2fd991f63f9c8228f7fdef1 /examples | |
parent | c57ab4efd3a27b32548da59b13bae3af0ebbe05f (diff) | |
download | ATCD-b7574d650a11bcac003e4bd7c09b90c97a0f4cf7.tar.gz |
ChangeLogTag:Fri Feb 11 08:43:04 2005 Ossama Othman <ossama@dre.vanderbilt.edu>
Diffstat (limited to 'examples')
-rw-r--r-- | examples/RMCast/Makefile.am | 13 | ||||
-rw-r--r-- | examples/RMCast/Send_File/Makefile.am | 55 | ||||
-rw-r--r-- | examples/RMCast/Send_File/Receiver.cpp | 167 | ||||
-rw-r--r-- | examples/RMCast/Send_File/Sender.cpp | 120 | ||||
-rw-r--r-- | examples/RMCast/Send_Msg/Protocol.h | 18 | ||||
-rw-r--r-- | examples/RMCast/Send_Msg/README | 18 | ||||
-rw-r--r-- | examples/RMCast/Send_Msg/Receiver.cpp | 119 | ||||
-rw-r--r-- | examples/RMCast/Send_Msg/Send_Msg.mpc (renamed from examples/RMCast/Send_File/RMCast_Send_File.mpc) | 0 | ||||
-rw-r--r-- | examples/RMCast/Send_Msg/Sender.cpp | 54 |
9 files changed, 209 insertions, 355 deletions
diff --git a/examples/RMCast/Makefile.am b/examples/RMCast/Makefile.am deleted file mode 100644 index 514702c29ec..00000000000 --- a/examples/RMCast/Makefile.am +++ /dev/null @@ -1,13 +0,0 @@ -## Process this file with automake to create Makefile.in -## -## $Id$ -## -## This file was generated by MPC. Any changes made directly to -## this file will be lost the next time it is generated. -## -## MPC Command: -## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu - -SUBDIRS = \ - Send_File - diff --git a/examples/RMCast/Send_File/Makefile.am b/examples/RMCast/Send_File/Makefile.am deleted file mode 100644 index 8eb5d564aa9..00000000000 --- a/examples/RMCast/Send_File/Makefile.am +++ /dev/null @@ -1,55 +0,0 @@ -## Process this file with automake to create Makefile.in -## -## $Id$ -## -## This file was generated by MPC. Any changes made directly to -## this file will be lost the next time it is generated. -## -## MPC Command: -## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu - -ACE_BUILDDIR = $(top_builddir) -ACE_ROOT = $(top_srcdir) - -noinst_PROGRAMS = -## Makefile.RMCast_Send_File_Receiver.am - -if BUILD_RMCAST -noinst_PROGRAMS += receiver - -receiver_CPPFLAGS = \ - -I$(ACE_ROOT) \ - -I$(ACE_BUILDDIR) - -receiver_SOURCES = \ - Receiver.cpp - -receiver_LDADD = \ - $(top_builddir)/ace/RMCast/libACE_RMCast.la $(top_builddir)/ace/libACE.la - -endif BUILD_RMCAST - -## Makefile.RMCast_Send_File_Sender.am - -if BUILD_RMCAST -noinst_PROGRAMS += sender - -sender_CPPFLAGS = \ - -I$(ACE_ROOT) \ - -I$(ACE_BUILDDIR) - -sender_SOURCES = \ - Sender.cpp - -sender_LDADD = \ - $(top_builddir)/ace/RMCast/libACE_RMCast.la $(top_builddir)/ace/libACE.la - -endif BUILD_RMCAST - -## Clean up template repositories, etc. -clean-local: - -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* - -rm -f gcctemp.c gcctemp so_locations *.ics - -rm -rf cxx_repository ptrepository ti_files - -rm -rf templateregistry ir.out - -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/examples/RMCast/Send_File/Receiver.cpp b/examples/RMCast/Send_File/Receiver.cpp deleted file mode 100644 index 80ac7be26bd..00000000000 --- a/examples/RMCast/Send_File/Receiver.cpp +++ /dev/null @@ -1,167 +0,0 @@ -// $Id$ - -#include "ace/OS_main.h" -#include "ace/OS_NS_fcntl.h" -#include "ace/RMCast/RMCast_UDP_Reliable_Receiver.h" -#include "ace/INET_Addr.h" -#include "ace/FILE_IO.h" -#include "ace/Message_Block.h" -#include "ace/Reactor.h" - -ACE_RCSID(tests, RMCast_Examples_Receiver, "$Id$") - -class File_Module : public ACE_RMCast_Module -{ -public: - File_Module (void); - - /// Return 1 if all the data has been received - int status (void) const; - - /// Initialize the module - int init (const ACE_TCHAR *filename); - - int close (void); - int data (ACE_RMCast::Data &data); - int ack_join (ACE_RMCast::Ack_Join &ack_join); - int ack_leave (ACE_RMCast::Ack_Leave &ack_leave); - -private: - /// Set to 1 when the last block is received - int status_; - - /// Used to dump the received data into a file - ACE_FILE_IO file_io_; -}; - -int -ACE_TMAIN (int argc, ACE_TCHAR *argv[]) -{ - if (argc != 3) - { - ACE_ERROR_RETURN ((LM_ERROR, - "Usage: %s <filename> <mcastgroup:port>\n", - argv[0]), - 1); - } - - const ACE_TCHAR *filename = argv[1]; - - File_Module file_module; - if (file_module.init (filename) == -1) - { - ACE_ERROR_RETURN ((LM_ERROR, - "Cannot init file module\n"), - 1); - } - - ACE_RMCast_UDP_Reliable_Receiver receiver (&file_module); - - ACE_INET_Addr mcast_group; - if (mcast_group.set (argv[2]) != 0) - { - ACE_ERROR_RETURN ((LM_ERROR, - "Cannot setup multicast group <%s>\n", - argv[2]), - 1); - } - - if (receiver.init (mcast_group) == -1) - { - ACE_ERROR_RETURN ((LM_ERROR, - "Cannot init UDP I/O at <%s:%d> %p\n", - mcast_group.get_host_name (), - mcast_group.get_port_number (), - ""), - 1); - } - - // Use the Reactor to demultiplex all the messages - ACE_Reactor *reactor = ACE_Reactor::instance (); - receiver.reactive_incoming_messages (reactor); - - // Wait until all the messages are successfully delivered - do - { - // Try for 50 milliseconds... - ACE_Time_Value tv (5, 0); // 0, 50000); - int r = reactor->handle_events (&tv); - if (r == -1) - break; - } - while (file_module.status () != 2); - - ACE_DEBUG ((LM_DEBUG, "event loop completed\n")); - - return 0; -} - -// **************************************************************** - -File_Module::File_Module (void) - : status_ (0) -{ -} - -int -File_Module::status (void) const -{ - return this->status_; -} - -int -File_Module::init (const ACE_TCHAR * filename) -{ - ACE_HANDLE handle = ACE_OS::open (filename, - O_WRONLY|O_BINARY|O_CREAT, - ACE_DEFAULT_FILE_PERMS); - if (handle == ACE_INVALID_HANDLE) - ACE_ERROR_RETURN ((LM_ERROR, - "Cannot open file <%s> %p\n", filename, ""), - -1); - this->file_io_.set_handle (handle); - return 0; -} - -int -File_Module::close (void) -{ - ACE_DEBUG ((LM_DEBUG, "File_Module closed\n")); - (void) this->file_io_.close (); - return 0; -} - -int -File_Module::data (ACE_RMCast::Data &data) -{ - if (this->status_ == 1) - return -1; - - size_t length = data.payload->length () - 1; - (void) this->file_io_.send (data.payload->rd_ptr () + 1, length); - - if (*(data.payload->rd_ptr ()) == 'E') - { - this->status_ = 1; - return -1; - } - - return 0; -} - -int -File_Module::ack_join (ACE_RMCast::Ack_Join &) -{ - ACE_DEBUG ((LM_DEBUG, - "File_Module::ack_join\n")); - return 0; -} - -int -File_Module::ack_leave (ACE_RMCast::Ack_Leave &) -{ - ACE_DEBUG ((LM_DEBUG, - "File_Module::ack_leave\n")); - this->status_ = 2; - return 0; -} diff --git a/examples/RMCast/Send_File/Sender.cpp b/examples/RMCast/Send_File/Sender.cpp deleted file mode 100644 index ec6e3e43642..00000000000 --- a/examples/RMCast/Send_File/Sender.cpp +++ /dev/null @@ -1,120 +0,0 @@ -// $Id$ - -#include "ace/OS_main.h" -#include "ace/OS_NS_unistd.h" -#include "ace/OS_NS_fcntl.h" -#include "ace/RMCast/RMCast_UDP_Reliable_Sender.h" -#include "ace/INET_Addr.h" -#include "ace/FILE_IO.h" -#include "ace/Message_Block.h" -#include "ace/Reactor.h" - -ACE_RCSID(tests, RMCast_Examples_Sender, "$Id$") - -int -ACE_TMAIN (int argc, ACE_TCHAR *argv[]) -{ - if (argc != 3) - { - ACE_ERROR_RETURN ((LM_ERROR, - "Usage: %s <filename> <mcastgroup:port>\n", - argv[0]), - 1); - } - - const ACE_TCHAR *filename = argv[1]; - if (ACE_OS::access (filename, R_OK) != 0) - { - ACE_ERROR_RETURN ((LM_ERROR, - "Cannot read file <%s>\n", filename), - 1); - } - - ACE_INET_Addr mcast_group; - if (mcast_group.set (argv[2]) != 0) - { - ACE_ERROR_RETURN ((LM_ERROR, - "Cannot setup multicast group <%s>\n", - argv[2]), - 1); - } - - - ACE_HANDLE handle = ACE_OS::open (filename, O_RDONLY|O_BINARY); - if (handle == ACE_INVALID_HANDLE) - { - ACE_ERROR_RETURN ((LM_ERROR, - "Cannot open file <%s> %p\n", filename, ""), - 1); - } - ACE_FILE_IO file_io; - file_io.set_handle (handle); - - // We don't provide a module to receive the control messages, in - // this example we simply ignore them. - ACE_RMCast_UDP_Reliable_Sender sender (0); - - if (sender.init (mcast_group) == -1) - { - ACE_ERROR_RETURN ((LM_ERROR, - "Cannot init UDP I/O at <%s:%d> %p\n", - mcast_group.get_host_name (), - mcast_group.get_port_number (), - ""), - 1); - } - - // Use the Reactor to demultiplex all the messages - ACE_Reactor *reactor = ACE_Reactor::instance (); - - sender.reactive_incoming_messages (reactor); - { - // Resend the messages every 20 milliseconds.. - ACE_Time_Value tv (2, 0); - sender.reactive_resends (reactor, tv); - } - - for (;;) - { - ACE_Message_Block payload (BUFSIZ + 1); - - ssize_t r = file_io.recv (payload.rd_ptr () + 1, BUFSIZ); - if (r <= 0) - break; - - payload.wr_ptr (r + 1); - *(payload.rd_ptr ()) = 'N'; // Normal - if (r < BUFSIZ) - { - *(payload.rd_ptr ()) = 'E'; // EOF - } - - ACE_RMCast::Data data; - data.payload = &payload; - if (sender.data (data) != 0) - break; - - if (r < BUFSIZ) - { - // Last buffer, terminate loop - break; - } - - // Handle incoming events, without blocking... - ACE_Time_Value tv (4, 0); - reactor->handle_events (&tv); - } - - // Wait until all the messages are successfully delivered - do - { - // Try for 50 milliseconds... - ACE_Time_Value tv (5, 0); - int r = reactor->handle_events (&tv); - if (r == -1) - break; - } - while (sender.has_data () || sender.has_members ()); - - return 0; -} diff --git a/examples/RMCast/Send_Msg/Protocol.h b/examples/RMCast/Send_Msg/Protocol.h new file mode 100644 index 00000000000..c3edf43b1fb --- /dev/null +++ b/examples/RMCast/Send_Msg/Protocol.h @@ -0,0 +1,18 @@ +// file : Protocol.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef PROTOCOL_H +#define PROTOCOL_H + +unsigned short const payload_size = 256; +unsigned long const message_count = 10000; + +struct Message +{ + unsigned long sn; + + unsigned short payload[payload_size]; +}; + +#endif // PROTOCOL_H diff --git a/examples/RMCast/Send_Msg/README b/examples/RMCast/Send_Msg/README new file mode 100644 index 00000000000..b02056bf0cf --- /dev/null +++ b/examples/RMCast/Send_Msg/README @@ -0,0 +1,18 @@ +In this example SENDER sends a number (defined in Protocol.h, 10000 +by default) of messages to the multicast group. Each message has +an application-level sequence number. RECEIVER tries to receive them +and checks for damaged, lost, and reordered messages. Since reliable +multicast is used there should be no damaged or reordered messages. +There could be some number of lost messages at the beginning, +howevere (standard race condition). + +To run the example start a one or more RECEIVERS, e.g., + +$ ./receiver 224.1.0.1:10000 + +Then start one SENDER: + +$ ./sender 224.1.0.1:10000 + +-- +Boris Kolpackov <boris@kolpackov.net> diff --git a/examples/RMCast/Send_Msg/Receiver.cpp b/examples/RMCast/Send_Msg/Receiver.cpp new file mode 100644 index 00000000000..72d202716cd --- /dev/null +++ b/examples/RMCast/Send_Msg/Receiver.cpp @@ -0,0 +1,119 @@ +// file : Receiver.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include <vector> +#include <iostream> +#include <cstring> // memcmp + +#include <ace/RMCast/Socket.h> + +#include "Protocol.h" + +using std::cout; +using std::cerr; +using std::endl; + +typedef +std::vector<unsigned char> +Status_List; + +class args {}; + +int +main (int argc, char* argv[]) +{ + try + { + if (argc < 2) throw args (); + + ACE_RMCast::Address addr (argv[1]); + + ACE_RMCast::Socket socket (addr); + + + Message expected_msg; + expected_msg.sn = 0; + + for (unsigned short i = 0; i < payload_size; i++) + { + expected_msg.payload[i] = i; + } + + Status_List received (message_count, 0); + Status_List damaged (message_count, 0); + Status_List duplicate (message_count, 0); + + Message msg; + + while (true) + { + socket.recv (&msg, sizeof (msg)); + + if (received[msg.sn] == 1) + { + duplicate[msg.sn] = 1; + } + else + { + received[msg.sn] = 1; + + if (std::memcmp (expected_msg.payload, msg.payload, payload_size) != 0) + { + damaged[msg.sn] = 1; + } + } + + if (msg.sn + 1 == message_count) break; + } + + unsigned long lost_count (0), damaged_count (0), duplicate_count (0); + + for (Status_List::iterator i (received.begin ()), end (received.end ()); + i != end; + ++i) if (*i == 0) ++lost_count; + + for (Status_List::iterator i (damaged.begin ()), end (damaged.end ()); + i != end; + ++i) if (*i == 1) ++damaged_count; + + for (Status_List::iterator i (duplicate.begin ()), end (duplicate.end ()); + i != end; + ++i) if (*i == 1) ++duplicate_count; + + cout << "lost : " << lost_count << endl + << "damaged : " << damaged_count << endl + << "duplicate : " << duplicate_count << endl << endl; + + cout << "lost message dump:" << endl; + + unsigned long total = 0; + + for (Status_List::iterator + begin (received.begin ()), i (begin), end (received.end ()); + i != end;) + { + if (*i == 0) + { + unsigned long count = 1; + + for (Status_List::iterator j = i + 1; + j < end && *j == 0; + j++, count++); + + cout << '\t' << i - begin << " : " << count << endl; + + i += count; + total += count; + } + else ++i; + } + + if (total != lost_count) cerr << "trouble" << endl; + + } + catch (args const&) + { + cerr << "usage: " << argv[0] << " <IPv4 Multicast Address>" << endl; + } +} diff --git a/examples/RMCast/Send_File/RMCast_Send_File.mpc b/examples/RMCast/Send_Msg/Send_Msg.mpc index 897e1611b71..897e1611b71 100644 --- a/examples/RMCast/Send_File/RMCast_Send_File.mpc +++ b/examples/RMCast/Send_Msg/Send_Msg.mpc diff --git a/examples/RMCast/Send_Msg/Sender.cpp b/examples/RMCast/Send_Msg/Sender.cpp new file mode 100644 index 00000000000..e784b6362c7 --- /dev/null +++ b/examples/RMCast/Send_Msg/Sender.cpp @@ -0,0 +1,54 @@ +// file : Sender.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + + +#include <ace/OS.h> +#include <ace/RMCast/Socket.h> + +#include <iostream> + +#include "Protocol.h" + +using std::cerr; +using std::endl; + +class args {}; + +int +main (int argc, char* argv[]) +{ + try + { + if (argc < 2) throw args (); + + ACE_RMCast::Address addr (argv[1]); + + ACE_RMCast::Socket socket (addr); + + Message msg; + msg.sn = 0; + + for (unsigned short i = 0; i < payload_size; i++) + { + msg.payload[i] = i; + } + + for (; msg.sn < message_count; msg.sn++) + { + socket.send (&msg, sizeof (msg)); + } + + // Keep running in case retransmissions are needed. + // + ACE_OS::sleep (ACE_Time_Value (50, 0)); + + return 0; + } + catch (args const&) + { + cerr << "usage: " << argv[0] << " <IPv4 Multicast Address>" << endl; + } + + return 1; +} |