diff options
author | naga <naga@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-08-13 00:18:21 +0000 |
---|---|---|
committer | naga <naga@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 1999-08-13 00:18:21 +0000 |
commit | 28e137d1e3c364665d252b72afa9ff01738ab18f (patch) | |
tree | 0dc4a33ac0b1e73d9d41a1dd525c06afd43b2068 | |
parent | de87efdee5a7e038e7ec13bae9acf84a83ea1753 (diff) | |
download | ATCD-28e137d1e3c364665d252b72afa9ff01738ab18f.tar.gz |
Fixed the Policies and also optimized sfp for the simpleframe case to
use a static message block. Added a role set method in flowspec_entry
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp | 4 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h | 2 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp | 14 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h | 5 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i | 14 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/Policy.cpp | 63 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/Policy.h | 62 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/Policy.i | 36 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.cpp | 4 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.h | 6 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/RTCP.cpp | 2 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/RTCP.h | 2 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/RTP.cpp | 6 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/RTP.h | 2 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/Transport.cpp | 17 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/sfp.cpp | 93 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/sfp.h | 11 |
17 files changed, 218 insertions, 125 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp index d4ed30797de..289488e23b8 100644 --- a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp @@ -1534,6 +1534,10 @@ TAO_MCastConfigIf::in_flowSpec (const AVStreams::flowSpec& flow_spec, const char // TAO_Base_StreamEndPoint // ---------------------------------------------------------------------- +TAO_Base_StreamEndPoint::TAO_Base_StreamEndPoint (void) +{ +} + TAO_Base_StreamEndPoint::~TAO_Base_StreamEndPoint (void) { } diff --git a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h index ed6a1050f3c..7921a38b54f 100644 --- a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h +++ b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h @@ -478,7 +478,9 @@ class TAO_ORBSVCS_Export TAO_Base_StreamEndPoint // is used to control the stream. It should be subclassed // by applications that want to provide more control features. + // @@Naga: Rename this class to TAO_Base_EndPoint since both stream and flowendpoints derive from it. public: + TAO_Base_StreamEndPoint (void); virtual ~TAO_Base_StreamEndPoint (void); virtual int handle_open (void); diff --git a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp index b1e2917f794..07b2a360996 100644 --- a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp @@ -26,7 +26,8 @@ TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (void) transport_ (0), handler_ (0), protocol_object_ (0), - is_multicast_ (0) + is_multicast_ (0), + role_ (TAO_AV_INVALID_ROLE) { } @@ -51,7 +52,8 @@ TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (const char *flowname, transport_ (0), handler_ (0), protocol_object_ (0), - is_multicast_ (0) + is_multicast_ (0), + role_ (TAO_AV_INVALID_ROLE) { this->set_protocol (); this->set_direction (this->direction_str_); @@ -77,7 +79,8 @@ TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (const char *flowname, transport_ (0), handler_ (0), protocol_object_ (0), - is_multicast_ (0) + is_multicast_ (0), + role_ (TAO_AV_INVALID_ROLE) { ACE_CString cstring(this->address_str_,0,0); int colon_pos = cstring.find (':'); @@ -319,6 +322,9 @@ TAO_Forward_FlowSpec_Entry::parse (const char *flowSpec_entry) TAO_FlowSpec_Entry::Role TAO_Forward_FlowSpec_Entry::role (void) { + if (this->role_ != TAO_AV_INVALID_ROLE) + return this->role_; + switch (this->direction_) { case TAO_AV_DIR_IN: @@ -429,6 +435,8 @@ TAO_Reverse_FlowSpec_Entry::TAO_Reverse_FlowSpec_Entry (const char *flowname, TAO_FlowSpec_Entry::Role TAO_Reverse_FlowSpec_Entry::role (void) { + if (this->role_ != TAO_AV_INVALID_ROLE) + return this->role_; switch (this->direction_) { case TAO_AV_DIR_IN: diff --git a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h index 3ecff97e0e0..83ab0f9a4c0 100644 --- a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h +++ b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h @@ -101,12 +101,16 @@ public: // accessor to the direction. virtual Role role (void) = 0; + void role (Role role); char * direction_str (void); // string version of direction . char *flow_protocol_str (void); // accesor to the flow protocol string. + void flow_protocol_str (const char *flow_protocol_str); + // set the flow protocol string. + ACE_Addr *address (void); // Address of the carrier protocol. @@ -195,6 +199,7 @@ protected: TAO_AV_Transport *transport_; TAO_AV_Flow_Handler *handler_; TAO_AV_Protocol_Object *protocol_object_; + Role role_; }; class TAO_ORBSVCS_Export TAO_Forward_FlowSpec_Entry diff --git a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i index 404e58d1f1c..8522b9a55b4 100644 --- a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i +++ b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i @@ -57,6 +57,13 @@ TAO_FlowSpec_Entry::flow_protocol_str (void) } ACE_INLINE +void +TAO_FlowSpec_Entry::flow_protocol_str (const char *str) +{ + this->flow_protocol_ = CORBA::string_dup (str); +} + +ACE_INLINE TAO_AV_Core::Protocol TAO_FlowSpec_Entry::carrier_protocol (void) { @@ -192,3 +199,10 @@ TAO_FlowSpec_Entry::is_multicast (void) { return this->is_multicast_; } + +ACE_INLINE +void +TAO_FlowSpec_Entry::role (TAO_FlowSpec_Entry::Role role) +{ + this->role_ = role; +} diff --git a/TAO/orbsvcs/orbsvcs/AV/Policy.cpp b/TAO/orbsvcs/orbsvcs/AV/Policy.cpp index b024cb15d7a..eb86ba138fa 100644 --- a/TAO/orbsvcs/orbsvcs/AV/Policy.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/Policy.cpp @@ -3,74 +3,38 @@ #include "Policy.h" #include "FlowSpec_Entry.h" -TAO_AV_Policy::TAO_AV_Policy (TAO_AV_Policy::PolicyType type) +TAO_AV_Policy::TAO_AV_Policy (CORBA::ULong type) :type_ (type) { } TAO_AV_SSRC_Policy::TAO_AV_SSRC_Policy (CORBA::ULong ssrc) - :TAO_AV_Policy (TAO_AV_Policy::TAO_AV_SSRC_POLICY), + :TAO_AV_Policy (TAO_AV_SSRC_POLICY), ssrc_ (ssrc) { } TAO_AV_Payload_Type_Policy::TAO_AV_Payload_Type_Policy (int payload_type) - :TAO_AV_Policy (TAO_AV_Policy::TAO_AV_PAYLOAD_TYPE_POLICY), + :TAO_AV_Policy (TAO_AV_PAYLOAD_TYPE_POLICY), payload_type_ (payload_type) { } // TAO_AV_RTP_Sdes_Policy TAO_AV_RTCP_Sdes_Policy::TAO_AV_RTCP_Sdes_Policy (void) - :TAO_AV_Policy (TAO_AV_Policy::TAO_AV_RTCP_SDES_POLICY) -{ -} - -// TAO_AV_Timestamp_Policy -TAO_AV_Timestamp_Policy::TAO_AV_Timestamp_Policy (void) - :TAO_AV_Policy (TAO_AV_Policy::TAO_AV_TIMESTAMP_POLICY) -{ -} - -TAO_AV_Policy * -TAO_AV_Policy_Manager::create_policy (TAO_AV_Policy::PolicyType type, - void *value) -{ - TAO_AV_Policy *policy = 0; - switch (type) - { - case TAO_AV_Policy::TAO_AV_PAYLOAD_TYPE_POLICY: - { - int *payload_type = ACE_static_cast (int *,value); - if (payload_type == 0) - ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Policy_Manager::create_policy:Invalid value\n"),0); - ACE_NEW_RETURN (policy, - TAO_AV_Payload_Type_Policy (*payload_type), - 0); - } - break; - case TAO_AV_Policy::TAO_AV_SSRC_POLICY: - { - CORBA::ULong *ssrc = ACE_static_cast (CORBA::ULong *,value); - if (ssrc == 0) - ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Policy_Manager::create_policy:Invalid value\n"),0); - ACE_NEW_RETURN (policy, - TAO_AV_SSRC_Policy (*ssrc), - 0); - } - break; - default: - break; - } - return policy; + :TAO_AV_Policy (TAO_AV_RTCP_SDES_POLICY) +{ } -// TAO_AV_Callback +TAO_AV_SFP_Credit_Policy::TAO_AV_SFP_Credit_Policy (void) + :TAO_AV_Policy (TAO_AV_SFP_CREDIT_POLICY) +{ +} +// TAO_AV_Callback TAO_AV_Callback::TAO_AV_Callback (void) - // :transport_ (0), :protocol_object_ (0) { } @@ -136,6 +100,13 @@ TAO_AV_Callback::handle_timeout (void *arg) return 0; } +TAO_AV_PolicyList +TAO_AV_Callback::get_policies (void) +{ + TAO_AV_PolicyList list; + return list; +} + // TAO_AV_Transport* // TAO_AV_Callback::transport (void) // { diff --git a/TAO/orbsvcs/orbsvcs/AV/Policy.h b/TAO/orbsvcs/orbsvcs/AV/Policy.h index 926f9c711b1..cefec9b2725 100644 --- a/TAO/orbsvcs/orbsvcs/AV/Policy.h +++ b/TAO/orbsvcs/orbsvcs/AV/Policy.h @@ -34,28 +34,25 @@ struct TAO_AV_frame_info CORBA::ULong sequence_num; }; +#define TAO_AV_SSRC_POLICY 100 +#define TAO_AV_PAYLOAD_TYPE_POLICY 101 +#define TAO_AV_TIMEOUT_POLICY 102 +#define TAO_AV_RTCP_SDES_POLICY 103 +#define TAO_AV_SFP_CREDIT_POLICY 104 + +struct TAO_AV_RTCP_Sdes +{ + CORBA::String_var name_; + CORBA::String_var value_; +}; + class TAO_ORBSVCS_Export TAO_AV_Policy { public: - struct sdes - { - CORBA::String_var name_; - CORBA::String_var value_; - }; - - enum PolicyType - { - TAO_AV_SSRC_POLICY, - TAO_AV_PAYLOAD_TYPE_POLICY, - TAO_AV_TIMEOUT_POLICY, - TAO_AV_RTCP_SDES_POLICY, - TAO_AV_TIMESTAMP_POLICY - }; - - TAO_AV_Policy (PolicyType type); - PolicyType type (void); + TAO_AV_Policy (CORBA::ULong type); + CORBA::ULong type (void); protected: - PolicyType type_; + CORBA::ULong type_; }; class TAO_AV_SSRC_Policy @@ -86,31 +83,23 @@ class TAO_AV_RTCP_Sdes_Policy { public: TAO_AV_RTCP_Sdes_Policy (void); - TAO_AV_Policy::sdes &value (void); - void value (const TAO_AV_Policy::sdes& sdes_val); + TAO_AV_RTCP_Sdes &value (void); + void value (const TAO_AV_RTCP_Sdes& sdes_val); protected: - TAO_AV_Policy::sdes sdes_; + TAO_AV_RTCP_Sdes sdes_; }; -class TAO_AV_Timestamp_Policy - :public TAO_AV_Policy +class TAO_AV_SFP_Credit_Policy : public TAO_AV_Policy { public: - TAO_AV_Timestamp_Policy (void); - ACE_UINT32 value (void); - void value (ACE_UINT32 timestamp); + TAO_AV_SFP_Credit_Policy (void); + int value (void); + void value (int val); protected: - ACE_UINT32 timestamp_; + int value_; }; -typedef TAO_Unbounded_Sequence<TAO_AV_Policy*> PolicyList; - -class TAO_AV_Policy_Manager -{ -public: - TAO_AV_Policy *create_policy (TAO_AV_Policy::PolicyType type, - void *value); -}; +typedef TAO_Unbounded_Sequence<TAO_AV_Policy*> TAO_AV_PolicyList; class TAO_AV_Protocol_Object; class TAO_AV_Transport; @@ -160,6 +149,9 @@ public: TAO_AV_Protocol_Object *protocol_object (void); // Accessor to protocol object. + + virtual TAO_AV_PolicyList get_policies (void); + // get the policies for the protocol object. protected: TAO_AV_Protocol_Object *protocol_object_; TAO_AV_Flow_Handler *handler_; diff --git a/TAO/orbsvcs/orbsvcs/AV/Policy.i b/TAO/orbsvcs/orbsvcs/AV/Policy.i index e2dd0de51be..0994186e6ed 100644 --- a/TAO/orbsvcs/orbsvcs/AV/Policy.i +++ b/TAO/orbsvcs/orbsvcs/AV/Policy.i @@ -4,7 +4,7 @@ // TAO_AV_Policy //-------------------------------------------------- -ACE_INLINE TAO_AV_Policy::PolicyType +ACE_INLINE CORBA::ULong TAO_AV_Policy::type (void) { return this->type_; @@ -42,28 +42,34 @@ TAO_AV_Payload_Type_Policy::value (void) return this->payload_type_; } -// TAO_AV_Timestamp_Policy -ACE_INLINE void -TAO_AV_Timestamp_Policy::value (ACE_UINT32 timestamp) -{ - this->timestamp_ = timestamp; -} - -ACE_INLINE ACE_UINT32 -TAO_AV_Timestamp_Policy::value (void) -{ - return this->timestamp_; -} // TAO_AV_RTCP_Sdes_Policy -ACE_INLINE TAO_AV_Policy::sdes & +ACE_INLINE TAO_AV_RTCP_Sdes & TAO_AV_RTCP_Sdes_Policy::value (void) { return this->sdes_; } ACE_INLINE void -TAO_AV_RTCP_Sdes_Policy::value (const TAO_AV_Policy::sdes &sdes_val) +TAO_AV_RTCP_Sdes_Policy::value (const TAO_AV_RTCP_Sdes &sdes_val) { this->sdes_ = sdes_val; } + +//---------------------------------------------------------------------- +// TAO_AV_SFP_Credit_Policy +//---------------------------------------------------------------------- + +ACE_INLINE +void +TAO_AV_SFP_Credit_Policy::value (int credit) +{ + this->value_ = credit; +} + +ACE_INLINE +int +TAO_AV_SFP_Credit_Policy::value (void) +{ + return this->value_; +} diff --git a/TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.cpp b/TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.cpp index 5314a9c4f43..ea6bf89ddda 100644 --- a/TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.cpp @@ -110,13 +110,13 @@ TAO_AV_Protocol_Object::stop (void) } int -TAO_AV_Protocol_Object::set_policies (const PolicyList &policy_list) +TAO_AV_Protocol_Object::set_policies (const TAO_AV_PolicyList &policy_list) { this->policy_list_ = policy_list; return 0; } -PolicyList +TAO_AV_PolicyList TAO_AV_Protocol_Object::get_policies (void) { return this->policy_list_; diff --git a/TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.h b/TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.h index 927b3110d5c..53c22530af8 100644 --- a/TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.h +++ b/TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.h @@ -55,8 +55,8 @@ public: const ACE_Addr &peer_address); // Called on a control object. - virtual int set_policies (const PolicyList &policy_list); - virtual PolicyList get_policies (void); + virtual int set_policies (const TAO_AV_PolicyList &policy_list); + virtual TAO_AV_PolicyList get_policies (void); // set/get policies. virtual int start (void); @@ -78,7 +78,7 @@ public: TAO_AV_Transport *transport (void); protected: TAO_AV_Transport *transport_; - PolicyList policy_list_; + TAO_AV_PolicyList policy_list_; TAO_AV_Callback *callback_; }; diff --git a/TAO/orbsvcs/orbsvcs/AV/RTCP.cpp b/TAO/orbsvcs/orbsvcs/AV/RTCP.cpp index 5ac27761e45..fa2469891fd 100644 --- a/TAO/orbsvcs/orbsvcs/AV/RTCP.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/RTCP.cpp @@ -664,7 +664,7 @@ TAO_AV_RTCP_Object::destroy (void) } int -TAO_AV_RTCP_Object::set_policies (const PolicyList &policy_list) +TAO_AV_RTCP_Object::set_policies (const TAO_AV_PolicyList &policy_list) { return -1; } diff --git a/TAO/orbsvcs/orbsvcs/AV/RTCP.h b/TAO/orbsvcs/orbsvcs/AV/RTCP.h index a5f57663280..76aea711e8e 100644 --- a/TAO/orbsvcs/orbsvcs/AV/RTCP.h +++ b/TAO/orbsvcs/orbsvcs/AV/RTCP.h @@ -202,7 +202,7 @@ public: virtual int handle_control_input (ACE_Message_Block *frame, const ACE_Addr &peer_address); - virtual int set_policies (const PolicyList &policy_list); + virtual int set_policies (const TAO_AV_PolicyList &policy_list); // set/get policies. virtual int start (void); diff --git a/TAO/orbsvcs/orbsvcs/AV/RTP.cpp b/TAO/orbsvcs/orbsvcs/AV/RTP.cpp index df429cd40b4..eff4967d050 100644 --- a/TAO/orbsvcs/orbsvcs/AV/RTP.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/RTP.cpp @@ -236,7 +236,7 @@ TAO_AV_RTP_Object::destroy (void) } int -TAO_AV_RTP_Object::set_policies (const PolicyList &policy_list) +TAO_AV_RTP_Object::set_policies (const TAO_AV_PolicyList &policy_list) { this->policy_list_ = policy_list; u_int num_policies = this->policy_list_.length (); @@ -246,7 +246,7 @@ TAO_AV_RTP_Object::set_policies (const PolicyList &policy_list) policy = this->policy_list_ [i]; switch (policy->type ()) { - case TAO_AV_Policy::TAO_AV_PAYLOAD_TYPE_POLICY: + case TAO_AV_PAYLOAD_TYPE_POLICY: { TAO_AV_Payload_Type_Policy *payload_policy = ACE_static_cast (TAO_AV_Payload_Type_Policy *,policy); @@ -255,7 +255,7 @@ TAO_AV_RTP_Object::set_policies (const PolicyList &policy_list) this->format_ = payload_policy->value (); } break; - case TAO_AV_Policy::TAO_AV_SSRC_POLICY: + case TAO_AV_SSRC_POLICY: { TAO_AV_SSRC_Policy *ssrc_policy = ACE_static_cast (TAO_AV_SSRC_Policy *,policy); diff --git a/TAO/orbsvcs/orbsvcs/AV/RTP.h b/TAO/orbsvcs/orbsvcs/AV/RTP.h index 2f77b73106c..7ab37bcd5e0 100644 --- a/TAO/orbsvcs/orbsvcs/AV/RTP.h +++ b/TAO/orbsvcs/orbsvcs/AV/RTP.h @@ -265,7 +265,7 @@ public: TAO_AV_frame_info *frame_info = 0); virtual int destroy (void); - virtual int set_policies (const PolicyList &policy_list); + virtual int set_policies (const TAO_AV_PolicyList &policy_list); virtual void control_object (TAO_AV_Protocol_Object *object); protected: ACE_UINT16 sequence_num_; diff --git a/TAO/orbsvcs/orbsvcs/AV/Transport.cpp b/TAO/orbsvcs/orbsvcs/AV/Transport.cpp index d430ca36d57..ecb920e6ff3 100644 --- a/TAO/orbsvcs/orbsvcs/AV/Transport.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/Transport.cpp @@ -67,6 +67,23 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint, start != end; ++start) { TAO_FlowSpec_Entry *entry = (*start); + switch (direction) + { + case TAO_AV_Core::TAO_AV_ENDPOINT_B: + { + switch (entry->direction ()) + { + case TAO_FlowSpec_Entry::TAO_AV_DIR_IN: + entry->role (TAO_FlowSpec_Entry::TAO_AV_CONSUMER); + break; + case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT: + entry->role (TAO_FlowSpec_Entry::TAO_AV_PRODUCER); + break; + } + } + default: + break; + } ACE_Addr *address = entry->address (); if (address != 0) { diff --git a/TAO/orbsvcs/orbsvcs/AV/sfp.cpp b/TAO/orbsvcs/orbsvcs/AV/sfp.cpp index 0cf1b7a5831..f98cbe26de7 100644 --- a/TAO/orbsvcs/orbsvcs/AV/sfp.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/sfp.cpp @@ -202,16 +202,19 @@ TAO_SFP_Base::read_frame (TAO_AV_Transport *transport, int byte_order = frame_header.flags & 0x1; int message_len = frame_header.message_size; - ACE_NEW_RETURN (message_block, - ACE_Message_Block (message_len), - 0); - int n = transport->recv (message_block->wr_ptr (),message_len); +// ACE_NEW_RETURN (message_block, +// ACE_Message_Block (message_len), +// 0); + state.static_frame_.rd_ptr (state.static_frame_.base ()); + state.static_frame_.wr_ptr (state.static_frame_.base ()); + int n = transport->recv (state.static_frame_.rd_ptr (),message_len); if (n == -1) ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0); else if (n==0) ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0); else if (n != message_len) ACE_ERROR_RETURN ((LM_ERROR,"SFP::read_simple_frame:message truncated\n"),0); + message_block = &state.static_frame_; // print the buffer. // this->dump_buf (message,n); // skip over the frame header. @@ -252,7 +255,7 @@ TAO_SFP_Base::read_frame (TAO_AV_Transport *transport, } case flowProtocol::SimpleFrame_Msg: { - data = message_block; + data = message_block->clone (); break; } case flowProtocol::SequencedFrame_Msg: @@ -996,9 +999,11 @@ TAO_SFP_Object::TAO_SFP_Object (TAO_AV_Callback *callback, TAO_AV_Transport *transport) :TAO_AV_Protocol_Object (callback,transport), source_id_ (10), - max_credit_ (20) - + max_credit_ (-1), + current_credit_ (-1) { + TAO_SFP_BASE::instance (); + this->state_.static_frame_.size (2* this->transport_->mtu ()); } TAO_SFP_Object::~TAO_SFP_Object (void) @@ -1020,6 +1025,7 @@ TAO_SFP_Object::destroy (void) out_stream); if (result < 0) return result; + this->callback_->handle_destroy (); return 0; } @@ -1033,7 +1039,7 @@ TAO_SFP_Object::send_frame (ACE_Message_Block *frame, CORBA::Octet flags = TAO_ENCAP_BYTE_ORDER; if (this->transport_ == 0) ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP_Object::send_frame: transport is null\n"),-1); - if (this->current_credit_ > 0) + if (this->current_credit_ != 0) { // if we have enough credit then we send. int total_length = 0; @@ -1167,7 +1173,8 @@ TAO_SFP_Object::send_frame (ACE_Message_Block *frame, // Increment the sequence_num after sending the message. this->sequence_num_++; // Also reduce the number of credits. - this->current_credit_--; + if (this->max_credit_ > 0) + this->current_credit_--; } } else @@ -1230,11 +1237,48 @@ TAO_SFP_Object::get_fragment (ACE_Message_Block *&mb, return fragment_mb; } +int +TAO_SFP_Object::set_policies (const TAO_AV_PolicyList& policies) +{ + TAO_AV_Policy *policy = 0; + for (u_int i=0;i<policies.length ();i++) + { + policy = policies[i]; + switch (policies[i]->type ()) + { + + case TAO_AV_SFP_CREDIT_POLICY: + { + TAO_AV_SFP_Credit_Policy *credit_policy = + ACE_dynamic_cast (TAO_AV_SFP_Credit_Policy*,policy); + this->max_credit_ = credit_policy->value (); + } + default: + break; + } + } + return 0; +} + // TAO_SFP_Consumer_Object TAO_SFP_Consumer_Object::TAO_SFP_Consumer_Object (TAO_AV_Callback *callback, - TAO_AV_Transport *transport) + TAO_AV_Transport *transport, + char *&sfp_options) :TAO_SFP_Object (callback,transport) { + TAO_AV_PolicyList policies = callback->get_policies (); + if (policies.length () == 0) + return; + this->set_policies (policies); + if (this->max_credit_ > 0) + { + ACE_NEW (sfp_options, + char [BUFSIZ]); + + ACE_OS::sprintf (sfp_options, + "sfp:1.0:credit=%d", + max_credit_); + } } int @@ -1254,17 +1298,37 @@ TAO_SFP_Consumer_Object::handle_input (void) { this->callback_->receive_frame (this->state_.frame_block_, frame_info); + // Now release the memory for the frame. + if (this->state_.frame_block_ != &this->state_.static_frame_) + { + ACE_Message_Block *temp = 0; + for (temp = this->state_.frame_block_; + temp != 0; + temp = temp->cont ()) + { + temp->release (); + delete temp; + } + } this->state_.reset (); } return 0; } TAO_SFP_Producer_Object::TAO_SFP_Producer_Object (TAO_AV_Callback *callback, - TAO_AV_Transport *transport) + TAO_AV_Transport *transport, + char *&sfp_options) :TAO_SFP_Object (callback,transport), credit_sequence_num_ (0) { + TAO_Tokenizer flow_string (sfp_options,':'); + if (flow_string [2] != 0) + { + TAO_Tokenizer options (flow_string[2],'='); + if (options [1] != 0) + this->max_credit_ = ACE_OS::atoi (options[1]); + } } int @@ -1347,13 +1411,15 @@ TAO_AV_SFP_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry, TAO_AV_Callback *callback = 0; endpoint->get_callback (entry->flowname (), callback); + char *flow_string = entry->flow_protocol_str (); switch (entry->role ()) { case TAO_FlowSpec_Entry::TAO_AV_PRODUCER: { ACE_NEW_RETURN (object, TAO_SFP_Producer_Object (callback, - transport), + transport, + flow_string), 0); } break; @@ -1361,7 +1427,8 @@ TAO_AV_SFP_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry, { ACE_NEW_RETURN (object, TAO_SFP_Consumer_Object (callback, - transport), + transport, + flow_string), 0); } break; diff --git a/TAO/orbsvcs/orbsvcs/AV/sfp.h b/TAO/orbsvcs/orbsvcs/AV/sfp.h index ea552319066..cbbc8365e22 100644 --- a/TAO/orbsvcs/orbsvcs/AV/sfp.h +++ b/TAO/orbsvcs/orbsvcs/AV/sfp.h @@ -79,6 +79,7 @@ public: CORBA::Boolean more_fragments_; ACE_Message_Block *frame_block_; // boolean flags indicating that there are more fragments. + ACE_Message_Block static_frame_; TAO_SFP_Fragment_Table_Map fragment_table_map_; }; @@ -190,6 +191,7 @@ protected: // dumps the buffer to the screen. }; +// Beware the SFP_Base code relies on the Singleton being initialized. typedef ACE_Singleton <TAO_SFP_Base,ACE_SYNCH_MUTEX> TAO_SFP_BASE; class TAO_ORBSVCS_Export TAO_SFP_Object : public TAO_AV_Protocol_Object @@ -197,6 +199,7 @@ class TAO_ORBSVCS_Export TAO_SFP_Object : public TAO_AV_Protocol_Object public: TAO_SFP_Object (TAO_AV_Callback *callback, TAO_AV_Transport *transport); + // We should add a sfp options parameter. virtual ~TAO_SFP_Object (void); // Dtor @@ -210,6 +213,8 @@ public: TAO_AV_frame_info *frame_info = 0); virtual int destroy (void); + virtual int set_policies (const TAO_AV_PolicyList &policies); + protected: ACE_Message_Block *get_fragment (ACE_Message_Block *&frame, size_t initial_len, @@ -226,7 +231,8 @@ class TAO_ORBSVCS_Export TAO_SFP_Producer_Object : public TAO_SFP_Object { public: TAO_SFP_Producer_Object (TAO_AV_Callback *callback, - TAO_AV_Transport *transport); + TAO_AV_Transport *transport, + char *&flow_options); virtual int handle_input (void); protected: CORBA::ULong credit_sequence_num_; @@ -236,7 +242,8 @@ class TAO_ORBSVCS_Export TAO_SFP_Consumer_Object : public TAO_SFP_Object { public: TAO_SFP_Consumer_Object (TAO_AV_Callback *callback, - TAO_AV_Transport *transport); + TAO_AV_Transport *transport, + char *&flow_options); virtual int handle_input (void); }; |