diff options
Diffstat (limited to 'ACE/protocols/ace')
78 files changed, 10718 insertions, 0 deletions
diff --git a/ACE/protocols/ace/HTBP/ACE_HTBP.pc.in b/ACE/protocols/ace/HTBP/ACE_HTBP.pc.in new file mode 100644 index 00000000000..0ac5c75b8cf --- /dev/null +++ b/ACE/protocols/ace/HTBP/ACE_HTBP.pc.in @@ -0,0 +1,11 @@ +prefix=@prefix@ +exec_prefix=@exec_prefix@ +libdir=@libdir@ +includedir=@includedir@ + +Name: ACE_HTBP +Description: ACE HTTP Tunneling, Bidirectional, Protocol +Requires: ACE +Version: @VERSION@ +Libs: -L${libdir} -lACE_HTBP +Cflags: -I${includedir} diff --git a/ACE/protocols/ace/HTBP/HTBP.mpc b/ACE/protocols/ace/HTBP/HTBP.mpc new file mode 100644 index 00000000000..46e625eb486 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP.mpc @@ -0,0 +1,12 @@ +// -*- MPC -*- +// $Id$ + +project(HTBP) : acelib, ace_output, install { + avoids += ace_for_tao + sharedname = ACE_HTBP + dynamicflags = HTBP_BUILD_DLL + + Pkgconfig_Files { + ACE_HTBP.pc.in + } +} diff --git a/ACE/protocols/ace/HTBP/HTBP_Addr.cpp b/ACE/protocols/ace/HTBP/HTBP_Addr.cpp new file mode 100644 index 00000000000..24b837c5e6c --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Addr.cpp @@ -0,0 +1,94 @@ +// $Id$ + +#include "HTBP_Addr.h" +#include "ace/OS_NS_string.h" + +ACE_RCSID (HTBP, + ACE_HTBP_Addr, + "$Id$") + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +// Constructor +ACE::HTBP::Addr::Addr () +{ +} + +// Creates a ACE_INET_Addr from a PORT_NUMBER and the remote +// HOST_NAME. + +ACE::HTBP::Addr::Addr (u_short port_number, + const char host_name[], + int address_family) + : ACE_INET_Addr (port_number, host_name, address_family), + htid_ () +{ +} + +ACE::HTBP::Addr::Addr (const char *htid) + : htid_(htid) +{ +} + +ACE::HTBP::Addr::Addr (const ACE::HTBP::Addr &other) + : ACE_INET_Addr (other), + htid_(other.htid_) +{ + +} + +// Destructor +ACE::HTBP::Addr::~Addr () +{ +} + +int +ACE::HTBP::Addr::set (u_short port, + const char host[], + const char *htid) +{ + if (htid != 0 && ACE_OS::strlen (htid) != 0) + return this->set_htid (htid); + return this->ACE_INET_Addr::set(port,host); +} + +int +ACE::HTBP::Addr::set_htid (const char *htid) +{ + this->htid_ = htid; + this->set_port_number (0); + return 0; +} + +const char * +ACE::HTBP::Addr::get_htid (void) const +{ + return this->htid_.c_str(); +} + +int +ACE::HTBP::Addr::addr_to_string (ACE_TCHAR buffer[], + size_t size, + int ipaddr_format) const +{ + if (this->htid_.length() == 0) + return this->ACE_INET_Addr::addr_to_string(buffer,size,ipaddr_format); + if (size < htid_.length()) + return -1; + ACE_OS::strncpy (buffer, + ACE_TEXT_CHAR_TO_TCHAR(htid_.c_str()), + size); + return 0; +} + +int +ACE::HTBP::Addr::string_to_addr (const char address[], + int address_family) +{ + // if (ACE_OS::strchr (address,':')) + return this->ACE_INET_Addr::string_to_addr(address, address_family); + // this->htid_ = address; + // return 0; +} + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/protocols/ace/HTBP/HTBP_Addr.h b/ACE/protocols/ace/HTBP/HTBP_Addr.h new file mode 100644 index 00000000000..b19c4cf3be0 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Addr.h @@ -0,0 +1,92 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file HTBP_Addr.h + * + * $Id$ + * + * @author Phil Mesnier + */ +//============================================================================= + +#ifndef ACE_HTBP_ADDR_H +#define ACE_HTBP_ADDR_H +#include /**/ "ace/pre.h" + +#include "HTBP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/INET_Addr.h" +#include "ace/Synch.h" +#include "ace/SString.h" + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace ACE +{ + namespace HTBP + { + + /** + * @class Addr + * + * @brief Defines the member functions of the pure virtual class + * Addr + * + * This class derives from ACE_INET_Addr. It is the base class for + * the Inside, Proxy and Outside classes + */ + class HTBP_Export Addr : public ACE_INET_Addr + { + public: + /// Constructor + Addr (); + + Addr (const Addr &other); + + /// Create an Addr suitable for use on the inside of a firewall this + /// take a string used to populate the htid_ member. With this address, + /// the IP addr form is ignored. + Addr (const char *htid); + + /// Initialize an Addr for the outside of a firewall. In this form, + /// the htid_ is an empty string and the arguments are passed to the + /// base address type. + Addr (u_short port_number, + const char host_name[], + int address_family = AF_UNSPEC); + + /// Destructor + virtual ~Addr (); + + int set (u_short port, + const char host [], + const char *htid); + + /// Methods to help convert the OutSide_Addr to a String + /// and vice versa + int addr_to_string (ACE_TCHAR buffer[], + size_t size, + int ipaddr_format = 1) const; + int string_to_addr (const char address[], + int address_facmily = AF_UNSPEC); + + int set_htid(const char *htid); + const char *get_htid (void) const; + + private: + /// HTID - web-obtained unique-identifier + ACE_CString htid_; + }; + + } +} + +ACE_END_VERSIONED_NAMESPACE_DECL + +#include /**/ "ace/post.h" +#endif /* ACE_HTBP_ADDR_H */ diff --git a/ACE/protocols/ace/HTBP/HTBP_Channel.cpp b/ACE/protocols/ace/HTBP/HTBP_Channel.cpp new file mode 100644 index 00000000000..77f7f088ddc --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Channel.cpp @@ -0,0 +1,561 @@ +/* -*- C++ -*- */ + +//============================================================================= +/** + * @file HTBP_Channel.cpp + * + * $Id$ + * + * @author Phil Mesnier, Priyanka Gontla + */ +//============================================================================= +#include "HTBP_Channel.h" + +#if !defined (__ACE_INLINE__) +#include "HTBP_Channel.inl" +#endif + +#include "HTBP_Session.h" +#include "HTBP_Filter_Factory.h" + +#include "ace/Message_Block.h" +#include "ace/Reactor.h" + + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +// Initialization and termination methods. +/// Constructor. +ACE::HTBP::Channel::Channel (ACE::HTBP::Session *s) + : filter_ (0), + session_ (s), + ace_stream_ (), + notifier_ (0), + leftovers_ (1001), + data_len_ (0), + data_consumed_ (0), + state_ (Init), + error_buffer_ (0) +{ + ACE_NEW (this->notifier_,ACE::HTBP::Notifier(this)); + this->filter_ = get_filter (); + this->request_count_ = static_cast<unsigned long> (ACE_OS::time()); +} + +/// Constructor, takes ownership of the supplied stream +ACE::HTBP::Channel::Channel (ACE_SOCK_Stream &s) + : filter_ (0), + session_ (0), + ace_stream_ (s.get_handle()), + notifier_ (0), + leftovers_ (1001), + data_len_ (0), + data_consumed_ (0), + state_ (Init), + error_buffer_ (0) + +{ + filter_ = get_filter (); + this->request_count_ = static_cast<unsigned long> (ACE_OS::time()); +} + +ACE::HTBP::Channel::Channel (ACE_HANDLE h) + : filter_ (0), + session_ (0), + ace_stream_ (h), + notifier_ (0), + leftovers_ (1001), + data_len_ (0), + data_consumed_ (0), + state_ (Init), + error_buffer_ (0) +{ + filter_ = get_filter (); + this->request_count_ = static_cast<unsigned long> (ACE_OS::time()); +} + +/// Destructor. +ACE::HTBP::Channel::~Channel (void) +{ + delete this->notifier_; + delete this->filter_; +} + + /// Dump the state of an object. +void +ACE::HTBP::Channel::dump (void) const +{ +} + +unsigned long +ACE::HTBP::Channel::request_count (void) +{ + return this->request_count_++; +} + +void +ACE::HTBP::Channel::register_notifier (ACE_Reactor *r) +{ + if (r == 0) + return; + if (this->notifier_ == 0) + { + ACE_NEW (this->notifier_,ACE::HTBP::Notifier(this)); + } + else + { + if (notifier_->get_handle() == ACE_INVALID_HANDLE) + { + delete this->notifier_; + ACE_NEW (this->notifier_,ACE::HTBP::Notifier(this)); + } + } + + r->register_handler(notifier_,ACE_Event_Handler::READ_MASK); +} + +ACE::HTBP::Notifier * +ACE::HTBP::Channel::notifier (void) +{ + return this->notifier_; +} + +ACE_HANDLE +ACE::HTBP::Channel::get_handle (void) const +{ + return this->ace_stream_.get_handle (); +} + +void +ACE::HTBP::Channel::data_consumed (size_t n) +{ + this->data_consumed_ += n; + if (this->data_consumed_ == this->data_len_) + { + this->filter_->recv_data_trailer(this); + this->filter_->send_ack(this); + } +} + +int +ACE::HTBP::Channel::load_buffer (void) +{ + this->leftovers_.crunch(); + if (this->state() == Detached || + this->state() == Ack_Sent) + { + this->data_len_ = 0; + this->data_consumed_ = 0; + } + + ssize_t nread = 0; + errno = 0; +#if 0 + if (this->session_ && + (this->session_->sock_flags() & ACE_NONBLOCK == ACE_NONBLOCK)) +#endif + { + nread = + ACE::handle_read_ready (this->ace_stream().get_handle(), + &ACE_Time_Value::zero); + if (nread == -1 && errno == ETIME) + errno = EWOULDBLOCK; + } + if (nread != -1) + nread = this->ace_stream().recv (this->leftovers_.wr_ptr(), + this->leftovers_.space()-1); + if (nread < 1) + { + if (nread == 0 || (errno != EWOULDBLOCK && errno != EAGAIN)) + { + this->state_ = Closed; +#if 0 + ACE_ERROR ((LM_ERROR, + "load_buffer[%d] %p\n", + this->ace_stream_.get_handle(),"recv")); +#endif + } + return nread; + } + this->leftovers_.wr_ptr(nread); + *this->leftovers_.wr_ptr() = '\0'; + return nread; +} + +int +ACE::HTBP::Channel::flush_buffer (void) +{ + if (this->session_) + return this->session_->flush_outbound_queue(); + return 0; +} + +int +ACE::HTBP::Channel::send_ack (void) +{ + return this->filter_->send_ack(this); +} + +int +ACE::HTBP::Channel::recv_ack (void) +{ + if (load_buffer() == -1) + return -1; + return this->filter_->recv_ack(this); +} + +void +ACE::HTBP::Channel::state (ACE::HTBP::Channel::State s) +{ + if (s == Detached) + { + this->session_->detach(this); + this->session_ = 0; + } + this->state_ = s; +} + +int +ACE::HTBP::Channel::consume_error (void) +{ + if (error_buffer_ == 0) + { + ACE_NEW_RETURN (error_buffer_, + ACE_Message_Block (this->data_len_ + 1), + 0); + } + + ssize_t result = 0; + size_t n = error_buffer_->size(); + char *buf = error_buffer_->wr_ptr(); + + if (this->leftovers_.length() > 0) + { + result = ACE_MIN (n,this->leftovers_.length()); + ACE_OS::memcpy (buf,this->leftovers_.rd_ptr(), result); + this->leftovers_.rd_ptr(result); + buf += result; + } + + if (result < (ssize_t)n && + result < (ssize_t)data_len_) + { + n -= result; + result += this->ace_stream_.recv(buf, n); + } + if (result > 0) + { + this->error_buffer_->wr_ptr(result); + this->data_consumed_ += result; + if (this->data_consumed_ == this->data_len_) + { + *this->error_buffer_->wr_ptr() = '\0'; + if (ACE::debug()) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("ACE::HTBP::Channel::consume_error ") + ACE_TEXT("Received entire error buffer: \n%s\n"), + this->error_buffer_->rd_ptr())); + delete error_buffer_; + error_buffer_ = 0; + + return 1; + } + } + return 0; +} + +//--------------------------------------------------------------------------- +// = I/O functions. + +/// The ACE::HTBP::Channel is a sibling of the ACE_SOCK_IO class, rather than a +/// decendant. This is due to the requirement to wrap all messages with +/// an HTTP request or reply wrapper, and to send application data in only +/// one direction on one stream. + +int +ACE::HTBP::Channel::pre_recv(void) +{ + if (ACE::debug()) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::pre_recv ") + ACE_TEXT ("in initial state = %d\n"),state_)); + if (this->state_ == Init || + this->state_ == Detached || + this->state_ == Header_Pending || + this->state_ == Ack_Sent) + { + if (this->load_buffer() == -1 && this->leftovers_.length() == 0) + { + if (errno != EWOULDBLOCK) + this->state_ = Closed; + if (ACE::debug()) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::pre_recv ") + ACE_TEXT ("pre_recv returning -1, state = %d, %p\n"), + state_, ACE_TEXT("load_buffer()"))); + return -1; + } + if (this->filter_->recv_data_header(this) == -1) + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::pre_recv ") + ACE_TEXT ("recv_data_header failed, %p\n"), + ACE_TEXT("pre_recv"))); + } + switch (this->state_) + { + case Data_Queued: + case Ack_Sent: + case Ready: + return 0; + case Header_Pending: + errno = EWOULDBLOCK; + return -1; + default: + if (ACE::debug()) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::pre_recv ") + ACE_TEXT("channel[%d] state = %d, %p\n"), + this->get_handle(), + this->state_, + ACE_TEXT("pre_recv"))); + } + return -1; +} + +/// Recv an <n> byte buffer from the connected socket. +ssize_t +ACE::HTBP::Channel::recv (void *buf, + size_t n, + int flags, + const ACE_Time_Value *timeout) +{ + ssize_t result = 0; + if (this->pre_recv() == -1 && this->leftovers_.length() == 0) + return -1; + if (this->leftovers_.length() > 0) + { + result = ACE_MIN (n,this->leftovers_.length()); + ACE_OS::memcpy (buf,this->leftovers_.rd_ptr(), result); + this->leftovers_.rd_ptr(result); + buf = (char *)buf + result; + } + + if (result < (ssize_t)n && + result < (ssize_t)data_len_) + { + n -= result; + result += this->ace_stream_.recv(buf, n, flags, timeout); + } + if (result > 0) + data_consumed((size_t)result); + return result; +} + + /// Recv an <n> byte buffer from the connected socket. +ssize_t +ACE::HTBP::Channel::recv (void *buf, + size_t n, + const ACE_Time_Value *timeout) +{ + ssize_t result = 0; + if (this->pre_recv() == -1) + return -1; + + result = 0; + if (this->leftovers_.length() > 0) + { + result = ACE_MIN (n,this->leftovers_.length()); + ACE_OS::memcpy (buf,this->leftovers_.rd_ptr(), result); + this->leftovers_.rd_ptr(result); + buf = (char *)buf + result; + } + + if ((size_t)result < n && (size_t)result < this->data_len()) + { + n -= result; + result += this->ace_stream_.recv(buf, n, timeout); + } + + if (result > 0) + this->data_consumed((size_t)result); + return result; +} + + /// Recv an <iovec> of size <n> from the connected socket. +ssize_t +ACE::HTBP::Channel::recvv (iovec iov[], + int iovcnt, + const ACE_Time_Value *timeout) +{ + ssize_t result = 0; + if (this->pre_recv() == -1) + return -1; + + if (this->leftovers_.length()) + { + int ndx = 0; + iovec *iov2 = new iovec[iovcnt]; + for (int i = 0; i < iovcnt; i++) + { + size_t n = ACE_MIN ((size_t) iov[i].iov_len , + (size_t) this->leftovers_.length()); + if (n > 0) + { + ACE_OS::memcpy (iov[i].iov_base,this->leftovers_.rd_ptr(), n); + this->leftovers_.rd_ptr(n); + result += n; + } + if (n < (size_t) iov[i].iov_len) + { + iov2[ndx].iov_len = iov[i].iov_len - n; + iov2[ndx].iov_base = (char *)iov[i].iov_base + n; + ndx++; + } + } + if (ndx > 0) + result += this->ace_stream_.recvv(iov2,ndx,timeout); + delete [] iov2; + } + else + result = this->ace_stream_.recvv(iov,iovcnt,timeout); + + if (result > 0) + this->data_consumed((size_t)result); + return result; +} + +ssize_t +ACE::HTBP::Channel::recvv (iovec *io_vec, + const ACE_Time_Value *timeout) +{ + ssize_t result = 0; + if (this->pre_recv() == -1) + return -1; + if (ACE::debug()) + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("ACE::HTBP::Channel::recvv ") + ACE_TEXT("recvv, leftover len = %d\n"), + this->leftovers_.length())); + if (this->leftovers_.length()) + { + io_vec->iov_base = 0; + io_vec->iov_len = 0; + ACE_NEW_RETURN (io_vec->iov_base, + char[this->leftovers_.length()],-1); + io_vec->iov_len = this->leftovers_.length(); + ACE_OS::memcpy (io_vec->iov_base, + this->leftovers_.rd_ptr(), + io_vec->iov_len); + result = io_vec->iov_len; + this->leftovers_.length(0); + } + else + result = this->ace_stream_.recvv(io_vec,timeout); + + if (result > 0) + this->data_consumed((size_t)result); + return result; +} + +ssize_t +ACE::HTBP::Channel::send (const void *buf, + size_t n, + int flags, + const ACE_Time_Value *timeout) +{ + ssize_t result = 0; + if (this->filter_->send_data_header(n,this) == -1) + return -1; + result = this->ace_stream_.send(buf,n,flags,timeout); + if (result == -1) + return -1; + if (this->filter_->send_data_trailer(this) == -1) + return -1; + return result; +} + +ssize_t +ACE::HTBP::Channel::send (const void *buf, + size_t n, + const ACE_Time_Value *timeout) +{ + ssize_t result = 0; + if (this->filter_ == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::send: filter ") + ACE_TEXT ("is null\n")),-1); + if (this->filter_->send_data_header(n,this) == -1) + return -1; + result = this->ace_stream_.send (buf,n,timeout); + if (result == -1) + return -1; + if (this->filter_->send_data_trailer(this) == -1) + return -1; + return result; +} + +ssize_t +ACE::HTBP::Channel::sendv (const iovec iov[], + int iovcnt, + const ACE_Time_Value *timeout) +{ + if (this->ace_stream_.get_handle() == ACE_INVALID_HANDLE) + this->session_->inbound(); + + ssize_t result = 0; + size_t n = 0; + for (int i = 0; i < iovcnt; n += iov[i++].iov_len); + + if (this->filter_->send_data_header(n,this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::sendv ") + ACE_TEXT("%p\n"), + ACE_TEXT("send_data_header")),-1); + + result = this->ace_stream_.sendv (iov,iovcnt,timeout); + + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::sendv ") + ACE_TEXT("%p\n"), + ACE_TEXT("ace_stream_.sendv")),-1); + + if (this->filter_->send_data_trailer(this) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("(%P|%t) ACE::HTBP::Channel::sendv ") + ACE_TEXT("%p\n"), + ACE_TEXT("send_data_trailer\n")),-1); + return result; +} + +int +ACE::HTBP::Channel::enable (int value) const +{ + this->ace_stream_.enable(value); + + return 0; //this->ace_stream_.enable(value); +} + +int +ACE::HTBP::Channel::disable (int value) const +{ + this->ace_stream_.disable(value); + + return 0;//this->ace_stream_.disable(value); +} + +ACE::HTBP::Filter * +ACE::HTBP::Channel::get_filter () +{ + ACE::HTBP::Filter_Factory *factory = 0; + + // @todo Should I be throwing an exception here if + // memory is not allocated right ? + ACE_NEW_RETURN (factory, + ACE::HTBP::Filter_Factory, + 0); + int inside = (this->session_ != 0); + return factory->get_filter (inside); +} + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/protocols/ace/HTBP/HTBP_Channel.h b/ACE/protocols/ace/HTBP/HTBP_Channel.h new file mode 100644 index 00000000000..b33d446e40b --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Channel.h @@ -0,0 +1,280 @@ +/* -*- C++ -*- */ + +//============================================================================= +/** + * @file HTBP_Channel.h + * + * $Id$ + * + * @author Phil Mesnier, Priyanka Gontla + */ +//============================================================================= + +#ifndef ACE_HTBP_CHANNEL_H +#define ACE_HTBP_CHANNEL_H +#include /**/ "ace/pre.h" + +#include "ace/SOCK_Stream.h" +#include "ace/Message_Block.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "HTBP_Export.h" +#include "HTBP_Addr.h" +#include "HTBP_Filter.h" +#include "HTBP_Notifier.h" + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace ACE +{ + namespace HTBP + { + // Forward declarations. + class Session; + + /** + * @class ACE::HTBP::Channel + * + * @brief Defines the methods in the <Ht_Channel> abstraction. + * + * This adds additional wrapper methods atop the <IO> + * class. + * + * <buf> is the buffer to write from or receive into. + * <len> is the number of bytes to transfer. + * The <timeout> parameter in the following methods indicates how + * long to blocking trying to transfer data. If <timeout> == 0, + * then the call behaves as a normal send/recv call, i.e., for + * blocking sockets, the call will block until action is possible; + * for non-blocking sockets, EWOULDBLOCK will be returned if no + * action is immediately possible. + * If <timeout> != 0, the call will wait for data to arrive no longer + * than the relative time specified in *<timeout>. + * The "_n()" I/O methods keep looping until all the data has been + * transferred. These methods also work for sockets in non-blocking + * mode i.e., they keep looping on EWOULDBLOCK. <timeout> is used + * to make sure we keep making progress, i.e., the same timeout + * value is used for every I/O operation in the loop and the timeout + * is not counted down. + * The return values for the "*_n()" methods match the return values + * from the non "_n()" methods and are specified as follows: + * - On complete transfer, the number of bytes transferred is returned. + * - On timeout, -1 is returned, errno == ETIME. + * - On error, -1 is returned, errno is set to appropriate error. + * - On EOF, 0 is returned, errno is irrelevant. + * + * On partial transfers, i.e., if any data is transferred before + * timeout/error/EOF, <bytes_transferred> will contain the number of + * bytes transferred. + * Methods with <iovec> parameter are I/O vector variants of the I/O + * operations. + * Methods with the extra <flags> argument will always result in + * <send> getting called. Methods without the extra <flags> argument + * will result in <send> getting called on Win32 platforms, and + * <write> getting called on non-Win32 platforms. + */ + class HTBP_Export Channel + { + public: + friend class Notifier; + // Initialization and termination methods. + /// Constructor. + Channel (Session *s = 0); + + Channel (ACE_SOCK_Stream &s); + Channel (ACE_HANDLE h); + + /// Destructor. + ~Channel (void); + + enum State { + Init, + Ready, + Data_Queued, + Ack_Sent, + Header_Sent, + Closed, + Send_Ack, + Detached, + Wait_For_Ack, + Header_Pending + }; + + // = I/O functions. + + /// The Channel is a sibling of the ACE_SOCK_IO class, rather than a + /// decendant. This is due to the requirement to wrap all messages with + /// an HTTP request or reply wrapper, and to send application data in only + /// one direction on one stream. + + /// Recv an <n> byte buffer from the connected socket. + ssize_t recv (void *buf, + size_t n, + int flags, + const ACE_Time_Value *timeout = 0); + + /// Recv an <n> byte buffer from the connected socket. + ssize_t recv (void *buf, + size_t n, + const ACE_Time_Value *timeout = 0); + + /// Recv an <iovec> of size <n> from the connected socket. + ssize_t recvv (iovec iov[], + int n, + const ACE_Time_Value *timeout = 0); + + /// Same as above. Deprecated. + ssize_t recv (iovec iov[], + size_t n, + const ACE_Time_Value *timeout = 0); + + /** + * Allows a client to read from a socket without having to provide a + * buffer to read. This method determines how much data is in the + * socket, allocates a buffer of this size, reads in the data, and + * returns the number of bytes read. The caller is responsible for + * deleting the member in the <iov_base> field of <io_vec> using + * delete [] io_vec->iov_base. + */ + ssize_t recvv (iovec *io_vec, + const ACE_Time_Value *timeout = 0); + + /// Send an <n> byte buffer to the connected socket. + ssize_t send (const void *buf, + size_t n, + int flags, + const ACE_Time_Value *timeout = 0); + + /// Send an <n> byte buffer to the connected socket. + ssize_t send (const void *buf, + size_t n, + const ACE_Time_Value *timeout = 0); + + /// Send an <iovec> of size <n> to the connected socket. + ssize_t sendv (const iovec iov[], + int n, + const ACE_Time_Value *timeout = 0); + + // = Selectively close endpoints. + /// Close down the reader. + int close_reader (void); + + /// Close down the writer. + int close_writer (void); + + /** + * Close down the socket (we need this to make things work correctly + * on Win32, which requires use to do a <close_writer> before doing + * the close to avoid losing data). + */ + int close (void); + + // = Meta-type info + typedef ACE_INET_Addr PEER_ADDR; + + /// Dump the state of an object. + void dump (void) const; + + /// Declare the dynamic allocation hooks. + ACE_ALLOC_HOOK_DECLARE; + + public: + Session *session (void) const; + void session (Session *); + + Notifier *notifier (void); + + void register_notifier (ACE_Reactor *r); + + ACE_HANDLE get_handle (void) const; + + const ACE_SOCK_Stream &ace_stream (void) const; + ACE_SOCK_Stream &ace_stream (void); + + ///@notes Added the following methods to continue with + /// current compilation of HTIOP. Might not be needed in + /// future. - Priyanka + /// {@ + void set_handle (ACE_HANDLE h); + + /** + * Enable asynchronous I/O (ACE_SIGIO), urgent data (ACE_SIGURG), + * non-blocking I/O (ACE_NONBLOCK), or close-on-exec (ACE_CLOEXEC), + * which is passed as the <value>. + */ + int enable (int value) const; + + /** + * Disable asynchronous I/O (ACE_SIGIO), urgent data (ACE_SIGURG), + * non-blocking I/O (ACE_NONBLOCK), or close-on-exec (ACE_CLOEXEC), + * which is passed as the <value>. + */ + int disable (int value) const; + //@} + + // buffer related methods. + ACE_Message_Block &leftovers (void); + size_t data_len (void) const; + void data_len (size_t n); + size_t data_consumed (void) const; + void data_consumed (size_t n); + int load_buffer (void); + int flush_buffer (void); + int recv_ack (void); + int send_ack (void); + unsigned long request_count(); + Filter *filter (void) const; + void filter (Filter *); + State state (void) const; + void state (Channel::State s); + int pre_recv(void); + int consume_error (void); + + private: + + /// Get the filter from the Filter_Factory + Filter * get_filter (); + + /// The filter_ is a pluggable component used to manage the + /// wrapping of data in a way suitable for the proxy to + /// manage. The actual filter instance is owned by the + /// session. Different filters may be required depending on + /// which side of the proxy we are on, and the direction of data + /// flow. + Filter *filter_; + + /// The session_ is a reference to the persistent session this + /// stream is associated with. On the inside, sessions are + /// created by the Connector, which then create streams on + /// demand. + Session *session_; + + /// Since this stream implements its own send and recv methods, + /// the filter cannot simply call the send and recv methods of + /// this class. Therefore an ACE_SOCK_Stream wrapper around the + /// same socket instance is necessary to avoid recursion. + ACE_SOCK_Stream ace_stream_; + + Notifier *notifier_; + + ACE_Message_Block leftovers_; + size_t data_len_; + size_t data_consumed_; + State state_; + ACE_Message_Block *error_buffer_; + unsigned long request_count_; + }; + } +} + +ACE_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +#include "HTBP_Channel.inl" +#endif + +#include /**/ "ace/post.h" +#endif /* ACE_HTBP_CHANNEL_H */ diff --git a/ACE/protocols/ace/HTBP/HTBP_Channel.inl b/ACE/protocols/ace/HTBP/HTBP_Channel.inl new file mode 100644 index 00000000000..7df1ba8f16e --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Channel.inl @@ -0,0 +1,124 @@ +// -*- C++ -*- +// +// $Id$ + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +ACE::HTBP::Session * +ACE::HTBP::Channel::session (void) const +{ + return this->session_; +} + +// session assignment only occurs when the stream is transferred to another +// session. Do *not* delete the old session object. +ACE_INLINE +void +ACE::HTBP::Channel::session (ACE::HTBP::Session *s) +{ + this->session_ = s; +} + +// I'm not sure of the proper use case for getting the filter reference. The +// problem is that the filter is not reference counted, so the returned value +// should not be stored. +ACE_INLINE +ACE::HTBP::Filter * +ACE::HTBP::Channel::filter (void) const +{ + return this->filter_; +} + +// Filters are owned by the channel. If a new filter is applied, the old one +// must be deleted. +ACE_INLINE +void +ACE::HTBP::Channel::filter (ACE::HTBP::Filter *f) +{ + if (this->filter_) + delete this->filter_; + this->filter_ = f; +} + +ACE_INLINE +int +ACE::HTBP::Channel::close_reader (void) +{ + return this->ace_stream_.close_reader(); +} + +ACE_INLINE +int +ACE::HTBP::Channel::close_writer (void) +{ + return this->ace_stream_.close_writer(); +} + +ACE_INLINE +int +ACE::HTBP::Channel::close (void) +{ + return this->ace_stream_.close(); +} + +ACE_INLINE +ACE::HTBP::Channel::State +ACE::HTBP::Channel::state (void) const +{ + return this->state_; +} + +ACE_INLINE +ACE_Message_Block & +ACE::HTBP::Channel::leftovers (void) +{ + return this->leftovers_; +} + +ACE_INLINE +size_t +ACE::HTBP::Channel::data_len(void) const +{ + return this->data_len_; +} + +ACE_INLINE +void +ACE::HTBP::Channel::data_len(size_t n) +{ + this->data_len_ = n; +} + +ACE_INLINE +size_t +ACE::HTBP::Channel::data_consumed(void) const +{ + return this->data_consumed_; +} + + +ACE_INLINE +const ACE_SOCK_Stream& +ACE::HTBP::Channel::ace_stream (void) const +{ + return this->ace_stream_; +} + +ACE_INLINE +ACE_SOCK_Stream& +ACE::HTBP::Channel::ace_stream (void) +{ + return this->ace_stream_; +} + +ACE_INLINE +void +ACE::HTBP::Channel::set_handle (ACE_HANDLE h) +{ + if (h == 0) + return; + this->ace_stream_.set_handle (h); +} + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/protocols/ace/HTBP/HTBP_Environment.cpp b/ACE/protocols/ace/HTBP/HTBP_Environment.cpp new file mode 100644 index 00000000000..8fa1b0cc397 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Environment.cpp @@ -0,0 +1,203 @@ +// $Id$ + +#include "HTBP_Environment.h" + +ACE_RCSID (HTBP, + ACE_HTBP_Environment, + "$Id$") + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE::HTBP::Environment::Environment (ACE_Configuration *config, + int using_registry, + const ACE_TCHAR *persistent_file) + : config_ (config), + imp_exp_ (0), + own_config_ (config != 0) +{ + initialize (using_registry, + persistent_file); +} + +ACE::HTBP::Environment::~Environment () +{ + if (!own_config_) + this->clear(); + else + delete this->config_; + + delete this->imp_exp_; +} + +void +ACE::HTBP::Environment::clear () +{ + if (this->config_) + this->config_->remove_section (config_->root_section (), + ACE_TEXT("htbp"), + 1); +} + +int +ACE::HTBP::Environment::initialize (int use_registry, + const ACE_TCHAR *persistent_file) +{ + if (this->config_ == 0) + { + int result = -1; + if (use_registry) + result = this->open_registry_config(); + if (result == -1) + result = this->open_persistent_config (persistent_file); + if (result != 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("(%P|%t) ACE::HTBP::Environment") + ACE_TEXT("::initialize ") + ACE_TEXT("Open Config failed")), + -1); + } + } + + + ACE_NEW_RETURN (this->imp_exp_, + ACE_Ini_ImpExp (*this->config_), + -1); + + if (this->config_->open_section (config_->root_section (), + ACE_TEXT("htbp"), 1, + this->htbp_key_) != 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("(%P|%t) ACE::HTBP::Environment::initialize ") + ACE_TEXT("Open HTBP Section failed")), + -1); + return 0; +} + +int +ACE::HTBP::Environment::open_registry_config () +{ +#if defined (ACE_WIN32) && !defined (ACE_LACKS_WIN32_REGISTRY) + HKEY root = + ACE_Configuration_Win32Registry::resolve_key + (HKEY_LOCAL_MACHINE,ACE_TEXT("Software\\HTBP\\Environment")); + + ACE_NEW_RETURN (this->config_, + ACE_Configuration_Win32Registry (root), + -1); + return 0; +#else + errno = ENOTSUP; + return -1; +#endif /* ACE_WIN32 && !ACE_LACKS_WIN32_REGISTRY */ +} + +int +ACE::HTBP::Environment::open_persistent_config (const ACE_TCHAR *persistent_file) +{ + ACE_Configuration_Heap *heap; + ACE_NEW_RETURN (heap, + ACE_Configuration_Heap, + -1); + // do this before trying to open so it isn't leaked if the open fails. + this->config_ = heap; + if (persistent_file == 0) + heap->open(); + else + if (heap->open (persistent_file) != 0) + ACE_ERROR_RETURN (( LM_ERROR, + ACE_TEXT ("(%P|%t) ACE::HTBP::Environment::") + ACE_TEXT ("open_config: %p\n"), + persistent_file), + -1 ); + return 0; +} + +int +ACE::HTBP::Environment::get_htid_url (ACE_TString &htid_url) const +{ + return this->config_->get_string_value (this->htbp_key_, + ACE_TEXT("htid_url"), + htid_url); +} + +int +ACE::HTBP::Environment::set_htid_url (const ACE_TCHAR *htid_generator_url) +{ + return this->config_->set_string_value (this->htbp_key_, + ACE_TEXT("htid_url"), + htid_generator_url); +} + + +int +ACE::HTBP::Environment::get_htid_via_proxy (int &via_proxy) const +{ + return this->config_->get_integer_value (this->htbp_key_, + ACE_TEXT("htid_via_proxy"), + (u_int &)via_proxy); +} + +int +ACE::HTBP::Environment::set_htid_via_proxy (int via_proxy) +{ + return this->config_->set_integer_value (this->htbp_key_, + ACE_TEXT("htid_via_proxy"), + (u_int)via_proxy); +} + +int +ACE::HTBP::Environment::get_proxy_host (ACE_TString &proxy_host) const +{ + return this->config_->get_string_value (this->htbp_key_, + ACE_TEXT("proxy_host"), + proxy_host); +} + +int +ACE::HTBP::Environment::set_proxy_host (const ACE_TCHAR *proxy_host) +{ + return this->config_->set_string_value (this->htbp_key_, + ACE_TEXT("proxy_host"), + proxy_host); +} + +int +ACE::HTBP::Environment::get_proxy_port (unsigned int &proxy_port) const +{ + int result = this->config_->get_integer_value (this->htbp_key_, + ACE_TEXT("proxy_port"), + proxy_port); + if (result != 0) + { + ACE_TString port_str; + result = this->config_->get_string_value (this->htbp_key_, + ACE_TEXT("proxy_port"), + port_str); + if (result == 0) + proxy_port = ACE_OS::strtol(port_str.c_str(),0,10); + } + return result; +} + +int +ACE::HTBP::Environment::set_proxy_port (unsigned int proxy_port) +{ + return this->config_->set_integer_value (this->htbp_key_, + ACE_TEXT("proxy_port"), + proxy_port); +} + +int +ACE::HTBP::Environment::import_config (const ACE_TCHAR *filename) +{ + return this->imp_exp_->import_config (filename); +} + +int +ACE::HTBP::Environment::export_config (const ACE_TCHAR *filename) +{ + return this->imp_exp_->export_config (filename); +} + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/protocols/ace/HTBP/HTBP_Environment.h b/ACE/protocols/ace/HTBP/HTBP_Environment.h new file mode 100644 index 00000000000..894a376457a --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Environment.h @@ -0,0 +1,110 @@ +// -*- C++ -*- +// $Id$ + +#ifndef ACE_HTBP_ENVIRONMENT_H +#define ACE_HTBP_ENVIRONMENT_H +#include /**/ "ace/pre.h" + +#include "HTBP_Export.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "ace/Configuration.h" +#include "ace/Configuration_Import_Export.h" +#include "ace/Singleton.h" + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace ACE +{ + namespace HTBP + { + + /** + * @class Environment + * + * @brief + * + * This class manages the storage of configuration data, either in a + * flat file for Unix systems, or in the Windows registry + */ + class HTBP_Export Environment + { + public: + + /// Constructor + Environment (ACE_Configuration *config = 0, + int using_registry = 0, + const ACE_TCHAR *persistent_file = 0); + + /// Destructor + ~Environment (); + + /// removes all htbp related values from the configuration + void clear (void); + + /// Accessors to HTID URL + /// Returns 0 on success + /// {@ + int get_htid_url (ACE_TString &htid_url) const; + int set_htid_url (const ACE_TCHAR *htid_generator_url); + /// @} + + /// Accessors to HTID via proxy - If value is true, HTID request goes + /// through configured proxy otherwise it uses a direct connection. + /// Returns 0 on success + /// {@ + int get_htid_via_proxy (int &htid_via_proxy) const; + int set_htid_via_proxy (int htid_via_proxy); + /// @} + + /// Accessors to Proxy Address + /// Returns 0 on success + /// {@ + int get_proxy_host (ACE_TString &proxy_host) const; + int set_proxy_host (const ACE_TCHAR *proxy_host); + /// @} + + /// Returns 0 on success + /// {@ + int get_proxy_port (unsigned int &proxy_port) const; + int set_proxy_port (unsigned int proxy_port); + /// @} + + /// Import the configuration from the filename + int import_config (const ACE_TCHAR *filename); + + /// Export the configuration to the filename + int export_config (const ACE_TCHAR *filename); + + private: + /// Open config, set sections etc + int initialize (int use_registry, + const ACE_TCHAR *persistent_file); + + /// Open an ACE_Configuration of the appropriate type. + int open_registry_config (); + int open_persistent_config (const ACE_TCHAR *persistent_file); + + /// Our Proxy and HTID generator URL database + ACE_Configuration *config_; + + /// Flat section of HTBP related items + ACE_Configuration_Section_Key htbp_key_; + + /// the wrapper used to import or export initialization values + ACE_Ini_ImpExp *imp_exp_; + + /// We created the config instance, so we must clean it up + int own_config_; + + }; + } +} + +ACE_END_VERSIONED_NAMESPACE_DECL + +#include /**/ "ace/post.h" +#endif /* ACE_HTBP_ENVIRONMENT_H */ diff --git a/ACE/protocols/ace/HTBP/HTBP_Export.h b/ACE/protocols/ace/HTBP/HTBP_Export.h new file mode 100644 index 00000000000..19ce8a89547 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Export.h @@ -0,0 +1,53 @@ +// -*- C++ -*- +// $Id$ +// Definition for Win32 Export directives. +// This file is generated automatically by generate_export_file.pl -d -s HTBP +// ------------------------------ +#ifndef HTBP_EXPORT_H +#define HTBP_EXPORT_H + +#include "ace/config-all.h" + +#if defined (ACE_AS_STATIC_LIBS) && !defined (HTBP_HAS_DLL) +# define HTBP_HAS_DLL 0 +#endif /* ACE_AS_STATIC_LIBS && HTBP_HAS_DLL */ + +#if !defined (HTBP_HAS_DLL) +# define HTBP_HAS_DLL 1 +#endif /* ! HTBP_HAS_DLL */ + +#if defined (HTBP_HAS_DLL) && (HTBP_HAS_DLL == 1) +# if defined (HTBP_BUILD_DLL) +# define HTBP_Export ACE_Proper_Export_Flag +# define HTBP_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T) +# define HTBP_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# else /* HTBP_BUILD_DLL */ +# define HTBP_Export ACE_Proper_Import_Flag +# define HTBP_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T) +# define HTBP_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# endif /* HTBP_BUILD_DLL */ +#else /* HTBP_HAS_DLL == 1 */ +# define HTBP_Export +# define HTBP_SINGLETON_DECLARATION(T) +# define HTBP_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +#endif /* HTBP_HAS_DLL == 1 */ + +// Set HTBP_NTRACE = 0 to turn on library specific tracing even if +// tracing is turned off for ACE. +#if !defined (HTBP_NTRACE) +# if (ACE_NTRACE == 1) +# define HTBP_NTRACE 1 +# else /* (ACE_NTRACE == 1) */ +# define HTBP_NTRACE 0 +# endif /* (ACE_NTRACE == 1) */ +#endif /* !HTBP_NTRACE */ + +#if (HTBP_NTRACE == 1) +# define HTBP_TRACE(X) +#else /* (HTBP_NTRACE == 1) */ +# define HTBP_TRACE(X) ACE_TRACE_IMPL(X) +#endif /* (HTBP_NTRACE == 1) */ + +#endif /* HTBP_EXPORT_H */ + +// End of auto generated file. diff --git a/ACE/protocols/ace/HTBP/HTBP_Filter.cpp b/ACE/protocols/ace/HTBP/HTBP_Filter.cpp new file mode 100644 index 00000000000..094539c05a0 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Filter.cpp @@ -0,0 +1,57 @@ +// $Id$ + +#include "ace/Log_Msg.h" + +#include "HTBP_Session.h" +#include "HTBP_Filter.h" + +#if !defined (__ACE_INLINE__) +#include "HTBP_Filter.inl" +#endif + +ACE_RCSID(HTBP,ACE_HTBP_Filter,"$Id$") + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE::HTBP::Filter::Filter (void) + :http_code_ (0) +{ +} + +ACE::HTBP::Filter::~Filter (void) +{ +} + +char * +ACE::HTBP::Filter::header_complete (ACE::HTBP::Channel *ch) +{ + if (ch->leftovers().length() == 0) + { + return 0; + } + if (ch->leftovers().space() > 0) + *ch->leftovers().wr_ptr() = '\0'; + char *start = ch->leftovers().rd_ptr(); + char *nl = ACE_OS::strchr (start,'\n'); + if (this->http_code_ == 0) + { + char *code = ACE_OS::strstr (start,"HTTP/1."); + if (code && code < nl) + { + code += 9; // skip past "HTTP/1.1 " + this->http_code_ = ACE_OS::strtol(code,0,10); + } + } + + while (nl) + { + if ((nl == start) || + (nl == start +1 && *start == '\r')) + return nl + 1; + start = nl + 1; + nl = ACE_OS::strchr (start,'\n'); + } + return 0; +} + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/protocols/ace/HTBP/HTBP_Filter.h b/ACE/protocols/ace/HTBP/HTBP_Filter.h new file mode 100644 index 00000000000..ebc9ebb895f --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Filter.h @@ -0,0 +1,104 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file HTBP_Filter.h + * + * $Id$ + * + * @author Phil Mesnier + */ +//============================================================================= + +#ifndef ACE_HTBP_FILTER_H +#define ACE_HTBP_FILTER_H +#include /**/ "ace/pre.h" + +#include "ace/OS.h" +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "HTBP_Export.h" + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace ACE +{ + namespace HTBP + { + + // Forward declarations. + class Channel; + + /** + * @class ACE_HTBP_Filter + * + * @brief Defines the methods in the <ACE_HTBP_Filter> abstraction. + * + * A filter is responsible for wrapping / unwrapping messages + * depending on direction of flow on each stream. There will be 4 + * filters participating in a session, inside-send, inside-recv, + * outside-send, ouside-recv. It is anticipated that specialized + * filters will be implemented that tune the filter behavior based + * on proxy particulars. That, or there will be a matrix + * configuration parameters that may be tweeked to match the + * configuration. It remains to be seen which is easier to + * manage. + */ + class HTBP_Export Filter + { + public: + Filter (void); + + virtual ~Filter (void); + + /// Inside and outside Send filters need to supply a data header that + /// sets the stage before sending the data body, and then (maybe) send a + /// data trailer that closes the message. The data header may be formatted + /// differently on the first message, to send optional context information. + virtual ssize_t send_data_header (ssize_t data_len, Channel *ch); + virtual ssize_t send_data_trailer (Channel *ch); + + /// Upon receiving data, the Channel is obliged to send an ack. This is + /// either an empty document if this is the in-to-out stream, or a new + /// long-duration document request if this is the out-to-in stream. + virtual int send_ack (Channel *ch); + virtual int recv_ack (Channel *ch); + + /// receiving data must compliment sending. In the case of an in-to-out + /// stream. It is possible that the stream identity is not known until the + /// first request is received. In this case the filter will have to look + /// lookup the received session ID and either create a new instance or + /// reassign itself to an existing sesion. In that case, the associated + /// stream will have to be reassigned as well. + /// + virtual ssize_t recv_data_header (Channel *ch); + virtual ssize_t recv_data_trailer(Channel *ch); + + /// reset the http_code_ in anticipation of a new message. + void reset_http_code (void); + + /// get the value of the most recent http result code + int http_code (void); + + protected: + char *header_complete (Channel *ch); + + private: + /// Store the most recent HTTP result code. Typically it will be 200, + /// but it could be a 4xx or 5xx code if the proxy reports an error. + int http_code_; + + }; + } +} + +ACE_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +#include "HTBP_Filter.inl" +#endif + +#include /**/ "ace/post.h" +#endif /* ACE_HTBP_FILTER_H */ diff --git a/ACE/protocols/ace/HTBP/HTBP_Filter.inl b/ACE/protocols/ace/HTBP/HTBP_Filter.inl new file mode 100644 index 00000000000..c648393daa4 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Filter.inl @@ -0,0 +1,63 @@ +// -*- C++ -*- +// +// $Id$ + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +ssize_t +ACE::HTBP::Filter::send_data_header (ssize_t, ACE::HTBP::Channel * ) +{ + return 0; +} + +ACE_INLINE +ssize_t +ACE::HTBP::Filter::send_data_trailer (ACE::HTBP::Channel *) +{ + return 0; +} + +ACE_INLINE +int +ACE::HTBP::Filter::send_ack (ACE::HTBP::Channel *) +{ + return 0; +} + +ACE_INLINE +int +ACE::HTBP::Filter::recv_ack (ACE::HTBP::Channel *) +{ + return 0; +} + +ACE_INLINE +ssize_t +ACE::HTBP::Filter::recv_data_header (ACE::HTBP::Channel *) +{ + return 0; +} + +ACE_INLINE +ssize_t +ACE::HTBP::Filter::recv_data_trailer(ACE::HTBP::Channel *) +{ + return 0; +} + +ACE_INLINE +void +ACE::HTBP::Filter::reset_http_code (void) +{ + this->http_code_ = 0; +} + +ACE_INLINE +int +ACE::HTBP::Filter::http_code (void) +{ + return this->http_code_; +} + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/protocols/ace/HTBP/HTBP_Filter_Factory.cpp b/ACE/protocols/ace/HTBP/HTBP_Filter_Factory.cpp new file mode 100644 index 00000000000..bd9e9751b6d --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Filter_Factory.cpp @@ -0,0 +1,56 @@ +// $Id$ + +#include "HTBP_Filter_Factory.h" +#include "HTBP_Inside_Squid_Filter.h" +#include "HTBP_Outside_Squid_Filter.h" + +#include "HTBP_Stream.h" +#include "ace/Service_Object.h" +#include "ace/Dynamic_Service.h" +#include "ace/Log_Msg.h" + +ACE_RCSID (ACE_HTBP_Filter_Factory, HTBP, "$Id$") + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE::HTBP::Filter_Factory::Filter_Factory () +{ +} + +ACE::HTBP::Filter_Factory::~Filter_Factory () +{ +} + +ACE::HTBP::Filter * +ACE::HTBP::Filter_Factory::get_filter (int inside) +{ + ACE::HTBP::Filter *filter = 0; + if (inside) + ACE_NEW_RETURN (filter, + ACE::HTBP::Inside_Squid_Filter (),0); + else + ACE_NEW_RETURN (filter, + ACE::HTBP::Outside_Squid_Filter (),0); + return filter; +} + +int +ACE::HTBP::Filter_Factory::init (int /* argc */, + ACE_TCHAR * /* argv */ []) +{ + return 0; +} + +ACE_END_VERSIONED_NAMESPACE_DECL + +ACE_STATIC_SVC_DEFINE (ACE_HTBP_Filter_Factory, + ACE_TEXT ("Filter_Factory"), + ACE_SVC_OBJ_T, + &ACE_SVC_NAME (ACE_HTBP_Filter_Factory), + ACE_Service_Type::DELETE_THIS | + ACE_Service_Type::DELETE_OBJ, + 0) + +ACE_FACTORY_NAMESPACE_DEFINE (HTBP, ACE_HTBP_Filter_Factory, + ACE::HTBP::Filter_Factory) + diff --git a/ACE/protocols/ace/HTBP/HTBP_Filter_Factory.h b/ACE/protocols/ace/HTBP/HTBP_Filter_Factory.h new file mode 100644 index 00000000000..0681055b679 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Filter_Factory.h @@ -0,0 +1,70 @@ +// -*- C++ -*- +// $Id$ + +//============================================================================= +/** + * @file HTBP_Filter_Factory.h + * + * $Id$ + * + * @author Priyanka Gontla + */ +//============================================================================= + +#ifndef ACE_HTBP_FILTER_FACTORY +#define ACE_HTBP_FILTER_FACTORY +#include /**/ "ace/pre.h" + +#include "HTBP_Export.h" +#include "HTBP_Filter.h" +#include "ace/Service_Config.h" +#include "ace/Service_Object.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace ACE +{ + namespace HTBP + { + class Stream; + + /** + * @class Filter_Factory + * + * @brief Factory to get the Filters + * + * The Filter Factory is to be used for getting filters based on the + * mode requested. This class is a ACE_Service_Object and should be + * loadable with the help of the service configurator framework. + */ + class HTBP_Export Filter_Factory : public ACE_Service_Object + { + public: + + /// Constructor + Filter_Factory (); + + ~Filter_Factory (); + + /// Initialization hook. + virtual int init (int argc, ACE_TCHAR *argv[]); + + // + Filter *get_filter (int inside); + }; + } +} + +ACE_END_VERSIONED_NAMESPACE_DECL + +ACE_STATIC_SVC_DECLARE_EXPORT (HTBP, ACE_HTBP_Filter_Factory) +ACE_FACTORY_DECLARE (HTBP, ACE_HTBP_Filter_Factory) + + + +#include /**/ "ace/post.h" +#endif /* ACE_HTBP_FILTER_FACTORY */ diff --git a/ACE/protocols/ace/HTBP/HTBP_ID_Requestor.cpp b/ACE/protocols/ace/HTBP/HTBP_ID_Requestor.cpp new file mode 100644 index 00000000000..a048a295459 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_ID_Requestor.cpp @@ -0,0 +1,141 @@ +// $Id$ + +#include "HTBP_ID_Requestor.h" +#include "HTBP_Environment.h" + +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_string.h" +#include "ace/SOCK_Stream.h" +#include "ace/SOCK_Connector.h" +#include "ace/UUID.h" + +ACE_RCSID(HTBP, + ACE_HTBP_ID_Requestor, + "$Id$") + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_TString ACE::HTBP::ID_Requestor::htid_; +ACE_SYNCH_MUTEX ACE::HTBP::ID_Requestor::htid_lock_; + +ACE::HTBP::ID_Requestor::ID_Requestor (ACE::HTBP::Environment *env) + : port_ (0), + host_ (), + url_() +{ + if (env) + { + env->get_htid_url(url_); + int via_proxy = 0; + env->get_htid_via_proxy (via_proxy); + if (via_proxy && env->get_proxy_host (host_) == 0) + env->get_proxy_port (port_); + } +} + +int +ACE::HTBP::ID_Requestor::connect_to_server (ACE_SOCK_Stream *cli_stream) +{ + if (port_ == 0 || host_.length() == 0) + { + int host_start = url_.find (ACE_TEXT("http://")) + 7; + int port_sep = 0; + int sep = 0; + if (host_start == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("(%P|%t) ACE::HTBP::ID_Requestor::") + ACE_TEXT("connect_to_server: ") + ACE_TEXT("invalid URL: \"%s\"\n"), + url_.c_str()), + -1); + port_sep = url_.find (ACE_TEXT(":"),(size_t)host_start); + sep = url_.find (ACE_TEXT("/"),(size_t)host_start); + if (sep == -1 || sep == host_start +1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("(%P|%t) ACE::HTBP::ID_Requestor::") + ACE_TEXT("connect_to_server: ") + ACE_TEXT("invalid URL: \"%s\"\n"), + url_.c_str()), + -1); + if (port_sep == -1) + { + port_sep = sep; + port_ = 80; // should be a default? + } + host_ = url_.substr(host_start,port_sep - host_start); + } + + ACE_INET_Addr remote_addr (static_cast<u_short> (port_), host_.c_str()); + ACE_SOCK_Connector con; + if (con.connect (*cli_stream, + remote_addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("(%P|%t) ACE::HTBP::ID_Requestor::") + ACE_TEXT("connect_to_server: ") + ACE_TEXT("%p\n"), + ACE_TEXT("socket connect")), + -1); + return 0; +} + +int +ACE::HTBP::ID_Requestor::send_request (ACE_SOCK_Stream *cli_stream) +{ + char *buffer; + ACE_NEW_RETURN (buffer, char[this->url_.length()+16],-1); + ACE_OS::sprintf (buffer,"GET %s HTTP/1.0\n\n", + ACE_TEXT_ALWAYS_CHAR(url_.c_str())); + int result = cli_stream->send_n (buffer,ACE_OS::strlen(buffer)); + delete [] buffer; + if (result == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("(%P|%t) ACE::HTBP::ID_Requestor::") + ACE_TEXT("send_request %p\n"), + ACE_TEXT("socket send")), -1); + return 0; +} + +ACE_TCHAR * +ACE::HTBP::ID_Requestor::get_HTID () +{ + if (ACE::HTBP::ID_Requestor::htid_.length() != 0) + return ACE::HTBP::ID_Requestor::htid_.rep(); + + ACE_Guard<ACE_SYNCH_MUTEX> guard (ACE::HTBP::ID_Requestor::htid_lock_); + + if (ACE::HTBP::ID_Requestor::htid_.length() != 0) + return ACE::HTBP::ID_Requestor::htid_.rep(); + + ACE_SOCK_Stream cli_stream; + ACE_TCHAR * htid = 0; + + if (this->url_.length() == 0 || + this->connect_to_server (&cli_stream) == -1 || + this->send_request (&cli_stream) == -1) + { + ACE_Utils::UUID_Generator gen; + ACE_Utils::UUID *uuid = gen.generate_UUID (); + const ACE_CString *uuidstr = uuid->to_string(); + ACE::HTBP::ID_Requestor::htid_ = ACE_TEXT_CHAR_TO_TCHAR (uuidstr->c_str()); + delete uuid; + return ACE::HTBP::ID_Requestor::htid_.rep(); + } + iovec recv_buf; + ssize_t result = cli_stream.recvv (&recv_buf); + cli_stream.close(); + + if (result > 0) + { + ACE_CString answer ((char *)recv_buf.iov_base,recv_buf.iov_len); + ACE_CString::size_type start = answer.rfind (ACE_TEXT('\n')); + if (start == ACE_CString::npos) + start = 0; + else + start++; + ACE::HTBP::ID_Requestor::htid_ = ACE_TEXT_CHAR_TO_TCHAR(answer.substr (start).c_str()); + htid = ACE::HTBP::ID_Requestor::htid_.rep(); + } + return htid; +} + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/protocols/ace/HTBP/HTBP_ID_Requestor.h b/ACE/protocols/ace/HTBP/HTBP_ID_Requestor.h new file mode 100644 index 00000000000..b885f07cae5 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_ID_Requestor.h @@ -0,0 +1,63 @@ +// -*- C++ -*- +// $Id$ + +#ifndef HTID_REQUESTOR_H +#define HTID_REQUESTOR_H +#include /**/ "ace/pre.h" + +#include "HTBP_Export.h" +#include "ace/SString.h" +#include "ace/Synch.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +// Forward declarations +class ACE_SOCK_Stream; + +namespace ACE +{ + namespace HTBP + { + // Forward declarations + class Environment; + + /** + * ACE_HTBP_ID_Requestor + * + * @brief This singleton class is to be used to get a HTID + */ + class HTBP_Export ID_Requestor + { + public: + /// Constructor. + ID_Requestor (Environment * = 0); + + /// Accessor to HTID + ACE_TCHAR *get_HTID (); + + private: + /// Connects to the remote host + int connect_to_server (ACE_SOCK_Stream *cli_stream); + + /// Send HTTP GET request to the server. + int send_request (ACE_SOCK_Stream *cli_stream); + + unsigned port_; + ACE_TString host_; + ACE_TString url_; + + static ACE_TString htid_; + static ACE_SYNCH_MUTEX htid_lock_; + }; + + } +} + +ACE_END_VERSIONED_NAMESPACE_DECL + +#include /**/ "ace/post.h" +#endif /* HTID_REQUESTOR_H */ diff --git a/ACE/protocols/ace/HTBP/HTBP_Inside_Squid_Filter.cpp b/ACE/protocols/ace/HTBP/HTBP_Inside_Squid_Filter.cpp new file mode 100644 index 00000000000..1bc9f4a208b --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Inside_Squid_Filter.cpp @@ -0,0 +1,239 @@ +// ACE_HTBP_Filter.cpp +// $Id$ + +#include "ace/Log_Msg.h" + +//#include <sstream> + +#include "HTBP_Session.h" +#include "HTBP_Inside_Squid_Filter.h" + +#if !defined (__ACE_INLINE__) +#include "HTBP_Inside_Squid_Filter.inl" +#endif + +ACE_RCSID(HTBP,ACE_HTBP_Inside_Squid_Filter,"$Id$") + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ssize_t +ACE::HTBP::Inside_Squid_Filter::send_data_trailer (ACE::HTBP::Channel *ch) +{ + ch->state(ACE::HTBP::Channel::Wait_For_Ack); + return 0; +} + +int +ACE::HTBP::Inside_Squid_Filter::make_request_header (ACE::HTBP::Channel *ch, + const char *cmd, + char *buffer, + size_t buffer_size) +{ + // the string is formatted as: + // command http://host:port/htid/sessionId/request<requestId>.html HTTP/1.1\n + // host:port is the remote host and port from the channel, + // htid is the local endpoint identifier + // sessionId is the discreet session counter between these peers + // requestId is the discreet request sent for this session. + + ACE::HTBP::Session *session = ch->session(); + + const char * format = "%shttp://%s:%d/%s/%d/request%d.html HTTP/1.1\n"; + char remote_host[ACE_MAX_FULLY_QUALIFIED_NAME_LEN + 1]; + unsigned remote_port = session->peer_addr().get_port_number(); + const char *local_htid = session->local_addr().get_htid(); + + ACE_UINT32 tempId = session->session_id().id_; + size_t sid_size = 1; + size_t rid_size = 1; + while (tempId /= 10) sid_size++; + tempId = ch->request_count(); + while (tempId /= 10) rid_size++; + + // This test was originally get_host_name() == -1, but this is + // problematic if the address doesn't resolve to a name. I think + // that it should be configurable, or maybe the hostname needs to + // be carried independent of the address to work with hosts that may + // have dynamic IP addresses. For now that isn't a problem. + if (session->peer_addr().get_host_addr(remote_host, + sizeof remote_host) == 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("HTBP::Inside_Squid_Filter:could not get ") + ACE_TEXT("peer_addr hostname\n")), + -1); + + + size_t size = + ACE_OS::strlen(format) + - 12 // subtract out the size of the format specifiers + + ACE_OS::strlen (cmd) + + ACE_OS::strlen (remote_host) + + 5 // maximum size for a the remote port number + + ACE_OS::strlen (local_htid) + + sid_size // size of session id + + rid_size; // size of request id + + if (size > buffer_size) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("HTBP::Inside_Squid_Filter: insufficient ") + ACE_TEXT("buffer space for request header, need %d ") + ACE_TEXT("got %d\n"), + size, buffer_size), + -1); + + ACE_OS::sprintf (buffer,format, + cmd,remote_host,remote_port, + local_htid, session->session_id().id_, + ch->request_count()); + + return ACE_OS::strlen(buffer); +} + +ssize_t +ACE::HTBP::Inside_Squid_Filter::send_data_header (ssize_t data_len, + ACE::HTBP::Channel *ch) +{ + char *buffer = new char[BUFSIZ]; + ssize_t result = -1; + if (this->make_request_header (ch,"POST ",buffer,BUFSIZ) != -1) + { + ACE_CString header (buffer); + header += "Content-Type: application/octet-stream\nContent-Length: "; + char datalenstr[20]; + ACE_OS::itoa (data_len,datalenstr,10); + header += datalenstr; + header += "\n\n"; + result = ch->ace_stream().send(header.c_str(),header.length()); + } + ch->state(result == -1 ? ACE::HTBP::Channel::Closed : + ACE::HTBP::Channel::Header_Sent); + this->reset_http_code(); + return 1; +} + +ssize_t +ACE::HTBP::Inside_Squid_Filter::recv_data_header (ACE::HTBP::Channel *ch) +{ + if (this->http_code() != 200 && this->http_code() != 0) + { + if (ACE::debug()) + ACE_ERROR ((LM_ERROR, + ACE_TEXT("HTBP::Inside_Squid_Filter::recv_data_header, ") + ACE_TEXT("non-OK result code %d recvd\n"), + this->http_code())); + + errno = ch->consume_error () ? EINVAL : EWOULDBLOCK; + return 0; + } + char *header_end = this->header_complete(ch); + if (header_end == 0) + { + if (ch->state() != ACE::HTBP::Channel::Closed) + { + ch->state (ACE::HTBP::Channel::Header_Pending); + errno = EWOULDBLOCK; + } + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("HTBP::Inside_Squid_Filter::") + ACE_TEXT("recv_data_header, ") + ACE_TEXT("header not complete\n")), + 0); + } + char *start = ch->leftovers().rd_ptr(); + ACE_CString token = "Content-Length: "; + char *tpos = ACE_OS::strstr(start,token.c_str()); + char *nl = ACE_OS::strchr (start,'\n'); + if (tpos != 0) + { + tpos += token.length(); + *nl = 0; + ch->data_len (ACE_OS::strtol(tpos,0,10)); + start = nl+1; + } + + ch->leftovers().rd_ptr(header_end); + if (this->http_code() != 200) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT("HTBP::Inside_Squid_Filter::recv_data_header, ") + ACE_TEXT("non-OK result code %d recvd\n"), + this->http_code())); + + errno = ch->consume_error () ? EINVAL : EWOULDBLOCK; + return 0; + } + + ch->state(ACE::HTBP::Channel::Data_Queued); + return 1; +} + +ssize_t +ACE::HTBP::Inside_Squid_Filter::recv_data_trailer (ACE::HTBP::Channel *ch) +{ + ch->state(ACE::HTBP::Channel::Send_Ack); + return 0; +} + +int +ACE::HTBP::Inside_Squid_Filter::send_ack (ACE::HTBP::Channel *ch) +{ + char *buffer = new char[BUFSIZ]; + ssize_t result = -1; + if (ch->state() == ACE::HTBP::Channel::Ack_Sent) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("HTBP::Inside Filter::send_ack: ") + ACE_TEXT("state is already ACK_SENT\n")), + 1); + } + if (this->make_request_header (ch,"GET ",buffer,BUFSIZ) != -1) + { + ACE_CString header (buffer); + header += "\n"; + result = ch->ace_stream().send(header.c_str(),header.length()); + } + ch->state(result == -1 ? + ACE::HTBP::Channel::Closed : ACE::HTBP::Channel::Ack_Sent); + this->reset_http_code(); + return 1; +} + +int +ACE::HTBP::Inside_Squid_Filter::recv_ack (ACE::HTBP::Channel *ch) +{ + + char *header_end = this->header_complete(ch); + if (header_end == 0) + { + if (ch->state() != ACE::HTBP::Channel::Closed) + errno = EWOULDBLOCK; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("HTBP::Inside_Squid_Filter::") + ACE_TEXT("recv_data_header, ") + ACE_TEXT("header not complete\n")),0); + } + if (this->http_code() == 200) + { + ch->leftovers().length(0); + ch->state(ACE::HTBP::Channel::Ready); + return 1; + } + + char *start = ch->leftovers().rd_ptr(); + ACE_CString token = "Content-Length: "; + char *tpos = ACE_OS::strstr(start,token.c_str()); + char *nl = ACE_OS::strchr (start,'\n'); + if (tpos != 0) + { + tpos += token.length(); + *nl = 0; + ch->data_len (ACE_OS::strtol(tpos,0,10)); + start = nl+1; + } + + ch->leftovers().rd_ptr(header_end); + errno = ch->consume_error () ? EINVAL : EWOULDBLOCK; + return 0; +} + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/protocols/ace/HTBP/HTBP_Inside_Squid_Filter.h b/ACE/protocols/ace/HTBP/HTBP_Inside_Squid_Filter.h new file mode 100644 index 00000000000..e60a682d81f --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Inside_Squid_Filter.h @@ -0,0 +1,91 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file HTBP_Inside_Squid_Filter.h + * + * $Id$ + * + * @author Phil Mesnier + */ +//============================================================================= + +#ifndef ACE_HTBP_INSIDE_SQUID_FILTER_H +#define ACE_HTBP_INSIDE_SQUID_FILTER_H +#include /**/ "ace/pre.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "HTBP_Export.h" +#include "HTBP_Filter.h" + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace ACE +{ + namespace HTBP + { + + /** + * @class Inside_Squid_Filter + * + * @brief Defines the methods in the <Filter> abstraction. + * + * A filter is responsible for wrapping / unwrapping messages + * depending on direction of flow on each stream. There will be 4 + * filters participating in a session, inside-send, inside-recv, + * outside-send, ouside-recv. It is anticipated that specialized + * filters will be implemented that tune the filter behavior based + * on proxy particulars. That, or there will be a matrix + * configuration parameters that may be tweeked to match the + * configuration. It remains to be seen which is easier to + * manage. + */ + class HTBP_Export Inside_Squid_Filter : public Filter + { + public: + /// Destructor. + virtual ~Inside_Squid_Filter (void); + + /// Inside and outside Send filters need to supply a data header + /// that sets the stage before sending the data body, and then + /// (maybe) send a data trailer that closes the message. The + /// data header may be formatted differently on the first + /// message, to send optional context information. + virtual ssize_t send_data_header (ssize_t data_len, Channel *); + virtual ssize_t send_data_trailer (Channel *); + + /// Upon receiving data, the Channel is obliged to send an + /// ack. This is either an empty document if this is the + /// in-to-out stream, or a new long-duration document request if + /// this is the out-to-in stream. + virtual int send_ack (Channel *); + virtual int recv_ack (Channel *); + + /// receiving data must compliment sending. In the case of an + /// in-to-out stream. It is possible that the stream identity is + /// not known until the first request is received. In this case + /// the filter will have to look lookup the received session ID + /// and either create a new instance or reassign itself to an + /// existing sesion. In that case, the associated stream will + /// have to be reassigned as well. + /// + virtual ssize_t recv_data_header (Channel *); + virtual ssize_t recv_data_trailer(Channel *); + private: + int make_request_header (Channel *, const char *, char *, size_t ); + + }; + } +} + +ACE_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +#include "HTBP_Inside_Squid_Filter.inl" +#endif + +#include /**/ "ace/post.h" +#endif /* ACE_HTBP_FILTER_H */ diff --git a/ACE/protocols/ace/HTBP/HTBP_Inside_Squid_Filter.inl b/ACE/protocols/ace/HTBP/HTBP_Inside_Squid_Filter.inl new file mode 100644 index 00000000000..e8b08950ce6 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Inside_Squid_Filter.inl @@ -0,0 +1,12 @@ +// -*- C++ -*- +// +// $Id$ + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +ACE::HTBP::Inside_Squid_Filter::~Inside_Squid_Filter (void) +{ +} + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/protocols/ace/HTBP/HTBP_Macros.h b/ACE/protocols/ace/HTBP/HTBP_Macros.h new file mode 100644 index 00000000000..0cb1300cc1d --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Macros.h @@ -0,0 +1,10 @@ +// $Id$ + +#ifndef ACE_HTBP_MACROS_H +#define ACE_HTBP_MACROS_H + +#if !defined (ACE_HTBP_ENVIRONMENT_CONFIG_FILE) +#define ACE_HTBP_ENVIRONMENT_CONFIG_FILE "HT_Config.conf" +#endif /* ACE_HTBP_ENVIRONMENT_CONFIG_FILE */ + +#endif /* ACE_HTBP_MACROS_H */ diff --git a/ACE/protocols/ace/HTBP/HTBP_Notifier.cpp b/ACE/protocols/ace/HTBP/HTBP_Notifier.cpp new file mode 100644 index 00000000000..1e02fbee29c --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Notifier.cpp @@ -0,0 +1,92 @@ +/* -*- C++ -*- */ + +//============================================================================= +/** + * @file HTBP_Notifier.cpp + * + * $Id$ + * + * @author Phil Mesnier, Priyanka Gontla + */ +//============================================================================= +#include "HTBP_Notifier.h" +#include "HTBP_Channel.h" +#include "HTBP_Session.h" +#include "ace/Reactor.h" + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE::HTBP::Notifier::Notifier (ACE::HTBP::Channel *s) + : channel_(s) +{ +} + +int +ACE::HTBP::Notifier::handle_input(ACE_HANDLE ) +{ + switch (this->channel_->state()) + { + case ACE::HTBP::Channel::Detached: + this->channel_->pre_recv(); + break; + case ACE::HTBP::Channel::Wait_For_Ack: + this->channel_->recv_ack(); + break; + default: + this->channel_->load_buffer(); + } + + if (this->channel_->state() == ACE::HTBP::Channel::Closed) + { + this->unregister(); + return 0; + } + + if (this->channel_->session_) + { + if (this->channel_ == this->channel_->session_->inbound()) + { + ACE_Event_Handler *h = this->channel_->session_->handler(); + if (h && this->reactor()) + this->reactor()->notify(h, + ACE_Event_Handler::READ_MASK); + else + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) ACE::HTBP::Notifier::handle_input ") + ACE_TEXT ("Notifier cannot notify, session has no ") + ACE_TEXT ("handler (%x), or reactor (%x)\n"), + h,this->reactor())); + } + else + this->channel_->flush_buffer(); + } + else + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("(%P|%t) ACE::HTBP::Notifier::handle_input ") + ACE_TEXT ("Notifier has no session to notify!\n"))); + return 0; +} + +int +ACE::HTBP::Notifier::handle_output (ACE_HANDLE ) +{ + return -1; + +} + +void +ACE::HTBP::Notifier::unregister (void) +{ + if (this->reactor()) + this->reactor()->remove_handler(this, + ACE_Event_Handler::READ_MASK | + ACE_Event_Handler::DONT_CALL); +} + +ACE_HANDLE +ACE::HTBP::Notifier::get_handle(void) const +{ + return this->channel_->ace_stream().get_handle(); +} + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/protocols/ace/HTBP/HTBP_Notifier.h b/ACE/protocols/ace/HTBP/HTBP_Notifier.h new file mode 100644 index 00000000000..0c9dcffa2ed --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Notifier.h @@ -0,0 +1,54 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file HTBP_Notifier.h + * + * $Id$ + * + * @author Phil Mesnier, Priyanka Gontla + */ +//============================================================================= + +#ifndef ACE_HTBP_NOTIFIER_H +#define ACE_HTBP_NOTIFIER_H +#include /**/ "ace/pre.h" + +#include "ace/Event_Handler.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "HTBP_Export.h" + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace ACE +{ + namespace HTBP + { + // Forward declarations. + class Channel; + + class HTBP_Export Notifier : public ACE_Event_Handler + { + public: + Notifier (Channel *ch); + int handle_input(ACE_HANDLE ); + int handle_output(ACE_HANDLE ); + + void unregister (void); + + ACE_HANDLE get_handle (void) const; + + private: + Channel *channel_; + }; + } +} + +ACE_END_VERSIONED_NAMESPACE_DECL + +#include /**/ "ace/post.h" +#endif /* ACE_HTBP_NOTIFIER_H */ diff --git a/ACE/protocols/ace/HTBP/HTBP_Outside_Squid_Filter.cpp b/ACE/protocols/ace/HTBP/HTBP_Outside_Squid_Filter.cpp new file mode 100644 index 00000000000..5e626170457 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Outside_Squid_Filter.cpp @@ -0,0 +1,200 @@ +// ACE_HTBP_Outside_Squid_Filter.cpp +// $Id$ + +#include "ace/Log_Msg.h" + +#include "HTBP_Session.h" +#include "HTBP_Outside_Squid_Filter.h" + +#if !defined (__ACE_INLINE__) +#include "HTBP_Outside_Squid_Filter.inl" +#endif + +ACE_RCSID(HTBP, + ACE_HTBP_Outside_Squid_Filter, + "$Id$") + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ssize_t +ACE::HTBP::Outside_Squid_Filter::recv_data_header (ACE::HTBP::Channel *ch) +{ + // on the outside of the firewall, this method must do the details + // necessary to call replace_session or whatever to migrate the sock + // stream to the appropriate ACE::HTBP::Channel, then to finally + // assign the right filter to that stream. That filter will an + // ACE::HTBP::Outside_[Send|Recv]_Filter. + + // in the case of an ACE::HTBP::Outside_Recv_Filter, the assigned + // filter must be a null filter first, with a replacement. + + // recv header details + char *header_end = this->header_complete(ch); + if (header_end == 0) + { + if (ch->state() != ACE::HTBP::Channel::Closed) + { + ch->state(ACE::HTBP::Channel::Header_Pending); + errno = EWOULDBLOCK; + } + return 0; + } + + char *start = ch->leftovers().rd_ptr(); + + int is_inbound = 0; + ACE_CString token ("POST "); + if (ACE_OS::strncmp (start,token.c_str(),token.length()) == 0) + is_inbound = 1; + else + { + token = "GET "; + if (ACE_OS::strncmp (start, + token.c_str(), + token.length()) != 0) + { + ch->leftovers().length(0); + errno = EINVAL; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Outside_Squid_Filter::") + ACE_TEXT("recv_data_header ") + ACE_TEXT("bad request header\n")),0); + } + } + start += token.length(); + // "http://" is stripped by squid, leaving only "/" + start += (ACE_OS::strncmp (start,"http://",7) == 0) ? 7 : 1; + + // set up the actual session and stream + ACE::HTBP::Session_Id_t session_id; + char * slash = ACE_OS::strchr(start,'/'); + char * nl = ACE_OS::strchr (start,'\n'); + if (slash == 0) + { + ch->leftovers().length(0); + errno = EINVAL; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Outside_Squid_Filter::") + ACE_TEXT("recv_data_header ") + ACE_TEXT("missing sender key\n")),0); + } + *slash = 0; + session_id.local_.string_to_addr (start); + start = slash+1; + + slash = ACE_OS::strchr(start,'/'); + if (slash == 0) + { + ch->leftovers().length(0); + errno = EINVAL; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Outside_Squid_Filter::") + ACE_TEXT("recv_data_header ") + ACE_TEXT("missing sender key\n")),0); + } + *slash = 0; + session_id.peer_.string_to_addr (start); + start = slash + 1; + + slash = ACE_OS::strchr(start,' '); + if (slash == 0) + { + ch->leftovers().length (0); + errno = EINVAL; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Outside_Squid_Filter::") + ACE_TEXT("recv_data_header ") + ACE_TEXT("missing sender key")),0); + } + *slash = 0; + session_id.id_ = ACE_OS::strtol(start,0,10); + start = slash + 1; + + if (is_inbound) + { + token = "Content-Length: "; + char *tpos = ACE_OS::strstr(start,token.c_str()); + if (tpos != 0) + { + nl = ACE_OS::strchr(tpos,'\n'); + tpos += token.length(); + *nl = 0; + ch->data_len(ACE_OS::strtol(tpos,0,10)); + start = nl+1; + } + } + ch->leftovers().rd_ptr(header_end); + + ACE::HTBP::Session *session = 0; + if (ACE::HTBP::Session::find_session (session_id, session) == -1) + { + ACE_NEW_RETURN (session, ACE::HTBP::Session (session_id), 0); + if (ACE::HTBP::Session::add_session (session) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Outside_Squid_Filter::") + ACE_TEXT("recv_data_header %p"), + ACE_TEXT("add_session")),0); + } + ch->session(session); + + if (is_inbound) + { + ch->state(ACE::HTBP::Channel::Data_Queued); + session->inbound (ch); + } + else + { + ch->state(ACE::HTBP::Channel::Ready); + session->outbound (ch); + } + return 1; +} + +ssize_t +ACE::HTBP::Outside_Squid_Filter::recv_data_trailer (ACE::HTBP::Channel *ch) +{ + ch->state(ACE::HTBP::Channel::Send_Ack); + return 1; +} + +int +ACE::HTBP::Outside_Squid_Filter::send_ack (ACE::HTBP::Channel *ch) +{ + this->send_data_header (0,ch); + if (ch->state() == ACE::HTBP::Channel::Header_Sent) + ch->state(ACE::HTBP::Channel::Detached); + return 1; +} + +ssize_t +ACE::HTBP::Outside_Squid_Filter::send_data_header (ssize_t data_len, + ACE::HTBP::Channel *ch) +{ + ACE_CString header ("HTTP/1.1 200 OK\n" + "Content-Type: application/octet-stream\n" + "Content-Length: "); + char datalenstr[20]; + ACE_OS::itoa (data_len,datalenstr,10); + header += datalenstr; + header += "\n\n"; + ssize_t result = ch->ace_stream().send(header.c_str(),header.length()); + ch->state(result == -1 ? + ACE::HTBP::Channel::Closed : ACE::HTBP::Channel::Header_Sent); + this->reset_http_code(); + return 1; +} + +ssize_t +ACE::HTBP::Outside_Squid_Filter::send_data_trailer (ACE::HTBP::Channel *ch) +{ + ch->state(ACE::HTBP::Channel::Detached); + return 1; +} + +int +ACE::HTBP::Outside_Squid_Filter::recv_ack (ACE::HTBP::Channel *) +{ + return 1; +} + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/protocols/ace/HTBP/HTBP_Outside_Squid_Filter.h b/ACE/protocols/ace/HTBP/HTBP_Outside_Squid_Filter.h new file mode 100644 index 00000000000..92008c3a8b6 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Outside_Squid_Filter.h @@ -0,0 +1,86 @@ +/* -*- C++ -*- */ + +//============================================================================= +/** + * @file HTBP_Outside_Squid_Filter.h + * + * $Id$ + * + * @author Phil Mesnier + */ +//============================================================================= + +#ifndef ACE_HTBP_OUTSIDE_SQUID_FILTER_H +#define ACE_HTBP_OUTSIDE_SQUID_FILTER_H +#include /**/ "ace/pre.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "HTBP_Export.h" +#include "HTBP_Filter.h" + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace ACE +{ + namespace HTBP + { + // Forward declarations + class Channel; + + /** + * @class Outside_Squid_Filter + * + * @brief Defines the methods in the <Filter> abstraction. + * + * A filter is responsible for wrapping / unwrapping messages depending on + * direction of flow on each stream. There will be 4 filters participating in + * a session, outside-send, outside-recv, outside-send, ouside-recv. It is + * anticipated that specialized filters will be implemented that tune the + * filter behavior based on proxy particulars. That, or there will be a matrix + * configuration parameters that may be tweeked to match the configuration. + * It remains to be seen which is easier to manage. + */ + class HTBP_Export Outside_Squid_Filter : public Filter + { + public: + /// Destructor. + virtual ~Outside_Squid_Filter (void); + + /// Outside and outside Send filters need to supply a data header that + /// sets the stage before sending the data body, and then (maybe) send a + /// data trailer that closes the message. The data header may be formatted + /// differently on the first message, to send optional context information. + virtual ssize_t send_data_header (ssize_t data_len, Channel *); + virtual ssize_t send_data_trailer (Channel *); + + /// Upon receiving data, the Channel is obliged to send an ack. This is + /// either an empty document if this is the in-to-out stream, or a new + /// long-duration document request if this is the out-to-in stream. + virtual int send_ack (Channel *); + virtual int recv_ack (Channel *); + + /// receiving data must compliment sending. In the case of an in-to-out + /// stream. It is possible that the stream identity is not known until the + /// first request is received. In this case the filter will have to look + /// lookup the received session ID and either create a new instance or + /// reassign itself to an existing sesion. In that case, the associated + /// stream will have to be reassigned as well. + /// + virtual ssize_t recv_data_header (Channel *); + virtual ssize_t recv_data_trailer(Channel *); + }; + + } +} + +ACE_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +#include "HTBP_Outside_Squid_Filter.inl" +#endif + +#include /**/ "ace/post.h" +#endif /* ACE_HTBP_OUTSIDE_FILTER_H */ diff --git a/ACE/protocols/ace/HTBP/HTBP_Outside_Squid_Filter.inl b/ACE/protocols/ace/HTBP/HTBP_Outside_Squid_Filter.inl new file mode 100644 index 00000000000..e0659ae5e13 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Outside_Squid_Filter.inl @@ -0,0 +1,12 @@ +// -*- C++ -*- +// +// $Id$ + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +ACE::HTBP::Outside_Squid_Filter::~Outside_Squid_Filter (void) +{ +} + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/protocols/ace/HTBP/HTBP_Session.cpp b/ACE/protocols/ace/HTBP/HTBP_Session.cpp new file mode 100644 index 00000000000..3abbfbb6c42 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Session.cpp @@ -0,0 +1,297 @@ +// SOCK_Stream.cpp +// $Id$ + +#include "ace/Log_Msg.h" + +#include "HTBP_Session.h" +#include "ace/SOCK_Connector.h" +#include "ace/Event_Handler.h" +#include "HTBP_Filter.h" +#include "HTBP_ID_Requestor.h" + +#if !defined (__ACE_INLINE__) +#include "HTBP_Session.inl" +#endif + +ACE_RCSID(HTBP,HTBP_Session,"$Id$") + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE::HTBP::Session::Session_Map ACE::HTBP::Session::session_map_; +ACE_UINT32 ACE::HTBP::Session::last_session_id_ = 0; +ACE_SYNCH_MUTEX ACE::HTBP::Session::session_id_lock_; + +/// Static method definitions +ACE_UINT32 +ACE::HTBP::Session::next_session_id () +{ + ACE_Guard<ACE_SYNCH_MUTEX> g(ACE::HTBP::Session::session_id_lock_); + return ++last_session_id_; +} + +int +ACE::HTBP::Session::add_session (ACE::HTBP::Session *s) +{ + return session_map_.bind (s->session_id(),s); +} + +int +ACE::HTBP::Session::remove_session (ACE::HTBP::Session *s) +{ + if (session_map_.current_size() > 0) + return session_map_.unbind(s->session_id()); + return 0; +} + +int +ACE::HTBP::Session::find_session (const ACE::HTBP::Session_Id_t &sid, ACE::HTBP::Session *&out) +{ + ACE::HTBP::Session::Map_Entry *e = 0; + if (session_map_.find (sid,e) == -1) + { + out = 0; + return -1; + } + out = e->int_id_; + return 0; +} + +//---------------------------------------------------------------------------- +ACE::HTBP::Session::Session (void) + : proxy_addr_ (0), + destroy_proxy_addr_ (0), + inbound_ (0), + outbound_ (0), + closed_ (0), + handler_ (0), + reactor_(0), + stream_ (0), + sock_flags_(0) +{ + ACE::HTBP::ID_Requestor req; + ACE_TCHAR * htid = req.get_HTID(); + session_id_.local_ = ACE_TEXT_ALWAYS_CHAR(htid); + delete[] htid; + session_id_.id_ = ACE::HTBP::Session::next_session_id(); + ACE_NEW (inbound_, ACE::HTBP::Channel (this)); + ACE_NEW (outbound_, ACE::HTBP::Channel (this)); +} + +ACE::HTBP::Session::Session (const ACE::HTBP::Addr &peer, + const ACE::HTBP::Addr &local, + ACE_UINT32 sid, + ACE_INET_Addr *proxy, + int take_proxy) + : proxy_addr_ (proxy), + destroy_proxy_addr_ (take_proxy), + inbound_ (0), + outbound_ (0), + closed_ (0), + handler_ (0), + reactor_(0), + stream_ (0), + sock_flags_(0) +{ + session_id_.peer_ = peer; + session_id_.local_ = local; + session_id_.id_ = (sid == 0) ? + ACE::HTBP::Session::next_session_id() : sid; + + ACE_NEW (inbound_,ACE::HTBP::Channel (this)); + ACE_NEW (outbound_,ACE::HTBP::Channel (this)); +} + +ACE::HTBP::Session::Session (const ACE::HTBP::Session_Id_t &id, + ACE_INET_Addr *proxy, + int take_proxy) + : proxy_addr_ (proxy), + destroy_proxy_addr_ (take_proxy), + session_id_(id), + inbound_ (0), + outbound_ (0), + closed_ (0), + handler_ (0), + reactor_ (0), + stream_ (0), + sock_flags_(0) +{ + ACE_NEW (inbound_, ACE::HTBP::Channel (this)); + ACE_NEW (outbound_, ACE::HTBP::Channel (this)); +} + +ACE::HTBP::Session::Session (const ACE::HTBP::Session &other) +{ + this->operator=(other); +} + +ACE::HTBP::Session& +ACE::HTBP::Session::operator= (const ACE::HTBP::Session &) +{ + // @TODO: figure out why the assignment operator is here if it is + // unimplemented? Previously there was an ACE_ASSERT(this == 0) + // so apparently something bad had been happening long ago, but I + // have no idea what. + return *this; +} + +ACE::HTBP::Session::~Session (void) +{ + if (destroy_proxy_addr_) + delete proxy_addr_; +} + +int +ACE::HTBP::Session::close (void) +{ + if (this->inbound_) + this->inbound_->close(); + if (this->outbound_) + this->outbound_->close(); + this->closed_= 1; + return ACE::HTBP::Session::remove_session (this); +} + + +ACE::HTBP::Channel * +ACE::HTBP::Session::outbound (void) const +{ + if (!this->closed_ && this->proxy_addr_) + const_cast<ACE::HTBP::Session *> (this)->reconnect(); + if ( this->outbound_ == 0) + return 0; + ACE::HTBP::Channel::State s =this->outbound_->state(); + return s == ACE::HTBP::Channel::Init || s == ACE::HTBP::Channel::Ready ? this->outbound_ : 0; +} + +void +ACE::HTBP::Session::reconnect_i (ACE::HTBP::Channel *s) +{ + ACE_SOCK_Connector conn; + char host[100]; + this->proxy_addr_->get_host_name(host,100); + if (conn.connect (s->ace_stream(),*this->proxy_addr_) == -1) + { + ACE_TCHAR buffer[128]; + this->proxy_addr_->addr_to_string(buffer,128, 0); + ACE_ERROR ((LM_ERROR, + ACE_TEXT("(%P|%t) ACE::HTBP::Session::reconnect") + ACE_TEXT(" failed to %s, %p\n"), + buffer, s == this->inbound_ ? + ACE_TEXT("inbound") : ACE_TEXT ("outbound"))); + } + s->register_notifier(this->reactor_); + if (s == this->inbound_) + s->send_ack(); +} + +ACE_Event_Handler * +ACE::HTBP::Session::handler (void) +{ + return this->handler_; +} + +void +ACE::HTBP::Session::handler (ACE_Event_Handler * h) +{ + this->handler_ = h; +} + +void +ACE::HTBP::Session::detach (ACE::HTBP::Channel *ch) +{ + if (this->inbound_ == ch) + this->inbound_ = 0; + else if (this->outbound_ == ch) + this->outbound_ = 0; + else + ACE_ERROR ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Session::detach ") + ACE_TEXT("called with unknown channel\n"))); +} + +void +ACE::HTBP::Session::reactor (ACE_Reactor *r) +{ + this->reactor_ = r; + this->inbound_->register_notifier(r); + this->outbound_->register_notifier(r); +} + +int +ACE::HTBP::Session::enqueue (ACE_Message_Block *msg) +{ + this->outbound_queue_.enqueue_tail(msg); + return msg->length(); +} + +int +ACE::HTBP::Session::flush_outbound_queue (void) +{ + int result = 0; + if (this->outbound_queue_.message_count() > 0) + { + ACE_Message_Block *msg = 0; + iovec *iov = 0; + ACE_NEW_RETURN (iov, + iovec[this->outbound_queue_.message_count()], + -1); + this->outbound_queue_.peek_dequeue_head (msg); + for (size_t i = 0; i < this->outbound_queue_.message_count(); i++) + { + iov[i].iov_base = msg->rd_ptr(); + iov[i].iov_len = msg->length(); + msg = msg->next(); + } + if (this->outbound_->state() == ACE::HTBP::Channel::Wait_For_Ack) + this->outbound_->recv_ack(); + result = this->outbound_->sendv (iov,this->outbound_queue_.message_count(),0); + delete [] iov; + while (this->outbound_queue_.dequeue_head(msg)) + msg->release(); + } + return result; +} + +int +ACE::HTBP::Session::close_inbound (void) const +{ + return this->inbound_ ? this->inbound_->close() : 0; +} + +int +ACE::HTBP::Session::close_outbound (void) const +{ + return this->outbound_ ? this->outbound_->close() : 0; +} + +int +ACE::HTBP::Session::enable (int flags) +{ + this->sock_flags_ |= flags; + int result = this->inbound_ ? this->inbound_->enable(flags) : 0; + result |= this->outbound_ ? this->outbound_->enable (flags) : 0; + return result; +} + +int +ACE::HTBP::Session::disable (int flags) +{ + this->sock_flags_ &= ~flags; + int result = this->inbound_ ? this->inbound_->disable(flags) : 0; + result |= this->outbound_ ? this->outbound_->disable (flags) : 0; + return result; +} + +ACE::HTBP::Stream * +ACE::HTBP::Session::stream (void)const +{ + return this->stream_; +} + +void +ACE::HTBP::Session::stream (ACE::HTBP::Stream *s) +{ + this->stream_ = s; +} + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/protocols/ace/HTBP/HTBP_Session.h b/ACE/protocols/ace/HTBP/HTBP_Session.h new file mode 100644 index 00000000000..5b46c6e86b3 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Session.h @@ -0,0 +1,198 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file HTBP_Session.h + * + * $Id$ + * + * @author Phil Mesnier + */ +//============================================================================= + +#ifndef ACE_HTBP_SESSION_H +#define ACE_HTBP_SESSION_H +#include /**/ "ace/pre.h" + +#include "ace/SOCK_IO.h" +#include "ace/Hash_Map_Manager.h" +#include "ace/Synch.h" +#include "ace/Message_Queue.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "HTBP_Addr.h" +#include "HTBP_Export.h" +#include "HTBP_Channel.h" + +#include "HTBP_Stream.h" + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +// Forward declarations. +class ACE_HTBP_Filter; +class ACE_Event_Handler; + +namespace ACE +{ + namespace HTBP + { + + class Session_Id_t + { + public: + ACE_UINT32 id_; + Addr local_; + Addr peer_; + + u_long hash () const; + bool operator ==(const Session_Id_t &other) const; + }; + + /** + * @class Session + * + * @brief Defines the methods in the <Session> abstraction. + * + * A session is an entity that combines two Ht_Channels that connect directly + * to a proxy to manage communication with a remote peer. The Session may + * persist longer than either stream, assuming that the proxy is libel to + * close a connection at any time. + * + * This means that the session needs to be able to reconnect to the remote + * peer. This also means that the session needs to be aware of its location + * If it is outside the proxy and looses a stream, oh well. If it is inside, + * then the next time a stream is required, then it must reconnect before + * returning the stream. + * + * The session does not queue outbound messages. That is going to be the + * responsibility of the application, or a higher level protocol wrapper. + */ + class HTBP_Export Session + { + public: + // Initialization and termination methods. + /// Constructor. + Session (void); + + /// Constructor (sets the underlying session id with <sid>). + Session (const Addr& peer, + const Addr& local, + ACE_UINT32 sid = 0, + ACE_INET_Addr *proxy = 0, + int take_proxy = 0); + Session (const Session_Id_t &id, + ACE_INET_Addr *proxy = 0, + int take_proxy = 0); + + Session (const Session &other); + Session& operator= (const Session &other); + + /// Destructor. + ~Session (void); + + /// The following methods are specific to the Session + static ACE_UINT32 next_session_id (); + + static int add_session (Session *); + static int remove_session (Session *); + static int find_session (const Session_Id_t&, + Session *&out); + + Stream *stream (void) const; + void stream (Stream *); + + int enqueue (ACE_Message_Block *msg); + int flush_outbound_queue (void); + + int close_inbound (void) const; + int close_outbound (void) const; + + /// get references to the actual streams based on the direction + /// of data flow if this session is on the inside of the proxy + /// ie, has a non-null proxy addr, then the inbound stream is + /// the out_to_in stream, otherwise it is the in_to_out + /// stream. The outbound is the opposite of the inbound. + /// Whenever an application wishes to send data, whether that is + /// request or reply data, it uses the outbound stream, and it + /// should associate an event handler with the inbound stream + /// for receiving data. + Channel *inbound (void) const; + Channel *outbound (void) const; + void inbound (Channel *); + void outbound (Channel *); + + int enable (int value); + int disable (int value); + + const Session_Id_t& session_id(void) const; + void session_id (ACE_UINT32 ); + + const ACE_INET_Addr *proxy_addr (void) const; + void proxy_addr (ACE_INET_Addr *, int destroy = 0); + + const Addr &peer_addr (void) const; + const Addr &local_addr (void) const; + + void peer_addr (const Addr &); + void local_addr (const Addr &); + + /// invoke close on both streams, then remove self from session map + int close (void); + + ACE_Event_Handler *handler (void); + void handler (ACE_Event_Handler *); + void reactor (ACE_Reactor *); + void detach (Channel *); + + int sock_flags(void) const; + + private: + /// Connected Stream ensures that the particular stream is + /// connected to the proxy, if possible. The result is same as + /// the reference passed in, so that it may be used inline for + /// the inboundor outbound methods + + void reconnect (); + void reconnect_i (Channel *); + + typedef ACE_Hash_Map_Manager<Session_Id_t, Session*, + ACE_SYNCH_MUTEX> Session_Map; + typedef ACE_Hash_Map_Entry <Session_Id_t, Session*> Map_Entry; + static Session_Map session_map_; + static ACE_UINT32 last_session_id_; + static ACE_SYNCH_MUTEX session_id_lock_; + + ACE_INET_Addr *proxy_addr_; + int destroy_proxy_addr_; + + Session_Id_t session_id_; + + Channel *inbound_; + Channel *outbound_; + + Filter *inbound_filter_; + Filter *outbound_filter_; + + int closed_; + + ACE_Event_Handler *handler_; + ACE_Reactor *reactor_; + + ACE_Message_Queue<ACE_SYNCH> outbound_queue_; + Stream * stream_; + int sock_flags_; + }; + } +} + +ACE_END_VERSIONED_NAMESPACE_DECL + +#if defined (__ACE_INLINE__) +#include "HTBP_Session.inl" +#endif + +#include /**/ "ace/post.h" +#endif /* ACE_HTBP_SESSION_H */ diff --git a/ACE/protocols/ace/HTBP/HTBP_Session.inl b/ACE/protocols/ace/HTBP/HTBP_Session.inl new file mode 100644 index 00000000000..06c510650c9 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Session.inl @@ -0,0 +1,114 @@ +// -*- C++ -*- +// +// $Id$ + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +ACE_INLINE +void +ACE::HTBP::Session::reconnect () +{ + if (!this->closed_ && this->proxy_addr_) + { + if (inbound_ && + inbound_->ace_stream().get_handle() == ACE_INVALID_HANDLE) + reconnect_i (inbound_); + if (outbound_ && + outbound_->ace_stream().get_handle() == ACE_INVALID_HANDLE) + reconnect_i (outbound_); + } +} + +ACE_INLINE +void +ACE::HTBP::Session::inbound (ACE::HTBP::Channel *s) +{ + this->inbound_ = s; +} + + +ACE_INLINE +void +ACE::HTBP::Session::outbound (ACE::HTBP::Channel *s) +{ + this->outbound_ = s; +} + +ACE_INLINE +ACE::HTBP::Channel * +ACE::HTBP::Session::inbound (void) const +{ + return this->inbound_; +} + +ACE_INLINE +const ACE::HTBP::Session_Id_t& +ACE::HTBP::Session::session_id(void) const +{ + return this->session_id_; +} + +// I'm wondering if this should be coupled with a map update...? +ACE_INLINE +void +ACE::HTBP::Session::session_id (ACE_UINT32 id) +{ + this->session_id_.id_ = id; +} + +ACE_INLINE +const ACE_INET_Addr * +ACE::HTBP::Session::proxy_addr (void) const +{ + return this->proxy_addr_; +} + +ACE_INLINE +const ACE::HTBP::Addr & +ACE::HTBP::Session::peer_addr (void) const +{ + return this->session_id_.peer_; +} + +ACE_INLINE +const ACE::HTBP::Addr & +ACE::HTBP::Session::local_addr (void) const +{ + return this->session_id_.local_; +} + +ACE_INLINE +void +ACE::HTBP::Session::proxy_addr (ACE_INET_Addr *pa, int destroy) +{ + if (this->destroy_proxy_addr_) + delete this->proxy_addr_; + this->proxy_addr_ = pa; + this->destroy_proxy_addr_ = destroy; +} + +ACE_INLINE +int +ACE::HTBP::Session::sock_flags (void) const +{ + return this->sock_flags_; +} + +//--------------------------------------------------------------------------- +ACE_INLINE +u_long +ACE::HTBP::Session_Id_t::hash () const +{ + return id_; // + local.hash() + peer.hash(); +} + +ACE_INLINE +bool +ACE::HTBP::Session_Id_t::operator ==(const ACE::HTBP::Session_Id_t &other) const +{ + return ((this->id_ == other.id_) && + (this->local_ == other.local_) && + (this->peer_ == other.peer_)); +} + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/protocols/ace/HTBP/HTBP_Stream.cpp b/ACE/protocols/ace/HTBP/HTBP_Stream.cpp new file mode 100644 index 00000000000..63e8c0df671 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Stream.cpp @@ -0,0 +1,359 @@ +/* -*- C++ -*- */ + +//============================================================================= +/** + * @file HTBP_Stream.cpp + * + * $Id$ + * + * @author Phil Mesnier, Priyanka Gontla + */ +//============================================================================= +#include "HTBP_Stream.h" + +#include "HTBP_Session.h" +#include "HTBP_Filter_Factory.h" + +#include "ace/Message_Block.h" + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +// Initialization and termination methods. +/// Constructor. +ACE::HTBP::Stream::Stream (ACE::HTBP::Session *s) + : session_ (s) +{ + if (s == 0) + // create a temporary session to be replaced on first recv. + ACE_NEW (session_, ACE::HTBP::Session); + session_->stream (this); +} + +/// Destructor. +ACE::HTBP::Stream::~Stream (void) +{ +} + + /// Dump the state of an object. +void +ACE::HTBP::Stream::dump (void) const +{ +} + + + +//--------------------------------------------------------------------------- +// = I/O functions. + +/// Recv an <n> byte buffer from the connected socket. +ssize_t +ACE::HTBP::Stream::recv (void *buf, + size_t n, + int flags, + const ACE_Time_Value *timeout) const +{ + if (this->session_->inbound() == 0) + { + errno = EWOULDBLOCK; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Stream::") + ACE_TEXT("recv(buf,n,flags) called, but no ") + ACE_TEXT("inbound channel connected to stream\n")), + -1); + } + return this->session_->inbound()->recv(buf,n,flags,timeout); +} + + /// Recv an <n> byte buffer from the connected socket. +ssize_t +ACE::HTBP::Stream::recv (void *buf, + size_t n, + const ACE_Time_Value *timeout) const +{ + if (this->session_->inbound() == 0) + { + errno = EWOULDBLOCK; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Stream::") + ACE_TEXT("recv(buf,n) called, but no ") + ACE_TEXT("inbound channel connected to stream\n")), + -1); + } + return this->session_->inbound()->recv(buf,n,timeout); +} + + /// Recv an <iovec> of size <n> from the connected socket. +ssize_t +ACE::HTBP::Stream::recvv (iovec iov[], + int iovcnt, + const ACE_Time_Value *timeout) const +{ + if (this->session_->inbound() == 0) + { + errno = EWOULDBLOCK; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Stream::") + ACE_TEXT("recv(iov,iovcnt) called, but no ") + ACE_TEXT("inbound channel connected to stream\n")), + -1); + } + return this->session_->inbound()->recvv(iov,iovcnt,timeout); +} + +ssize_t +ACE::HTBP::Stream::recvv (iovec *io_vec, + const ACE_Time_Value *timeout) const +{ + if (this->session_->inbound() == 0) + { + errno = EWOULDBLOCK; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Stream::") + ACE_TEXT("recv(io_vec) called, but no ") + ACE_TEXT("inbound channel connected to stream\n")), + -1); + } + return this->session_->inbound()->recvv(io_vec,timeout); +} + +ssize_t +ACE::HTBP::Stream::recv (void *, + size_t, + ACE_OVERLAPPED *) const +{ + errno = ENOTSUP; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Stream: Asynch ") + ACE_TEXT("recv not supported\n")),-1); +} + +ssize_t +ACE::HTBP::Stream::send (const void *buf, + size_t n, + int flags, + const ACE_Time_Value *timeout) const +{ + if (this->session_->outbound() == 0) + { + ACE_Message_Block *msg = 0; + ACE_NEW_RETURN (msg,ACE_Message_Block(n),-1); + msg->copy ((const char *)buf,n); + // probably ought to separately enqueue the flags and put the data buf + // in a continuation message + // Also, the timeout poses another interesting problem. + return this->session_->enqueue(msg); + } + return this->session_->outbound()->send(buf,n,flags,timeout); +} + +ssize_t +ACE::HTBP::Stream::send (const void *buf, + size_t n, + const ACE_Time_Value *timeout) const +{ + if (this->session_->outbound() == 0) + { + ACE_Message_Block *msg = 0; + ACE_NEW_RETURN (msg,ACE_Message_Block(n),-1); + msg->copy ((const char *)buf,n); + return this->session_->enqueue(msg); + } + return this->session_->outbound()->send(buf,n,timeout); +} + +ssize_t +ACE::HTBP::Stream::sendv (const iovec iov[], + int iovcnt, + const ACE_Time_Value *timeout) const +{ + if (this->session_->outbound() == 0) + { + ACE_Message_Block *msg = 0; + size_t total = 0; + int i = 0; + for (; i < iovcnt; i++) + total += iov[i].iov_len; + ACE_NEW_RETURN (msg,ACE_Message_Block(total),-1); + for (i = 0; i < iovcnt; i++) + msg->copy ((const char *)iov[i].iov_base,iov[i].iov_len); + return this->session_->enqueue(msg); + } + return this->session_->outbound()->sendv(iov,iovcnt,timeout); +} + +ssize_t +ACE::HTBP::Stream::send (const void *, + size_t , + ACE_OVERLAPPED *) const +{ + errno = ENOTSUP; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Stream: Asynch ") + ACE_TEXT("send not supported\n")), + -1); +} + +ssize_t +ACE::HTBP::Stream::recv_n (void *, + size_t , + int , + const ACE_Time_Value *, + size_t *) const +{ + errno = ENOTSUP; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Stream: recv_n not supported\n")), + -1); +} + + /// Try to recv exactly <len> bytes into <buf> from the connected socket. +ssize_t +ACE::HTBP::Stream::recv_n (void *, + size_t , + const ACE_Time_Value *, + size_t *) const +{ + errno = ENOTSUP; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Stream: recv_n not supported\n")), + -1); +} + + /// Receive an <iovec> of size <iovcnt> from the connected socket. +ssize_t +ACE::HTBP::Stream::recvv_n (iovec [], + int , + const ACE_Time_Value *, + size_t *) const +{ + errno = ENOTSUP; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Stream: recvv_n not supported\n")), + -1); +} + + /// Try to send exactly <len> bytes from <buf> to the connection socket. +ssize_t +ACE::HTBP::Stream::send_n (const void *, + size_t , + int , + const ACE_Time_Value *, + size_t *) const +{ + errno = ENOTSUP; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Stream: send_n not supported\n")), + -1); +} + + /// Try to send exactly <len> bytes from <buf> to the connected socket. +ssize_t +ACE::HTBP::Stream::send_n (const void *, + size_t , + const ACE_Time_Value *, + size_t *) const +{ + errno = ENOTSUP; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Stream: send_n not supported\n")), + -1); +} + + /// Send all the <message_block>s chained through their <next> and + /// <cont> pointers. This call uses the underlying OS gather-write + /// operation to reduce the domain-crossing penalty. +ssize_t +ACE::HTBP::Stream::send_n (const ACE_Message_Block *, + const ACE_Time_Value *, + size_t *) const +{ + errno = ENOTSUP; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Stream: send_n not supported\n")), + -1); +} + + /// Send an <iovec> of size <iovcnt> to the connected socket. +ssize_t +ACE::HTBP::Stream::sendv_n (const iovec [], + int, + const ACE_Time_Value *, + size_t *) const +{ + errno = ENOTSUP; + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT("ACE::HTBP::Stream: sendv_n not supported\n")), + -1); +} + +int +ACE::HTBP::Stream::close_reader (void) +{ + return this->session_->close_inbound(); +} + +int +ACE::HTBP::Stream::close_writer (void) +{ + return this->session_->close_outbound(); +} + +int +ACE::HTBP::Stream::close (void) +{ + return this->session_->close(); +} + +int +ACE::HTBP::Stream::enable (int value) const +{ + return this->session_->enable(value); +} + +int +ACE::HTBP::Stream::disable (int value) const +{ + return this->session_->disable(value); +} + +int +ACE::HTBP::Stream::get_local_addr (ACE::HTBP::Addr &local_addr) const +{ + local_addr = this->session_->local_addr (); + return 0; +} + +int +ACE::HTBP::Stream::get_remote_addr (ACE::HTBP::Addr &peer_addr) const +{ + peer_addr = this->session_->peer_addr(); + return 0; +} + +ACE::HTBP::Session * +ACE::HTBP::Stream::session (void) const +{ + return this->session_; +} + +void +ACE::HTBP::Stream::session (ACE::HTBP::Session *s) +{ + delete this->session_; + this->session_ = s; + s->stream (this); +} + + +ACE_HANDLE +ACE::HTBP::Stream::get_handle (void) const +{ + return ACE_INVALID_HANDLE; +} + +void +ACE::HTBP::Stream::set_handle (ACE_HANDLE ) +{ +} + +ACE_END_VERSIONED_NAMESPACE_DECL diff --git a/ACE/protocols/ace/HTBP/HTBP_Stream.h b/ACE/protocols/ace/HTBP/HTBP_Stream.h new file mode 100644 index 00000000000..0527ab06632 --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTBP_Stream.h @@ -0,0 +1,291 @@ +// -*- C++ -*- + +//============================================================================= +/** + * @file HTBP_Stream.h + * + * $Id$ + * + * @author Phil Mesnier, Priyanka Gontla + */ +//============================================================================= + +#ifndef ACE_HTBP_STREAM_H +#define ACE_HTBP_STREAM_H +#include /**/ "ace/pre.h" + +#include "ace/SOCK_Stream.h" +#include "ace/Message_Block.h" + +#if !defined (ACE_LACKS_PRAGMA_ONCE) +# pragma once +#endif /* ACE_LACKS_PRAGMA_ONCE */ + +#include "HTBP_Export.h" +#include "HTBP_Addr.h" +#include "HTBP_Filter.h" +#include "HTBP_Notifier.h" + +ACE_BEGIN_VERSIONED_NAMESPACE_DECL + +namespace ACE +{ + namespace HTBP + { + // Forward declarations. + class Session; + + /** + * @class Stream + * + * @brief A stream facade for the HTTP Tunneling Bidirectional Protocol + * + * This adds provides the common interface for applications to do + * I/O that ultimately is wrapped in HTTP for tunneling through + * firewalls. + * + * This class is modelled after the ACE_SOCK_Stream class, and + * provides all of the I/O methods available in that class. Since + * this is a facade, it does not derive from ACE_SOCK_Stream. It + * also does not provide the same performance as ACE_SOCK_Stream, + * as some data may have to be stored by the HTBP classes prior to + * transmission. + * + * <buf> is the buffer to write from or receive into. + * <len> is the number of bytes to transfer. + * The <timeout> parameter in the following methods indicates how + * long to blocking trying to transfer data. If <timeout> == 0, + * then the call behaves as a normal send/recv call, i.e., for + * blocking sockets, the call will block until action is possible; + * for non-blocking sockets, EWOULDBLOCK will be returned if no + * action is immediately possible. + * If <timeout> != 0, the call will wait for data to arrive no longer + * than the relative time specified in *<timeout>. + * The "_n()" I/O methods keep looping until all the data has been + * transferred. These methods also work for sockets in non-blocking + * mode i.e., they keep looping on EWOULDBLOCK. <timeout> is used + * to make sure we keep making progress, i.e., the same timeout + * value is used for every I/O operation in the loop and the timeout + * is not counted down. + * The return values for the "*_n()" methods match the return values + * from the non "_n()" methods and are specified as follows: + * - On complete transfer, the number of bytes transferred is returned. + * - On timeout, -1 is returned, errno == ETIME. + * - On error, -1 is returned, errno is set to appropriate error. + * - On EOF, 0 is returned, errno is irrelevant. + * + * On partial transfers, i.e., if any data is transferred before + * timeout/error/EOF, <bytes_transferred> will contain the number of + * bytes transferred. + * Methods with <iovec> parameter are I/O vector variants of the I/O + * operations. + * Methods with the extra <flags> argument will always result in + * <send> getting called. Methods without the extra <flags> argument + * will result in <send> getting called on Win32 platforms, and + * <write> getting called on non-Win32 platforms. + */ + class HTBP_Export Stream + { + public: + // Initialization and termination methods. + /// Constructor. + Stream (Session *s = 0); + + /// Destructor. + ~Stream (void); + + // = I/O functions. + + /// The Stream is a sibling of the ACE_SOCK_IO class, rather + /// than a decendant. This is due to the requirement to wrap all + /// messages with an HTTP request or reply wrapper, and to send + /// application data in only one direction on one stream. + + /// Recv an <n> byte buffer from the connected socket. + ssize_t recv (void *buf, + size_t n, + int flags, + const ACE_Time_Value *timeout = 0) const; + + /// Recv an <n> byte buffer from the connected socket. + ssize_t recv (void *buf, + size_t n, + const ACE_Time_Value *timeout = 0) const; + + /// Recv an <iovec> of size <n> from the connected socket. + ssize_t recvv (iovec iov[], + int n, + const ACE_Time_Value *timeout = 0) const; + + /// Same as above. Deprecated. + ssize_t recv (iovec iov[], + size_t n, + const ACE_Time_Value *timeout = 0) const; + + /** + * Allows a client to read from a socket without having to + * provide a buffer to read. This method determines how much + * data is in the socket, allocates a buffer of this size, reads + * in the data, and returns the number of bytes read. The + * caller is responsible for deleting the member in the + * <iov_base> field of <io_vec> using delete [] + * io_vec->iov_base. + */ + ssize_t recvv (iovec *io_vec, + const ACE_Time_Value *timeout = 0) const; + + /// Recv <n> bytes via Win32 <ReadFile> using overlapped I/O. + ssize_t recv (void *buf, + size_t n, + ACE_OVERLAPPED *overlapped) const; + + /// Send an <n> byte buffer to the connected socket. + ssize_t send (const void *buf, + size_t n, + int flags, + const ACE_Time_Value *timeout = 0) const; + + /// Send an <n> byte buffer to the connected socket. + ssize_t send (const void *buf, + size_t n, + const ACE_Time_Value *timeout = 0) const; + + /// Send an <iovec> of size <n> to the connected socket. + ssize_t sendv (const iovec iov[], + int n, + const ACE_Time_Value *timeout = 0) const; + + /// Send <n> bytes via Win32 <WriteFile> using overlapped I/O. + ssize_t send (const void *buf, + size_t n, + ACE_OVERLAPPED *overlapped) const; + + + /// Try to recv exactly <len> bytes into <buf> from the + /// connected socket. + ssize_t recv_n (void *buf, + size_t len, + int flags, + const ACE_Time_Value *timeout = 0, + size_t *bytes_transferred = 0) const; + + /// Try to recv exactly <len> bytes into <buf> from the + /// connected socket. + ssize_t recv_n (void *buf, + size_t len, + const ACE_Time_Value *timeout = 0, + size_t *bytes_transferred = 0) const; + + /// Receive an <iovec> of size <iovcnt> from the connected + /// socket. + ssize_t recvv_n (iovec iov[], + int iovcnt, + const ACE_Time_Value *timeout = 0, + size_t *bytes_transferred = 0) const; + + /// Try to send exactly <len> bytes from <buf> to the connection + /// socket. + ssize_t send_n (const void *buf, + size_t len, + int flags, + const ACE_Time_Value *timeout = 0, + size_t *bytes_transferred = 0) const; + + /// Try to send exactly <len> bytes from <buf> to the connected + /// socket. + ssize_t send_n (const void *buf, + size_t len, + const ACE_Time_Value *timeout = 0, + size_t *bytes_transferred = 0) const; + + /// Send all the <message_block>s chained through their <next> + /// and <cont> pointers. This call uses the underlying OS + /// gather-write operation to reduce the domain-crossing + /// penalty. + ssize_t send_n (const ACE_Message_Block *message_block, + const ACE_Time_Value *timeout = 0, + size_t *bytes_transferred = 0) const; + + /// Send an <iovec> of size <iovcnt> to the connected socket. + ssize_t sendv_n (const iovec iov[], + int iovcnt, + const ACE_Time_Value *timeout = 0, + size_t *bytes_transferred = 0) const; + + // = Selectively close endpoints. / Close down the reader. + int close_reader (void); + + /// Close down the writer. + int close_writer (void); + + /** + * Close down the socket (we need this to make things work correctly + * on Win32, which requires use to do a <close_writer> before doing + * the close to avoid losing data). + */ + int close (void); + + // = Meta-type info + typedef Addr PEER_ADDR; + + /// Dump the state of an object. + void dump (void) const; + + /// Declare the dynamic allocation hooks. + ACE_ALLOC_HOOK_DECLARE; + + public: + ///@notes Added the following methods to continue with + /// current compilation of HTIOP. Might not be needed in + /// future. - Priyanka + /// {@ + void set_handle (ACE_HANDLE h); + ACE_HANDLE get_handle (void) const; + + Session *session (void) const; + void session (Session *s); + + /** + * Enable asynchronous I/O (ACE_SIGIO), urgent data (ACE_SIGURG), + * non-blocking I/O (ACE_NONBLOCK), or close-on-exec (ACE_CLOEXEC), + * which is passed as the <value>. + */ + int enable (int value) const; + + /** + * Disable asynchronous I/O (ACE_SIGIO), urgent data (ACE_SIGURG), + * non-blocking I/O (ACE_NONBLOCK), or close-on-exec (ACE_CLOEXEC), + * which is passed as the <value>. + */ + int disable (int value) const; + + /** + * Return the address of the remotely connected peer (if there is + * one), in the referenced <ACE_Addr>. Returns 0 if successful, else + * -1. + */ + int get_remote_addr (Addr &) const; + + /** + * Return the address of the remotely connected peer (if there is + * one), in the referenced <ACE_Addr>. Returns 0 if successful, else + * -1. + */ + int get_local_addr (Addr &) const; + + //@} + + private: + /// The session_ is a reference to the persistent session this stream is + /// associated with. On the inside, sessions are created by the Connector, + /// which then create streams on demand. + Session *session_; + + }; + } +} + +ACE_END_VERSIONED_NAMESPACE_DECL + +#include /**/ "ace/post.h" +#endif /* ACE_HTBP_STREAM_H */ diff --git a/ACE/protocols/ace/HTBP/HTID_Generator.cgi b/ACE/protocols/ace/HTBP/HTID_Generator.cgi new file mode 100644 index 00000000000..0d57a552d6c --- /dev/null +++ b/ACE/protocols/ace/HTBP/HTID_Generator.cgi @@ -0,0 +1,78 @@ +#!/usr/bin/perl + +# This is a sample script that may be used to create domain specific +# unique ID's rather than using UUID. In a typical scenario, this HTID +# generator would reside in the same domain as the target server, so +# that it could be used by any clients of the server. While use of UUID +# as a unique identifier is perfectly fine, use of a remote generator +# gives additional benefits, such as the ability to collect metrics on +# identity requestors. + +print "Content-type:text/html\n\n"; + +$htid = $ENV{'REMOTE_ADDR'}; + +open(IN,"counts.txt") or dienice("Can't open counts.txt for writing: $!"); + +flock(IN,2); # lock the file +seek(IN,0,0); # rewind it to the beginning + +$oldtimestamp = <IN>; # read only the first line. +$save_time = $oldtimestamp; + +close(IN); + + +$timestamp = time; + +if ($oldtimestamp == $timestamp) + { + open (IN2, "sec_id.txt") or + dienice ("Can't open sec_id.txt for writing: $!"); + + flock (IN2, 2); + seek (IN2, 0, 0); + + $secondary_id = <IN2>; + + # increment the secondary id + ++$secondary_id; + + + close (IN2); + } +else { + + $secondary_id = 0; +} + + open(OUT2,">sec_id.txt") or + dienice("Can't open outdata.txt for writing: $!"); + + flock(OUT2,2); # lock the file + seek(OUT2,0,0); # rewind it to the beginning + + print OUT2 "$secondary_id"; + + close(OUT2); + + +#save the timestamp for next time +$oldtimestamp = $timestamp; + + +open(OUT,">counts.txt") or dienice("Can't open outdata.txt for writing: $!"); + +flock(OUT,2); # lock the file +seek(OUT,0,0); # rewind it to the beginning +print OUT "$oldtimestamp\n"; +close(OUT); + + +$result_string = join (".$timestamp", $htid, ".$secondary_id"); + +print <<EndOfHTML; + +$result_string + +EndOfHTML diff --git a/ACE/protocols/ace/HTBP/Makefile.am b/ACE/protocols/ace/HTBP/Makefile.am new file mode 100644 index 00000000000..bd824576b86 --- /dev/null +++ b/ACE/protocols/ace/HTBP/Makefile.am @@ -0,0 +1,89 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ./bin/mwc.pl -type automake -noreldefs ACE.mwc + +includedir = @includedir@/ace/HTBP +pkgconfigdir = @libdir@/pkgconfig + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + + +## Makefile.HTBP.am + +if !BUILD_ACE_FOR_TAO + +lib_LTLIBRARIES = libACE_HTBP.la + +libACE_HTBP_la_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -DHTBP_BUILD_DLL + +libACE_HTBP_la_SOURCES = \ + HTBP_Addr.cpp \ + HTBP_Channel.cpp \ + HTBP_Environment.cpp \ + HTBP_Filter.cpp \ + HTBP_Filter_Factory.cpp \ + HTBP_ID_Requestor.cpp \ + HTBP_Inside_Squid_Filter.cpp \ + HTBP_Notifier.cpp \ + HTBP_Outside_Squid_Filter.cpp \ + HTBP_Session.cpp \ + HTBP_Stream.cpp + +libACE_HTBP_la_LDFLAGS = \ + -release @ACE_VERSION_NAME@ + +libACE_HTBP_la_LIBADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +nobase_include_HEADERS = \ + HTBP_Addr.h \ + HTBP_Channel.h \ + HTBP_Channel.inl \ + HTBP_Environment.h \ + HTBP_Export.h \ + HTBP_Filter.h \ + HTBP_Filter.inl \ + HTBP_Filter_Factory.h \ + HTBP_ID_Requestor.h \ + HTBP_Inside_Squid_Filter.h \ + HTBP_Inside_Squid_Filter.inl \ + HTBP_Macros.h \ + HTBP_Notifier.h \ + HTBP_Outside_Squid_Filter.h \ + HTBP_Outside_Squid_Filter.inl \ + HTBP_Session.h \ + HTBP_Session.inl \ + HTBP_Stream.h + +pkgconfig_DATA = \ + ACE_HTBP.pc + +CLEANFILES = \ + ACE_HTBP.pc + +ACE_HTBP.pc: ${top_builddir}/config.status ${srcdir}/ACE_HTBP.pc.in + ${top_builddir}/config.status --file $@:${srcdir}/ACE_HTBP.pc.in + +endif !BUILD_ACE_FOR_TAO + +EXTRA_DIST = \ + ACE_HTBP.pc.in + + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/protocols/ace/HTBP/README b/ACE/protocols/ace/HTBP/README new file mode 100644 index 00000000000..577aaf15866 --- /dev/null +++ b/ACE/protocols/ace/HTBP/README @@ -0,0 +1,186 @@ +// $Id$ + +This directory contains the HTTP Tunneling, Bidirectional, Protocol +implementation. This is a new streaming abstraction layered over an +HTTP document request/reply mechanism. It is designed to allow clients +that are inside a of a corporate firewall to communicate with servers +that are outside, hence HTTP Tunneling. Also, once a connection is +established, the outside peer is able to send asynchronous messages to +the inside peer, hence Bidirectional. + +HTBP provides Acceptor, Connector, and Stream classes that follow the +interface defined for the ACE_Acceptor and ACE_Connector +templates. This means that HTBP can be used right away with +applications designed to use these templates. + +Bidirectionality is achieved in the context of the proxy's restriction +by using two channels between the peers. One channel is defined for +data flow from the inside to the outside, data flow from the outside +in occurs on the other channel. In-to-out data is passed in the form +of a PUT command with the data payload marshalled into an +"application/gzip" buffer. On this channel, the outside peer always +responds with a simple document which serves as an ack. On the +out-to-in channel, the inside client must send a token request in the +form of a GET command. This request goes unfulfilled until the outside +peer has data to send, which it does by supplying an HTML document +that again encapsulates the data so that it may pass through the proxy +uncorrupted. + +The connections from the inside peer to the proxy, or from the proxy +to the outside peer may be closed by the proxy at any time. The inside +peer will automatically reconnect as needed when this happens. Since +the outside peer cannot actively connect to the proxy, it will queue +outbound messages as long as it can until a new out-to-in channel is +made available, at which time it will flush its queue. The sense of +channels may change over time, as the proxy may choose any local +connection to the server to send any request (GET or POST). + +The outside peer is identified using an ordinary INET addr, however +the inside peer uses a simple, transient unique ID to identify +itself. Inside peers will never have any type of persistent identity. +The unique ID used to identify the inside peer is usually a UUID value +as composed by ACE_UUID_Generator. However, a domain based unique ID +may also be obtainedusing HTBP::ID_Requestor::get_HTID(). If no domain +based ID generator is configured, get_HTID() will return a UUID value. + +As there are a variety of HTTP proxies available, HTBP uses a +pluggable filter class that defines the particular characteristics of +a proxy and is responsible for marshalling and unmarshalling binary +data. As of now there is a single filter available that works with a +defaulted Squid proxy and may also be used as a null filter, directly +connecting the inside and outside peers. This mode is useful for +testing. + +CONFIGURING HTBP +This is done through the ACE_Configuration framework. On windows +platforms, the Windows Registry may be used, whereas on non-windows, a +flat file is used to configure. Configuration data may also be +persisted in a memory mapped file. + +The configuration map contains a single section, HTBP, that contains +all the configurable parameters in name=value form. The following is +an example of a configuration file: + +[htbp] +proxy_host=<hostname> This is the hostname of the http + proxy through which all requests + will be sent. +proxy_port=<port> This is the proxy's port. +htid_url=<url> If a domain based unique id is + required, this is the URL of the + ID generator. +htid_via_proxy=<1|0> If the htid_url must be reached + via the proxy, set this to 1. + Default is 0, meaning the ID + generator is directly accessible. + +COMPANION DIRECTORIES: +$ACE_ROOT/tests/HTBP. These are the test drivers, which + also serve as example code. +$TAO_ROOT/orbsvcs/orbsvcs/HTIOP This is a TAO pluggable protocol + based on HTBP. +$TAO_ROOT/orbsvcs/tests/HTIOP The tests for HTIOP. + + +BACKGROUND INFORMATION + +HT Addresses + +The class HT_Addr is a subclass of ACE_INET_Addr class. The interface +for the HT_Addr is a common interface to be used with the inside and +outside peers. The inside peer is identified by a HTID while the +outside peer is identified with an ip address. Constructors are +provided to initialize the inside and outside peers in addition to the +default and copy constructors. addr_to_string and string_to_addr +methods from the base class are overridden to help convert the HT_Addr +to a string and vice versa. Finally, the class provides accessor +methods for the default local address and the default proxy addresses. + +The local address is the address of the inside peer and is obtained +using the singleton HTID_Requestor class. The HTID_Requestor class +sends a request to the web server that is running at the htid_url to +get the HTID unique to each inside peer. + +The proxy address is of ACE_INET_Addr type as it is no different to a +regular server. It is obtained using the singleton HT_Environment +class. The HT_Environment class helps read the HT configuration file +and provides acccessors to the proxy address, port and the htid url. + +The code below illustrates the initialization of a local or inside, +remote or outside and the proxy addresses using the classes +aforementioned. + + HT_Addr local(HTID_REQUESTOR::instance()->get_HTID()); + + char hostname [1000]; + if (ACE_OS::hostname (hostname, + 1000) == -1) + { + ACE_DEBUG ((LM_DEBUG, "Could not get the host name\n")); + return -1; + } + + HT_Addr remote (8088, hostname); + + char proxy_address [1000]; + HT_ENVIRONMENT::instance ()->get_proxy_address (proxy_address); + + unsigned int proxy_port; + HT_ENVIRONMENT::instance ()->get_proxy_port (proxy_port); + + ACE_INET_Addr proxy(port, proxy_address); + + +HT Streams + +The class HT_Stream is a sibling of the ACE_SOCK_IO class. It is used +to send and receive messages between two peers identified by their HT +addresses. It is made a sibling of the ACE_SOCK_IO class rather than +a decendant. This is due to the requirement in the HTBP protocol to +wrap all messages with an HTTP request or reply wrapper, and to send +application data in only one direction on one stream. + +HT Sessions + +A session is an entity that combines two HT_Streams that connect +directly to a proxy to manage communication with a remote peer. The +session may persist longer than either stream, assuming that the proxy +is libel to close a connection at any time. This means that the +session needs to be able to reconnect to the remote peer. This also +means that the session needs to be aware of its location. If it is +outside the proxy and looses a stream, nothing can really be done. If +it is inside, then the next time a stream is required, then it must +reconnect before returning the stream. The session does not queue +outbound messages. It will be the responsibility of the application or +a higher level protocol wrapper. + +Each session is identified by a special type, +HT_Session_Id_t. HT_Session_Id_t is a class with three members, the +local address, the peer address and an id of type ACE_UINT32. A +session map, .... + +Besides the default constructor and copy constructors, two other +constructors are provided to initialize a session and are shown below. + + + /// Constructor (sets the underlying session id with <sid>). + HT_Session (const HT_Addr& peer, + const HT_Addr& local = HT_Addr::default_local(), + ACE_UINT32 sid = 0, + ACE_INET_Addr *proxy = 0, + int take_proxy = 0); + + HT_Session (const HT_Session_Id_t &id, + ACE_INET_Addr *proxy = 0, + int take_proxy = 0); + + + +If a session id (sid) is not provided by the user, it is generated +using the static method HT_Session::next_session_id(). + +The following code illustrates the usage of HT_Stream and HT_Session +classes. + +HT_Filters +<TBD> diff --git a/ACE/protocols/ace/Makefile.am b/ACE/protocols/ace/Makefile.am new file mode 100644 index 00000000000..c476b3583bc --- /dev/null +++ b/ACE/protocols/ace/Makefile.am @@ -0,0 +1,15 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ./bin/mwc.pl -type automake -noreldefs ACE.mwc + +SUBDIRS = \ + HTBP \ + RMCast \ + TMCast + diff --git a/ACE/protocols/ace/RMCast/ACE_RMCast.pc.in b/ACE/protocols/ace/RMCast/ACE_RMCast.pc.in new file mode 100644 index 00000000000..3eb6428d28b --- /dev/null +++ b/ACE/protocols/ace/RMCast/ACE_RMCast.pc.in @@ -0,0 +1,11 @@ +prefix=@prefix@ +exec_prefix=@exec_prefix@ +libdir=@libdir@ +includedir=@includedir@ + +Name: ACE_RMCast +Description: ACE Reliable Multicast Library +Requires: ACE +Version: @VERSION@ +Libs: -L${libdir} -lACE_RMCast +Cflags: -I${includedir} diff --git a/ACE/protocols/ace/RMCast/Acknowledge.cpp b/ACE/protocols/ace/RMCast/Acknowledge.cpp new file mode 100644 index 00000000000..055041c0287 --- /dev/null +++ b/ACE/protocols/ace/RMCast/Acknowledge.cpp @@ -0,0 +1,393 @@ +// file : ace/RMCast/Acknowledge.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include "ace/Time_Value.h" // ACE_Time_Value +#include "ace/OS_NS_unistd.h" +#include "ace/OS_NS_stdlib.h" // abort +#include "ace/OS_NS_sys_time.h" // gettimeofday + +#include "Acknowledge.h" + +/* +#include <iostream> +using std::cerr; +using std::endl; +*/ + +namespace ACE_RMCast +{ + Acknowledge:: + Acknowledge (Parameters const& params) + : params_ (params), + hold_ (params.addr_map_size ()), + cond_ (mutex_), + nrtm_timer_ (params_.nrtm_timeout ()), + stop_ (false) + { + } + + void Acknowledge:: + in_start (In_Element* in) + { + Element::in_start (in); + } + + void Acknowledge:: + out_start (Out_Element* out) + { + Element::out_start (out); + + tracker_mgr_.spawn (track_thunk, this); + } + + void Acknowledge:: + out_stop () + { + { + Lock l (mutex_); + stop_ = true; + cond_.signal (); + } + + tracker_mgr_.wait (); + + Element::out_stop (); + } + + void Acknowledge:: + collapse (Queue& q) + { + // I would normally use iterators in the logic below but ACE_Map_Manager + // iterates over entries in no particular order so it is pretty much + // unusable here. Instead we will do slow and cumbersome find's. + // + + u64 sn (q.sn () + 1); + + for (;; ++sn) + { + Queue::ENTRY* e; + + if (q.find (sn, e) == -1 || e->int_id_.lost ()) break; + + Message_ptr m (e->int_id_.msg ()); + q.unbind (sn); + + in_->recv (m); + } + + q.sn (sn - 1); + } + + void Acknowledge:: + track () + { + while (true) + { + Messages msgs; + + { + Lock l (mutex_); + + if (stop_) + break; + + if (hold_.current_size () != 0) + { + for (Map::iterator i (hold_.begin ()), e (hold_.end ()); + i != e; + ++i) + { + Queue& q = (*i).int_id_; + + if (q.current_size () == 0) continue; + + track_queue ((*i).ext_id_, q, msgs); + } + } + + if (--nrtm_timer_ == 0) + { + nrtm_timer_ = params_.nrtm_timeout (); + + // Send NRTM. + // + unsigned short max_payload_size ( + params_.max_packet_size () - max_service_size); + + u32 max_elem (NRTM::max_count (max_payload_size)); + + Profile_ptr nrtm (create_nrtm (max_elem)); + + if (!nrtm.null ()) + { + Message_ptr m (new Message); + m->add (nrtm); + msgs.push_back (m); + + } + } + } + + // Send stuff off. + // + for (Messages::Iterator i (msgs); !i.done (); i.advance ()) + { + Message_ptr* ppm; + i.next (ppm); + + //FUZZ: disable check_for_lack_ACE_OS + send (*ppm); + //FUZZ: enable check_for_lack_ACE_OS + } + + // Go to sleep but watch for "manual cancellation" request. + // + { + //FUZZ: disable check_for_lack_ACE_OS + ACE_Time_Value time (ACE_OS::gettimeofday ()); + //FUZZ: enable check_for_lack_ACE_OS + + time += params_.tick (); + + Lock l (mutex_); + + while (!stop_) + { + if (cond_.wait (&time) == -1) + { + if (errno != ETIME) + ACE_OS::abort (); + else + break; + } + } + + if (stop_) + break; + } + } + } + + void Acknowledge:: + track_queue (Address const& addr, Queue& q, Messages& msgs) + { + unsigned short max_payload_size ( + params_.max_packet_size () - max_service_size); + + u32 max_elem (NAK::max_count (max_payload_size)); + u32 count (0); + + Queue::iterator i (q.begin ()), e (q.end ()); + + // Track existing losses. + // + while (i != e) + { + auto_ptr<NAK> nak (new NAK (addr)); + + // Inner loop that fills NAK profile with up to max_elem elements. + // + for (; i != e && nak->count () < max_elem; ++i) + { + u64 sn ((*i).ext_id_); + Descr& d = (*i).int_id_; + + if (d.lost ()) + { + d.timer (d.timer () - 1); + + if (d.timer () == 0) + { + //@@ Need exp fallback. + // + d.nak_count (d.nak_count () + 1); + d.timer ((d.nak_count () + 1) * params_.nak_timeout ()); + + nak->add (sn); + + ++count; + + // cerr << 6 << "NAK # " << d.nak_count () << ": " + // << addr << " " << sn << endl; + } + } + } + + // Send this NAK. + // + if (nak->count ()) + { + // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns" + // << endl; + + Message_ptr m (new Message); + + m->add (Profile_ptr (nak.release ())); + + msgs.push_back (m); + } + } + + // Detect and record new losses. + // + for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn) + { + if (q.find (sn) == -1) + { + q.bind (sn, Descr (1)); + } + } + } + + void Acknowledge::recv (Message_ptr m) + { + // Handle NRTM. There could be some nasty interaction with code + // that handles data below (like missing message and NAK). This + // is why I hold the lock at the beginning (which may be not very + // efficient). + // + Lock l (mutex_); + + if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id))) + { + for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i) + { + u64 sn (nrtm->find ((*i).ext_id_)); + + if (sn != 0) + { + Queue& q = (*i).int_id_; + + u64 old (q.max_sn ()); + + if (old < sn) + { + // Mark as lost. + // + q.bind (sn, Descr (1)); + } + } + } + } + + if (m->find (Data::id) || m->find (NoData::id)) + { + Address from ( + static_cast<From const*> (m->find (From::id))->address ()); + + u64 sn (static_cast<SN const*> (m->find (SN::id))->num ()); + + Map::ENTRY* e; + + if (hold_.find (from, e) == -1) + { + // First message from this source. + // + hold_.bind (from, Queue (sn)); + in_->recv (m); + } + else + { + Queue& q = e->int_id_; + + if (sn <= q.sn ()) + { + // Duplicate. + // + //cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn + // << endl; + } + else if (sn == q.sn () + 1) + { + // Next message. + // + + q.rebind (sn, Descr (m)); + collapse (q); + } + else + { + // Some messages are missing. Insert this one into the queue. + // + q.rebind (sn, Descr (m)); + } + } + } + else + { + l.release (); + + // Just forward it up. + // + in_->recv (m); + } + } + + void Acknowledge::send (Message_ptr m) + { + if (Data const* data = static_cast<Data const*> (m->find (Data::id))) + { + size_t max_payload_size ( + params_.max_packet_size () - max_service_size); + + if (max_payload_size > data->size ()) + { + u32 max_size (max_payload_size - data->size ()); + u32 max_elem (NRTM::max_count (max_size)); + + if (max_elem > 0) + { + Lock l (mutex_); + + Profile_ptr nrtm (create_nrtm (max_elem)); + + if (nrtm.get ()) + m->add (nrtm); + } + } + + nrtm_timer_ = params_.nrtm_timeout (); // Reset timer. + } + + out_->send (m); + } + + Profile_ptr Acknowledge:: + create_nrtm (u32 max_elem) + { + // Prepare NRTM. + // + auto_ptr<NRTM> nrtm (new NRTM ()); + + // Gather the information. + // + { + for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i) + { + Address addr ((*i).ext_id_); + Queue& q = (*i).int_id_; + + //@@ Should look for the highest known number. + // + nrtm->insert (addr, q.sn ()); + + if (--max_elem == 0) + break; + } + } + + if (nrtm->empty ()) + return Profile_ptr (0); + else + return Profile_ptr (nrtm.release ()); + } + + ACE_THR_FUNC_RETURN Acknowledge:: + track_thunk (void* obj) + { + reinterpret_cast<Acknowledge*> (obj)->track (); + return 0; + } +} diff --git a/ACE/protocols/ace/RMCast/Acknowledge.h b/ACE/protocols/ace/RMCast/Acknowledge.h new file mode 100644 index 00000000000..3d7654b1fec --- /dev/null +++ b/ACE/protocols/ace/RMCast/Acknowledge.h @@ -0,0 +1,248 @@ +// file : ace/RMCast/Acknowledge.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_ACKNOWLEDGE_H +#define ACE_RMCAST_ACKNOWLEDGE_H + +#include "ace/Hash_Map_Manager.h" +#include "ace/Thread_Manager.h" + +#include "Stack.h" +#include "Protocol.h" +#include "Bits.h" +#include "Parameters.h" + +#if !defined (ACE_RMCAST_DEFAULT_MAP_SIZE) +#define ACE_RMCAST_DEFAULT_MAP_SIZE 10 +#endif /* ACE_RMCAST_DEFAULT_MAP_SIZE */ + +#if !defined (ACE_RMCAST_DEFAULT_QUEUE_SIZE) +#define ACE_RMCAST_DEFAULT_QUEUE_SIZE 10 +#endif /* ACE_RMCAST_DEFAULT_QUEUE_SIZE */ + +namespace ACE_RMCast +{ + class Acknowledge : public Element + { + public: + Acknowledge (Parameters const& params); + + virtual void + in_start (In_Element* in); + + virtual void + out_start (Out_Element* out); + + virtual void + out_stop (); + + public: + virtual void + recv (Message_ptr m); + + virtual void + send (Message_ptr m); + + // Sun C++ 5.4 can't handle private here. + // + // private: + public: + struct Descr + { + //@@ There should be no default c-tor. + // + Descr () + : nak_count_ (0), timer_ (1) + { + } + + Descr (unsigned long timer) + : nak_count_ (0), timer_ (timer) + { + } + + Descr (Message_ptr m) + : m_ (m) + { + } + + public: + bool + lost () const + { + return m_.get () == 0; + } + + public: + Message_ptr + msg () + { + return m_; + } + + void + msg (Message_ptr m) + { + m_ = m; + } + + public: + unsigned long + nak_count () const + { + return nak_count_; + } + + void + nak_count (unsigned long v) + { + nak_count_ = v; + } + + unsigned long + timer () const + { + return timer_; + } + + void + timer (unsigned long v) + { + timer_ = v; + } + + private: + Message_ptr m_; + + unsigned long nak_count_; + unsigned long timer_; + }; + + private: + struct Queue : ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex> + { + typedef ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex> Base; + + // Should never be here but required by ACE_Hash_Blah_Blah. + // + Queue () + : Base (ACE_RMCAST_DEFAULT_MAP_SIZE), sn_ (0), max_sn_ (0) + { + } + + Queue (u64 sn) + : Base (ACE_RMCAST_DEFAULT_MAP_SIZE), sn_ (sn), max_sn_ (sn) + { + } + + Queue (Queue const& q) + : Base (ACE_RMCAST_DEFAULT_MAP_SIZE), sn_ (q.sn_), max_sn_ (sn_) + { + for (Queue::const_iterator i (q), e (q, 1); i != e; ++i) + { + bind ((*i).ext_id_, (*i).int_id_); + } + } + + public: + int + bind (u64 sn, Descr const& d) + { + int r (Base::bind (sn, d)); + + if (r == 0 && sn > max_sn_) max_sn_ = sn; + + return r; + } + + int + rebind (u64 sn, Descr const& d) + { + int r (Base::rebind (sn, d)); + + if (r == 0 && sn > max_sn_) max_sn_ = sn; + + return r; + } + + int + unbind (u64 sn) + { + int r (Base::unbind (sn)); + + if (r == 0 && sn == max_sn_) + { + for (--max_sn_; max_sn_ >= sn_; --max_sn_) + { + if (find (max_sn_) == 0) break; + } + } + + return r; + } + + public: + u64 + sn () const + { + return sn_; + } + + void + sn (u64 sn) + { + sn_ = sn; + } + + u64 + max_sn () const + { + if (current_size () == 0) return sn_; + + return max_sn_; + } + + private: + u64 sn_, max_sn_; + }; + + typedef + ACE_Hash_Map_Manager_Ex<Address, + Queue, + AddressHasher, + ACE_Equal_To<Address>, + ACE_Null_Mutex> + Map; + + private: + void + collapse (Queue& q); + + void + track (); + + void + track_queue (Address const& addr, Queue& q, Messages& msgs); + + Profile_ptr + create_nrtm (u32 max_elem); + + static ACE_THR_FUNC_RETURN + track_thunk (void* obj); + + private: + Parameters const& params_; + + Map hold_; + Mutex mutex_; + Condition cond_; + + unsigned long nrtm_timer_; + + bool stop_; + ACE_Thread_Manager tracker_mgr_; + }; +} + +#endif // ACE_RMCAST_ACKNOWLEDGE_H diff --git a/ACE/protocols/ace/RMCast/Bits.h b/ACE/protocols/ace/RMCast/Bits.h new file mode 100644 index 00000000000..1b45580647b --- /dev/null +++ b/ACE/protocols/ace/RMCast/Bits.h @@ -0,0 +1,26 @@ +// file : ace/RMCast/Bits.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_BITS_H +#define ACE_RMCAST_BITS_H + +#include "ace/Auto_Ptr.h" +#include "ace/Thread_Mutex.h" +#include "ace/Condition_T.h" +#include "ace/Synch_Traits.h" + +namespace ACE_RMCast +{ + typedef ACE_SYNCH_MUTEX Mutex; + typedef ACE_Guard<Mutex> Lock; + typedef ACE_Condition<Mutex> Condition; + + // tmp + // + //using std::cerr; + //using std::endl; +} + + +#endif // ACE_RMCAST_BITS_H diff --git a/ACE/protocols/ace/RMCast/Flow.cpp b/ACE/protocols/ace/RMCast/Flow.cpp new file mode 100644 index 00000000000..c5d9f63a754 --- /dev/null +++ b/ACE/protocols/ace/RMCast/Flow.cpp @@ -0,0 +1,136 @@ +// file : ace/RMCast/Flow.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include "Flow.h" + +#include "ace/OS_NS_unistd.h" // sleep +#include "ace/OS_NS_sys_time.h" // gettimeofday + +#include "ace/os_include/os_math.h" // exp + +/* +#include <iostream> +using std::cerr; +using std::endl; +*/ + +namespace ACE_RMCast +{ + Flow:: + Flow (Parameters const& params) + : params_ (params), + nak_time_ (0, 0), + sample_start_time_ (0, 0), + sample_bytes_ (0), + current_tput_ (0.0), + cap_tput_ (0.0) + { + } + + void Flow::send (Message_ptr m) + { + if (Data const* data = static_cast<Data const*> (m->find (Data::id))) + { + ACE_Time_Value now_time (ACE_OS::gettimeofday ()); + + Lock l (mutex_); + sample_bytes_ += data->size (); + + if (sample_start_time_ == ACE_Time_Value (0, 0)) + { + sample_start_time_ = now_time; + } + else + { + ACE_Time_Value delta (now_time - sample_start_time_); + + if (delta > ACE_Time_Value (0, 2000)) + { + current_tput_ = + double (sample_bytes_) / (delta.sec () * 1000000 + delta.usec ()); + + // cerr << "tput: " << current_tput_ << " bytes/usec" << endl; + + sample_bytes_ = 0; + sample_start_time_ = ACE_Time_Value (0, 0); + } + } + + if (cap_tput_ != 0.0 + && current_tput_ != 0.0 + && current_tput_ > cap_tput_) + { + double dev = (current_tput_ - cap_tput_) / current_tput_; + + // cerr << "deviation: " << dev << endl; + + // Cap decay algorithm. + // + { + ACE_Time_Value delta (now_time - nak_time_); + + unsigned long msec = delta.msec (); + + double x = msec / -16000.0; + double y = 1.0 * exp (x); + cap_tput_ = cap_tput_ / y; + + // cerr << "cap decay: " << cap_tput_ << " bytes/usec" << endl; + } + + l.release (); + + + timespec time; + time.tv_sec = 0; + time.tv_nsec = static_cast<unsigned long> (dev * 500000.0); + + // Don't bother to sleep if the time is less than 10 usec. + // + if (time.tv_nsec > 10000) + ACE_OS::sleep (ACE_Time_Value (time)); + } + } + + out_->send (m); + } + + void Flow::recv (Message_ptr m) + { + if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id))) + { + Address to (static_cast<To const*> (m->find (To::id))->address ()); + + if (nak->address () == to) + { + // This one is for us. + // + + //cerr << "NAK from " + // << static_cast<From const*> (m->find (From::id))->address () + // << " for " << nak->count () << " sns." << endl; + + + ACE_Time_Value nak_time (ACE_OS::gettimeofday ()); + + Lock l (mutex_); + + nak_time_ = nak_time; + + if (cap_tput_ == 0.0) + cap_tput_ = current_tput_; + + if (cap_tput_ != 0.0) + { + cap_tput_ = cap_tput_ - cap_tput_ / 6.0; + + // cerr << "cap: " << cap_tput_ << " bytes/usec" << endl; + } + } + } + + in_->recv (m); + } +} + diff --git a/ACE/protocols/ace/RMCast/Flow.h b/ACE/protocols/ace/RMCast/Flow.h new file mode 100644 index 00000000000..e64d0d438a2 --- /dev/null +++ b/ACE/protocols/ace/RMCast/Flow.h @@ -0,0 +1,43 @@ +// file : ace/RMCast/Flow.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_FLOW_H +#define ACE_RMCAST_FLOW_H + +#include "Stack.h" +#include "Protocol.h" +#include "Bits.h" +#include "Parameters.h" + +namespace ACE_RMCast +{ + class Flow : public Element + { + public: + Flow (Parameters const& params); + + public: + virtual void + send (Message_ptr m); + + virtual void + recv (Message_ptr m); + + private: + Parameters const& params_; + + Mutex mutex_; + ACE_Time_Value nak_time_; + + // Throughput sampling. + // + ACE_Time_Value sample_start_time_; + unsigned long sample_bytes_; + double current_tput_; + double cap_tput_; + }; +} + + +#endif // ACE_RMCAST_FLOW_H diff --git a/ACE/protocols/ace/RMCast/Fragment.cpp b/ACE/protocols/ace/RMCast/Fragment.cpp new file mode 100644 index 00000000000..9a483e7cfbe --- /dev/null +++ b/ACE/protocols/ace/RMCast/Fragment.cpp @@ -0,0 +1,77 @@ +// file : ace/RMCast/Fragment.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include "Fragment.h" + +/* +#include <iostream> +using std::cerr; +using std::endl; +*/ + +namespace ACE_RMCast +{ + Fragment:: + Fragment (Parameters const& params) + : params_ (params), + sn_ (1) + { + } + + void Fragment::send (Message_ptr m) + { + if (Data const* data = static_cast<Data const*> (m->find (Data::id))) + { + size_t max_payload_size ( + params_.max_packet_size () - max_service_size); + + if (data->size () <= max_payload_size) + { + u64 sn; + { + Lock l (mutex_); + sn = sn_++; + } + + m->add (Profile_ptr (new SN (sn))); + + out_->send (m); + return; + } + + char const* p = data->buf (); + size_t size (data->size ()); + + // Need fragmentation. + // + u32 packets (size / max_payload_size + (size % max_payload_size ? 1 : 0)); + + // cerr << "size : " << size << endl + // << "packs: " << packets << endl; + + for (u32 i (1); i <= packets; ++i) + { + Message_ptr part (new Message); + + size_t s (i == packets ? size % max_payload_size : max_payload_size); + + // cerr << "pack: " << s << endl; + + u64 sn; + { + Lock l (mutex_); + sn = sn_++; + } + + part->add (Profile_ptr (new SN (sn))); + part->add (Profile_ptr (new Part (i, packets, size))); + part->add (Profile_ptr (new Data (p, s))); + + out_->send (part); + + p += s; + } + } + } +} diff --git a/ACE/protocols/ace/RMCast/Fragment.h b/ACE/protocols/ace/RMCast/Fragment.h new file mode 100644 index 00000000000..bfaa3044c83 --- /dev/null +++ b/ACE/protocols/ace/RMCast/Fragment.h @@ -0,0 +1,32 @@ +// file : ace/RMCast/Fragment.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_FRAGMENT_H +#define ACE_RMCAST_FRAGMENT_H + +#include "Stack.h" +#include "Protocol.h" +#include "Bits.h" +#include "Parameters.h" + +namespace ACE_RMCast +{ + class Fragment : public Element + { + public: + Fragment (Parameters const& params); + + public: + virtual void + send (Message_ptr m); + + Parameters const& params_; + + private: + Mutex mutex_; + u64 sn_; + }; +} + +#endif // ACE_RMCAST_FRAGMENT_H diff --git a/ACE/protocols/ace/RMCast/Link.cpp b/ACE/protocols/ace/RMCast/Link.cpp new file mode 100644 index 00000000000..12e6003d456 --- /dev/null +++ b/ACE/protocols/ace/RMCast/Link.cpp @@ -0,0 +1,337 @@ +// file : ace/RMCast/Link.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include "ace/Time_Value.h" // ACE_Time_Value +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_stdlib.h" +#include "ace/OS_NS_time.h" +#include "ace/OS_NS_sys_socket.h" + +#include "Link.h" + +namespace ACE_RMCast +{ + Link:: + ~Link () + { + ssock_.close (); + rsock_.close (); + } + + Link:: + Link (Address const& addr, Parameters const& params) + : params_ (params), + addr_ (addr), + ssock_ (Address (static_cast<unsigned short> (0), + static_cast<ACE_UINT32> (INADDR_ANY)), + AF_INET, + IPPROTO_UDP, + 1), + stop_ (false) + + { + ACE_OS::srand ((unsigned int) ACE_OS::time (0)); + + + rsock_.set_option (IP_MULTICAST_LOOP, 0); + // rsock_.set_option (IP_MULTICAST_TTL, 0); + + // Set recv/send buffers. + // + { + int r (131070); + int s (sizeof (r)); + + static_cast<ACE_SOCK&> (rsock_).set_option ( + SOL_SOCKET, SO_RCVBUF, &r, s); + + static_cast<ACE_SOCK&> (ssock_).set_option ( + SOL_SOCKET, SO_RCVBUF, &r, s); + + rsock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s); + //cerr << 5 << "recv buffer size: " << r << endl; + + ssock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s); + //cerr << 5 << "send buffer size: " << r << endl; + + } + + // Bind address and port. + // + if (ACE_OS::connect (ssock_.get_handle (), + reinterpret_cast<sockaddr*> (addr_.get_addr ()), + addr_.get_addr_size ()) == -1) + { + ACE_OS::perror ("connect: "); + ACE_OS::abort (); + } + + + ssock_.get_local_addr (self_); + + //cerr << 5 << "self: " << self_ << endl; + } + + void Link:: + in_start (In_Element* in) + { + Element::in_start (in); + + rsock_.join (addr_); + + // Start receiving thread. + // + recv_mgr_.spawn (recv_thunk, this); + } + + void Link:: + out_start (Out_Element* out) + { + Element::out_start (out); + } + + void Link:: + in_stop () + { + // Stop receiving thread. + // + { + Lock l (mutex_); + stop_ = true; + } + recv_mgr_.wait (); + + Element::in_stop (); + } + + void Link::send (Message_ptr m) + { + // Simulate message loss and reordering. + // + if (params_.simulator ()) + { + if ((ACE_OS::rand () % 17) != 0) + { + Lock l (mutex_); + + if (hold_.get ()) + { + send_ (m); + send_ (hold_); + hold_ = Message_ptr (0); + } + else + { + if ((ACE_OS::rand () % 17) != 0) + { + send_ (m); + } + else + { + hold_ = m; + + // Make a copy in M so that the reliable loop below + // won't add FROM and TO to HOLD_. + // + m = hold_->clone (); + } + } + } + } + else + send_ (m); + + // Reliable loop. + // + m->add (Profile_ptr (new From (self_))); + m->add (Profile_ptr (new To (self_))); + + in_->recv (m); + } + + void Link:: + send_ (Message_ptr m) + { + ostream os (m->size (), 1); // Always little-endian. + + os << *m; + + if (os.length () > size_t (params_.max_packet_size ())) + { + ACE_ERROR ((LM_ERROR, + "packet length (%d) exceeds max_poacket_size (%d)\n", + os.length (), params_.max_packet_size ())); + + for (Message::ProfileIterator i (m->begin ()); !i.done (); i.advance ()) + { + ACE_ERROR ((LM_ERROR, + "profile id: %d; size: %d\n", + (*i).ext_id_, (*i).int_id_->size ())); + } + + ACE_OS::abort (); + } + + ssock_.send (os.buffer (), os.length (), addr_); + + /* + if (m->find (nrtm::id)) + { + ACE_OS::write (1, os.buffer (), os.length ()); + ACE_OS::exit (1); + } + */ + } + + void Link::recv () + { + size_t max_packet_size (params_.max_packet_size ()); + + // This is wicked. + // + ACE_Auto_Ptr<char> holder ( + reinterpret_cast<char*> ( + operator new (max_packet_size + ACE_CDR::MAX_ALIGNMENT))); + + char* data = ACE_ptr_align_binary (holder.get (), ACE_CDR::MAX_ALIGNMENT); + + size_t size (0); + + while (true) + { + //@@ Should I lock here? + // + + Address addr; + + // Block for up to one tick waiting for an incomming message. + // + for (;;) + { + ACE_Time_Value t (params_.tick ()); + ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t); + + + // Check for cancellation request. + // + { + Lock l (mutex_); + if (stop_) + return; + } + + if (r == -1) + { + if (errno != ETIME) + ACE_OS::abort (); + } + else + { + size = static_cast<size_t> (r); + break; + } + } + + + if (size != 4 || addr == self_) + { + // Discard bad messages and ones from ourselvs since + // we are using reliable loopback. + // + rsock_.recv (data, 0, addr); + continue; + } + + u32 msg_size; + { + istream is (data, size, 1); // Always little-endian. + is >> msg_size; + } + + if (msg_size <= 4 || msg_size > max_packet_size) + { + // Bad message. + // + rsock_.recv (data, 0, addr); + continue; + } + + size = rsock_.recv (data, max_packet_size, addr); + + if (msg_size != size) + { + // Bad message. + // + continue; + } + + //cerr << 6 << "from: " << addr << endl; + + Message_ptr m (new Message ()); + + m->add (Profile_ptr (new From (addr))); + m->add (Profile_ptr (new To (self_))); + + istream is (data, size, 1); // Always little-endian. + + is >> msg_size; + + while (true) + { + u16 id, size; + + if (!((is >> id) && (is >> size))) break; + + //cerr << 6 << "reading profile with id " << id << " " + // << size << " bytes long" << endl; + + Profile::Header hdr (id, size); + + if (id == SN::id) + { + m->add (Profile_ptr (new SN (hdr, is))); + } + else if (id == Data::id) + { + m->add (Profile_ptr (new Data (hdr, is))); + } + else if (id == NAK::id) + { + m->add (Profile_ptr (new NAK (hdr, is))); + } + else if (id == NRTM::id) + { + m->add (Profile_ptr (new NRTM (hdr, is))); + } + else if (id == NoData::id) + { + m->add (Profile_ptr (new NoData (hdr, is))); + } + else if (id == Part::id) + { + m->add (Profile_ptr (new Part (hdr, is))); + } + else + { + //cerr << 0 << "unknown profile id " << hdr.id () << endl; + ACE_OS::abort (); + } + } + + in_->recv (m); + } + } + + ACE_THR_FUNC_RETURN Link:: + recv_thunk (void* obj) + { + reinterpret_cast<Link*> (obj)->recv (); + return 0; + } + + void Link::recv (Message_ptr) + { + ACE_OS::abort (); + } +} diff --git a/ACE/protocols/ace/RMCast/Link.h b/ACE/protocols/ace/RMCast/Link.h new file mode 100644 index 00000000000..6397322556f --- /dev/null +++ b/ACE/protocols/ace/RMCast/Link.h @@ -0,0 +1,73 @@ +// file : ace/RMCast/Link.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_LINK_H +#define ACE_RMCAST_LINK_H + +#include "ace/SOCK_Dgram.h" +#include "ace/SOCK_Dgram_Mcast.h" + +#include "ace/Thread_Manager.h" + +#include "Stack.h" +#include "Protocol.h" +#include "Parameters.h" + +namespace ACE_RMCast +{ + class Link : public Element + { + public: + ~Link (); + + Link (Address const& addr, Parameters const& params); + + virtual void + in_start (In_Element* in); + + virtual void + out_start (Out_Element* out); + + virtual void + in_stop (); + + public: + virtual void + send (Message_ptr m); + + private: + virtual void + send_ (Message_ptr m); + + private: + void + recv (); + + static ACE_THR_FUNC_RETURN + recv_thunk (void* obj); + + private: + virtual void + recv (Message_ptr); + + private: + Parameters const& params_; + + Address addr_, self_; + ACE_SOCK_Dgram_Mcast rsock_; + ACE_SOCK_Dgram ssock_; + + bool stop_; + ACE_Thread_Manager recv_mgr_; + + // Simulator. + // + Message_ptr hold_; + + Mutex mutex_; + }; +} + + +#endif // ACE_RMCAST_LINK_H diff --git a/ACE/protocols/ace/RMCast/Makefile.am b/ACE/protocols/ace/RMCast/Makefile.am new file mode 100644 index 00000000000..51e73d4041e --- /dev/null +++ b/ACE/protocols/ace/RMCast/Makefile.am @@ -0,0 +1,85 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ./bin/mwc.pl -type automake -noreldefs ACE.mwc + +includedir = @includedir@/ace/RMCast +pkgconfigdir = @libdir@/pkgconfig + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + + +## Makefile.RMCast.am + +if BUILD_THREADS +if !BUILD_ACE_FOR_TAO + +lib_LTLIBRARIES = libACE_RMCast.la + +libACE_RMCast_la_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -DACE_RMCAST_BUILD_DLL + +libACE_RMCast_la_SOURCES = \ + Acknowledge.cpp \ + Flow.cpp \ + Fragment.cpp \ + Link.cpp \ + Protocol.cpp \ + Reassemble.cpp \ + Retransmit.cpp \ + Simulator.cpp \ + Socket.cpp \ + Stack.cpp + +libACE_RMCast_la_LDFLAGS = \ + -release @ACE_VERSION_NAME@ + +libACE_RMCast_la_LIBADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +nobase_include_HEADERS = \ + Acknowledge.h \ + Bits.h \ + Flow.h \ + Fragment.h \ + Link.h \ + Parameters.h \ + Protocol.h \ + RMCast_Export.h \ + Reassemble.h \ + Retransmit.h \ + Simulator.h \ + Socket.h \ + Stack.h + +pkgconfig_DATA = \ + ACE_RMCast.pc + +CLEANFILES = \ + ACE_RMCast.pc + +ACE_RMCast.pc: ${top_builddir}/config.status ${srcdir}/ACE_RMCast.pc.in + ${top_builddir}/config.status --file $@:${srcdir}/ACE_RMCast.pc.in + +endif !BUILD_ACE_FOR_TAO +endif BUILD_THREADS + +EXTRA_DIST = \ + ACE_RMCast.pc.in + + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/protocols/ace/RMCast/Parameters.h b/ACE/protocols/ace/RMCast/Parameters.h new file mode 100644 index 00000000000..d254bd00fad --- /dev/null +++ b/ACE/protocols/ace/RMCast/Parameters.h @@ -0,0 +1,109 @@ +// file : ace/RMCast/Parameters.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_PARAMETERS_H +#define ACE_RMCAST_PARAMETERS_H + +#include "ace/Time_Value.h" + + +namespace ACE_RMCast +{ + class Parameters + { + public: + Parameters ( + // Loss and reordering simulator. + // + bool simulator = false, + + // MTU(1500) - IP-hdr - UDP-hdr + // + unsigned short max_packet_size = 1470, + + // How often various worker threads wakeup to do + // cacellation checks, queue run-throughs, etc. + // + ACE_Time_Value const& tick = ACE_Time_Value (0, 2000), + + // How long to wait, in ticks, before sending NAK. + // + unsigned long nak_timeout = 1, + + // How long to wait, in ticks, before sending NRTM. + // + unsigned long nrtm_timeout = 10, + + // How long to retain a message for retransmission, in ticks. + // + unsigned long retention_timeout = 500, // 1 sec + + size_t addr_map_size = 50 + ) + : simulator_ (simulator), + max_packet_size_ (max_packet_size), + tick_ (tick), + nak_timeout_ (nak_timeout), + nrtm_timeout_ (nrtm_timeout), + retention_timeout_ (retention_timeout), + addr_map_size_(addr_map_size) + { + } + + public: + bool + simulator () const + { + return simulator_; + } + + unsigned short + max_packet_size () const + { + return max_packet_size_; + } + + ACE_Time_Value const& + tick () const + { + return tick_; + } + + unsigned long + nak_timeout () const + { + return nak_timeout_; + } + + unsigned long + nrtm_timeout () const + { + return nrtm_timeout_; + } + + unsigned long + retention_timeout () const + { + return retention_timeout_; + } + + size_t + addr_map_size () const + { + return addr_map_size_; + } + + private: + bool simulator_; + unsigned short max_packet_size_; + ACE_Time_Value tick_; + unsigned long nak_timeout_; + unsigned long nrtm_timeout_; + unsigned long retention_timeout_; + size_t addr_map_size_; + }; +} + + +#endif // ACE_RMCAST_PARAMETERS_H diff --git a/ACE/protocols/ace/RMCast/Protocol.cpp b/ACE/protocols/ace/RMCast/Protocol.cpp new file mode 100644 index 00000000000..755872371cb --- /dev/null +++ b/ACE/protocols/ace/RMCast/Protocol.cpp @@ -0,0 +1,17 @@ +// file : ace/RMCast/Protocol.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include "Protocol.h" + +namespace ACE_RMCast +{ + u16 const From:: id = 0x0001; + u16 const To:: id = 0x0002; + u16 const Data:: id = 0x0003; + u16 const SN:: id = 0x0004; + u16 const NAK:: id = 0x0005; + u16 const NRTM:: id = 0x0006; + u16 const NoData::id = 0x0007; + u16 const Part:: id = 0x0008; +} diff --git a/ACE/protocols/ace/RMCast/Protocol.h b/ACE/protocols/ace/RMCast/Protocol.h new file mode 100644 index 00000000000..cf2c2d2289d --- /dev/null +++ b/ACE/protocols/ace/RMCast/Protocol.h @@ -0,0 +1,1285 @@ +// file : ace/RMCast/Protocol.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_PROTOCOL_H +#define ACE_RMCAST_PROTOCOL_H + +#include "ace/Auto_Ptr.h" +#include "ace/Bound_Ptr.h" + +#include "ace/Vector_T.h" +#include "ace/Hash_Map_Manager.h" + +#include "ace/CDR_Stream.h" +#include "ace/CDR_Size.h" + +#include "ace/INET_Addr.h" +#include "ace/Null_Mutex.h" + +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_stdlib.h" + +#include "Bits.h" + +/* +#include <iostream> +*/ + +namespace ACE_RMCast +{ + // Basic types. + // + typedef ACE_CDR::UShort u16; + typedef ACE_CDR::ULong u32; + typedef ACE_CDR::ULongLong u64; + + // Protocol parameters + // + // + unsigned short const max_service_size = 60; // service profiles (Part, SN, + // etc), sizes plus message size. + + // + // + // + typedef ACE_INET_Addr Address; + + struct AddressHasher + { + unsigned long + operator() (Address const& a) const + { + unsigned long port (a.get_port_number ()); + unsigned long ip (a.get_ip_address ()); + + port <<= sizeof (unsigned long) - sizeof (unsigned short); + + return port ^ ip; + } + }; + + //@@ Provide stream<< (Address const&) + // + + typedef ACE_OutputCDR ostream; + typedef ACE_SizeCDR sstream; + typedef ACE_InputCDR istream; + + struct Profile; + + typedef + ACE_Strong_Bound_Ptr<Profile, Mutex> + Profile_ptr; + + struct Profile + { + public: + class Header + { + public: + Header (u16 id, u16 size) + : id_ (id), size_ (size) + { + } + + Header (istream& is) + { + (void) (is >> id_ >> size_); + } + + public: + u16 + id () const + { + return id_; + } + + u16 + size () const + { + return size_; + } + + protected: + void + size (u16 s) + { + size_ = s; + } + + friend struct Profile; + + private: + u16 id_; + u16 size_; + }; + + public: + virtual + ~Profile () + { + } + + Profile_ptr + clone () + { + Profile_ptr p (clone_ ()); + return p; + } + + protected: + Profile (u16 id) + : header_ (id, 0) + { + } + + Profile (Header const& h) + : header_ (h) + { + } + + virtual Profile_ptr + clone_ () = 0; + + private: + Profile& + operator= (Profile const&); + + public: + u16 + id () const + { + return header_.id (); + } + + u16 + size () const + { + return header_.size (); + } + + protected: + void + size (u16 s) + { + header_.size (s); + } + + u16 + calculate_size () + { + sstream ss; + + serialize_body (ss); + + return static_cast<u16> (ss.total_length ()); + } + + public: + virtual void + serialize_body (ostream&) const = 0; + + virtual void + serialize_body (sstream&) const = 0; + + friend + ostream& + operator<< (ostream& os, Profile const& p); + + friend + sstream& + operator<< (sstream& ss, Profile const& p); + + private: + Header header_; + }; + + inline + ostream& + operator<< (ostream& os, Profile::Header const& hdr) + { + os << hdr.id (); + os << hdr.size (); + + return os; + } + + inline + sstream& + operator<< (sstream& ss, Profile::Header const& hdr) + { + ss << hdr.id (); + ss << hdr.size (); + + return ss; + } + + inline + ostream& + operator<< (ostream& os, Profile const& p) + { + os << p.header_; + p.serialize_body (os); + + return os; + } + + inline + sstream& + operator<< (sstream& ss, Profile const& p) + { + ss << p.header_; + p.serialize_body (ss); + + return ss; + } + + + // + // + // + class Message; + + typedef + ACE_Strong_Bound_Ptr<Message, Mutex> + Message_ptr; + + class Message + { + typedef + ACE_Hash_Map_Manager<u16, Profile_ptr, ACE_Null_Mutex> + Profiles; + + public: + Message () + : profiles_ (4) + { + } + + Message_ptr + clone () + { + Message_ptr cloned (new Message (*this)); + return cloned; + } + + protected: + Message (Message const& m) + : profiles_ (4) + { + for (Profiles::const_iterator i (m.profiles_); !i.done (); i.advance ()) + { + // Shallow copy of profiles. This implies that profiles are not + // modified as they go up/down the stack. + // + profiles_.bind ((*i).ext_id_, (*i).int_id_); + } + } + + private: + Message& + operator= (Message const&); + + public: + bool + add (Profile_ptr p) + { + u16 id (p->id ()); + + if (profiles_.find (id) == 0) + { + return false; + } + + profiles_.bind (id, p); + + return true; + } + + void + replace (Profile_ptr p) + { + profiles_.rebind (p->id (), p); + } + + void + remove (u16 id) + { + profiles_.unbind (id); + } + + Profile const* + find (u16 id) const + { + Profiles::ENTRY* e = 0; + + if (profiles_.find (id, e) == -1) return 0; + + return e->int_id_.get (); + } + + typedef + Profiles::const_iterator + ProfileIterator; + + ProfileIterator + begin () const + { + return ProfileIterator (profiles_); + } + + public: + size_t + size () const + { + sstream ss; + + u32 s (0); + + ss << s; + + for (Profiles::const_iterator i (profiles_); !i.done (); i.advance ()) + { + ss << *((*i).int_id_); + } + + return ss.total_length (); + } + + friend + ostream& + operator<< (ostream& os, Message const& m) + { + u32 s (m.size ()); + + os << s; + + for (Profiles::const_iterator i (m.profiles_); !i.done (); i.advance ()) + { + os << *((*i).int_id_); + } + + return os; + } + + private: + Profiles profiles_; + }; + + typedef ACE_Vector<Message_ptr, ACE_VECTOR_DEFAULT_SIZE> Messages; + + // + // + // + struct From; + + typedef + ACE_Strong_Bound_Ptr<From, Mutex> + From_ptr; + + struct From : Profile + { + static u16 const id; + + public: + From (Header const& h, istream& is) + : Profile (h) + { + u32 addr; + u16 port; + + is >> addr; + is >> port; + + address_ = Address (port, addr); + } + + From (Address const& addr) + : Profile (id), address_ (addr) + { + size (calculate_size ()); + } + + From_ptr + clone () + { + return From_ptr (clone_ ()); + } + + protected: + virtual Profile_ptr + clone_ () + { + Profile_ptr p (new From (*this)); + return p; + } + + From (From const& from) + : Profile (from), + address_ (from.address_) + { + } + + public: + Address const& + address () const + { + return address_; + } + + public: + virtual void + serialize_body (ostream& os) const + { + u32 addr (address_.get_ip_address ()); + u16 port (address_.get_port_number ()); + + os << addr; + os << port; + } + + virtual void + serialize_body (sstream& ss) const + { + u32 addr (0); + u16 port (0); + + ss << addr; + ss << port; + } + + private: + Address address_; + }; + + + // + // + // + struct To; + + typedef + ACE_Strong_Bound_Ptr<To, Mutex> + To_ptr; + + struct To : Profile + { + static u16 const id; + + public: + To (Header const& h, istream& is) + : Profile (h) + { + u32 addr; + u16 port; + + is >> addr; + is >> port; + + address_ = Address (port, addr); + } + + To (Address const& addr) + : Profile (id), address_ (addr) + { + size (calculate_size ()); + } + + To_ptr + clone () + { + return To_ptr (clone_ ()); + } + + protected: + virtual Profile_ptr + clone_ () + { + Profile_ptr p (new To (*this)); + return p; + } + + To (To const& to) + : Profile (to), + address_ (to.address_) + { + } + + public: + Address const& + address () const + { + return address_; + } + + public: + virtual void + serialize_body (ostream& os) const + { + u32 addr (address_.get_ip_address ()); + u16 port (address_.get_port_number ()); + + os << addr; + os << port; + } + + virtual void + serialize_body (sstream& ss) const + { + u32 addr (0); + u16 port (0); + + ss << addr; + ss << port; + } + + private: + Address address_; + }; + + + // + // + // + struct Data; + + typedef + ACE_Strong_Bound_Ptr<Data, Mutex> + Data_ptr; + + struct Data : Profile + { + static u16 const id; + + public: + virtual + ~Data () + { + if (buf_) + operator delete (buf_); + } + + Data (Header const& h, istream& is) + : Profile (h), + buf_ (0), + size_ (h.size ()), + capacity_ (size_) + { + if (size_) + { + buf_ = reinterpret_cast<char*> (operator new (capacity_)); + is.read_char_array (buf_, size_); + } + } + + Data (void const* buf, size_t s, size_t capacity = 0) + : Profile (id), + buf_ (0), + size_ (s), + capacity_ (capacity < size_ ? size_ : capacity) + { + if (size_) + { + buf_ = reinterpret_cast<char*> (operator new (capacity_)); + ACE_OS::memcpy (buf_, buf, size_); + } + + Profile::size (calculate_size ()); + } + + Data_ptr + clone () + { + return Data_ptr (clone_ ()); + } + + protected: + virtual Profile_ptr + clone_ () + { + Profile_ptr p (new Data (*this)); + return p; + } + + Data (Data const& d) + : Profile (d), + buf_ (0), + size_ (d.size_), + capacity_ (d.capacity_) + { + if (size_) + { + buf_ = reinterpret_cast<char*> (operator new (capacity_)); + ACE_OS::memcpy (buf_, d.buf_, size_); + } + + Profile::size (calculate_size ()); + } + + public: + char const* + buf () const + { + return buf_; + } + + char* + buf () + { + return buf_; + } + + size_t + size () const + { + return size_; + } + + void + size (size_t s) + { + if (s > capacity_) + ACE_OS::abort (); + + size_ = s; + + Profile::size (calculate_size ()); + } + + size_t + capacity () const + { + return capacity_; + } + + public: + virtual void + serialize_body (ostream& os) const + { + os.write_char_array (buf_, size_); + } + + virtual void + serialize_body (sstream& ss) const + { + ss.write_char_array (buf_, size_); + } + + private: + char* buf_; + size_t size_; + size_t capacity_; + }; + + + // + // + // + struct SN; + + typedef + ACE_Strong_Bound_Ptr<SN, Mutex> + SN_ptr; + + struct SN : Profile + { + static u16 const id; + + public: + SN (Header const& h, istream& is) + : Profile (h) + { + is >> n_; + } + + SN (u64 n) + : Profile (id), n_ (n) + { + size (calculate_size ()); + } + + SN_ptr + clone () + { + return SN_ptr (clone_ ()); + } + + protected: + virtual Profile_ptr + clone_ () + { + Profile_ptr p (new SN (*this)); + return p; + } + + SN (SN const& sn) + : Profile (sn), + n_ (sn.n_) + { + } + + public: + u64 + num () const + { + return n_; + } + + public: + virtual void + serialize_body (ostream& os) const + { + os << n_; + } + + virtual void + serialize_body (sstream& ss) const + { + ss << n_; + } + + private: + u64 n_; + }; + + + // + // + // + class NAK; + + typedef + ACE_Strong_Bound_Ptr<NAK, Mutex> + NAK_ptr; + + class NAK : public Profile + { + public: + + static u16 const id; + + typedef ACE_Vector<u64, ACE_VECTOR_DEFAULT_SIZE> SerialNumbers; + typedef SerialNumbers::Iterator iterator; + + NAK (Header const& h, istream& is) + : Profile (h) + { + u64 sn (0); + u32 addr (0); + u16 port (0); + + sstream ss; + + ss << sn; + size_t sn_size (ss.total_length ()); + + ss.reset (); + + ss << addr; + ss << port; + + size_t addr_size (ss.total_length ()); + + + is >> addr; + is >> port; + + // num_of_sns = (size - addr_size) / sn_size + // + for (unsigned long i (0); i < ((h.size () - addr_size) / sn_size); ++i) + { + is >> sn; + sns_.push_back (sn); + } + + + address_ = Address (port, addr); + } + + NAK (Address const& src) + : Profile (id), address_ (src) + { + size (calculate_size ()); + } + + NAK_ptr + clone () + { + return NAK_ptr (clone_ ()); + } + + protected: + virtual Profile_ptr + clone_ () + { + Profile_ptr p (new NAK (*this)); + return p; + } + + NAK (NAK const& nak) + : Profile (nak), + address_ (nak.address_), + sns_ (nak.sns_) + { + } + + public: + void + add (u64 sn) + { + sns_.push_back (sn); + size (calculate_size ()); + } + + public: + Address const& + address () const + { + return address_; + } + + + iterator + begin () /* const */ + { + return iterator (sns_); + } + + /* + iterator + end () const + { + return sns_.end (); + } + */ + + size_t + count () const + { + return sns_.size (); + } + + public: + // Count max number of elements that will fit into NAK profile + // with size <= max_size. + // + static u32 + max_count (u32 max_size) + { + u32 n (0); + + sstream ss; + + Profile::Header hdr (0, 0); + ss << hdr; + + u32 addr (0); + u16 port (0); + ss << addr; + ss << port; + + while (true) + { + u64 sn (0); + ss << sn; + + if (ss.total_length () <= max_size) + ++n; + + if (ss.total_length () >= max_size) + break; + } + + return n; + } + + public: + virtual void + serialize_body (ostream& os) const + { + NAK& this_ = const_cast<NAK&> (*this); // Don't put in ROM. + + u32 addr (address_.get_ip_address ()); + u16 port (address_.get_port_number ()); + + os << addr; + os << port; + + // Stone age iteration. + // + for (iterator i (this_.begin ()); !i.done (); i.advance ()) + { + u64* psn; + i.next (psn); + os << *psn; + } + } + + virtual void + serialize_body (sstream& ss) const + { + NAK& this_ = const_cast<NAK&> (*this); // Don't put in ROM. + + u32 addr (0); + u16 port (0); + + ss << addr; + ss << port; + + // Stone age iteration. + // + for (iterator i (this_.begin ()); !i.done (); i.advance ()) + { + u64 sn (0); + ss << sn; + } + } + + private: + Address address_; + SerialNumbers sns_; + }; + + // + // + // + struct NRTM; + + typedef + ACE_Strong_Bound_Ptr<NRTM, Mutex> + NRTM_ptr; + + struct NRTM : Profile + { + static u16 const id; + + public: + NRTM (Header const& h, istream& is) + : Profile (h), map_ (10) + { + u32 addr (0); + u16 port (0); + u64 sn (0); + + sstream ss; + + ss << sn; + ss << addr; + ss << port; + + size_t block_size (ss.total_length ()); + + + // num_of_blocks = size / block_size + // + for (size_t i (0); i < (h.size () / block_size); ++i) + { + is >> sn; + is >> addr; + is >> port; + + map_.bind (Address (port, addr), sn); + } + } + + NRTM () + : Profile (id), map_ (10) + { + size (calculate_size ()); + } + + NRTM_ptr + clone () + { + return NRTM_ptr (clone_ ()); + } + + protected: + virtual Profile_ptr + clone_ () + { + Profile_ptr p (new NRTM (*this)); + return p; + } + + NRTM (NRTM const& nrtm) + : Profile (nrtm) + { + for (Map::const_iterator i (nrtm.map_); !i.done (); i.advance ()) + { + map_.bind ((*i).ext_id_, (*i).int_id_); + } + } + + public: + void + insert (Address const& addr, u64 sn) + { + map_.bind (addr, sn); + + size (calculate_size ()); + } + + u64 + find (Address const& addr) const + { + u64 sn; + + if (map_.find (addr, sn) == -1) return 0; + + return sn; + } + + bool + empty () const + { + return map_.current_size () == 0; + } + + public: + // Count max number of elements that will fit into NRTM profile + // with size <= max_size. + // + static u32 + max_count (u32 max_size) + { + u32 n (0); + + sstream ss; + + Profile::Header hdr (0, 0); + ss << hdr; + + while (true) + { + u32 addr (0); + u16 port (0); + u64 sn (0); + + ss << sn; + ss << addr; + ss << port; + + if (ss.total_length () <= max_size) + ++n; + + if (ss.total_length () >= max_size) + break; + } + + return n; + } + + public: + virtual void + serialize_body (ostream& os) const + { + for (Map::const_iterator i (map_), e (map_, 1); i != e; ++i) + { + u32 addr ((*i).ext_id_.get_ip_address ()); + u16 port ((*i).ext_id_.get_port_number ()); + u64 sn ((*i).int_id_); + + os << sn; + os << addr; + os << port; + + } + } + + virtual void + serialize_body (sstream& ss) const + { + for (Map::const_iterator i (map_), e (map_, 1); i != e; ++i) + { + u32 addr (0); + u16 port (0); + u64 sn (0); + + ss << sn; + ss << addr; + ss << port; + } + } + + private: + typedef + ACE_Hash_Map_Manager_Ex<Address, + u64, + AddressHasher, + ACE_Equal_To<Address>, + ACE_Null_Mutex> + Map; + + Map map_; + }; + + + // + // + // + struct NoData; + + typedef + ACE_Strong_Bound_Ptr<NoData, Mutex> + NoData_ptr; + + struct NoData : Profile + { + static u16 const id; + + public: + NoData (Header const& h, istream&) + : Profile (h) + { + } + + NoData () + : Profile (id) + { + Profile::size (0); + } + + NoData_ptr + clone () + { + return NoData_ptr (clone_ ()); + } + + protected: + virtual Profile_ptr + clone_ () + { + Profile_ptr p (new NoData (*this)); + return p; + } + + NoData (NoData const& no_data) + : Profile (no_data) + { + } + + public: + virtual void + serialize_body (ostream&) const + { + } + + virtual void + serialize_body (sstream&) const + { + } + }; + + + // + // + // + struct Part; + + typedef + ACE_Strong_Bound_Ptr<Part, Mutex> + Part_ptr; + + struct Part : Profile + { + static u16 const id; + + public: + Part (Header const& h, istream& is) + : Profile (h) + { + is >> num_; + is >> of_; + is >> total_size_; + } + + Part (u32 num, u32 of, u64 total_size) + : Profile (id), + num_ (num), + of_ (of), + total_size_ (total_size) + { + size (calculate_size ()); + } + + Part_ptr + clone () + { + return Part_ptr (clone_ ()); + } + + protected: + virtual Profile_ptr + clone_ () + { + Profile_ptr p (new Part (*this)); + return p; + } + + Part (Part const& part) + : Profile (part), + num_ (part.num_), + of_ (part.of_), + total_size_ (part.total_size_) + { + } + + public: + u32 + num () const + { + return num_; + } + + u32 + of () const + { + return of_; + } + + u64 + total_size () const + { + return total_size_; + } + + public: + virtual void + serialize_body (ostream& os) const + { + os << num_; + os << of_; + os << total_size_; + } + + virtual void + serialize_body (sstream& ss) const + { + ss << num_; + ss << of_; + ss << total_size_; + } + + + private: + u32 num_; + u32 of_; + u64 total_size_; + }; + +} + +/* +inline +std::ostream& +operator<< (std::ostream& os, ACE_RMCast::Address const& a) +{ + char buf[64]; + a.addr_to_string (buf, 64, 1); + return os << buf; +} +*/ + +#endif // ACE_RMCAST_PROTOCOL_H diff --git a/ACE/protocols/ace/RMCast/README b/ACE/protocols/ace/RMCast/README new file mode 100644 index 00000000000..9eda58871b0 --- /dev/null +++ b/ACE/protocols/ace/RMCast/README @@ -0,0 +1,197 @@ + +Introduction +------------ + +RMCast is a reliable source-ordered multicast protocol implementation +for message-oriented multi-sender group communication built on top of +IPv4 multicast. It uses sequence numbers for re-ordering, duplicate +suppression and loss detection. Negative acknowledgments (NAK) with +retransmissions are used to recover from losses. + +One new and interesting idea implemented in this protocol is history +transmission (dubbed negative retransmission). In a nutshell, each +sender, along with normal payload, transmits a list of other sender's +IDs along with sequence numbers of last messages received from those +senders by this member. This, in some sense, builds a pyramid of +information: each subsequent message carries some information for a +number of previous messages (from other senders). This helps other +receivers detect losses. + +The protocol does not track group membership. Messages are retained +for retransmission for a predefined amount of time. The "last message +or lost message" dilemma is solved by periodic history transmissions +in cases when there is no useful traffic (idle network). + + +Examples +-------- + +There is a simple example available in examples/RMCast/Send_Msg with +the corresponding README file. + + +Protocol +-------- + +Over-the-wire representation is little-endian CDR. The protocol is +message-based with information encapsulated into one or multiple +profiles (Protocol.h): + + + +struct Profile +{ + u16 id; // Profile id. + u16 size; // Profile size. +}; + +struct Message +{ + u32 size; // Total size of the message. + sequence<Profile> profiles; // Sequence of profiles. +} + + + +The following profiles are defined (Protocol.h): + + + +struct From: Profile +{ + u32 addr; // IPv4 address. + u16 port; +}; + +The 'From' profile appears in each message. It is never transmitted +over the wire. Instead the 'Link' layer (see below) adds it. + + + +struct To: Profile +{ + u32 addr; // IPv4 address. + u16 port; +}; + +The 'To' profile also appears in each message. It is also never +transmitted over the wire since all communications are done via +well-known group address. It is added by the 'Link' layer and is used +by a group member to identify messages from itself ('From' == 'To'). + + + +struct Data: Profile +{ + sequence<octet> data; +}; + +The 'Data' profile is used to transmit user payload. + + + +struct SN: Profile +{ + u64 sn; +}; + +The 'SN' profile carries sequence number for 'Data' and 'NoData' profiles. + + + +struct NAK: Profile +{ + u32 addr; // IPv4 address. + u16 port; + + sequence<u64> sns; +}; + +The 'NAK' profile carries sequence numbers of all the messages originated +from the member identified by addr:port that the receiver detected were +lost. + + + +struct NRTM: Profile +{ + struct Pair + { + u32 addr; // IPv4 address. + u16 port; + + u64 max_sn; + }; + + sequence<Pair> nrtm; +}; + +The 'NRTM' profile carries highest sequence numbers known to this member +for (some sub-) set of members. It is used by other members to detect loses. +This profile is normally combined with 'Data' transmission. + + + +struct NoData: Profile +{ +}; + +The 'NoData' profile is send in reply to 'NAK' when the lost message is +no longer available. + + + +Below is the list of actions that trigger messages with various +profiles: + + +user calls send(): SN + Data + NRTM + +detected loss: NAK + + +received NAK: SN or SN + Data NoData + NRTM NRTM + + +Implementation +-------------- + +This section describes high-level architecture of the implementation. +The protocol is implemented as a stack (Stack.h) of the following +elements: + +'Socket' +'Acknowledge' +'Retransmit' +'Link' + +The 'Socket' element is the user interface of the member. When a user +calls send() 'Socket' creates a new message with 'SN' and 'Data' profiles +and forwards it to the 'Acknowledge' element. It also receives (from +'Acknowledge') and queues incoming messages that are delivered to the user +when recv() is called. + +The 'Acknowledge' element is responsible for re-ordering, duplicate +suppression, loss detection and negative acknowledgments. It maintains a +dynamically changing "window" (which slides toward higher sequence +numbers) of received messages. Messages that arrive out of order are held +in this window. Presence of a hole in the windows for a long period of time +indicates loss and triggers a negative acknowledgment. + +The 'Retransmit' element is responsible for message retention, aging and +retransmission in response to NAKs. Each message received from the 'Socket' +element is held for predetermined amount of time in case retransmission is +required. Upon reception of a NAK duplicate is send if the requested message +is still available. Otherwise 'NoData' profile is sent. + + +The 'Link' element is responsible for interfacing with the IPv4 multicast +socket. It also parses over-the-wire representation into in-memory messages +with individually-accessible profiles. + +-- +Boris Kolpackov <boris@kolpackov.net> diff --git a/ACE/protocols/ace/RMCast/RMCast.mpc b/ACE/protocols/ace/RMCast/RMCast.mpc new file mode 100644 index 00000000000..de7da3ddda5 --- /dev/null +++ b/ACE/protocols/ace/RMCast/RMCast.mpc @@ -0,0 +1,12 @@ +// -*- MPC -*- +// $Id$ + +project(RMCast) : acelib, ace_output, install, threads { + avoids = ace_for_tao + sharedname = ACE_RMCast + dynamicflags = ACE_RMCAST_BUILD_DLL + + Pkgconfig_Files { + ACE_RMCast.pc.in + } +} diff --git a/ACE/protocols/ace/RMCast/RMCast_Export.h b/ACE/protocols/ace/RMCast/RMCast_Export.h new file mode 100644 index 00000000000..51257c4d682 --- /dev/null +++ b/ACE/protocols/ace/RMCast/RMCast_Export.h @@ -0,0 +1,44 @@ +// -*- C++ -*- +// $Id$ +// Definition for Win32 Export directives. +// This file is generated automatically by +// generate_export_file.pl +// ------------------------------ +#if !defined (ACE_RMCAST_EXPORT_H) +#define ACE_RMCAST_EXPORT_H + +#include "ace/config-all.h" + +#if defined (ACE_AS_STATIC_LIBS) && !defined (ACE_RMCAST_HAS_DLL) +# define ACE_RMCAST_HAS_DLL 0 +#endif /* ACE_AS_STATIC_LIBS && ACE_RMCAST_HAS_DLL */ + +#if !defined (ACE_RMCAST_HAS_DLL) +#define ACE_RMCAST_HAS_DLL 1 +#endif /* ! ACE_RMCAST_HAS_DLL */ + +#if defined (ACE_RMCAST_HAS_DLL) +# if (ACE_RMCAST_HAS_DLL == 1) +# if defined (ACE_RMCAST_BUILD_DLL) +# define ACE_RMCast_Export ACE_Proper_Export_Flag +# define ACE_RMCAST_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T) +# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# else +# define ACE_RMCast_Export ACE_Proper_Import_Flag +# define ACE_RMCAST_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T) +# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# endif /* ACE_RMCAST_BUILD_DLL */ +# else +# define ACE_RMCast_Export +# define ACE_RMCAST_SINGLETON_DECLARATION(T) +# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# endif /* ! ACE_RMCAST_HAS_DLL == 1 */ +#else +# define ACE_RMCast_Export +# define ACE_RMCAST_SINGLETON_DECLARATION(T) +# define ACE_RMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +#endif /* ACE_RMCAST_HAS_DLL */ + +#endif /* ACE_RMCAST_EXPORT_H */ + +// End of auto generated file. diff --git a/ACE/protocols/ace/RMCast/Reassemble.cpp b/ACE/protocols/ace/RMCast/Reassemble.cpp new file mode 100644 index 00000000000..e5aeab92519 --- /dev/null +++ b/ACE/protocols/ace/RMCast/Reassemble.cpp @@ -0,0 +1,135 @@ +// file : ace/RMCast/Reassemble.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include "Reassemble.h" +#include "ace/OS_NS_stdlib.h" + +/* +#include <iostream> +using std::cerr; +using std::endl; +*/ + +namespace ACE_RMCast +{ + Reassemble:: + Reassemble (Parameters const& params) + : params_ (params) + { + } + + void Reassemble::recv (Message_ptr m) + { + Map::ENTRY* e; + Address from ( + static_cast<From const*> (m->find (From::id))->address ()); + + if (Data const* data = static_cast<Data const*> (m->find (Data::id))) + { + if (Part const* part = static_cast<Part const*> (m->find (Part::id))) + { + if (map_.find (from, e) == -1) + { + // First part of the message. + // + + if (part->num () != 1) + { + // We assume that we received NoData for one of the preceding + // fragments. Ignore this one. + return; + } + + Data_ptr new_data (new Data (data->buf (), + static_cast<size_t> (data->size ()), + static_cast<size_t> (part->total_size ()))); + + //std::cerr << "part->total_size (): " << part->total_size () << endl; + + map_.bind (from, new_data); + } + else + { + // Next part of the message. + // + + if (part->num () == 1) + ACE_OS::abort (); + + + Data const* data = static_cast<Data const*> (m->find (Data::id)); + + Data_ptr& new_data = e->int_id_; + + ACE_OS::memcpy (new_data->buf () + new_data->size (), + data->buf (), + data->size ()); + + //std::cerr << "data->size (): " << data->size () << endl + // << "new_data->size (): " << new_data->size () << endl + // << "new_data->capa (): " << new_data->capacity () << endl; + + new_data->size (new_data->size () + data->size ()); + + + if (part->num () == part->of ()) + { + // Reassembly is complete. + // + if (part->total_size () != new_data->size ()) + ACE_OS::abort (); + + Message_ptr new_msg (new Message ()); + + Address to ( + static_cast<To const*> (m->find (To::id))->address ()); + + new_msg->add (Profile_ptr (new To (to))); + new_msg->add (Profile_ptr (new From (from))); + /* + * Heads up... we need to add the new_data to new_msg then + * unbind the entry that maps to new_data, which will decrement + * its reference count. If the bound/refcounted pointer acted + * polymorphically like a regular pointer does, we'd be able to + * just pass new_data to add(Profile_Ptr) and it would work. + * However, Profile_Ptr and Data_Ptr are not compatible, but + * we can use the secret knowledge that both are instances of the + * same template and that the pointers they contain really are + * hierarchically compatible, and do this funky cast to get + * the result we want. + */ + //new_msg->add (*(reinterpret_cast<Profile_ptr*> (&new_data))); + + new_msg->add (Profile_ptr (new_data)); + + map_.unbind (from); + + in_->recv (new_msg); + } + } + } + else + { + // Non-fragmented message. Make sure we are in the consistent state + // and forward it up. + // + if (map_.find (from, e) != -1) + ACE_OS::abort (); + + in_->recv (m); + } + } + else if (m->find (NoData::id) != 0) + { + if (map_.find (from, e) != -1) + { + // We already received some fragments. Clean everyhting up. + // + map_.unbind (from); + } + + in_->recv (m); + } + } +} diff --git a/ACE/protocols/ace/RMCast/Reassemble.h b/ACE/protocols/ace/RMCast/Reassemble.h new file mode 100644 index 00000000000..cffa4fdc359 --- /dev/null +++ b/ACE/protocols/ace/RMCast/Reassemble.h @@ -0,0 +1,42 @@ +// file : ace/RMCast/Reassemble.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_REASSEMBLE_H +#define ACE_RMCAST_REASSEMBLE_H + +#include "ace/Hash_Map_Manager.h" + +#include "Stack.h" +#include "Protocol.h" +#include "Bits.h" +#include "Parameters.h" + +namespace ACE_RMCast +{ + class Reassemble : public Element + { + public: + Reassemble (Parameters const& params); + + public: + virtual void + recv (Message_ptr m); + + private: + Parameters const& params_; + + typedef + ACE_Hash_Map_Manager_Ex<Address, + Data_ptr, + AddressHasher, + ACE_Equal_To<Address>, + ACE_Null_Mutex> + Map; + + Map map_; + }; +} + + +#endif // ACE_RMCAST_REASSEMBLE_H diff --git a/ACE/protocols/ace/RMCast/Retransmit.cpp b/ACE/protocols/ace/RMCast/Retransmit.cpp new file mode 100644 index 00000000000..6611bb14b52 --- /dev/null +++ b/ACE/protocols/ace/RMCast/Retransmit.cpp @@ -0,0 +1,159 @@ +// file : ace/RMCast/Retransmit.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include "ace/Time_Value.h" // ACE_Time_Value +#include "ace/OS_NS_stdlib.h" // abort +#include "ace/OS_NS_sys_time.h" // gettimeofday + +#include "Retransmit.h" + +/* +#include <iostream> +using std::cerr; +using std::endl; +*/ + +namespace ACE_RMCast +{ + Retransmit:: + Retransmit (Parameters const& params) + : params_ (params), + cond_ (mutex_), + stop_ (false) + { + } + + void Retransmit:: + out_start (Out_Element* out) + { + Element::out_start (out); + + tracker_mgr_.spawn (track_thunk, this); + } + + void Retransmit:: + out_stop () + { + { + Lock l (mutex_); + stop_ = true; + cond_.signal (); + } + + tracker_mgr_.wait (); + + Element::out_stop (); + } + + void Retransmit::send (Message_ptr m) + { + if (m->find (Data::id) != 0) + { + SN const* sn = static_cast<SN const*> (m->find (SN::id)); + + Lock l (mutex_); + queue_.bind (sn->num (), Descr (m->clone ())); + } + + out_->send (m); + } + + void Retransmit::recv (Message_ptr m) + { + if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id))) + { + Address to (static_cast<To const*> (m->find (To::id))->address ()); + + if (nak->address () == to) + { + Lock l (mutex_); + + for (NAK::iterator j (const_cast<NAK*> (nak)->begin ()); + !j.done (); + j.advance ()) + { + u64* psn; + j.next (psn); + + Message_ptr m; + + Queue::ENTRY* pair; + + if (queue_.find (*psn, pair) == 0) + { + //cerr << 5 << "PRTM " << to << " " << pair->ext_id_ << endl; + + m = pair->int_id_.message (); + + pair->int_id_.reset (); + } + else + { + //cerr << 4 << "message " << *psn << " not available" << endl; + + m = Message_ptr (new Message); + m->add (Profile_ptr (new SN (*psn))); + m->add (Profile_ptr (new NoData)); + } + + out_->send (m); + } + } + } + + in_->recv (m); + } + + ACE_THR_FUNC_RETURN Retransmit:: + track_thunk (void* obj) + { + reinterpret_cast<Retransmit*> (obj)->track (); + return 0; + } + + void Retransmit:: + track () + { + while (true) + { + Lock l (mutex_); + + for (Queue::iterator i (queue_); !i.done ();) + { + if ((*i).int_id_.inc () >= params_.retention_timeout ()) + { + u64 sn ((*i).ext_id_); + i.advance (); + queue_.unbind (sn); + } + else + { + i.advance (); + } + } + + //FUZZ: disable check_for_lack_ACE_OS + // Go to sleep but watch for "manual cancellation" request. + // + ACE_Time_Value time (ACE_OS::gettimeofday ()); + //FUZZ: enable check_for_lack_ACE_OS + + time += params_.tick (); + + while (!stop_) + { + if (cond_.wait (&time) == -1) + { + if (errno != ETIME) + ACE_OS::abort (); + else + break; + } + } + + if (stop_) + break; + } + } +} diff --git a/ACE/protocols/ace/RMCast/Retransmit.h b/ACE/protocols/ace/RMCast/Retransmit.h new file mode 100644 index 00000000000..0a3a8f72bd7 --- /dev/null +++ b/ACE/protocols/ace/RMCast/Retransmit.h @@ -0,0 +1,98 @@ +// file : ace/RMCast/Retransmit.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_RETRANSMIT_H +#define ACE_RMCAST_RETRANSMIT_H + +#include "ace/Hash_Map_Manager.h" +#include "ace/Thread_Manager.h" + +#include "Stack.h" +#include "Protocol.h" +#include "Bits.h" +#include "Parameters.h" + +namespace ACE_RMCast +{ + class Retransmit : public Element + { + public: + Retransmit (Parameters const& params); + + virtual void + out_start (Out_Element* out); + + virtual void + out_stop (); + + public: + virtual void + send (Message_ptr m); + + virtual void + recv (Message_ptr m); + + private: + struct Descr + { + // Shouldn't be available but ACE_Hash_Map needs it. + // + Descr () + : msg_ (), count_ (0) + { + } + + Descr (Message_ptr msg) + : msg_ (msg), count_ (0) + { + } + + unsigned long + inc () + { + return ++count_; + } + + void + reset () + { + count_ = 0; + } + + Message_ptr + message () const + { + return msg_->clone (); + } + + private: + Message_ptr msg_; + unsigned long count_; + }; + + typedef + ACE_Hash_Map_Manager<u64, Descr, ACE_Null_Mutex> + Queue; + + private: + void + track (); + + static ACE_THR_FUNC_RETURN + track_thunk (void* obj); + + private: + Parameters const& params_; + + Queue queue_; + Mutex mutex_; + Condition cond_; + + bool stop_; + ACE_Thread_Manager tracker_mgr_; + }; +} + + +#endif // ACE_RMCAST_RETRANSMIT_H diff --git a/ACE/protocols/ace/RMCast/Simulator.cpp b/ACE/protocols/ace/RMCast/Simulator.cpp new file mode 100644 index 00000000000..ec879c865ba --- /dev/null +++ b/ACE/protocols/ace/RMCast/Simulator.cpp @@ -0,0 +1,39 @@ +// file : ace/RMCast/Simulator.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include "ace/OS_NS_time.h" +#include "Simulator.h" + +namespace ACE_RMCast +{ + Simulator::Simulator () + { + ACE_OS::srand ((unsigned int)ACE_OS::time (0)); + } + + void Simulator::send (Message_ptr m) + { + // Note: Simulator may work in unpredictable ways mainly due + // to the "reliable loopback" mechanism. + // + out_->send (m); + return; + + /*int r (rand ()); + + if ((r % 3) == 0) return; + + Lock l (mutex_); + + if (hold_.get ()) + { + out_->send (m); + out_->send (hold_); + } + else + { + hold_ = m; + }*/ + } +} diff --git a/ACE/protocols/ace/RMCast/Simulator.h b/ACE/protocols/ace/RMCast/Simulator.h new file mode 100644 index 00000000000..27b227f488b --- /dev/null +++ b/ACE/protocols/ace/RMCast/Simulator.h @@ -0,0 +1,30 @@ +// file : ace/RMCast/Simulator.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_SIMULATOR_H +#define ACE_RMCAST_SIMULATOR_H + +#include "Stack.h" +#include "Protocol.h" +#include "Bits.h" + +namespace ACE_RMCast +{ + class Simulator : public Element + { + public: + Simulator (); + + public: + virtual void + send (Message_ptr m); + + private: + Message_ptr hold_; + Mutex mutex_; + }; +} + + +#endif // ACE_RMCAST_SIMULATOR_H diff --git a/ACE/protocols/ace/RMCast/Socket.cpp b/ACE/protocols/ace/RMCast/Socket.cpp new file mode 100644 index 00000000000..51e6d450173 --- /dev/null +++ b/ACE/protocols/ace/RMCast/Socket.cpp @@ -0,0 +1,396 @@ +// file : ace/RMCast/Socket.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include "ace/OS_Memory.h" +#include "ace/OS_NS_stdio.h" +#include "ace/OS_NS_stdlib.h" +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_unistd.h" +#include "ace/OS_NS_sys_time.h" // gettimeofday + +#include "ace/Unbounded_Queue.h" + +#include "ace/Pipe.h" + +#include "Stack.h" +#include "Protocol.h" +#include "Bits.h" + +#include "Fragment.h" +#include "Reassemble.h" +#include "Acknowledge.h" +#include "Retransmit.h" +#include "Flow.h" +#include "Link.h" + +#include "Socket.h" + +/* +#include <iostream> +using std::cerr; +using std::endl; +*/ + +namespace ACE_RMCast +{ + class Socket_Impl : protected Element + { + public: + ~Socket_Impl (); + + Socket_Impl (Address const& a, bool loop, Parameters const& params); + + public: + void + send_ (void const* buf, size_t s); + + ssize_t + recv_ (void* buf, + size_t s, + ACE_Time_Value const* timeout, + ACE_INET_Addr* from); + + ssize_t + size_ (ACE_Time_Value const* timeout); + + ACE_HANDLE + get_handle_ (); + + private: + //FUZZ: disable check_for_lack_ACE_OS + virtual void recv (Message_ptr m); + //FUZZ: enable check_for_lack_ACE_OS + + private: + bool loop_; + Parameters const params_; + + Mutex mutex_; + Condition cond_; + + ACE_Unbounded_Queue<Message_ptr> queue_; + + ACE_Pipe signal_pipe_; + + ACE_Auto_Ptr<Fragment> fragment_; + ACE_Auto_Ptr<Reassemble> reassemble_; + ACE_Auto_Ptr<Acknowledge> acknowledge_; + ACE_Auto_Ptr<Retransmit> retransmit_; + ACE_Auto_Ptr<Flow> flow_; + ACE_Auto_Ptr<Link> link_; + }; + + + Socket_Impl:: + Socket_Impl (Address const& a, bool loop, Parameters const& params) + : loop_ (loop), + params_ (params), + cond_ (mutex_) + { + fragment_.reset (new Fragment (params_)); + reassemble_.reset (new Reassemble (params_)); + acknowledge_.reset (new Acknowledge (params_)); + retransmit_.reset (new Retransmit (params_)); + flow_.reset (new Flow (params_)); + link_.reset (new Link (a, params_)); + + // Start IN stack from top to bottom. + // + in_start (0); + fragment_->in_start (this); + reassemble_->in_start (fragment_.get ()); + acknowledge_->in_start (reassemble_.get ()); + retransmit_->in_start (acknowledge_.get ()); + flow_->in_start (retransmit_.get ()); + link_->in_start (flow_.get ()); + + // Start OUT stack from bottom up. + // + link_->out_start (0); + flow_->out_start (link_.get ()); + retransmit_->out_start (flow_.get ()); + acknowledge_->out_start (retransmit_.get ()); + reassemble_->out_start (acknowledge_.get ()); + fragment_->out_start (reassemble_.get ()); + out_start (fragment_.get ()); + } + + Socket_Impl:: + ~Socket_Impl () + { + // Stop OUT stack from top to bottom. + // + out_stop (); + fragment_->out_stop (); + reassemble_->out_stop (); + acknowledge_->out_stop (); + retransmit_->out_stop (); + flow_->out_stop (); + link_->out_stop (); + + // Stop IN stack from bottom up. + // + link_->in_stop (); + flow_->in_stop (); + retransmit_->in_stop (); + acknowledge_->in_stop (); + reassemble_->in_stop (); + fragment_->in_stop (); + in_stop (); + + // Close signal pipe. + // + if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE) + signal_pipe_.close (); + } + + + void Socket_Impl:: + send_ (void const* buf, size_t s) + { + Message_ptr m (new Message); + + m->add (Profile_ptr (new Data (buf, s))); + + // Qualification is for VC6 and VxWorks. + // + Element::send (m); + } + + ssize_t Socket_Impl:: + recv_ (void* buf, + size_t s, + ACE_Time_Value const* timeout, + ACE_INET_Addr* from) + { + ACE_Time_Value abs_time; + + if (timeout) + abs_time = ACE_OS::gettimeofday () + *timeout; + + Lock l (mutex_); + + while (queue_.is_empty ()) + { + if (timeout) + { + if (cond_.wait (&abs_time) != -1) + break; + } + else + { + if (cond_.wait () != -1) + break; + } + + return -1; // errno is already set + } + + + Message_ptr m; + + if (queue_.dequeue_head (m) == -1) + ACE_OS::abort (); + + + if (queue_.is_empty ()) + { + // Remove data from the pipe. + // + if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE) + { + char c; + + if (signal_pipe_.recv (&c, 1) != 1) + { + ACE_OS::perror ("read: "); + ACE_OS::abort (); + } + } + } + + if (from) + *from = static_cast<From const*> (m->find (From::id))->address (); + + if (m->find (NoData::id) != 0) + { + errno = ENOENT; + return -1; + } + + Data const* d = static_cast<Data const*>(m->find (Data::id)); + + ssize_t r (static_cast<ssize_t> (d->size () < s ? d->size () : s)); + + ACE_OS::memcpy (buf, d->buf (), r); + + return r; + } + + ssize_t Socket_Impl:: + size_ (ACE_Time_Value const* timeout) + { + ACE_Time_Value abs_time; + + if (timeout) + abs_time = ACE_OS::gettimeofday () + *timeout; + + Lock l (mutex_); + + while (queue_.is_empty ()) + { + if (timeout) + { + if (cond_.wait (&abs_time) != -1) + break; + } + else + { + if (cond_.wait () != -1) + break; + } + + return -1; // errno is already set + } + + // I can't get the head of the queue without actually dequeuing + // the element. + // + Message_ptr m; + + if (queue_.dequeue_head (m) == -1) + ACE_OS::abort (); + + if (queue_.enqueue_head (m) == -1) + ACE_OS::abort (); + + if (m->find (NoData::id) != 0) + { + errno = ENOENT; + return -1; + } + + Data const* d = static_cast<Data const*>(m->find (Data::id)); + + return static_cast<ssize_t> (d->size ()); + } + + ACE_HANDLE Socket_Impl:: + get_handle_ () + { + if (signal_pipe_.read_handle () == ACE_INVALID_HANDLE) + { + signal_pipe_.open (); + } + + return signal_pipe_.read_handle (); + } + + + void Socket_Impl::recv (Message_ptr m) + { + if (m->find (Data::id) != 0 || m->find (NoData::id) != 0) + { + if (!loop_) + { + Address to (static_cast<To const*> (m->find (To::id))->address ()); + + Address from ( + static_cast<From const*> (m->find (From::id))->address ()); + + if (to == from) + return; + } + + Lock l (mutex_); + + //if (queue_.size () != 0) + // cerr << "recv socket queue size: " << queue_.size () << endl; + + //FUZZ: disable check_for_lack_ACE_OS + bool signal (queue_.is_empty ()); + //FUZZ: enable check_for_lack_ACE_OS + + queue_.enqueue_tail (m); + + if (signal) + { + // Also write to the pipe. + if (signal_pipe_.write_handle () != ACE_INVALID_HANDLE) + { + char c; + + if (signal_pipe_.send (&c, 1) != 1) + { + // perror ("write: "); + ACE_OS::abort (); + } + } + + cond_.signal (); + } + } + } + + + // Socket + // + // + Socket:: + ~Socket () + { + } + + Socket:: + Socket (Address const& a, bool loop, Parameters const& params) + : impl_ (new Socket_Impl (a, loop, params)) + { + } + + void Socket::send (void const* buf, size_t s) + { + impl_->send_ (buf, s); + } + + ssize_t Socket::recv (void* buf, size_t s) + { + return impl_->recv_ (buf, s, 0, 0); + } + + ssize_t Socket::recv (void* buf, size_t s, ACE_INET_Addr& from) + { + return impl_->recv_ (buf, s, 0, &from); + } + + ssize_t Socket::recv (void* buf, size_t s, ACE_Time_Value const& timeout) + { + return impl_->recv_ (buf, s, &timeout, 0); + } + + ssize_t Socket::recv (void* buf, + size_t s, + ACE_Time_Value const& timeout, + ACE_INET_Addr& from) + { + return impl_->recv_ (buf, s, &timeout, &from); + } + + ssize_t Socket:: + size () + { + return impl_->size_ (0); + } + + ssize_t Socket:: + size (ACE_Time_Value const& timeout) + { + return impl_->size_ (&timeout); + } + + ACE_HANDLE Socket:: + get_handle () + { + return impl_->get_handle_ (); + } +} diff --git a/ACE/protocols/ace/RMCast/Socket.h b/ACE/protocols/ace/RMCast/Socket.h new file mode 100644 index 00000000000..3468df15d4d --- /dev/null +++ b/ACE/protocols/ace/RMCast/Socket.h @@ -0,0 +1,99 @@ +// file : ace/RMCast/Socket.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_SOCKET_H +#define ACE_RMCAST_SOCKET_H + +#include "ace/config-lite.h" // ACE_HANDLE + +#include "ace/Auto_Ptr.h" +#include "ace/INET_Addr.h" +#include "ace/Time_Value.h" + +#include "RMCast_Export.h" +#include "Parameters.h" + + +namespace ACE_RMCast +{ + class Socket_Impl; + + class ACE_RMCast_Export Socket + { + public: + virtual + ~Socket (); + + // If 'simulator' is 'true' then internal message loss and + // reordering simulator (on IPv4 level) is turned on. + // + Socket (ACE_INET_Addr const& a, + bool loop = true, + Parameters const& params = Parameters ()); + + public: + virtual void + send (void const* buf, size_t s); + + + // Block if message is not available. Upon successful completion + // return the next message. Otherwise return -1 and errno set to + // indicate the error. ENOENT indicates that the sender does not + // retain the data for this message anymore. + // + virtual ssize_t + recv (void* buf, size_t s); + + virtual ssize_t + recv (void* buf, size_t s, ACE_INET_Addr& from); + + + // Block for up to <timeout> until message is available. Upon + // successful completion return the next message. Otherwise + // return -1 and errno set to indicate the error. ETIME indicates + // that the operation timed out. ENOENT indicates that the sender + // does not retain the data for this message anymore. + // + virtual ssize_t + recv (void* buf, size_t s, ACE_Time_Value const& timeout); + + virtual ssize_t + recv (void* buf, + size_t s, + ACE_Time_Value const& timeout, + ACE_INET_Addr& from); + + + // Block if message is not available. Upon successful completion + // return the size of the next message. Otherwise return -1 and + // errno set to indicate the error. ENOENT indicates that the + // sender does not retain the data for this message anymore. + // + virtual ssize_t + size (); + + + // Block for up to <timeout> until message is available. Upon + // successful completion return the size of the next message. + // Otherwise return -1 and errno set to indicate the error. + // ETIME indicates that the operation timed out. ENOENT indicates + // that the sender does not retain the data for this message anymore. + // + virtual ssize_t + size (ACE_Time_Value const& timeout); + + public: + // Reactor interface. Note that the handle returned by get_handle() + // is for signalling purposes only. + // + ACE_HANDLE + get_handle (); + + private: + ACE_Auto_Ptr<Socket_Impl> impl_; + }; +} + + +#endif // ACE_RMCAST_SOCKET_H diff --git a/ACE/protocols/ace/RMCast/Stack.cpp b/ACE/protocols/ace/RMCast/Stack.cpp new file mode 100644 index 00000000000..19d592f5c21 --- /dev/null +++ b/ACE/protocols/ace/RMCast/Stack.cpp @@ -0,0 +1,71 @@ +// file : ace/RMCast/Stack.cpp +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#include "Stack.h" + +namespace ACE_RMCast +{ + // Out_Element + // + + Out_Element:: + ~Out_Element () + { + } + + Out_Element:: + Out_Element () + : out_ (0) + { + } + + void Out_Element:: + out_start (Out_Element* out) + { + out_ = out; + } + + void Out_Element::send (Message_ptr m) + { + if (out_) out_->send (m); + } + + void Out_Element:: + out_stop () + { + out_ = 0; + } + + + // In_Element + // + + In_Element:: + ~In_Element () + { + } + + In_Element:: + In_Element () + : in_ (0) + { + } + + void In_Element:: + in_start (In_Element* in) + { + in_ = in; + } + + void In_Element::recv (Message_ptr m) + { + if (in_) in_->recv (m); + } + + void In_Element:: + in_stop () + { + in_ = 0; + } +} diff --git a/ACE/protocols/ace/RMCast/Stack.h b/ACE/protocols/ace/RMCast/Stack.h new file mode 100644 index 00000000000..322fe0dc501 --- /dev/null +++ b/ACE/protocols/ace/RMCast/Stack.h @@ -0,0 +1,59 @@ +// file : ace/RMCast/Stack.h +// author : Boris Kolpackov <boris@kolpackov.net> +// cvs-id : $Id$ + +#ifndef ACE_RMCAST_STACK_H +#define ACE_RMCAST_STACK_H + +#include "Protocol.h" + +namespace ACE_RMCast +{ + struct Out_Element + { + virtual + ~Out_Element (); + + Out_Element (); + + virtual void + out_stop (); + + virtual void + out_start (Out_Element* out); + + virtual void + send (Message_ptr m); + + protected: + Out_Element* out_; + }; + + + struct In_Element + { + virtual + ~In_Element (); + + In_Element (); + + virtual void + in_stop (); + + virtual void + in_start (In_Element* in); + + virtual void + recv (Message_ptr m); + + protected: + In_Element* in_; + }; + + + struct Element : In_Element, Out_Element + { + }; +} + +#endif // ACE_RMCAST_STACK_H diff --git a/ACE/protocols/ace/TMCast/ACE_TMCast.pc.in b/ACE/protocols/ace/TMCast/ACE_TMCast.pc.in new file mode 100644 index 00000000000..a56956f81b2 --- /dev/null +++ b/ACE/protocols/ace/TMCast/ACE_TMCast.pc.in @@ -0,0 +1,11 @@ +prefix=@prefix@ +exec_prefix=@exec_prefix@ +libdir=@libdir@ +includedir=@includedir@ + +Name: ACE_TMCast +Description: ACE Transaction Multicast Library +Requires: ACE +Version: @VERSION@ +Libs: -L${libdir} -lACE_TMCast +Cflags: -I${includedir} diff --git a/ACE/protocols/ace/TMCast/Export.hpp b/ACE/protocols/ace/TMCast/Export.hpp new file mode 100644 index 00000000000..bf04f7ee114 --- /dev/null +++ b/ACE/protocols/ace/TMCast/Export.hpp @@ -0,0 +1,58 @@ + +// -*- C++ -*- +// $Id$ +// Definition for Win32 Export directives. +// This file is generated automatically by generate_export_file.pl ACE_TMCast +// ------------------------------ +#ifndef TMCAST_EXPORT_H +#define TMCAST_EXPORT_H + +#include "ace/config-all.h" + +#if defined (ACE_AS_STATIC_LIBS) && !defined (TMCAST_HAS_DLL) +# define TMCAST_HAS_DLL 0 +#endif /* ACE_AS_STATIC_LIBS && TMCAST_HAS_DLL */ + +#if !defined (TMCAST_HAS_DLL) +#define TMCAST_HAS_DLL 1 +#endif /* ! TMCAST_HAS_DLL */ + +#if defined (TMCAST_HAS_DLL) && (TMCAST_HAS_DLL == 1) +# if defined (TMCAST_BUILD_DLL) +# define ACE_TMCast_Export ACE_Proper_Export_Flag +# define TMCAST_SINGLETON_DECLARATION(T) ACE_EXPORT_SINGLETON_DECLARATION (T) +# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_EXPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# else /* TMCAST_BUILD_DLL */ +# define ACE_TMCast_Export ACE_Proper_Import_Flag +# define TMCAST_SINGLETON_DECLARATION(T) ACE_IMPORT_SINGLETON_DECLARATION (T) +# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) ACE_IMPORT_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +# endif /* TMCAST_BUILD_DLL */ +#else /* TMCAST_HAS_DLL == 1 */ +# define ACE_TMCast_Export +# define TMCAST_SINGLETON_DECLARATION(T) +# define TMCAST_SINGLETON_DECLARE(SINGLETON_TYPE, CLASS, LOCK) +#endif /* TMCAST_HAS_DLL == 1 */ + +// Set TMCAST_NTRACE = 0 to turn on library specific tracing even if +// tracing is turned off for ACE. +#if !defined (TMCAST_NTRACE) +# if (ACE_NTRACE == 1) +# define TMCAST_NTRACE 1 +# else /* (ACE_NTRACE == 1) */ +# define TMCAST_NTRACE 0 +# endif /* (ACE_NTRACE == 1) */ +#endif /* !TMCAST_NTRACE */ + +#if (TMCAST_NTRACE == 1) +# define TMCAST_TRACE(X) +#else /* (TMCAST_NTRACE == 1) */ +# if !defined (ACE_HAS_TRACE) +# define ACE_HAS_TRACE +# endif /* ACE_HAS_TRACE */ +# define TMCAST_TRACE(X) ACE_TRACE_IMPL(X) +# include "ace/Trace.h" +#endif /* (TMCAST_NTRACE == 1) */ + +#endif /* TMCAST_EXPORT_H */ + +// End of auto generated file. diff --git a/ACE/protocols/ace/TMCast/FaultDetector.hpp b/ACE/protocols/ace/TMCast/FaultDetector.hpp new file mode 100644 index 00000000000..d91540ba6f6 --- /dev/null +++ b/ACE/protocols/ace/TMCast/FaultDetector.hpp @@ -0,0 +1,45 @@ +// file : ACE_TMCast/FaultDetector.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#include "Protocol.hpp" + +namespace ACE_TMCast +{ + class FaultDetector + { + public: + FaultDetector () + : alone_ (true), silence_period_ (-1) + { + } + + public: + class Failed {}; + + + void + insync () + { + if (alone_) + alone_ = false; + + silence_period_ = 0; + } + + void + outsync () + { + if (!alone_ && ++silence_period_ >= Protocol::FATAL_SILENCE_FRAME) + { + // cerr << "Silence period has been passed." << endl; + // cerr << "Decalring the node failed." << endl; + throw Failed (); + } + } + + private: + bool alone_; // true if we haven't heard from any members yet. + short silence_period_; + }; +} diff --git a/ACE/protocols/ace/TMCast/Group.cpp b/ACE/protocols/ace/TMCast/Group.cpp new file mode 100644 index 00000000000..1165aa34e98 --- /dev/null +++ b/ACE/protocols/ace/TMCast/Group.cpp @@ -0,0 +1,506 @@ +// file : ACE_TMCast/Group.cpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#include "Group.hpp" + +#include <typeinfo> + +// OS primitives +#include <ace/OS.h> +#include <ace/OS_NS_stdlib.h> +#include <ace/Synch.h> +#include <ace/Time_Value.h> +#include <ace/SOCK_Dgram_Mcast.h> + +#include "Messaging.hpp" + +#include "Protocol.hpp" + +// Components + +#include "LinkListener.hpp" +#include "FaultDetector.hpp" +#include "TransactionController.hpp" + +namespace ACE_TMCast +{ + bool + operator== (std::type_info const* pa, std::type_info const& b) + { + return *pa == b; + } + + // + // + // + class Terminate : public virtual Message {}; + + + // + // + // + class Failure : public virtual Message {}; + + + // + // + // + class Scheduler + { + public: + Scheduler (ACE_INET_Addr const& addr, + char const* id, + MessageQueue& out_send_data, + MessageQueue& out_recv_data, + MessageQueue& out_control) + + : cond_ (mutex_), + + addr_ (addr), + sock_ (), + + out_control_ (out_control), + + in_data_ (mutex_), + in_link_data_(mutex_), + in_control_ (mutex_), + + sync_schedule (ACE_OS::gettimeofday ()), + + transaction_controller_ (in_data_, out_send_data, out_recv_data) + { + ACE_OS::strncpy (id_, id, Protocol::MEMBER_ID_LENGTH); + id_[Protocol::MEMBER_ID_LENGTH - 1] = '\0'; + + sock_.set_option (IP_MULTICAST_TTL, 32); // @@ ttl is hardcoded + + in_data_.subscribe (cond_); + in_link_data_.subscribe (cond_); + in_control_.subscribe (cond_); + + ACE_thread_t unused; + if (ACE_OS::thr_create (&thread_thunk, + this, + THR_JOINABLE, + &unused, + &thread_) != 0) ACE_OS::abort (); + } + + virtual ~Scheduler () + { + { + MessageQueueAutoLock lock (in_control_); + + in_control_.push (MessagePtr (new Terminate)); + } + + if (ACE_OS::thr_join (thread_, 0) != 0) ACE_OS::abort (); + + // cerr << "Scheduler is down." << endl; + } + + public: + MessageQueue& + in_data () + { + return in_data_; + } + + private: + static ACE_THR_FUNC_RETURN + thread_thunk (void* arg) + { + Scheduler* obj = reinterpret_cast<Scheduler*> (arg); + obj->execute (); + return 0; + } + + void + execute () + { + try + { + sock_.join (addr_); + auto_ptr<LinkListener> ll (new LinkListener (sock_, in_link_data_)); + + { + AutoLock lock (mutex_); + + // Loop + // + // + + while (true) + { + cond_.wait (&sync_schedule); + + // "Loop of Fairness" + + bool done = false; + + do + { + // control message + // + // + if (!in_control_.empty ()) + { + done = true; + break; + } + + // outsync + // + // + if (sync_schedule < ACE_OS::gettimeofday ()) + { + // OUTSYNC + + outsync (); + + // schedule next outsync + sync_schedule = + ACE_OS::gettimeofday () + + ACE_Time_Value (0, Protocol::SYNC_PERIOD); + } + + // link message + // + // + if (!in_link_data_.empty ()) + { + MessagePtr m (in_link_data_.front ()); + in_link_data_.pop (); + + std::type_info const* exp = &typeid (*m); + + if (exp == typeid (LinkFailure)) + { + // cerr << "link failure" << endl; + throw false; + } + else if (exp == typeid (LinkData)) + { + + LinkData* data = dynamic_cast<LinkData*> (m.get ()); + + // INSYNC, TL, CT + + // Filter out loopback. + // + if (ACE_OS::strcmp (data->header().member_id.id, id_) != 0) + { + insync (); + transaction_list (); + current_transaction (data->header().current, + data->payload (), + data->size ()); + } + } + else + { + // cerr << "unknown message type from link listener: " + // << typeid (*m).name () << endl; + ACE_OS::abort (); + } + } + + // api message + // + // + if (!in_data_.empty ()) + { + // API + + api (); + } + + } while (!in_link_data_.empty() || + sync_schedule < ACE_OS::gettimeofday ()); + + if (done) break; + } + } + } + catch (...) + { + // cerr << "Exception in scheduler loop." << endl; + MessageQueueAutoLock lock (out_control_); + out_control_.push (MessagePtr (new Failure)); + } + } + + // Events + // + // Order: + // + // INSYNC, TSL, VOTE, BEGIN + // API + // OUTSYNC + // + + void + insync () + { + fault_detector_.insync (); + } + + void + outsync () + { + char buf[Protocol::MAX_MESSAGE_SIZE]; + + Protocol::MessageHeader* hdr = + reinterpret_cast<Protocol::MessageHeader*> (buf); + + void* data = buf + sizeof (Protocol::MessageHeader); + + hdr->length = sizeof (Protocol::MessageHeader); + hdr->check_sum = 0; + + ACE_OS::strcpy (hdr->member_id.id, id_); + + size_t size (0); + + transaction_controller_.outsync (hdr->current, data, size); + + hdr->length += size; + + fault_detector_.outsync (); + + // sock_.send (buf, hdr->length, addr_); + sock_.send (buf, hdr->length); + } + + void + transaction_list () + { + } + + void + current_transaction (Protocol::Transaction const& t, + void const* payload, + size_t size) + { + transaction_controller_.current_transaction (t, payload, size); + } + + void + api () + { + transaction_controller_.api (); + } + + private: + ACE_hthread_t thread_; + + ACE_Thread_Mutex mutex_; + ACE_Condition<ACE_Thread_Mutex> cond_; + + typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; + + char id_[Protocol::MEMBER_ID_LENGTH]; + + ACE_INET_Addr addr_; + ACE_SOCK_Dgram_Mcast sock_; + + MessageQueue& out_control_; + + MessageQueue in_data_; + MessageQueue in_link_data_; + MessageQueue in_control_; + + // Protocol state + // + // + + ACE_Time_Value sync_schedule; + + FaultDetector fault_detector_; + TransactionController transaction_controller_; + }; + + + // + // + // + class Group::GroupImpl + { + public: + virtual ~GroupImpl () + { + } + + GroupImpl (ACE_INET_Addr const& addr, char const* id) + : send_cond_ (mutex_), + recv_cond_ (mutex_), + failed_ (false), + in_send_data_ (mutex_), + in_recv_data_ (mutex_), + in_control_ (mutex_), + scheduler_ (new Scheduler (addr, + id, + in_send_data_, + in_recv_data_, + in_control_)), + out_data_ (scheduler_->in_data ()) + { + in_send_data_.subscribe (send_cond_); + in_recv_data_.subscribe (recv_cond_); + + in_control_.subscribe (send_cond_); + in_control_.subscribe (recv_cond_); + } + + //FUZZ: disable check_for_lack_ACE_OS + void send (void const* msg, size_t size) + { + //FUZZ: enable check_for_lack_ACE_OS + + if (size > Protocol::MAX_PAYLOAD_SIZE) throw InvalidArg (); + + // Note the potential deadlock if I lock mutex_ and out_data_ in + // reverse order. + + MessageQueueAutoLock l1 (out_data_); + AutoLock l2 (mutex_); + + throw_if_failed (); + + out_data_.push (MessagePtr (new Send (msg, size))); + + l1.unlock (); // no need to keep it locked + + while (true) + { + throw_if_failed (); + + if (!in_send_data_.empty ()) + { + MessagePtr m (in_send_data_.front ()); + in_send_data_.pop (); + + std::type_info const* exp = &typeid (*m); + + if (exp == typeid (ACE_TMCast::Aborted)) + { + throw Group::Aborted (); + } + else if (exp == typeid (Commited)) + { + return; + } + else + { + // cerr << "send: group-scheduler messaging protocol violation; " + // << "unexpected message " << typeid (*m).name () + // << " " << typeid (Aborted).name () << endl; + + ACE_OS::abort (); + } + } + + // cerr << "send: waiting on condition" << endl; + send_cond_.wait (); + // cerr << "send: wokeup on condition" << endl; + } + } + + //FUZZ: disable check_for_lack_ACE_OS + size_t recv (void* msg, size_t size) + { + //FUZZ: enable check_for_lack_ACE_OS + + AutoLock lock (mutex_); + + while (true) + { + throw_if_failed (); + + if (!in_recv_data_.empty ()) + { + MessagePtr m (in_recv_data_.front ()); + in_recv_data_.pop (); + + std::type_info const* exp = &typeid (*m); + + if (exp == typeid (Recv)) + { + Recv* data = dynamic_cast<Recv*> (m.get ()); + + if (size < data->size ()) + throw Group::InsufficienSpace (); + + ACE_OS::memcpy (msg, data->payload (), data->size ()); + + return data->size (); + } + else + { + // cerr << "recv: group-scheduler messaging protocol violation. " + // << "unexpected message " << typeid (*m).name () << endl; + + ACE_OS::abort (); + } + } + + recv_cond_.wait (); + } + } + + private: + void + throw_if_failed () + { + if (!failed_ && !in_control_.empty ()) failed_ = true; + + if (failed_) throw Group::Failed (); + } + + private: + ACE_Thread_Mutex mutex_; + ACE_Condition<ACE_Thread_Mutex> send_cond_; + ACE_Condition<ACE_Thread_Mutex> recv_cond_; + + typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; + + bool failed_; + + MessageQueue in_send_data_; + MessageQueue in_recv_data_; + MessageQueue in_control_; + + auto_ptr<Scheduler> scheduler_; + + MessageQueue& out_data_; + }; + + + // Group + // + // + Group:: + Group (ACE_INET_Addr const& addr, char const* id) + : pimpl_ (new GroupImpl (addr, id)) + { + } + + Group:: + ~Group () + { + } + + void + Group::send (void const* msg, size_t size) + { + pimpl_->send (msg, size); + } + + size_t + Group::recv (void* msg, size_t size) + { + return pimpl_->recv (msg, size); + } +} + diff --git a/ACE/protocols/ace/TMCast/Group.hpp b/ACE/protocols/ace/TMCast/Group.hpp new file mode 100644 index 00000000000..1ec39ca4f22 --- /dev/null +++ b/ACE/protocols/ace/TMCast/Group.hpp @@ -0,0 +1,51 @@ +// file : ACE_TMCast/Group.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#ifndef TMCAST_GROUP_HPP +#define TMCAST_GROUP_HPP + +#include <ace/Auto_Ptr.h> +#include <ace/INET_Addr.h> + +#include "Export.hpp" + +namespace ACE_TMCast +{ + class ACE_TMCast_Export Group + { + public: + class Aborted {}; + class Failed {}; + class InvalidArg {}; + class InsufficienSpace {}; + + public: + ~Group (); + + Group (ACE_INET_Addr const& addr, char const* id); + + public: + void + send (void const* msg, size_t size); + + size_t + recv (void* msg, size_t size); + + private: + bool + failed (); + + private: + class GroupImpl; + auto_ptr<GroupImpl> pimpl_; + + private: + Group (Group const&); + + Group& + operator= (Group const&); + }; +} + +#endif // TMCAST_GROUP_HPP diff --git a/ACE/protocols/ace/TMCast/GroupFwd.hpp b/ACE/protocols/ace/TMCast/GroupFwd.hpp new file mode 100644 index 00000000000..d224586545e --- /dev/null +++ b/ACE/protocols/ace/TMCast/GroupFwd.hpp @@ -0,0 +1,15 @@ +// file : ACE_TMCast/GroupFwd.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#ifndef TMCAST_GROUP_FWD_HPP +#define TMCAST_GROUP_FWD_HPP + +#include "Export.hpp" + +namespace ACE_TMCast +{ + class Group; +} + +#endif // TMCAST_GROUP_FWD_HPP diff --git a/ACE/protocols/ace/TMCast/LinkListener.hpp b/ACE/protocols/ace/TMCast/LinkListener.hpp new file mode 100644 index 00000000000..983f7828f3e --- /dev/null +++ b/ACE/protocols/ace/TMCast/LinkListener.hpp @@ -0,0 +1,171 @@ +// file : ACE_TMCast/LinkListener.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +// OS primitives +#include <ace/OS_NS_string.h> +#include <ace/OS_NS_stdlib.h> +#include <ace/Synch.h> +#include <ace/SOCK_Dgram_Mcast.h> +#include <ace/Refcounted_Auto_Ptr.h> + + +#include "Messaging.hpp" +#include "Protocol.hpp" + +namespace ACE_TMCast +{ + // + // + // + class LinkFailure : public virtual Message {}; + + + // + // + // + class LinkData : public virtual Message + { + public: + LinkData (Protocol::MessageHeader const* header, + void* payload, + size_t size) + : size_ (size) + { + ACE_OS::memcpy (&header_, header, sizeof (Protocol::MessageHeader)); + ACE_OS::memcpy (payload_, payload, size_); + } + + Protocol::MessageHeader const& + header () const + { + return header_; + } + + void const* + payload () const + { + return payload_; + } + + size_t + size () const + { + return size_; + } + + private: + Protocol::MessageHeader header_; + char payload_[Protocol::MAX_MESSAGE_SIZE]; + size_t size_; + }; + + typedef + ACE_Refcounted_Auto_Ptr<LinkData, ACE_Null_Mutex> + LinkDataPtr; + + // + // + // + class LinkListener + { + private: + class Terminate : public virtual Message {}; + + public: + LinkListener (ACE_SOCK_Dgram_Mcast& sock, MessageQueue& out) + : sock_(sock), out_ (out) + { + ACE_thread_t unused; + if (ACE_OS::thr_create (&thread_thunk, + this, + THR_JOINABLE, + &unused, + &thread_) != 0) ACE_OS::abort (); + } + + ~LinkListener () + { + { + MessageQueueAutoLock lock (control_); + + control_.push (MessagePtr (new Terminate)); + } + + if (ACE_OS::thr_join (thread_, 0) != 0) ACE_OS::abort (); + + // cerr << "Link listener is down." << endl; + } + + + static ACE_THR_FUNC_RETURN + thread_thunk (void* arg) + { + LinkListener* obj = reinterpret_cast<LinkListener*> (arg); + + obj->execute (); + return 0; + } + + void + execute () + { + char msg[Protocol::MAX_MESSAGE_SIZE]; + + ssize_t header_size = sizeof (Protocol::MessageHeader); + + // OS::Time timeout (1000000); // one millisecond + + ACE_Time_Value timeout (0, 1000); // one millisecond + + try + { + while (true) + { + // Check control message queue + + { + MessageQueueAutoLock lock (control_); + + if (!control_.empty ()) break; + } + + ACE_INET_Addr junk; + ssize_t n = sock_.recv (msg, + Protocol::MAX_MESSAGE_SIZE, + junk, + 0, + &timeout); + + if (n != -1) + { + if (n < header_size) throw false; + + Protocol::MessageHeader* header = + reinterpret_cast<Protocol::MessageHeader*> (msg); + + MessageQueueAutoLock lock (out_); + + out_.push (MessagePtr (new LinkData (header, + msg + header_size, + n - header_size))); + } + } + } + catch (...) + { + MessageQueueAutoLock lock (out_); + + out_.push (MessagePtr (new LinkFailure)); + } + } + + private: + typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; + + ACE_hthread_t thread_; + ACE_SOCK_Dgram_Mcast& sock_; + MessageQueue& out_; + MessageQueue control_; + }; +} diff --git a/ACE/protocols/ace/TMCast/MTQueue.cpp b/ACE/protocols/ace/TMCast/MTQueue.cpp new file mode 100644 index 00000000000..9f08db5843d --- /dev/null +++ b/ACE/protocols/ace/TMCast/MTQueue.cpp @@ -0,0 +1,7 @@ +// file : ACE_TMCast/MTQueue.cpp +// author : Steve Huston <shuston@riverace.com> +// cvs-id : $Id$ + +#include "LinkListener.hpp" +#include "MTQueue.hpp" + diff --git a/ACE/protocols/ace/TMCast/MTQueue.hpp b/ACE/protocols/ace/TMCast/MTQueue.hpp new file mode 100644 index 00000000000..2eb128823fe --- /dev/null +++ b/ACE/protocols/ace/TMCast/MTQueue.hpp @@ -0,0 +1,176 @@ +// file : ACE_TMCast/MTQueue.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#ifndef TMCAST_MT_QUEUE_HPP +#define TMCAST_MT_QUEUE_HPP + +#include "ace/Auto_Ptr.h" +#include "ace/Unbounded_Set.h" +#include "ace/Unbounded_Queue.h" +#include "ace/os_include/sys/os_types.h" + +namespace ACE_TMCast +{ + template <typename T, + typename M, + typename C, + typename Q = ACE_Unbounded_Queue<T> > + class MTQueue + { + public: + typedef T ElementType; + typedef M MutexType; + typedef C ConditionalType; + typedef Q QueueType; + + public: + + MTQueue () + : mutexp_ (new MutexType), + mutex_ (*mutexp_), + queue_ (), + signal_ (false) + { + } + + MTQueue (MutexType& mutex) + : mutexp_ (), + mutex_ (mutex), + queue_ (), + signal_ (false) + { + } + + public: + bool + empty () const + { + return queue_.is_empty (); + } + + size_t + size () const + { + return queue_.size (); + } + + // typedef typename QueueType::Empty Empty; + + class Empty {}; + + T& + front () + { + ACE_Unbounded_Queue_Iterator<T> f (queue_); + T* tmp; + if (!f.next (tmp)) throw Empty (); + + return *tmp; + } + + + T const& + front () const + { + ACE_Unbounded_Queue_Const_Iterator<T> f (queue_); + T* tmp; + if (!f.next (tmp)) throw Empty (); + + return *tmp; + } + + /* + T& + back () + { + return queue_.back (); + } + + + T const& + back () const + { + return queue_.back (); + } + */ + + void + push (T const& t) + { + signal_ = empty (); + queue_.enqueue_tail (t); + } + + void + pop () + { + T junk; + queue_.dequeue_head (junk); + } + + public: + void + lock () const + { + mutex_.acquire (); + } + + void + unlock () const + { + if (signal_) + { + signal_ = false; + + for (ConditionalSetConstIterator_ i (cond_set_); + !i.done (); + i.advance ()) + { + ConditionalType** c = 0; + + i.next (c); + + (*c)->signal (); + } + } + + mutex_.release (); + } + + void + subscribe (ConditionalType& c) + { + //@@ should check for duplicates + // + cond_set_.insert (&c); + } + + void + unsubscribe (ConditionalType& c) + { + //@@ should check for absence + // + cond_set_.remove (&c); + } + + private: + auto_ptr<MutexType> mutexp_; + MutexType& mutex_; + QueueType queue_; + + typedef + ACE_Unbounded_Set<ConditionalType*> + ConditionalSet_; + + typedef + ACE_Unbounded_Set_Const_Iterator<ConditionalType*> + ConditionalSetConstIterator_; + + ConditionalSet_ cond_set_; + + mutable bool signal_; + }; +} + +#endif // TMCAST_MT_QUEUE_HPP diff --git a/ACE/protocols/ace/TMCast/Makefile.am b/ACE/protocols/ace/TMCast/Makefile.am new file mode 100644 index 00000000000..c524083fd92 --- /dev/null +++ b/ACE/protocols/ace/TMCast/Makefile.am @@ -0,0 +1,76 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## ./bin/mwc.pl -type automake -noreldefs ACE.mwc + +includedir = @includedir@/ace/TMCast +pkgconfigdir = @libdir@/pkgconfig + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + + +## Makefile.TMCast.am + +if BUILD_EXCEPTIONS +if BUILD_THREADS +if !BUILD_ACE_FOR_TAO + +lib_LTLIBRARIES = libACE_TMCast.la + +libACE_TMCast_la_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) \ + -DTMCAST_BUILD_DLL + +libACE_TMCast_la_SOURCES = \ + Group.cpp \ + MTQueue.cpp \ + Protocol.cpp + +libACE_TMCast_la_LDFLAGS = \ + -release @ACE_VERSION_NAME@ + +libACE_TMCast_la_LIBADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +nobase_include_HEADERS = \ + Export.hpp \ + FaultDetector.hpp \ + Group.hpp \ + GroupFwd.hpp \ + LinkListener.hpp \ + MTQueue.hpp \ + Messaging.hpp \ + Protocol.hpp \ + TransactionController.hpp + +pkgconfig_DATA = \ + ACE_TMCast.pc + +CLEANFILES = \ + ACE_TMCast.pc + +ACE_TMCast.pc: ${top_builddir}/config.status ${srcdir}/ACE_TMCast.pc.in + ${top_builddir}/config.status --file $@:${srcdir}/ACE_TMCast.pc.in + +endif !BUILD_ACE_FOR_TAO +endif BUILD_THREADS +endif BUILD_EXCEPTIONS + +EXTRA_DIST = \ + ACE_TMCast.pc.in + + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/protocols/ace/TMCast/Messaging.hpp b/ACE/protocols/ace/TMCast/Messaging.hpp new file mode 100644 index 00000000000..f17dd40b03c --- /dev/null +++ b/ACE/protocols/ace/TMCast/Messaging.hpp @@ -0,0 +1,54 @@ +// file : ACE_TMCast/Messaging.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#ifndef TMCAST_MESSAGING_HPP +#define TMCAST_MESSAGING_HPP + +#include <ace/Synch.h> +#include <ace/Bound_Ptr.h> + +#include "MTQueue.hpp" + +namespace ACE_TMCast +{ + class Message + { + public: + virtual + ~Message () {} + }; + + typedef + ACE_Strong_Bound_Ptr<Message, ACE_SYNCH_MUTEX> + MessagePtr; + + typedef + MTQueue<MessagePtr, ACE_Thread_Mutex, ACE_Condition<ACE_Thread_Mutex> > + MessageQueue; + + struct MessageQueueAutoLock + { + MessageQueueAutoLock (MessageQueue& q) + : q_ (q) + { + q_.lock (); + } + + void + unlock () + { + q_.unlock (); + } + + ~MessageQueueAutoLock () + { + q_.unlock (); + } + + private: + MessageQueue& q_; + }; +} + +#endif // TMCAST_MESSAGING_HPP diff --git a/ACE/protocols/ace/TMCast/Protocol.cpp b/ACE/protocols/ace/TMCast/Protocol.cpp new file mode 100644 index 00000000000..ea4c6b39020 --- /dev/null +++ b/ACE/protocols/ace/TMCast/Protocol.cpp @@ -0,0 +1,31 @@ +// file : ACE_TMCast/Protocol.cpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#include "Protocol.hpp" + +namespace ACE_TMCast +{ + namespace Protocol + { + /* + namespace + { + char const* labels[] = { + "NONE", "BEGIN", "COMMIT", "ABORT", "COMMITED", "ABORTED"}; + } + + std::string + tslabel (Protocol::TransactionStatus s) + { + return labels[s]; + } + + std::ostream& + operator << (std::ostream& o, Transaction const& t) + { + return o << "{" << t.id << "; " << tslabel (t.status) << "}"; + } + */ + } +} diff --git a/ACE/protocols/ace/TMCast/Protocol.hpp b/ACE/protocols/ace/TMCast/Protocol.hpp new file mode 100644 index 00000000000..6cdf374f4f9 --- /dev/null +++ b/ACE/protocols/ace/TMCast/Protocol.hpp @@ -0,0 +1,107 @@ +// file : ACE_TMCast/Protocol.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#ifndef TMCAST_PROTOCOL_HPP +#define TMCAST_PROTOCOL_HPP + +namespace ACE_TMCast +{ + namespace Protocol + { + // + // + // + unsigned long const MEMBER_ID_LENGTH = 38; + + struct MemberId + { + char id[MEMBER_ID_LENGTH]; + /* + unsigned long ip; + unsigned short port; + */ + }; + + // + // + // + typedef unsigned short TransactionId; + + + + typedef unsigned char TransactionStatus; + + TransactionStatus const TS_BEGIN = 1; + TransactionStatus const TS_COMMIT = 2; + TransactionStatus const TS_ABORT = 3; + TransactionStatus const TS_COMMITED = 4; + TransactionStatus const TS_ABORTED = 5; + + struct Transaction + { + TransactionId id; + TransactionStatus status; + }; + + // Transaction List (TL) + + // unsigned long const TL_LENGTH = 1; + + // typedef Transaction TransactionList[TL_LENGTH]; + + + // + // + // + struct MessageHeader + { + unsigned long length; + + unsigned long check_sum; + + MemberId member_id; + + Transaction current; + + //TransactionList transaction_list; + }; + + + // + // + // + + unsigned long const MAX_MESSAGE_SIZE = 768; + + unsigned long const + MAX_PAYLOAD_SIZE = MAX_MESSAGE_SIZE - sizeof (MessageHeader); + + // Protocol timing + // + // + + unsigned long const SYNC_PERIOD = 30000; // in mks + + unsigned short const VOTING_FRAME = 4; // in SYNC_PERIOD's + unsigned short const SEPARATION_FRAME = 5; // in SYNC_PERIOD's + + // FATAL_SILENCE_FRAME in SYNC_PERIOD's + // Generally it's a good idea to set it to < VOTING_FRAME + SEPARATION_FRAME + // + + short const FATAL_SILENCE_FRAME = VOTING_FRAME + SEPARATION_FRAME - 2; + + // short const FATAL_SILENCE_FRAME = 10000; + + // Helpers + + // std::string + // tslabel (Protocol::TransactionStatus s); + + // std::ostream& + // operator << (std::ostream& o, Transaction const& t); + } +} + +#endif // TMCAST_PROTOCOL_HPP diff --git a/ACE/protocols/ace/TMCast/README b/ACE/protocols/ace/TMCast/README new file mode 100644 index 00000000000..7104be46e30 --- /dev/null +++ b/ACE/protocols/ace/TMCast/README @@ -0,0 +1,240 @@ + + +Introduction +------------ + +TMCast (stands for Transaction MultiCast) is an implementation of a +transactional multicast protocol. In essence, the idea is to represent +each message delivery to members of a multicast group as a transaction +- an atomic, consistent and isolated action. A multicast transaction +can be viewed as an atomic transition of the group members to a new +state. If we define [Mo] as a set of operational (non-faulty) members +of the group, [Mf] as a set of faulty members of the group, [Ma] as a +set of members that view transition [Tn] as aborted and [Mc] as a set +of members that view transition [Tn] as committed, then this atomic +transition [Tn] should satisfy one of the following equations: + +Mo(Tn-1) = Ma(T) U Mf(T) +Mo(Tn-1) = Mc(T) U Mf(T) + +Or, in other words, after transaction T has been committed (aborted), +all operational (before transaction T) members are either in the +committed (aborted) or failed state. + +Thus, for each member of the group, outcome of the transaction can be +commit, abort or a member failure. It is important for a member to +exhibit a failfast (error latency is less than transaction cycle) +behavior. Or, in other words, if a member transitioned into a wrong +state, it is guaranteed to fail instead of delivering a wrong result. + +In order to achieve such an error detection in a decentralized +environment, certain limitations were imposed. One of the most +user-visible limitation is the fact that the lifetime of the group +with only one member is very short. This is because there is not way +for a member to distinguish "no members yet" case from "my link to the +group is down". In such a situation, the member assumes the latter +case. There is also a military saying that puts it quite nicely: two +is one, one is nothing. + + + +State of Implementation +----------------------- + +The current implementation is in a prototypical stage. The following +parts are not implemented or still under development: + +* Handling of network partitioning (TODO) + +* Redundant network support (TODO) + +* Member failure detection (partial implementation) + + +Examples +-------- + +There is a simple example available in examples/TMCast/Member with +the corresponding README. + + +Architecture +------------ + +Primary goals of the protocol are to (1) mask transient failures of the +underlying multicast protocol (or, more precisely, allow to recover +from transient failures) and (2) exhibit failfast behavior in cases of +permanent failures. + +The distinction between transient and permanent failures is based on +timeouts thus what can be a transient failure in one configuration of +the protocol could be a permanent failure in the other. + +[Maybe talk more about a transient/permanent threshold and its effect +on performance/resource utilization/etc.] + +[Maybe add a terminology section.] + +Each member of a multicast group has its unique (group-wise) id: + +struct MemberId +{ + char id[MEMBER_ID_LENGTH]; +}; + +Each payload delivery is part of a transaction. Each transaction is +identified by TransactionId: + +typedef unsigned short TransactionId; + + +Each transaction has a status code which identifies its state, as viewed by +a group member: + + +typedef unsigned char TransactionStatus; + +TransactionStatus const TS_BEGIN = 1; +TransactionStatus const TS_COMMIT = 2; +TransactionStatus const TS_ABORT = 3; +TransactionStatus const TS_COMMITTED = 4; +TransactionStatus const TS_ABORTED = 5; + +Thus each transaction is described by its id and status: + +struct Transaction +{ + TransactionId id; + TransactionStatus status; +}; + +The outcome of some predefined number of recent transactions is stored +in TransactionList: + +typedef Transaction TransactionList[TL_LENGTH]; + + +Each message sent to a multicast group has the following header: + +struct MessageHeader +{ + unsigned long length; + unsigned long check_sum; + MemberId member_id; + Transaction current; + TransactionList transaction_list; +}; + +[Maybe describe each field here.] + +A new member joins the group with transaction id 0 and status +TS_COMMITTED. + +Each member sends a periodic 'pulse' messages with some predefined interval +advertising its current view of the group. This includes the state of the +current transaction and the history of the recent transactions. + + +If a member of the group needs a payload delivery it starts a new +transaction by sending a message with current transaction set to + +{++current_id, TS_BEGIN} + +and payload appended after the header. + + +Each member joins a transaction in one of the following ways: + +* A member that began the transaction joins it 'to commit' (TS_COMMIT) + +* A member that received TS_BEGIN joins current transaction 'to commit' + (TS_COMMIT). + +* A member that received TS_COMMIT or TS_ABORT but did not receive TS_BEGIN + joins current transaction 'to abort' (TS_ABORT). + + +After a member has joined the transaction it starts participating in the +transaction's voting phase. On this phase members of the group decide the +fate of the transaction. Each member sends a predefined number of messages +where it announces its vote. In between those messages the member is receiving +and processing votes from other members and can be influenced by their +'opinion'. + +In their decision-making members follow the principle of the majority. As +the voting progresses (and comes close to an end) members become more and +more reluctant to deviate from the decision of the majority. + +[Maybe add an equation that measures member's willingness to change +its mind.] + +At the end of the voting phase each member declares the current transaction +either committed (TS_COMMITTED) or aborted (TS_ABORTED). If this decision does +not agree with the majority the member declares itself failed. + +In addition, each member builds a 'majority view' of the transaction history +(based on transaction_list). If it deviates from the member's own history the +member declares itself failed. + +Here are some example scenarios of how the protocol behaves in different +situations. Let's say we have three members of the group S, R1, R2. S +initiates a transaction. R1 and R2 join it. + +Scenario 1. (two-step voting) + +1. S initiates a transaction (TS_BEGIN) +2a. R1 receives TS_BEGIN, joins for commit +2b. R2 receives TS_BEGIN, joins for commit +3a. S announces TS_COMMIT (first vote) +3b. R1 announces TS_COMMIT (first vote) +3c. R2 announces TS_COMMIT (first vote) +4a. S announces TS_COMMIT (second vote) +4b. R1 announces TS_COMMIT (second vote) +4c. R2 announces TS_COMMIT (second vote) +5a. S announces TS_COMMITTED (end of vote) +5b. R1 announces TS_COMMITTED (end of vote) +5c. R2 announces TS_COMMITTED (end of vote) + + +Scenario 2. (two-step voting) + +1. S initiates a transaction (TS_BEGIN) +2a. R1 receives TS_BEGIN, joins for commit +2b. R2 didn't receive TS_BEGIN +3a. S announces TS_COMMIT (first vote) +3b. R1 announces TS_COMMIT (first vote) +3c. R2 received R1's TS_COMMIT announces TS_ABORT (first vote) +4a. S received R2's TS_ABORT announces TS_ABORT (second vote) +4b. R1 received R2's TS_ABORT announces TS_ABORT (second vote) +4c. R2 announces TS_ABORT (second vote) +5a. S announces TS_ABORTED (end of vote) +5b. R1 announces TS_ABORTED (end of vote) +5c. R2 announces TS_ABORTED (end of vote) + + +Scenario 3. (three-step voting) + +1. S initiates a transaction (TS_BEGIN) +2a. R1 receives TS_BEGIN, joins for commit +2b. R2 didn't receive TS_BEGIN +3a. S announces TS_COMMIT (first vote) +3b. R1 announces TS_COMMIT (first vote) +3c. R2 still didn't receive anything +4a. S announces TS_COMMIT (second vote) +4b. R1 announces TS_COMMIT (second vote) +4c. R2 received R1's TS_COMMIT, announces TS_ABORT (first vote) + +5a. S received R2's TS_ABORT but it is the end of the voting phase and + majority (S and R1) vote for commit, announces TS_COMMIT (third vote) +5b. R1 received R2's TS_ABORT but it is the end of the voting phase and + majority (S and R1) vote for commit, announces TS_COMMIT (third vote) +5c. R2 announces TS_ABORT (second vote) + +6a. S announces TS_COMMITTED (end of vote) +6b. R1 announces TS_COMMITTED (end of vote) +6c. R2 discovers that the the majority has declared current transaction + committed and thus declares itself failed. + + +-- +Boris Kolpackov <boris@dre.vanderbilt.edu> diff --git a/ACE/protocols/ace/TMCast/TMCast.mpc b/ACE/protocols/ace/TMCast/TMCast.mpc new file mode 100644 index 00000000000..5d053de03a5 --- /dev/null +++ b/ACE/protocols/ace/TMCast/TMCast.mpc @@ -0,0 +1,12 @@ +// -*- MPC -*- +// $Id$ + +project : acelib, ace_output, install, exceptions, threads { + avoids = ace_for_tao + sharedname = ACE_TMCast + dynamicflags += TMCAST_BUILD_DLL + + Pkgconfig_Files { + ACE_TMCast.pc.in + } +} diff --git a/ACE/protocols/ace/TMCast/TransactionController.hpp b/ACE/protocols/ace/TMCast/TransactionController.hpp new file mode 100644 index 00000000000..0ed5002bdf1 --- /dev/null +++ b/ACE/protocols/ace/TMCast/TransactionController.hpp @@ -0,0 +1,388 @@ +// file : ACE_TMCast/TransactionController.hpp +// author : Boris Kolpackov <boris@dre.vanderbilt.edu> +// cvs-id : $Id$ + +#include "ace/OS_NS_string.h" +#include "ace/OS_NS_stdlib.h" +#include "ace/Synch.h" +#include "ace/Bound_Ptr.h" + +#include "Protocol.hpp" +#include "Messaging.hpp" + +#include <typeinfo> + +namespace ACE_TMCast +{ + + // Messages + // + // + class Send : public virtual Message + { + public: + Send (void const* msg, size_t size) + : size_ (size) + { + ACE_OS::memcpy (payload_, msg, size_); + } + + void const* + payload () const + { + return payload_; + } + + size_t + size () const + { + return size_; + } + + private: + size_t size_; + char payload_[Protocol::MAX_PAYLOAD_SIZE]; + }; + + typedef + ACE_Strong_Bound_Ptr<Send, ACE_SYNCH_MUTEX> + SendPtr; + + + class Recv : public virtual Message + { + public: + Recv (void const* msg, size_t size) + : size_ (size) + { + ACE_OS::memcpy (payload_, msg, size_); + } + + void const* + payload () const + { + return payload_; + } + + size_t + size () const + { + return size_; + } + + private: + size_t size_; + char payload_[Protocol::MAX_PAYLOAD_SIZE]; + }; + + typedef + ACE_Strong_Bound_Ptr<Recv, ACE_SYNCH_MUTEX> + RecvPtr; + + class Aborted : public virtual Message {}; + + class Commited : public virtual Message {}; + + + // + // + // + class TransactionController + { + public: + TransactionController (MessageQueue& in, + MessageQueue& send_out, + MessageQueue& recv_out) + : trace_ (false), + voting_duration_ (0), + separation_duration_ (0), + in_ (in), + send_out_ (send_out), + recv_out_ (recv_out) + { + current_.id = 0; + current_.status = Protocol::TS_COMMITED; + } + + public: + class Failure {}; + + + void + outsync (Protocol::Transaction& c, void* payload, size_t& size) + { + if (current_.status == Protocol::TS_COMMIT || + current_.status == Protocol::TS_ABORT) + { + if (++voting_duration_ >= Protocol::VOTING_FRAME) + { + // end of voting frame + + if (current_.status == Protocol::TS_COMMIT) + { + { + if (initiated_) + { + MessageQueueAutoLock lock (send_out_); + send_out_.push (MessagePtr (new Commited)); + } + else // joined transaction + { + MessageQueueAutoLock lock (recv_out_); + recv_out_.push (MessagePtr (recv_)); + recv_ = RecvPtr (); + } + } + + current_.status = Protocol::TS_COMMITED; + + // if (trace_) cerr << "commited transaction with id " + // << current_.id << endl; + } + else // TS_ABORT + { + if (initiated_) + { + MessageQueueAutoLock lock (send_out_); + send_out_.push (MessagePtr (new Aborted)); + } + else + { + // free revc_ buffer if necessary + // + if (recv_.get ()) recv_ = RecvPtr (); + } + + + current_.status = Protocol::TS_ABORTED; + + // if (trace_) cerr << "aborted transaction with id " + // << current_.id << endl; + } + + // start transaction separation frame (counts down) + // +1 because it will be decremented on this iteration + separation_duration_ = Protocol::SEPARATION_FRAME + 1; + } + } + + // Set current outsync info + + c.id = current_.id; + c.status = current_.status; + + + // Do some post-processing + + switch (current_.status) + { + case Protocol::TS_COMMITED: + case Protocol::TS_ABORTED: + { + if (separation_duration_ > 0) --separation_duration_; + break; + } + case Protocol::TS_BEGIN: + { + // transfer payload + + size = send_->size (); + memcpy (payload, send_->payload (), size); + + send_ = SendPtr (); + + // get redy to vote for 'commit' + + current_.status = Protocol::TS_COMMIT; + voting_duration_ = 0; + } + } + } + + void + current_transaction (Protocol::Transaction const& t, + void const* payload, + size_t size) + { + Protocol::TransactionId& id = current_.id; + Protocol::TransactionStatus& s = current_.status; + + if (id == 0 && t.id != 0) // catch up + { + switch (t.status) + { + case Protocol::TS_BEGIN: + case Protocol::TS_COMMIT: + case Protocol::TS_ABORT: + { + id = t.id - 1; + s = Protocol::TS_COMMITED; + break; + } + case Protocol::TS_ABORTED: + case Protocol::TS_COMMITED: + { + id = t.id; + s = t.status; + break; + } + } + + // if (trace_) cerr << "caught up with id " << id << endl; + } + + bool stable (s == Protocol::TS_COMMITED || s == Protocol::TS_ABORTED); + + switch (t.status) + { + case Protocol::TS_BEGIN: + { + if (!stable || t.id != id + 1) + { + // Transaction is in progress or hole in transaction id's + + // cerr << "unexpected request to join " << t + // << " while on " << current_ << endl; + + // if (!stable) cerr << "voting progress is " << voting_duration_ + // << "/" << Protocol::VOTING_FRAME << endl; + + if (t.id == id) // collision + { + if (!stable && s != Protocol::TS_ABORT) + { + // abort both + // cerr << "aborting both transactions" << endl; + + s = Protocol::TS_ABORT; + voting_duration_ = 0; //@@ reset voting frame + } + } + else + { + // @@ delicate case. need to think more + + // cerr << "Declaring node failed." << endl; + throw Failure (); + } + } + else + { + // join the transaction + + initiated_ = false; + + recv_ = RecvPtr (new Recv (payload, size)); + + id = t.id; + s = Protocol::TS_COMMIT; + voting_duration_ = 0; + + // if (trace_) cerr << "joining-for-commit transaction with id " + // << id << endl; + } + break; + } + case Protocol::TS_COMMIT: + { + if (stable && id == t.id - 1) + { + // not begin and and we haven't joined + + // join for abort + + initiated_ = false; + + current_.id = t.id; + current_.status = Protocol::TS_ABORT; + voting_duration_ = 0; + + // if (trace_) cerr << "joining-for-abort transaction with id " + // << current_.id << endl; + } + break; + } + case Protocol::TS_ABORT: + { + if ((!stable && id == t.id && s == Protocol::TS_COMMIT) || + (stable && id == t.id - 1)) // abort current || new transaction + { + // if (trace_) cerr << "voting-for-abort on transaction with id " + // << current_.id << endl; + + id = t.id; + s = Protocol::TS_ABORT; + + voting_duration_ = 0; //@@ reseting voting_duration_ + } + else + { + } + + break; + } + case Protocol::TS_ABORTED: + case Protocol::TS_COMMITED: + { + // nothing for now + break; + } + } + } + + void + api () + { + if ((current_.status == Protocol::TS_COMMITED || + current_.status == Protocol::TS_ABORTED) && + separation_duration_ == 0) // no transaction in progress + { + // start new transaction + + // Note that in_ is already locked by Scheduler + + MessagePtr m (in_.front ()); + in_.pop (); + + if (typeid (*m) == typeid (Send)) + { + send_ = SendPtr (m); + } + else + { + // cerr << "Expecting Send but received " << typeid (*m).name () + // << endl; + + ACE_OS::abort (); + } + + current_.id++; + current_.status = Protocol::TS_BEGIN; + + initiated_ = true; + + // if (trace_) cerr << "starting transaction with id " << current_.id + // << endl; + } + } + + private: + typedef ACE_Guard<ACE_Thread_Mutex> AutoLock; + + bool trace_; + + Protocol::Transaction current_; + + bool initiated_; + + unsigned short voting_duration_; + unsigned short separation_duration_; + + MessageQueue& in_; + MessageQueue& send_out_; + MessageQueue& recv_out_; + + SendPtr send_; + RecvPtr recv_; + }; +} |