summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2003-10-18 14:11:45 +0000
committeryamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795>2003-10-18 14:11:45 +0000
commit09a189be12d2a96962aa5f536daba4515095d0b5 (patch)
tree2fce671add5fd5f91f6d23ca4e9165173577a6ed
parent4724f51e36663f90e94700d0a81e8a958b0eeb41 (diff)
downloadATCD-09a189be12d2a96962aa5f536daba4515095d0b5.tar.gz
ChangelogTag: Sat Oct 18 10:16:20 2003 Yamuna Krishnamurthy <yamuna@oomworks.com>
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp65
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/AV_Core.h5
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp174
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h12
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i48
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/SCTP_SEQ.cpp827
-rw-r--r--TAO/orbsvcs/orbsvcs/AV/SCTP_SEQ.h302
-rw-r--r--TAO/orbsvcs/orbsvcs/Makefile.av6
-rwxr-xr-xTAO/orbsvcs/tests/AVStreams/Simple_Two_Stage/run_test.pl26
9 files changed, 1441 insertions, 24 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp b/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp
index 031cef0c01c..08597e73734 100644
--- a/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp
@@ -15,6 +15,10 @@
#include "orbsvcs/AV/QoS_UDP.h"
#endif /* ACE_HAS_RAPI || ACE_HAS_WINSOCK2_GQOS */
+#if defined (ACE_HAS_SCTP)
+#include "orbsvcs/AV/SCTP_SEQ.h"
+#endif // ACE_HAS_SCTP
+
#include "tao/debug.h"
#include "tao/ORB_Core.h"
@@ -251,7 +255,6 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint,
{
if (entry->handler () != 0)
{
-
//Yamuna:PLEASE CHECK THIS LATER
#if defined ACE_HAS_RAPI || defined (ACE_HAS_WINSOCK2_GQOS)
// For IN flows on the A side we should remove the handlers from the reactor.
@@ -799,6 +802,37 @@ TAO_AV_Core::load_default_transport_factories (void)
this->transport_factories_.insert (udp_qos_item);
#endif /* ACE_HAS_RAPI || ACE_HAS_WINSOCK2_GQOS */
+#if defined ACE_HAS_SCTP
+ const char *sctp_seq_factory_str = "SCTP_SEQ_Factory";
+
+ TAO_AV_Transport_Factory *sctp_seq_factory = 0;
+ TAO_AV_Transport_Item *sctp_seq_item = 0;
+
+ sctp_seq_factory =
+ ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (sctp_seq_factory_str);
+ if (sctp_seq_factory == 0)
+ {
+ if (TAO_debug_level)
+ ACE_ERROR ((LM_WARNING,
+ "(%P|%t) WARNING - No %s found in Service Repository."
+ " Using default instance.\n",
+ "SCTP SEQ Factory"));
+
+ ACE_NEW_RETURN (sctp_seq_factory,
+ TAO_AV_SCTP_SEQ_Factory,
+ -1);
+ }
+ else sctp_seq_factory->ref_count = 1;
+
+ ACE_NEW_RETURN (sctp_seq_item,
+ TAO_AV_Transport_Item ("SCTP_SEQ_Factory"),
+ -1);
+
+ sctp_seq_item->factory (sctp_seq_factory);
+
+ this->transport_factories_.insert (sctp_seq_item);
+#endif /* ACE_HAS_SCTP */
+
return 0;
}
@@ -912,6 +946,35 @@ TAO_AV_Core::load_default_flow_protocol_factories (void)
#endif /* defined (ACE_HAS_RAPI) || defined (ACE_HAS_WINSOCK2_GQOS) */
+#if defined ACE_HAS_SCTP
+
+ const char *sctp_seq_flow = "SCTP_SEQ_Flow_Factory";
+ TAO_AV_Flow_Protocol_Factory *sctp_seq_flow_factory = 0;
+ TAO_AV_Flow_Protocol_Item *sctp_seq_flow_item = 0;
+
+ sctp_seq_flow_factory =
+ ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance (sctp_seq_flow);
+ if (sctp_seq_flow_factory == 0)
+ {
+ if (TAO_debug_level)
+ ACE_ERROR ((LM_WARNING,
+ "(%P|%t) WARNING - No %s found in Service Repository."
+ " Using default instance.\n",
+ "SCTP SEQ Flow Factory"));
+
+ ACE_NEW_RETURN (sctp_seq_flow_factory,
+ TAO_AV_SCTP_SEQ_Flow_Factory,
+ -1);
+ }
+ else sctp_seq_flow_factory->ref_count = 1;
+
+ ACE_NEW_RETURN (sctp_seq_flow_item, TAO_AV_Flow_Protocol_Item ("SCTP_SEQ_Flow_Factory"), -1);
+ sctp_seq_flow_item->factory (sctp_seq_flow_factory);
+
+ this->flow_protocol_factories_.insert (sctp_seq_flow_item);
+
+#endif /* ACE_HAS_SCTP */
+
TAO_AV_Flow_Protocol_Factory *tcp_flow_factory = 0;
TAO_AV_Flow_Protocol_Item *tcp_item = 0;
diff --git a/TAO/orbsvcs/orbsvcs/AV/AV_Core.h b/TAO/orbsvcs/orbsvcs/AV/AV_Core.h
index 77f6e89a218..d6ac2037153 100644
--- a/TAO/orbsvcs/orbsvcs/AV/AV_Core.h
+++ b/TAO/orbsvcs/orbsvcs/AV/AV_Core.h
@@ -70,8 +70,9 @@ public:
TAO_AV_RTP_UDP_MCAST = 10,
TAO_AV_SFP_UDP_MCAST = 11,
TAO_AV_QOS_UDP = 12,
- TAO_AV_USERDEFINED_UDP = 13,
- TAO_AV_USERDEFINED_UDP_MCAST = 14
+ TAO_AV_USERDEFINED_UDP = 13,
+ TAO_AV_USERDEFINED_UDP_MCAST = 14,
+ TAO_AV_SCTP_SEQ = 15
};
enum Flow_Component
diff --git a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp
index d867b376023..e765fbdfe43 100644
--- a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp
+++ b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp
@@ -33,6 +33,10 @@ TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (void)
entry_ (),
is_multicast_ (0),
peer_addr_ (0),
+ local_sec_addr_ (0),
+ num_local_sec_addrs_ (0),
+ peer_sec_addr_ (0),
+ num_peer_sec_addrs_ (0),
peer_control_addr_ (0),
local_addr_ (0),
local_control_addr_ (0),
@@ -69,6 +73,10 @@ TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (const char *flowname,
entry_ (),
is_multicast_ (0),
peer_addr_ (0),
+ local_sec_addr_ (0),
+ num_local_sec_addrs_ (0),
+ peer_sec_addr_ (0),
+ num_peer_sec_addrs_ (0),
peer_control_addr_ (0),
local_addr_ (0),
local_control_addr_ (0),
@@ -105,6 +113,10 @@ TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (const char *flowname,
entry_ (),
is_multicast_ (0),
peer_addr_ (0),
+ local_sec_addr_ (0),
+ num_local_sec_addrs_ (0),
+ peer_sec_addr_ (0),
+ num_peer_sec_addrs_ (0),
peer_control_addr_ (0),
local_addr_ (0),
local_control_addr_ (0),
@@ -128,8 +140,8 @@ TAO_FlowSpec_Entry::~TAO_FlowSpec_Entry (void)
delete address_;
if (this->clean_up_control_address_)
delete control_address_;
- delete local_addr_;
- delete local_control_addr_;
+ if (local_control_addr_ != 0)
+ delete local_control_addr_;
}
int
@@ -139,6 +151,8 @@ TAO_FlowSpec_Entry::set_protocol (void)
{
if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"TCP") == 0)
this->protocol_ = TAO_AV_Core::TAO_AV_TCP;
+ else if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"SCTP_SEQ") == 0)
+ this->protocol_ = TAO_AV_Core::TAO_AV_SCTP_SEQ;
else if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"UDP") == 0)
this->protocol_ = TAO_AV_Core::TAO_AV_UDP;
else if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"QoS_UDP") == 0)
@@ -218,6 +232,9 @@ int
TAO_FlowSpec_Entry::parse_address (const char *address,
TAO_AV_Core::Flow_Component flow_comp)
{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG, "TAO_FlowSpec_Entry::parse_address %s\n", address));
+
if (address == 0)
return 0;
if (ACE_OS::strcmp (address,"") == 0)
@@ -232,11 +249,38 @@ TAO_FlowSpec_Entry::parse_address (const char *address,
if (protocol_tokenizer [1] != 0)
{
+ ACE_DEBUG ((LM_DEBUG,
+ "Protocol tokenixer is not null\n"));
if ((flow_comp == TAO_AV_Core::TAO_AV_DATA) ||
+ //(flow_comp == TAO_AV_Core::TAO_AV_BOTH) ||
(flow_comp == TAO_AV_Core::TAO_AV_CONTROL) )
{
ACE_CString addr;
- addr += protocol_tokenizer[1];
+ if (this->protocol_ == TAO_AV_Core::TAO_AV_SCTP_SEQ)
+ {
+ TAO_Tokenizer addr_token (protocol_tokenizer [1], ';');
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Number of local sec addresses = %d\n",
+ addr_token.num_tokens () - 1));
+
+ if (addr_token.num_tokens () != 0)
+ {
+ addr += addr_token [0];
+ ACE_NEW_RETURN (local_sec_addr_, char* [addr_token.num_tokens () - 1],-1);
+ for (int j = 1; j <= addr_token.num_tokens () - 1; j++)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "adding addresses to sequence %s\n",
+ addr_token [j]));
+
+ local_sec_addr_ [j-1] = CORBA::string_dup (addr_token [j]);
+ }
+ num_local_sec_addrs_ = addr_token.num_tokens () - 1;
+ }
+ }
+ else addr += protocol_tokenizer[1];
+
switch (this->protocol_)
{
@@ -244,6 +288,7 @@ TAO_FlowSpec_Entry::parse_address (const char *address,
case TAO_AV_Core::TAO_AV_USERDEFINED_UDP:
case TAO_AV_Core::TAO_AV_RTP_UDP:
case TAO_AV_Core::TAO_AV_TCP:
+ case TAO_AV_Core::TAO_AV_SCTP_SEQ:
case TAO_AV_Core::TAO_AV_UDP:
case TAO_AV_Core::TAO_AV_QOS_UDP:
{
@@ -264,9 +309,6 @@ TAO_FlowSpec_Entry::parse_address (const char *address,
this->control_address_ = inet_addr;
}
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG, "TAO_FlowSpec_Entry::parse_address %s %x\n", address,inet_addr->get_ip_address () ));
-
if (IN_CLASSD (inet_addr->get_ip_address ()))
{
if (TAO_debug_level > 0)
@@ -299,6 +341,10 @@ TAO_FlowSpec_Entry::parse_address (const char *address,
}
else
{
+ ACE_DEBUG ((LM_DEBUG,
+ "AV BOTH %s \n",
+ protocol_tokenizer[1]));
+
TAO_Tokenizer address_tokenizer (protocol_tokenizer[1], ':');
TAO_Tokenizer port_tokenizer (address_tokenizer[1], ';');
ACE_CString addr;
@@ -306,6 +352,28 @@ TAO_FlowSpec_Entry::parse_address (const char *address,
addr += ":";
addr += port_tokenizer[0];
+ if (this->protocol_ == TAO_AV_Core::TAO_AV_SCTP_SEQ)
+ {
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Number of local sec addresses = %d\n",
+ port_tokenizer.num_tokens () - 1));
+
+ if (port_tokenizer.num_tokens () - 1 != 0)
+ {
+ ACE_NEW_RETURN (local_sec_addr_, char* [port_tokenizer.num_tokens () - 1],-1);
+ for (int j = 1; j <= port_tokenizer.num_tokens () - 1; j++)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "adding addresses to sequence %s\n",
+ port_tokenizer [j]));
+
+ local_sec_addr_ [j-1] = CORBA::string_dup (port_tokenizer [j]);
+ }
+ num_local_sec_addrs_ = port_tokenizer.num_tokens () - 1;
+ }
+ }
+
short control_port = ACE_OS::atoi(port_tokenizer[0]) + 1;
char control_port_str[6];
sprintf (control_port_str, "%d", control_port);
@@ -327,6 +395,7 @@ TAO_FlowSpec_Entry::parse_address (const char *address,
case TAO_AV_Core::TAO_AV_USERDEFINED_UDP:
case TAO_AV_Core::TAO_AV_RTP_UDP:
case TAO_AV_Core::TAO_AV_TCP:
+ case TAO_AV_Core::TAO_AV_SCTP_SEQ:
case TAO_AV_Core::TAO_AV_UDP:
case TAO_AV_Core::TAO_AV_QOS_UDP:
{
@@ -337,8 +406,6 @@ TAO_FlowSpec_Entry::parse_address (const char *address,
-1);
this->clean_up_address_ = 1;
this->address_ = inet_addr;
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG, "TAO_FlowSpec_Entry::parse_address %s %x\n", address,inet_addr->get_ip_address () ));
if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"RTP/UDP") == 0)
{
@@ -348,8 +415,6 @@ TAO_FlowSpec_Entry::parse_address (const char *address,
-1);
this->clean_up_control_address_ = 1;
this->control_address_ = control_inet_addr;
- if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG, "TAO_FlowSpec_Entry::parse_address %s %x\n", address,control_inet_addr->get_ip_address () ));
}
if (IN_CLASSD (inet_addr->get_ip_address ()))
@@ -383,6 +448,8 @@ TAO_FlowSpec_Entry::parse_address (const char *address,
}
}
}
+ ACE_DEBUG ((LM_DEBUG,
+ "Return from parse address\n"));
return 0;
}
@@ -485,10 +552,41 @@ TAO_Forward_FlowSpec_Entry::parse (const char *flowSpec_entry)
if (tokenizer [TAO_AV_PEER_ADDR] != 0)
{
- ACE_INET_Addr *addr;
- ACE_NEW_RETURN (addr,
- ACE_INET_Addr (tokenizer [TAO_AV_PEER_ADDR]),
- 0);
+ ACE_INET_Addr *addr = 0;
+
+
+ if (this->protocol_ == TAO_AV_Core::TAO_AV_SCTP_SEQ)
+ {
+ TAO_Tokenizer addr_token (tokenizer [TAO_AV_PEER_ADDR], ';');
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Number of peer sec addresses = %d\n",
+ addr_token.num_tokens () - 1));
+
+ if (addr_token.num_tokens () != 0)
+ {
+ ACE_NEW_RETURN (addr,
+ ACE_INET_Addr (addr_token [0]),
+ 0);
+
+ ACE_NEW_RETURN (peer_sec_addr_, char* [addr_token.num_tokens () - 1],-1);
+ for (int j = 1; j <= addr_token.num_tokens () - 1; j++)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "adding addresses to sequence %s\n",
+ addr_token [j]));
+
+ peer_sec_addr_ [j-1] = CORBA::string_dup (addr_token [j]);
+ }
+ num_peer_sec_addrs_ = addr_token.num_tokens () - 1;
+ }
+ }
+ else
+ {
+ ACE_NEW_RETURN (addr,
+ ACE_INET_Addr (tokenizer [TAO_AV_PEER_ADDR]),
+ 0);
+ }
this->peer_addr_ = addr;
char buf [BUFSIZ];
@@ -496,12 +594,13 @@ TAO_Forward_FlowSpec_Entry::parse (const char *flowSpec_entry)
ACE_DEBUG ((LM_DEBUG,
"Peer Address %s \n",
buf));
- }
-
+
+ }
+
if (tokenizer [TAO_AV_FLOW_PROTOCOL] != 0)
if (this->parse_flow_protocol_string (tokenizer [TAO_AV_FLOW_PROTOCOL]) < 0)
return -1;
-
+
return 0;
}
@@ -529,7 +628,7 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void)
{
if (this->flowname_.length() == 0)
return "";
-
+
char address [BUFSIZ];
ACE_CString address_str;
ACE_CString peer_address_str;
@@ -548,6 +647,7 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void)
case TAO_AV_Core::TAO_AV_QOS_UDP:
case TAO_AV_Core::TAO_AV_UDP_MCAST:
case TAO_AV_Core::TAO_AV_TCP:
+ case TAO_AV_Core::TAO_AV_SCTP_SEQ:
{
ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,this->address_);
inet_addr->addr_to_string (address,BUFSIZ);
@@ -562,6 +662,15 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void)
address_str += "=";
address_str += cstring;
+ if (this->protocol_ == TAO_AV_Core::TAO_AV_SCTP_SEQ)
+ {
+ for (int i = 0; i < this->num_local_sec_addrs_; i++)
+ {
+ address_str += ";";
+ address_str += this->local_sec_addr_ [i];
+ }
+ }
+
}
else
{
@@ -587,6 +696,7 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void)
case TAO_AV_Core::TAO_AV_QOS_UDP:
case TAO_AV_Core::TAO_AV_UDP_MCAST:
case TAO_AV_Core::TAO_AV_TCP:
+ case TAO_AV_Core::TAO_AV_SCTP_SEQ:
{
ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,this->address_);
control_port = inet_addr->get_port_number() + 1;
@@ -617,6 +727,7 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void)
case TAO_AV_Core::TAO_AV_QOS_UDP:
case TAO_AV_Core::TAO_AV_UDP_MCAST:
case TAO_AV_Core::TAO_AV_TCP:
+ case TAO_AV_Core::TAO_AV_SCTP_SEQ:
{
ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,this->peer_addr_);
inet_addr->addr_to_string (address,BUFSIZ);
@@ -635,6 +746,15 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void)
//peer_address_str = this->carrier_protocol_;
//peer_address_str += "=";
peer_address_str += cstring;
+
+ if (this->protocol_ == TAO_AV_Core::TAO_AV_SCTP_SEQ)
+ {
+ for (int i = 0; i < this->num_peer_sec_addrs_; i++)
+ {
+ peer_address_str += ";";
+ peer_address_str += this->peer_sec_addr_ [i];
+ }
+ }
}
@@ -654,6 +774,7 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void)
case TAO_AV_Core::TAO_AV_QOS_UDP:
case TAO_AV_Core::TAO_AV_UDP_MCAST:
case TAO_AV_Core::TAO_AV_TCP:
+ case TAO_AV_Core::TAO_AV_SCTP_SEQ:
{
ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,this->control_address_);
control_port = inet_addr->get_port_number();
@@ -684,9 +805,11 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void)
this->entry_ += "\\";
this->entry_ += peer_address_str;
}
+ else ACE_DEBUG ((LM_DEBUG,
+ "No peer address specified\n"));
if (TAO_debug_level > 0)
- ACE_DEBUG ((LM_DEBUG,"entry_to_string: entry = %s\n",this->entry_.c_str()));
+ ACE_DEBUG ((LM_DEBUG,"Forward entry_to_string: entry = %s\n",this->entry_.c_str()));
return this->entry_.c_str();
}
@@ -804,6 +927,7 @@ TAO_Reverse_FlowSpec_Entry::entry_to_string (void)
case TAO_AV_Core::TAO_AV_QOS_UDP:
case TAO_AV_Core::TAO_AV_UDP_MCAST:
case TAO_AV_Core::TAO_AV_TCP:
+ case TAO_AV_Core::TAO_AV_SCTP_SEQ:
case TAO_AV_Core::TAO_AV_SFP_UDP:
case TAO_AV_Core::TAO_AV_USERDEFINED_UDP:
{
@@ -820,6 +944,15 @@ TAO_Reverse_FlowSpec_Entry::entry_to_string (void)
address_str += "=";
address_str += cstring;
+ if (this->protocol_ == TAO_AV_Core::TAO_AV_SCTP_SEQ)
+ {
+ for (int i = 0; i < this->num_local_sec_addrs_; i++)
+ {
+ address_str += ";";
+ address_str += this->local_sec_addr_ [i];
+ }
+ }
+
}
else
{
@@ -843,6 +976,7 @@ TAO_Reverse_FlowSpec_Entry::entry_to_string (void)
case TAO_AV_Core::TAO_AV_QOS_UDP:
case TAO_AV_Core::TAO_AV_UDP_MCAST:
case TAO_AV_Core::TAO_AV_TCP:
+ case TAO_AV_Core::TAO_AV_SCTP_SEQ:
{
ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,this->control_address_);
control_port = inet_addr->get_port_number();
@@ -869,6 +1003,6 @@ TAO_Reverse_FlowSpec_Entry::entry_to_string (void)
// this->entry_ += "\\";
// this->entry_ += format_;
- if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"entry_to_string: entry = %s\n",this->entry_.c_str() ));
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Reverse entry_to_string: entry = %s\n",this->entry_.c_str() ));
return this->entry_.c_str();
}
diff --git a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h
index 08fcaf6baf2..e171286341a 100644
--- a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h
+++ b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h
@@ -152,6 +152,14 @@ public:
int set_peer_control_addr (ACE_Addr *peer_control_addr);
ACE_Addr *get_peer_control_addr (void);
+ int set_local_sec_addr (char** local_sec_addr, int size);
+ char** get_local_sec_addr (void);
+ int num_local_sec_addrs (void);
+
+ int set_peer_sec_addr (char** peer_sec_addr, int size);
+ char** get_peer_sec_addr (void);
+ int num_peer_sec_addrs (void);
+
int set_local_addr (ACE_Addr *local_addr);
ACE_Addr *get_local_addr (void);
char *get_local_addr_str (void);
@@ -232,6 +240,10 @@ protected:
int is_multicast_;
ACE_Addr *peer_addr_;
+ char** local_sec_addr_;
+ int num_local_sec_addrs_;
+ char** peer_sec_addr_;
+ int num_peer_sec_addrs_;
ACE_Addr *peer_control_addr_;
ACE_Addr *local_addr_;
ACE_Addr *local_control_addr_;
diff --git a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i
index 3a940c75963..74f705772a2 100644
--- a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i
+++ b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i
@@ -178,6 +178,54 @@ TAO_FlowSpec_Entry::get_local_control_addr (void)
}
ACE_INLINE
+int
+TAO_FlowSpec_Entry::set_local_sec_addr (char** local_sec_addr,
+ int size)
+{
+ this->local_sec_addr_ = local_sec_addr;
+ this->num_local_sec_addrs_ = size;
+ return 0;
+}
+
+ACE_INLINE
+char**
+TAO_FlowSpec_Entry::get_local_sec_addr (void)
+{
+ return this->local_sec_addr_;
+}
+
+ACE_INLINE
+int
+TAO_FlowSpec_Entry::num_local_sec_addrs (void)
+{
+ return this->num_local_sec_addrs_;
+}
+
+ACE_INLINE
+int
+TAO_FlowSpec_Entry::set_peer_sec_addr (char** peer_sec_addr,
+ int size)
+{
+ this->peer_sec_addr_ = peer_sec_addr;
+ this->num_peer_sec_addrs_ = size;
+ return 0;
+}
+
+ACE_INLINE
+char**
+TAO_FlowSpec_Entry::get_peer_sec_addr (void)
+{
+ return this->peer_sec_addr_;
+}
+
+ACE_INLINE
+int
+TAO_FlowSpec_Entry::num_peer_sec_addrs (void)
+{
+ return this->num_peer_sec_addrs_;
+}
+
+ACE_INLINE
TAO_AV_Transport*
TAO_FlowSpec_Entry::transport (void)
{
diff --git a/TAO/orbsvcs/orbsvcs/AV/SCTP_SEQ.cpp b/TAO/orbsvcs/orbsvcs/AV/SCTP_SEQ.cpp
new file mode 100644
index 00000000000..e59c9ed57e1
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/AV/SCTP_SEQ.cpp
@@ -0,0 +1,827 @@
+// $Id$
+
+#include "SCTP_SEQ.h"
+#include "AVStreams_i.h"
+#include "ace/Multihomed_INET_Addr.h"
+#include "tao/debug.h"
+#include "ace/Arg_Shifter.h"
+
+ACE_RCSID (AV,
+ SCTP_SEQ,
+ "$Id$")
+
+//------------------------------------------------------------
+// TAO_AV_SCTP_SEQ_Transport
+//------------------------------------------------------------
+
+TAO_AV_SCTP_SEQ_Transport::TAO_AV_SCTP_SEQ_Transport (void)
+ :handler_ (0)
+{
+}
+
+TAO_AV_SCTP_SEQ_Transport::TAO_AV_SCTP_SEQ_Transport (TAO_AV_SCTP_SEQ_Flow_Handler *handler)
+ :handler_ (handler)
+{
+}
+
+TAO_AV_SCTP_SEQ_Transport::~TAO_AV_SCTP_SEQ_Transport (void)
+{
+}
+
+int
+TAO_AV_SCTP_SEQ_Transport::open (ACE_Addr * /*address*/)
+{
+ return 0;
+}
+
+int
+TAO_AV_SCTP_SEQ_Transport::close (void)
+{
+ return 0;
+}
+
+int
+TAO_AV_SCTP_SEQ_Transport::mtu (void)
+{
+ return 0;
+}
+
+ACE_Addr*
+TAO_AV_SCTP_SEQ_Transport::get_peer_addr (void)
+{
+ return 0;
+}
+
+ssize_t
+TAO_AV_SCTP_SEQ_Transport::send (const ACE_Message_Block *mblk, ACE_Time_Value *)
+{
+ // For the most part this was copied from GIOP::send_request and
+ // friends.
+
+ // ACE_Time_Value timeout;
+
+ iovec iov[ACE_IOV_MAX];
+ int iovcnt = 0;
+ ssize_t n = 0;
+ ssize_t nbytes = 0;
+
+ for (const ACE_Message_Block *i = mblk;
+ i != 0;
+ i = i->cont ())
+ {
+ // Make sure there is something to send!
+ if (i->length () > 0)
+ {
+ iov[iovcnt].iov_base = i->rd_ptr ();
+ iov[iovcnt].iov_len = ACE_static_cast (u_long, i->length ());
+ iovcnt++;
+
+ // The buffer is full make a OS call. @@ TODO this should
+ // be optimized on a per-platform basis, for instance, some
+ // platforms do not implement writev() there we should copy
+ // the data into a buffer and call send_n(). In other cases
+ // there may be some limits on the size of the iovec, there
+ // we should set ACE_IOV_MAX to that limit.
+ if (iovcnt == ACE_IOV_MAX)
+ {
+ n = this->handler_->peer ().sendv_n ((const iovec *) iov,
+ iovcnt);
+ //&timeout);
+ if (n < 1)
+ return n;
+
+ nbytes += n;
+ iovcnt = 0;
+ }
+ }
+ }
+
+ // Check for remaining buffers to be sent!
+ if (iovcnt != 0)
+ {
+ n = this->handler_->peer ().sendv_n ((const iovec *) iov,
+ iovcnt);
+ if (n < 1)
+ return n;
+
+ nbytes += n;
+ }
+
+ return nbytes;
+}
+
+ssize_t
+TAO_AV_SCTP_SEQ_Transport::send (const char *buf,
+ size_t len,
+ ACE_Time_Value *)
+{
+ return this->handler_->peer ().send_n (buf, len);
+}
+
+ssize_t
+TAO_AV_SCTP_SEQ_Transport::send (const iovec *iov,
+ int iovcnt,
+ ACE_Time_Value *)
+{
+ return this->handler_->peer ().sendv_n (iov,
+ iovcnt);
+}
+
+ssize_t
+TAO_AV_SCTP_SEQ_Transport::recv (char *buf,
+ size_t len,
+ ACE_Time_Value *)
+{
+ return this->handler_->peer ().recv (buf, len);
+}
+
+ssize_t
+TAO_AV_SCTP_SEQ_Transport::recv (char *buf,
+ size_t len,
+ int flags,
+ ACE_Time_Value *)
+{
+ return this->handler_->peer ().recv (buf,
+ len,
+ flags);
+}
+
+ssize_t
+TAO_AV_SCTP_SEQ_Transport::recv (iovec *iov,
+ int iovcnt,
+ ACE_Time_Value *)
+{
+ return handler_->peer ().recvv_n (iov, iovcnt);
+}
+
+//------------------------------------------------------------
+// TAO_AV_SCTP_SEQ_Base_Acceptor
+//------------------------------------------------------------
+
+int
+TAO_AV_SCTP_SEQ_Base_Acceptor::acceptor_open (TAO_AV_SCTP_SEQ_Acceptor *acceptor,
+ ACE_Reactor *reactor,
+ const ACE_INET_Addr &local_addr,
+ TAO_FlowSpec_Entry *entry)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "In base acceptor open"));
+
+ this->acceptor_ = acceptor;
+ this->reactor_ = reactor;
+ this->entry_ = entry;
+
+ ACE_UINT32 local_ip_addr [entry->num_local_sec_addrs ()];
+ ACE_INET_Addr ip_addr;
+ char** addrs = entry->get_local_sec_addr ();
+ for (int i = 0; i < entry->num_local_sec_addrs (); i++)
+ {
+ ACE_CString addr_str (addrs[i]);
+ addr_str += ":";
+ ip_addr.set (addr_str.c_str ());
+ local_ip_addr [i] = ip_addr.get_ip_address ();
+ }
+
+ ACE_Multihomed_INET_Addr multi_addr;
+ multi_addr.set (local_addr.get_port_number (),
+ local_addr.get_ip_address (),
+ 1,
+ local_ip_addr,
+ entry->num_local_sec_addrs ());
+
+ int result = this->open (multi_addr,reactor);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SCTP_SEQ_Base_Acceptor::open failed\n"),-1);
+
+ return 0;
+}
+
+int
+TAO_AV_SCTP_SEQ_Base_Acceptor::make_svc_handler (TAO_AV_SCTP_SEQ_Flow_Handler *&handler)
+{
+ int result = this->acceptor_->make_svc_handler (handler);
+ if (result < 0)
+ return result;
+ handler->reactor (this->reactor_);
+ this->entry_->handler (handler);
+
+
+ return 0;
+}
+
+//------------------------------------------------------------
+// TAO_AV_SCTP_SEQ_Acceptor
+//------------------------------------------------------------
+
+TAO_AV_SCTP_SEQ_Acceptor::TAO_AV_SCTP_SEQ_Acceptor (void)
+{
+}
+
+TAO_AV_SCTP_SEQ_Acceptor::~TAO_AV_SCTP_SEQ_Acceptor (void)
+{
+}
+
+int
+TAO_AV_SCTP_SEQ_Acceptor::make_svc_handler (TAO_AV_SCTP_SEQ_Flow_Handler *&sctp_handler)
+{
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_SCTP_SEQ_Acceptor::make_svc_handler\n"
+ ));
+
+ if (this->endpoint_ != 0)
+ {
+ ACE_NEW_RETURN (sctp_handler,
+ TAO_AV_SCTP_SEQ_Flow_Handler,
+ -1);
+
+ TAO_AV_Protocol_Object *object =
+ this->flow_protocol_factory_->make_protocol_object (this->entry_,
+ this->endpoint_,
+ sctp_handler,
+ sctp_handler->transport ());
+
+ sctp_handler->protocol_object (object);
+ this->endpoint_->set_flow_handler (this->flowname_.c_str (),sctp_handler);
+ this->entry_->protocol_object (object);
+ this->entry_->handler (sctp_handler);
+ }
+ return 0;
+}
+
+int
+TAO_AV_SCTP_SEQ_Acceptor::open (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Core *av_core,
+ TAO_FlowSpec_Entry *entry,
+ TAO_AV_Flow_Protocol_Factory *factory,
+ TAO_AV_Core::Flow_Component flow_comp)
+{
+ this->flow_protocol_factory_ = factory;
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_SCTP_SEQ_Acceptor::open "));
+
+ this->av_core_ = av_core;
+ this->endpoint_ = endpoint;
+ this->entry_ = entry;
+ if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
+ this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ());
+ else
+ this->flowname_ = entry->flowname ();
+ ACE_Addr *address = entry->address ();
+
+ ACE_INET_Addr *inet_addr = (ACE_INET_Addr *) address;
+
+ inet_addr->set (inet_addr->get_port_number (),
+ inet_addr->get_host_name ());
+
+ char buf[BUFSIZ];
+ inet_addr->addr_to_string (buf,
+ BUFSIZ);
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_SCTP_SEQ_Acceptor::open: %s",
+ buf
+ ));
+
+ //Add code for reading multihomed addresses and pass the multihomed
+ //addr to the following method. ACE_Multihomed_addr derives from
+ //ACE_INET_Addr, hence this should not be an issue.
+ int result = this->acceptor_.acceptor_open (this,
+ av_core->reactor (),
+ *inet_addr,
+ entry);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO_AV_SCTP_SEQ_Acceptor::open failed"),
+ -1);
+
+ entry->set_local_addr (address);
+
+
+ return 0;
+}
+
+int
+TAO_AV_SCTP_SEQ_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Core *av_core,
+ TAO_FlowSpec_Entry *entry,
+ TAO_AV_Flow_Protocol_Factory *factory,
+ TAO_AV_Core::Flow_Component flow_comp)
+{
+ this->flow_protocol_factory_ = factory;
+ this->av_core_ = av_core;
+ this->endpoint_ = endpoint;
+ this->entry_ = entry;
+ if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
+ this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname());
+ else
+ this->flowname_ = entry->flowname ();
+
+ ACE_INET_Addr *address;
+ ACE_NEW_RETURN (address,
+ ACE_INET_Addr ("0"),
+ -1);
+
+ int result = this->acceptor_.acceptor_open (this,
+ av_core->reactor (),
+ *address,
+ entry);
+
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "TAO_AV_SCTP_SEQ_Acceptor::open failed"),
+ -1);
+
+ this->acceptor_.acceptor ().get_local_addr (*address);
+
+ address->set (address->get_port_number (),
+ address->get_host_name ());
+
+ char buf[BUFSIZ];
+ address->addr_to_string (buf,BUFSIZ);
+
+ if (TAO_debug_level > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "TAO_AV_SCTP_SEQ_Acceptor::open_default: %s\n",
+ buf));
+
+ entry->set_local_addr (address);
+
+ return 0;
+}
+
+
+int
+TAO_AV_SCTP_SEQ_Acceptor::close (void)
+{
+ return 0;
+}
+
+//------------------------------------------------------------
+// TAO_AV_SCTP_SEQ_Base_Connector
+//------------------------------------------------------------
+
+int
+TAO_AV_SCTP_SEQ_Base_Connector::connector_open (TAO_AV_SCTP_SEQ_Connector *connector,
+ ACE_Reactor *reactor)
+{
+ this->connector_ = connector;
+ this->reactor_ = reactor;
+
+ int result = this->open (reactor);
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SCTP_SEQ_Base_Connector::open failed\n"),-1);
+ return 0;
+}
+
+int
+TAO_AV_SCTP_SEQ_Base_Connector::make_svc_handler (TAO_AV_SCTP_SEQ_Flow_Handler *&sctp_handler)
+{
+ int result =
+ this->connector_->make_svc_handler (sctp_handler);
+ if (result < 0)
+ return result;
+ sctp_handler->reactor (this->reactor_);
+ return 0;
+}
+
+int
+TAO_AV_SCTP_SEQ_Base_Connector::connector_connect (TAO_AV_SCTP_SEQ_Flow_Handler *&handler,
+ const ACE_Multihomed_INET_Addr &remote_addr,
+ const ACE_Multihomed_INET_Addr &local_addr)
+{
+ int result = this->connect (handler,
+ remote_addr,
+ 0,
+ local_addr);
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SCTP_SEQ_Base_Connector::connect failed\n"),-1);
+ return 0;
+}
+
+//------------------------------------------------------------
+// TAO_AV_SCTP_SEQ_Connector
+//------------------------------------------------------------
+TAO_AV_SCTP_SEQ_Connector::TAO_AV_SCTP_SEQ_Connector (void)
+{
+}
+
+TAO_AV_SCTP_SEQ_Connector::~TAO_AV_SCTP_SEQ_Connector (void)
+{
+}
+
+int
+TAO_AV_SCTP_SEQ_Connector::make_svc_handler (TAO_AV_SCTP_SEQ_Flow_Handler *&sctp_handler)
+{
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_SCTP_SEQ_Connector::make_svc_handler\n"));
+ // TAO_AV_Callback *callback = 0;
+ if (this->endpoint_ != 0)
+ {
+ // this->endpoint_->get_callback (this->flowname_.c_str (),
+ // callback);
+ ACE_NEW_RETURN (sctp_handler,
+ // TAO_AV_SCTP_SEQ_Flow_Handler (callback),
+ TAO_AV_SCTP_SEQ_Flow_Handler,
+ -1);
+ TAO_AV_Protocol_Object *object =
+ this->flow_protocol_factory_->make_protocol_object (this->entry_,
+ this->endpoint_,
+ sctp_handler,
+ sctp_handler->transport ());
+ sctp_handler->protocol_object (object);
+ // callback->protocol_object (object);
+ // this->endpoint_->set_protocol_object (this->flowname_.c_str (),
+ // object);
+ this->endpoint_->set_flow_handler (this->flowname_.c_str (),sctp_handler);
+ this->entry_->protocol_object (object);
+ this->entry_->handler (sctp_handler);
+ }
+ return 0;
+}
+
+int
+TAO_AV_SCTP_SEQ_Connector::open (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Core *av_core,
+ TAO_AV_Flow_Protocol_Factory *factory)
+
+{
+ this->endpoint_ = endpoint;
+ this->flow_protocol_factory_ = factory;
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_SCTP_SEQ_Connector::open "));
+ int result = this->connector_.connector_open(this,
+ av_core->reactor ());
+ return result;
+}
+
+int
+TAO_AV_SCTP_SEQ_Connector::connect (TAO_FlowSpec_Entry *entry,
+ TAO_AV_Transport *&transport,
+ TAO_AV_Core::Flow_Component flow_comp)
+{
+ this->entry_ = entry;
+ if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL)
+ this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ());
+ else
+ this->flowname_ = entry->flowname ();
+ ACE_Addr *remote_addr = entry->address ();
+ ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr *,remote_addr);
+ TAO_AV_SCTP_SEQ_Flow_Handler *handler = 0;
+
+ ACE_Multihomed_INET_Addr remote_multi_addr;
+ remote_multi_addr.set (inet_addr->get_port_number (),
+ inet_addr->get_ip_address (),
+ 1,
+ 0,
+ 0);
+
+ ACE_Multihomed_INET_Addr local_addr; //This can be a multihomed address
+ ACE_INET_Addr *addr;
+ if (entry->get_peer_addr () != 0)
+ {
+ addr = ACE_dynamic_cast (ACE_INET_Addr *, entry->get_peer_addr ());
+ }
+ else
+ {
+ ACE_NEW_RETURN (addr,
+ ACE_INET_Addr ("0"),
+ -1);
+ }
+
+ ACE_UINT32 local_ip_addr [entry->num_peer_sec_addrs ()];
+ ACE_INET_Addr ip_addr;
+ char** addrs = entry->get_peer_sec_addr ();
+ for (int i = 0; i < entry->num_peer_sec_addrs (); i++)
+ {
+ ACE_CString addr_str (addrs[i]);
+ addr_str += ":";
+ ip_addr.set (addr_str.c_str ());
+ local_ip_addr [i] = ip_addr.get_ip_address ();
+ }
+
+ local_addr.set (addr->get_port_number (),
+ addr->get_ip_address (),
+ 1,
+ local_ip_addr,
+ entry->num_peer_sec_addrs ());
+
+ int result = this->connector_.connector_connect (handler,
+ remote_multi_addr,
+ local_addr);
+
+ if (result < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SCTP_SEQ_connector::connect failed\n"),-1);
+ entry->handler (handler);
+ transport = handler->transport ();
+
+ if (TAO_debug_level > 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Local Addrs\n"));
+ char buf [BUFSIZ];
+ size_t size = BUFSIZ;
+ ACE_INET_Addr *peer_addrs;
+ ACE_NEW_RETURN (peer_addrs,ACE_INET_Addr [size], -1);
+ handler->peer ().get_local_addrs (peer_addrs, size);
+ for (unsigned int i=0; i < size;i++)
+ {
+ peer_addrs [i].addr_to_string (buf,
+ BUFSIZ);
+ ACE_DEBUG ((LM_DEBUG,
+ "%s %d\n",
+ buf,
+ size));
+ }
+
+ ACE_DEBUG ((LM_DEBUG,
+ "Remote Addrs\n"));
+
+ size = BUFSIZ;
+ handler->peer ().get_remote_addrs (peer_addrs, size);
+ for (unsigned int i=0; i < size;i++)
+ {
+ peer_addrs [i].addr_to_string (buf,
+ BUFSIZ);
+ ACE_DEBUG ((LM_DEBUG,
+ "%s %d\n",
+ buf,
+ size));
+ }
+ delete peer_addrs;
+ }
+
+ return 0;
+}
+
+int
+TAO_AV_SCTP_SEQ_Connector::close (void)
+{
+ return 0;
+}
+
+//------------------------------------------------------------
+// TAO_AV_SCTP_SEQ_Protocol_Factory
+//------------------------------------------------------------
+
+
+TAO_AV_SCTP_SEQ_Factory::TAO_AV_SCTP_SEQ_Factory (void)
+{
+}
+
+TAO_AV_SCTP_SEQ_Factory::~TAO_AV_SCTP_SEQ_Factory (void)
+{
+}
+
+
+int
+TAO_AV_SCTP_SEQ_Factory::match_protocol (const char *protocol_string)
+{
+ if (ACE_OS::strcasecmp (protocol_string,"SCTP_SEQ") == 0)
+ return 1;
+ return 0;
+}
+
+TAO_AV_Acceptor*
+TAO_AV_SCTP_SEQ_Factory::make_acceptor (void)
+{
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_SCTP_SEQ_Factory::make_acceptor\n"));
+ TAO_AV_Acceptor *acceptor = 0;
+ ACE_NEW_RETURN (acceptor,
+ TAO_AV_SCTP_SEQ_Acceptor,
+ 0);
+ return acceptor;
+}
+
+TAO_AV_Connector*
+TAO_AV_SCTP_SEQ_Factory::make_connector (void)
+{
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_SCTP_SEQ_Factory::make_connector\n"));
+ TAO_AV_Connector *connector = 0;
+ ACE_NEW_RETURN (connector,
+ TAO_AV_SCTP_SEQ_Connector,
+ 0);
+ return connector;
+}
+
+
+int
+TAO_AV_SCTP_SEQ_Factory::init (int,
+ char *[])
+{
+
+ return 0;
+}
+
+//------------------------------------------------------------
+// TAO_AV_SCTP_SEQ_Object
+//------------------------------------------------------------
+
+int
+TAO_AV_SCTP_SEQ_Object::handle_input (void)
+{
+ int n = this->transport_->recv (this->frame_.rd_ptr (),
+ this->frame_.size ());
+ if (n == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SCTP_SEQ_Flow_Handler::handle_input recv failed\n"),-1);
+ if (n == 0)
+ ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SCTP_SEQ_Flow_Handler::handle_input connection closed\n"),-1);
+ this->frame_.wr_ptr (this->frame_.rd_ptr () + n);
+
+ return this->callback_->receive_frame (&this->frame_);
+}
+
+int
+TAO_AV_SCTP_SEQ_Object::send_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info * /*frame_info*/)
+{
+ int result = this->transport_->send (frame);
+ return result;
+}
+
+int
+TAO_AV_SCTP_SEQ_Object::send_frame (const iovec *iov,
+ int iovcnt,
+ TAO_AV_frame_info * /*frame_info*/)
+{
+ return this->transport_->send (iov,iovcnt);
+}
+
+int
+TAO_AV_SCTP_SEQ_Object::send_frame (const char*buf,
+ size_t len)
+{
+ int result = this->transport_->send (buf, len, 0);
+ return result;
+}
+
+
+TAO_AV_SCTP_SEQ_Object::TAO_AV_SCTP_SEQ_Object (TAO_AV_Callback *callback,
+ TAO_AV_Transport *transport)
+ :TAO_AV_Protocol_Object (callback,transport)
+{
+ // @@ Is this a good size?
+ this->frame_.size (BUFSIZ);
+}
+
+TAO_AV_SCTP_SEQ_Object::~TAO_AV_SCTP_SEQ_Object (void)
+{
+ // No-op
+}
+int
+TAO_AV_SCTP_SEQ_Object::destroy (void)
+{
+ this->callback_->handle_destroy ();
+ delete this;
+ return 0;
+}
+
+//------------------------------------------------------------
+// TAO_AV_SCTP_SEQ_Flow_Handler
+//------------------------------------------------------------
+
+TAO_AV_SCTP_SEQ_Flow_Handler::TAO_AV_SCTP_SEQ_Flow_Handler (TAO_AV_Callback * /*callback*/)
+ // :TAO_AV_Flow_Handler (callback)
+{
+ ACE_NEW (this->transport_,
+ TAO_AV_SCTP_SEQ_Transport (this));
+}
+
+TAO_AV_SCTP_SEQ_Flow_Handler::~TAO_AV_SCTP_SEQ_Flow_Handler (void)
+{
+ delete this->transport_;
+}
+
+TAO_AV_Transport *
+TAO_AV_SCTP_SEQ_Flow_Handler::transport (void)
+{
+ return this->transport_;
+}
+
+int
+TAO_AV_SCTP_SEQ_Flow_Handler::open (void * /*arg*/)
+{
+ ACE_CDR::Long nodelay = 1;
+
+#if defined (SCTP_NODELAY)
+ if (this->peer ().set_option (IPPROTO_SCTP,
+ SCTP_NODELAY,
+ (void *) &nodelay,
+ sizeof (nodelay)) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "NODELAY failed\n"),
+ -1);
+#endif /* SCTP_NODELAY */
+
+ // Called by the <Strategy_Acceptor> when the handler is completely
+ // connected.
+ ACE_INET_Addr addr;
+
+ if (this->peer ().get_remote_addr (addr) == -1)
+ return -1;
+
+ char server[MAXHOSTNAMELEN + 16];
+
+ (void) addr.addr_to_string (server, sizeof (server));
+
+ if (TAO_debug_level > 0)
+ if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,
+ "(%P|%t) connection to server <%s> on %d\n",
+ server, this->peer ().get_handle ()));
+
+ this->peer ().disable (ACE_NONBLOCK);
+ // Register the handler with the reactor.
+ if (this->reactor ()
+ && this->reactor ()->register_handler
+ (this,
+ ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("unable to register client handler")),
+ -1);
+ return 0;
+}
+
+int
+TAO_AV_SCTP_SEQ_Flow_Handler::handle_input (ACE_HANDLE /*fd*/)
+{
+ return this->protocol_object_->handle_input ();
+}
+
+int
+TAO_AV_SCTP_SEQ_Flow_Handler::handle_timeout (const ACE_Time_Value &tv,
+ const void *arg)
+{
+ return TAO_AV_Flow_Handler::handle_timeout (tv,arg);
+}
+
+
+//------------------------------------------------------------
+// TAO_AV_SCTP_SEQ_Flow_Factory
+//------------------------------------------------------------
+TAO_AV_SCTP_SEQ_Flow_Factory::TAO_AV_SCTP_SEQ_Flow_Factory (void)
+{
+}
+
+TAO_AV_SCTP_SEQ_Flow_Factory::~TAO_AV_SCTP_SEQ_Flow_Factory (void)
+{
+}
+
+int
+TAO_AV_SCTP_SEQ_Flow_Factory::init (int /* argc */,
+ char * /* argv */ [])
+{
+ return 0;
+}
+
+int
+TAO_AV_SCTP_SEQ_Flow_Factory::match_protocol (const char *flow_string)
+{
+ if (ACE_OS::strcasecmp (flow_string,"SCTP_SEQ") == 0)
+ return 1;
+ return 0;
+}
+
+TAO_AV_Protocol_Object*
+TAO_AV_SCTP_SEQ_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry,
+ TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Flow_Handler *handler,
+ TAO_AV_Transport *transport)
+{
+ TAO_AV_Callback *callback = 0;
+ if( endpoint->get_callback (entry->flowname (), callback) ) {
+ ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) Invalid callback\n"), 0);
+ }
+
+ TAO_AV_SCTP_SEQ_Object *object = 0;
+ ACE_NEW_RETURN (object,
+ TAO_AV_SCTP_SEQ_Object (callback,
+ transport),
+ 0);
+ callback->open (object,
+ handler);
+ endpoint->set_protocol_object (entry->flowname (),
+ object);
+ return object;
+}
+
+ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_SCTP_SEQ_Flow_Factory)
+ACE_STATIC_SVC_DEFINE (TAO_AV_SCTP_SEQ_Flow_Factory,
+ ACE_TEXT ("SCTP_SEQ_Flow_Factory"),
+ ACE_SVC_OBJ_T,
+ &ACE_SVC_NAME (TAO_AV_SCTP_SEQ_Flow_Factory),
+ ACE_Service_Type::DELETE_THIS |
+ ACE_Service_Type::DELETE_OBJ,
+ 0)
+
+ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_SCTP_SEQ_Factory)
+ACE_STATIC_SVC_DEFINE (TAO_AV_SCTP_SEQ_Factory,
+ ACE_TEXT ("SCTP_SEQ_Factory"),
+ ACE_SVC_OBJ_T,
+ &ACE_SVC_NAME (TAO_AV_SCTP_SEQ_Factory),
+ ACE_Service_Type::DELETE_THIS |
+ ACE_Service_Type::DELETE_OBJ,
+ 0)
diff --git a/TAO/orbsvcs/orbsvcs/AV/SCTP_SEQ.h b/TAO/orbsvcs/orbsvcs/AV/SCTP_SEQ.h
new file mode 100644
index 00000000000..4cd811b3692
--- /dev/null
+++ b/TAO/orbsvcs/orbsvcs/AV/SCTP_SEQ.h
@@ -0,0 +1,302 @@
+/* -*- C++ -*- */
+
+//=============================================================================
+/**
+ * @file SCTP_SEQ.h
+ *
+ * $Id$
+ *
+ * @author Yamuna Krishnamurthy <yamuna@oomworks.com>
+ */
+//=============================================================================
+
+#ifndef TAO_AV_SCTP_SEQ_H
+#define TAO_AV_SCTP_SEQ_H
+#include /**/ "ace/pre.h"
+
+
+#include "ace/OS.h"
+#include "ace/Auto_Ptr.h"
+#include "Protocol_Factory.h"
+#include "ace/SOCK_SEQPACK_Association.h"
+#include "ace/SOCK_SEQPACK_Acceptor.h"
+#include "ace/SOCK_SEQPACK_Connector.h"
+
+extern "C" {
+#include <netinet/sctp.h>
+};
+
+
+typedef ACE_Unbounded_Set <ACE_CString> Interface_Seq;
+typedef ACE_Unbounded_Set_Iterator <ACE_CString> Interface_Seq_Itor;
+
+//typedef auto_ptr <Interface_Seq> Interface_Seq_Ptr;
+typedef ACE_Hash_Map_Manager <ACE_CString,Interface_Seq,ACE_Null_Mutex> Secondary_Addr_Map;
+typedef ACE_Hash_Map_Entry <ACE_CString,Interface_Seq> Secondary_Addr_Map_Entry;
+typedef ACE_Hash_Map_Iterator <ACE_CString,Interface_Seq,ACE_Null_Mutex> Secondary_Addr_Map_Iterator;
+
+//Secondary_Addr_Map sec_addr_map_;
+
+/**
+ * @class TAO_AV_SCTP_SEQ_Factory
+ * @brief
+ */
+class TAO_AV_Export TAO_AV_SCTP_SEQ_Factory : public TAO_AV_Transport_Factory
+{
+public:
+ /// Initialization hook.
+ TAO_AV_SCTP_SEQ_Factory (void);
+ virtual ~TAO_AV_SCTP_SEQ_Factory (void);
+ virtual int init (int argc, char *argv[]);
+ virtual int match_protocol (const char *protocol_string);
+ virtual TAO_AV_Acceptor *make_acceptor (void);
+ virtual TAO_AV_Connector *make_connector (void);
+};
+
+class TAO_AV_SCTP_SEQ_Flow_Handler;
+
+/**
+ * @class TAO_AV_SCTP_SEQ_Transport
+ * @brief A transport abstraction for udp sockets.
+ * Uses the ACE_SOCK_Dgram to send the data.
+ */
+class TAO_AV_Export TAO_AV_SCTP_SEQ_Transport
+ :public TAO_AV_Transport
+{
+public:
+ TAO_AV_SCTP_SEQ_Transport (void);
+
+ TAO_AV_SCTP_SEQ_Transport (TAO_AV_SCTP_SEQ_Flow_Handler *handler);
+
+ virtual ~TAO_AV_SCTP_SEQ_Transport (void);
+ virtual int open (ACE_Addr *addr);
+
+ virtual int close (void);
+
+ virtual int mtu (void);
+
+ virtual ACE_Addr *get_peer_addr (void);
+
+ /// Write the complete Message_Block chain to the connection.
+ virtual ssize_t send (const ACE_Message_Block *mblk,
+ ACE_Time_Value *s = 0);
+
+ /// Write the contents of the buffer of length len to the connection.
+ virtual ssize_t send (const char *buf,
+ size_t len,
+ ACE_Time_Value *s = 0);
+
+ /// Write the contents of iovcnt iovec's to the connection.
+ virtual ssize_t send (const iovec *iov,
+ int iovcnt,
+ ACE_Time_Value *s = 0);
+
+ /// Read len bytes from into buf.
+ virtual ssize_t recv (char *buf,
+ size_t len,
+ ACE_Time_Value *s = 0);
+
+ /// Read len bytes from into buf using flags.
+ virtual ssize_t recv (char *buf,
+ size_t len,
+ int flags,
+ ACE_Time_Value *s = 0);
+
+ /// Read received data into the iovec buffers.
+ virtual ssize_t recv (iovec *iov,
+ int iovcnt,
+ ACE_Time_Value *s = 0);
+
+ TAO_AV_SCTP_SEQ_Flow_Handler *handler (void) { return this->handler_; }
+
+protected:
+ TAO_AV_SCTP_SEQ_Flow_Handler *handler_;
+ ACE_Addr *addr_;
+ ACE_INET_Addr peer_addr_;
+};
+
+/**
+ * @class TAO_AV_SCTP_SEQ_Flow_Handler
+ * @brief
+ */
+class TAO_AV_SCTP_SEQ_Flow_Handler
+ :public virtual TAO_AV_Flow_Handler,
+ public ACE_Svc_Handler <ACE_SOCK_SEQPACK_ASSOCIATION, ACE_NULL_SYNCH>
+{
+public:
+ TAO_AV_SCTP_SEQ_Flow_Handler (TAO_AV_Callback *callback = 0);
+ virtual ~TAO_AV_SCTP_SEQ_Flow_Handler (void);
+ virtual TAO_AV_Transport *transport (void);
+ virtual int open (void * = 0);
+ virtual int handle_input (ACE_HANDLE fd);
+ virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg = 0);
+ virtual ACE_Event_Handler* event_handler (void){ return this; }
+protected:
+ TAO_AV_Core *av_core_;
+};
+
+class TAO_AV_SCTP_SEQ_Acceptor;
+
+/**
+ * @class TAO_AV_SCTP_SEQ_Base_Acceptor
+ * @brief
+ */
+class TAO_AV_SCTP_SEQ_Base_Acceptor :public ACE_Acceptor <TAO_AV_SCTP_SEQ_Flow_Handler,ACE_SOCK_SEQPACK_ACCEPTOR>
+{
+ public:
+ virtual int acceptor_open (TAO_AV_SCTP_SEQ_Acceptor *acceptor,
+ ACE_Reactor *reactor,
+ const ACE_INET_Addr &local_addr,
+ TAO_FlowSpec_Entry *entry);
+ virtual int make_svc_handler (TAO_AV_SCTP_SEQ_Flow_Handler *& handler);
+ protected:
+ TAO_AV_SCTP_SEQ_Acceptor *acceptor_;
+ ACE_Reactor *reactor_;
+ TAO_FlowSpec_Entry *entry_;
+};
+
+/**
+ * @class TAO_AV_SCTP_SEQ_Acceptor
+ * @brief
+ */
+class TAO_AV_Export TAO_AV_SCTP_SEQ_Acceptor
+ :public TAO_AV_Acceptor
+{
+public:
+ TAO_AV_SCTP_SEQ_Acceptor (void);
+ virtual ~TAO_AV_SCTP_SEQ_Acceptor (void);
+ virtual int open (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Core *av_core,
+ TAO_FlowSpec_Entry *entry,
+ TAO_AV_Flow_Protocol_Factory *factory,
+ TAO_AV_Core::Flow_Component flow_comp =
+ TAO_AV_Core::TAO_AV_DATA);
+
+ virtual int open_default (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Core *av_core,
+ TAO_FlowSpec_Entry *entry,
+ TAO_AV_Flow_Protocol_Factory *factory,
+ TAO_AV_Core::Flow_Component flow_comp =
+ TAO_AV_Core::TAO_AV_DATA);
+
+ virtual int close (void);
+ virtual int make_svc_handler (TAO_AV_SCTP_SEQ_Flow_Handler *&handler);
+
+protected:
+ ACE_INET_Addr *address_;
+ TAO_AV_SCTP_SEQ_Base_Acceptor acceptor_;
+ TAO_Base_StreamEndPoint *endpoint_;
+ TAO_FlowSpec_Entry *entry_;
+ TAO_AV_Flow_Protocol_Factory *flow_protocol_factory_;
+};
+
+class TAO_AV_SCTP_SEQ_Connector;
+
+/**
+ * @class TAO_AV_Base_Connector
+ * @brief
+ */
+class TAO_AV_SCTP_SEQ_Base_Connector : public ACE_Connector <TAO_AV_SCTP_SEQ_Flow_Handler,ACE_SOCK_SEQPACK_CONNECTOR>
+{
+public:
+ // To avoid warnings of open and connect hiding the base class functions these have to renamed.
+ int connector_open (TAO_AV_SCTP_SEQ_Connector *connector,
+ ACE_Reactor *reactor);
+ int connector_connect (TAO_AV_SCTP_SEQ_Flow_Handler *&handler,
+ const ACE_Multihomed_INET_Addr &remote_addr,
+ const ACE_Multihomed_INET_Addr &local_addr);
+ virtual int make_svc_handler (TAO_AV_SCTP_SEQ_Flow_Handler *& handler);
+protected:
+ TAO_AV_SCTP_SEQ_Connector *connector_;
+ ACE_Reactor *reactor_;
+};
+
+/**
+ * @class TAO_AV_SCTP_SEQ_Connector
+ * @brief
+ */
+class TAO_AV_SCTP_SEQ_Connector : public TAO_AV_Connector
+{
+public:
+ TAO_AV_SCTP_SEQ_Connector (void);
+ virtual ~TAO_AV_SCTP_SEQ_Connector (void);
+
+ virtual int open (TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Core *av_core,
+ TAO_AV_Flow_Protocol_Factory *factory);
+
+ virtual int connect (TAO_FlowSpec_Entry *entry,
+ TAO_AV_Transport *&transport,
+ TAO_AV_Core::Flow_Component flow_comp =
+ TAO_AV_Core::TAO_AV_DATA);
+ virtual int close (void);
+ virtual int make_svc_handler (TAO_AV_SCTP_SEQ_Flow_Handler *&handler);
+protected:
+ TAO_AV_Core *av_core_;
+ TAO_AV_SCTP_SEQ_Base_Connector connector_;
+ TAO_Base_StreamEndPoint *endpoint_;
+ TAO_FlowSpec_Entry *entry_;
+ TAO_AV_Flow_Protocol_Factory *flow_protocol_factory_;
+};
+
+/**
+ * @class TAO_AV_SCTP_SEQ_Object
+ * @brief TAO_AV_Protocol_Object for the User Datagram Protocol (SCTP_SEQ)
+ */
+class TAO_AV_Export TAO_AV_SCTP_SEQ_Object : public TAO_AV_Protocol_Object
+{
+public:
+ TAO_AV_SCTP_SEQ_Object (TAO_AV_Callback *callback,
+ TAO_AV_Transport *transport = 0);
+
+ /// Dtor
+ virtual ~TAO_AV_SCTP_SEQ_Object (void);
+
+ virtual int handle_input (void);
+
+ /// send a data frame.
+ virtual int send_frame (ACE_Message_Block *frame,
+ TAO_AV_frame_info *frame_info = 0);
+
+ virtual int send_frame (const iovec *iov,
+ int iovcnt,
+ TAO_AV_frame_info *frame_info = 0);
+
+ virtual int send_frame (const char*buf,
+ size_t len);
+
+ /// end the stream.
+ virtual int destroy (void);
+
+private:
+ /// Pre-allocated memory to receive the data...
+ ACE_Message_Block frame_;
+};
+
+/**
+ * @class TAO_AV_SCTP_SEQ_Flow_Factory
+ * @brief
+ */
+class TAO_AV_Export TAO_AV_SCTP_SEQ_Flow_Factory : public TAO_AV_Flow_Protocol_Factory
+{
+public:
+ /// Initialization hook.
+ TAO_AV_SCTP_SEQ_Flow_Factory (void);
+ virtual ~TAO_AV_SCTP_SEQ_Flow_Factory (void);
+ virtual int init (int argc, char *argv[]);
+ virtual int match_protocol (const char *flow_string);
+ TAO_AV_Protocol_Object* make_protocol_object (TAO_FlowSpec_Entry *entry,
+ TAO_Base_StreamEndPoint *endpoint,
+ TAO_AV_Flow_Handler *handler,
+ TAO_AV_Transport *transport);
+};
+
+ACE_STATIC_SVC_DECLARE (TAO_AV_SCTP_SEQ_Flow_Factory)
+ACE_FACTORY_DECLARE (TAO_AV, TAO_AV_SCTP_SEQ_Flow_Factory)
+
+ACE_STATIC_SVC_DECLARE (TAO_AV_SCTP_SEQ_Factory)
+ACE_FACTORY_DECLARE (TAO_AV, TAO_AV_SCTP_SEQ_Factory)
+
+
+#include /**/ "ace/post.h"
+#endif /* TAO_AV_SCTP_SEQ_H */
diff --git a/TAO/orbsvcs/orbsvcs/Makefile.av b/TAO/orbsvcs/orbsvcs/Makefile.av
index 5b8223695e4..c5ed5958b14 100644
--- a/TAO/orbsvcs/orbsvcs/Makefile.av
+++ b/TAO/orbsvcs/orbsvcs/Makefile.av
@@ -70,6 +70,10 @@ ifeq ($(rapi),1)
AV/QoS_UDP
endif # rapi
+ifeq ($(sctp),openss7)
+ CPP_SRCS += AV/SCTP_SEQ
+endif # sctp
+
IDL_SRC = \
$(addsuffix S.cpp, $(IDL_FILES)) \
$(addsuffix C.cpp, $(IDL_FILES))
@@ -91,7 +95,7 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU
#----------------------------------------------------------------------------
LDFLAGS += -L$(TAO_ROOT)/tao -L$(TAO_ROOT)/orbsvcs/orbsvcs
-CPPFLAGS += -I$(TAO_ROOT) -I$(TAO_ROOT)/orbsvcs -I$(TAO_ROOT)/orbsvcs/orbsvcs -DTAO_ORBSVCS_HAS_AV
+CPPFLAGS += -I$(TAO_ROOT) -I$(TAO_ROOT)/orbsvcs -I$(TAO_ROOT)/orbsvcs/orbsvcs -DTAO_ORBSVCS_HAS_AV
ifeq ($(shared_libs),1)
ifneq ($(SHLIB),)
diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Two_Stage/run_test.pl b/TAO/orbsvcs/tests/AVStreams/Simple_Two_Stage/run_test.pl
index e93ccdfbf52..1bcd4e6da62 100755
--- a/TAO/orbsvcs/tests/AVStreams/Simple_Two_Stage/run_test.pl
+++ b/TAO/orbsvcs/tests/AVStreams/Simple_Two_Stage/run_test.pl
@@ -107,6 +107,32 @@ if ($receiver != 0) {
$status = 1;
}
+$SV = new PerlACE::Process ("receiver", "-ORBInitRef NameService=file://$nsior -f sctp_output");
+$CL = new PerlACE::Process ("sender", "-ORBInitRef NameService=file://$nsior -p SCTP_SEQ");
+
+print STDERR "Using SCTP\n";
+print STDERR "Starting Receiver\n";
+
+$SV->Spawn ();
+
+sleep $sleeptime;
+
+print STDERR "Starting Sender\n";
+
+$sender = $CL->SpawnWaitKill (200);
+
+if ($sender != 0) {
+ print STDERR "ERROR: sender returned $sender\n";
+ $status = 1;
+}
+
+$receiver = $SV->TerminateWaitKill (5);
+
+if ($receiver != 0) {
+ print STDERR "ERROR: receiver returned $receiver\n";
+ $status = 1;
+}
+
$nserver = $NS->TerminateWaitKill (5);
if ($nserver != 0) {