summaryrefslogtreecommitdiff
path: root/protocols
diff options
context:
space:
mode:
authorboris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2005-09-04 20:40:20 +0000
committerboris <boris@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2005-09-04 20:40:20 +0000
commit592df896b81969d92e245f8419d5e349294cc452 (patch)
treeffc2fcb375d47419d4a69e85d6e8652491a18843 /protocols
parente0016f7d72ea4e178656e7d3bc6f316236f72720 (diff)
downloadATCD-592df896b81969d92e245f8419d5e349294cc452.tar.gz
ChangeLogTag: Sun Sep 4 22:27:19 2005 Boris Kolpackov <boris@kolpackov.net>
Diffstat (limited to 'protocols')
-rw-r--r--protocols/ace/RMCast/Link.cpp25
-rw-r--r--protocols/ace/RMCast/Socket.cpp70
-rw-r--r--protocols/ace/RMCast/Socket.h11
3 files changed, 72 insertions, 34 deletions
diff --git a/protocols/ace/RMCast/Link.cpp b/protocols/ace/RMCast/Link.cpp
index 1f3ba1d4aa8..3a9fdaea2b3 100644
--- a/protocols/ace/RMCast/Link.cpp
+++ b/protocols/ace/RMCast/Link.cpp
@@ -178,13 +178,17 @@ namespace ACE_RMCast
void Link::
recv ()
{
- // I could have used ACE_Data_Block but it does not support
- // resizing...
+ size_t max_packet_size (params_.max_packet_size ());
+
+ // This is wicked.
//
- size_t size (0), capacity (8192);
- char* data = reinterpret_cast<char*> (operator new (capacity));
+ ACE_Auto_Ptr<char> holder (
+ reinterpret_cast<char*> (
+ operator new (max_packet_size + ACE_CDR::MAX_ALIGNMENT)));
+
+ char* data = ACE_ptr_align_binary (holder.get (), ACE_CDR::MAX_ALIGNMENT);
- ACE_Auto_Ptr<char> holder (data); // This is wicked.
+ size_t size (0);
while (true)
{
@@ -237,7 +241,7 @@ namespace ACE_RMCast
is >> msg_size;
}
- if (msg_size <= 4)
+ if (msg_size <= 4 || msg_size > max_packet_size)
{
// Bad message.
//
@@ -245,14 +249,7 @@ namespace ACE_RMCast
continue;
}
- if (capacity < msg_size)
- {
- capacity = msg_size;
- data = reinterpret_cast<char*> (operator new (capacity));
- holder.reset (data);
- }
-
- size = rsock_.recv (data, capacity, addr);
+ size = rsock_.recv (data, max_packet_size, addr);
if (msg_size != size)
{
diff --git a/protocols/ace/RMCast/Socket.cpp b/protocols/ace/RMCast/Socket.cpp
index 4ed634bd59d..a6d83299486 100644
--- a/protocols/ace/RMCast/Socket.cpp
+++ b/protocols/ace/RMCast/Socket.cpp
@@ -44,13 +44,16 @@ namespace ACE_RMCast
send_ (void const* buf, size_t s);
ssize_t
- recv_ (void* buf, size_t s, ACE_Time_Value const* timeout);
+ recv_ (void* buf,
+ size_t s,
+ ACE_Time_Value const* timeout,
+ ACE_INET_Addr* from);
ssize_t
size_ (ACE_Time_Value const* timeout);
ACE_HANDLE
- get_handle_ () const;
+ get_handle_ ();
private:
virtual void
@@ -82,8 +85,6 @@ namespace ACE_RMCast
params_ (params),
cond_ (mutex_)
{
- signal_pipe_.open ();
-
fragment_.reset (new Fragment (params_));
reassemble_.reset (new Reassemble (params_));
acknowledge_.reset (new Acknowledge (params_));
@@ -150,7 +151,10 @@ namespace ACE_RMCast
}
ssize_t Socket_Impl::
- recv_ (void* buf, size_t s, ACE_Time_Value const* timeout)
+ recv_ (void* buf,
+ size_t s,
+ ACE_Time_Value const* timeout,
+ ACE_INET_Addr* from)
{
ACE_Time_Value abs_time;
@@ -186,15 +190,20 @@ namespace ACE_RMCast
{
// Remove data from the pipe.
//
- char c;
-
- if (ACE_OS::read (signal_pipe_.read_handle (), &c, 1) != 1)
+ if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
{
- perror ("read: ");
- abort ();
+ char c;
+
+ if (ACE::recv_n (signal_pipe_.read_handle (), &c, 1) != 1)
+ {
+ perror ("read: ");
+ abort ();
+ }
}
}
+ if (from)
+ *from = static_cast<From const*> (m->find (From::id))->address ();
if (m->find (NoData::id) != 0)
{
@@ -260,8 +269,13 @@ namespace ACE_RMCast
}
ACE_HANDLE Socket_Impl::
- get_handle_ () const
+ get_handle_ ()
{
+ if (signal_pipe_.read_handle () == ACE_INVALID_HANDLE)
+ {
+ signal_pipe_.open ();
+ }
+
return signal_pipe_.read_handle ();
}
@@ -295,12 +309,15 @@ namespace ACE_RMCast
{
// Also write to the pipe.
//
- char c;
-
- if (ACE_OS::write (signal_pipe_.write_handle (), &c, 1) != 1)
+ if (signal_pipe_.write_handle () != ACE_INVALID_HANDLE)
{
- // perror ("write: ");
- abort ();
+ char c;
+
+ if (ACE::send_n (signal_pipe_.write_handle (), &c, 1) != 1)
+ {
+ // perror ("write: ");
+ abort ();
+ }
}
cond_.signal ();
@@ -332,13 +349,28 @@ namespace ACE_RMCast
ssize_t Socket::
recv (void* buf, size_t s)
{
- return impl_->recv_ (buf, s, 0);
+ return impl_->recv_ (buf, s, 0, 0);
+ }
+
+ ssize_t Socket::
+ recv (void* buf, size_t s, ACE_INET_Addr& from)
+ {
+ return impl_->recv_ (buf, s, 0, &from);
}
ssize_t Socket::
recv (void* buf, size_t s, ACE_Time_Value const& timeout)
{
- return impl_->recv_ (buf, s, &timeout);
+ return impl_->recv_ (buf, s, &timeout, 0);
+ }
+
+ ssize_t Socket::
+ recv (void* buf,
+ size_t s,
+ ACE_Time_Value const& timeout,
+ ACE_INET_Addr& from)
+ {
+ return impl_->recv_ (buf, s, &timeout, &from);
}
ssize_t Socket::
@@ -354,7 +386,7 @@ namespace ACE_RMCast
}
ACE_HANDLE Socket::
- get_handle () const
+ get_handle ()
{
return impl_->get_handle_ ();
}
diff --git a/protocols/ace/RMCast/Socket.h b/protocols/ace/RMCast/Socket.h
index c1a0a26a565..3468df15d4d 100644
--- a/protocols/ace/RMCast/Socket.h
+++ b/protocols/ace/RMCast/Socket.h
@@ -45,6 +45,9 @@ namespace ACE_RMCast
virtual ssize_t
recv (void* buf, size_t s);
+ virtual ssize_t
+ recv (void* buf, size_t s, ACE_INET_Addr& from);
+
// Block for up to <timeout> until message is available. Upon
// successful completion return the next message. Otherwise
@@ -55,6 +58,12 @@ namespace ACE_RMCast
virtual ssize_t
recv (void* buf, size_t s, ACE_Time_Value const& timeout);
+ virtual ssize_t
+ recv (void* buf,
+ size_t s,
+ ACE_Time_Value const& timeout,
+ ACE_INET_Addr& from);
+
// Block if message is not available. Upon successful completion
// return the size of the next message. Otherwise return -1 and
@@ -79,7 +88,7 @@ namespace ACE_RMCast
// is for signalling purposes only.
//
ACE_HANDLE
- get_handle () const;
+ get_handle ();
private:
ACE_Auto_Ptr<Socket_Impl> impl_;