diff options
author | yamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2003-10-18 14:11:45 +0000 |
---|---|---|
committer | yamuna <yamuna@ae88bc3d-4319-0410-8dbf-d08b4c9d3795> | 2003-10-18 14:11:45 +0000 |
commit | 09a189be12d2a96962aa5f536daba4515095d0b5 (patch) | |
tree | 2fce671add5fd5f91f6d23ca4e9165173577a6ed | |
parent | 4724f51e36663f90e94700d0a81e8a958b0eeb41 (diff) | |
download | ATCD-09a189be12d2a96962aa5f536daba4515095d0b5.tar.gz |
ChangelogTag: Sat Oct 18 10:16:20 2003 Yamuna Krishnamurthy <yamuna@oomworks.com>
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp | 65 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/AV_Core.h | 5 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp | 174 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h | 12 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i | 48 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/SCTP_SEQ.cpp | 827 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/SCTP_SEQ.h | 302 | ||||
-rw-r--r-- | TAO/orbsvcs/orbsvcs/Makefile.av | 6 | ||||
-rwxr-xr-x | TAO/orbsvcs/tests/AVStreams/Simple_Two_Stage/run_test.pl | 26 |
9 files changed, 1441 insertions, 24 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp b/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp index 031cef0c01c..08597e73734 100644 --- a/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/AV_Core.cpp @@ -15,6 +15,10 @@ #include "orbsvcs/AV/QoS_UDP.h" #endif /* ACE_HAS_RAPI || ACE_HAS_WINSOCK2_GQOS */ +#if defined (ACE_HAS_SCTP) +#include "orbsvcs/AV/SCTP_SEQ.h" +#endif // ACE_HAS_SCTP + #include "tao/debug.h" #include "tao/ORB_Core.h" @@ -251,7 +255,6 @@ TAO_AV_Core::init_forward_flows (TAO_Base_StreamEndPoint *endpoint, { if (entry->handler () != 0) { - //Yamuna:PLEASE CHECK THIS LATER #if defined ACE_HAS_RAPI || defined (ACE_HAS_WINSOCK2_GQOS) // For IN flows on the A side we should remove the handlers from the reactor. @@ -799,6 +802,37 @@ TAO_AV_Core::load_default_transport_factories (void) this->transport_factories_.insert (udp_qos_item); #endif /* ACE_HAS_RAPI || ACE_HAS_WINSOCK2_GQOS */ +#if defined ACE_HAS_SCTP + const char *sctp_seq_factory_str = "SCTP_SEQ_Factory"; + + TAO_AV_Transport_Factory *sctp_seq_factory = 0; + TAO_AV_Transport_Item *sctp_seq_item = 0; + + sctp_seq_factory = + ACE_Dynamic_Service<TAO_AV_Transport_Factory>::instance (sctp_seq_factory_str); + if (sctp_seq_factory == 0) + { + if (TAO_debug_level) + ACE_ERROR ((LM_WARNING, + "(%P|%t) WARNING - No %s found in Service Repository." + " Using default instance.\n", + "SCTP SEQ Factory")); + + ACE_NEW_RETURN (sctp_seq_factory, + TAO_AV_SCTP_SEQ_Factory, + -1); + } + else sctp_seq_factory->ref_count = 1; + + ACE_NEW_RETURN (sctp_seq_item, + TAO_AV_Transport_Item ("SCTP_SEQ_Factory"), + -1); + + sctp_seq_item->factory (sctp_seq_factory); + + this->transport_factories_.insert (sctp_seq_item); +#endif /* ACE_HAS_SCTP */ + return 0; } @@ -912,6 +946,35 @@ TAO_AV_Core::load_default_flow_protocol_factories (void) #endif /* defined (ACE_HAS_RAPI) || defined (ACE_HAS_WINSOCK2_GQOS) */ +#if defined ACE_HAS_SCTP + + const char *sctp_seq_flow = "SCTP_SEQ_Flow_Factory"; + TAO_AV_Flow_Protocol_Factory *sctp_seq_flow_factory = 0; + TAO_AV_Flow_Protocol_Item *sctp_seq_flow_item = 0; + + sctp_seq_flow_factory = + ACE_Dynamic_Service<TAO_AV_Flow_Protocol_Factory>::instance (sctp_seq_flow); + if (sctp_seq_flow_factory == 0) + { + if (TAO_debug_level) + ACE_ERROR ((LM_WARNING, + "(%P|%t) WARNING - No %s found in Service Repository." + " Using default instance.\n", + "SCTP SEQ Flow Factory")); + + ACE_NEW_RETURN (sctp_seq_flow_factory, + TAO_AV_SCTP_SEQ_Flow_Factory, + -1); + } + else sctp_seq_flow_factory->ref_count = 1; + + ACE_NEW_RETURN (sctp_seq_flow_item, TAO_AV_Flow_Protocol_Item ("SCTP_SEQ_Flow_Factory"), -1); + sctp_seq_flow_item->factory (sctp_seq_flow_factory); + + this->flow_protocol_factories_.insert (sctp_seq_flow_item); + +#endif /* ACE_HAS_SCTP */ + TAO_AV_Flow_Protocol_Factory *tcp_flow_factory = 0; TAO_AV_Flow_Protocol_Item *tcp_item = 0; diff --git a/TAO/orbsvcs/orbsvcs/AV/AV_Core.h b/TAO/orbsvcs/orbsvcs/AV/AV_Core.h index 77f6e89a218..d6ac2037153 100644 --- a/TAO/orbsvcs/orbsvcs/AV/AV_Core.h +++ b/TAO/orbsvcs/orbsvcs/AV/AV_Core.h @@ -70,8 +70,9 @@ public: TAO_AV_RTP_UDP_MCAST = 10, TAO_AV_SFP_UDP_MCAST = 11, TAO_AV_QOS_UDP = 12, - TAO_AV_USERDEFINED_UDP = 13, - TAO_AV_USERDEFINED_UDP_MCAST = 14 + TAO_AV_USERDEFINED_UDP = 13, + TAO_AV_USERDEFINED_UDP_MCAST = 14, + TAO_AV_SCTP_SEQ = 15 }; enum Flow_Component diff --git a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp index d867b376023..e765fbdfe43 100644 --- a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.cpp @@ -33,6 +33,10 @@ TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (void) entry_ (), is_multicast_ (0), peer_addr_ (0), + local_sec_addr_ (0), + num_local_sec_addrs_ (0), + peer_sec_addr_ (0), + num_peer_sec_addrs_ (0), peer_control_addr_ (0), local_addr_ (0), local_control_addr_ (0), @@ -69,6 +73,10 @@ TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (const char *flowname, entry_ (), is_multicast_ (0), peer_addr_ (0), + local_sec_addr_ (0), + num_local_sec_addrs_ (0), + peer_sec_addr_ (0), + num_peer_sec_addrs_ (0), peer_control_addr_ (0), local_addr_ (0), local_control_addr_ (0), @@ -105,6 +113,10 @@ TAO_FlowSpec_Entry::TAO_FlowSpec_Entry (const char *flowname, entry_ (), is_multicast_ (0), peer_addr_ (0), + local_sec_addr_ (0), + num_local_sec_addrs_ (0), + peer_sec_addr_ (0), + num_peer_sec_addrs_ (0), peer_control_addr_ (0), local_addr_ (0), local_control_addr_ (0), @@ -128,8 +140,8 @@ TAO_FlowSpec_Entry::~TAO_FlowSpec_Entry (void) delete address_; if (this->clean_up_control_address_) delete control_address_; - delete local_addr_; - delete local_control_addr_; + if (local_control_addr_ != 0) + delete local_control_addr_; } int @@ -139,6 +151,8 @@ TAO_FlowSpec_Entry::set_protocol (void) { if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"TCP") == 0) this->protocol_ = TAO_AV_Core::TAO_AV_TCP; + else if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"SCTP_SEQ") == 0) + this->protocol_ = TAO_AV_Core::TAO_AV_SCTP_SEQ; else if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"UDP") == 0) this->protocol_ = TAO_AV_Core::TAO_AV_UDP; else if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"QoS_UDP") == 0) @@ -218,6 +232,9 @@ int TAO_FlowSpec_Entry::parse_address (const char *address, TAO_AV_Core::Flow_Component flow_comp) { + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, "TAO_FlowSpec_Entry::parse_address %s\n", address)); + if (address == 0) return 0; if (ACE_OS::strcmp (address,"") == 0) @@ -232,11 +249,38 @@ TAO_FlowSpec_Entry::parse_address (const char *address, if (protocol_tokenizer [1] != 0) { + ACE_DEBUG ((LM_DEBUG, + "Protocol tokenixer is not null\n")); if ((flow_comp == TAO_AV_Core::TAO_AV_DATA) || + //(flow_comp == TAO_AV_Core::TAO_AV_BOTH) || (flow_comp == TAO_AV_Core::TAO_AV_CONTROL) ) { ACE_CString addr; - addr += protocol_tokenizer[1]; + if (this->protocol_ == TAO_AV_Core::TAO_AV_SCTP_SEQ) + { + TAO_Tokenizer addr_token (protocol_tokenizer [1], ';'); + + ACE_DEBUG ((LM_DEBUG, + "Number of local sec addresses = %d\n", + addr_token.num_tokens () - 1)); + + if (addr_token.num_tokens () != 0) + { + addr += addr_token [0]; + ACE_NEW_RETURN (local_sec_addr_, char* [addr_token.num_tokens () - 1],-1); + for (int j = 1; j <= addr_token.num_tokens () - 1; j++) + { + ACE_DEBUG ((LM_DEBUG, + "adding addresses to sequence %s\n", + addr_token [j])); + + local_sec_addr_ [j-1] = CORBA::string_dup (addr_token [j]); + } + num_local_sec_addrs_ = addr_token.num_tokens () - 1; + } + } + else addr += protocol_tokenizer[1]; + switch (this->protocol_) { @@ -244,6 +288,7 @@ TAO_FlowSpec_Entry::parse_address (const char *address, case TAO_AV_Core::TAO_AV_USERDEFINED_UDP: case TAO_AV_Core::TAO_AV_RTP_UDP: case TAO_AV_Core::TAO_AV_TCP: + case TAO_AV_Core::TAO_AV_SCTP_SEQ: case TAO_AV_Core::TAO_AV_UDP: case TAO_AV_Core::TAO_AV_QOS_UDP: { @@ -264,9 +309,6 @@ TAO_FlowSpec_Entry::parse_address (const char *address, this->control_address_ = inet_addr; } - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, "TAO_FlowSpec_Entry::parse_address %s %x\n", address,inet_addr->get_ip_address () )); - if (IN_CLASSD (inet_addr->get_ip_address ())) { if (TAO_debug_level > 0) @@ -299,6 +341,10 @@ TAO_FlowSpec_Entry::parse_address (const char *address, } else { + ACE_DEBUG ((LM_DEBUG, + "AV BOTH %s \n", + protocol_tokenizer[1])); + TAO_Tokenizer address_tokenizer (protocol_tokenizer[1], ':'); TAO_Tokenizer port_tokenizer (address_tokenizer[1], ';'); ACE_CString addr; @@ -306,6 +352,28 @@ TAO_FlowSpec_Entry::parse_address (const char *address, addr += ":"; addr += port_tokenizer[0]; + if (this->protocol_ == TAO_AV_Core::TAO_AV_SCTP_SEQ) + { + + ACE_DEBUG ((LM_DEBUG, + "Number of local sec addresses = %d\n", + port_tokenizer.num_tokens () - 1)); + + if (port_tokenizer.num_tokens () - 1 != 0) + { + ACE_NEW_RETURN (local_sec_addr_, char* [port_tokenizer.num_tokens () - 1],-1); + for (int j = 1; j <= port_tokenizer.num_tokens () - 1; j++) + { + ACE_DEBUG ((LM_DEBUG, + "adding addresses to sequence %s\n", + port_tokenizer [j])); + + local_sec_addr_ [j-1] = CORBA::string_dup (port_tokenizer [j]); + } + num_local_sec_addrs_ = port_tokenizer.num_tokens () - 1; + } + } + short control_port = ACE_OS::atoi(port_tokenizer[0]) + 1; char control_port_str[6]; sprintf (control_port_str, "%d", control_port); @@ -327,6 +395,7 @@ TAO_FlowSpec_Entry::parse_address (const char *address, case TAO_AV_Core::TAO_AV_USERDEFINED_UDP: case TAO_AV_Core::TAO_AV_RTP_UDP: case TAO_AV_Core::TAO_AV_TCP: + case TAO_AV_Core::TAO_AV_SCTP_SEQ: case TAO_AV_Core::TAO_AV_UDP: case TAO_AV_Core::TAO_AV_QOS_UDP: { @@ -337,8 +406,6 @@ TAO_FlowSpec_Entry::parse_address (const char *address, -1); this->clean_up_address_ = 1; this->address_ = inet_addr; - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, "TAO_FlowSpec_Entry::parse_address %s %x\n", address,inet_addr->get_ip_address () )); if (ACE_OS::strcasecmp (this->carrier_protocol_.c_str(),"RTP/UDP") == 0) { @@ -348,8 +415,6 @@ TAO_FlowSpec_Entry::parse_address (const char *address, -1); this->clean_up_control_address_ = 1; this->control_address_ = control_inet_addr; - if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG, "TAO_FlowSpec_Entry::parse_address %s %x\n", address,control_inet_addr->get_ip_address () )); } if (IN_CLASSD (inet_addr->get_ip_address ())) @@ -383,6 +448,8 @@ TAO_FlowSpec_Entry::parse_address (const char *address, } } } + ACE_DEBUG ((LM_DEBUG, + "Return from parse address\n")); return 0; } @@ -485,10 +552,41 @@ TAO_Forward_FlowSpec_Entry::parse (const char *flowSpec_entry) if (tokenizer [TAO_AV_PEER_ADDR] != 0) { - ACE_INET_Addr *addr; - ACE_NEW_RETURN (addr, - ACE_INET_Addr (tokenizer [TAO_AV_PEER_ADDR]), - 0); + ACE_INET_Addr *addr = 0; + + + if (this->protocol_ == TAO_AV_Core::TAO_AV_SCTP_SEQ) + { + TAO_Tokenizer addr_token (tokenizer [TAO_AV_PEER_ADDR], ';'); + + ACE_DEBUG ((LM_DEBUG, + "Number of peer sec addresses = %d\n", + addr_token.num_tokens () - 1)); + + if (addr_token.num_tokens () != 0) + { + ACE_NEW_RETURN (addr, + ACE_INET_Addr (addr_token [0]), + 0); + + ACE_NEW_RETURN (peer_sec_addr_, char* [addr_token.num_tokens () - 1],-1); + for (int j = 1; j <= addr_token.num_tokens () - 1; j++) + { + ACE_DEBUG ((LM_DEBUG, + "adding addresses to sequence %s\n", + addr_token [j])); + + peer_sec_addr_ [j-1] = CORBA::string_dup (addr_token [j]); + } + num_peer_sec_addrs_ = addr_token.num_tokens () - 1; + } + } + else + { + ACE_NEW_RETURN (addr, + ACE_INET_Addr (tokenizer [TAO_AV_PEER_ADDR]), + 0); + } this->peer_addr_ = addr; char buf [BUFSIZ]; @@ -496,12 +594,13 @@ TAO_Forward_FlowSpec_Entry::parse (const char *flowSpec_entry) ACE_DEBUG ((LM_DEBUG, "Peer Address %s \n", buf)); - } - + + } + if (tokenizer [TAO_AV_FLOW_PROTOCOL] != 0) if (this->parse_flow_protocol_string (tokenizer [TAO_AV_FLOW_PROTOCOL]) < 0) return -1; - + return 0; } @@ -529,7 +628,7 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void) { if (this->flowname_.length() == 0) return ""; - + char address [BUFSIZ]; ACE_CString address_str; ACE_CString peer_address_str; @@ -548,6 +647,7 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void) case TAO_AV_Core::TAO_AV_QOS_UDP: case TAO_AV_Core::TAO_AV_UDP_MCAST: case TAO_AV_Core::TAO_AV_TCP: + case TAO_AV_Core::TAO_AV_SCTP_SEQ: { ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,this->address_); inet_addr->addr_to_string (address,BUFSIZ); @@ -562,6 +662,15 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void) address_str += "="; address_str += cstring; + if (this->protocol_ == TAO_AV_Core::TAO_AV_SCTP_SEQ) + { + for (int i = 0; i < this->num_local_sec_addrs_; i++) + { + address_str += ";"; + address_str += this->local_sec_addr_ [i]; + } + } + } else { @@ -587,6 +696,7 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void) case TAO_AV_Core::TAO_AV_QOS_UDP: case TAO_AV_Core::TAO_AV_UDP_MCAST: case TAO_AV_Core::TAO_AV_TCP: + case TAO_AV_Core::TAO_AV_SCTP_SEQ: { ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,this->address_); control_port = inet_addr->get_port_number() + 1; @@ -617,6 +727,7 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void) case TAO_AV_Core::TAO_AV_QOS_UDP: case TAO_AV_Core::TAO_AV_UDP_MCAST: case TAO_AV_Core::TAO_AV_TCP: + case TAO_AV_Core::TAO_AV_SCTP_SEQ: { ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,this->peer_addr_); inet_addr->addr_to_string (address,BUFSIZ); @@ -635,6 +746,15 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void) //peer_address_str = this->carrier_protocol_; //peer_address_str += "="; peer_address_str += cstring; + + if (this->protocol_ == TAO_AV_Core::TAO_AV_SCTP_SEQ) + { + for (int i = 0; i < this->num_peer_sec_addrs_; i++) + { + peer_address_str += ";"; + peer_address_str += this->peer_sec_addr_ [i]; + } + } } @@ -654,6 +774,7 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void) case TAO_AV_Core::TAO_AV_QOS_UDP: case TAO_AV_Core::TAO_AV_UDP_MCAST: case TAO_AV_Core::TAO_AV_TCP: + case TAO_AV_Core::TAO_AV_SCTP_SEQ: { ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,this->control_address_); control_port = inet_addr->get_port_number(); @@ -684,9 +805,11 @@ TAO_Forward_FlowSpec_Entry::entry_to_string (void) this->entry_ += "\\"; this->entry_ += peer_address_str; } + else ACE_DEBUG ((LM_DEBUG, + "No peer address specified\n")); if (TAO_debug_level > 0) - ACE_DEBUG ((LM_DEBUG,"entry_to_string: entry = %s\n",this->entry_.c_str())); + ACE_DEBUG ((LM_DEBUG,"Forward entry_to_string: entry = %s\n",this->entry_.c_str())); return this->entry_.c_str(); } @@ -804,6 +927,7 @@ TAO_Reverse_FlowSpec_Entry::entry_to_string (void) case TAO_AV_Core::TAO_AV_QOS_UDP: case TAO_AV_Core::TAO_AV_UDP_MCAST: case TAO_AV_Core::TAO_AV_TCP: + case TAO_AV_Core::TAO_AV_SCTP_SEQ: case TAO_AV_Core::TAO_AV_SFP_UDP: case TAO_AV_Core::TAO_AV_USERDEFINED_UDP: { @@ -820,6 +944,15 @@ TAO_Reverse_FlowSpec_Entry::entry_to_string (void) address_str += "="; address_str += cstring; + if (this->protocol_ == TAO_AV_Core::TAO_AV_SCTP_SEQ) + { + for (int i = 0; i < this->num_local_sec_addrs_; i++) + { + address_str += ";"; + address_str += this->local_sec_addr_ [i]; + } + } + } else { @@ -843,6 +976,7 @@ TAO_Reverse_FlowSpec_Entry::entry_to_string (void) case TAO_AV_Core::TAO_AV_QOS_UDP: case TAO_AV_Core::TAO_AV_UDP_MCAST: case TAO_AV_Core::TAO_AV_TCP: + case TAO_AV_Core::TAO_AV_SCTP_SEQ: { ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr*,this->control_address_); control_port = inet_addr->get_port_number(); @@ -869,6 +1003,6 @@ TAO_Reverse_FlowSpec_Entry::entry_to_string (void) // this->entry_ += "\\"; // this->entry_ += format_; - if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"entry_to_string: entry = %s\n",this->entry_.c_str() )); + if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"Reverse entry_to_string: entry = %s\n",this->entry_.c_str() )); return this->entry_.c_str(); } diff --git a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h index 08fcaf6baf2..e171286341a 100644 --- a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h +++ b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.h @@ -152,6 +152,14 @@ public: int set_peer_control_addr (ACE_Addr *peer_control_addr); ACE_Addr *get_peer_control_addr (void); + int set_local_sec_addr (char** local_sec_addr, int size); + char** get_local_sec_addr (void); + int num_local_sec_addrs (void); + + int set_peer_sec_addr (char** peer_sec_addr, int size); + char** get_peer_sec_addr (void); + int num_peer_sec_addrs (void); + int set_local_addr (ACE_Addr *local_addr); ACE_Addr *get_local_addr (void); char *get_local_addr_str (void); @@ -232,6 +240,10 @@ protected: int is_multicast_; ACE_Addr *peer_addr_; + char** local_sec_addr_; + int num_local_sec_addrs_; + char** peer_sec_addr_; + int num_peer_sec_addrs_; ACE_Addr *peer_control_addr_; ACE_Addr *local_addr_; ACE_Addr *local_control_addr_; diff --git a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i index 3a940c75963..74f705772a2 100644 --- a/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i +++ b/TAO/orbsvcs/orbsvcs/AV/FlowSpec_Entry.i @@ -178,6 +178,54 @@ TAO_FlowSpec_Entry::get_local_control_addr (void) } ACE_INLINE +int +TAO_FlowSpec_Entry::set_local_sec_addr (char** local_sec_addr, + int size) +{ + this->local_sec_addr_ = local_sec_addr; + this->num_local_sec_addrs_ = size; + return 0; +} + +ACE_INLINE +char** +TAO_FlowSpec_Entry::get_local_sec_addr (void) +{ + return this->local_sec_addr_; +} + +ACE_INLINE +int +TAO_FlowSpec_Entry::num_local_sec_addrs (void) +{ + return this->num_local_sec_addrs_; +} + +ACE_INLINE +int +TAO_FlowSpec_Entry::set_peer_sec_addr (char** peer_sec_addr, + int size) +{ + this->peer_sec_addr_ = peer_sec_addr; + this->num_peer_sec_addrs_ = size; + return 0; +} + +ACE_INLINE +char** +TAO_FlowSpec_Entry::get_peer_sec_addr (void) +{ + return this->peer_sec_addr_; +} + +ACE_INLINE +int +TAO_FlowSpec_Entry::num_peer_sec_addrs (void) +{ + return this->num_peer_sec_addrs_; +} + +ACE_INLINE TAO_AV_Transport* TAO_FlowSpec_Entry::transport (void) { diff --git a/TAO/orbsvcs/orbsvcs/AV/SCTP_SEQ.cpp b/TAO/orbsvcs/orbsvcs/AV/SCTP_SEQ.cpp new file mode 100644 index 00000000000..e59c9ed57e1 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/AV/SCTP_SEQ.cpp @@ -0,0 +1,827 @@ +// $Id$ + +#include "SCTP_SEQ.h" +#include "AVStreams_i.h" +#include "ace/Multihomed_INET_Addr.h" +#include "tao/debug.h" +#include "ace/Arg_Shifter.h" + +ACE_RCSID (AV, + SCTP_SEQ, + "$Id$") + +//------------------------------------------------------------ +// TAO_AV_SCTP_SEQ_Transport +//------------------------------------------------------------ + +TAO_AV_SCTP_SEQ_Transport::TAO_AV_SCTP_SEQ_Transport (void) + :handler_ (0) +{ +} + +TAO_AV_SCTP_SEQ_Transport::TAO_AV_SCTP_SEQ_Transport (TAO_AV_SCTP_SEQ_Flow_Handler *handler) + :handler_ (handler) +{ +} + +TAO_AV_SCTP_SEQ_Transport::~TAO_AV_SCTP_SEQ_Transport (void) +{ +} + +int +TAO_AV_SCTP_SEQ_Transport::open (ACE_Addr * /*address*/) +{ + return 0; +} + +int +TAO_AV_SCTP_SEQ_Transport::close (void) +{ + return 0; +} + +int +TAO_AV_SCTP_SEQ_Transport::mtu (void) +{ + return 0; +} + +ACE_Addr* +TAO_AV_SCTP_SEQ_Transport::get_peer_addr (void) +{ + return 0; +} + +ssize_t +TAO_AV_SCTP_SEQ_Transport::send (const ACE_Message_Block *mblk, ACE_Time_Value *) +{ + // For the most part this was copied from GIOP::send_request and + // friends. + + // ACE_Time_Value timeout; + + iovec iov[ACE_IOV_MAX]; + int iovcnt = 0; + ssize_t n = 0; + ssize_t nbytes = 0; + + for (const ACE_Message_Block *i = mblk; + i != 0; + i = i->cont ()) + { + // Make sure there is something to send! + if (i->length () > 0) + { + iov[iovcnt].iov_base = i->rd_ptr (); + iov[iovcnt].iov_len = ACE_static_cast (u_long, i->length ()); + iovcnt++; + + // The buffer is full make a OS call. @@ TODO this should + // be optimized on a per-platform basis, for instance, some + // platforms do not implement writev() there we should copy + // the data into a buffer and call send_n(). In other cases + // there may be some limits on the size of the iovec, there + // we should set ACE_IOV_MAX to that limit. + if (iovcnt == ACE_IOV_MAX) + { + n = this->handler_->peer ().sendv_n ((const iovec *) iov, + iovcnt); + //&timeout); + if (n < 1) + return n; + + nbytes += n; + iovcnt = 0; + } + } + } + + // Check for remaining buffers to be sent! + if (iovcnt != 0) + { + n = this->handler_->peer ().sendv_n ((const iovec *) iov, + iovcnt); + if (n < 1) + return n; + + nbytes += n; + } + + return nbytes; +} + +ssize_t +TAO_AV_SCTP_SEQ_Transport::send (const char *buf, + size_t len, + ACE_Time_Value *) +{ + return this->handler_->peer ().send_n (buf, len); +} + +ssize_t +TAO_AV_SCTP_SEQ_Transport::send (const iovec *iov, + int iovcnt, + ACE_Time_Value *) +{ + return this->handler_->peer ().sendv_n (iov, + iovcnt); +} + +ssize_t +TAO_AV_SCTP_SEQ_Transport::recv (char *buf, + size_t len, + ACE_Time_Value *) +{ + return this->handler_->peer ().recv (buf, len); +} + +ssize_t +TAO_AV_SCTP_SEQ_Transport::recv (char *buf, + size_t len, + int flags, + ACE_Time_Value *) +{ + return this->handler_->peer ().recv (buf, + len, + flags); +} + +ssize_t +TAO_AV_SCTP_SEQ_Transport::recv (iovec *iov, + int iovcnt, + ACE_Time_Value *) +{ + return handler_->peer ().recvv_n (iov, iovcnt); +} + +//------------------------------------------------------------ +// TAO_AV_SCTP_SEQ_Base_Acceptor +//------------------------------------------------------------ + +int +TAO_AV_SCTP_SEQ_Base_Acceptor::acceptor_open (TAO_AV_SCTP_SEQ_Acceptor *acceptor, + ACE_Reactor *reactor, + const ACE_INET_Addr &local_addr, + TAO_FlowSpec_Entry *entry) +{ + ACE_DEBUG ((LM_DEBUG, + "In base acceptor open")); + + this->acceptor_ = acceptor; + this->reactor_ = reactor; + this->entry_ = entry; + + ACE_UINT32 local_ip_addr [entry->num_local_sec_addrs ()]; + ACE_INET_Addr ip_addr; + char** addrs = entry->get_local_sec_addr (); + for (int i = 0; i < entry->num_local_sec_addrs (); i++) + { + ACE_CString addr_str (addrs[i]); + addr_str += ":"; + ip_addr.set (addr_str.c_str ()); + local_ip_addr [i] = ip_addr.get_ip_address (); + } + + ACE_Multihomed_INET_Addr multi_addr; + multi_addr.set (local_addr.get_port_number (), + local_addr.get_ip_address (), + 1, + local_ip_addr, + entry->num_local_sec_addrs ()); + + int result = this->open (multi_addr,reactor); + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SCTP_SEQ_Base_Acceptor::open failed\n"),-1); + + return 0; +} + +int +TAO_AV_SCTP_SEQ_Base_Acceptor::make_svc_handler (TAO_AV_SCTP_SEQ_Flow_Handler *&handler) +{ + int result = this->acceptor_->make_svc_handler (handler); + if (result < 0) + return result; + handler->reactor (this->reactor_); + this->entry_->handler (handler); + + + return 0; +} + +//------------------------------------------------------------ +// TAO_AV_SCTP_SEQ_Acceptor +//------------------------------------------------------------ + +TAO_AV_SCTP_SEQ_Acceptor::TAO_AV_SCTP_SEQ_Acceptor (void) +{ +} + +TAO_AV_SCTP_SEQ_Acceptor::~TAO_AV_SCTP_SEQ_Acceptor (void) +{ +} + +int +TAO_AV_SCTP_SEQ_Acceptor::make_svc_handler (TAO_AV_SCTP_SEQ_Flow_Handler *&sctp_handler) +{ + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_SCTP_SEQ_Acceptor::make_svc_handler\n" + )); + + if (this->endpoint_ != 0) + { + ACE_NEW_RETURN (sctp_handler, + TAO_AV_SCTP_SEQ_Flow_Handler, + -1); + + TAO_AV_Protocol_Object *object = + this->flow_protocol_factory_->make_protocol_object (this->entry_, + this->endpoint_, + sctp_handler, + sctp_handler->transport ()); + + sctp_handler->protocol_object (object); + this->endpoint_->set_flow_handler (this->flowname_.c_str (),sctp_handler); + this->entry_->protocol_object (object); + this->entry_->handler (sctp_handler); + } + return 0; +} + +int +TAO_AV_SCTP_SEQ_Acceptor::open (TAO_Base_StreamEndPoint *endpoint, + TAO_AV_Core *av_core, + TAO_FlowSpec_Entry *entry, + TAO_AV_Flow_Protocol_Factory *factory, + TAO_AV_Core::Flow_Component flow_comp) +{ + this->flow_protocol_factory_ = factory; + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_SCTP_SEQ_Acceptor::open ")); + + this->av_core_ = av_core; + this->endpoint_ = endpoint; + this->entry_ = entry; + if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL) + this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ()); + else + this->flowname_ = entry->flowname (); + ACE_Addr *address = entry->address (); + + ACE_INET_Addr *inet_addr = (ACE_INET_Addr *) address; + + inet_addr->set (inet_addr->get_port_number (), + inet_addr->get_host_name ()); + + char buf[BUFSIZ]; + inet_addr->addr_to_string (buf, + BUFSIZ); + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_SCTP_SEQ_Acceptor::open: %s", + buf + )); + + //Add code for reading multihomed addresses and pass the multihomed + //addr to the following method. ACE_Multihomed_addr derives from + //ACE_INET_Addr, hence this should not be an issue. + int result = this->acceptor_.acceptor_open (this, + av_core->reactor (), + *inet_addr, + entry); + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO_AV_SCTP_SEQ_Acceptor::open failed"), + -1); + + entry->set_local_addr (address); + + + return 0; +} + +int +TAO_AV_SCTP_SEQ_Acceptor::open_default (TAO_Base_StreamEndPoint *endpoint, + TAO_AV_Core *av_core, + TAO_FlowSpec_Entry *entry, + TAO_AV_Flow_Protocol_Factory *factory, + TAO_AV_Core::Flow_Component flow_comp) +{ + this->flow_protocol_factory_ = factory; + this->av_core_ = av_core; + this->endpoint_ = endpoint; + this->entry_ = entry; + if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL) + this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname()); + else + this->flowname_ = entry->flowname (); + + ACE_INET_Addr *address; + ACE_NEW_RETURN (address, + ACE_INET_Addr ("0"), + -1); + + int result = this->acceptor_.acceptor_open (this, + av_core->reactor (), + *address, + entry); + + + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR, + "TAO_AV_SCTP_SEQ_Acceptor::open failed"), + -1); + + this->acceptor_.acceptor ().get_local_addr (*address); + + address->set (address->get_port_number (), + address->get_host_name ()); + + char buf[BUFSIZ]; + address->addr_to_string (buf,BUFSIZ); + + if (TAO_debug_level > 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_SCTP_SEQ_Acceptor::open_default: %s\n", + buf)); + + entry->set_local_addr (address); + + return 0; +} + + +int +TAO_AV_SCTP_SEQ_Acceptor::close (void) +{ + return 0; +} + +//------------------------------------------------------------ +// TAO_AV_SCTP_SEQ_Base_Connector +//------------------------------------------------------------ + +int +TAO_AV_SCTP_SEQ_Base_Connector::connector_open (TAO_AV_SCTP_SEQ_Connector *connector, + ACE_Reactor *reactor) +{ + this->connector_ = connector; + this->reactor_ = reactor; + + int result = this->open (reactor); + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SCTP_SEQ_Base_Connector::open failed\n"),-1); + return 0; +} + +int +TAO_AV_SCTP_SEQ_Base_Connector::make_svc_handler (TAO_AV_SCTP_SEQ_Flow_Handler *&sctp_handler) +{ + int result = + this->connector_->make_svc_handler (sctp_handler); + if (result < 0) + return result; + sctp_handler->reactor (this->reactor_); + return 0; +} + +int +TAO_AV_SCTP_SEQ_Base_Connector::connector_connect (TAO_AV_SCTP_SEQ_Flow_Handler *&handler, + const ACE_Multihomed_INET_Addr &remote_addr, + const ACE_Multihomed_INET_Addr &local_addr) +{ + int result = this->connect (handler, + remote_addr, + 0, + local_addr); + + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SCTP_SEQ_Base_Connector::connect failed\n"),-1); + return 0; +} + +//------------------------------------------------------------ +// TAO_AV_SCTP_SEQ_Connector +//------------------------------------------------------------ +TAO_AV_SCTP_SEQ_Connector::TAO_AV_SCTP_SEQ_Connector (void) +{ +} + +TAO_AV_SCTP_SEQ_Connector::~TAO_AV_SCTP_SEQ_Connector (void) +{ +} + +int +TAO_AV_SCTP_SEQ_Connector::make_svc_handler (TAO_AV_SCTP_SEQ_Flow_Handler *&sctp_handler) +{ + if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_SCTP_SEQ_Connector::make_svc_handler\n")); + // TAO_AV_Callback *callback = 0; + if (this->endpoint_ != 0) + { + // this->endpoint_->get_callback (this->flowname_.c_str (), + // callback); + ACE_NEW_RETURN (sctp_handler, + // TAO_AV_SCTP_SEQ_Flow_Handler (callback), + TAO_AV_SCTP_SEQ_Flow_Handler, + -1); + TAO_AV_Protocol_Object *object = + this->flow_protocol_factory_->make_protocol_object (this->entry_, + this->endpoint_, + sctp_handler, + sctp_handler->transport ()); + sctp_handler->protocol_object (object); + // callback->protocol_object (object); + // this->endpoint_->set_protocol_object (this->flowname_.c_str (), + // object); + this->endpoint_->set_flow_handler (this->flowname_.c_str (),sctp_handler); + this->entry_->protocol_object (object); + this->entry_->handler (sctp_handler); + } + return 0; +} + +int +TAO_AV_SCTP_SEQ_Connector::open (TAO_Base_StreamEndPoint *endpoint, + TAO_AV_Core *av_core, + TAO_AV_Flow_Protocol_Factory *factory) + +{ + this->endpoint_ = endpoint; + this->flow_protocol_factory_ = factory; + if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_SCTP_SEQ_Connector::open ")); + int result = this->connector_.connector_open(this, + av_core->reactor ()); + return result; +} + +int +TAO_AV_SCTP_SEQ_Connector::connect (TAO_FlowSpec_Entry *entry, + TAO_AV_Transport *&transport, + TAO_AV_Core::Flow_Component flow_comp) +{ + this->entry_ = entry; + if (flow_comp == TAO_AV_Core::TAO_AV_CONTROL) + this->flowname_ = TAO_AV_Core::get_control_flowname (entry->flowname ()); + else + this->flowname_ = entry->flowname (); + ACE_Addr *remote_addr = entry->address (); + ACE_INET_Addr *inet_addr = ACE_dynamic_cast (ACE_INET_Addr *,remote_addr); + TAO_AV_SCTP_SEQ_Flow_Handler *handler = 0; + + ACE_Multihomed_INET_Addr remote_multi_addr; + remote_multi_addr.set (inet_addr->get_port_number (), + inet_addr->get_ip_address (), + 1, + 0, + 0); + + ACE_Multihomed_INET_Addr local_addr; //This can be a multihomed address + ACE_INET_Addr *addr; + if (entry->get_peer_addr () != 0) + { + addr = ACE_dynamic_cast (ACE_INET_Addr *, entry->get_peer_addr ()); + } + else + { + ACE_NEW_RETURN (addr, + ACE_INET_Addr ("0"), + -1); + } + + ACE_UINT32 local_ip_addr [entry->num_peer_sec_addrs ()]; + ACE_INET_Addr ip_addr; + char** addrs = entry->get_peer_sec_addr (); + for (int i = 0; i < entry->num_peer_sec_addrs (); i++) + { + ACE_CString addr_str (addrs[i]); + addr_str += ":"; + ip_addr.set (addr_str.c_str ()); + local_ip_addr [i] = ip_addr.get_ip_address (); + } + + local_addr.set (addr->get_port_number (), + addr->get_ip_address (), + 1, + local_ip_addr, + entry->num_peer_sec_addrs ()); + + int result = this->connector_.connector_connect (handler, + remote_multi_addr, + local_addr); + + if (result < 0) + ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SCTP_SEQ_connector::connect failed\n"),-1); + entry->handler (handler); + transport = handler->transport (); + + if (TAO_debug_level > 0) + { + ACE_DEBUG ((LM_DEBUG, + "Local Addrs\n")); + char buf [BUFSIZ]; + size_t size = BUFSIZ; + ACE_INET_Addr *peer_addrs; + ACE_NEW_RETURN (peer_addrs,ACE_INET_Addr [size], -1); + handler->peer ().get_local_addrs (peer_addrs, size); + for (unsigned int i=0; i < size;i++) + { + peer_addrs [i].addr_to_string (buf, + BUFSIZ); + ACE_DEBUG ((LM_DEBUG, + "%s %d\n", + buf, + size)); + } + + ACE_DEBUG ((LM_DEBUG, + "Remote Addrs\n")); + + size = BUFSIZ; + handler->peer ().get_remote_addrs (peer_addrs, size); + for (unsigned int i=0; i < size;i++) + { + peer_addrs [i].addr_to_string (buf, + BUFSIZ); + ACE_DEBUG ((LM_DEBUG, + "%s %d\n", + buf, + size)); + } + delete peer_addrs; + } + + return 0; +} + +int +TAO_AV_SCTP_SEQ_Connector::close (void) +{ + return 0; +} + +//------------------------------------------------------------ +// TAO_AV_SCTP_SEQ_Protocol_Factory +//------------------------------------------------------------ + + +TAO_AV_SCTP_SEQ_Factory::TAO_AV_SCTP_SEQ_Factory (void) +{ +} + +TAO_AV_SCTP_SEQ_Factory::~TAO_AV_SCTP_SEQ_Factory (void) +{ +} + + +int +TAO_AV_SCTP_SEQ_Factory::match_protocol (const char *protocol_string) +{ + if (ACE_OS::strcasecmp (protocol_string,"SCTP_SEQ") == 0) + return 1; + return 0; +} + +TAO_AV_Acceptor* +TAO_AV_SCTP_SEQ_Factory::make_acceptor (void) +{ + if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_SCTP_SEQ_Factory::make_acceptor\n")); + TAO_AV_Acceptor *acceptor = 0; + ACE_NEW_RETURN (acceptor, + TAO_AV_SCTP_SEQ_Acceptor, + 0); + return acceptor; +} + +TAO_AV_Connector* +TAO_AV_SCTP_SEQ_Factory::make_connector (void) +{ + if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG,"TAO_AV_SCTP_SEQ_Factory::make_connector\n")); + TAO_AV_Connector *connector = 0; + ACE_NEW_RETURN (connector, + TAO_AV_SCTP_SEQ_Connector, + 0); + return connector; +} + + +int +TAO_AV_SCTP_SEQ_Factory::init (int, + char *[]) +{ + + return 0; +} + +//------------------------------------------------------------ +// TAO_AV_SCTP_SEQ_Object +//------------------------------------------------------------ + +int +TAO_AV_SCTP_SEQ_Object::handle_input (void) +{ + int n = this->transport_->recv (this->frame_.rd_ptr (), + this->frame_.size ()); + if (n == -1) + ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SCTP_SEQ_Flow_Handler::handle_input recv failed\n"),-1); + if (n == 0) + ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_SCTP_SEQ_Flow_Handler::handle_input connection closed\n"),-1); + this->frame_.wr_ptr (this->frame_.rd_ptr () + n); + + return this->callback_->receive_frame (&this->frame_); +} + +int +TAO_AV_SCTP_SEQ_Object::send_frame (ACE_Message_Block *frame, + TAO_AV_frame_info * /*frame_info*/) +{ + int result = this->transport_->send (frame); + return result; +} + +int +TAO_AV_SCTP_SEQ_Object::send_frame (const iovec *iov, + int iovcnt, + TAO_AV_frame_info * /*frame_info*/) +{ + return this->transport_->send (iov,iovcnt); +} + +int +TAO_AV_SCTP_SEQ_Object::send_frame (const char*buf, + size_t len) +{ + int result = this->transport_->send (buf, len, 0); + return result; +} + + +TAO_AV_SCTP_SEQ_Object::TAO_AV_SCTP_SEQ_Object (TAO_AV_Callback *callback, + TAO_AV_Transport *transport) + :TAO_AV_Protocol_Object (callback,transport) +{ + // @@ Is this a good size? + this->frame_.size (BUFSIZ); +} + +TAO_AV_SCTP_SEQ_Object::~TAO_AV_SCTP_SEQ_Object (void) +{ + // No-op +} +int +TAO_AV_SCTP_SEQ_Object::destroy (void) +{ + this->callback_->handle_destroy (); + delete this; + return 0; +} + +//------------------------------------------------------------ +// TAO_AV_SCTP_SEQ_Flow_Handler +//------------------------------------------------------------ + +TAO_AV_SCTP_SEQ_Flow_Handler::TAO_AV_SCTP_SEQ_Flow_Handler (TAO_AV_Callback * /*callback*/) + // :TAO_AV_Flow_Handler (callback) +{ + ACE_NEW (this->transport_, + TAO_AV_SCTP_SEQ_Transport (this)); +} + +TAO_AV_SCTP_SEQ_Flow_Handler::~TAO_AV_SCTP_SEQ_Flow_Handler (void) +{ + delete this->transport_; +} + +TAO_AV_Transport * +TAO_AV_SCTP_SEQ_Flow_Handler::transport (void) +{ + return this->transport_; +} + +int +TAO_AV_SCTP_SEQ_Flow_Handler::open (void * /*arg*/) +{ + ACE_CDR::Long nodelay = 1; + +#if defined (SCTP_NODELAY) + if (this->peer ().set_option (IPPROTO_SCTP, + SCTP_NODELAY, + (void *) &nodelay, + sizeof (nodelay)) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + "NODELAY failed\n"), + -1); +#endif /* SCTP_NODELAY */ + + // Called by the <Strategy_Acceptor> when the handler is completely + // connected. + ACE_INET_Addr addr; + + if (this->peer ().get_remote_addr (addr) == -1) + return -1; + + char server[MAXHOSTNAMELEN + 16]; + + (void) addr.addr_to_string (server, sizeof (server)); + + if (TAO_debug_level > 0) + if (TAO_debug_level > 0) ACE_DEBUG ((LM_DEBUG, + "(%P|%t) connection to server <%s> on %d\n", + server, this->peer ().get_handle ())); + + this->peer ().disable (ACE_NONBLOCK); + // Register the handler with the reactor. + if (this->reactor () + && this->reactor ()->register_handler + (this, + ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("unable to register client handler")), + -1); + return 0; +} + +int +TAO_AV_SCTP_SEQ_Flow_Handler::handle_input (ACE_HANDLE /*fd*/) +{ + return this->protocol_object_->handle_input (); +} + +int +TAO_AV_SCTP_SEQ_Flow_Handler::handle_timeout (const ACE_Time_Value &tv, + const void *arg) +{ + return TAO_AV_Flow_Handler::handle_timeout (tv,arg); +} + + +//------------------------------------------------------------ +// TAO_AV_SCTP_SEQ_Flow_Factory +//------------------------------------------------------------ +TAO_AV_SCTP_SEQ_Flow_Factory::TAO_AV_SCTP_SEQ_Flow_Factory (void) +{ +} + +TAO_AV_SCTP_SEQ_Flow_Factory::~TAO_AV_SCTP_SEQ_Flow_Factory (void) +{ +} + +int +TAO_AV_SCTP_SEQ_Flow_Factory::init (int /* argc */, + char * /* argv */ []) +{ + return 0; +} + +int +TAO_AV_SCTP_SEQ_Flow_Factory::match_protocol (const char *flow_string) +{ + if (ACE_OS::strcasecmp (flow_string,"SCTP_SEQ") == 0) + return 1; + return 0; +} + +TAO_AV_Protocol_Object* +TAO_AV_SCTP_SEQ_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry, + TAO_Base_StreamEndPoint *endpoint, + TAO_AV_Flow_Handler *handler, + TAO_AV_Transport *transport) +{ + TAO_AV_Callback *callback = 0; + if( endpoint->get_callback (entry->flowname (), callback) ) { + ACE_ERROR_RETURN ((LM_ERROR, "(%N,%l) Invalid callback\n"), 0); + } + + TAO_AV_SCTP_SEQ_Object *object = 0; + ACE_NEW_RETURN (object, + TAO_AV_SCTP_SEQ_Object (callback, + transport), + 0); + callback->open (object, + handler); + endpoint->set_protocol_object (entry->flowname (), + object); + return object; +} + +ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_SCTP_SEQ_Flow_Factory) +ACE_STATIC_SVC_DEFINE (TAO_AV_SCTP_SEQ_Flow_Factory, + ACE_TEXT ("SCTP_SEQ_Flow_Factory"), + ACE_SVC_OBJ_T, + &ACE_SVC_NAME (TAO_AV_SCTP_SEQ_Flow_Factory), + ACE_Service_Type::DELETE_THIS | + ACE_Service_Type::DELETE_OBJ, + 0) + +ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_SCTP_SEQ_Factory) +ACE_STATIC_SVC_DEFINE (TAO_AV_SCTP_SEQ_Factory, + ACE_TEXT ("SCTP_SEQ_Factory"), + ACE_SVC_OBJ_T, + &ACE_SVC_NAME (TAO_AV_SCTP_SEQ_Factory), + ACE_Service_Type::DELETE_THIS | + ACE_Service_Type::DELETE_OBJ, + 0) diff --git a/TAO/orbsvcs/orbsvcs/AV/SCTP_SEQ.h b/TAO/orbsvcs/orbsvcs/AV/SCTP_SEQ.h new file mode 100644 index 00000000000..4cd811b3692 --- /dev/null +++ b/TAO/orbsvcs/orbsvcs/AV/SCTP_SEQ.h @@ -0,0 +1,302 @@ +/* -*- C++ -*- */ + +//============================================================================= +/** + * @file SCTP_SEQ.h + * + * $Id$ + * + * @author Yamuna Krishnamurthy <yamuna@oomworks.com> + */ +//============================================================================= + +#ifndef TAO_AV_SCTP_SEQ_H +#define TAO_AV_SCTP_SEQ_H +#include /**/ "ace/pre.h" + + +#include "ace/OS.h" +#include "ace/Auto_Ptr.h" +#include "Protocol_Factory.h" +#include "ace/SOCK_SEQPACK_Association.h" +#include "ace/SOCK_SEQPACK_Acceptor.h" +#include "ace/SOCK_SEQPACK_Connector.h" + +extern "C" { +#include <netinet/sctp.h> +}; + + +typedef ACE_Unbounded_Set <ACE_CString> Interface_Seq; +typedef ACE_Unbounded_Set_Iterator <ACE_CString> Interface_Seq_Itor; + +//typedef auto_ptr <Interface_Seq> Interface_Seq_Ptr; +typedef ACE_Hash_Map_Manager <ACE_CString,Interface_Seq,ACE_Null_Mutex> Secondary_Addr_Map; +typedef ACE_Hash_Map_Entry <ACE_CString,Interface_Seq> Secondary_Addr_Map_Entry; +typedef ACE_Hash_Map_Iterator <ACE_CString,Interface_Seq,ACE_Null_Mutex> Secondary_Addr_Map_Iterator; + +//Secondary_Addr_Map sec_addr_map_; + +/** + * @class TAO_AV_SCTP_SEQ_Factory + * @brief + */ +class TAO_AV_Export TAO_AV_SCTP_SEQ_Factory : public TAO_AV_Transport_Factory +{ +public: + /// Initialization hook. + TAO_AV_SCTP_SEQ_Factory (void); + virtual ~TAO_AV_SCTP_SEQ_Factory (void); + virtual int init (int argc, char *argv[]); + virtual int match_protocol (const char *protocol_string); + virtual TAO_AV_Acceptor *make_acceptor (void); + virtual TAO_AV_Connector *make_connector (void); +}; + +class TAO_AV_SCTP_SEQ_Flow_Handler; + +/** + * @class TAO_AV_SCTP_SEQ_Transport + * @brief A transport abstraction for udp sockets. + * Uses the ACE_SOCK_Dgram to send the data. + */ +class TAO_AV_Export TAO_AV_SCTP_SEQ_Transport + :public TAO_AV_Transport +{ +public: + TAO_AV_SCTP_SEQ_Transport (void); + + TAO_AV_SCTP_SEQ_Transport (TAO_AV_SCTP_SEQ_Flow_Handler *handler); + + virtual ~TAO_AV_SCTP_SEQ_Transport (void); + virtual int open (ACE_Addr *addr); + + virtual int close (void); + + virtual int mtu (void); + + virtual ACE_Addr *get_peer_addr (void); + + /// Write the complete Message_Block chain to the connection. + virtual ssize_t send (const ACE_Message_Block *mblk, + ACE_Time_Value *s = 0); + + /// Write the contents of the buffer of length len to the connection. + virtual ssize_t send (const char *buf, + size_t len, + ACE_Time_Value *s = 0); + + /// Write the contents of iovcnt iovec's to the connection. + virtual ssize_t send (const iovec *iov, + int iovcnt, + ACE_Time_Value *s = 0); + + /// Read len bytes from into buf. + virtual ssize_t recv (char *buf, + size_t len, + ACE_Time_Value *s = 0); + + /// Read len bytes from into buf using flags. + virtual ssize_t recv (char *buf, + size_t len, + int flags, + ACE_Time_Value *s = 0); + + /// Read received data into the iovec buffers. + virtual ssize_t recv (iovec *iov, + int iovcnt, + ACE_Time_Value *s = 0); + + TAO_AV_SCTP_SEQ_Flow_Handler *handler (void) { return this->handler_; } + +protected: + TAO_AV_SCTP_SEQ_Flow_Handler *handler_; + ACE_Addr *addr_; + ACE_INET_Addr peer_addr_; +}; + +/** + * @class TAO_AV_SCTP_SEQ_Flow_Handler + * @brief + */ +class TAO_AV_SCTP_SEQ_Flow_Handler + :public virtual TAO_AV_Flow_Handler, + public ACE_Svc_Handler <ACE_SOCK_SEQPACK_ASSOCIATION, ACE_NULL_SYNCH> +{ +public: + TAO_AV_SCTP_SEQ_Flow_Handler (TAO_AV_Callback *callback = 0); + virtual ~TAO_AV_SCTP_SEQ_Flow_Handler (void); + virtual TAO_AV_Transport *transport (void); + virtual int open (void * = 0); + virtual int handle_input (ACE_HANDLE fd); + virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg = 0); + virtual ACE_Event_Handler* event_handler (void){ return this; } +protected: + TAO_AV_Core *av_core_; +}; + +class TAO_AV_SCTP_SEQ_Acceptor; + +/** + * @class TAO_AV_SCTP_SEQ_Base_Acceptor + * @brief + */ +class TAO_AV_SCTP_SEQ_Base_Acceptor :public ACE_Acceptor <TAO_AV_SCTP_SEQ_Flow_Handler,ACE_SOCK_SEQPACK_ACCEPTOR> +{ + public: + virtual int acceptor_open (TAO_AV_SCTP_SEQ_Acceptor *acceptor, + ACE_Reactor *reactor, + const ACE_INET_Addr &local_addr, + TAO_FlowSpec_Entry *entry); + virtual int make_svc_handler (TAO_AV_SCTP_SEQ_Flow_Handler *& handler); + protected: + TAO_AV_SCTP_SEQ_Acceptor *acceptor_; + ACE_Reactor *reactor_; + TAO_FlowSpec_Entry *entry_; +}; + +/** + * @class TAO_AV_SCTP_SEQ_Acceptor + * @brief + */ +class TAO_AV_Export TAO_AV_SCTP_SEQ_Acceptor + :public TAO_AV_Acceptor +{ +public: + TAO_AV_SCTP_SEQ_Acceptor (void); + virtual ~TAO_AV_SCTP_SEQ_Acceptor (void); + virtual int open (TAO_Base_StreamEndPoint *endpoint, + TAO_AV_Core *av_core, + TAO_FlowSpec_Entry *entry, + TAO_AV_Flow_Protocol_Factory *factory, + TAO_AV_Core::Flow_Component flow_comp = + TAO_AV_Core::TAO_AV_DATA); + + virtual int open_default (TAO_Base_StreamEndPoint *endpoint, + TAO_AV_Core *av_core, + TAO_FlowSpec_Entry *entry, + TAO_AV_Flow_Protocol_Factory *factory, + TAO_AV_Core::Flow_Component flow_comp = + TAO_AV_Core::TAO_AV_DATA); + + virtual int close (void); + virtual int make_svc_handler (TAO_AV_SCTP_SEQ_Flow_Handler *&handler); + +protected: + ACE_INET_Addr *address_; + TAO_AV_SCTP_SEQ_Base_Acceptor acceptor_; + TAO_Base_StreamEndPoint *endpoint_; + TAO_FlowSpec_Entry *entry_; + TAO_AV_Flow_Protocol_Factory *flow_protocol_factory_; +}; + +class TAO_AV_SCTP_SEQ_Connector; + +/** + * @class TAO_AV_Base_Connector + * @brief + */ +class TAO_AV_SCTP_SEQ_Base_Connector : public ACE_Connector <TAO_AV_SCTP_SEQ_Flow_Handler,ACE_SOCK_SEQPACK_CONNECTOR> +{ +public: + // To avoid warnings of open and connect hiding the base class functions these have to renamed. + int connector_open (TAO_AV_SCTP_SEQ_Connector *connector, + ACE_Reactor *reactor); + int connector_connect (TAO_AV_SCTP_SEQ_Flow_Handler *&handler, + const ACE_Multihomed_INET_Addr &remote_addr, + const ACE_Multihomed_INET_Addr &local_addr); + virtual int make_svc_handler (TAO_AV_SCTP_SEQ_Flow_Handler *& handler); +protected: + TAO_AV_SCTP_SEQ_Connector *connector_; + ACE_Reactor *reactor_; +}; + +/** + * @class TAO_AV_SCTP_SEQ_Connector + * @brief + */ +class TAO_AV_SCTP_SEQ_Connector : public TAO_AV_Connector +{ +public: + TAO_AV_SCTP_SEQ_Connector (void); + virtual ~TAO_AV_SCTP_SEQ_Connector (void); + + virtual int open (TAO_Base_StreamEndPoint *endpoint, + TAO_AV_Core *av_core, + TAO_AV_Flow_Protocol_Factory *factory); + + virtual int connect (TAO_FlowSpec_Entry *entry, + TAO_AV_Transport *&transport, + TAO_AV_Core::Flow_Component flow_comp = + TAO_AV_Core::TAO_AV_DATA); + virtual int close (void); + virtual int make_svc_handler (TAO_AV_SCTP_SEQ_Flow_Handler *&handler); +protected: + TAO_AV_Core *av_core_; + TAO_AV_SCTP_SEQ_Base_Connector connector_; + TAO_Base_StreamEndPoint *endpoint_; + TAO_FlowSpec_Entry *entry_; + TAO_AV_Flow_Protocol_Factory *flow_protocol_factory_; +}; + +/** + * @class TAO_AV_SCTP_SEQ_Object + * @brief TAO_AV_Protocol_Object for the User Datagram Protocol (SCTP_SEQ) + */ +class TAO_AV_Export TAO_AV_SCTP_SEQ_Object : public TAO_AV_Protocol_Object +{ +public: + TAO_AV_SCTP_SEQ_Object (TAO_AV_Callback *callback, + TAO_AV_Transport *transport = 0); + + /// Dtor + virtual ~TAO_AV_SCTP_SEQ_Object (void); + + virtual int handle_input (void); + + /// send a data frame. + virtual int send_frame (ACE_Message_Block *frame, + TAO_AV_frame_info *frame_info = 0); + + virtual int send_frame (const iovec *iov, + int iovcnt, + TAO_AV_frame_info *frame_info = 0); + + virtual int send_frame (const char*buf, + size_t len); + + /// end the stream. + virtual int destroy (void); + +private: + /// Pre-allocated memory to receive the data... + ACE_Message_Block frame_; +}; + +/** + * @class TAO_AV_SCTP_SEQ_Flow_Factory + * @brief + */ +class TAO_AV_Export TAO_AV_SCTP_SEQ_Flow_Factory : public TAO_AV_Flow_Protocol_Factory +{ +public: + /// Initialization hook. + TAO_AV_SCTP_SEQ_Flow_Factory (void); + virtual ~TAO_AV_SCTP_SEQ_Flow_Factory (void); + virtual int init (int argc, char *argv[]); + virtual int match_protocol (const char *flow_string); + TAO_AV_Protocol_Object* make_protocol_object (TAO_FlowSpec_Entry *entry, + TAO_Base_StreamEndPoint *endpoint, + TAO_AV_Flow_Handler *handler, + TAO_AV_Transport *transport); +}; + +ACE_STATIC_SVC_DECLARE (TAO_AV_SCTP_SEQ_Flow_Factory) +ACE_FACTORY_DECLARE (TAO_AV, TAO_AV_SCTP_SEQ_Flow_Factory) + +ACE_STATIC_SVC_DECLARE (TAO_AV_SCTP_SEQ_Factory) +ACE_FACTORY_DECLARE (TAO_AV, TAO_AV_SCTP_SEQ_Factory) + + +#include /**/ "ace/post.h" +#endif /* TAO_AV_SCTP_SEQ_H */ diff --git a/TAO/orbsvcs/orbsvcs/Makefile.av b/TAO/orbsvcs/orbsvcs/Makefile.av index 5b8223695e4..c5ed5958b14 100644 --- a/TAO/orbsvcs/orbsvcs/Makefile.av +++ b/TAO/orbsvcs/orbsvcs/Makefile.av @@ -70,6 +70,10 @@ ifeq ($(rapi),1) AV/QoS_UDP endif # rapi +ifeq ($(sctp),openss7) + CPP_SRCS += AV/SCTP_SEQ +endif # sctp + IDL_SRC = \ $(addsuffix S.cpp, $(IDL_FILES)) \ $(addsuffix C.cpp, $(IDL_FILES)) @@ -91,7 +95,7 @@ include $(ACE_ROOT)/include/makeinclude/rules.local.GNU #---------------------------------------------------------------------------- LDFLAGS += -L$(TAO_ROOT)/tao -L$(TAO_ROOT)/orbsvcs/orbsvcs -CPPFLAGS += -I$(TAO_ROOT) -I$(TAO_ROOT)/orbsvcs -I$(TAO_ROOT)/orbsvcs/orbsvcs -DTAO_ORBSVCS_HAS_AV +CPPFLAGS += -I$(TAO_ROOT) -I$(TAO_ROOT)/orbsvcs -I$(TAO_ROOT)/orbsvcs/orbsvcs -DTAO_ORBSVCS_HAS_AV ifeq ($(shared_libs),1) ifneq ($(SHLIB),) diff --git a/TAO/orbsvcs/tests/AVStreams/Simple_Two_Stage/run_test.pl b/TAO/orbsvcs/tests/AVStreams/Simple_Two_Stage/run_test.pl index e93ccdfbf52..1bcd4e6da62 100755 --- a/TAO/orbsvcs/tests/AVStreams/Simple_Two_Stage/run_test.pl +++ b/TAO/orbsvcs/tests/AVStreams/Simple_Two_Stage/run_test.pl @@ -107,6 +107,32 @@ if ($receiver != 0) { $status = 1; } +$SV = new PerlACE::Process ("receiver", "-ORBInitRef NameService=file://$nsior -f sctp_output"); +$CL = new PerlACE::Process ("sender", "-ORBInitRef NameService=file://$nsior -p SCTP_SEQ"); + +print STDERR "Using SCTP\n"; +print STDERR "Starting Receiver\n"; + +$SV->Spawn (); + +sleep $sleeptime; + +print STDERR "Starting Sender\n"; + +$sender = $CL->SpawnWaitKill (200); + +if ($sender != 0) { + print STDERR "ERROR: sender returned $sender\n"; + $status = 1; +} + +$receiver = $SV->TerminateWaitKill (5); + +if ($receiver != 0) { + print STDERR "ERROR: receiver returned $receiver\n"; + $status = 1; +} + $nserver = $NS->TerminateWaitKill (5); if ($nserver != 0) { |