summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/sfp.cpp918
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/sfp.h197
2 files changed, 1115 insertions, 0 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/sfp.cpp b/TAO/orbsvcs/orbsvcs/AV/sfp.cpp
new file mode 100644
index 00000000000..4e1e3d21413
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/AV/sfp.cpp
@@ -0,0 +1,918 @@
+#include "orbsvcs/AV/sfp.h"
+#include "ace/ARGV.h"
+
+// $Id$
+
+// constructor.
+SFP::SFP (CORBA::ORB_ptr orb,
+ ACE_Reactor* reactor,
+ ACE_Time_Value timeout1,
+ ACE_Time_Value timeout2,
+ SFP_Callback *callback)
+ :orb_ (orb),
+ reactor_ (reactor),
+ encoder_ (0),
+ decoder_ (0),
+ timeout1_ (timeout1),
+ timeout2_ (timeout2),
+ start_tries_ (10),
+ startReply_tries_ (10),
+ callback_ (callback),
+ sequence_num_ (0)
+{
+
+}
+
+void
+SFP::set_cdr_length (void)
+{
+ CORBA::ULong bodylen = encoder_->total_length ();
+ char* buf = ACE_const_cast(char*,encoder_->buffer ());
+ buf += 4;
+#if !defined (TAO_ENABLE_SWAP_ON_WRITE)
+ *ACE_reinterpret_cast(CORBA::ULong*,buf) = bodylen;
+#else
+ if (!encoder_->do_byte_swap ())
+ {
+ *ACE_reinterpret_cast(CORBA::ULong*, buf) = bodylen;
+ }
+ else
+ {
+ CDR::swap_4 (ACE_reinterpret_cast(char*,&bodylen), buf);
+ }
+#endif
+}
+
+// Copies length bytes from the given message into the
+// CDR buffer. Returns 0 on success, -1 on failure
+int
+SFP::create_cdr_buffer (char *message,
+ size_t length)
+{
+ if (this->decoder_)
+ delete this->decoder_;
+
+ ACE_NEW_RETURN (this->decoder_,
+ TAO_InputCDR (message,
+ length),
+ -1);
+
+ ACE_OS::memcpy (this->decoder_->rd_ptr (),
+ message,
+ length);
+
+ return 0;
+}
+
+// Start the active end of the stream.
+int
+SFP::start_stream (const char *receiver_addr)
+{
+ // @@we have to do ACE_NTOHS for all the network-byte ordered fields.
+ int result;
+ ACE_INET_Addr sender;
+ result = this->connect_to_receiver (receiver_addr);
+ if (result < 0)
+ return result;
+ while (this->start_tries_ > 0)
+ {
+ if ((result = this->send_start ()) != 0)
+ return result;
+ char magic_number [4];
+ // Timed recv.
+ ssize_t n =this->dgram_.recv (magic_number,
+ sizeof(magic_number),
+ sender,
+ MSG_PEEK,
+ &this->timeout1_);
+ ACE_DEBUG ((LM_DEBUG,"n = %d\n",n));
+ if (n == -1)
+ {
+ if (errno == ETIME)
+ {
+ ACE_DEBUG ((LM_DEBUG,"Timed out in reading StartReply"));
+ this->start_tries_ --;
+ continue;
+ }
+ else
+ ACE_ERROR_RETURN ((LM_ERROR,"dgram recv error:%p","recv"),-1);
+ }
+ else if (n==0)
+ ACE_ERROR_RETURN ((LM_ERROR,"SFP::start_stream -peek"),-1);
+ // successful receive of dgram.
+ ACE_DEBUG ((LM_DEBUG,"StartReply received"));
+ // check if its startreply message.
+ char *magic_string = this->magic_number_to_string (magic_number);
+ if (ACE_OS::strcmp (magic_string,TAO_SFP_STARTREPLY_MAGIC_NUMBER) == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,"(%P|%t)StartReply message received\n"));
+ flowProtocol::StartReply start_reply;
+ n = this->dgram_.recv ((char *)&start_reply,
+ sizeof (start_reply),
+ sender);
+ if (n != sizeof (start_reply))
+ ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input-StartReply\n"),0);
+ // check for SFP version difference.??
+ // Call the application back.
+ this->state_ = REPLY_RECEIVED;
+ }
+ // register the data handler.
+ result = this->register_dgram_handler ();
+ return result;
+ }
+ return 0;
+}
+
+// Start the passive end of the stream.
+int
+SFP::start_stream (const char *local_addr,int Credit)
+{
+ int result;
+ ACE_INET_Addr sender;
+
+ this->state_ = PASSIVE_START;
+ ACE_INET_Addr myaddr (local_addr);
+ result = this->dgram_.open (myaddr);
+
+ if (result != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"SFP::passive start- open failed\n"),-1);
+
+ char magic_number[4];
+ // Timed recv.
+ ssize_t n =this->dgram_.recv (magic_number,
+ sizeof(magic_number),
+ sender,
+ MSG_PEEK,
+ &this->timeout2_);
+ if ((n == -1) && (errno == ETIME))
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,"Timedout in reading Start"),-1);
+ }
+ else if (n==0)
+ ACE_ERROR_RETURN ((LM_ERROR,"SFP::start_stream -peek"),-1);
+
+ ACE_DEBUG ((LM_DEBUG,"Start received:"));
+ char *magic_string = this->magic_number_to_string (magic_number);
+ if (ACE_OS::strcmp (magic_string,TAO_SFP_START_MAGIC_NUMBER) == 0)
+ {
+ // Read the start message.
+ flowProtocol::Start start;
+ n = this->dgram_.recv ((char *)&start,
+ sizeof (start),
+ sender);
+ if (n != sizeof (start))
+ ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input - Start\n"),0);
+ else
+ ACE_DEBUG ((LM_DEBUG,"Start message consumed\n"));
+ this->state_ = START_RECEIVED;
+ this->receiver_inet_addr_.set (sender);
+ // Now send a startReply message back.
+ result = this->send_startReply ();
+ if (result != 0)
+ return result;
+ // Now we register a timeout handler until we receive a data
+ // frame.
+ result = this->reactor_->schedule_timer (this,
+ 0,
+ this->timeout1_);
+ if (result == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,"schedule_timer failed\n"),result);
+
+ // register the data handler.
+ result = this->register_dgram_handler ();
+ return result;
+ }
+
+ else
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,"Invalid messaged received"),-1);
+ }
+ return 0;
+}
+
+// Sends the ACE_Message_Block data as a frame, fragmenting if necessary.
+int
+SFP::send_simple_frame (ACE_Message_Block *frame)
+{
+ // Currently there is no fragmentation handled, just a simple
+ // frame.
+
+ ACE_Message_Block *mb;
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block,
+ -1);
+ if (this->credit_ > 0)
+ {
+ // if we have enough credit then we send.
+ // Currently no fragmentation.
+ int length = frame->length ();
+
+ if (length > ACE_MAX_DGRAM_SIZE)
+ ACE_ERROR_RETURN ((LM_ERROR,"sfp doesn't support fragmentation yet"),-1);
+
+ flowProtocol::frameHeader frame_header;
+
+ // The magic_number and flags are to be sent in network-byte order.
+ frame_header.magic_number [0] = ACE_HTONS ('=');
+ frame_header.magic_number [1] = ACE_HTONS ('S');
+ frame_header.magic_number [2] = ACE_HTONS ('F');
+ frame_header.magic_number [3] = ACE_HTONS ('P');
+
+ // set the byte order and no fragments.
+ frame_header.flags = 0;
+ frame_header.flags |= TAO_ENCAP_BYTE_ORDER;
+
+ frame_header.flags = ACE_HTONS (frame_header.flags);
+ //set the size of the message block.
+ int len = sizeof(frame_header.magic_number)+sizeof
+ (frame_header.flags);
+ mb->size (len);
+
+ mb->rd_ptr ((char *)&frame_header);
+ mb->wr_ptr ((char *)&frame_header+len);
+ if (this->encoder_ != 0)
+ delete this->encoder_;
+ ACE_NEW_RETURN (this->encoder_,
+ TAO_OutputCDR,
+ -1);
+
+ frame_header.message_type = flowProtocol::SimpleFrame;
+ frame_header.message_size = frame->length ();
+ this->encoder_->write_octet (frame_header.message_type);
+ this->encoder_->write_ulong (frame_header.message_size);
+
+ // This is a good maximum, because Dgrams cannot be longer than
+ // 64K and the usual size for a CDR fragment is 512 bytes.
+ // @@ TODO In the future we may need to allocate some memory
+ // from the heap.
+ const int TAO_WRITEV_MAX = 128;
+ iovec iov[TAO_WRITEV_MAX];
+
+ iov[0].iov_base = mb->rd_ptr ();
+ iov[0].iov_len = mb->length ();
+ ACE_DEBUG ((LM_DEBUG,"length: %d ",mb->length ()));
+ int iovcnt = 1;
+ for (const ACE_Message_Block* b = this->encoder_->begin ();
+ b != this->encoder_->end () && iovcnt < TAO_WRITEV_MAX;
+ b = b->cont ())
+ {
+ iov[iovcnt].iov_base = b->rd_ptr ();
+ iov[iovcnt].iov_len = b->length ();
+ ACE_DEBUG ((LM_DEBUG,"length: %d ",b->length ()));
+ iovcnt++;
+ }
+ iov[iovcnt].iov_base = frame->rd_ptr ();
+ iov[iovcnt].iov_len = frame->length ();
+ ACE_DEBUG ((LM_DEBUG,"length: %d ",frame->length ()));
+ ssize_t n = this->dgram_.send (iov,
+ iovcnt,
+ this->receiver_inet_addr_);
+ if (n == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "send_simple_frame (%t) send failed %p\n", ""),-1);
+ else if (n == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "send_simple_Frame (%t) EOF on send \n"),-1);
+ }
+ return 0;
+}
+
+// This is used to send large frames with fragmentation.This is not
+// complete yet.
+int
+SFP::send_frame (ACE_Message_Block *frame)
+{
+ ACE_Message_Block *mb;
+ ACE_NEW_RETURN (mb,
+ ACE_Message_Block,
+ -1);
+ if (this->credit_ > 0)
+ {
+ // if we have enough credit then we send.
+ // Do fragmentation if necessary.
+ int length = frame->length ();
+
+ int total_length = 0;
+ ACE_Message_Block *temp = frame;
+ while (temp != 0)
+ {
+ total_length += temp->length ();
+ temp = temp->next ();
+ }
+ flowProtocol::frameHeader frame_header;
+
+ // The magic_number and flags are to be sent in network-byte order.
+ frame_header.magic_number [0] = ACE_HTONS ('=');
+ frame_header.magic_number [1] = ACE_HTONS ('S');
+ frame_header.magic_number [2] = ACE_HTONS ('F');
+ frame_header.magic_number [3] = ACE_HTONS ('P');
+ // sizeof (frameHeader) may have to be replaced with more
+ // accurate size??.
+ if (total_length > (ACE_MAX_DGRAM_SIZE- sizeof (frameHeader)))
+ {
+ // If the message size is not okay including the headers i.e it
+ // cannot fit in a dgram.
+
+ // set the byte order and no fragments.
+ frame_header.flags = 0;
+ frame_header.flags |= TAO_ENCAP_BYTE_ORDER;
+ // set the fragments bit.
+ frame_header.flags |= 2;
+
+ frame_header.flags = ACE_HTONS (frame_header.flags);
+
+ // first fragment will have size to be
+ //set the size of the message block.
+ int len = sizeof(frame_header.magic_number)+sizeof
+ (frame_header.flags);
+ mb->size (len);
+
+ mb->rd_ptr ((char *)&frame_header);
+ mb->wr_ptr ((char *)&frame_header+len);
+ if (this->encoder_ != 0)
+ delete this->encoder_;
+ ACE_NEW_RETURN (this->encoder_,
+ TAO_OutputCDR,
+ -1);
+
+
+ // This is a good maximum, because Dgrams cannot be longer than
+ // 64K and the usual size for a CDR fragment is 512 bytes.
+ // @@ TODO In the future we may need to allocate some memory
+ // from the heap.
+ int message_len = 0;
+ const int TAO_WRITEV_MAX = 128;
+ iovec iov[TAO_WRITEV_MAX];
+
+ iov[0].iov_base = mb->rd_ptr ();
+ iov[0].iov_len = mb->length ();
+ int header_len = mb->length ()+2* sizeof(CORBA::ULong);
+ message_len += header_len;
+ ACE_DEBUG ((LM_DEBUG,"length: %d ",mb->length ()));
+ int iovcnt = 2;
+ ACE_Message_Block *mb = frame;
+ int prev_len;
+ while (mb != 0)
+ {
+ prev_len = message_len;
+ message_len += mb->length ();
+ if (message_len > ACE_MAX_DGRAM_SIZE)
+ {
+ // get only the length that we can accomodate.
+ int current_len = ACE_MAX_DGRAM_SIZE - prev_len;
+ if (current_len < mb->length ())
+ {
+ // The above condition is an assertion.
+ iov [iovcnt].iov_base = mb->rd_ptr ();
+ iov [iovcnt].iov_len = current_len;
+ message_len += current_len;
+ mb->rd_ptr (current_len);
+ iovcnt++;
+ }
+ break;
+ }
+ else
+ {
+ // we can accomodate this message block
+ iov [iovcnt].iov_base = mb->rd_ptr ();
+ iov [iovcnt].iov_len = mb->length ();
+ message_len += mb->length ();
+ iovcnt++;
+ mb = mb->next ();
+ }
+ }
+ frame_header.message_type = flowProtocol::SimpleFrame;
+ frame_header.message_size = message_len - header_len;
+ this->encoder_->write_octet (frame_header.message_type);
+ this->encoder_->write_ulong (frame_header.message_size);
+
+ // THe header will be only in the first cdr fragment.
+ iov[1].iov_base = this->encoder_->begin ()->rd_ptr ();
+ iov[1].iov_len = this->encoder_->begin ()->length ();
+
+ // send the fragment 0.
+ ssize_t n = this->dgram_.send (iov,
+ iovcnt,
+ this->receiver_inet_addr_);
+ if (n == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "send_frame (%t) fragment 0 send failed %p\n", ""),-1);
+ else if (n == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "send_Frame (%t) EOF on send \n"),-1);
+
+ int frag_number = 0;
+ // If there is any more data send those as fragments.
+ while (mb != 0)
+ {
+ flowProtocol::fragment frag;
+
+ // The magic_number and flags are to be sent in network-byte order.
+ frag.magic_number [0] = ACE_HTONS ('F');
+ frag.magic_number [1] = ACE_HTONS ('R');
+ frag.magic_number [2] = ACE_HTONS ('A');
+ frag.magic_number [3] = ACE_HTONS ('G');
+
+ ACE_Message_Block *magic_block;
+ ACE_NEW_RETURN (magic_block,
+ ACE_Message_Block,
+ -1);
+ magic_block->size (5);// magic_number+flags size.
+ magic_block->rd_ptr ((char *)&frag);
+ magic_block->wr_ptr (5);
+
+ iov [0].iov_base = magic_block->rd_ptr ();
+ iov [0].iov_len = magic_block->length ();
+
+ int header_len = 5 + 4 *sizeof (CORBA::ULong);
+ message_len = header_len;
+ // 5 for magic_number+flags and 4 ulongs in the fragment header.
+ iovcnt = 2;// 1 is for the frag header.
+ while (mb != 0)
+ {
+ prev_len = message_len;
+ message_len += mb->length ();
+ if (message_len > ACE_MAX_DGRAM_SIZE)
+ {
+ // get only the length that we can accomodate.
+ int current_len = ACE_MAX_DGRAM_SIZE - prev_len;
+ if (current_len < mb->length ())
+ {
+ // The above condition is an assertion.
+ iov [iovcnt].iov_base = mb->rd_ptr ();
+ iov [iovcnt].iov_len = current_len;
+ mb->rd_ptr (current_len);
+ message_len += current_len;
+ iovcnt++;
+ }
+ break;
+ }
+ else
+ {
+ // we can accomodate this message block
+ iov [iovcnt].iov_base = mb->rd_ptr ();
+ iov [iovcnt].iov_len = mb->length ();
+ message_len += mb->length ();
+ iovcnt++;
+ mb = mb->next ();
+ }
+ }
+ // send this fragment.
+ // set the more fragments flag
+ if ((mb != 0) && (mb->length () != 0))
+ frag.flags = ACE_HTONS (2);
+ else
+ break;
+ if (this->encoder_ != 0)
+ delete this->encoder_;
+ ACE_NEW_RETURN (this->encoder_,
+ TAO_OutputCDR,
+ -1);
+ frag.frag_number = frag_number++;
+ frag.sequence_num = this->sequence_num_;
+ frag.frag_sz = message_len - header_len;
+ frag.source_id = 0;
+ this->encoder_->write_ulong (frag.frag_number);
+ this->encoder_->write_ulong (frag.sequence_num);
+ this->encoder_->write_ulong (frag.frag_sz);
+ this->encoder_->write_ulong (frag.source_id);
+
+ // THe header will be only in the first cdr fragment.
+ iov[1].iov_base = this->encoder_->begin ()->rd_ptr ();
+ iov[1].iov_len = this->encoder_->begin ()->length ();
+
+ // send the fragment now.
+ ssize_t n = this->dgram_.send (iov,
+ iovcnt,
+ this->receiver_inet_addr_);
+ if (n == -1)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "SFP::send_frame (%t) send failed %p\n", ""));
+ return -1;
+ }
+ else if (n == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "SFP::send_frame (%t) EOF on send \n"));
+ return -1;
+ }
+ }
+ }
+ }
+ return 0;
+}
+
+
+// creates a connected dgram.
+int
+SFP::connect_to_receiver (const char *receiver_addr)
+{
+ this->receiver_addr_ = ACE_OS::strdup (receiver_addr);
+ // Get the local UDP address
+ if (this->dgram_.open (ACE_Addr::sap_any) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) datagram open failed %p\n"),1);
+
+ // set the socket buffer sizes to 64k.
+ int sndbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
+ int rcvbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
+
+ if (this->dgram_.set_option (SOL_SOCKET,
+ SO_SNDBUF,
+ (void *) &sndbufsize,
+ sizeof (sndbufsize)) == -1
+ && errno != ENOTSUP)
+ return -1;
+ else if (this->dgram_.set_option (SOL_SOCKET,
+ SO_RCVBUF,
+ (void *) &rcvbufsize,
+ sizeof (rcvbufsize)) == -1
+ && errno != ENOTSUP)
+ return -1;
+
+ this->receiver_inet_addr_.set (receiver_addr);
+ if (ACE_OS::connect (this->dgram_.get_handle (),(sockaddr *) this->receiver_inet_addr_.get_addr (),
+ this->receiver_inet_addr_.get_size ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) datagram connect failed %p\n"),-1);
+ return 0;
+}
+
+// sends all the ACE_Message_Blocks in the current CDR stream.
+int
+SFP::send_cdr_buffer (void)
+{
+ // This is a good maximum, because Dgrams cannot be longer than
+ // 64K and the usual size for a CDR fragment is 512 bytes.
+ // @@ TODO In the future we may need to allocate some memory
+ // from the heap.
+ const int TAO_WRITEV_MAX = 128;
+ iovec iov[TAO_WRITEV_MAX];
+
+ int iovcnt = 0;
+ for (const ACE_Message_Block* b = this->encoder_->begin ();
+ b != this->encoder_->end () && iovcnt < TAO_WRITEV_MAX;
+ b = b->cont ())
+ {
+ iov[iovcnt].iov_base = b->rd_ptr ();
+ iov[iovcnt].iov_len = b->length ();
+ iovcnt++;
+ }
+ // send the message.
+ ssize_t n = this->dgram_.send (iov,
+ iovcnt,
+ this->receiver_inet_addr_);
+ if (n == -1)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "SFP::send_cdr_buffer (%t) send failed %p\n", ""));
+ return -1;
+ }
+ else if (n == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "SFP::send_cdr_buffer (%t) EOF on send \n"));
+ return -1;
+ }
+ return 0;
+}
+
+int
+SFP::send_start (void)
+{
+ int result;
+ // Start message is in network byte order.
+ // construct the start message
+ flowProtocol::Start start;
+
+ // copy the magic number into the message
+ start.magic_number [0] = ACE_HTONS ('=');
+ start.magic_number [1] = ACE_HTONS ('S');
+ start.magic_number [2] = ACE_HTONS ('T');
+ start.magic_number [3] = ACE_HTONS ('A');
+
+ // put the version number into the field
+ start.major_version = ACE_HTONS (TAO_SFP_MAJOR_VERSION);
+ start.minor_version = ACE_HTONS (TAO_SFP_MINOR_VERSION);
+
+ // flags field is all zeroes
+ start.flags = ACE_HTONS (0);
+
+ this->state_ = ACTIVE_START;
+
+ // Now send the network byte ordered start message.
+ int n = this->dgram_.send ((char *)&start,
+ sizeof (start),
+ this->receiver_inet_addr_);
+ if (n!= sizeof (start))
+ ACE_ERROR_RETURN ((LM_ERROR,"start send failed\n"),-1);
+
+ ACE_DEBUG ((LM_DEBUG," Start sent\n"));
+// // non-interval timer.
+// result = this->reactor_->schedule_timer (this,
+// 0,
+// this->timeout1_);
+// if (result != 0)
+// return result;
+
+ return 0;
+}
+
+int
+SFP::send_startReply (void)
+{
+ int result;
+
+ flowProtocol::StartReply start_reply;
+
+ // copy the magic number into the message
+ start_reply.magic_number [0] = ACE_HTONS ('=');
+ start_reply.magic_number [1] = ACE_HTONS ('S');
+ start_reply.magic_number [2] = ACE_HTONS ('T');
+ start_reply.magic_number [3] = ACE_HTONS ('R');
+
+ start_reply.flags = ACE_HTONS (0);
+
+ // Now send the network byte ordered start message.
+ int n = this->dgram_.send ((char *)&start_reply,
+ sizeof (start_reply),
+ this->receiver_inet_addr_);
+ if (n!= sizeof (start_reply))
+ ACE_ERROR_RETURN ((LM_ERROR,"startreply send failed\n"),-1);
+
+ ACE_DEBUG ((LM_DEBUG," startReply sent\n"));
+ return 0;
+}
+
+int
+SFP::handle_timeout (const ACE_Time_Value &tv,
+ const void *arg)
+{
+ int result;
+ // Handle the timeout for timeout1 and timeout2.
+ switch (this->state_)
+ {
+ case ACTIVE_START:
+ case PASSIVE_START:
+ // Timingout for Start Messages.
+ ACE_DEBUG ((LM_DEBUG,"Invalid state in handle_timeout\n"));
+ break;
+ case START_RECEIVED:
+ // we need to reduce the startreply_tries and also reschedule
+ // the timer.
+ if (this->startReply_tries_ --)
+ {
+ ACE_DEBUG ((LM_DEBUG,"Timed out on receiving Data Frame\n"));
+ // send startreply.
+ result = this->send_startReply ();
+ if (result != 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"Error in sending startreply"),0);
+ this->reactor_->schedule_timer (this,
+ 0,
+ this->timeout1_);
+ }
+ else
+ {
+ this->end_stream ();
+ }
+ }
+ return 0;
+}
+
+// Handle_input is called when data arrives on the dgram
+// socket. Currently both the receiver and sender side input is
+// handled in this same handle_input ().
+int
+SFP::handle_input (ACE_HANDLE fd)
+{
+ flowProtocol::MsgType msg_type;
+ ACE_INET_Addr sender;
+ char magic_number[4];
+ ssize_t n =this->dgram_.recv (magic_number,
+ sizeof(magic_number),
+ sender,
+ MSG_PEEK);
+ if (n == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),-1);
+ else if (n==0)
+ ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
+
+ // convert from network byte order to host order.
+
+ magic_number [0] = ACE_NTOHS (magic_number [0]);
+ magic_number [1] = ACE_NTOHS (magic_number [1]);
+ magic_number [2] = ACE_NTOHS (magic_number [2]);
+ magic_number [3] = ACE_NTOHS (magic_number [3]);
+
+ char *magic_string = this->magic_number_to_string (magic_number);
+
+ if (ACE_OS::strcmp (magic_string,TAO_SFP_START_MAGIC_NUMBER) == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,"(%P|%t)Start message received\n"));
+ msg_type = flowProtocol::start;
+ }
+ else if (ACE_OS::strcmp (magic_string,TAO_SFP_STARTREPLY_MAGIC_NUMBER) == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,"(%P|%t)StartReply message received\n"));
+ msg_type = flowProtocol::startReply;
+ }
+ else if (ACE_OS::strcmp (magic_string,TAO_SFP_MAGIC_NUMBER) == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,"(%P|%t) frameHeader received\n"));
+ msg_type = flowProtocol::SimpleFrame;
+ }
+ else if (ACE_OS::strcmp (magic_string,TAO_SFP_FRAGMENT_MAGIC_NUMBER) == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,"(%P|%t) fragment Header received\n"));
+ msg_type = flowProtocol::Fragment;
+ }
+ else if (ACE_OS::strcmp (magic_string,TAO_SFP_CREDIT_MAGIC_NUMBER) == 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,"(%P|%t) credit message received\n"));
+ msg_type = flowProtocol::Credit;
+ }
+ switch (this->state_)
+ {
+ case ACTIVE_START:
+ // Check if we received a StartReply back.
+ ACE_DEBUG ((LM_DEBUG,"Unexpected message while StartReply expected\n"));
+ break;
+ case PASSIVE_START:
+ // Check if we received a Start from the Sender.
+ ACE_DEBUG ((LM_DEBUG,"Unexpected message while Start expected\n"));
+ break;
+ case START_RECEIVED:
+ // In this state we check for credit frames.
+ switch (msg_type)
+ {
+ case flowProtocol::Credit:
+ {
+ flowProtocol::credit credit;
+ n = this->dgram_.recv ((char *)&credit,
+ sizeof (credit),
+ sender);
+ if (n != sizeof (credit))
+ ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input - Credit\n"),0);
+ this->credit_ += credit.cred_num;
+ break;
+ }
+ case flowProtocol::start:
+ // consume the retransmitted start message.
+ {
+ flowProtocol::Start start;
+ n = this->dgram_.recv ((char *)&start,
+ sizeof (start),
+ sender);
+ if (n != sizeof (start))
+ ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input - Start\n"),0);
+ else
+ ACE_DEBUG ((LM_DEBUG,"Start message consumed\n"));
+ // ACE_DEBUG ((LM_DEBUG,"Unexpected message while
+ // Credit expected\n"));
+ break;
+ }
+ case flowProtocol::SimpleFrame:
+ {
+ ACE_Message_Block * mb =this->read_simple_frame ();
+ this->callback_->receive_frame (mb);
+ }
+ }
+ break;
+ case REPLY_RECEIVED:
+ // In this state we check for Data frames.
+ switch (msg_type)
+ {
+ case flowProtocol::startReply:
+ {
+ flowProtocol::StartReply start_reply;
+ n = this->dgram_.recv ((char *)&start_reply,
+ sizeof (start_reply),
+ sender);
+ if (n != sizeof (start_reply))
+ ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input-StartReply\n"),0);
+ else
+ ACE_DEBUG ((LM_DEBUG,"start reply consumed\n"));
+ }
+ }
+ }
+ return 0;
+}
+
+char *
+SFP::magic_number_to_string (char *magic_number)
+{
+ char *buf;
+ ACE_NEW_RETURN (buf,
+ char [5],
+ 0);
+ for (int i=0;i<4;i++)
+ {
+ buf [i] = magic_number [i];
+ ACE_DEBUG ((LM_DEBUG,"%c ",buf [i]));
+ }
+ buf[i] = 0;
+ return buf;
+}
+
+int
+SFP::end_stream (void)
+{
+ ACE_DEBUG ((LM_DEBUG,"SFP - ending the stream\n"));
+ int result = this->reactor_->remove_handler (this,
+ ACE_Event_Handler::READ_MASK);
+ return result;
+}
+
+int
+SFP::register_dgram_handler (void)
+{
+ int result;
+ result = this->reactor_->register_handler (this,
+ ACE_Event_Handler::READ_MASK);
+ return result;
+}
+
+ACE_HANDLE
+SFP::get_handle (void) const
+{
+ return this->dgram_.get_handle ();
+}
+
+ACE_Message_Block *
+SFP::read_simple_frame (void)
+{
+ ACE_DEBUG ((LM_DEBUG,"Reading simple frame\n"));
+ // Check to see what the length of the message is.
+
+ flowProtocol::frameHeader frame_header;
+ char *buf;
+ ssize_t firstlen =sizeof (frame_header.magic_number)+sizeof (frame_header.flags);
+ ssize_t buflen =firstlen+2*sizeof (CORBA::ULong)+3;// hack to ensure
+ // that buffer is aligned for CDR.
+
+ ACE_DEBUG ((LM_DEBUG,"firstlen = %d,buflen =%d\n",firstlen,buflen));
+ ACE_NEW_RETURN (buf,
+ char [buflen],
+ 0);
+
+ ACE_INET_Addr sender;
+
+ buf +=3;
+ ssize_t n =this->dgram_.recv (buf,
+ buflen,
+ sender,
+ MSG_PEEK);
+ if (n == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
+ else if (n==0)
+ ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
+ for (int i=0;i<4;i++)
+ ACE_DEBUG ((LM_DEBUG,"%c ",buf[i]));
+ //skip the magic_number..
+ buf += 4 ;
+
+ // Get the byte order from the flags.
+
+ int byte_order = buf[0];
+ ACE_DEBUG ((LM_DEBUG,"byte_order = %d\n",byte_order));
+ // move past the flags.
+ buf += 1;
+ // CORBA::ULong *header = (CORBA::ULong *) (buf+firstlen);
+ // ACE_DEBUG ((LM_DEBUG,"first ulong = %d, second ulong = %d",*(CORBA::ULong*)(buf+firstlen),
+ // *(CORBA::ULong *)(buf+firstlen+sizeof (CORBA::ULong))));
+
+ // ACE_DEBUG ((LM_DEBUG,"first ulong = %d, second ulong = %d",header [0],header[1]));
+
+
+ ACE_Message_Block mb (buf,buflen-firstlen+CDR::MAX_ALIGNMENT);
+ CDR::mb_align (&mb);
+ mb.wr_ptr (buflen-firstlen);
+ ACE_DEBUG ((LM_DEBUG,"mb len = %d\n",mb.length ()));
+ TAO_InputCDR cdr (&mb,byte_order);
+ cdr.read_octet (frame_header.message_type);
+ cdr.read_ulong (frame_header.message_size);
+
+ ACE_DEBUG ((LM_DEBUG,"message_type = %d, message_size = %d",
+ frame_header.message_type,frame_header.message_size));
+
+ char *message;
+ int message_len = buflen+frame_header.message_size;
+ ACE_NEW_RETURN (message,
+ char [message_len],
+ 0);
+
+ ACE_DEBUG ((LM_DEBUG,"message_len = %d\n",message_len));
+ n = this->dgram_.recv (message,message_len,sender);
+ if (n == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
+ else if (n==0)
+ ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
+
+ ACE_Message_Block *message_block;
+ ACE_NEW_RETURN (message_block,
+ ACE_Message_Block (message+buflen,
+ message_len-buflen),
+ 0);
+ message_block->wr_ptr (message_len-buflen);
+ ACE_DEBUG ((LM_DEBUG,"messageblock length: ",message_block ->length ()));
+ return message_block;
+}
diff --git a/TAO/orbsvcs/orbsvcs/AV/sfp.h b/TAO/orbsvcs/orbsvcs/AV/sfp.h
new file mode 100644
index 00000000000..2dfa734a1c5
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/AV/sfp.h
@@ -0,0 +1,197 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// AVStreams.
+//
+// = FILENAME
+// sfp.h
+//
+// = AUTHOR
+// Nagarajan Surendran <naga@cs.wustl.edu>
+//
+// ============================================================================
+
+#if !defined (TAO_AV_SFP_H)
+#define TAO_AV_SFP_H
+
+#include "ace/SOCK_Dgram.h"
+#include "orbsvcs/sfpC.h"
+
+// default arguments to pass to use for the ORB
+const char *TAO_SFP_ORB_ARGUMENTS = "-ORBobjrefstyle URL";
+
+// SFP magic numbers
+const char *TAO_SFP_MAGIC_NUMBER = "=SFP";
+const char *TAO_SFP_FRAGMENT_MAGIC_NUMBER = "FRAG";
+const char *TAO_SFP_START_MAGIC_NUMBER = "=STA";
+const char *TAO_SFP_CREDIT_MAGIC_NUMBER = "=CRE";
+const char *TAO_SFP_STARTREPLY_MAGIC_NUMBER = "=STR";
+
+// SFP version 1.0
+const unsigned char TAO_SFP_MAJOR_VERSION = 1;
+const unsigned char TAO_SFP_MINOR_VERSION = 0;
+
+// lengths of various SFP headers
+const unsigned char TAO_SFP_FRAME_HEADER_LEN = 12;
+
+class TAO_ORBSVCS_Export SFP_Callback
+ // =TITLE
+ // Callback interface for SFP.
+ //
+ // =Description
+ // Application should create a callback object which they
+ // register with the SFP. The SFP implementation notifies the
+ // applicationn of any changes in the stream status like stream
+ // established, stream ended.
+{
+public:
+ virtual int start_failed (void) = 0;
+ // This is called for both active and passive start.
+
+ virtual int stream_established (void) = 0;
+ // This is a callback for both active and passive stream
+ // establshment.
+
+ virtual int receive_frame (ACE_Message_Block *frame) =0;
+};
+
+class TAO_ORBSVCS_Export SFP :public virtual ACE_Event_Handler
+ // = TITLE
+ // SFP implementation on UDP.
+ //
+ // = Description
+ // This implements the methods to send and receive data octet
+ // streams using the Simple Flow Protocol.
+
+{
+public:
+ enum State
+ {
+ ACTIVE_START,
+ PASSIVE_START,
+ TIMEDOUT_T1,
+ TIMEDOUT_T2,
+ REPLY_RECEIVED,
+ START_RECEIVED
+ };
+
+ SFP (CORBA::ORB_ptr orb,
+ ACE_Reactor* reactor,
+ ACE_Time_Value timeout1,
+ ACE_Time_Value timeout2,
+ SFP_Callback *callback);
+ // constructor.
+
+ virtual int start_stream (const char *receiver_addr);
+ // Actively start the stream by trying to connect to the UDP
+ // receiver_addr in host:port format.
+
+ virtual int start_stream (const char *local_addr,int credit_);
+ // Passive start.
+
+ virtual int send_simple_frame (ACE_Message_Block *frame);
+ // sends a single frame over UDP.
+
+ virtual int send_frame (ACE_Message_Block *frame);
+ // This will send a larger frame fragmenting if necessary.
+
+ virtual ACE_Message_Block* read_simple_frame (void);
+ // receives a single frame from the network.
+
+ virtual int end_stream (void);
+ // terminates the stream.
+
+ virtual int handle_input (ACE_HANDLE fd);
+ // Callback when event happens on the dgram socket.
+
+ virtual int handle_timeout (const ACE_Time_Value&, const void*);
+ // Used for timeout for the number of tries for starting a stream.
+
+ virtual ACE_HANDLE get_handle (void) const;
+private:
+
+ int create_cdr_buffer (char *message,
+ size_t length);
+ // Helper - copies length bytes from the given message into the CDR
+ // buffer. Returns 0 on success, -1 on failure
+
+ //// ACE_Message_Block *create_message_block (void);
+ void set_cdr_length (void);
+ // Helper method - copies the buffer in encoder_
+ // into a new ACE_Message_Block and returns it.
+ // Returns 0 on failure
+
+ void create_local_dgram (void);
+ // Create the local dgram endpoint.
+
+ int connect_to_receiver (const char *receiver_addr);
+ // Creates a connected dgram with the receiver addr.
+
+ int send_start (void);
+ // sends the start message to the receiver.
+
+ int send_startReply (void);
+ // sends the StartReply message to the receiver.
+
+ int send_cdr_buffer (void);
+ // sends the encoders cdr buffer using iovecs.
+
+ char *magic_number_to_string (char *magic_number);
+ // appends a 0 to the end of the magic number.
+
+ int register_dgram_handler (void);
+ // registers the dgram socket with the reactor.
+
+ CORBA::ORB_ptr orb_;
+ // ORB reference.
+
+ ACE_Reactor* reactor_;
+ // Used for registering the dgram handler.
+
+ TAO_OutputCDR *encoder_;
+ // Use the TAO CDR encoder to encode everything
+
+ TAO_InputCDR *decoder_;
+ // Use the TAO CDR encoder to encode everything
+
+ ACE_SOCK_Dgram dgram_;
+ // Connection Oriented Dgram.
+
+ int start_tries_;
+ // Number of tries to send a Start message.
+
+ int startReply_tries_;
+ // Number of tries to send a StartReply message.
+
+ CORBA::ULong credit_;
+ // Credit on the number of frames.
+
+ ACE_Time_Value timeout1_;
+ // Timeout used for Start on Sender side and also for Credit on
+ // receiver side.
+
+ ACE_Time_Value timeout2_;
+ // Timeout used for StartReply on the receiver side and also for
+ // CREDIT on the sender side.
+
+ State state_;
+ // State variable.
+ // @@We can use the state pattern here.
+
+ const char *receiver_addr_;
+ // The address of the receiver to which we're connected to.
+
+ ACE_INET_Addr receiver_inet_addr_;
+ // INET addr of the receiver.
+
+ SFP_Callback *callback_;
+ // Application Callback Object.
+
+ int sequence_num_;
+ // sequence number of the packet.
+};
+
+#endif /* !defined (TAO_SFP_H) */