summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Sender.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Sender.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Sender.cpp340
1 files changed, 340 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Sender.cpp b/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Sender.cpp
new file mode 100644
index 00000000000..7ab74bdf599
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/Event/ECG_CDR_Message_Sender.cpp
@@ -0,0 +1,340 @@
+// $Id$
+
+#include "orbsvcs/Event/ECG_CDR_Message_Sender.h"
+#include "tao/CDR.h"
+#include "ace/SOCK_Dgram.h"
+#include "ace/INET_Addr.h"
+#include "ace/ACE.h"
+
+#if !defined(__ACE_INLINE__)
+#include "orbsvcs/Event/ECG_CDR_Message_Sender.inl"
+#endif /* __ACE_INLINE__ */
+
+ACE_RCSID(Event, ECG_CDR_Message_Sender, "$Id$")
+
+TAO_BEGIN_VERSIONED_NAMESPACE_DECL
+
+void
+TAO_ECG_CDR_Message_Sender::init (
+ TAO_ECG_Refcounted_Endpoint endpoint_rptr
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ if (endpoint_rptr.get () == 0
+ || endpoint_rptr->dgram ().get_handle () == ACE_INVALID_HANDLE)
+ {
+ ACE_ERROR ((LM_ERROR, "TAO_ECG_CDR_Message_Sender::init(): "
+ "nil or unitialized endpoint argument.\n"));
+ ACE_THROW (CORBA::INTERNAL ());
+ }
+
+ this->endpoint_rptr_ = endpoint_rptr;
+}
+
+void
+TAO_ECG_CDR_Message_Sender::send_message (const TAO_OutputCDR &cdr,
+ const ACE_INET_Addr &addr
+ ACE_ENV_ARG_DECL)
+ ACE_THROW_SPEC ((CORBA::SystemException))
+{
+ if (this->endpoint_rptr_.get () == 0)
+ {
+ ACE_ERROR ((LM_ERROR, "Attempt to invoke send_message() "
+ "on non-initialized sender object.\n"));
+ ACE_THROW (CORBA::INTERNAL ());
+ }
+
+ CORBA::ULong max_fragment_payload = this->mtu () -
+ TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE;
+ // ACE_ASSERT (max_fragment_payload != 0);
+
+#if defined (ACE_HAS_BROKEN_DGRAM_SENDV)
+ const int TAO_WRITEV_MAX = ACE_IOV_MAX - 1;
+#else
+ const int TAO_WRITEV_MAX = ACE_IOV_MAX;
+#endif /* ACE_HAS_BROKEN_DGRAM_SENDV */
+ iovec iov[TAO_WRITEV_MAX];
+
+ CORBA::ULong total_length;
+ CORBA::ULong fragment_count =
+ this->compute_fragment_count (cdr.begin (),
+ cdr.end (),
+ TAO_WRITEV_MAX,
+ max_fragment_payload,
+ total_length);
+
+ CORBA::ULong request_id = this->endpoint_rptr_->next_request_id ();
+
+ // Reserve the first iovec for the header...
+ int iovcnt = 1;
+ CORBA::ULong fragment_id = 0;
+ CORBA::ULong fragment_offset = 0;
+ CORBA::ULong fragment_size = 0;
+ for (const ACE_Message_Block* b = cdr.begin ();
+ b != cdr.end ();
+ b = b->cont ())
+ {
+ CORBA::ULong l = b->length ();
+
+ char* rd_ptr = b->rd_ptr ();
+
+ iov[iovcnt].iov_base = rd_ptr;
+ iov[iovcnt].iov_len = l;
+ fragment_size += l;
+ ++iovcnt;
+ while (fragment_size > max_fragment_payload)
+ {
+ // This fragment is full, we have to send it...
+
+ // First adjust the last iov entry:
+ CORBA::ULong last_mb_length =
+ max_fragment_payload - (fragment_size - l);
+ iov[iovcnt - 1].iov_len = last_mb_length;
+
+ this->send_fragment (addr,
+ request_id,
+ total_length,
+ max_fragment_payload,
+ fragment_offset,
+ fragment_id,
+ fragment_count,
+ iov,
+ iovcnt
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ ++fragment_id;
+ fragment_offset += max_fragment_payload;
+
+ // Reset, but don't forget that the last Message_Block
+ // may need to be sent in multiple fragments..
+ l -= last_mb_length;
+ rd_ptr += last_mb_length;
+ iov[1].iov_base = rd_ptr;
+ iov[1].iov_len = l;
+ fragment_size = l;
+ iovcnt = 2;
+ }
+ if (fragment_size == max_fragment_payload)
+ {
+ // We filled a fragment, but this time it was filled
+ // exactly, the treatment is a little different from the
+ // loop above...
+ this->send_fragment (addr,
+ request_id,
+ total_length,
+ max_fragment_payload,
+ fragment_offset,
+ fragment_id,
+ fragment_count,
+ iov,
+ iovcnt
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ ++fragment_id;
+ fragment_offset += max_fragment_payload;
+
+ iovcnt = 1;
+ fragment_size = 0;
+ }
+ if (iovcnt == TAO_WRITEV_MAX)
+ {
+ // Now we ran out of space in the iovec, we must send a
+ // fragment to work around that....
+ this->send_fragment (addr,
+ request_id,
+ total_length,
+ fragment_size,
+ fragment_offset,
+ fragment_id,
+ fragment_count,
+ iov,
+ iovcnt
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ ++fragment_id;
+ fragment_offset += fragment_size;
+
+ iovcnt = 1;
+ fragment_size = 0;
+ }
+ }
+ // There is something left in the iovvec that we must send
+ // also...
+ if (iovcnt != 1)
+ {
+ // Now we ran out of space in the iovec, we must send a
+ // fragment to work around that....
+ this->send_fragment (addr,
+ request_id,
+ total_length,
+ fragment_size,
+ fragment_offset,
+ fragment_id,
+ fragment_count,
+ iov,
+ iovcnt
+ ACE_ENV_ARG_PARAMETER);
+ ACE_CHECK;
+ ++fragment_id;
+ fragment_offset += fragment_size;
+
+ // reset, not needed here...
+ // iovcnt = 1;
+ // fragment_size = 0;
+ }
+ // ACE_ASSERT (total_length == fragment_offset);
+ // ACE_ASSERT (fragment_id == fragment_count);
+
+}
+
+
+void
+TAO_ECG_CDR_Message_Sender::send_fragment (const ACE_INET_Addr &addr,
+ CORBA::ULong request_id,
+ CORBA::ULong request_size,
+ CORBA::ULong fragment_size,
+ CORBA::ULong fragment_offset,
+ CORBA::ULong fragment_id,
+ CORBA::ULong fragment_count,
+ iovec iov[],
+ int iovcnt
+ ACE_ENV_ARG_DECL)
+{
+ CORBA::ULong header[TAO_ECG_CDR_Message_Sender::ECG_HEADER_SIZE
+ / sizeof(CORBA::ULong)
+ + ACE_CDR::MAX_ALIGNMENT];
+ char* buf = reinterpret_cast<char*> (header);
+ TAO_OutputCDR cdr (buf, sizeof(header));
+ cdr.write_boolean (TAO_ENCAP_BYTE_ORDER);
+ // Insert some known values in the padding bytes, so we can smoke
+ // test the message on the receiving end.
+ cdr.write_octet ('A'); cdr.write_octet ('B'); cdr.write_octet ('C');
+ cdr.write_ulong (request_id);
+ cdr.write_ulong (request_size);
+ cdr.write_ulong (fragment_size);
+ cdr.write_ulong (fragment_offset);
+ cdr.write_ulong (fragment_id);
+ cdr.write_ulong (fragment_count);
+ CORBA::Octet padding[4];
+
+
+ // MRH
+ if (checksum_)
+ {
+ // Compute CRC
+ iov[0].iov_base = cdr.begin ()->rd_ptr ();
+ iov[0].iov_len = cdr.begin ()->length ();
+ unsigned int crc = 0;
+ unsigned char *crc_parts = (unsigned char *)(&crc);
+ if (iovcnt > 1)
+ {
+ crc = ACE::crc32 (iov, iovcnt);
+ crc = htonl (crc);
+ }
+ for (int cnt=0; cnt<4; ++cnt)
+ {
+ padding[cnt] = crc_parts[cnt];
+ }
+ }
+ else
+ {
+ for (int cnt=0; cnt<4; ++cnt)
+ {
+ padding[cnt] = 0;
+ }
+ }
+ //End MRH
+ cdr.write_octet_array (padding, 4);
+
+ iov[0].iov_base = cdr.begin ()->rd_ptr ();
+ iov[0].iov_len = cdr.begin ()->length ();
+
+ ssize_t n = this->dgram ().send (iov,
+ iovcnt,
+ addr);
+ size_t expected_n = 0;
+ for (int i = 0; i < iovcnt; ++i)
+ expected_n += iov[i].iov_len;
+ if (n > 0 && size_t(n) != expected_n)
+ {
+ ACE_ERROR ((LM_ERROR, ("Sent only %d out of %d bytes "
+ "for mcast fragment.\n"),
+ n,
+ expected_n));
+ }
+
+ if (n == -1)
+ {
+ if (errno == EWOULDBLOCK)
+ {
+ ACE_ERROR ((LM_ERROR, "Send of mcast fragment failed (%m).\n"));
+ // @@ TODO Use a Event Channel specific exception
+ ACE_THROW (CORBA::COMM_FAILURE ());
+ }
+ else
+ {
+ ACE_DEBUG ((LM_WARNING, "Send of mcast fragment blocked (%m).\n"));
+ }
+ }
+ else if (n == 0)
+ {
+ ACE_DEBUG ((LM_WARNING, "EOF on send of mcast fragment (%m).\n"));
+ }
+}
+
+
+CORBA::ULong
+TAO_ECG_CDR_Message_Sender::compute_fragment_count (const ACE_Message_Block* begin,
+ const ACE_Message_Block* end,
+ int iov_size,
+ CORBA::ULong max_fragment_payload,
+ CORBA::ULong& total_length)
+{
+ CORBA::ULong fragment_count = 0;
+ total_length = 0;
+
+ CORBA::ULong fragment_size = 0;
+ // Reserve the first iovec for the header...
+ int iovcnt = 1;
+ for (const ACE_Message_Block* b = begin;
+ b != end;
+ b = b->cont ())
+ {
+ CORBA::ULong l = b->length ();
+ total_length += l;
+ fragment_size += l;
+ ++iovcnt;
+ while (fragment_size > max_fragment_payload)
+ {
+ // Ran out of space, must create a fragment...
+ ++fragment_count;
+
+ // The next iovector will contain what remains of this
+ // buffer, but also consider
+ iovcnt = 2;
+ l -= max_fragment_payload - (fragment_size - l);
+ fragment_size = l;
+ }
+ if (fragment_size == max_fragment_payload)
+ {
+ ++fragment_count;
+ iovcnt = 1;
+ fragment_size = 0;
+ }
+ if (iovcnt >= iov_size)
+ {
+ // Ran out of space in the iovector....
+ ++fragment_count;
+ iovcnt = 1;
+ fragment_size = 0;
+ }
+ }
+ if (iovcnt != 1)
+ {
+ // Send the remaining data in another fragment
+ ++fragment_count;
+ }
+ return fragment_count;
+}
+
+TAO_END_VERSIONED_NAMESPACE_DECL