summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornaga <naga@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-08-13 00:18:21 +0000
committernaga <naga@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>1999-08-13 00:18:21 +0000
commit28e137d1e3c364665d252b72afa9ff01738ab18f (patch)
tree0dc4a33ac0b1e73d9d41a1dd525c06afd43b2068
parentde87efdee5a7e038e7ec13bae9acf84a83ea1753 (diff)
downloadATCD-28e137d1e3c364665d252b72afa9ff01738ab18f.tar.gz
Fixed the Policies and also optimized sfp for the simpleframe case to
use a static message block. Added a role set method in flowspec_entry
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp14
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h5
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i14
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/Policy.cpp63
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/Policy.h62
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/Policy.i36
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.cpp4
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.h6
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/RTCP.cpp2
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/RTCP.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/RTP.cpp6
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/RTP.h2
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/Transport.cpp17
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/sfp.cpp93
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/sfp.h11
17 files changed, 218 insertions, 125 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp
index d4ed30797de..289488e23b8 100644
--- a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.cpp
@@ -1534,6 +1534,10 @@ TAO_MCastConfigIf::in_flowSpec (const AVStreams::flowSpec& flow_spec, const char
// TAO_Base_StreamEndPoint
// ----------------------------------------------------------------------
+TAO_Base_StreamEndPoint::TAO_Base_StreamEndPoint (void)
+{
+}
+
TAO_Base_StreamEndPoint::~TAO_Base_StreamEndPoint (void)
{
}
diff --git a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h
index ed6a1050f3c..7921a38b54f 100644
--- a/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h
+++ b/TAO/orbsvcs/orbsvcs/AV/AVStreams_i.h
@@ -478,7 +478,9 @@ class TAO_ORBSVCS_Export TAO_Base_StreamEndPoint
// is used to control the stream. It should be subclassed
// by applications that want to provide more control features.
+ // @@Naga: Rename this class to TAO_Base_EndPoint since both stream and flowendpoints derive from it.
public:
+ TAO_Base_StreamEndPoint (void);
virtual ~TAO_Base_StreamEndPoint (void);
virtual int handle_open (void);
diff --git a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp
index b1e2917f794..07b2a360996 100644
--- a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp
@@ -26,7 +26,8 @@ TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (void)
transport_ (0),
handler_ (0),
protocol_object_ (0),
- is_multicast_ (0)
+ is_multicast_ (0),
+ role_ (TAO_AV_INVALID_ROLE)
{
}
@@ -51,7 +52,8 @@ TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (const char *flowname,
transport_ (0),
handler_ (0),
protocol_object_ (0),
- is_multicast_ (0)
+ is_multicast_ (0),
+ role_ (TAO_AV_INVALID_ROLE)
{
this->set_protocol ();
this->set_direction (this->direction_str_);
@@ -77,7 +79,8 @@ TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (const char *flowname,
transport_ (0),
handler_ (0),
protocol_object_ (0),
- is_multicast_ (0)
+ is_multicast_ (0),
+ role_ (TAO_AV_INVALID_ROLE)
{
ACE_CString cstring(this->address_str_,0,0);
int colon_pos = cstring.find (':');
@@ -319,6 +322,9 @@ TAO_Forward_FlowSpec_Entry::parse (const char *flowSpec_entry)
TAO_FlowSpec_Entry::Role
TAO_Forward_FlowSpec_Entry::role (void)
{
+ if (this->role_ != TAO_AV_INVALID_ROLE)
+ return this->role_;
+
switch (this->direction_)
{
case TAO_AV_DIR_IN:
@@ -429,6 +435,8 @@ TAO_Reverse_FlowSpec_Entry::TAO_Reverse_FlowSpec_Entry (const char *flowname,
TAO_FlowSpec_Entry::Role
TAO_Reverse_FlowSpec_Entry::role (void)
{
+ if (this->role_ != TAO_AV_INVALID_ROLE)
+ return this->role_;
switch (this->direction_)
{
case TAO_AV_DIR_IN:
diff --git a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h
index 3ecff97e0e0..83ab0f9a4c0 100644
--- a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h
+++ b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h
@@ -101,12 +101,16 @@ public:
// accessor to the direction.
virtual Role role (void) = 0;
+ void role (Role role);
char * direction_str (void);
// string version of direction .
char *flow_protocol_str (void);
// accesor to the flow protocol string.
+ void flow_protocol_str (const char *flow_protocol_str);
+ // set the flow protocol string.
+
ACE_Addr *address (void);
// Address of the carrier protocol.
@@ -195,6 +199,7 @@ protected:
TAO_AV_Transport *transport_;
TAO_AV_Flow_Handler *handler_;
TAO_AV_Protocol_Object *protocol_object_;
+ Role role_;
};
class TAO_ORBSVCS_Export TAO_Forward_FlowSpec_Entry
diff --git a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i
index 404e58d1f1c..8522b9a55b4 100644
--- a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i
+++ b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i
@@ -57,6 +57,13 @@ TAO_FlowSpec_Entry::flow_protocol_str (void)
}
ACE_INLINE
+void
+TAO_FlowSpec_Entry::flow_protocol_str (const char *str)
+{
+ this->flow_protocol_ = CORBA::string_dup (str);
+}
+
+ACE_INLINE
TAO_AV_Core::Protocol
TAO_FlowSpec_Entry::carrier_protocol (void)
{
@@ -192,3 +199,10 @@ TAO_FlowSpec_Entry::is_multicast (void)
{
return this->is_multicast_;
}
+
+ACE_INLINE
+void
+TAO_FlowSpec_Entry::role (TAO_FlowSpec_Entry::Role role)
+{
+ this->role_ = role;
+}
diff --git a/TAO/orbsvcs/orbsvcs/AV/Policy.cpp b/TAO/orbsvcs/orbsvcs/AV/Policy.cpp
index b024cb15d7a..eb86ba138fa 100644
--- a/TAO/orbsvcs/orbsvcs/AV/Policy.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/Policy.cpp
@@ -3,74 +3,38 @@
#include "Policy.h"
#include "FlowSpec_Entry.h"
-TAO_AV_Policy::TAO_AV_Policy (TAO_AV_Policy::PolicyType type)
+TAO_AV_Policy::TAO_AV_Policy (CORBA::ULong type)
:type_ (type)
{
}
TAO_AV_SSRC_Policy::TAO_AV_SSRC_Policy (CORBA::ULong ssrc)
- :TAO_AV_Policy (TAO_AV_Policy::TAO_AV_SSRC_POLICY),
+ :TAO_AV_Policy (TAO_AV_SSRC_POLICY),
ssrc_ (ssrc)
{
}
TAO_AV_Payload_Type_Policy::TAO_AV_Payload_Type_Policy (int payload_type)
- :TAO_AV_Policy (TAO_AV_Policy::TAO_AV_PAYLOAD_TYPE_POLICY),
+ :TAO_AV_Policy (TAO_AV_PAYLOAD_TYPE_POLICY),
payload_type_ (payload_type)
{
}
// TAO_AV_RTP_Sdes_Policy
TAO_AV_RTCP_Sdes_Policy::TAO_AV_RTCP_Sdes_Policy (void)
- :TAO_AV_Policy (TAO_AV_Policy::TAO_AV_RTCP_SDES_POLICY)
-{
-}
-
-// TAO_AV_Timestamp_Policy
-TAO_AV_Timestamp_Policy::TAO_AV_Timestamp_Policy (void)
- :TAO_AV_Policy (TAO_AV_Policy::TAO_AV_TIMESTAMP_POLICY)
-{
-}
-
-TAO_AV_Policy *
-TAO_AV_Policy_Manager::create_policy (TAO_AV_Policy::PolicyType type,
- void *value)
-{
- TAO_AV_Policy *policy = 0;
- switch (type)
- {
- case TAO_AV_Policy::TAO_AV_PAYLOAD_TYPE_POLICY:
- {
- int *payload_type = ACE_static_cast (int *,value);
- if (payload_type == 0)
- ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Policy_Manager::create_policy:Invalid value\n"),0);
- ACE_NEW_RETURN (policy,
- TAO_AV_Payload_Type_Policy (*payload_type),
- 0);
- }
- break;
- case TAO_AV_Policy::TAO_AV_SSRC_POLICY:
- {
- CORBA::ULong *ssrc = ACE_static_cast (CORBA::ULong *,value);
- if (ssrc == 0)
- ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_Policy_Manager::create_policy:Invalid value\n"),0);
- ACE_NEW_RETURN (policy,
- TAO_AV_SSRC_Policy (*ssrc),
- 0);
- }
- break;
- default:
- break;
- }
- return policy;
+ :TAO_AV_Policy (TAO_AV_RTCP_SDES_POLICY)
+{
}
-// TAO_AV_Callback
+TAO_AV_SFP_Credit_Policy::TAO_AV_SFP_Credit_Policy (void)
+ :TAO_AV_Policy (TAO_AV_SFP_CREDIT_POLICY)
+{
+}
+// TAO_AV_Callback
TAO_AV_Callback::TAO_AV_Callback (void)
- // :transport_ (0),
:protocol_object_ (0)
{
}
@@ -136,6 +100,13 @@ TAO_AV_Callback::handle_timeout (void *arg)
return 0;
}
+TAO_AV_PolicyList
+TAO_AV_Callback::get_policies (void)
+{
+ TAO_AV_PolicyList list;
+ return list;
+}
+
// TAO_AV_Transport*
// TAO_AV_Callback::transport (void)
// {
diff --git a/TAO/orbsvcs/orbsvcs/AV/Policy.h b/TAO/orbsvcs/orbsvcs/AV/Policy.h
index 926f9c711b1..cefec9b2725 100644
--- a/TAO/orbsvcs/orbsvcs/AV/Policy.h
+++ b/TAO/orbsvcs/orbsvcs/AV/Policy.h
@@ -34,28 +34,25 @@ struct TAO_AV_frame_info
CORBA::ULong sequence_num;
};
+#define TAO_AV_SSRC_POLICY 100
+#define TAO_AV_PAYLOAD_TYPE_POLICY 101
+#define TAO_AV_TIMEOUT_POLICY 102
+#define TAO_AV_RTCP_SDES_POLICY 103
+#define TAO_AV_SFP_CREDIT_POLICY 104
+
+struct TAO_AV_RTCP_Sdes
+{
+ CORBA::String_var name_;
+ CORBA::String_var value_;
+};
+
class TAO_ORBSVCS_Export TAO_AV_Policy
{
public:
- struct sdes
- {
- CORBA::String_var name_;
- CORBA::String_var value_;
- };
-
- enum PolicyType
- {
- TAO_AV_SSRC_POLICY,
- TAO_AV_PAYLOAD_TYPE_POLICY,
- TAO_AV_TIMEOUT_POLICY,
- TAO_AV_RTCP_SDES_POLICY,
- TAO_AV_TIMESTAMP_POLICY
- };
-
- TAO_AV_Policy (PolicyType type);
- PolicyType type (void);
+ TAO_AV_Policy (CORBA::ULong type);
+ CORBA::ULong type (void);
protected:
- PolicyType type_;
+ CORBA::ULong type_;
};
class TAO_AV_SSRC_Policy
@@ -86,31 +83,23 @@ class TAO_AV_RTCP_Sdes_Policy
{
public:
TAO_AV_RTCP_Sdes_Policy (void);
- TAO_AV_Policy::sdes &value (void);
- void value (const TAO_AV_Policy::sdes& sdes_val);
+ TAO_AV_RTCP_Sdes &value (void);
+ void value (const TAO_AV_RTCP_Sdes& sdes_val);
protected:
- TAO_AV_Policy::sdes sdes_;
+ TAO_AV_RTCP_Sdes sdes_;
};
-class TAO_AV_Timestamp_Policy
- :public TAO_AV_Policy
+class TAO_AV_SFP_Credit_Policy : public TAO_AV_Policy
{
public:
- TAO_AV_Timestamp_Policy (void);
- ACE_UINT32 value (void);
- void value (ACE_UINT32 timestamp);
+ TAO_AV_SFP_Credit_Policy (void);
+ int value (void);
+ void value (int val);
protected:
- ACE_UINT32 timestamp_;
+ int value_;
};
-typedef TAO_Unbounded_Sequence<TAO_AV_Policy*> PolicyList;
-
-class TAO_AV_Policy_Manager
-{
-public:
- TAO_AV_Policy *create_policy (TAO_AV_Policy::PolicyType type,
- void *value);
-};
+typedef TAO_Unbounded_Sequence<TAO_AV_Policy*> TAO_AV_PolicyList;
class TAO_AV_Protocol_Object;
class TAO_AV_Transport;
@@ -160,6 +149,9 @@ public:
TAO_AV_Protocol_Object *protocol_object (void);
// Accessor to protocol object.
+
+ virtual TAO_AV_PolicyList get_policies (void);
+ // get the policies for the protocol object.
protected:
TAO_AV_Protocol_Object *protocol_object_;
TAO_AV_Flow_Handler *handler_;
diff --git a/TAO/orbsvcs/orbsvcs/AV/Policy.i b/TAO/orbsvcs/orbsvcs/AV/Policy.i
index e2dd0de51be..0994186e6ed 100644
--- a/TAO/orbsvcs/orbsvcs/AV/Policy.i
+++ b/TAO/orbsvcs/orbsvcs/AV/Policy.i
@@ -4,7 +4,7 @@
// TAO_AV_Policy
//--------------------------------------------------
-ACE_INLINE TAO_AV_Policy::PolicyType
+ACE_INLINE CORBA::ULong
TAO_AV_Policy::type (void)
{
return this->type_;
@@ -42,28 +42,34 @@ TAO_AV_Payload_Type_Policy::value (void)
return this->payload_type_;
}
-// TAO_AV_Timestamp_Policy
-ACE_INLINE void
-TAO_AV_Timestamp_Policy::value (ACE_UINT32 timestamp)
-{
- this->timestamp_ = timestamp;
-}
-
-ACE_INLINE ACE_UINT32
-TAO_AV_Timestamp_Policy::value (void)
-{
- return this->timestamp_;
-}
// TAO_AV_RTCP_Sdes_Policy
-ACE_INLINE TAO_AV_Policy::sdes &
+ACE_INLINE TAO_AV_RTCP_Sdes &
TAO_AV_RTCP_Sdes_Policy::value (void)
{
return this->sdes_;
}
ACE_INLINE void
-TAO_AV_RTCP_Sdes_Policy::value (const TAO_AV_Policy::sdes &sdes_val)
+TAO_AV_RTCP_Sdes_Policy::value (const TAO_AV_RTCP_Sdes &sdes_val)
{
this->sdes_ = sdes_val;
}
+
+//----------------------------------------------------------------------
+// TAO_AV_SFP_Credit_Policy
+//----------------------------------------------------------------------
+
+ACE_INLINE
+void
+TAO_AV_SFP_Credit_Policy::value (int credit)
+{
+ this->value_ = credit;
+}
+
+ACE_INLINE
+int
+TAO_AV_SFP_Credit_Policy::value (void)
+{
+ return this->value_;
+}
diff --git a/TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.cpp b/TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.cpp
index 5314a9c4f43..ea6bf89ddda 100644
--- a/TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.cpp
@@ -110,13 +110,13 @@ TAO_AV_Protocol_Object::stop (void)
}
int
-TAO_AV_Protocol_Object::set_policies (const PolicyList &policy_list)
+TAO_AV_Protocol_Object::set_policies (const TAO_AV_PolicyList &policy_list)
{
this->policy_list_ = policy_list;
return 0;
}
-PolicyList
+TAO_AV_PolicyList
TAO_AV_Protocol_Object::get_policies (void)
{
return this->policy_list_;
diff --git a/TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.h b/TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.h
index 927b3110d5c..53c22530af8 100644
--- a/TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.h
+++ b/TAO/orbsvcs/orbsvcs/AV/Protocol_Factory.h
@@ -55,8 +55,8 @@ public:
const ACE_Addr &peer_address);
// Called on a control object.
- virtual int set_policies (const PolicyList &policy_list);
- virtual PolicyList get_policies (void);
+ virtual int set_policies (const TAO_AV_PolicyList &policy_list);
+ virtual TAO_AV_PolicyList get_policies (void);
// set/get policies.
virtual int start (void);
@@ -78,7 +78,7 @@ public:
TAO_AV_Transport *transport (void);
protected:
TAO_AV_Transport *transport_;
- PolicyList policy_list_;
+ TAO_AV_PolicyList policy_list_;
TAO_AV_Callback *callback_;
};
diff --git a/TAO/orbsvcs/orbsvcs/AV/RTCP.cpp b/TAO/orbsvcs/orbsvcs/AV/RTCP.cpp
index 5ac27761e45..fa2469891fd 100644
--- a/TAO/orbsvcs/orbsvcs/AV/RTCP.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/RTCP.cpp
@@ -664,7 +664,7 @@ TAO_AV_RTCP_Object::destroy (void)
}
int
-TAO_AV_RTCP_Object::set_policies (const PolicyList &policy_list)
+TAO_AV_RTCP_Object::set_policies (const TAO_AV_PolicyList &policy_list)
{
return -1;
}
diff --git a/TAO/orbsvcs/orbsvcs/AV/RTCP.h b/TAO/orbsvcs/orbsvcs/AV/RTCP.h
index a5f57663280..76aea711e8e 100644
--- a/TAO/orbsvcs/orbsvcs/AV/RTCP.h
+++ b/TAO/orbsvcs/orbsvcs/AV/RTCP.h
@@ -202,7 +202,7 @@ public:
virtual int handle_control_input (ACE_Message_Block *frame,
const ACE_Addr &peer_address);
- virtual int set_policies (const PolicyList &policy_list);
+ virtual int set_policies (const TAO_AV_PolicyList &policy_list);
// set/get policies.
virtual int start (void);
diff --git a/TAO/orbsvcs/orbsvcs/AV/RTP.cpp b/TAO/orbsvcs/orbsvcs/AV/RTP.cpp
index df429cd40b4..eff4967d050 100644
--- a/TAO/orbsvcs/orbsvcs/AV/RTP.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/RTP.cpp
@@ -236,7 +236,7 @@ TAO_AV_RTP_Object::destroy (void)
}
int
-TAO_AV_RTP_Object::set_policies (const PolicyList &policy_list)
+TAO_AV_RTP_Object::set_policies (const TAO_AV_PolicyList &policy_list)
{
this->policy_list_ = policy_list;
u_int num_policies = this->policy_list_.length ();
@@ -246,7 +246,7 @@ TAO_AV_RTP_Object::set_policies (const PolicyList &policy_list)
policy = this->policy_list_ [i];
switch (policy->type ())
{
- case TAO_AV_Policy::TAO_AV_PAYLOAD_TYPE_POLICY:
+ case TAO_AV_PAYLOAD_TYPE_POLICY:
{
TAO_AV_Payload_Type_Policy *payload_policy =
ACE_static_cast (TAO_AV_Payload_Type_Policy *,policy);
@@ -255,7 +255,7 @@ TAO_AV_RTP_Object::set_policies (const PolicyList &policy_list)
this->format_ = payload_policy->value ();
}
break;
- case TAO_AV_Policy::TAO_AV_SSRC_POLICY:
+ case TAO_AV_SSRC_POLICY:
{
TAO_AV_SSRC_Policy *ssrc_policy =
ACE_static_cast (TAO_AV_SSRC_Policy *,policy);
diff --git a/TAO/orbsvcs/orbsvcs/AV/RTP.h b/TAO/orbsvcs/orbsvcs/AV/RTP.h
index 2f77b73106c..7ab37bcd5e0 100644
--- a/TAO/orbsvcs/orbsvcs/AV/RTP.h
+++ b/TAO/orbsvcs/orbsvcs/AV/RTP.h
@@ -265,7 +265,7 @@ public:
TAO_AV_frame_info *frame_info = 0);
virtual int destroy (void);
- virtual int set_policies (const PolicyList &policy_list);
+ virtual int set_policies (const TAO_AV_PolicyList &policy_list);
virtual void control_object (TAO_AV_Protocol_Object *object);
protected:
ACE_UINT16 sequence_num_;
diff --git a/TAO/orbsvcs/orbsvcs/AV/Transport.cpp b/TAO/orbsvcs/orbsvcs/AV/Transport.cpp
index d430ca36d57..ecb920e6ff3 100644
--- a/TAO/orbsvcs/orbsvcs/AV/Transport.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/Transport.cpp
@@ -67,6 +67,23 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint,
start != end; ++start)
{
TAO_FlowSpec_Entry *entry = (*start);
+ switch (direction)
+ {
+ case TAO_AV_Core::TAO_AV_ENDPOINT_B:
+ {
+ switch (entry->direction ())
+ {
+ case TAO_FlowSpec_Entry::TAO_AV_DIR_IN:
+ entry->role (TAO_FlowSpec_Entry::TAO_AV_CONSUMER);
+ break;
+ case TAO_FlowSpec_Entry::TAO_AV_DIR_OUT:
+ entry->role (TAO_FlowSpec_Entry::TAO_AV_PRODUCER);
+ break;
+ }
+ }
+ default:
+ break;
+ }
ACE_Addr *address = entry->address ();
if (address != 0)
{
diff --git a/TAO/orbsvcs/orbsvcs/AV/sfp.cpp b/TAO/orbsvcs/orbsvcs/AV/sfp.cpp
index 0cf1b7a5831..f98cbe26de7 100644
--- a/TAO/orbsvcs/orbsvcs/AV/sfp.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/sfp.cpp
@@ -202,16 +202,19 @@ TAO_SFP_Base::read_frame (TAO_AV_Transport *transport,
int byte_order = frame_header.flags & 0x1;
int message_len = frame_header.message_size;
- ACE_NEW_RETURN (message_block,
- ACE_Message_Block (message_len),
- 0);
- int n = transport->recv (message_block->wr_ptr (),message_len);
+// ACE_NEW_RETURN (message_block,
+// ACE_Message_Block (message_len),
+// 0);
+ state.static_frame_.rd_ptr (state.static_frame_.base ());
+ state.static_frame_.wr_ptr (state.static_frame_.base ());
+ int n = transport->recv (state.static_frame_.rd_ptr (),message_len);
if (n == -1)
ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
else if (n==0)
ACE_ERROR_RETURN ((LM_ERROR,"SFP::handle_input -peek"),0);
else if (n != message_len)
ACE_ERROR_RETURN ((LM_ERROR,"SFP::read_simple_frame:message truncated\n"),0);
+ message_block = &state.static_frame_;
// print the buffer.
// this->dump_buf (message,n);
// skip over the frame header.
@@ -252,7 +255,7 @@ TAO_SFP_Base::read_frame (TAO_AV_Transport *transport,
}
case flowProtocol::SimpleFrame_Msg:
{
- data = message_block;
+ data = message_block->clone ();
break;
}
case flowProtocol::SequencedFrame_Msg:
@@ -996,9 +999,11 @@ TAO_SFP_Object::TAO_SFP_Object (TAO_AV_Callback *callback,
TAO_AV_Transport *transport)
:TAO_AV_Protocol_Object (callback,transport),
source_id_ (10),
- max_credit_ (20)
-
+ max_credit_ (-1),
+ current_credit_ (-1)
{
+ TAO_SFP_BASE::instance ();
+ this->state_.static_frame_.size (2* this->transport_->mtu ());
}
TAO_SFP_Object::~TAO_SFP_Object (void)
@@ -1020,6 +1025,7 @@ TAO_SFP_Object::destroy (void)
out_stream);
if (result < 0)
return result;
+ this->callback_->handle_destroy ();
return 0;
}
@@ -1033,7 +1039,7 @@ TAO_SFP_Object::send_frame (ACE_Message_Block *frame,
CORBA::Octet flags = TAO_ENCAP_BYTE_ORDER;
if (this->transport_ == 0)
ACE_ERROR_RETURN ((LM_ERROR,"TAO_SFP_Object::send_frame: transport is null\n"),-1);
- if (this->current_credit_ > 0)
+ if (this->current_credit_ != 0)
{
// if we have enough credit then we send.
int total_length = 0;
@@ -1167,7 +1173,8 @@ TAO_SFP_Object::send_frame (ACE_Message_Block *frame,
// Increment the sequence_num after sending the message.
this->sequence_num_++;
// Also reduce the number of credits.
- this->current_credit_--;
+ if (this->max_credit_ > 0)
+ this->current_credit_--;
}
}
else
@@ -1230,11 +1237,48 @@ TAO_SFP_Object::get_fragment (ACE_Message_Block *&mb,
return fragment_mb;
}
+int
+TAO_SFP_Object::set_policies (const TAO_AV_PolicyList& policies)
+{
+ TAO_AV_Policy *policy = 0;
+ for (u_int i=0;i<policies.length ();i++)
+ {
+ policy = policies[i];
+ switch (policies[i]->type ())
+ {
+
+ case TAO_AV_SFP_CREDIT_POLICY:
+ {
+ TAO_AV_SFP_Credit_Policy *credit_policy =
+ ACE_dynamic_cast (TAO_AV_SFP_Credit_Policy*,policy);
+ this->max_credit_ = credit_policy->value ();
+ }
+ default:
+ break;
+ }
+ }
+ return 0;
+}
+
// TAO_SFP_Consumer_Object
TAO_SFP_Consumer_Object::TAO_SFP_Consumer_Object (TAO_AV_Callback *callback,
- TAO_AV_Transport *transport)
+ TAO_AV_Transport *transport,
+ char *&sfp_options)
:TAO_SFP_Object (callback,transport)
{
+ TAO_AV_PolicyList policies = callback->get_policies ();
+ if (policies.length () == 0)
+ return;
+ this->set_policies (policies);
+ if (this->max_credit_ > 0)
+ {
+ ACE_NEW (sfp_options,
+ char [BUFSIZ]);
+
+ ACE_OS::sprintf (sfp_options,
+ "sfp:1.0:credit=%d",
+ max_credit_);
+ }
}
int
@@ -1254,17 +1298,37 @@ TAO_SFP_Consumer_Object::handle_input (void)
{
this->callback_->receive_frame (this->state_.frame_block_,
frame_info);
+ // Now release the memory for the frame.
+ if (this->state_.frame_block_ != &this->state_.static_frame_)
+ {
+ ACE_Message_Block *temp = 0;
+ for (temp = this->state_.frame_block_;
+ temp != 0;
+ temp = temp->cont ())
+ {
+ temp->release ();
+ delete temp;
+ }
+ }
this->state_.reset ();
}
return 0;
}
TAO_SFP_Producer_Object::TAO_SFP_Producer_Object (TAO_AV_Callback *callback,
- TAO_AV_Transport *transport)
+ TAO_AV_Transport *transport,
+ char *&sfp_options)
:TAO_SFP_Object (callback,transport),
credit_sequence_num_ (0)
{
+ TAO_Tokenizer flow_string (sfp_options,':');
+ if (flow_string [2] != 0)
+ {
+ TAO_Tokenizer options (flow_string[2],'=');
+ if (options [1] != 0)
+ this->max_credit_ = ACE_OS::atoi (options[1]);
+ }
}
int
@@ -1347,13 +1411,15 @@ TAO_AV_SFP_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
TAO_AV_Callback *callback = 0;
endpoint->get_callback (entry->flowname (),
callback);
+ char *flow_string = entry->flow_protocol_str ();
switch (entry->role ())
{
case TAO_FlowSpec_Entry::TAO_AV_PRODUCER:
{
ACE_NEW_RETURN (object,
TAO_SFP_Producer_Object (callback,
- transport),
+ transport,
+ flow_string),
0);
}
break;
@@ -1361,7 +1427,8 @@ TAO_AV_SFP_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
{
ACE_NEW_RETURN (object,
TAO_SFP_Consumer_Object (callback,
- transport),
+ transport,
+ flow_string),
0);
}
break;
diff --git a/TAO/orbsvcs/orbsvcs/AV/sfp.h b/TAO/orbsvcs/orbsvcs/AV/sfp.h
index ea552319066..cbbc8365e22 100644
--- a/TAO/orbsvcs/orbsvcs/AV/sfp.h
+++ b/TAO/orbsvcs/orbsvcs/AV/sfp.h
@@ -79,6 +79,7 @@ public:
CORBA::Boolean more_fragments_;
ACE_Message_Block *frame_block_;
// boolean flags indicating that there are more fragments.
+ ACE_Message_Block static_frame_;
TAO_SFP_Fragment_Table_Map fragment_table_map_;
};
@@ -190,6 +191,7 @@ protected:
// dumps the buffer to the screen.
};
+// Beware the SFP_Base code relies on the Singleton being initialized.
typedef ACE_Singleton <TAO_SFP_Base,ACE_SYNCH_MUTEX> TAO_SFP_BASE;
class TAO_ORBSVCS_Export TAO_SFP_Object : public TAO_AV_Protocol_Object
@@ -197,6 +199,7 @@ class TAO_ORBSVCS_Export TAO_SFP_Object : public TAO_AV_Protocol_Object
public:
TAO_SFP_Object (TAO_AV_Callback *callback,
TAO_AV_Transport *transport);
+ // We should add a sfp options parameter.
virtual ~TAO_SFP_Object (void);
// Dtor
@@ -210,6 +213,8 @@ public:
TAO_AV_frame_info *frame_info = 0);
virtual int destroy (void);
+ virtual int set_policies (const TAO_AV_PolicyList &policies);
+
protected:
ACE_Message_Block *get_fragment (ACE_Message_Block *&frame,
size_t initial_len,
@@ -226,7 +231,8 @@ class TAO_ORBSVCS_Export TAO_SFP_Producer_Object : public TAO_SFP_Object
{
public:
TAO_SFP_Producer_Object (TAO_AV_Callback *callback,
- TAO_AV_Transport *transport);
+ TAO_AV_Transport *transport,
+ char *&flow_options);
virtual int handle_input (void);
protected:
CORBA::ULong credit_sequence_num_;
@@ -236,7 +242,8 @@ class TAO_ORBSVCS_Export TAO_SFP_Consumer_Object : public TAO_SFP_Object
{
public:
TAO_SFP_Consumer_Object (TAO_AV_Callback *callback,
- TAO_AV_Transport *transport);
+ TAO_AV_Transport *transport,
+ char *&flow_options);
virtual int handle_input (void);
};