summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorboris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2005-06-17 15:36:33 +0000
committerboris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2005-06-17 15:36:33 +0000
commit1e4d29b15a06bb20c48bd59e924fed928819930d (patch)
tree19cdd4bc92934d16e4268e17a64488cbac2dc79c
parent9ce4ad6fe8253499ec009c9db983dc626fa4216d (diff)
downloadATCD-1e4d29b15a06bb20c48bd59e924fed928819930d.tar.gz
ChangeLogTag:Fri Jun 17 17:22:13 2005 Boris Kolpackov <boris@kolpackov.net>
-rw-r--r--ChangeLog25
-rw-r--r--protocols/ace/RMCast/Acknowledge.cpp110
-rw-r--r--protocols/ace/RMCast/Acknowledge.h9
-rw-r--r--protocols/ace/RMCast/Fragment.cpp59
-rw-r--r--protocols/ace/RMCast/Fragment.h25
-rw-r--r--protocols/ace/RMCast/Link.cpp33
-rw-r--r--protocols/ace/RMCast/Protocol.cpp1
-rw-r--r--protocols/ace/RMCast/Protocol.h435
-rw-r--r--protocols/ace/RMCast/Reassemble.cpp119
-rw-r--r--protocols/ace/RMCast/Reassemble.h39
-rw-r--r--protocols/ace/RMCast/Retransmit.cpp18
-rw-r--r--protocols/ace/RMCast/Retransmit.h23
-rw-r--r--protocols/ace/RMCast/Simulator.h4
-rw-r--r--protocols/ace/RMCast/Socket.cpp38
-rw-r--r--protocols/ace/RMCast/Socket.h2
-rw-r--r--protocols/examples/RMCast/Send_Msg/Protocol.h2
-rw-r--r--protocols/tests/RMCast/Protocol.h2
-rw-r--r--protocols/tests/RMCast/Sender.cpp2
18 files changed, 826 insertions, 120 deletions
diff --git a/ChangeLog b/ChangeLog
index c40bf1a2665..1629bc2e380 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,28 @@
+Fri Jun 17 17:22:13 2005 Boris Kolpackov <boris@kolpackov.net>
+
+ * protocols/ace/RMCast/Acknowledge.cpp:
+ * protocols/ace/RMCast/Acknowledge.h:
+ * protocols/ace/RMCast/Fragment.cpp:
+ * protocols/ace/RMCast/Fragment.h:
+ * protocols/ace/RMCast/Link.cpp:
+ * protocols/ace/RMCast/Protocol.cpp:
+ * protocols/ace/RMCast/Protocol.h:
+ * protocols/ace/RMCast/Reassemble.cpp:
+ * protocols/ace/RMCast/Reassemble.h:
+ * protocols/ace/RMCast/Retransmit.cpp:
+ * protocols/ace/RMCast/Retransmit.h:
+ * protocols/ace/RMCast/Simulator.h:
+ * protocols/ace/RMCast/Socket.cpp:
+ * protocols/ace/RMCast/Socket.h:
+
+ Implemented message fragmentation.
+
+ * protocols/examples/RMCast/Send_Msg/Protocol.h:
+ * protocols/tests/RMCast/Protocol.h:
+ * protocols/tests/RMCast/Sender.cpp:
+
+ Changed to send messages that would require fragmentation.
+
Fri Jun 17 09:14:12 UTC 2005 Johnny Willemsen <jwillemsen@remedy.nl>
* bin/MakeProjectCreator/config/pi_server.mpb:
diff --git a/protocols/ace/RMCast/Acknowledge.cpp b/protocols/ace/RMCast/Acknowledge.cpp
index 12a857f1c53..1056d6a3281 100644
--- a/protocols/ace/RMCast/Acknowledge.cpp
+++ b/protocols/ace/RMCast/Acknowledge.cpp
@@ -2,12 +2,18 @@
// author : Boris Kolpackov <boris@kolpackov.net>
// cvs-id : $Id$
-#include <ace/Time_Value.h> // ACE_Time_Value
-#include <ace/OS_NS_unistd.h>
-#include <ace/OS_NS_sys_time.h> // gettimeofday
+#include "ace/Time_Value.h" // ACE_Time_Value
+#include "ace/OS_NS_unistd.h"
+#include "ace/OS_NS_sys_time.h" // gettimeofday
#include "Acknowledge.h"
+/*
+#include <iostream>
+using std::cerr;
+using std::endl;
+*/
+
namespace ACE_RMCast
{
ACE_Time_Value const tick (0, 5000);
@@ -108,7 +114,9 @@ namespace ACE_RMCast
// Send NRTM.
//
- Profile_ptr nrtm (create_nrtm ());
+ u32 max_elem (NRTM::max_count (max_payload_size));
+
+ Profile_ptr nrtm (create_nrtm (max_elem));
if (nrtm.get ())
{
@@ -157,48 +165,66 @@ namespace ACE_RMCast
void Acknowledge::
track_queue (Address const& addr, Queue& q, Messages& msgs)
{
- NAK_ptr nak (new NAK (addr));
+ u32 max_elem (NAK::max_count (max_payload_size));
+ u32 count (0);
+
+ Queue::iterator i (q.begin ()), e (q.end ());
// Track existing losses.
//
- for (Queue::iterator i (q.begin ()), e (q.end ()); i != e; ++i)
+ while (i != e)
{
- u64 sn ((*i).ext_id_);
- Descr& d = (*i).int_id_;
+ NAK_ptr nak (new NAK (addr));
- if (d.lost ())
+ // Inner loop that fills NAK profile with up to max_elem elements.
+ //
+ for (; i != e && nak->count () < max_elem; ++i)
{
- d.timer (d.timer () - 1);
+ u64 sn ((*i).ext_id_);
+ Descr& d = (*i).int_id_;
- if (d.timer () == 0)
+ if (d.lost ())
{
- //@@ Need exp fallback.
- //
- d.nak_count (d.nak_count () + 1);
- d.timer ((d.nak_count () + 1) * nak_timeout);
+ d.timer (d.timer () - 1);
- nak->add (sn);
+ if (d.timer () == 0)
+ {
+ //@@ Need exp fallback.
+ //
+ d.nak_count (d.nak_count () + 1);
+ d.timer ((d.nak_count () + 1) * nak_timeout);
- //cerr << 6 << "NAK # " << d.nak_count () << ": "
- // << addr << " " << sn << endl;
+ nak->add (sn);
+
+ ++count;
+
+ //cerr << 6 << "NAK # " << d.nak_count () << ": "
+ // << addr << " " << sn << endl;
+ }
}
}
- }
- // Send NAK.
- //
- if (nak->count ())
- {
- // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns"
- // << endl;
+ // Send this NAK.
+ //
+ if (nak->count ())
+ {
+ // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns"
+ // << endl;
- Message_ptr m (new Message);
+ Message_ptr m (new Message);
- m->add (Profile_ptr (nak.release ()));
+ m->add (Profile_ptr (nak.release ()));
- msgs.push_back (m);
+ msgs.push_back (m);
+ }
}
+ /*
+ if (count > max_elem)
+ cerr << "NAC count : " << count << endl
+ << "NAK max : " << max_elem << endl;
+ */
+
// Detect and record new losses.
//
for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn)
@@ -256,6 +282,8 @@ namespace ACE_RMCast
// First message from this source.
//
hold_.bind (from, Queue (sn));
+ //@@ rm
+ //
hold_.find (from, e);
in_->recv (m);
@@ -300,13 +328,20 @@ namespace ACE_RMCast
void Acknowledge::
send (Message_ptr m)
{
- if (m->find (Data::id) != 0)
+ if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
{
- Lock l (mutex_);
+ u32 max_size (max_payload_size - data->size ());
+ u32 max_elem (NRTM::max_count (max_size));
- Profile_ptr nrtm (create_nrtm ());
+ if (max_elem > 0)
+ {
+ Lock l (mutex_);
+
+ Profile_ptr nrtm (create_nrtm (max_elem));
- if (nrtm.get ()) m->add (nrtm);
+ if (nrtm.get ())
+ m->add (nrtm);
+ }
nrtm_timer_ = nrtm_timeout; // Reset timer.
}
@@ -315,7 +350,7 @@ namespace ACE_RMCast
}
Profile_ptr Acknowledge::
- create_nrtm ()
+ create_nrtm (u32 max_elem)
{
// Prepare NRTM.
//
@@ -332,11 +367,16 @@ namespace ACE_RMCast
//@@ Should look for the highest known number.
//
nrtm->insert (addr, q.sn ());
+
+ if (--max_elem == 0)
+ break;
}
}
- if (nrtm->empty ()) return Profile_ptr (0);
- else return Profile_ptr (nrtm.release ());
+ if (nrtm->empty ())
+ return Profile_ptr (0);
+ else
+ return Profile_ptr (nrtm.release ());
}
ACE_THR_FUNC_RETURN Acknowledge::
diff --git a/protocols/ace/RMCast/Acknowledge.h b/protocols/ace/RMCast/Acknowledge.h
index 5726f07d0d3..ec0ff77d12b 100644
--- a/protocols/ace/RMCast/Acknowledge.h
+++ b/protocols/ace/RMCast/Acknowledge.h
@@ -5,8 +5,8 @@
#ifndef ACE_RMCAST_ACKNOWLEDGE_H
#define ACE_RMCAST_ACKNOWLEDGE_H
-#include <ace/Hash_Map_Manager.h>
-#include <ace/Thread_Manager.h>
+#include "ace/Hash_Map_Manager.h"
+#include "ace/Thread_Manager.h"
#include "Stack.h"
#include "Protocol.h"
@@ -217,7 +217,7 @@ namespace ACE_RMCast
track_queue (Address const& addr, Queue& q, Messages& msgs);
Profile_ptr
- create_nrtm ();
+ create_nrtm (u32 max_elem);
static ACE_THR_FUNC_RETURN
track_thunk (void* obj);
@@ -232,9 +232,6 @@ namespace ACE_RMCast
bool stop_;
ACE_Thread_Manager tracker_mgr_;
};
-
-
-
}
#endif // ACE_RMCAST_ACKNOWLEDGE_H
diff --git a/protocols/ace/RMCast/Fragment.cpp b/protocols/ace/RMCast/Fragment.cpp
new file mode 100644
index 00000000000..6a0c203a133
--- /dev/null
+++ b/protocols/ace/RMCast/Fragment.cpp
@@ -0,0 +1,59 @@
+// file : ace/RMCast/Fragment.cpp
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#include "Fragment.h"
+
+/*
+#include <iostream>
+using std::cerr;
+using std::endl;
+*/
+
+namespace ACE_RMCast
+{
+ Fragment::
+ Fragment ()
+ {
+ }
+
+ void Fragment::
+ send (Message_ptr m)
+ {
+ if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
+ {
+ if (data->size () <= max_payload_size)
+ {
+ out_->send (m);
+ return;
+ }
+
+ char const* p (data->buf ());
+ size_t size (data->size ());
+
+ // Need fragmentation.
+ //
+ u32 packets (size / max_payload_size + (size % max_payload_size ? 1 : 0));
+
+ // cerr << "size : " << size << endl
+ // << "packs: " << packets << endl;
+
+
+ for (u32 i (1); i <= packets; ++i)
+ {
+ Message_ptr part (new Message);
+
+ size_t s (i == packets ? size % max_payload_size : max_payload_size);
+
+ // cerr << "pack: " << s << endl;
+
+ part->add (Profile_ptr (new Part (i, packets, size)));
+ part->add (Profile_ptr (new Data (p, s)));
+
+ out_->send (part);
+
+ p += s;
+ }
+ }
+ }
+}
diff --git a/protocols/ace/RMCast/Fragment.h b/protocols/ace/RMCast/Fragment.h
new file mode 100644
index 00000000000..836307e71a8
--- /dev/null
+++ b/protocols/ace/RMCast/Fragment.h
@@ -0,0 +1,25 @@
+// file : ace/RMCast/Fragment.h
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#ifndef ACE_RMCAST_FRAGMENT_H
+#define ACE_RMCAST_FRAGMENT_H
+
+#include "Stack.h"
+#include "Protocol.h"
+#include "Bits.h"
+
+namespace ACE_RMCast
+{
+ class Fragment : public Element
+ {
+ public:
+ Fragment ();
+
+ public:
+ virtual void
+ send (Message_ptr m);
+ };
+}
+
+#endif // ACE_RMCAST_FRAGMENT_H
diff --git a/protocols/ace/RMCast/Link.cpp b/protocols/ace/RMCast/Link.cpp
index 7e8cab77845..6bfcf7085dd 100644
--- a/protocols/ace/RMCast/Link.cpp
+++ b/protocols/ace/RMCast/Link.cpp
@@ -2,8 +2,8 @@
// author : Boris Kolpackov <boris@kolpackov.net>
// cvs-id : $Id$
-#include <ace/Time_Value.h> // ACE_Time_Value
-#include <ace/OS_NS_sys_socket.h>
+#include "ace/Time_Value.h" // ACE_Time_Value
+#include "ace/OS_NS_sys_socket.h"
#include "Link.h"
@@ -118,12 +118,19 @@ namespace ACE_RMCast
}
else
{
- hold_ = m;
+ if ((rand () % 5) != 0)
+ {
+ send_ (m);
+ }
+ else
+ {
+ hold_ = m;
- // Make a copy in M so that the reliable loop below
- // won't add FROM and TO to HOLD_.
- //
- m = Message_ptr (new Message (*hold_));
+ // Make a copy in M so that the reliable loop below
+ // won't add FROM and TO to HOLD_.
+ //
+ m = hold_->clone ();
+ }
}
}
}
@@ -145,6 +152,14 @@ namespace ACE_RMCast
os << *m;
+ if (os.length () > max_packet_size)
+ {
+ ACE_ERROR ((LM_ERROR,
+ "packet length (%d) exceeds max_poacket_size (%d)\n",
+ os.length (), max_packet_size));
+ abort ();
+ }
+
ssock_.send (os.buffer (), os.length (), addr_);
/*
@@ -284,6 +299,10 @@ namespace ACE_RMCast
{
m->add (Profile_ptr (new NoData (hdr, is)));
}
+ else if (id == Part::id)
+ {
+ m->add (Profile_ptr (new Part (hdr, is)));
+ }
else
{
//cerr << 0 << "unknown profile id " << hdr.id () << endl;
diff --git a/protocols/ace/RMCast/Protocol.cpp b/protocols/ace/RMCast/Protocol.cpp
index afa60396e31..755872371cb 100644
--- a/protocols/ace/RMCast/Protocol.cpp
+++ b/protocols/ace/RMCast/Protocol.cpp
@@ -13,4 +13,5 @@ namespace ACE_RMCast
u16 const NAK:: id = 0x0005;
u16 const NRTM:: id = 0x0006;
u16 const NoData::id = 0x0007;
+ u16 const Part:: id = 0x0008;
}
diff --git a/protocols/ace/RMCast/Protocol.h b/protocols/ace/RMCast/Protocol.h
index 82df32e467f..7df54fd5f88 100644
--- a/protocols/ace/RMCast/Protocol.h
+++ b/protocols/ace/RMCast/Protocol.h
@@ -29,6 +29,17 @@ namespace ACE_RMCast
typedef ACE_CDR::ULong u32;
typedef ACE_CDR::ULongLong u64;
+ // Protocol parameters
+ //
+ //
+ u32 const max_packet_size = 1460; // MTU (1500) - IP-header - UDP-header
+ u32 const max_service_size = 40; // service profiles (Part, SN, etc) sizes
+ // plus message size.
+ u32 const max_payload_size = max_packet_size - max_service_size;
+
+ //
+ //
+ //
typedef ACE_INET_Addr Address;
struct AddressHasher
@@ -107,6 +118,12 @@ namespace ACE_RMCast
{
}
+ Profile_ptr
+ clone ()
+ {
+ return clone_ ();
+ }
+
protected:
Profile (u16 id)
: header_ (id, 0)
@@ -117,6 +134,14 @@ namespace ACE_RMCast
: header_ (h)
{
}
+
+ virtual Profile_ptr
+ clone_ () = 0;
+
+ private:
+ Profile&
+ operator= (Profile const&);
+
public:
u16
id () const
@@ -206,10 +231,10 @@ namespace ACE_RMCast
return ss;
}
+
//
//
//
-
class Message;
typedef
@@ -224,17 +249,29 @@ namespace ACE_RMCast
{
}
+ Message_ptr
+ clone ()
+ {
+ return new Message (*this);
+ }
+
+ protected:
Message (Message const& m)
: profiles_ (4)
{
for (Profiles::const_iterator i (m.profiles_); !i.done (); i.advance ())
{
- // Shallow copy of profiles.
+ // Shallow copy of profiles. This implies that profiles are not
+ // modified as they go up/down the stack.
//
profiles_.bind ((*i).ext_id_, (*i).int_id_);
}
}
+ private:
+ Message&
+ operator= (Message const&);
+
public:
bool
add (Profile_ptr p)
@@ -251,6 +288,18 @@ namespace ACE_RMCast
return true;
}
+ void
+ replace (Profile_ptr p)
+ {
+ profiles_.rebind (p->id (), p);
+ }
+
+ void
+ remove (u16 id)
+ {
+ profiles_.unbind (id);
+ }
+
Profile const*
find (u16 id) const
{
@@ -336,6 +385,25 @@ namespace ACE_RMCast
size (calculate_size ());
}
+ From_ptr
+ clone ()
+ {
+ return From_ptr (static_cast<From*> (clone_ ().release ()));
+ }
+
+ protected:
+ virtual Profile_ptr
+ clone_ ()
+ {
+ return new From (*this);
+ }
+
+ From (From const& from)
+ : Profile (from),
+ address_ (from.address_)
+ {
+ }
+
public:
Address const&
address () const
@@ -401,6 +469,25 @@ namespace ACE_RMCast
size (calculate_size ());
}
+ To_ptr
+ clone ()
+ {
+ return To_ptr (static_cast<To*> (clone_ ().release ()));
+ }
+
+ protected:
+ virtual Profile_ptr
+ clone_ ()
+ {
+ return new To (*this);
+ }
+
+ To (To const& to)
+ : Profile (to),
+ address_ (to.address_)
+ {
+ }
+
public:
Address const&
address () const
@@ -449,27 +536,61 @@ namespace ACE_RMCast
public:
Data (Header const& h, istream& is)
- : Profile (h), buf_ (0), size_ (h.size ())
+ : Profile (h),
+ buf_ (0),
+ size_ (h.size ()),
+ capacity_ (size_)
{
if (size_)
{
- buf_ = reinterpret_cast<char*> (operator new (size_));
+ buf_ = reinterpret_cast<char*> (operator new (capacity_));
is.read_char_array (buf_, size_);
}
}
- Data (void const* buf, size_t s)
- : Profile (id), buf_ (0), size_ (s)
+ Data (void const* buf, size_t s, size_t capacity = 0)
+ : Profile (id),
+ buf_ (0),
+ size_ (s),
+ capacity_ (capacity < size_ ? size_ : capacity)
{
if (size_)
{
- buf_ = reinterpret_cast<char*> (operator new (size_));
+ buf_ = reinterpret_cast<char*> (operator new (capacity_));
ACE_OS::memcpy (buf_, buf, size_);
}
Profile::size (calculate_size ());
}
+ Data_ptr
+ clone ()
+ {
+ return Data_ptr (static_cast<Data*> (clone_ ().release ()));
+ }
+
+ protected:
+ virtual Profile_ptr
+ clone_ ()
+ {
+ return new Data (*this);
+ }
+
+ Data (Data const& d)
+ : Profile (d),
+ buf_ (0),
+ size_ (d.size_),
+ capacity_ (d.capacity_)
+ {
+ if (size_)
+ {
+ buf_ = reinterpret_cast<char*> (operator new (capacity_));
+ ACE_OS::memcpy (buf_, d.buf_, size_);
+ }
+
+ Profile::size (calculate_size ());
+ }
+
public:
char const*
buf () const
@@ -477,12 +598,35 @@ namespace ACE_RMCast
return buf_;
}
+ char*
+ buf ()
+ {
+ return buf_;
+ }
+
size_t
size () const
{
return size_;
}
+ void
+ size (size_t s)
+ {
+ if (s > capacity_)
+ abort ();
+
+ size_ = s;
+
+ Profile::size (calculate_size ());
+ }
+
+ size_t
+ capacity () const
+ {
+ return capacity_;
+ }
+
public:
virtual void
serialize_body (ostream& os) const
@@ -499,6 +643,7 @@ namespace ACE_RMCast
private:
char* buf_;
size_t size_;
+ size_t capacity_;
};
@@ -528,6 +673,25 @@ namespace ACE_RMCast
size (calculate_size ());
}
+ SN_ptr
+ clone ()
+ {
+ return SN_ptr (static_cast<SN*> (clone_ ().release ()));
+ }
+
+ protected:
+ virtual Profile_ptr
+ clone_ ()
+ {
+ return new SN (*this);
+ }
+
+ SN (SN const& sn)
+ : Profile (sn),
+ n_ (sn.n_)
+ {
+ }
+
public:
u64
num () const
@@ -590,6 +754,10 @@ namespace ACE_RMCast
size_t addr_size (ss.total_length ());
+
+ is >> addr;
+ is >> port;
+
// num_of_sns = (size - addr_size) / sn_size
//
for (unsigned long i (0); i < ((h.size () - addr_size) / sn_size); ++i)
@@ -598,9 +766,6 @@ namespace ACE_RMCast
sns_.push_back (sn);
}
- is >> addr;
- is >> port;
-
address_ = Address (port, addr);
}
@@ -611,6 +776,26 @@ namespace ACE_RMCast
size (calculate_size ());
}
+ NAK_ptr
+ clone ()
+ {
+ return NAK_ptr (static_cast<NAK*> (clone_ ().release ()));
+ }
+
+ protected:
+ virtual Profile_ptr
+ clone_ ()
+ {
+ return new NAK (*this);
+ }
+
+ NAK (NAK const& nak)
+ : Profile (nak),
+ address_ (nak.address_),
+ sns_ (nak.sns_)
+ {
+ }
+
public:
void
add (u64 sn)
@@ -648,11 +833,50 @@ namespace ACE_RMCast
}
public:
+ // Count max number of elements that will fit into NAK profile
+ // with size <= max_size.
+ //
+ static u32
+ max_count (u32 max_size)
+ {
+ u32 n (0);
+
+ sstream ss;
+
+ Profile::Header hdr (0, 0);
+ ss << hdr;
+
+ u32 addr (0);
+ u16 port (0);
+ ss << addr << port;
+
+ while (true)
+ {
+ u64 sn (0);
+ ss << sn;
+
+ if (ss.total_length () <= max_size)
+ ++n;
+
+ if (ss.total_length () >= max_size)
+ break;
+ }
+
+ return n;
+ }
+
+ public:
virtual void
serialize_body (ostream& os) const
{
NAK& this_ = const_cast<NAK&> (*this); // Don't put in ROM.
+ u32 addr (address_.get_ip_address ());
+ u16 port (address_.get_port_number ());
+
+ os << addr;
+ os << port;
+
// Stone age iteration.
//
for (iterator i (this_.begin ()); !i.done (); i.advance ())
@@ -661,13 +885,6 @@ namespace ACE_RMCast
i.next (psn);
os << *psn;
}
-
-
- u32 addr (address_.get_ip_address ());
- u16 port (address_.get_port_number ());
-
- os << addr;
- os << port;
}
virtual void
@@ -675,6 +892,12 @@ namespace ACE_RMCast
{
NAK& this_ = const_cast<NAK&> (*this); // Don't put in ROM.
+ u32 addr (0);
+ u16 port (0);
+
+ ss << addr;
+ ss << port;
+
// Stone age iteration.
//
for (iterator i (this_.begin ()); !i.done (); i.advance ())
@@ -682,13 +905,6 @@ namespace ACE_RMCast
u64 sn (0);
ss << sn;
}
-
-
- u32 addr (0);
- u16 port (0);
-
- ss << addr;
- ss << port;
}
private:
@@ -734,7 +950,6 @@ namespace ACE_RMCast
is >> addr;
is >> port;
-
map_.bind (Address (port, addr), sn);
}
}
@@ -745,6 +960,28 @@ namespace ACE_RMCast
size (calculate_size ());
}
+ NRTM_ptr
+ clone ()
+ {
+ return NRTM_ptr (static_cast<NRTM*> (clone_ ().release ()));
+ }
+
+ protected:
+ virtual Profile_ptr
+ clone_ ()
+ {
+ return new NRTM (*this);
+ }
+
+ NRTM (NRTM const& nrtm)
+ : Profile (nrtm)
+ {
+ for (Map::const_iterator i (nrtm.map_); !i.done (); i.advance ())
+ {
+ map_.bind ((*i).ext_id_, (*i).int_id_);
+ }
+ }
+
public:
void
insert (Address const& addr, u64 sn)
@@ -771,6 +1008,40 @@ namespace ACE_RMCast
}
public:
+ // Count max number of elements that will fit into NRTM profile
+ // with size <= max_size.
+ //
+ static u32
+ max_count (u32 max_size)
+ {
+ u32 n (0);
+
+ sstream ss;
+
+ Profile::Header hdr (0, 0);
+ ss << hdr;
+
+ while (true)
+ {
+ u32 addr (0);
+ u16 port (0);
+ u64 sn (0);
+
+ ss << sn;
+ ss << addr;
+ ss << port;
+
+ if (ss.total_length () <= max_size)
+ ++n;
+
+ if (ss.total_length () >= max_size)
+ break;
+ }
+
+ return n;
+ }
+
+ public:
virtual void
serialize_body (ostream& os) const
{
@@ -783,6 +1054,7 @@ namespace ACE_RMCast
os << sn;
os << addr;
os << port;
+
}
}
@@ -839,6 +1111,24 @@ namespace ACE_RMCast
Profile::size (0);
}
+ NoData_ptr
+ clone ()
+ {
+ return NoData_ptr (static_cast<NoData*> (clone_ ().release ()));
+ }
+
+ protected:
+ virtual Profile_ptr
+ clone_ ()
+ {
+ return new NoData (*this);
+ }
+
+ NoData (NoData const& no_data)
+ : Profile (no_data)
+ {
+ }
+
public:
virtual void
serialize_body (ostream&) const
@@ -851,6 +1141,101 @@ namespace ACE_RMCast
}
};
+ //
+ //
+ //
+ struct Part;
+
+ typedef
+ ACE_Refcounted_Auto_Ptr<Part, ACE_Null_Mutex>
+ Part_ptr;
+
+ struct Part : Profile
+ {
+ static u16 const id;
+
+ public:
+ Part (Header const& h, istream& is)
+ : Profile (h)
+ {
+ is >> num_;
+ is >> of_;
+ is >> total_size_;
+ }
+
+ Part (u32 num, u32 of, u64 total_size)
+ : Profile (id),
+ num_ (num),
+ of_ (of),
+ total_size_ (total_size)
+ {
+ size (calculate_size ());
+ }
+
+ Part_ptr
+ clone ()
+ {
+ return Part_ptr (static_cast<Part*> (clone_ ().release ()));
+ }
+
+ protected:
+ virtual Profile_ptr
+ clone_ ()
+ {
+ return new Part (*this);
+ }
+
+ Part (Part const& part)
+ : Profile (part),
+ num_ (part.num_),
+ of_ (part.of_),
+ total_size_ (part.total_size_)
+ {
+ }
+
+ public:
+ u32
+ num () const
+ {
+ return num_;
+ }
+
+ u32
+ of () const
+ {
+ return of_;
+ }
+
+ u64
+ total_size () const
+ {
+ return total_size_;
+ }
+
+ public:
+ virtual void
+ serialize_body (ostream& os) const
+ {
+ os << num_;
+ os << of_;
+ os << total_size_;
+ }
+
+ virtual void
+ serialize_body (sstream& ss) const
+ {
+ ss << num_;
+ ss << of_;
+ ss << total_size_;
+ }
+
+
+ private:
+ u32 num_;
+ u32 of_;
+ u64 total_size_;
+ };
+
}
/*
diff --git a/protocols/ace/RMCast/Reassemble.cpp b/protocols/ace/RMCast/Reassemble.cpp
new file mode 100644
index 00000000000..a089fb45c03
--- /dev/null
+++ b/protocols/ace/RMCast/Reassemble.cpp
@@ -0,0 +1,119 @@
+// file : ace/RMCast/Reassemble.cpp
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#include "Reassemble.h"
+
+/*
+#include <iostream>
+using std::cerr;
+using std::endl;
+*/
+
+namespace ACE_RMCast
+{
+ Reassemble::
+ Reassemble ()
+ {
+ }
+
+ void Reassemble::
+ recv (Message_ptr m)
+ {
+ Map::ENTRY* e;
+ Address from (
+ static_cast<From const*> (m->find (From::id))->address ());
+
+ if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
+ {
+ if (Part const* part = static_cast<Part const*> (m->find (Part::id)))
+ {
+ if (map_.find (from, e) == -1)
+ {
+ // First part of the message.
+ //
+
+ if (part->num () != 1)
+ {
+ // We assume that we received NoData for one of the preceding
+ // fragments. Ignore this one.
+ return;
+ }
+
+ Data_ptr new_data (new Data (data->buf (),
+ data->size (),
+ part->total_size ()));
+
+ //std::cerr << "part->total_size (): " << part->total_size () << endl;
+
+ map_.bind (from, new_data);
+ }
+ else
+ {
+ // Next part of the message.
+ //
+
+ if (part->num () == 1)
+ abort ();
+
+ Data const* data (static_cast<Data const*> (m->find (Data::id)));
+
+ Data_ptr& new_data (e->int_id_);
+
+ ACE_OS::memcpy (new_data->buf () + new_data->size (),
+ data->buf (),
+ data->size ());
+
+ //std::cerr << "data->size (): " << data->size () << endl
+ // << "new_data->size (): " << new_data->size () << endl
+ // << "new_data->capa (): " << new_data->capacity () << endl;
+
+ new_data->size (new_data->size () + data->size ());
+
+
+ if (part->num () == part->of ())
+ {
+ // Reassembly is complete.
+ //
+ if (part->total_size () != new_data->size ())
+ abort ();
+
+ Message_ptr new_msg (new Message ());
+
+ Address to (
+ static_cast<To const*> (m->find (To::id))->address ());
+
+ new_msg->add (Profile_ptr (new To (to)));
+ new_msg->add (Profile_ptr (new From (from)));
+ new_msg->add (Profile_ptr (new_data.release ()));
+
+ map_.unbind (from);
+
+ in_->recv (new_msg);
+ }
+ }
+ }
+ else
+ {
+ // Non-fragmented message. Make sure we are in the consistent state
+ // and forward it up.
+ //
+ if (map_.find (from, e) != -1)
+ abort ();
+
+ in_->recv (m);
+ }
+ }
+ else if (m->find (NoData::id) != 0)
+ {
+ if (map_.find (from, e) != -1)
+ {
+ // We already received some fragments. Clean everyhting up.
+ //
+ map_.unbind (from);
+ }
+
+ in_->recv (m);
+ }
+ }
+}
diff --git a/protocols/ace/RMCast/Reassemble.h b/protocols/ace/RMCast/Reassemble.h
new file mode 100644
index 00000000000..0f074c9855c
--- /dev/null
+++ b/protocols/ace/RMCast/Reassemble.h
@@ -0,0 +1,39 @@
+// file : ace/RMCast/Reassemble.h
+// author : Boris Kolpackov <boris@kolpackov.net>
+// cvs-id : $Id$
+
+#ifndef ACE_RMCAST_REASSEMBLE_H
+#define ACE_RMCAST_REASSEMBLE_H
+
+#include "ace/Hash_Map_Manager.h"
+
+#include "Stack.h"
+#include "Protocol.h"
+#include "Bits.h"
+
+namespace ACE_RMCast
+{
+ class Reassemble : public Element
+ {
+ public:
+ Reassemble ();
+
+ public:
+ virtual void
+ recv (Message_ptr m);
+
+ private:
+ typedef
+ ACE_Hash_Map_Manager_Ex<Address,
+ Data_ptr,
+ AddressHasher,
+ ACE_Equal_To<Address>,
+ ACE_Null_Mutex>
+ Map;
+
+ Map map_;
+ };
+}
+
+
+#endif // ACE_RMCAST_REASSEMBLE_H
diff --git a/protocols/ace/RMCast/Retransmit.cpp b/protocols/ace/RMCast/Retransmit.cpp
index 609ec526294..7850df9c6f3 100644
--- a/protocols/ace/RMCast/Retransmit.cpp
+++ b/protocols/ace/RMCast/Retransmit.cpp
@@ -14,7 +14,9 @@ namespace ACE_RMCast
Retransmit::
Retransmit ()
- : cond_ (mutex_), stop_ (false)
+ : cond_ (mutex_),
+ sn_ (1),
+ stop_ (false)
{
}
@@ -43,13 +45,12 @@ namespace ACE_RMCast
void Retransmit::
send (Message_ptr m)
{
- if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
+ if (m->find (Data::id) != 0)
{
- u64 sn (static_cast<SN const*> (m->find (SN::id))->num ());
+ m->add (Profile_ptr (new SN (sn_)));
Lock l (mutex_);
-
- queue_.bind (sn, Descr (Data_ptr (new Data (*data))));
+ queue_.bind (sn_++, Descr (m->clone ()));
}
out_->send (m);
@@ -73,8 +74,7 @@ namespace ACE_RMCast
u64* psn;
j.next (psn);
- Message_ptr m (new Message);
- m->add (Profile_ptr (new SN (*psn)));
+ Message_ptr m;
Queue::ENTRY* pair;
@@ -82,7 +82,7 @@ namespace ACE_RMCast
{
//cerr << 5 << "PRTM " << to << " " << pair->ext_id_ << endl;
- m->add (pair->int_id_.data ());
+ m = pair->int_id_.message ();
pair->int_id_.reset ();
}
@@ -90,6 +90,8 @@ namespace ACE_RMCast
{
//cerr << 4 << "message " << *psn << " not available" << endl;
+ m = Message_ptr (new Message);
+ m->add (Profile_ptr (new SN (*psn)));
m->add (Profile_ptr (new NoData));
}
diff --git a/protocols/ace/RMCast/Retransmit.h b/protocols/ace/RMCast/Retransmit.h
index fa910ee2837..c76299a8d5b 100644
--- a/protocols/ace/RMCast/Retransmit.h
+++ b/protocols/ace/RMCast/Retransmit.h
@@ -5,8 +5,8 @@
#ifndef ACE_RMCAST_RETRANSMIT_H
#define ACE_RMCAST_RETRANSMIT_H
-#include <ace/Hash_Map_Manager.h>
-#include <ace/Thread_Manager.h>
+#include "ace/Hash_Map_Manager.h"
+#include "ace/Thread_Manager.h"
#include "Stack.h"
#include "Protocol.h"
@@ -38,12 +38,12 @@ namespace ACE_RMCast
// Shouldn't be available but ACE_Hash_Map needs it.
//
Descr ()
- : data_ (), count_ (0)
+ : msg_ (), count_ (0)
{
}
- Descr (Data_ptr d)
- : data_ (d), count_ (0)
+ Descr (Message_ptr msg)
+ : msg_ (msg), count_ (0)
{
}
@@ -59,17 +59,14 @@ namespace ACE_RMCast
count_ = 0;
}
- // It would be logical to return data_ptr but ACE ref_auto_ptr
- // hasn't learned how to convert between pointers yet.
- //
- Profile_ptr
- data () const
+ Message_ptr
+ message () const
{
- return Profile_ptr (new Data (*data_));
+ return msg_->clone ();
}
private:
- Data_ptr data_;
+ Message_ptr msg_;
unsigned long count_;
};
@@ -89,6 +86,8 @@ namespace ACE_RMCast
Mutex mutex_;
Condition cond_;
+ u64 sn_;
+
bool stop_;
ACE_Thread_Manager tracker_mgr_;
};
diff --git a/protocols/ace/RMCast/Simulator.h b/protocols/ace/RMCast/Simulator.h
index f8ffe13473f..27b227f488b 100644
--- a/protocols/ace/RMCast/Simulator.h
+++ b/protocols/ace/RMCast/Simulator.h
@@ -1,5 +1,3 @@
-// -*- C++ -*-
-
// file : ace/RMCast/Simulator.h
// author : Boris Kolpackov <boris@kolpackov.net>
// cvs-id : $Id$
@@ -7,8 +5,6 @@
#ifndef ACE_RMCAST_SIMULATOR_H
#define ACE_RMCAST_SIMULATOR_H
-#include "ace/Thread_Mutex.h"
-
#include "Stack.h"
#include "Protocol.h"
#include "Bits.h"
diff --git a/protocols/ace/RMCast/Socket.cpp b/protocols/ace/RMCast/Socket.cpp
index 31f2807ac53..519879ee489 100644
--- a/protocols/ace/RMCast/Socket.cpp
+++ b/protocols/ace/RMCast/Socket.cpp
@@ -15,11 +15,11 @@
#include "Protocol.h"
#include "Bits.h"
-#include "Link.h"
-#include "Simulator.h"
-#include "Retransmit.h"
+#include "Fragment.h"
+#include "Reassemble.h"
#include "Acknowledge.h"
-
+#include "Retransmit.h"
+#include "Link.h"
#include "Socket.h"
@@ -52,8 +52,6 @@ namespace ACE_RMCast
private:
bool loop_;
- u64 sn_; //@@ lock?
-
Mutex mutex_;
Condition cond_;
@@ -61,9 +59,10 @@ namespace ACE_RMCast
ACE_Pipe signal_pipe_;
+ ACE_Auto_Ptr<Fragment> fragment_;
+ ACE_Auto_Ptr<Reassemble> reassemble_;
ACE_Auto_Ptr<Acknowledge> acknowledge_;
ACE_Auto_Ptr<Retransmit> retransmit_;
- ACE_Auto_Ptr<Simulator> simulator_;
ACE_Auto_Ptr<Link> link_;
};
@@ -71,31 +70,33 @@ namespace ACE_RMCast
Socket_Impl::
Socket_Impl (Address const& a, bool loop, bool simulator)
: loop_ (loop),
- sn_ (1),
cond_ (mutex_)
{
signal_pipe_.open ();
+ fragment_.reset (new Fragment ());
+ reassemble_.reset (new Reassemble ());
acknowledge_.reset (new Acknowledge ());
retransmit_.reset (new Retransmit ());
- simulator_.reset (new Simulator ());
link_.reset (new Link (a, simulator));
// Start IN stack from top to bottom.
//
in_start (0);
- acknowledge_->in_start (this);
+ fragment_->in_start (this);
+ reassemble_->in_start (fragment_.get ());
+ acknowledge_->in_start (reassemble_.get ());
retransmit_->in_start (acknowledge_.get ());
- simulator_->in_start (retransmit_.get ());
- link_->in_start (simulator_.get ());
+ link_->in_start (retransmit_.get ());
// Start OUT stack from bottom up.
//
link_->out_start (0);
- simulator_->out_start (link_.get ());
- retransmit_->out_start (simulator_.get ());
+ retransmit_->out_start (link_.get ());
acknowledge_->out_start (retransmit_.get ());
- out_start (acknowledge_.get ());
+ reassemble_->out_start (acknowledge_.get ());
+ fragment_->out_start (reassemble_.get ());
+ out_start (fragment_.get ());
}
Socket_Impl::
@@ -104,17 +105,19 @@ namespace ACE_RMCast
// Stop OUT stack from top to bottom.
//
out_stop ();
+ fragment_->out_stop ();
+ reassemble_->out_stop ();
acknowledge_->out_stop ();
retransmit_->out_stop ();
- simulator_->out_stop ();
link_->out_stop ();
// Stop IN stack from bottom up.
//
link_->in_stop ();
- simulator_->in_stop ();
retransmit_->in_stop ();
acknowledge_->in_stop ();
+ reassemble_->in_stop ();
+ fragment_->in_stop ();
in_stop ();
}
@@ -124,7 +127,6 @@ namespace ACE_RMCast
{
Message_ptr m (new Message);
- m->add (Profile_ptr (new SN (sn_++)));
m->add (Profile_ptr (new Data (buf, s)));
// Qualification is for VC6 and VxWorks.
diff --git a/protocols/ace/RMCast/Socket.h b/protocols/ace/RMCast/Socket.h
index 9f516d488ce..98e3b8b0ba8 100644
--- a/protocols/ace/RMCast/Socket.h
+++ b/protocols/ace/RMCast/Socket.h
@@ -1,5 +1,3 @@
-// -*- C++ -*-
-
// file : ace/RMCast/Socket.h
// author : Boris Kolpackov <boris@kolpackov.net>
// cvs-id : $Id$
diff --git a/protocols/examples/RMCast/Send_Msg/Protocol.h b/protocols/examples/RMCast/Send_Msg/Protocol.h
index c3edf43b1fb..9c7be6eb0c8 100644
--- a/protocols/examples/RMCast/Send_Msg/Protocol.h
+++ b/protocols/examples/RMCast/Send_Msg/Protocol.h
@@ -5,7 +5,7 @@
#ifndef PROTOCOL_H
#define PROTOCOL_H
-unsigned short const payload_size = 256;
+unsigned short const payload_size = 512;
unsigned long const message_count = 10000;
struct Message
diff --git a/protocols/tests/RMCast/Protocol.h b/protocols/tests/RMCast/Protocol.h
index 18adc37fc3a..c0ecca3dae9 100644
--- a/protocols/tests/RMCast/Protocol.h
+++ b/protocols/tests/RMCast/Protocol.h
@@ -5,7 +5,7 @@
#ifndef PROTOCOL_H
#define PROTOCOL_H
-unsigned short const payload_size = 256;
+unsigned short const payload_size = 1234;
unsigned long const message_count = 1000;
struct Message
diff --git a/protocols/tests/RMCast/Sender.cpp b/protocols/tests/RMCast/Sender.cpp
index 70b97049443..85340b6b4a0 100644
--- a/protocols/tests/RMCast/Sender.cpp
+++ b/protocols/tests/RMCast/Sender.cpp
@@ -39,7 +39,7 @@ ACE_TMAIN (int argc, ACE_TCHAR* argv[])
// Keep running in case retransmissions are needed.
//
- ACE_OS::sleep (ACE_Time_Value (50, 0));
+ ACE_OS::sleep (ACE_Time_Value (60, 0));
return 0;
}