diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.cpp | 607 |
1 files changed, 607 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.cpp b/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.cpp new file mode 100644 index 00000000000..03c5ff717fc --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Receiver.cpp @@ -0,0 +1,607 @@ +// $Id$ + +#include "orbsvcs/Event/ECG_CDR_Message_Receiver.h" +#include "orbsvcs/Event/ECG_CDR_Message_Sender.h" + +#include "tao/Exception.h" + +#include "ace/SOCK_Dgram.h" +#include "ace/ACE.h" +#include "ace/OS_NS_string.h" + +#if !defined(__ACE_INLINE__) +#include "orbsvcs/Event/ECG_CDR_Message_Receiver.inl" +#endif /* __ACE_INLINE__ */ + +ACE_RCSID (Event, + ECG_CDR_Message_Receiver, + "$Id$") + +TAO_BEGIN_VERSIONED_NAMESPACE_DECL + +TAO_ECG_CDR_Processor::~TAO_ECG_CDR_Processor (void) +{ +} +// **************************************************************** + +TAO_ECG_UDP_Request_Entry::~TAO_ECG_UDP_Request_Entry (void) +{ + if (this->own_received_fragments_) + { + this->own_received_fragments_ = 0; + delete[] this->received_fragments_; + } +} + +TAO_ECG_UDP_Request_Entry:: +TAO_ECG_UDP_Request_Entry (CORBA::Boolean byte_order, + CORBA::ULong request_id, + CORBA::ULong request_size, + CORBA::ULong fragment_count) + : byte_order_ (byte_order) + , request_id_ (request_id) + , request_size_ (request_size) + , fragment_count_ (fragment_count) +{ + ACE_CDR::grow (&this->payload_, this->request_size_); + this->payload_.wr_ptr (request_size_); + + this->received_fragments_ = this->default_received_fragments_; + this->own_received_fragments_ = 0; + const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT; + this->received_fragments_size_ = + this->fragment_count_ / bits_per_ulong + 1; + if (this->received_fragments_size_ > ECG_DEFAULT_FRAGMENT_BUFSIZ) + { + ACE_NEW (this->received_fragments_, + CORBA::ULong[this->received_fragments_size_]); + this->own_received_fragments_ = 1; + } + + for (CORBA::ULong i = 0; i < this->received_fragments_size_; ++i) + this->received_fragments_[i] = 0; + CORBA::ULong idx = this->fragment_count_ / bits_per_ulong; + CORBA::ULong bit = this->fragment_count_ % bits_per_ulong; + this->received_fragments_[idx] = (0xFFFFFFFF << bit); +} + +int +TAO_ECG_UDP_Request_Entry::validate_fragment (CORBA::Boolean byte_order, + CORBA::ULong request_size, + CORBA::ULong fragment_size, + CORBA::ULong fragment_offset, + CORBA::ULong /* fragment_id */, + CORBA::ULong fragment_count) const +{ + if (byte_order != this->byte_order_ + || request_size != this->request_size_ + || fragment_count != this->fragment_count_) + return 0; + + if (fragment_offset >= request_size + || fragment_offset + fragment_size > request_size) + return 0; + + return 1; +} + +int +TAO_ECG_UDP_Request_Entry::test_received (CORBA::ULong fragment_id) const +{ + // Assume out-of-range fragments as received, so they are dropped... + if (fragment_id > this->fragment_count_) + return 1; + + const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT; + CORBA::ULong idx = fragment_id / bits_per_ulong; + CORBA::ULong bit = fragment_id % bits_per_ulong; + return ACE_BIT_ENABLED (this->received_fragments_[idx], 1<<bit); +} + +void +TAO_ECG_UDP_Request_Entry::mark_received (CORBA::ULong fragment_id) +{ + // Assume out-of-range fragments as received, so they are dropped... + if (fragment_id > this->fragment_count_) + return; + + const int bits_per_ulong = sizeof(CORBA::ULong) * CHAR_BIT; + CORBA::ULong idx = fragment_id / bits_per_ulong; + CORBA::ULong bit = fragment_id % bits_per_ulong; + ACE_SET_BITS (this->received_fragments_[idx], 1<<bit); +} + +int +TAO_ECG_UDP_Request_Entry::complete (void) const +{ + for (CORBA::ULong i = 0; + i < this->received_fragments_size_; + ++i) + { + if (this->received_fragments_[i] != 0xFFFFFFFF) + return 0; + } + return 1; +} + +char* +TAO_ECG_UDP_Request_Entry::fragment_buffer (CORBA::ULong fragment_offset) +{ + return this->payload_.rd_ptr () + fragment_offset; +} +// **************************************************************** + +int +TAO_ECG_CDR_Message_Receiver::Requests::init (size_t size, + size_t min_purge_count) +{ + // Already initialized. + if (this->fragmented_requests_) + return -1; + + ACE_NEW_RETURN (this->fragmented_requests_, + TAO_ECG_UDP_Request_Entry*[size], + -1); + + this->size_ = size; + this->id_range_low_ = 0; + this->id_range_high_ = size - 1; + this->min_purge_count_ = min_purge_count; + + for (size_t i = 0; i < size; ++i) + { + this->fragmented_requests_[i] = 0; + } + + return 0; +} + +TAO_ECG_CDR_Message_Receiver::Requests::~Requests (void) +{ + for (size_t i = 0; i < this->size_; ++i) + { + TAO_ECG_UDP_Request_Entry* request = + this->fragmented_requests_[i]; + + if (request != &TAO_ECG_CDR_Message_Receiver::Request_Completed_) + delete request; + } + + delete [] this->fragmented_requests_; + + this->fragmented_requests_ = 0; + this->size_ = 0; + this->id_range_low_ = 0; + this->id_range_high_ = 0; +} + +TAO_ECG_UDP_Request_Entry ** +TAO_ECG_CDR_Message_Receiver::Requests::get_request (CORBA::ULong request_id) +{ + if (request_id < this->id_range_low_) + // <request_id> is below the current range. + { + return 0; + } + + if (request_id > this->id_range_high_) + // <request_id> is above the current range - need to shift the range + // to include it. + { + CORBA::ULong new_slots_needed = request_id - this->id_range_high_; + + if (new_slots_needed < this->min_purge_count_) + new_slots_needed = this->min_purge_count_; + + if (new_slots_needed > this->size_) + // Shifting the range by more than the size of array. + { + this->purge_requests (this->id_range_low_, this->id_range_high_); + this->id_range_high_ = request_id; + this->id_range_low_ = request_id - this->size_ + 1; + } + else + { + this->purge_requests (this->id_range_low_, + this->id_range_low_ + new_slots_needed - 1); + this->id_range_high_ += new_slots_needed; + this->id_range_low_ += new_slots_needed; + } + } + + // Return array location for <request_id>. + int index = request_id % this->size_; + return this->fragmented_requests_ + index; +} + + +void +TAO_ECG_CDR_Message_Receiver::Requests::purge_requests ( + CORBA::ULong purge_first, + CORBA::ULong purge_last) +{ + for (CORBA::ULong i = purge_first; i <= purge_last; ++i) + { + size_t index = i % this->size_; + if (this->fragmented_requests_[index] + != &TAO_ECG_CDR_Message_Receiver::Request_Completed_) + { + delete this->fragmented_requests_[index]; + } + this->fragmented_requests_[index] = 0; + } +} + +// **************************************************************** + +TAO_ECG_UDP_Request_Entry +TAO_ECG_CDR_Message_Receiver::Request_Completed_ (0, 0, 0, 0); + +int +TAO_ECG_CDR_Message_Receiver::handle_input ( + ACE_SOCK_Dgram& dgram, + TAO_ECG_CDR_Processor *cdr_processor) +{ + char nonaligned_header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE + + ACE_CDR::MAX_ALIGNMENT]; + char *header_buf = ACE_ptr_align_binary (nonaligned_header, + ACE_CDR::MAX_ALIGNMENT); + char nonaligned_data[ACE_MAX_DGRAM_SIZE + ACE_CDR::MAX_ALIGNMENT]; + char *data_buf = ACE_ptr_align_binary (nonaligned_data, + ACE_CDR::MAX_ALIGNMENT); + + // Read the message from dgram. + + const int iovcnt = 2; + iovec iov[iovcnt]; + iov[0].iov_base = header_buf; + iov[0].iov_len = TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE; + iov[1].iov_base = data_buf; + iov[1].iov_len = ACE_MAX_DGRAM_SIZE; + + ACE_INET_Addr from; + ssize_t n = dgram.recv (iov, iovcnt, from); + + if (n == -1) + { + if (errno == EWOULDBLOCK) + return 0; + + ACE_ERROR_RETURN ((LM_ERROR, "Error reading mcast fragment (%m).\n"), + -1); + } + + if (n == 0) + { + ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: " + "read 0 bytes from socket.\n"), + 0); + } + + if (n < TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE) + { + ACE_ERROR_RETURN ((LM_ERROR, "Trying to read mcast fragment: " + "# of bytes read < mcast header size.\n"), + -1); + } + + u_int crc = 0; + + if (this->check_crc_) + { + iov[1].iov_len = n - iov[0].iov_len; + iov[0].iov_len -= 4; // don't include crc + + crc = ACE::crc32 (iov, 2); + } + // Check whether the message is a loopback message. + if (this->ignore_from_.get () != 0 + && this->ignore_from_->is_loopback (from)) + { + return 0; + } + + // Decode and validate mcast header. + Mcast_Header header; + if (header.read (header_buf, n, this->check_crc_) == -1) + return -1; + + if ( this->check_crc_ && header.crc != crc) + { + static unsigned int err_count = 0; + ACE_ERROR ((LM_ERROR, + "******************************\n")); + + ACE_ERROR ((LM_ERROR, + "ERROR DETECTED \n")); + + if (crc == 0) + { + ACE_ERROR ((LM_ERROR, + "Sending process may not have computed CRC \n")); + } + else + { + ACE_ERROR ((LM_ERROR, + " NETWORK CRC CHECKSUM FAILED\n")); + } + + ACE_ERROR ((LM_ERROR, + "Message was received from [%s:%s:%d] \n", + from.get_host_name (), + from.get_host_addr (), + from.get_port_number())); + + ACE_ERROR ((LM_ERROR, + "Num errors = %d \n", + ++err_count)); + ACE_ERROR ((LM_ERROR, + "This is a bad thing. Attempting to ignore ..\n")); + + return 0; + } + + // Process received data. + if (header.fragment_count == 1) + { + // Update <request_map_> to mark this request as completed. (Not + // needed if we don't care about duplicates.) + int const result = this->mark_received (from, header.request_id); + if (result != 1) + return result; + + TAO_InputCDR cdr (data_buf, header.request_size, header.byte_order); + if (cdr_processor->decode (cdr) == -1) + return -1; + else + return 1; + } + + return this->process_fragment (from, header, data_buf, cdr_processor); +} + +int +TAO_ECG_CDR_Message_Receiver::mark_received (const ACE_INET_Addr &from, + CORBA::ULong request_id) +{ + // ACE_GUARD_RETURN (ACE_Lock, guard, *this->lock_, -1); + + Request_Map::ENTRY * entry = this->get_source_entry (from); + if (!entry) + return -1; + + TAO_ECG_UDP_Request_Entry ** request = + entry->int_id_->get_request (request_id); + + if (request == 0) + { + ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence" + "below currently expected range.\n")); + return 0; + } + if (*request == &Request_Completed_) + { + ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. " + "(Request already complete).\n")); + return 0; + } + if (*request != 0) + { + ACE_ERROR_RETURN ((LM_ERROR, "Inconsistent fragments for " + "mcast request.\n"), + -1); + } + + *request = &Request_Completed_; + return 1; +} + +int +TAO_ECG_CDR_Message_Receiver::process_fragment ( + const ACE_INET_Addr &from, + const Mcast_Header &header, + char * data_buf, + TAO_ECG_CDR_Processor *cdr_processor) +{ + // ACE_GUARD_RETURN (ACE_Lock, guard, *this->lock_, -1); + + Request_Map::ENTRY * source_entry = this->get_source_entry (from); + if (!source_entry) + return -1; + + TAO_ECG_UDP_Request_Entry ** request = + source_entry->int_id_->get_request (header.request_id); + + if (request == 0) + { + ACE_DEBUG ((LM_WARNING, "Received mcast request with sequence " + "below currently expected range.\n")); + return 0; + } + if (*request == &Request_Completed_) + { + ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment. " + "(Request already complete).\n")); + return 0; + } + if (*request == 0) + // Entry for this request has not yet been allocated. + { + ACE_NEW_RETURN (*request, + TAO_ECG_UDP_Request_Entry (header.byte_order, + header.request_id, + header.request_size, + header.fragment_count), + -1); + } + + // Validate the fragment. + if ((*request)->validate_fragment (header.byte_order, + header.request_size, + header.fragment_size, + header.fragment_offset, + header.fragment_id, + header.fragment_count) == 0) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Received invalid mcast fragment.\n"), + -1); + } + + // Check whether this fragment was already received. + if ((*request)->test_received (header.fragment_id) == 1) + { + ACE_DEBUG ((LM_INFO, "Received duplicate mcast fragment.\n")); + return 0; + } + + // Add the fragment to the request entry. + (*request)->mark_received (header.fragment_id); + ACE_OS::memcpy ((*request)->fragment_buffer (header.fragment_offset), + data_buf, + header.fragment_size); + + // The request is not yet complete. + if (!(*request)->complete ()) + { + return 0; + } + + // The request is complete - decode it. + TAO_InputCDR cdr ((*request)->fragment_buffer (0), + header.request_size, + header.byte_order); + + if (cdr_processor->decode (cdr) == -1) + return -1; + + delete *request; + *request = &Request_Completed_; + return 1; +} + +TAO_ECG_CDR_Message_Receiver::Request_Map::ENTRY* +TAO_ECG_CDR_Message_Receiver::get_source_entry (const ACE_INET_Addr &from) +{ + // Get the entry for <from> from the <request_map_>. + Request_Map::ENTRY * entry = 0; + + if (this->request_map_.find (from, entry) == -1) + { + // Create an entry if one doesn't exist. + Requests *requests = 0; + ACE_NEW_RETURN (requests, + Requests, + 0); + auto_ptr<Requests> requests_aptr (requests); + + if (requests->init (this->max_requests_, this->min_purge_count_) == -1 + || this->request_map_.bind (from, requests, entry) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, "Unable to create hash map " + "entry for a new request.\n"), + 0); + } + requests_aptr.release (); + } + + return entry; +} + +void +TAO_ECG_CDR_Message_Receiver::shutdown (void) +{ + // ACE_GUARD (ACE_Lock, guard, *this->lock_); + + Request_Map::iterator end = this->request_map_.end (); + for (Request_Map::iterator i = this->request_map_.begin (); + i != end; + ++i) + { + delete (*i).int_id_; + (*i).int_id_ = 0; + } + + this->ignore_from_.reset (); +} + +// **************************************************************** +int +TAO_ECG_CDR_Message_Receiver::Mcast_Header::read (char *header, + size_t bytes_received, + CORBA::Boolean checkcrc) +{ + // Decode. + this->byte_order = header[0]; + if(this->byte_order != 0 && this->byte_order != 1) + { + ACE_ERROR_RETURN ((LM_ERROR, "Reading mcast packet header: byte " + "order is neither 0 nor 1, it is %d.\n", + this->byte_order), + -1); + } + + TAO_InputCDR header_cdr (header, + TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE, + byte_order); + CORBA::Boolean unused; + CORBA::Octet a, b, c; + if (!header_cdr.read_boolean (unused) + || !header_cdr.read_octet (a) + || !header_cdr.read_octet (b) + || !header_cdr.read_octet (c) + || a != 'A' || b != 'B' || c != 'C') + { + ACE_ERROR_RETURN ((LM_ERROR, "Error reading magic bytes " + "in mcast packet header.\n"), + -1); + } + + if (!header_cdr.read_ulong (this->request_id) + || !header_cdr.read_ulong (this->request_size) + || !header_cdr.read_ulong (this->fragment_size) + || !header_cdr.read_ulong (this->fragment_offset) + || !header_cdr.read_ulong (this->fragment_id) + || !header_cdr.read_ulong (this->fragment_count)) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Error decoding mcast packet header.\n"), + -1); + } + + if (checkcrc) + { + CORBA::Octet padding[4]; + header_cdr.read_octet_array (padding, 4); + + unsigned char *crcparts = (unsigned char *)(&this->crc); + + for (int cnt=0; cnt != 4; ++cnt) + { + crcparts[cnt] = padding[cnt]; + } + + this->crc = ntohl (this->crc); + } + + // Validate. + size_t const data_bytes_received = + bytes_received - TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE; + + if (this->request_size < this->fragment_size + || this->fragment_offset >= this->request_size + || this->fragment_id >= this->fragment_count + || (this->fragment_count == 1 + && (this->fragment_size != this->request_size + || this->request_size != data_bytes_received))) + { + ACE_ERROR_RETURN ((LM_ERROR, + "Invalid mcast fragment: " + "inconsistent header fields.\n"), + -1); + } + + return 0; +} + +TAO_END_VERSIONED_NAMESPACE_DECL |