diff options
Diffstat (limited to 'TAO/orbsvcs/orbsvcs/AV/RTCP.cpp')
-rw-r--r-- | TAO/orbsvcs/orbsvcs/AV/RTCP.cpp | 1258 |
1 files changed, 578 insertions, 680 deletions
diff --git a/TAO/orbsvcs/orbsvcs/AV/RTCP.cpp b/TAO/orbsvcs/orbsvcs/AV/RTCP.cpp index cd661516434..7f2197c0622 100644 --- a/TAO/orbsvcs/orbsvcs/AV/RTCP.cpp +++ b/TAO/orbsvcs/orbsvcs/AV/RTCP.cpp @@ -37,543 +37,303 @@ #include "RTCP.h" #include "media-timer.h" #include "tao/debug.h" +#include "global.h" +#include "md5.h" +#include "RTCP_Packet.h" -// TAO_AV_RTP_State -TAO_AV_RTP_State::TAO_AV_RTP_State (void) - :bad_version_(0), - badoptions_(0), - badfmt_(0), - badext_(0), - nrunt_(0), - last_np_(0), - sdes_seq_(0), - rtcp_inv_bw_(0.), - rtcp_avg_size_(128.), - confid_(-1) -{ - ACE_NEW (pktbuf_, - u_char [2 * RTP_MTU]); -} - -//------------------------------------------------------------ -// TAO_AV_RTCP -//------------------------------------------------------------ -u_char* -TAO_AV_RTCP::build_sdes_item (u_char* p, - int code, - TAO_AV_Source& s) +int +TAO_AV_RTCP_Callback::receive_control_frame (ACE_Message_Block *data, + const ACE_Addr &peer_address) { - const char* value = s.sdes (code); - if (value != 0) - { - int len = strlen (value); - *p++ = code; - *p++ = len; - memcpy (p, value, len); - p += len; - } - return (p); -} + int length = data->length (); + int more = length; + char *buf_ptr = data->rd_ptr (); + char first_rtcp_packet = 1; + RTCP_Channel_In *c; -int -TAO_AV_RTCP::build_sdes (rtcphdr* rh, - TAO_AV_Source& ls, - TAO_AV_RTP_State *state) -{ - int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_SDES; - rh->rh_flags = htons (flags); - rh->rh_ssrc = ls.srcid (); - u_char* p = (u_char*) (rh + 1); - p = build_sdes_item (p, RTCP_SDES_CNAME, ls); - - /* - * We always send a cname plus one other sdes - * There's a schedule for what we send sequenced by sdes_seq_: - * - send 'email' every 0th & 4th packet - * - send 'note' every 2nd packet - * - send 'tool' every 6th packet - * - send 'name' in all the odd slots - * (if 'note' is not the empty string, we switch the roles - * of name & note) - */ - int nameslot, noteslot; - const char* note = ls.sdes (RTCP_SDES_NOTE); - if (note) + // This code performs the RTCP Header validity checks detailed in RFC 1889 + // Appendix A.2 + + while (more > 0) { - if (*note) - { - nameslot = RTCP_SDES_NOTE; - noteslot = RTCP_SDES_NAME; - } else + // the second byte of the control packet is the type + switch ((unsigned char)buf_ptr[length - more + 1]) + { + case RTCP_PT_SR: { - nameslot = RTCP_SDES_NAME; - noteslot = RTCP_SDES_NOTE; + RTCP_SR_Packet sr(&buf_ptr[length-more], + &more); + + if (!sr.is_valid(first_rtcp_packet)) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_RTCP_Callback::receive_control_frame - " + "warning invalid rtcp packet\n")); + + if (this->inputs_.find (sr.ssrc (), c) == -1) + { + ACE_NEW_RETURN (c, + RTCP_Channel_In (sr.ssrc (), + &peer_address), + -1); + this->inputs_.bind (sr.ssrc (), c); + } + c->updateStatistics (&sr); + + if (TAO_debug_level > 0) + sr.dump (); + break; } - } - else - { - nameslot = RTCP_SDES_NAME; - noteslot = RTCP_SDES_NAME; - } - u_int seq = (++state->sdes_seq_) & 0x7; - switch (seq) - { - case 0: - case 4: - p = build_sdes_item (p, RTCP_SDES_EMAIL, ls); - break; - case 2: - p = build_sdes_item (p, noteslot, ls); - break; - case 6: - p = build_sdes_item (p, RTCP_SDES_TOOL, ls); - break; - default: - p = build_sdes_item (p, nameslot, ls); - } - - int len = p - (u_char*)rh; - int pad = 4 - (len & 3); - len += pad; - rh->rh_len = htons ( (len >> 2) - 1); - while (--pad >= 0) - *p++ = 0; - - return (len); -} - -int -TAO_AV_RTCP::build_bye (rtcphdr* rh, - TAO_AV_Source& ls) -{ - int flags = - RTP_VERSION << 14 | 1 << 8 | RTCP_PT_BYE; - rh->rh_flags = ntohs (flags); - rh->rh_len = htons (1); - rh->rh_ssrc = ls.srcid (); - return (8); -} + case RTCP_PT_RR: + { + RTCP_RR_Packet rr(&buf_ptr[length-more], + &more); + + if (!rr.is_valid(first_rtcp_packet)) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_RTCP_Callback::receive_control_frame - " + "warning invalid rtcp packet\n")); + + if (this->inputs_.find (rr.ssrc (), c) == -1) + { + ACE_NEW_RETURN (c, + RTCP_Channel_In (rr.ssrc (), + &peer_address), + -1); + this->inputs_.bind (rr.ssrc (), c); + } + + c->updateStatistics (&rr); + + if (TAO_debug_level > 0) + rr.dump (); + break; + } + case RTCP_PT_SDES: + { + RTCP_SDES_Packet sdes (&buf_ptr[length-more], + &more); -void -TAO_AV_RTCP::parse_rr_records (ACE_UINT32, - rtcp_rr*, - int, - const u_char*, - ACE_UINT32) -{ - if (TAO_debug_level > 0) ACE_DEBUG ( (LM_DEBUG,"TAO_AV_RTCP::parse_rr_records\n")); -} + if (!sdes.is_valid(first_rtcp_packet)) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_RTCP_Callback::receive_control_frame - " + "warning invalid rtcp packet\n")); -void -TAO_AV_RTCP::parse_sr (rtcphdr* rh, - int flags, - u_char*ep, - TAO_AV_Source* ps, - ACE_UINT32 addr, - TAO_AV_SourceManager *source_manager) -{ - rtcp_sr* sr = ACE_reinterpret_cast (rtcp_sr*, (rh + 1)); - TAO_AV_Source* s; - ACE_UINT32 ssrc = rh->rh_ssrc; - if (ps->srcid () != ssrc) - s = source_manager->lookup (ssrc, ssrc, addr); - else - s = ps; + if (TAO_debug_level > 0) + sdes.dump (); + break; + } + case RTCP_PT_BYE: + { + RTCP_BYE_Packet bye (&buf_ptr[length-more], + &more); - s->lts_ctrl (ACE_OS::gettimeofday ()); - s->sts_ctrl (ntohl (sr->sr_ntp.upper) << 16 | - ntohl (sr->sr_ntp.lower) >> 16); + if (!bye.is_valid(first_rtcp_packet)) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_RTCP_Callback::receive_control_frame - " + "warning invalid rtcp packet\n")); - int cnt = flags >> 8 & 0x1f; - parse_rr_records (ssrc, ACE_reinterpret_cast (rtcp_rr*, (sr + 1)), cnt, ep, addr); -} + // Inform the listener that a source(s) has left the session + ACE_UINT32 *ssrc_list; + unsigned char length; -void -TAO_AV_RTCP::parse_rr (rtcphdr* rh, - int flags, - u_char* ep, - TAO_AV_Source* ps, - ACE_UINT32 addr, - TAO_AV_SourceManager *source_manager) -{ - TAO_AV_Source* s; - ACE_UINT32 ssrc = rh->rh_ssrc; - if (ps->srcid () != ssrc) - s = source_manager->lookup (ssrc, ssrc, addr); - else - s = ps; + bye.ssrc_list(&ssrc_list, length); - s->lts_ctrl (ACE_OS::gettimeofday ()); - int cnt = flags >> 8 & 0x1f; - parse_rr_records (ssrc, ACE_reinterpret_cast (rtcp_rr*, (rh + 1)), cnt, ep, addr); -} + for (int i=0; i<length; i++) + { + RTCP_Channel_In *c = 0; -int -TAO_AV_RTCP::sdesbody (ACE_UINT32* p, - u_char* ep, - TAO_AV_Source* ps, - ACE_UINT32 addr, - ACE_UINT32 ssrc, - TAO_AV_SourceManager *source_manager) -{ - TAO_AV_Source* s; - ACE_UINT32 srcid = *p; - if (ps->srcid () != srcid) - s = source_manager->lookup (srcid, ssrc, addr); - else - s = ps; - if (s == 0) - return (0); - /* - * Note ctrl packet since we will never see any direct ctrl packets - * from a TAO_AV_Source through a mixer (and we don't want the TAO_AV_Source to - * time out). - */ - s->lts_ctrl (ACE_OS::gettimeofday ()); + // remove the channel from the list + this->inputs_.unbind(ssrc_list[i], c); - u_char* cp = (u_char*) (p + 1); - while (cp < ep) - { - char buf[256]; + if (c != 0) + delete c; + } - u_int type = cp[0]; - if (type == 0) - { - /* end of chunk */ - return ( ( (cp - (u_char*)p) >> 2) + 1); - } - u_int len = cp[1]; - u_char* eopt = cp + len + 2; - if (eopt > ep) - return (0); + if (TAO_debug_level > 0) + bye.dump (); - if (type >= RTCP_SDES_MIN && type <= RTCP_SDES_MAX) - { - memcpy (buf, (char*)&cp[2], len); - buf[len] = 0; - s->sdes (type, buf); - } - else - /*XXX*/; + break; + } + case RTCP_PT_APP: + // If we receive one of these, ignore it. + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_RTCP_Callback::receive_control_frame - " + "APP packet - ignore\n")); + more -= (4 + (ACE_UINT16)buf_ptr[length - more + 2]); + break; + default: + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_RTCP_Callback::receive_control_frame - " + "UNKNOWN packet type %u; ignore the rest\n", + (int)buf_ptr[length - more + 1])); + more = 0; + } + + first_rtcp_packet = 0; - cp = eopt; } - return (0); -} -void -TAO_AV_RTCP::parse_sdes (rtcphdr* rh, - int flags, - u_char* ep, - TAO_AV_Source* ps, - ACE_UINT32 addr, - ACE_UINT32 ssrc, - TAO_AV_SourceManager *source_manager) -{ - int cnt = flags >> 8 & 0x1f; - ACE_UINT32* p = (ACE_UINT32*)&rh->rh_ssrc; - while (--cnt >= 0) - { - int n = TAO_AV_RTCP::sdesbody (p, ep, ps, addr, ssrc, source_manager); - if (n == 0) - break; - p += n; - } - if (cnt >= 0) - ps->badsdes (1); + if (more != 0) + ACE_DEBUG ((LM_DEBUG, + "TAO_AV_RTCP_Callback::receive_control_frame - " + "Error in overall packet length\n")); + return 0; } -void -TAO_AV_RTCP::parse_bye (rtcphdr* rh, - int flags, - u_char* ep, - TAO_AV_Source* ps, - TAO_AV_SourceManager *source_manager) -{ - int cnt = flags >> 8 & 0x1f; - ACE_UINT32* p = (ACE_UINT32*)&rh->rh_ssrc; +ACE_INT32 random32 (int); - while (--cnt >= 0) +ACE_UINT32 +TAO_AV_RTCP::alloc_srcid (ACE_UINT32 addr) +{ + struct { - if (p >= (ACE_UINT32*)ep) - { - ps->badbye (1); - return; - } - TAO_AV_Source* s; - if (ps->srcid () != rh->rh_ssrc) - s = source_manager->consult (*p); - else - s = ps; - if (s != 0) - s->lts_done (ACE_OS::gettimeofday ()); - ++p; - } + int type; + ACE_Time_Value tv; + pid_t pid; + pid_t pgid; + pid_t ppid; + uid_t uid; + gid_t gid; + } s; + + s.type = addr; + s.tv = ACE_OS::gettimeofday (); + s.pid = ACE_OS::getpid(); + s.pgid = ACE_OS::getpgid(s.pid); + s.ppid = ACE_OS::getppid(); + s.uid = ACE_OS::getuid(); + s.gid = ACE_OS::getgid(); + + unsigned char *string_val = (unsigned char *) &s; + int length = sizeof(s); + + MD5_CTX context; + union + { + char c[16]; + u_long x[4]; + } digest; + ACE_UINT32 r; + int i; + + MD5Init (&context); + MD5Update (&context, string_val, length); + MD5Final ((unsigned char*)&digest, &context); + r=0; + for (i=0; i<3; i++) + r ^= digest.x[i]; + + return r; + +/* used to be this + ACE_Time_Value tv = ACE_OS::gettimeofday (); + ACE_UINT32 srcid = ACE_UINT32 (tv.sec () + tv.usec ()); + srcid += (ACE_UINT32)ACE_OS::getuid(); + srcid += (ACE_UINT32)ACE_OS::getpid(); + srcid += addr; + return (srcid); +*/ } -/*XXX check for buffer overflow*/ -/* - * Send an RTPv2 report packet. - */ -void -TAO_AV_RTCP::send_report (int bye, - TAO_AV_Protocol_Object *protocol_object, - TAO_AV_SourceManager *source_manager, - TAO_AV_RTP_State *state, - TAO_AV_RTCP_Callback *callback) -{ - if (source_manager->localsrc () == 0) - return; - TAO_AV_Source& s = *source_manager->localsrc (); - rtcphdr* rh = (rtcphdr*)state->pktbuf_; - rh->rh_ssrc = s.srcid (); - int flags = RTP_VERSION << 14; - timeval now = ACE_OS::gettimeofday (); - s.lts_ctrl (now); - int we_sent = 0; - rtcp_rr* rr; - /* - * If we've sent data since our last sender report send a - * new report. The MediaTimer check is to make sure we still - * have a grabber -- if not, we won't be able to interpret the - * media timestamps so there's no point in sending an SR. - */ - MediaTimer* mt = MediaTimer::instance (); - if (s.np () != state->last_np_ && mt) { - state->last_np_ = s.np (); - we_sent = 1; - flags |= RTCP_PT_SR; - rtcp_sr* sr = ACE_reinterpret_cast (rtcp_sr*, (rh + 1)); - sr->sr_ntp = ntp64time (now); - sr->sr_ntp.upper = ACE_HTONL (sr->sr_ntp.upper); - sr->sr_ntp.lower = ACE_HTONL (sr->sr_ntp.lower); - sr->sr_ts = htonl (mt->ref_ts ()); - sr->sr_np = htonl (s.np ()); - sr->sr_nb = htonl (s.nb ()); - rr = ACE_reinterpret_cast (rtcp_rr*, (sr + 1)); - } else { - flags |= RTCP_PT_RR; - rr = ACE_reinterpret_cast (rtcp_rr*, (rh + 1)); - } - int nrr = 0; - int nsrc = 0; - /* - * we don't want to inflate report interval if user has set - * the flag that causes all TAO_AV_Sources to be 'kept' so we - * consider TAO_AV_Sources 'inactive' if we haven't heard a control - * msg from them for ~32 reporting intervals. - */ - u_int inactive = u_int (state->rint_ * (32./1000.)); - if (inactive < 2) - inactive = 2; - for (TAO_AV_Source* sp = source_manager->sources (); sp != 0; sp = sp->next_) { - ++nsrc; - int received = sp->np () - sp->snp (); - if (received == 0) { - if (u_int (now.tv_sec - sp->lts_ctrl ().tv_sec) > inactive) - --nsrc; - continue; - } - sp->snp (sp->np ()); - rr->rr_srcid = sp->srcid (); - int expected = sp->ns () - sp->sns (); - sp->sns (sp->ns ()); - ACE_UINT32 v; - int lost = expected - received; - if (lost <= 0) - v = 0; - else - /* expected != 0 if lost > 0 */ - v = ( (lost << 8) / expected) << 24; - /* XXX should saturate on over/underflow */ - v |= (sp->ns () - sp->np ()) & 0xffffff; - rr->rr_loss = htonl (v); - rr->rr_ehsr = htonl (sp->ehs ()); - rr->rr_dv = sp->delvar (); - rr->rr_lsr = htonl (sp->sts_ctrl ()); - if (sp->lts_ctrl ().tv_sec == 0) - rr->rr_dlsr = 0; - else { - ACE_UINT32 ntp_now = ntptime (now); - ACE_UINT32 ntp_then = ntptime (sp->lts_ctrl ()); - rr->rr_dlsr = htonl (ntp_now - ntp_then); - } - ++rr; - if (++nrr >= 31) - break; - } - flags |= nrr << 8; - rh->rh_flags = htons (flags); - int len = (u_char*)rr - state->pktbuf_; - rh->rh_len = htons ( (len >> 2) - 1); - - if (bye) - len += build_bye ( ACE_reinterpret_cast (rtcphdr*,rr), s); - else - len += build_sdes ( ACE_reinterpret_cast (rtcphdr*, rr), s,state); - - ACE_Message_Block mb ((char *)state->pktbuf_, len); - mb.wr_ptr (len); - protocol_object->send_frame (&mb); - - state->rtcp_avg_size_ += RTCP_SIZE_GAIN * (double (len + 28) - state->rtcp_avg_size_); - - /* - * compute the time to the next report. we do this here - * because we need to know if there were any active TAO_AV_Sources - * during the last report period (nrr above) & if we were - * a TAO_AV_Source. The bandwidth limit for rtcp traffic was set - * on startup from the session bandwidth. It is the inverse - * of bandwidth (ie., ms/byte) to avoid a divide below. - */ - double ibw = state->rtcp_inv_bw_; - if (nrr) { - /* there were active TAO_AV_Sources */ - if (we_sent) { - ibw *= 1./RTCP_SENDER_BW_FRACTION; - nsrc = nrr; - } else { - ibw *= 1./RTCP_RECEIVER_BW_FRACTION; - nsrc -= nrr; - } - } - double rint = state->rtcp_avg_size_ * double (nsrc) * ibw; - if (rint < RTCP_MIN_RPT_TIME * 1000.) - rint = RTCP_MIN_RPT_TIME * 1000.; - state->rint_ = rint; - callback->schedule (int (TAO_AV_RTCP::fmod (double (ACE_OS::rand ()), rint) + rint * .5 + .5)); - - source_manager->CheckActiveSources (rint); -} -int -TAO_AV_RTCP::handle_input (ACE_Message_Block *data, - const ACE_Addr &peer_address, - rtcphdr &header, - TAO_AV_SourceManager *source_manager, - TAO_AV_RTP_State *state) -{ - int cc = data->length (); - int size_phdr = ACE_static_cast (int, sizeof (rtcphdr)); - if (cc < size_phdr) - { - state->nrunt_++; - ACE_ERROR_RETURN ( (LM_ERROR,"TAO_AV_RTP::handle_input:invalid header\n"),-1); - } - if (peer_address == ACE_Addr::sap_any) - ACE_ERROR_RETURN ( (LM_ERROR,"TAO_AV_RTP::handle_input:get_peer_addr failed\n"),-1); - // @@ We need to be careful of this. - u_long addr = peer_address.hash (); - header = * (rtcphdr*) (data->rd_ptr ()); - rtcphdr *rh = (rtcphdr *)data->rd_ptr (); - /* - * try to filter out junk: first thing in packet must be - * sr, rr or bye & version number must be correct. - */ - switch (ntohs (rh->rh_flags) & 0xc0ff) +double +TAO_AV_RTCP::rtcp_interval (int members, + int senders, + double rtcp_bw, + int we_sent, + int packet_size, + int *avg_rtcp_size, + int initial) +{ + // Minimum time between RTCP packets from this site (in sec.). + // This time prevents the reports from 'clumping' when sessions + // are small and the law of large numbers isn't helping to smooth + // out the traffic. It also keeps the report interval from + // becoming ridiculously small during transient outages like a + // network partition. +// double const RTCP_MIN_TIME = 5.0; (from RTP.h) + + // Fraction of the RTCP bandwidth to be shared among active + // senders. (This fraction was chosen so that in a typical + // session with one or two active senders, the computed report + // time would be roughly equal to the minimum report time so that + // we don't unnecessarily slow down receiver reports.) The + // receiver fraction must be 1 - the sender fraction. +// double const RTCP_SENDER_BW_FRACTION = 0.25; (from RTP.h) +// double const RTCP_RCVR_BW_FRACTION = (1-RTCP_SENDER_BW_FRACTION); (from RTP.h) + + // Gain (smoothing constant) for the low-pass filter that + // estimates the average RTCP packet size +// double const RTCP_SIZE_GAIN = (1.0/16.0); (from RTP.h) + + double t; + double rtcp_min_time = RTCP_MIN_RPT_TIME; + int n; // number of members for computation + + // Very first call at application start-up uses half the min + // delay for quicker notification while still allowing some time + // before reporting for randomization and to learn about other + // sources so the report interval will converge to the correct + // interval more quickly. The average RTCP size is initialized + // to 128 octets which is conservative (it assumes everyone else + // is generating SRs instead of RRs: 20 IP + 8 UDP + 52 SR + 48 + // SDES CNAME). + if (initial) { - case RTP_VERSION << 14 | RTCP_PT_SR: - case RTP_VERSION << 14 | RTCP_PT_RR: - case RTP_VERSION << 14 | RTCP_PT_BYE: - break; - default: - /* - * XXX should further categorize this error -- it is - * likely that people mis-implement applications that - * don't put something other than SR,RR,BYE first. - */ - ++state->bad_version_; - return -1; - } - /* - * at this point we think the packet's valid. Update our average - * size estimator. Also, there's valid ssrc so charge errors to it - */ - - - double tmp = (cc + 28) - state->rtcp_avg_size_; - tmp *= RTCP_SIZE_GAIN; - state->rtcp_avg_size_ += ACE_static_cast (int, tmp); - /* - * First record in compound packet must be the ssrc of the - * sender of the packet. Pull it out here so we can use - * it in the sdes parsing, since the sdes record doesn't - * contain the ssrc of the sender (in the case of mixers). - */ - ACE_UINT32 ssrc = rh->rh_ssrc; - TAO_AV_Source* ps = source_manager->lookup (ssrc, ssrc, addr); - if (ps == 0) - return 0; + // initialize the random number generator + ACE_OS::srand((unsigned int)avg_rtcp_size); - /* - * Outer loop parses multiple RTCP records of a "compound packet". - * There is no framing between records. Boundaries are implicit - * and the overall length comes from UDP. - */ - u_char* epack = (u_char*)rh + cc; - while ( (u_char*)rh < epack) { - u_int len = (ntohs (rh->rh_len) << 2) + 4; - u_char* ep = (u_char*)rh + len; - if (ep > epack) { - ps->badsesslen (1); - return 0; - } - u_int flags = ntohs (rh->rh_flags); - if (flags >> 14 != RTP_VERSION) { - ps->badsessver (1); - return 0; + rtcp_min_time /= 2; + *avg_rtcp_size = 128; } - switch (flags & 0xff) { - case RTCP_PT_SR: - TAO_AV_RTCP::parse_sr (rh, flags, ep, ps, addr, source_manager); - break; + // If there were active senders, give them at least a minimum + // share of the RTCP bandwidth. Otherwise all participants share + // the RTCP bandwidth equally. + n = members; + if ((senders > 0) && (senders < members*RTCP_SENDER_BW_FRACTION)) + { + if (we_sent) + { + rtcp_bw *= RTCP_SENDER_BW_FRACTION; + n = senders; + } + else + { + rtcp_bw *= RTCP_RECEIVER_BW_FRACTION; + n -= senders; + } + } - case RTCP_PT_RR: - TAO_AV_RTCP::parse_rr (rh, flags, ep, ps, addr, source_manager); - break; + // Update the average size estimate by the size of the report + // packet we just sent. + *avg_rtcp_size += (int)((packet_size - *avg_rtcp_size)*RTCP_SIZE_GAIN); - case RTCP_PT_SDES: - TAO_AV_RTCP::parse_sdes (rh, flags, ep, ps, addr, ssrc, source_manager); - break; + // The effective number of sites times the average packet size is + // the total number of octets sent when each site sends a report. + // Dividing this by the effective bandwidth gives the time + // interval over which those packets must be sent in order to + // meet the bandwidth target, with a minimum enforced. In that + // time interval we send one report so this time is also our + // average time between reports. + t = (*avg_rtcp_size) * n / rtcp_bw; + if (t < rtcp_min_time) + t = rtcp_min_time; - case RTCP_PT_BYE: - TAO_AV_RTCP::parse_bye (rh, flags, ep, ps, source_manager); - break; + // To avoid traffic bursts from unintended synchronization with + // other sites, we then pick our actual next report interval as a + // random number uniformly distributed between 0.5*t and 1.5*t. - default: - ps->badsessopt (1); - break; - } - rh = (rtcphdr*)ep; - } - return 0; -} + // TODO: this may not be right. need a random number between 0 and 1 + int max_rand = 32768; -ACE_UINT32 -TAO_AV_RTCP::alloc_srcid (ACE_UINT32 addr) -{ - ACE_Time_Value tv = ACE_OS::gettimeofday (); - ACE_UINT32 srcid = ACE_UINT32 (tv.sec () + tv.usec ()); - srcid += (ACE_UINT32)ACE_OS::getuid(); - srcid += (ACE_UINT32)ACE_OS::getpid(); - srcid += addr; - return (srcid); + return t * ((double)ACE_OS::rand()/max_rand + 0.5); +// return t * (drand48() + 0.5); } -double -TAO_AV_RTCP::fmod (double dividend, double divisor) -{ - //Method to calculate the fmod (x,y) - int quotient = ACE_static_cast (int, (dividend / divisor)); - double product = quotient * divisor; - double remainder = dividend - product; - return remainder; -} - // TAO_AV_RTCP_Flow_Factory TAO_AV_RTCP_Flow_Factory::TAO_AV_RTCP_Flow_Factory (void) @@ -588,7 +348,8 @@ int TAO_AV_RTCP_Flow_Factory::match_protocol (const char *flow_string) { if (ACE_OS::strncasecmp (flow_string,"RTCP",4) == 0) - return 1; + return 1; + return 0; } @@ -600,25 +361,27 @@ TAO_AV_RTCP_Flow_Factory::init (int /* argc */, } TAO_AV_Protocol_Object* -TAO_AV_RTCP_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry, - TAO_Base_StreamEndPoint *endpoint, +TAO_AV_RTCP_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry */*entry*/, + TAO_Base_StreamEndPoint */*endpoint*/, TAO_AV_Flow_Handler *handler, TAO_AV_Transport *transport) { - TAO_AV_Callback *callback = 0; - endpoint->get_control_callback (entry->flowname (), - callback); - if (callback == 0) - ACE_NEW_RETURN (callback, - TAO_AV_RTCP_Callback, - 0); + TAO_AV_Callback *client_cb = 0; + TAO_AV_RTCP_Callback *rtcp_cb = 0; + + // TODO: need to handle a client callback at some point +// endpoint->get_control_callback (entry->flowname (), +// client_cb); + TAO_AV_Protocol_Object *object = 0; ACE_NEW_RETURN (object, - TAO_AV_RTCP_Object (callback, + TAO_AV_RTCP_Object (client_cb, + rtcp_cb, transport), 0); - callback->open (object, - handler); + + rtcp_cb->open (object, handler); + return object; } @@ -626,19 +389,17 @@ TAO_AV_RTCP_Flow_Factory::make_protocol_object (TAO_FlowSpec_Entry *entry, int TAO_AV_RTCP_Object::handle_input (void) { - ACE_Message_Block *data; size_t bufsiz = 2*this->transport_->mtu (); - ACE_NEW_RETURN (data, - ACE_Message_Block (bufsiz), - -1); - int n = this->transport_->recv (data->rd_ptr (),bufsiz); + ACE_Message_Block data (bufsiz); + + int n = this->transport_->recv (data.rd_ptr (),bufsiz); if (n == 0) ACE_ERROR_RETURN ( (LM_ERROR,"TAO_AV_RTP::handle_input:connection closed\n"),-1); if (n < 0) ACE_ERROR_RETURN ( (LM_ERROR,"TAO_AV_RTP::handle_input:recv error\n"),-1); - data->wr_ptr (n); + data.wr_ptr (n); ACE_Addr *peer_addr = this->transport_->get_peer_addr (); - this->callback_->receive_control_frame (data,*peer_addr); + this->callback_->receive_control_frame (&data,*peer_addr); return 0; } @@ -665,10 +426,14 @@ TAO_AV_RTCP_Object::send_frame (const char*, return 0; } -TAO_AV_RTCP_Object::TAO_AV_RTCP_Object (TAO_AV_Callback *callback, +TAO_AV_RTCP_Object::TAO_AV_RTCP_Object (TAO_AV_Callback *client_cb, + TAO_AV_RTCP_Callback *&rtcp_cb, TAO_AV_Transport *transport) - :TAO_AV_Protocol_Object (callback,transport) + :TAO_AV_Protocol_Object (&rtcp_cb_, transport) { + rtcp_cb = &this->rtcp_cb_; + this->client_cb_ = client_cb; + } TAO_AV_RTCP_Object::~TAO_AV_RTCP_Object (void) @@ -678,6 +443,11 @@ TAO_AV_RTCP_Object::~TAO_AV_RTCP_Object (void) int TAO_AV_RTCP_Object::destroy (void) { + TAO_AV_UDP_Transport *my_transport = ACE_dynamic_cast (TAO_AV_UDP_Transport*, + this->transport_); + + my_transport->handler ()->cancel_timer (); + this->callback_->handle_destroy (); return 0; } @@ -704,51 +474,45 @@ int TAO_AV_RTCP_Object::handle_control_input (ACE_Message_Block *frame, const ACE_Addr &peer_address) { -// frame->rd_ptr ((size_t)0); -// // Since the rd_ptr would have been moved ahead. return this->callback_->receive_frame (frame, 0, peer_address); } -// TAO_AV_RTCP_Callback -TAO_AV_RTCP_Callback::TAO_AV_RTCP_Callback (void) +int +TAO_AV_RTCP_Object::handle_control_output (ACE_Message_Block *frame) { - ACE_NEW (source_manager_, - TAO_AV_SourceManager (this)); + TAO_AV_RTCP_Callback *cb = ACE_dynamic_cast (TAO_AV_RTCP_Callback*, + this->callback_); - ACE_NEW (this->state_, - TAO_AV_RTP_State); + return cb->send_frame (frame); } -TAO_AV_RTCP_Callback::~TAO_AV_RTCP_Callback (void) +void +TAO_AV_RTCP_Object::ts_offset (ACE_UINT32 ts_offset) { + TAO_AV_RTCP_Callback *cb = ACE_dynamic_cast (TAO_AV_RTCP_Callback*, + this->callback_); + return cb->ts_offset (ts_offset); } -TAO_AV_SourceManager* -TAO_AV_RTCP_Callback::source_manager (void) +// TAO_AV_RTCP_Callback +TAO_AV_RTCP_Callback::TAO_AV_RTCP_Callback (void) + :is_initial_timeout_(1), + packet_size_(0) { - return this->source_manager_; -} + char cname[256]; + char host[256]; + ACE_OS::hostname(host, sizeof(host)); -TAO_AV_RTP_State* -TAO_AV_RTCP_Callback::state (void) -{ - return this->state_; + // TODO: determine username auto-magically? + ACE_OS::sprintf(cname, "username@%s", host); + + this->output_.cname(cname); } -int -TAO_AV_RTCP_Callback::get_rtp_source (TAO_AV_Source *&source, - ACE_UINT32 srcid, - ACE_UINT32 ssrc, - ACE_UINT32 addr) -{ - ACE_NEW_RETURN (source, - TAO_AV_Source (srcid, - ssrc, - addr), - -1); - return 0; +TAO_AV_RTCP_Callback::~TAO_AV_RTCP_Callback (void) +{ } void @@ -757,25 +521,9 @@ TAO_AV_RTCP_Callback::schedule (int ms) this->timeout_ = ms; } - int TAO_AV_RTCP_Callback::handle_start (void) { - // - /* - * schedule a timer for our first report using half the - * min rtcp interval. This gives us some time before - * our first report to learn about other sources so our - * next report interval will account for them. The avg - * rtcp size was initialized to 128 bytes which is - * conservative (it assumes everyone else is generating - * SRs instead of RRs). - */ - double rint = this->state_->rtcp_avg_size_ * this->state_->rtcp_inv_bw_; - if (rint < RTCP_MIN_RPT_TIME / 2. * 1000.) - rint = RTCP_MIN_RPT_TIME / 2. * 1000.; - this->state_->rint_ = rint; - this->timeout_ = int(TAO_AV_RTCP::fmod(double(ACE_OS::rand ()), rint) + rint * .5 + .5); return 0; } @@ -788,34 +536,244 @@ TAO_AV_RTCP_Callback::handle_stop (void) int TAO_AV_RTCP_Callback::handle_timeout (void * /*arg*/) { - // Here we do the send_report. - TAO_AV_RTCP::send_report (0, - this->protocol_object_, - this->source_manager_, - this->state_, - this); + return this->send_report(0); +} + +int +TAO_AV_RTCP_Callback::send_report (int bye) +{ + // get the RTCP control object in order to get the ssrc + TAO_AV_RTCP_Object *rtcp_prot_obj = ACE_dynamic_cast (TAO_AV_RTCP_Object*, + this->protocol_object_); + ACE_UINT32 my_ssrc = rtcp_prot_obj->ssrc (); + + RTCP_Packet *cp; + RTCP_SDES_Packet sdes; + ACE_CString value = ""; + ACE_CString note = ""; + ACE_UINT16 sdes_type = 0; + RTCP_BYE_Packet *bye_packet = 0; // only used for bye + ACE_UINT32 ssrc_list[1]; // only used for bye + + // get an iterator for the incoming channels. + ACE_Hash_Map_Iterator<ACE_UINT32, RTCP_Channel_In*, ACE_Null_Mutex> iter (this->inputs_); + iter = this->inputs_.begin(); + + // first send an SR/RR + RR_Block *blocks = 0; + RR_Block *b_iter = 0; + RR_Block *b_ptr = 0; + + while (iter != this->inputs_.end() ) + { + if (!b_iter) + { + b_ptr = (*iter).int_id_->getRRBlock (); + if (b_ptr) + { + blocks = b_ptr; + b_iter = b_ptr; + } + } + else + { + b_ptr = (*iter).int_id_->getRRBlock (); + if (b_ptr) + { + b_iter->next_ = b_ptr; + } + } + + iter++; + } + + if (b_iter) + b_iter->next_ = 0; + + if (this->output_.active ()) + { + // get the NTP timestamp + ACE_Time_Value unix_now = ACE_OS::gettimeofday (); + TAO_AV_RTCP::ntp64 ntp_now = ntp64time (unix_now); + ACE_UINT32 rtp_ts = unix_now.sec () * 8000 + unix_now.usec () / 125 + + this->timestamp_offset_; + ACE_NEW_RETURN(cp, + RTCP_SR_Packet (my_ssrc, + ntp_now.upper, + ntp_now.lower, + rtp_ts, + this->output_.packets_sent (), + this->output_.octets_sent (), + blocks), + -1); + } + else + { + ACE_NEW_RETURN(cp, + RTCP_RR_Packet (my_ssrc, + blocks), + -1); + } + + /* + * We always send a cname plus one other sdes + * There's a schedule for what we send sequenced by sdes_seq_: + * - send 'email' every 0th & 4th packet + * - send 'note' every 2nd packet + * - send 'tool' every 6th packet + * - send 'name' in all the odd slots + * (if 'note' is not the empty string, we switch the roles + * of name & note) + */ + + // TODO: need capability to change these settings + switch (this->sdes_count_%8) + { + case 0: + case 4: + value = "tao-users@wustl.edu"; + sdes_type = RTCP_SDES_EMAIL; + break; + case 2: + if (note.length () > 0) + { + value = "Joe User"; + sdes_type = RTCP_SDES_NAME; + } + else + { + value = note; + sdes_type = RTCP_SDES_NOTE; + } + break; + case 6: + value = "TAO A/V Service"; + sdes_type = RTCP_SDES_TOOL; + break; + case 1: + case 3: + case 5: + case 7: + if (note.length () == 0) + { + value = "Joe User"; + sdes_type = RTCP_SDES_NAME; + } + else + { + value = "An important note..."; + sdes_type = RTCP_SDES_NOTE; + } + break; + } + + ++this->sdes_count_; + + sdes.add_item (my_ssrc, + RTCP_SDES_CNAME, + strlen(this->output_.cname()), + this->output_.cname()); + if (bye) + { + ssrc_list[0] = rtcp_prot_obj->ssrc (); + + ACE_NEW_RETURN (bye_packet, + RTCP_BYE_Packet(ssrc_list, + sizeof(ssrc_list)/sizeof(ssrc_list[0]), + "Got bored."), + -1); + } + else + sdes.add_item (my_ssrc, sdes_type, value.length (), value.c_str ()); + + // create the message block + char *cp_ptr; + char *sdes_ptr; + char *bye_ptr = 0; + ACE_UINT16 cp_length; + ACE_UINT16 sdes_length; + ACE_UINT16 bye_length = 0; + cp->get_packet_data (&cp_ptr, cp_length); + sdes.get_packet_data (&sdes_ptr, sdes_length); + if (bye_packet) + bye_packet->get_packet_data(&bye_ptr, bye_length); + + ACE_Message_Block mb (cp_length + sdes_length + bye_length); + + memcpy (mb.wr_ptr (), cp_ptr, cp_length); + mb.wr_ptr (cp_length); + memcpy (mb.wr_ptr (), sdes_ptr, sdes_length); + mb.wr_ptr (sdes_length); + if (bye_length) + { + memcpy (mb.wr_ptr (), bye_ptr, bye_length); + mb.wr_ptr (bye_length); + } + + // send the report + this->protocol_object_->send_frame (&mb); + + this->packet_size_ = cp_length + sdes_length + bye_length; + + delete cp; + if (bye_packet) + delete bye_packet; + return 0; } void +//TAO_AV_RTCP_Callback::get_timeout (ACE_Time_Value *tv, TAO_AV_RTCP_Callback::get_timeout (ACE_Time_Value *&tv, void *& /*arg*/) { + int senders = 0; + int members = 1; // count self as member + + // TODO: this should be 5% of the session bw + double rtcp_bw = 1000; + double interval; + + ACE_Hash_Map_Iterator<ACE_UINT32, RTCP_Channel_In*, ACE_Null_Mutex> iter (this->inputs_); + iter = this->inputs_.begin(); + + if (this->output_.active ()) + senders++; + + // determine the number of senders and members of this session + while (iter != this->inputs_.end ()) + { + if ((*iter).int_id_->active ()) + { + if ((*iter).int_id_->sender ()) + senders++; + members++; + } + iter++; + } + // Here we do the RTCP timeout calculation. + interval = TAO_AV_RTCP::rtcp_interval (members, // members + senders, // senders + rtcp_bw, // rtcp_bw + this->output_.active (), // we_sent + this->packet_size_, // packet_size + &this->avg_rtcp_size_, // avg_rtcp_size + this->is_initial_timeout_); // initial) + + this->is_initial_timeout_ = 0; + ACE_NEW (tv, - ACE_Time_Value (0,this->timeout_*ACE_ONE_SECOND_IN_MSECS)); + ACE_Time_Value); + + tv->sec ((int)interval); + tv->usec ((int)((interval - (int)interval) * 1000000)); } int TAO_AV_RTCP_Callback::handle_destroy (void) { - // Here we do the send_bye. - TAO_AV_RTCP::send_report (1, - this->protocol_object_, - this->source_manager_, - this->state_, - this); - return 0; + return this->send_report(1); } int @@ -823,122 +781,40 @@ TAO_AV_RTCP_Callback::receive_frame (ACE_Message_Block *frame, TAO_AV_frame_info *, const ACE_Addr &peer_address) { - char *buf = frame->rd_ptr (); - TAO_AV_RTP::rtphdr *rh = (TAO_AV_RTP::rtphdr *)buf; + RTCP_Channel_In *c; - frame->rd_ptr (sizeof (TAO_AV_RTP::rtphdr)); - int result = this->demux (rh, - frame, - peer_address); - frame->rd_ptr (buf); + RTP_Packet packet (frame->rd_ptr(), frame->length()); - if (result < 0) - return result; + if (this->inputs_.find (packet.ssrc(), c) < 0) + { + ACE_NEW_RETURN (c, + RTCP_Channel_In (packet.ssrc(), + &peer_address), + -1); + + this->inputs_.bind (packet.ssrc(), c); + } + c->recv_rtp_packet (frame, &peer_address); return 0; } - int -TAO_AV_RTCP_Callback::receive_control_frame (ACE_Message_Block *frame, - const ACE_Addr &peer_address) +TAO_AV_RTCP_Callback::send_frame (ACE_Message_Block *frame) { - // Here we do the processing of the RTCP frames. - TAO_AV_RTCP::rtcphdr header; - int result = TAO_AV_RTCP::handle_input (frame, - peer_address, - header, - this->source_manager_, - this->state_); - if (result < 0) - ACE_ERROR_RETURN ((LM_ERROR,"TAO_AV_RTCP::handle_input failed\n"),-1); + RTP_Packet packet (frame->rd_ptr(), frame->length()); + this->output_.updateStatistics (&packet); + return 0; } -int -TAO_AV_RTCP_Callback::demux (TAO_AV_RTP::rtphdr* rh, - ACE_Message_Block *data, - const ACE_Addr &address) +void +TAO_AV_RTCP_Callback::ts_offset (ACE_UINT32 offset) { - char *bp = data->rd_ptr (); - int cc = data->length (); - if (cc < 0) - { - ++this->state_->nrunt_; - return -1; - } - ACE_UINT32 srcid = rh->rh_ssrc; - int flags = ntohs (rh->rh_flags); - if ( (flags & RTP_X) != 0) - { - /* - * the minimal-control audio/video profile - * explicitly forbids extensions - */ - ++this->state_->badext_; - return -1; - } - - // @@Naga:Maybe the framework itself could check for formats making use of - // the property service to query the formats supported for this flow. - /* - * Check for illegal payload types. Most likely this is - * a session packet arriving on the data port. - */ -// int fmt = flags & 0x7f; -// if (!check_format (fmt)) -// { -// ++state->badfmt_; -// return; -// } - - u_long addr = address.hash (); - ACE_UINT16 seqno = ntohs (rh->rh_seqno); - TAO_AV_Source* s = this->source_manager_->demux (srcid, addr, seqno); - if (s == 0) - /* - * Takes a pair of validated packets before we will - * believe the source. This prevents a runaway - * allocation of Source data structures for a - * stream of garbage packets. - */ - return -1; - - ACE_Time_Value now = ACE_OS::gettimeofday (); - s->lts_data (now); - s->sts_data (rh->rh_ts); - - long cnt = (flags >> 8) & 0xf; - if (cnt > 0) - { - u_char* nh = (u_char*)rh + (cnt << 2); - while (--cnt >= 0) - { - ACE_UINT32 csrc = * (ACE_UINT32*)bp; - bp += 4; - TAO_AV_Source* cs = this->source_manager_->lookup (csrc, srcid, addr); - cs->lts_data (now); - cs->action (); - } - /*XXX move header up so it's contiguous with data*/ - TAO_AV_RTP::rtphdr hdr = *rh; - rh = (TAO_AV_RTP::rtphdr*)nh; - *rh = hdr; - } - else - s->action (); - - return 0; - /* - * This is a data packet. If the source needs activation, - * or the packet format has changed, deal with this. - * Then, hand the packet off to the packet handler. - * XXX might want to be careful about flip-flopping - * here when format changes due to misordered packets - * (easy solution -- keep rtp seqno of last fmt change). - */ + this->timestamp_offset_ = offset; } + ACE_FACTORY_DEFINE (TAO_AV, TAO_AV_RTCP_Flow_Factory) ACE_STATIC_SVC_DEFINE (TAO_AV_RTCP_Flow_Factory, ACE_TEXT ("RTCP_Flow_Factory"), @@ -949,4 +825,26 @@ ACE_STATIC_SVC_DEFINE (TAO_AV_RTCP_Flow_Factory, 0) +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) + +template class ACE_Hash_Map_Entry<ACE_UINT32,RTCP_Channel_In *>; +template class ACE_Hash_Map_Manager<ACE_UINT32,RTCP_Channel_In *,ACE_Null_Mutex>; +template class ACE_Hash_Map_Manager_Ex<ACE_UINT32, RTCP_Channel_In *, ACE_Hash<ACE_UINT32>, ACE_Equal_To<ACE_UINT32>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator<ACE_UINT32,RTCP_Channel_In *,ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator_Ex<ACE_UINT32, RTCP_Channel_In *, ACE_Hash<ACE_UINT32>, ACE_Equal_To<ACE_UINT32>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Iterator_Base_Ex<ACE_UINT32, RTCP_Channel_In *, ACE_Hash<ACE_UINT32>, ACE_Equal_To<ACE_UINT32>, ACE_Null_Mutex>; +template class ACE_Hash_Map_Reverse_Iterator<ACE_UINT32,RTCP_Channel_In *,ACE_Null_Mutex>; +template class ACE_Hash_Map_Reverse_Iterator_Ex<ACE_UINT32, RTCP_Channel_In *, ACE_Hash<ACE_UINT32>, ACE_Equal_To<ACE_UINT32>, ACE_Null_Mutex>; + +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) + +#pragma instantiate ACE_Hash_Map_Entry<ACE_UINT32,RTCP_Channel_In *> +#pragma instantiate ACE_Hash_Map_Manager<ACE_UINT32,RTCP_Channel_In *,ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Manager_Ex<ACE_UINT32, RTCP_Channel_In *, ACE_Hash<ACE_UINT32>, ACE_Equal_To<ACE_UINT32>, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Iterator<ACE_UINT32,RTCP_Channel_In *,ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Iterator_Ex<ACE_UINT32, RTCP_Channel_In *, ACE_Hash<ACE_UINT32>, ACE_Equal_To<ACE_UINT32>, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Iterator_Base_Ex<ACE_UINT32, RTCP_Channel_In *, ACE_Hash<ACE_UINT32>, ACE_Equal_To<ACE_UINT32>, ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Reverse_Iterator<ACE_UINT32,RTCP_Channel_In *,ACE_Null_Mutex> +#pragma instantiate ACE_Hash_Map_Reverse_Iterator_Ex<ACE_UINT32, RTCP_Channel_In *, ACE_Hash<ACE_UINT32>, ACE_Equal_To<ACE_UINT32>, ACE_Null_Mutex> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |