summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/orbsvcs/AV/sfp.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/AV/sfp.cpp')
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/sfp.cpp1159
1 files changed, 0 insertions, 1159 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/sfp.cpp b/TAO/orbsvcs/orbsvcs/AV/sfp.cpp
deleted file mode 100644
index bf08d0a3b74..00000000000
--- a/TAO/orbsvcs/orbsvcs/AV/sfp.cpp
+++ /dev/null
@@ -1,1159 +0,0 @@
-// $Id$
-
-#include "orbsvcs/AV/sfp.h"
-#include "ace/ARGV.h"
-
-// default arguments to pass to use for the ORB
-const char *TAO_SFP::TAO_SFP_ORB_ARGUMENTS = "-ORBobjrefstyle URL";
-
-// SFP magic numbers
-const char *TAO_SFP::TAO_SFP_MAGIC_NUMBER = "=SFP";
-const char *TAO_SFP::TAO_SFP_FRAGMENT_MAGIC_NUMBER = "FRAG";
-const char *TAO_SFP::TAO_SFP_START_MAGIC_NUMBER = "=STA";
-const char *TAO_SFP::TAO_SFP_CREDIT_MAGIC_NUMBER = "=CRE";
-const char *TAO_SFP::TAO_SFP_STARTREPLY_MAGIC_NUMBER = "=STR";
-
-// SFP version 1.0
-const unsigned char TAO_SFP::TAO_SFP_MAJOR_VERSION = 1;
-const unsigned char TAO_SFP::TAO_SFP_MINOR_VERSION = 0;
-
-// lengths of various SFP headers
-const unsigned char TAO_SFP::TAO_SFP_FRAME_HEADER_LEN = 12;
-
-int
-operator< (const TAO_SFP_Fragment_Node& left,
- const TAO_SFP_Fragment_Node& right)
-{
- return left.fragment_info_.frag_number < right.fragment_info_.frag_number;
-}
-
-// constructor.
-TAO_SFP::TAO_SFP (CORBA::ORB_ptr orb,
- ACE_Reactor* reactor,
- ACE_Time_Value timeout1,
- ACE_Time_Value timeout2,
- SFP_Callback *callback)
- :orb_ (orb),
- reactor_ (reactor),
- start_tries_ (10),
- startReply_tries_ (10),
- timeout1_ (timeout1),
- timeout2_ (timeout2),
- callback_ (callback),
- sequence_num_ (0),
- credit_num_ (10),
- magic_number_len_ (sizeof (magic_number_)-1)
-{
- ACE_DECLARE_NEW_CORBA_ENV;
- ACE_TRY
- {
- // fill in the default frameHeader fields.
- this->frame_header_.magic_number [0] = '=';
- this->frame_header_.magic_number [1] = 'S';
- this->frame_header_.magic_number [2] = 'F';
- this->frame_header_.magic_number [3] = 'P';
- this->frame_header_.flags = TAO_ENCAP_BYTE_ORDER;
- this->output_cdr_.reset ();
- this->output_cdr_.encode (flowProtocol::_tc_frameHeader,
- &this->frame_header_,
- 0,
- ACE_TRY_ENV);
- ACE_TRY_CHECK;
- this->frame_header_len_ = this->output_cdr_.total_length ();
- // fill in the default fragment message fields.
- this->fragment_.magic_number [0] = 'F';
- this->fragment_.magic_number [1] = 'R';
- this->fragment_.magic_number [2] = 'A';
- this->fragment_.magic_number [3] = 'G';
- this->output_cdr_.reset ();
- this->output_cdr_.encode (flowProtocol::_tc_fragment,
- &this->fragment_,
- 0,
- ACE_TRY_ENV);
- ACE_TRY_CHECK;
- this->fragment_len_ = this->output_cdr_.total_length ();
- // fill in the default Start message fields.
- this->start_.magic_number [0] = '=';
- this->start_.magic_number [1] = 'S';
- this->start_.magic_number [2] = 'T';
- this->start_.magic_number [3] = 'A';
- this->start_.major_version = TAO_SFP_MAJOR_VERSION;
- this->start_.minor_version = TAO_SFP_MINOR_VERSION;
- this->start_.flags = 0;
- this->start_len_ = sizeof (this->start_);
- // fill in the default StartReply message fields.
- this->start_reply_.magic_number [0] = '=';
- this->start_reply_.magic_number [1] = 'S';
- this->start_reply_.magic_number [2] = 'T';
- this->start_reply_.magic_number [3] = 'R';
- this->start_reply_.flags = 0;
- this->start_reply_len_ = sizeof (this->start_reply_);
- // fill in the default Credit message fields.
- this->credit_.magic_number [0] = '=';
- this->credit_.magic_number [1] = 'C';
- this->credit_.magic_number [2] = 'R';
- this->credit_.magic_number [3] = 'E';
- this->credit_len_ = sizeof (this->credit_);
- this->output_cdr_.reset ();
- // this->output_cdr_ <<= this->credit_;
- this->output_cdr_.reset ();
- this->output_cdr_.encode (flowProtocol::_tc_credit,
- &this->credit_,
- 0,
- ACE_TRY_ENV);
- ACE_TRY_CHECK;
- this->credit_len_ = this->output_cdr_.total_length ();
- }
- ACE_CATCHANY
- {
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_SFP constructor");
- }
- ACE_ENDTRY;
-}
-
-// Start the active end of the stream.
-int
-TAO_SFP::start_stream (const char *receiver_addr)
-{
- int result;
- ACE_INET_Addr sender;
- result = this->connect_to_receiver (receiver_addr);
- if (result < 0)
- return result;
- while (this->start_tries_ > 0)
- {
- result = this->send_start ();
- if (result != 0)
- return result;
- // Timed recv.
- char magic_number [MAGIC_NUMBER_LEN];
- ssize_t n =this->dgram_.recv (magic_number,
- this->magic_number_len_,
- sender,
- MSG_PEEK,
- &this->timeout1_);
- // ACE_DEBUG ((LM_DEBUG,"n = %d\n",n));
- if (n == -1)
- {
- if (errno == ETIME)
- {
- ACE_DEBUG ((LM_DEBUG,"Timed out in reading StartReply"));
- this->start_tries_ --;
- continue;
- }
- else
- ACE_ERROR_RETURN ((LM_ERROR,"dgram recv error:%d,%p",errno,"recv"),-1);
- }
- else if (n==0)
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::start_stream -peek"),-1);
- // Null terminate the magic number.
- magic_number [this->magic_number_len_] = 0;
- // check if its startreply message.
- if (ACE_OS::strcmp (magic_number,TAO_SFP_STARTREPLY_MAGIC_NUMBER) == 0)
- {
- ACE_DEBUG ((LM_DEBUG,"(%P|%t)StartReply message received\n"));
- flowProtocol::StartReply start_reply;
- n = this->dgram_.recv ((char *)&start_reply,
- sizeof (start_reply),
- sender);
- if (n != sizeof (start_reply))
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input-StartReply\n"),0);
- // check for SFP version difference.??
- this->state_ = REPLY_RECEIVED;
- }
- else
- ACE_ERROR_RETURN ((LM_ERROR,"Invalid message while StartReply expected\n"),0);
- // register the data handler.
- return this->register_dgram_handler ();
- }
- return 0;
-}
-
-// Start the passive end of the stream.
-int
-TAO_SFP::start_stream (const char *local_addr,int /* Credit */)
-{
- int result;
- ACE_INET_Addr sender;
-
- this->state_ = PASSIVE_START;
- ACE_INET_Addr myaddr (local_addr);
- result = this->dgram_.open (myaddr);
-
- if (result != 0)
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::passive start- open failed\n"),-1);
-
- char magic_number[MAGIC_NUMBER_LEN];
- // Timed recv.
- ssize_t n =this->dgram_.recv (magic_number,
- this->magic_number_len_,
- sender,
- MSG_PEEK,
- &this->timeout2_);
- if ((n == -1) && (errno == ETIME))
- {
- ACE_ERROR_RETURN ((LM_ERROR,"Timedout in reading Start"),-1);
- }
- else if (n==0)
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::start_stream -peek"),-1);
- // Null terminate the magic_number.
- magic_number [this->magic_number_len_] = 0;
- if (ACE_OS::strcmp (magic_number,TAO_SFP_START_MAGIC_NUMBER) == 0)
- {
- ACE_DEBUG ((LM_DEBUG,"Start received:"));
- // Read the start message.
- flowProtocol::Start start;
- n = this->dgram_.recv ((char *)&start,
- sizeof (start),
- sender);
- if (n != sizeof (start))
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input - Start\n"),0);
- else
- ACE_DEBUG ((LM_DEBUG,"Start message consumed\n"));
- this->state_ = START_RECEIVED;
- this->receiver_inet_addr_.set (sender);
- // Now send a startReply message back.
- result = this->send_startReply ();
- if (result != 0)
- return result;
- // Now we register a timeout handler until we receive a data
- // frame.
- result = this->reactor_->schedule_timer (this,
- 0,
- this->timeout1_);
- if (result == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"schedule_timer failed\n"),result);
-
- // register the data handler.
- return this->register_dgram_handler ();
- }
- else
- ACE_ERROR_RETURN ((LM_ERROR,"Invalid messaged received while Start expected\n"),-1);
-}
-
-// Sends the ACE_Message_Block data as a frame, fragmenting if necessary.
-int
-TAO_SFP::send_frame (ACE_Message_Block *frame)
-{
- ACE_TRY_NEW_ENV
- {
- if (this->credit_num_ > 0)
- {
- // if we have enough credit then we send.
- int total_length = 0;
- for (ACE_Message_Block *temp = frame;temp != 0;temp = temp->cont ())
- total_length += temp->length ();
- ACE_DEBUG ((LM_DEBUG,"total_length of frame=%d\n",total_length));
- if (total_length < (SFP_MAX_PACKET_SIZE -this->frame_header_len_))
- {
- // clear the output cdr.
- this->output_cdr_.reset ();
- // CDR encode the frame header.
- //(<<= isAvailable only in compiled marshalling!)
- this->frame_header_.message_type = flowProtocol::SimpleFrame_Msg;
- this->frame_header_.message_size = frame->length ()+this->frame_header_len_;
- this->output_cdr_.encode (flowProtocol::_tc_frameHeader,
- &this->frame_header_,
- 0,
- ACE_TRY_ENV);
- ACE_TRY_CHECK;
- // this->output_cdr_ <<= this->frame_header_;
- this->send_cdr_buffer (this->output_cdr_,frame);
- }
- else // larger frame,fragment and send it.
- {
- // set the fragments bit.
- this->frame_header_.flags |= 2;
- // This is a good maximum, because Dgrams cannot be longer than
- // 64K and the usual size for a CDR fragment is 512 bytes.
- // @@ TODO In the future we may need to allocate some memory
- // from the heap.
- int message_len = this->frame_header_len_;
- iovec iov[TAO_WRITEV_MAX];
- int iovcnt = 1;// since first iov is for frameHeader.
- flowProtocol::frame frame_info;
- frame_info.timestamp = 10;
- frame_info.synchSource = 10;
- frame_info.source_ids.length (1);
- frame_info.source_ids [0] = 1; // XXX random number.
- frame_info.sequence_num = this->sequence_num_;
- this->output_cdr_.reset ();
- this->output_cdr_.encode (flowProtocol::_tc_frame,
- &frame_info,
- 0,
- ACE_TRY_ENV);
- ACE_TRY_CHECK;
- ACE_DEBUG ((LM_DEBUG,"frame info length:%d\n",this->output_cdr_.total_length ()));
- for (const ACE_Message_Block* b = this->output_cdr_.begin ()->clone ();
- b != 0 && iovcnt < TAO_WRITEV_MAX;
- b = b->cont ())
- {
- // ACE_DEBUG ((LM_DEBUG,"iovcnt:%d\n",iovcnt));
- iov[iovcnt].iov_base = b->rd_ptr ();
- iov[iovcnt].iov_len = b->length ();
- message_len += b->length ();
- ACE_DEBUG ((LM_DEBUG,"send_cdr_buffer:length=%d\n",b->length ()));
- // print the buffer.
- // this->dump_buf (b->rd_ptr (),b->length ());
- iovcnt++;
- }
- ACE_Message_Block *mb = frame;
- int prev_len;
- while (mb != 0)
- {
- prev_len = message_len;
- message_len += mb->length ();
- if (message_len > SFP_MAX_PACKET_SIZE)
- {
- // get only the length that we can accomodate.
- size_t current_len = SFP_MAX_PACKET_SIZE - prev_len;
- if (current_len < mb->length ())
- {
- // The above condition is an assertion.
- iov [iovcnt].iov_base = mb->rd_ptr ();
- iov [iovcnt].iov_len = current_len;
- message_len += (current_len-mb->length ());
- mb->rd_ptr (current_len);
- iovcnt++;
- }
- break;
- }
- else
- {
- // we can accomodate this message block
- iov [iovcnt].iov_base = mb->rd_ptr ();
- iov [iovcnt].iov_len = mb->length ();
- message_len += mb->length ();
- iovcnt++;
- mb = mb->cont ();
- }
- }
- // This can be either a simpleframe or a sequenced frame,other types of frames.
- this->frame_header_.message_type = flowProtocol::Frame_Msg;
- this->frame_header_.message_size = message_len;
- ACE_DEBUG ((LM_DEBUG,"first fragment of size:%d\n",message_len- this->frame_header_len_));
- this->output_cdr_.reset ();
- this->output_cdr_.encode (flowProtocol::_tc_frameHeader,
- &this->frame_header_,
- 0,
- ACE_TRY_ENV);
- ACE_TRY_CHECK;
- // header will be only in the first cdr fragment.
- iov[0].iov_base = this->output_cdr_.begin ()->rd_ptr ();
- iov[0].iov_len = this->output_cdr_.begin ()->length ();
- ACE_DEBUG ((LM_DEBUG,"frame header len:%d\n",iov[0].iov_len));
- // send the first fragment.
- for (int i=0;i<iovcnt;i++)
- {
- // this->dump_buf (iov[i].iov_base,iov[i].iov_len);
- }
- ssize_t n = this->dgram_.send (iov,
- iovcnt,
- this->receiver_inet_addr_);
- if (n == -1)
- ACE_ERROR_RETURN ((LM_ERROR,
- "send_frame (%t) fragment 0 send failed %p\n", ""),-1);
- else if (n == 0)
- ACE_ERROR_RETURN ((LM_ERROR,
- "send_Frame (%t) EOF on send \n"),-1);
-
- int frag_number = 1;
- // If there is any more data send those as fragments.
- while (mb != 0)
- {
- message_len = this->fragment_len_;
- iovcnt = 1;// 1 is for the frag header.
- while (mb != 0)
- {
- prev_len = message_len;
- message_len += mb->length ();
- if (message_len > SFP_MAX_PACKET_SIZE)
- {
- // get only the length that we can accomodate.
- size_t current_len = SFP_MAX_PACKET_SIZE - prev_len;
- if (current_len < mb->length ())
- {
- // The above condition is an assertion.
- iov [iovcnt].iov_base = mb->rd_ptr ();
- iov [iovcnt].iov_len = current_len;
- message_len += (current_len - mb->length ());
- mb->rd_ptr (current_len);
- iovcnt++;
- }
- break;
- }
- else
- {
- // we can accomodate this message block
- iov [iovcnt].iov_base = mb->rd_ptr ();
- iov [iovcnt].iov_len = mb->length ();
- iovcnt++;
- mb = mb->cont ();
- }
- }
- this->fragment_.flags = TAO_ENCAP_BYTE_ORDER;
- if (mb == 0)
- {
- ACE_DEBUG ((LM_DEBUG,"sending the last fragment\n"));
- // This is the last fragment so clear the fragments bit.
- }
- else
- {
- // set the more fragments flag
- this->fragment_.flags |= 2;
- }
- // if there are no data blocks.
- if (iovcnt == 1)
- break;
- this->fragment_.frag_number = frag_number++;
- this->fragment_.sequence_num = this->sequence_num_;
- this->fragment_.frag_sz = message_len;
- this->fragment_.source_id = 0;
- this->output_cdr_.reset ();
- this->output_cdr_.encode (flowProtocol::_tc_fragment,
- &this->fragment_,
- 0,
- ACE_TRY_ENV);
- ACE_TRY_CHECK;
- ACE_DEBUG ((LM_DEBUG,"sending a fragment numbered %d of size %d\n",
- this->fragment_.frag_number,
- this->fragment_.frag_sz));
- // THe header will be only in the first cdr fragment.
- iov[0].iov_base = this->output_cdr_.begin ()->rd_ptr ();
- iov[0].iov_len = this->output_cdr_.begin ()->length ();
- // send the fragment now.
- // without the sleep the fragments gets lost!
- // probably because the UDP buffer queue on the sender side
- // is overflown it drops the packets.
- // XXX: This is a hack.
- ACE_OS::sleep (1);
- ssize_t n = this->dgram_.send (iov,
- iovcnt,
- this->receiver_inet_addr_);
- if ((n == -1) || (n==0))
- ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP::send_framed failed:%p\n",""),-1);
- }
- }
- }
- else
- {
- // flow controlled so wait.
- }
- }
- ACE_CATCHANY
- {
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_SFP::send_frame");
- return -1;
- }
- ACE_ENDTRY;
- return 0;
-}
-
-
-// creates a connected dgram.
-int
-TAO_SFP::connect_to_receiver (const char *receiver_addr)
-{
- this->receiver_addr_ = ACE_OS::strdup (receiver_addr);
- // Get the local UDP address
- if (this->dgram_.open (ACE_Addr::sap_any) == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"(%P|%t) datagram open failed %p\n"),1);
-
- // set the socket buffer sizes to 64k.
- int sndbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
- int rcvbufsize = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
-
- if (this->dgram_.set_option (SOL_SOCKET,
- SO_SNDBUF,
- (void *) &sndbufsize,
- sizeof (sndbufsize)) == -1
- && errno != ENOTSUP)
- return -1;
- else if (this->dgram_.set_option (SOL_SOCKET,
- SO_RCVBUF,
- (void *) &rcvbufsize,
- sizeof (rcvbufsize)) == -1
- && errno != ENOTSUP)
- return -1;
-
- this->receiver_inet_addr_.set (receiver_addr);
- return 0;
-}
-
-// sends all the ACE_Message_Blocks in the current CDR stream.
-int
-TAO_SFP::send_cdr_buffer (TAO_OutputCDR &cdr,ACE_Message_Block *mb)
-{
- // This is a good maximum, because Dgrams cannot be longer than
- // 64K and the usual size for a CDR fragment is 512 bytes.
- // @@ TODO In the future we may need to allocate some memory
- // from the heap.
- iovec iov[TAO_WRITEV_MAX];
- int iovcnt = 0;
- const ACE_Message_Block* b = 0;
- for (b = cdr.begin ();
- b != cdr.end () && iovcnt < TAO_WRITEV_MAX;
- b = b->cont ())
- {
- iov[iovcnt].iov_base = b->rd_ptr ();
- iov[iovcnt].iov_len = b->length ();
- // ACE_DEBUG ((LM_DEBUG,"send_cdr_buffer:length=%d\n",b->length ()));
- // print the buffer.
- // this->dump_buf (b->rd_ptr (),b->length ());
- iovcnt++;
- }
- for (b = mb; b!=0 && iovcnt < TAO_WRITEV_MAX; b=b->cont ())
- {
- iov [iovcnt].iov_base = b->rd_ptr ();
- iov [iovcnt].iov_len = b->length ();
- iovcnt++;
- }
- // send the message.
- ssize_t n = this->dgram_.send (iov,
- iovcnt,
- this->receiver_inet_addr_);
- if (n == -1)
- {
- ACE_DEBUG ((LM_DEBUG,
- "SFP::send_cdr_buffer (%t) send failed %p\n", ""));
- return -1;
- }
- else if (n == 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- "SFP::send_cdr_buffer (%t) EOF on send \n"));
- return -1;
- }
- return 0;
-}
-
-int
-TAO_SFP::send_start (void)
-{
- // copy the magic number into the message
- this->state_ = ACTIVE_START;
- // Now send the network byte ordered start message.
- int n = this->dgram_.send ((char *)&this->start_,
- this->start_len_,
- this->receiver_inet_addr_);
- if (n!= this->start_len_)
- ACE_ERROR_RETURN ((LM_ERROR,"start send failed\n"),-1);
-
- ACE_DEBUG ((LM_DEBUG," Start sent\n"));
- return 0;
-}
-
-int
-TAO_SFP::send_startReply (void)
-{
- int n = this->dgram_.send ((char *)&this->start_reply_,
- this->start_reply_len_,
- this->receiver_inet_addr_);
- if (n!= this->start_reply_len_)
- ACE_ERROR_RETURN ((LM_ERROR,"startreply send failed\n"),-1);
-
- ACE_DEBUG ((LM_DEBUG," startReply sent\n"));
- return 0;
-}
-
-int
-TAO_SFP::handle_timeout (const ACE_Time_Value &/* tv */,
- const void * /* arg */)
-{
- int result;
- // Handle the timeout for timeout1 and timeout2.
- switch (this->state_)
- {
- case ACTIVE_START:
- case PASSIVE_START:
- // Timingout for Start Messages.
- ACE_DEBUG ((LM_DEBUG,"Invalid state in handle_timeout\n"));
- break;
- case START_RECEIVED:
- // we need to reduce the startreply_tries and also reschedule
- // the timer.
- if (this->startReply_tries_ --)
- {
- ACE_DEBUG ((LM_DEBUG,"Timed out on receiving Data Frame\n"));
- // send startreply.
- result = this->send_startReply ();
- if (result != 0)
- ACE_ERROR_RETURN ((LM_ERROR,"Error in sending startreply"),0);
- this->reactor_->schedule_timer (this,
- 0,
- this->timeout1_);
- }
- else
- {
- this->end_stream ();
- }
- break;
- default:
- ACE_DEBUG ((LM_DEBUG,"Handle_timeout: No Action in this state %d",this->state_));
- }
- return 0;
-}
-
-// Handle_input is called when data arrives on the dgram
-// socket. Currently both the receiver and sender side input is
-// handled in this same handle_input ().
-int
-TAO_SFP::handle_input (ACE_HANDLE /* fd */)
-{
- // ACE_DEBUG ((LM_DEBUG,"TAO_SFP::handle_input\n"));
- flowProtocol::MsgType msg_type = flowProtocol::Start_Msg;
- ACE_INET_Addr sender;
- char peek_buffer [MAGIC_NUMBER_LEN+2];// 2 is for flags + message_type.
- int peek_len = MAGIC_NUMBER_LEN +2;
- ssize_t n =this->dgram_.recv (peek_buffer,
- peek_len,
- sender,
- MSG_PEEK);
- ACE_OS::strncpy (this->magic_number_,
- peek_buffer,
- this->magic_number_len_);
- this->magic_number_ [this->magic_number_len_] = 0;
- if (n == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),-1);
- else if (n==0)
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
-
- if (ACE_OS::strcmp (this->magic_number_,TAO_SFP_START_MAGIC_NUMBER) == 0)
- {
- ACE_DEBUG ((LM_DEBUG,"(%P|%t)Start message received\n"));
- msg_type = flowProtocol::Start_Msg;
- }
- else if (ACE_OS::strcmp (this->magic_number_,TAO_SFP_STARTREPLY_MAGIC_NUMBER) == 0)
- {
- ACE_DEBUG ((LM_DEBUG,"(%P|%t)StartReply message received\n"));
- msg_type = flowProtocol::StartReply_Msg;
- }
- else if (ACE_OS::strcmp (this->magic_number_,TAO_SFP_MAGIC_NUMBER) == 0)
- {
- // ACE_DEBUG ((LM_DEBUG,"(%P|%t) frameHeader received\n"));
- // msg_type = flowProtocol::SimpleFrame;
- msg_type = (flowProtocol::MsgType)peek_buffer [MESSAGE_TYPE_OFFSET];
- ACE_DEBUG ((LM_DEBUG,"Message Type = %d\n",msg_type));
- }
- else if (ACE_OS::strcmp (this->magic_number_,TAO_SFP_FRAGMENT_MAGIC_NUMBER) == 0)
- {
- ACE_DEBUG ((LM_DEBUG,"(%P|%t) fragment Header received\n"));
- msg_type = flowProtocol::Fragment_Msg;
- }
- else if (ACE_OS::strcmp (this->magic_number_,TAO_SFP_CREDIT_MAGIC_NUMBER) == 0)
- {
- ACE_DEBUG ((LM_DEBUG,"(%P|%t) credit message received\n"));
- msg_type = flowProtocol::Credit_Msg;
- }
- else
- ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP:Invalid magic number\n"),0);
- switch (this->state_)
- {
- case ACTIVE_START:
- // Check if we received a StartReply back.
- ACE_DEBUG ((LM_DEBUG,"Unexpected message while StartReply expected\n"));
- break;
- case PASSIVE_START:
- // Check if we received a Start from the Sender.
- ACE_DEBUG ((LM_DEBUG,"Unexpected message while Start expected\n"));
- break;
- case START_RECEIVED:
- // In this state we check for credit frames.
- switch (msg_type)
- {
- case flowProtocol::Credit_Msg:
- {
- flowProtocol::credit credit;
- n = this->dgram_.recv ((char *)&credit,
- sizeof (credit),
- sender);
- if (n != sizeof (credit))
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input - Credit\n"),0);
- break;
- }
- case flowProtocol::Start_Msg:
- // consume the retransmitted start message.
- {
- flowProtocol::Start start;
- n = this->dgram_.recv ((char *)&start,
- sizeof (start),
- sender);
- if (n != sizeof (start))
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input - Start\n"),0);
- else
- ACE_DEBUG ((LM_DEBUG,"Start message consumed\n"));
- break;
- }
-
- case flowProtocol::Frame_Msg:
- case flowProtocol::SimpleFrame_Msg:
- {
- ACE_Message_Block * mb =this->read_simple_frame ();
- if (mb != 0)
- this->callback_->receive_frame (mb);
- else
- {
- if (!this->more_fragments_)
- {
- char buf[BUFSIZ];
- // consume the wrong UDP frame.
- this->dgram_.recv (buf,
- BUFSIZ,
- sender);
- }
- }
- break;
- }
- case flowProtocol::Fragment_Msg:
- {
- ACE_DEBUG ((LM_DEBUG,"Fragment received\n"));
- ACE_Message_Block *result = this->read_fragment ();
- // no more fragments.
- if (result != 0)
- this->callback_->receive_frame (result);
- break;
- }
- case flowProtocol::EndofStream_Msg:
- {
- char *buf;
- ACE_NEW_RETURN (buf,
- char [ this->frame_header_len_],
- -1);
- n = this->dgram_.recv (buf,
- this->frame_header_len_,
- sender);
- if (n == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"Error reading EndofStream;%p",""),-1);
- this->callback_->end_stream ();
- return -1;
- }
- default:
- break;
- }
- break;
- case REPLY_RECEIVED:
- // In this state we check for Data frames.
- switch (msg_type)
- {
- case flowProtocol::StartReply_Msg:
- {
- flowProtocol::StartReply start_reply;
- n = this->dgram_.recv ((char *)&start_reply,
- sizeof (start_reply),
- sender);
- if (n != sizeof (start_reply))
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input-StartReply\n"),0);
- else
- ACE_DEBUG ((LM_DEBUG,"start reply consumed\n"));
- }
- break;
- default:
- ACE_DEBUG ((LM_DEBUG,"Invalid message in state REPLY_RECEIVED\n"));
- break;
- }
- break;
- default:
- break;
- }
- return 0;
-}
-
-int
-TAO_SFP::end_stream (void)
-{
- int result = -1;
- ACE_DECLARE_NEW_CORBA_ENV;
- ACE_TRY
- {
- ACE_DEBUG ((LM_DEBUG,"SFP - ending the stream\n"));
- // send the EndofStream message.
- this->frame_header_.flags = TAO_ENCAP_BYTE_ORDER;
- this->frame_header_.message_type = flowProtocol::EndofStream_Msg;
- this->output_cdr_.reset ();
- this->output_cdr_.encode (flowProtocol::_tc_frameHeader,
- &this->frame_header_,
- 0,
- ACE_TRY_ENV);
- ACE_TRY_CHECK;
- ssize_t n = this->dgram_.send (this->output_cdr_.begin ()->rd_ptr (),
- this->output_cdr_.begin ()->length (),
- this->receiver_inet_addr_);
- if ((n==-1) || (n==0))
- ACE_ERROR_RETURN ((LM_ERROR,"Error sending endofstream message:%p",""),-1);
- result = this->reactor_->remove_handler (this,
- ACE_Event_Handler::READ_MASK);
-
- }
- ACE_CATCHANY
- {
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_SFP::end_stream ()\n");
- return result;
- }
- ACE_ENDTRY;
- ACE_CHECK_RETURN (-1);
- return result;
-}
-
-int
-TAO_SFP::register_dgram_handler (void)
-{
- int result = this->reactor_->register_handler (this,
- ACE_Event_Handler::READ_MASK);
- return result;
-}
-
-ACE_HANDLE
-TAO_SFP::get_handle (void) const
-{
- return this->dgram_.get_handle ();
-}
-
-ACE_Message_Block *
-TAO_SFP::read_simple_frame (void)
-{
- ACE_Message_Block *message_block = 0;
- ACE_DECLARE_NEW_CORBA_ENV;
- ACE_TRY
- {
- ACE_DEBUG ((LM_DEBUG,"Reading simple frame\n"));
- // Check to see what the length of the message is.
-
- flowProtocol::frameHeader frame_header;
-
- int result =
- this->read_frame_header (frame_header);
-
- if (result < 0)
- return 0;
- int byte_order = frame_header.flags & 0x1;
- int message_len = frame_header.message_size;
-
- ACE_NEW_RETURN (message_block,
- ACE_Message_Block (message_len),
- 0);
- ACE_INET_Addr sender;
- int n = this->dgram_.recv (message_block->wr_ptr (),message_len,sender);
- if (n == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
- else if (n==0)
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
- else if (n != message_len)
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::read_simple_frame:message truncated\n"),0);
- // print the buffer.
- // this->dump_buf (message,n);
- // skip over the frame header.
- message_block->rd_ptr (this->frame_header_len_);
- message_block->wr_ptr (n);
- if (frame_header.flags & 0x2)
- {
- ACE_DEBUG ((LM_DEBUG,"fragmented frame:0th fragment\n"));
- this->more_fragments_ = 1;
- // read the frame info.
- ACE_Message_Block frame_info_mb (message_len-this->frame_header_len_+ACE_CDR::MAX_ALIGNMENT);
- ACE_CDR::mb_align (&frame_info_mb);
- frame_info_mb.copy (message_block->rd_ptr (),
- message_block->length ());
- // print the buffer.
- // this->dump_buf (message_block->rd_ptr (),16);
- TAO_InputCDR frame_info_cdr (&frame_info_mb,byte_order);
- flowProtocol::frame frame_info;
- frame_info_cdr.decode (flowProtocol::_tc_frame,
- &frame_info,
- 0,
- ACE_TRY_ENV);
- ACE_TRY_CHECK;
- ACE_DEBUG ((LM_DEBUG,"frame.timestamp = %d, frame.synchsource = %d, frame.sequence_num = %d\n",
- frame_info.timestamp,
- frame_info.synchSource,
- frame_info.sequence_num));
- // The remaining message in the CDR stream is the fragment data for frag.0
- ACE_Message_Block *data =
- frame_info_cdr.start ()->clone ();
- ACE_DEBUG ((LM_DEBUG,"Length of 0th fragment= %d\n",data->length ()));
- TAO_SFP_Fragment_Table_Entry *fragment_entry = 0;
- TAO_SFP_Fragment_Node *new_node;
- ACE_NEW_RETURN (new_node,
- TAO_SFP_Fragment_Node,
- 0);
- new_node->fragment_info_.frag_sz = data->length ();
- new_node->fragment_info_.frag_number = 0;
- new_node->fragment_info_.source_id = frame_info.source_ids [0];
- new_node->data_ = data;
- if (this->fragment_table_.find (frame_info.sequence_num,fragment_entry) == 0)
- {
- // This case can happen where a nth (n > 0)fragment is received before the 0th fragment.
- ACE_DEBUG ((LM_DEBUG,"fragment table entry found for 0th fragment:\n"));
- result = fragment_entry->fragment_set_.insert (*new_node);
- if (result != 0)
- ACE_ERROR_RETURN ((LM_ERROR,"insert for 0th fragment failed\n"),0);
- // check if all the fragments have been received.
- return check_all_fragments (fragment_entry);
- }
- else
- {
- ACE_DEBUG ((LM_DEBUG,"fragment table entry not found for 0th fragment\n"));
- TAO_SFP_Fragment_Table_Entry *new_entry;
- ACE_NEW_RETURN (new_entry,
- TAO_SFP_Fragment_Table_Entry,
- 0);
- result = new_entry->fragment_set_.insert (*new_node);
- if (result != 0)
- ACE_ERROR_RETURN ((LM_ERROR,"insert for 0th fragment failed\n"),0);
- // not found. so bind a new entry.
- result = this->fragment_table_.bind (frame_info.sequence_num,new_entry);
- if (result != 0)
- ACE_ERROR_RETURN ((LM_ERROR,"fragment table bind failed\n"),0);
- return 0;
- }
- }
- }
- ACE_CATCHANY
- {
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"read_simple_frame");
- return 0;
- }
- ACE_ENDTRY;
- ACE_CHECK_RETURN (0);
- return message_block;
-}
-
-int
-TAO_SFP::read_frame_header (flowProtocol::frameHeader &frame_header)
-{
- ACE_DECLARE_NEW_CORBA_ENV;
- ACE_TRY
- {
- ACE_INET_Addr sender;
- char *buf;
- ACE_NEW_RETURN (buf,
- char [this->frame_header_len_+ACE_CDR::MAX_ALIGNMENT],
- 0);
- ssize_t n =this->dgram_.recv (buf,
- this->frame_header_len_,
- sender,
- MSG_PEEK);
- if (n == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::read_simple_frame -peek:%p",""),0);
- else if (n==0)
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::read_simple_frame -peek:%p",""),0);
- else if (n != this->frame_header_len_)
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::read_simple_frame - not able to peek\n"),0);
- // print the buffer.
- // this->dump_buf (buf,n);
- ACE_Message_Block mb (n+ACE_CDR::MAX_ALIGNMENT);
- ACE_CDR::mb_align (&mb);
- int result
- = mb.copy (buf,n);
- if (result == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"Message_Block::copy failed\n"),0);
- // buf[4] is the byte order.
- int byte_order = buf[4] & 0x1;
- // ACE_DEBUG ((LM_DEBUG,"mb len = %d,byte_order=%d\n",mb.length (),byte_order));
- TAO_InputCDR cdr (&mb,byte_order);
- // cdr >>= frame_header;
- cdr.decode (flowProtocol::_tc_frameHeader,
- &frame_header,
- 0,
- ACE_TRY_ENV);
- ACE_TRY_CHECK;
-
- ACE_DEBUG ((LM_DEBUG,"message_type = %d, message_size = %d,message_flags = %d\n",
- frame_header.message_type,frame_header.message_size,frame_header.flags));
-
- }
- ACE_CATCHANY
- {
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_SFP::read_frame_header");
- return -1;
- }
- ACE_ENDTRY;
- ACE_CHECK_RETURN (-1);
- return 0;
-}
-
-ACE_Message_Block *
-TAO_SFP::read_fragment (void)
-{
- TAO_SFP_Fragment_Table_Entry *fragment_entry = 0;
- ACE_DECLARE_NEW_CORBA_ENV;
- ACE_TRY
- {
- flowProtocol::fragment fragment;
- ACE_INET_Addr sender;
- char *buf = 0;
- ACE_NEW_RETURN (buf,
- char [this->fragment_len_+ACE_CDR::MAX_ALIGNMENT],
- 0);
- ssize_t n =this->dgram_.recv (buf,
- this->fragment_len_,
- sender,
- MSG_PEEK);
- if (n == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::read_fragment -peek:%p",""),0);
- else if (n==0)
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::read_simple_frame -peek:%p",""),0);
- else if (n != this->fragment_len_)
- ACE_ERROR_RETURN ((LM_ERROR,"SFP::read_simple_frame - not able to peek\n"),0);
- // print the buffer.
- this->dump_buf (buf,n);
- ACE_Message_Block mb (n+ACE_CDR::MAX_ALIGNMENT);
- ACE_CDR::mb_align (&mb);
- int result
- = mb.copy (buf,n);
- if (result == -1)
- ACE_ERROR_RETURN ((LM_ERROR,"read_fragment::Message_Block::copy failed\n"),0);
- // buf[4] is the byte order.
- int byte_order = buf[4] & 0x1;
- ACE_DEBUG ((LM_DEBUG,"mb len = %d,byte_order=%d\n",mb.length (),byte_order));
- TAO_InputCDR cdr (&mb,byte_order);
- // cdr >>= frame_header;
- cdr.decode (flowProtocol::_tc_fragment,
- &fragment,
- 0,
- ACE_TRY_ENV);
- ACE_TRY_CHECK;
-
- ACE_DEBUG ((LM_DEBUG,"frag number = %d, frag size = %d,source id = %d\n",
- fragment.frag_number,fragment.frag_sz,fragment.source_id));
-
- ACE_Message_Block *data;
- ACE_NEW_RETURN (data,
- ACE_Message_Block(fragment.frag_sz),
- 0);
-
- // Read the fragment.
- n = this->dgram_.recv (data->wr_ptr (),fragment.frag_sz,sender);
- if ((n == -1) || (n==0))
- ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP::read_fragment:%p",""),0);
- // move past the fragment header.
- data->rd_ptr (this->fragment_len_);
- data->wr_ptr (n);
- ACE_DEBUG ((LM_DEBUG,"length of %dth fragment is: %d\n",
- fragment.frag_number,
- data->length ()));
-
- TAO_SFP_Fragment_Node *new_node;
- ACE_NEW_RETURN (new_node,
- TAO_SFP_Fragment_Node,
- 0);
- new_node->fragment_info_ = fragment;
- new_node->data_ = data;
- if (this->fragment_table_.find (fragment.sequence_num,fragment_entry) == 0)
- {
- // Already an entry exists. Traverse the list and insert it at the right place.
- result = fragment_entry->fragment_set_.insert (*new_node);
- if (result != 0)
- ACE_ERROR_RETURN ((LM_ERROR,"insert for %dth node failed\n",fragment.frag_number),0);
- // check if all the fragments have been received.
- }
- else
- {
- ACE_NEW_RETURN (fragment_entry,
- TAO_SFP_Fragment_Table_Entry,
- 0);
- fragment_entry->fragment_set_.insert (*new_node);
- // bind a new entry for this sequence number.
- result = this->fragment_table_.bind (fragment.sequence_num,fragment_entry);
- if (result != 0)
- ACE_ERROR_RETURN ((LM_ERROR,"bind for %dth fragment failed\n",
- fragment.frag_number),0);
- }
- if (!(fragment.flags & 0x2))
- {
- ACE_DEBUG ((LM_DEBUG,"Last fragment received\n"));
- // if bit 1 is not set then there are
- // no more fragments.
- fragment_entry->last_received_ = 1;
- // since fragment number starts from 0 to n-1 we add 1.
- fragment_entry->num_fragments_ = fragment.frag_number + 1;
- }
-
- }
- ACE_CATCHANY
- {
- ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,"TAO_SFP::read_fragment");
- return 0;
- }
- ACE_ENDTRY;
- ACE_CHECK_RETURN (0);
- return check_all_fragments (fragment_entry);
-}
-
-ACE_Message_Block*
-TAO_SFP::check_all_fragments (TAO_SFP_Fragment_Table_Entry *fragment_entry)
-{
- ACE_DEBUG ((LM_DEBUG,"table size: %d, num_fragments: %d\n",fragment_entry->fragment_set_.size (),fragment_entry->num_fragments_));
- // check to see if all the frames have been received.
- if (fragment_entry->fragment_set_.size () == fragment_entry->num_fragments_)
- {
- ACE_DEBUG ((LM_DEBUG,"all fragments have been received\n"));
- // all the fragments have been received
- // we can now chain the ACE_Message_Blocks in the fragment_set_ and then return them
- // back.
- ACE_Message_Block *frame = 0,*head = 0;
- FRAGMENT_SET_ITERATOR frag_iterator (fragment_entry->fragment_set_);
- TAO_SFP_Fragment_Node *node;
- for (;frag_iterator.next (node) != 0;frag_iterator.advance ())
- {
-// ACE_Message_Block *block = node->data_;
-// char *buf =block->rd_ptr ();
-// ACE_DEBUG ((LM_DEBUG,"length of buf = %d\n",block->length ()));
-// for (int i=0;i<block->length ();i++)
-// ACE_DEBUG ((LM_DEBUG,"%c ",buf[i]));
-// ACE_DEBUG ((LM_DEBUG,"\n"));
-
- if (!head)
- {
- frame = node->data_;
- head = frame;
- }
- else
- {
- frame->cont (node->data_);
- frame = node->data_;
- }
- }
- return head;
- }
- return 0;
-}
-
-void
-TAO_SFP::dump_buf(char *buffer,int size)
-{
- char *buf = buffer;
- ACE_DEBUG ((LM_DEBUG,"========================================n"));
- for (int i=0;i<size;i++)
- ACE_DEBUG ((LM_DEBUG,"%d ",buf[i]));
- ACE_DEBUG ((LM_DEBUG,"n========================================n"));
-}
-
-#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
-
-template class ACE_DNode<TAO_SFP_Fragment_Node>;
-template class ACE_Equal_To<CORBA::ULong>;
-template class ACE_Ordered_MultiSet<TAO_SFP_Fragment_Node>;
-template class ACE_Ordered_MultiSet_Iterator<TAO_SFP_Fragment_Node>;
-template class ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Manager_Ex<CORBA::ULong, TAO_SFP_Fragment_Table_Entry *, ACE_Hash<CORBA::ULong>, ACE_Equal_To<CORBA::ULong>, ACE_Null_Mutex>;
-template class ACE_Hash_Map_Entry<CORBA::ULong, TAO_SFP_Fragment_Table_Entry *>;
-template class ACE_Hash_Map_Iterator_Base_Ex<CORBA::ULong, TAO_SFP_Fragment_Table_Entry *, ACE_Hash<CORBA::ULong>, ACE_Equal_To<CORBA::ULong>, ACE_Null_Mutex>;
-template class ACE_Hash_Map_Reverse_Iterator_Ex<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Hash<CORBA::ULong>,ACE_Equal_To<CORBA::ULong>,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Iterator_Ex<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Hash<CORBA::ULong>,ACE_Equal_To<CORBA::ULong>,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Iterator<CORBA::ULong,TAO_SFP_Fragment_Table_Entry *,ACE_Null_Mutex>;
-template class ACE_Hash_Map_Reverse_Iterator<CORBA::ULong,TAO_SFP_Fragment_Table_Entry *,ACE_Null_Mutex>;
-#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
-
-#pragma instantiate ACE_DNode<TAO_SFP_Fragment_Node>
-#pragma instantiate ACE_Ordered_MultiSet<TAO_SFP_Fragment_Node>
-#pragma instantiate ACE_Ordered_MultiSet_Iterator<TAO_SFP_Fragment_Node>
-#pragma instantiate ACE_Hash_Map_Manager<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Null_Mutex>
-#pragma instantiate ACE_Hash_Map_Manager_Ex<CORBA::ULong, TAO_SFP_Fragment_Table_Entry *, ACE_Hash<CORBA::ULong>, ACE_Equal_To<CORBA::ULong>, ACE_Null_Mutex>
-#pragma instantiate ACE_Hash_Map_Entry<CORBA::ULong, TAO_SFP_Fragment_Table_Entry *>
-#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<CORBA::ULong, TAO_SFP_Fragment_Table_Entry *, ACE_Hash<CORBA::ULong>, ACE_Equal_To<CORBA::ULong>, ACE_Null_Mutex>
-#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Hash<CORBA::ULong>,ACE_Equal_To<CORBA::ULong>,ACE_Null_Mutex>
-#pragma instantiate ACE_Hash_Map_Iterator_Ex<CORBA::ULong,TAO_SFP_Fragment_Table_Entry*,ACE_Hash<CORBA::ULong>,ACE_Equal_To<CORBA::ULong>,ACE_Null_Mutex>
-#pragma instantiate ACE_Equal_To<CORBA::ULong>
-#pragma instantiate ACE_Hash_Map_Iterator<CORBA::ULong,TAO_SFP_Fragment_Table_Entry *,ACE_Null_Mutex>
-#pragma instantiate ACE_Hash_Map_Reverse_Iterator<CORBA::ULong,TAO_SFP_Fragment_Table_Entry *,ACE_Null_Mutex>
-#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */