diff options
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/vb.cpp')
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/vb.cpp | 1130 |
1 files changed, 1130 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/vb.cpp b/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/vb.cpp new file mode 100644 index 00000000000..ae050b3d595 --- /dev/null +++ b/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/vb.cpp @@ -0,0 +1,1130 @@ +/* $Id$ */ + +/* Copyright (c) 1995 Oregon Graduate Institute of Science and Technology + * P.O.Box 91000-1000, Portland, OR 97291, USA; + * + * Permission to use, copy, modify, distribute, and sell this software and its + * documentation for any purpose is hereby granted without fee, provided that + * the above copyright notice appear in all copies and that both that + * copyright notice and this permission notice appear in supporting + * documentation, and that the name of O.G.I. not be used in advertising or + * publicity pertaining to distribution of the software without specific, + * written prior permission. O.G.I. makes no representations about the + * suitability of this software for any purpose. It is provided "as is" + * without express or implied warranty. + * + * O.G.I. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING + * ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL + * O.G.I. BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY + * DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN + * AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + * + * Author: Shanwei Cen + * Department of Computer Science and Engineering + * email: scen@cse.ogi.edu + */ + +extern int vsp[2]; +extern void set_exit_routine_tag(int tag); + +#include "ace/OS.h" +#include "vb.h" + +ACE_RCSID(mpeg_client, vb, "$Id$") + +block ** VideoBuffer::head = 0; +block ** VideoBuffer::tail = 0; +char * VideoBuffer::buf = 0; +int VideoBuffer::bufsize = -1; +int VideoBuffer::sid = -1; +int VideoBuffer::countid = -1; +int VideoBuffer::exit_tag = -1; +int VideoBuffer::conn_tag = -1; +int VideoBuffer::savedSocket = -1; + +//constructor. +VideoBuffer::VideoBuffer (void) + :msg (0), + packet (0), + msgsn (-1), + ptr (0), + ptr1 (0), + tmp_buf (0), + cmdsn (-1), + fb_state (0), + qosRecomputes (0), + f (0), + fa (0), + reach_limit (0), + not_action (1) +#ifdef STAT + ,to_count (1), + gap_msgsn (-1) +#endif +{ +} + +// Destructor. +VideoBuffer::~VideoBuffer (void) +{ + if (ACE_Reactor::instance ()->remove_handler (this->handler_,ACE_Event_Handler::READ_MASK) == -1) + ACE_DEBUG ((LM_ERROR,"(%P)remove handler failed for Video_Notification_Handler\n")); + + delete this->handler_; + if (ACE_Reactor::instance ()->remove_handler (this,ACE_Event_Handler::READ_MASK) == -1) + ACE_DEBUG ((LM_ERROR,"(%P)remove handler failed for VideoBuffer\n")); +} + +/* size in byte */ +void +VideoBuffer::VBinitBuf (int size) +{ + bufsize = size - sizeof(struct header); + buf = creat_shared_mem(size); + head = &((struct header *)buf)->h; + tail = &((struct header *)buf)->t; + buf += sizeof(struct header); + sid = creat_semaphore(); + countid = creat_semaphore(); + enter_cs(countid); + *head = *tail = (struct block *)buf; + (*tail)->full = 0; + (*tail)->next = NULL; + (*tail)->shcode = SHCODE; +} + +/* block version */ +char* +VideoBuffer::VBgetBuf (int size) +{ + return 0; +} + +/* non-block check, return True/False*/ +int +VideoBuffer::VBcheckBuf (int size) +{ + return 0; +} + +void +VideoBuffer::VBputMsg (char * msgPtr) +{ +} + +/* block version */ +char * +VideoBuffer::VBgetMsg () +{ + char *vb_ptr; + +#ifdef STAT + if (shared->collectStat && *head == *tail) + shared->stat.VBemptyTimes ++; +#endif + // ACE_DEBUG ((LM_DEBUG,"(%P)waiting for countid\n")); + enter_cs(countid); + enter_cs(sid); + while (*tail != *head && (*tail)->full == 0) + *tail = (*tail)->next; + leave_cs(sid); + if (*head == *tail) + { + fprintf(stderr, "VB: getMsg run out of msg unexpectedly.\n"); + ACE_OS::exit (1); + } + vb_ptr = ((char*)*tail)+sizeof(**tail)+sizeof(VideoMessage); + + // fprintf(stderr,"VBgetMsg: buf:%x, msg:%x\n", (int)buf, (int)vb_ptr); + + return vb_ptr; +} + +/* non-block check, return Number of Msgs in buffer */ +int +VideoBuffer::VBcheckMsg () +{ + return get_semval(countid); +} + +int +VideoBuffer::VBbufEmpty (void) +{ + /* + Fprintf(stderr, "VB countid %d\n", get_semval(countid)); + */ + return get_semval(countid) <= 0; +} + +void +VideoBuffer::VBreclaimMsg (char * msgPtr) +{ + enter_cs(sid); + *tail = (*tail)->next; + leave_cs(sid); +} + +void +VideoBuffer::VBdeleteBuf (void) +{ + remove_shared_mem (buf - sizeof(struct header)); +} + +void +VideoBuffer::VBdeleteSem (void) +{ + remove_semaphore(sid); + remove_semaphore(countid); +} + +int +VideoBuffer::VBprocess (int init_socket, int normal_socket) +{ + this->initSocket = init_socket; + this->normalSocket = normal_socket; + msgsn = -1; + dataSocket = initSocket; + exit_tag = 0; + conn_tag = shared->videoMaxPktSize; + savedSocket = normalSocket; + + // ACE_DEBUG ((LM_DEBUG,"VideoBuffer::VBProcess ()\n")); + /* buffer big enough for discard mode packet stream */ + if (conn_tag < 0) + { + tmp_buf = (char *)ACE_OS::malloc(-conn_tag); + if (tmp_buf == NULL) { + fprintf(stderr, "AB failed to allocate %d bytes"); + ACE_OS::perror ("of tmp_buf"); + ACE_Reactor::instance ()->end_event_loop (); return -1; + } + } + + ACE_NEW_RETURN (this->handler_, + Video_Notification_Handler (), + -1); + + // Register the notification handler with the reactor. + int result = ACE_Reactor::instance ()->register_handler (this->handler_, + ACE_Event_Handler::READ_MASK); + if (result != 0) + return result; + + result = ACE_Reactor::instance ()->register_handler (this, + ACE_Event_Handler::READ_MASK); + + if (result != 0) + return result; + + this->state_ = READ_HEADER; + temp = (char *)&msghd; + bytes = sizeof (msghd); + return 0; +} + +ACE_HANDLE +VideoBuffer::get_handle (void) const +{ + if (this->socket_flag_) + return this->normalSocket; + else + return this->initSocket; +} + +int +VideoBuffer::handle_input (ACE_HANDLE fd) +{ + // ACE_DEBUG ((LM_DEBUG,"VideoBuffer::handle_input:state = %d\n",this->state_)); + switch (this->state_) + { + case READ_NEXT_HEADER: + case READ_HEADER: + { + if (conn_tag >= 0) + len = ACE_OS::read (dataSocket,temp,bytes); + else + { + len = ACE_OS::read (dataSocket,tmp_buf,-conn_tag); + // fprintf (stderr,"VB read packet len = %d\n",len); + ACE_OS::memcpy ((char *)&msghd, tmp_buf, sizeof(msghd)); + } + if (len == -1) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + perror("VB sleep for 10ms"); + usleep(10000); + // set the pointers before going into the next loop. + temp = (char *)&msghd; + bytes = sizeof (msghd); + return 0; + } + ACE_OS::perror ("VB ACE_OS::read () data"); + ACE_Reactor::instance ()->end_event_loop (); return -1; + } + if (len == 0) { /* EOF, connection closed by peer */ + fprintf(stderr, "Error: VB found dataSocket broken\n"); + for (;;) { + usleep(1000000); + } + } + if (conn_tag >= 0) + { + temp += len; + bytes -= len; + if (bytes != 0) + return 0; + else + len = sizeof (msghd); + } + if (len < sizeof(msghd)) + { + // go back to reading the next header. + temp = (char *)&msghd; + bytes = sizeof (msghd); + fprintf(stderr, "VD warn: PEEK1ed %dB < expected %dB\n",len, sizeof(msghd)); + // continue; + } +#ifdef NeedByteOrderConversion + msghd.packetsn = ntohl(msghd.packetsn); + msghd.packetSize = ntohl(msghd.packetSize); + msghd.msgsn = ntohl(msghd.msgsn); + msghd.msgOffset = ntohl(msghd.msgOffset); + msghd.msgSize = ntohl(msghd.msgSize); +#endif + + if (this->state_ == READ_NEXT_HEADER) + { +#ifdef STAT + { + int gap = msghd.msgsn - gap_msgsn; + gap = (gap >MSGGAP_MAX) ? MSGGAP_MAX : gap < MSGGAP_MIN ? MSGGAP_MIN : gap; + shared->stat.VBmsgGaps[gap - MSGGAP_MIN] ++; + if (gap >0) gap_msgsn = msghd.msgsn; + } +#endif + if (msghd.msgsn <= msgsn) + { /* outdated message, wait for next one */ + + fprintf(stderr, "VB discard outdated or dup msgsn %d, pktsn %d\n", + msghd.msgsn, msghd.packetsn); + + this->state_ = SKIP_NEXT_MESSAGE; + bytes = msghd.msgSize; + // skip_message(dataSocket, &msghd); + // continue; + return 0; + } + + if ((msghd.msgsn > msgsn + 1) || (msghd.msgOffset == 0)) + { + /* message out of order, abandon current packet */ + /* + fprintf(stderr, "VB msg out of order for current packet, discard it.\n"); + */ +#ifdef STAT + to_count = 0; +#endif + } + else + { + // ACE_DEBUG ((LM_DEBUG,"assigning next msgsn %d\n",msghd.msgsn)); + msgsn = msghd.msgsn; + this->state_ = READ_MESSAGE; + temp = ptr +sizeof (msghd); + bytes = msghd.msgSize; + // make a recursive call as we just have to do a memcpy from the buffer. + this->handle_input (dataSocket); + return 0; + } + } + + // fprintf(stderr, "VB PEEK1 a msg sn-%d, size-%d, pkt-%d, pktsize-%d\n",msghd.msgsn, msghd.msgSize, msghd.packetsn, msghd.packetSize); + +#ifdef STAT + if (to_count) { + int gap = msghd.msgsn - gap_msgsn; + gap = (gap >MSGGAP_MAX) ? MSGGAP_MAX : gap < MSGGAP_MIN ? MSGGAP_MIN : gap; + shared->stat.VBmsgGaps[gap - MSGGAP_MIN] ++; + if (gap >0) gap_msgsn = msghd.msgsn; + } + to_count = 1; +#endif + if (msghd.msgsn <= msgsn) /* outdated msg */ + { + fprintf(stderr, "VB discard outdated msgsn %d, pktsn %d when expecting first %d\n", + msghd.msgsn, msghd.packetsn,msgsn); + this->state_ = SKIP_MESSAGE; + bytes = msghd.msgSize; + // skip_message(dataSocket, &msghd); + // continue; + return 0; + } + else if (msghd.msgOffset != 0) /* not first msg of a packet */ + { + + /* + Fprintf(stderr, "VB discard non-first msg msgsn %d, pktsn %d\n", + msghd.msgsn, msghd.packetsn); + */ + this->state_ = SKIP_MESSAGE; + bytes = msghd.msgSize; + // skip_message(dataSocket, &msghd); + // continue; + return 0; + } + else + { + // ACE_DEBUG ((LM_DEBUG,"assigning msgsn %d\n",msghd.msgsn)); + msgsn = msghd.msgsn; + } + + /* allocate packet for the incoming msg */ + bsize = msghd.packetSize + sizeof(**head)*2 + sizeof(msghd); + bsize = ((bsize+3)>>2)<<2; + enter_cs(sid); + if (*head >= *tail) + { + if (bufsize - (int)((char*)*head - buf) >= bsize ) + msg =(VideoMessage *)((char*)*head + sizeof(**head)); + else if ((int)((char*)*tail - buf) >= bsize) + { + (*head)->next = (struct block *)buf; + (*head)->full = 0; + *head = (struct block *)buf; + msg = (VideoMessage *)(buf + sizeof(**head)); + *head = (struct block *)buf; + (*head)->shcode = SHCODE; + } + else /* not enough buffer, discard current message */ + { + leave_cs(sid); +#ifdef STAT + if (shared->collectStat) + shared->stat.VBdroppedFrames ++; +#endif + /* + Fprintf(stderr, "VB not enough space 1, drop msg.sn %d pktsn %d\n", + msghd.msgsn, msghd.packetsn); + */ + this->state_ = SKIP_MESSAGE; + bytes = msghd.msgSize; + // skip_message(dataSocket, &msghd); + // continue; + return 0; + } + } + else /* *head < *tail */ + if ((char*)*tail - (char*)*head >= bsize) + msg = (VideoMessage *)((char*)*head + sizeof(**head)); + else /* not enough buffer, abandon current message */ + { + leave_cs(sid); +#ifdef STAT + if (shared->collectStat) + shared->stat.VBdroppedFrames ++; +#endif + /* + Fprintf(stderr, "VB not enough space 1, drop msg.sn %d pktsn %d\n", + msghd.msgsn, msghd.packetsn); + + */ + this->state_ = SKIP_MESSAGE; + bytes = msghd.msgSize; + // skip_message(dataSocket, &msghd); + // continue; + return 0; + } + leave_cs(sid); + + //fprintf(stderr, "VB allocated a buffer for comming packet.\n"); + + psize = msghd.packetSize; + poffset = 0; + packet = (VideoPacket *)((char*)msg + sizeof(msghd)); + *(((int*)packet)+(msghd.packetSize>>2)) = 0; + /* clear the last no more than three bytes, for + proper detecting the end of packet by VD */ + ptr = (char*)msg; + this->state_ = READ_MESSAGE; + temp = ptr +sizeof (msghd); + bytes = msghd.msgSize; + } + break; + case SKIP_NEXT_MESSAGE: + case SKIP_MESSAGE: + { + char buffer[BUFSIZ]; + if (conn_tag >= 0) + { + int size = bytes > BUFSIZ ? BUFSIZ : bytes; + int res =ACE_OS::read (dataSocket, buffer, size); + bytes -= res; + + if (bytes != 0) + return 0; + } + if (this->state_ == SKIP_MESSAGE) + this->state_ = READ_HEADER; + else if (this->state_ == SKIP_NEXT_MESSAGE) + this->state_ = READ_NEXT_HEADER; + temp = (char *)&msghd; + bytes = sizeof (msghd); + break; + } + case READ_MESSAGE: + { + if (conn_tag >= 0) + { + int val; + val = ACE_OS::read (dataSocket,temp,bytes); + + if (val == -1 && (errno == EINTR || errno == EAGAIN | errno == EWOULDBLOCK)) + { /* interrupted or need to wait, try again */ + if (errno == EAGAIN | errno == EWOULDBLOCK) usleep(10000); + errno = 0; + return 0; + } + if (val == -1) + { + ACE_OS::perror ("Error -- Read from socket"); + ACE_Reactor::instance ()->end_event_loop (); return -1; + } + if (val == 0) /* EOF encountered */ + { + ACE_DEBUG ((LM_DEBUG, "Error -- EOF reached while trying to read %d bytes.\n")); + ACE_Reactor::instance ()->end_event_loop (); return -1; + } + temp += val; + bytes -= val; + if (bytes < 0) /* weird thing is happening */ + { + ACE_DEBUG ((LM_DEBUG, "Error: read too much from socket, %d out of %d bytes.\n")); + ACE_Reactor::instance ()->end_event_loop (); return -1; + } + if (bytes != 0) + return 0; + } + else + memcpy(temp, tmp_buf + sizeof(msghd), bytes); + poffset += msghd.msgSize; + psize -= msghd.msgSize; + ptr += msghd.msgSize; + + // fprintf(stderr, "VB packet remain size %d\n", psize); + + if (psize == 0) + { + // ACE_DEBUG ((LM_DEBUG,"finished receiving current packet\n")); + /* finished receiving the current packet */ +#ifdef NeedByteOrderConversion + packet->cmd = ntohl(packet->cmd); + packet->cmdsn = ntohl(packet->cmdsn); + packet->sh = ntohl(packet->sh); + packet->gop = ntohl(packet->gop); + packet->frame = ntohl(packet->frame); + packet->display = ntohl(packet->display); + packet->future = ntohl(packet->future); + packet->past = ntohl(packet->past); + packet->currentUPF = ntohl(packet->currentUPF); + packet->dataBytes = ntohl(packet->dataBytes); +#endif + pcmdsn = packet->cmdsn; + pcmd = packet->cmd; + pfid = packet->frame; + pgop = packet->gop; + shared->VBheadFrame = (pcmd == CmdPLAY) ? pfid : pgop; + + shared->currentUPF = packet->currentUPF; + enter_cs(sid); + (*head)->full = 1; + psize = sizeof(**head) + sizeof(*msg) + msghd.packetSize; + psize = ((psize+3)>>2)<<2; + ptr = (char*)*head + psize; + (*head)->next = (struct block *) ptr; + (*head) = (struct block *)ptr; + (*head)->shcode = SHCODE; + leave_cs(countid); + leave_cs(sid); + + /* VB receives all frame except for the INIT one through normalSocket */ + if (dataSocket != normalSocket) + { + this->socket_flag_ = 1; + fprintf(stderr, "VB got INIT frame.\n"); + ACE_OS::write (initSocket, (char *)&initSocket, 1); /* write a garbage byte */ + // ACE_OS::close (initSocket); + // dataSocket = normalSocket; + // int result = ACE_Reactor::instance ()->remove_handler (this,ACE_Event_Handler::READ_MASK); + // if (result != 0) + // ACE_DEBUG ((LM_DEBUG,"remove handler failed for read_mask\n")); + + } + + /* following is synchronization feedback algorithm */ + this->sync_feedback (); + if (dataSocket != normalSocket) + { + dataSocket = normalSocket; + int result = ACE_Reactor::instance ()->register_handler (this,ACE_Event_Handler::READ_MASK); + if (result != 0) + ACE_DEBUG ((LM_DEBUG,"register handler failed for read_mask after datasocket change\n")); + return -1; + } + // return 0; + break; /* got the whole packet, break to the out-most loop for next packet */ + } /* end if (psize == 0) */ + else if (psize < 0) + { + fprintf(stderr, "VB error: received too many msgs for a packet.\n"); + ACE_Reactor::instance ()->end_event_loop (); + return -1; + } + this->state_ = READ_NEXT_HEADER; + temp = (char *)&msghd; + bytes = sizeof (msghd); + break; + } + } + return 0; +} + +int +VideoBuffer::handle_output (ACE_HANDLE fd) +{ + if ((this->state_ == WRITE_FEEDBACK1) || (this->state_ == WRITE_FEEDBACK2)) + { + // send the feedback to the server. + VideoFeedBackPara para; + para.cmdsn = htonl(shared->cmdsn); + para.addUsecPerFrame = htonl(fb_addupf); + para.addFrames = htonl(fb_addf); + para.needHeader = htonl(shared->needHeader); + shared->needHeader = 0; + para.frameRateLimit1000 = + htonl((long)(shared->frameRateLimit * 1000.0)); + para.sendPatternGops = htonl(shared->sendPatternGops); + ACE_OS::memcpy (para.sendPattern, shared->sendPattern, PATTERN_SIZE); + + // fprintf(stderr, "VB to send a fb packet..."); + + int res; + if (conn_tag != 0) + { /* packet stream */ + if (temp == 0) + { + temp = (char *)¶ + bytes = sizeof (para); + } + res = ACE_OS::write (dataSocket, temp, bytes); + if (res == -1) + { + if (errno == EINTR) + return 0; + if (errno == ENOBUFS) { + perror("VB Warning, fb packet discarded for"); + // Here we should handle the return -1 case! + fb_state = 4; + } + else + { + ACE_OS::perror ("VB error, fb packet sending failed"); + ACE_Reactor::instance ()->end_event_loop (); return -1; + } + } + else if (res == 0) + { + ACE_OS::perror ("VB error, sending fb,socket closed"); + ACE_Reactor::instance ()->end_event_loop (); return -1; + } + else + { + temp += res; + bytes -= res; + if (bytes != 0) + return 0; + } + } + else + { + res = ACE_OS::write (dataSocket, (char *)¶, sizeof(para)); + if (res == -1) + { + ACE_OS::perror ("VB error, fb packet sending failed"); + ACE_Reactor::instance ()->end_event_loop (); return -1; + } + if (res < sizeof(para)) + { + fprintf(stderr, "VB send_feedback() warn: res %dB < sizeof(para) %dB\n", + res, sizeof(para)); + } + } + if (errno != ENOBUFS) // fb_state == 4; + qosRecomputes = len; + ACE_Reactor::instance ()->remove_handler (this, + ACE_Event_Handler::WRITE_MASK); + if (this->state_ == WRITE_FEEDBACK2) + { + if (fb_state == 6) + { /* record the time if an action packet is + successfully send, and indicate that an + feedback action leads to state 6, which after + delay sometime leads to state 3. + The action_delay should have been related + to round-trip time. */ + action_time = get_usec(); + action_delay = shared->usecPerFrame * 100; + not_action = 0; + } + } + else if (this->state_ == WRITE_FEEDBACK2) + { + this->feedback_action (); + } +#ifdef STAT + { + int i; + if ((i = shared->stat.fbPacketNumber) < MAX_FB_PACKETS) { + shared->stat.fbPackets[i].frameId = shared->nextFrame; + shared->stat.fbPackets[i].addUsecPerFrame = addupf; + shared->stat.fbPackets[i].addFrames = addf; + shared->stat.fbPackets[i].frames = shared->sendPatternGops * + shared->patternSize; + shared->stat.fbPackets[i].framesDropped = shared->framesDropped; + shared->stat.fbPackets[i].frameRateLimit = shared->frameRateLimit; + shared->stat.fbPackets[i].advance = advance; + } + shared->stat.fbPacketNumber ++; + } +#endif + // Now return to the reading header position. + this->state_ = READ_HEADER; + temp = (char *)&msghd; + bytes = sizeof (msghd); + int result = ACE_Reactor::instance ()->remove_handler (this, + ACE_Event_Handler::WRITE_MASK); + if (result != 0) + ACE_DEBUG ((LM_DEBUG,"remove_handler failed for write")); + } + return 0; +} + +int +VideoBuffer::sync_feedback (void) +{ + int result; + if (shared->config.syncEffective) + { + if (fb_state > 1 && fb_state != 4 && (len = shared->qosRecomputes) != qosRecomputes) + { + /* QoS feedback packet is sent if at any time send pattern is + recomputed, and sync feedback is not in active fb_state*/ + this->state_ = WRITE_FEEDBACK1; + result = ACE_Reactor::instance ()->register_handler (this,ACE_Event_Handler::WRITE_MASK); + if (result != 0) + return result; + fb_addupf = 0; + fb_addf = 0; + fb_advance = advance; + temp = 0; + return 0; + } + this->feedback_action (); + } /* end if (shared->config.syncEffective) */ + else + fb_state = 0; + return 0; +} + +int +VideoBuffer::feedback_action (void) +{ + switch (fb_state) + { + case 4: /* active */ + if (pcmdsn != cmdsn) + { + cmdsn = pcmdsn; + if (!(pcmd == CmdPLAY || pcmd == CmdFF || pcmd == CmdFB)) + fb_state = 0; + else + fb_state = 1; + break; + } + if (pcmd == CmdPLAY && shared->usecPerFrame != upf) + { + /* jump to fb_state 5 if speed changes */ + fb_state = 5; + break; + } + { + int interval = shared->usecPerFrame; + double val = (double)(pcmd == CmdPLAY ? + pfid - shared->nextFrame : + (pcmd == CmdFF ? + pgop - shared->nextGroup : + shared->nextGroup - pgop)); + fv = DoFilter(f, val); /* get average #frames in the whole client + pipeline, including all stages */ + val = val - fv; + fav = DoFilter(fa, val >= 0.0 ? val : -val); + /* get average #frames jitter in the whole client pipeline */ + + val = fav * interval * 6; + /* convert deviation in frame into microseconds, 6 is a magic number */ + + /* tries to recompute advance (in microseconds), and med/high/low + in adaptation to current jitter level */ + if ((val > advance && !reach_limit) || + (advance > min_advance && val < advance >> 3)) { + advance = (int) max(2 * val, min_advance); + med = advance / interval; + /* + if (pcmd == CmdPLAY) { + if (med < shared->VDframeNumber) { + med = shared->VDframeNumber; + } + } + else + */ + if (med < 2 ) { /* but keep minimum buffer fill level */ + med = 2; + } + if (med > (VB_BUF_SIZE / shared->averageFrameSize) / 2) { + reach_limit = 1; + med = (VB_BUF_SIZE / shared->averageFrameSize) / 2; + Fprintf(stderr, + "VB VSadvance control: VBbuf limit reached, med %d.\n", med); + } + else reach_limit = 0; + high = med + med / 2; + low = med - med / 2; + period = med * MAX_CLOCK_DRIFT; + Fprintf(stderr, + "VB: VS advance control: fav %5.2f, med %d, advance %d at nextFrame %d\n", + fav, med, advance, shared->nextFrame); + } + } + /* record the current time (fid or gopid) if buffer fill level drift + only very little */ + if ((int)fv == med) + { + init_fv = fv; + init_pos = pcmd == CmdPLAY ? pfid : pgop; + break; + } + /* + fprintf(stderr, "VB fb: fv %lf\n", fv); + */ + /* try send action if low/high water mark is passed, or qos recomputed */ + /* There is problem here, the deltas of upf are sent, instead of upf + and frame themself. This scheme is not robust in case case feedback + packets are lost, and get resent */ + len = shared->qosRecomputes; + if (fv >= high || fv <= low || len != qosRecomputes) + { + int addupf, addf; + int pos = pcmd == CmdPLAY ? pfid : pgop; + int dist = (int)(pcmd == CmdFB ? init_pos - pos : pos - init_pos); + if (fv >= high || fv <= low) + { + if (dist < period) + { /* try skip or stall */ + addf = (int)(med - fv); + addupf = 0; + } + else + { /* try adjust VS clock rate */ + int added = (int)((double)cupf * (fv - (double)med) / (double) dist); + addf = (int)(med - fv); + addupf = added; + cupf += added; + } + fb_state = 6; + } + else + { /* fb only recomputed sendpattern, no fb_state change */ + addupf = 0; + addf = 0; + } + + /* tries to send a feedback packet. */ + if (shared->live) + /* no sync feedback with live video */ + qosRecomputes = len; + else + { + this->state_ = WRITE_FEEDBACK2; + int result = ACE_Reactor::instance ()->register_handler (this,ACE_Event_Handler::WRITE_MASK); + if (result != 0) + return result; + fb_addupf = addupf; + fb_addf = addf; + fb_advance = advance; + return 0; + } + if (fb_state == 6) + { /* record the time if an action packet is + successfully send, and indicate that an + feedback action leads to fb_state 6, which after + delay sometime leads to fb_state 3. + The action_delay should have been related + to round-trip time. */ + action_time = get_usec(); + action_delay = shared->usecPerFrame * 100; + not_action = 0; + } + } + break; + case 6: /* reset after action */ + if (pcmdsn != cmdsn) + { + cmdsn = pcmdsn; + if (!(pcmd == CmdPLAY || pcmd == CmdFF || pcmd == CmdFB)) + fb_state = 0; + else + fb_state = 1; + break; + } + if (pcmd == CmdPLAY && shared->usecPerFrame != upf) + { + fb_state = 5; + break; + } + /* Jump to transition fb_state 3 only after delay for some + time, when feedback action has been taken, and the + effect has been propogated back to the client */ + if (get_duration(action_time, get_usec()) >= action_delay) + fb_state = 3; + break; + case 5: /* reset after speed change, feedback stays in this + fb_state as long as play speed is changing */ + if (pcmdsn != cmdsn) + { + cmdsn = pcmdsn; + if (!(pcmd == CmdPLAY || pcmd == CmdFF || pcmd == CmdFB)) + fb_state = 0; + else + fb_state = 1; + break; + } + /* Jump to transition fb_state 3, indicating that the transition + is not caused by feedback action */ + if (shared->currentUPF == shared->usecPerFrame) + { + not_action = 1; + fb_state = 3; + } + break; + case 2: /* delay after start, this delay is for avoiding feedback + action during server fast start-up. */ + if (pcmdsn != cmdsn) { + cmdsn = pcmdsn; + if (!(pcmd == CmdPLAY || pcmd == CmdFF || pcmd == CmdFB)) { + fb_state = 0; + } + else { + fb_state = 1; + } + break; + } + if (pcmd == CmdPLAY) { + if (pfid - startpos >= delay) { + advance = shared->VStimeAdvance; + fb_state = 3; + } + } + else if (pcmd == CmdFF) { + if (pgop - startpos >= delay) { + advance = shared->VStimeAdvance; + fb_state = 3; + } + } + else { /* CmdFB */ + if (startpos - pgop >= delay) { + advance = shared->VStimeAdvance; + fb_state = 3; + } + } + break; + case 3: /* transient fb_state, entered after start-up delay, + action-delay, or play-speed change */ + if (pcmdsn != cmdsn) { + cmdsn = pcmdsn; + if (!(pcmd == CmdPLAY || pcmd == CmdFF || pcmd == CmdFB)) { + fb_state = 0; + } + else { + fb_state = 1; + } + break; + } + /* Initialize both buffer-fill-level and jitter filters */ + if (f == NULL) { + f = NewFilter(FILTER_LOWPASS, shared->config.filterPara >= 1 ? + shared->config.filterPara : 100); + } + else { + f = ResetFilter(f, shared->config.filterPara >= 1 ? + shared->config.filterPara : 100); + } + if (fa == NULL) { + fa = NewFilter(FILTER_LOWPASS, shared->config.filterPara >= 1 ? + shared->config.filterPara : 100); + DoFilter(fa, 0.0); + } + else if (not_action) { /* reset jitter level filter only + if entering this fb_state is not + cause by feedback action */ + fa = ResetFilter(fa, shared->config.filterPara >= 1 ? + shared->config.filterPara : 100); + } + if (f == NULL || fa == NULL) { + ACE_OS::perror ("VB failed to allocate space for filters"); + fb_state = 0; + } + else { + init_fv = + DoFilter(f, (double)(pcmd == CmdPLAY ? + pfid - shared->nextFrame : + (pcmd == CmdFF ? + pgop - shared->nextGroup : + shared->nextGroup - pgop))); + init_pos = pcmd == CmdPLAY ? pfid : pgop; + upf = shared->currentUPF; + cupf = upf; + { + int interval = shared->usecPerFrame; + + /* upon speed change, 'advance', in microseconds, will + not change, but med/high/low will be updated. This + may suggest that in the new toolkit version of the + same feedback systems, the buffer-fill level and + jitter is measured directly in microseconds, not in + #frames then converting to microseconds. */ + med = advance / interval; + /* + if (pcmd == CmdPLAY) { + if (med < shared->VDframeNumber) { + med = shared->VDframeNumber; + min_advance = advance = med * interval; + } + } + else + */ + if (med < 2 ) { + med = 2; + advance = med * interval; + min_advance = max(advance, shared->VStimeAdvance); + } + else min_advance = shared->VStimeAdvance; + if (med > (VB_BUF_SIZE / shared->averageFrameSize) / 2) { + reach_limit = 1; + med = (VB_BUF_SIZE / shared->averageFrameSize) / 2; + if (not_action) { + Fprintf(stderr, + "VB start/speed-change: VBbuf limit reached, med %d.\n", med); + } + } + else reach_limit = 0; + } + high = med + med / 2; + low = med - med / 2; + delay = med * (SPEEDUP_INV_SCALE - 1); /* this delay is to avoid + feedback when VS is in + fast-start period */ + period = med * MAX_CLOCK_DRIFT; + if (not_action) { + Fprintf(stderr, + "VB start/speed-change: med %d, advance %d at nextFrame %d\n", + med, advance, shared->nextFrame); + } + fb_state = 4; + } + break; + case 0: /* idle */ + if (pcmd == CmdPLAY || pcmd == CmdFF || pcmd == CmdFB) { + cmdsn = pcmdsn; + fb_state = 1; + } + break; + case 1: /* start */ + if (pcmdsn != cmdsn) { + cmdsn = pcmdsn; + if (!(pcmd == CmdPLAY || pcmd == CmdFF || pcmd == CmdFB)) { + fb_state = 0; + } + break; + } + startpos = pcmd == CmdPLAY ? pfid : pgop; + advance = shared->VStimeAdvance; + + /* following from vs.c: + timerAdjust = (VStimeAdvance * SPEEDUP_INV_SCALE) / currentUPF; + */ + med = advance / shared->usecPerFrame; + /* + if (pcmd != CmdPLAY) med /= shared->patternSize; + */ + delay = med * (SPEEDUP_INV_SCALE - 1); /* this delay is to avoid + feedback when VS is in + fast-start period */ + not_action = 1; + fb_state = 2; + break; + default: + fprintf(stderr, "VB: unknown Feedback fb_state %d reached.\n", fb_state); + fb_state = 0; + break; + } +} + +ACE_HANDLE +Notification_Handler::get_handle (void) const +{ + return -1; +} + +int +Notification_Handler::handle_input (ACE_HANDLE fd) +{ + ACE_DEBUG ((LM_DEBUG,"Notification_Handler::handle_input")); + char message[BUFSIZ]; + message [0] = 0; + // used to indicate that we should exit. + int result = + ACE_OS::read (fd,message,BUFSIZ); + + ACE_DEBUG ((LM_DEBUG,"result:%d,message[0]:%d\n",result,message[0])); + if (result == 0) + { + ACE_DEBUG ((LM_DEBUG,"AB process exiting, notification socket eof while reading\n")); + ACE_Reactor::instance ()->end_event_loop (); + return -1; + } + if (result == -1) + { + ACE_DEBUG ((LM_DEBUG,"AB process exiting, notification socket error while reading\n")); + ACE_Reactor::instance ()->end_event_loop (); + return -1; + } + ACE_DEBUG ((LM_DEBUG," %d %d\n",result,message[0])); + + switch (message[0]) + { + case EXIT: + ACE_DEBUG ((LM_DEBUG,"VB process exiting because of exit signal\n")); + set_exit_routine_tag(0); + VideoBuffer::VBdeleteBuf(); + ACE_Reactor::instance ()->end_event_loop (); + return -1; + default: + break; + } + return 0; +} + +ACE_HANDLE +Video_Notification_Handler::get_handle (void) const +{ + return vsp[1]; +} |