summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/vb.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/vb.cpp')
-rw-r--r--TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/vb.cpp1129
1 files changed, 0 insertions, 1129 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/vb.cpp b/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/vb.cpp
deleted file mode 100644
index d8b42ba1f42..00000000000
--- a/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/vb.cpp
+++ /dev/null
@@ -1,1129 +0,0 @@
-/* $Id$ */
-
-/* Copyright (c) 1995 Oregon Graduate Institute of Science and Technology
- * P.O.Box 91000-1000, Portland, OR 97291, USA;
- *
- * Permission to use, copy, modify, distribute, and sell this software and its
- * documentation for any purpose is hereby granted without fee, provided that
- * the above copyright notice appear in all copies and that both that
- * copyright notice and this permission notice appear in supporting
- * documentation, and that the name of O.G.I. not be used in advertising or
- * publicity pertaining to distribution of the software without specific,
- * written prior permission. O.G.I. makes no representations about the
- * suitability of this software for any purpose. It is provided "as is"
- * without express or implied warranty.
- *
- * O.G.I. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
- * ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL
- * O.G.I. BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY
- * DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN
- * AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- *
- * Author: Shanwei Cen
- * Department of Computer Science and Engineering
- * email: scen@cse.ogi.edu
- */
-
-extern int vsp[2];
-extern void set_exit_routine_tag(int tag);
-
-#include "vb.h"
-
-ACE_RCSID(mpeg_client, vb, "$Id$")
-
-block ** VideoBuffer::head = 0;
-block ** VideoBuffer::tail = 0;
-char * VideoBuffer::buf = 0;
-int VideoBuffer::bufsize = -1;
-int VideoBuffer::sid = -1;
-int VideoBuffer::countid = -1;
-int VideoBuffer::exit_tag = -1;
-int VideoBuffer::conn_tag = -1;
-int VideoBuffer::savedSocket = -1;
-
-//constructor.
-VideoBuffer::VideoBuffer (void)
- :msg (0),
- packet (0),
- msgsn (-1),
- ptr (0),
- ptr1 (0),
- tmp_buf (0),
- cmdsn (-1),
- fb_state (0),
- qosRecomputes (0),
- f (0),
- fa (0),
- reach_limit (0),
- not_action (1)
-#ifdef STAT
- ,to_count (1),
- gap_msgsn (-1)
-#endif
-{
-}
-
-// Destructor.
-VideoBuffer::~VideoBuffer (void)
-{
- if (ACE_Reactor::instance ()->remove_handler (this->handler_,ACE_Event_Handler::READ_MASK) == -1)
- ACE_DEBUG ((LM_ERROR,"(%P)remove handler failed for Video_Notification_Handler\n"));
-
- delete this->handler_;
- if (ACE_Reactor::instance ()->remove_handler (this,ACE_Event_Handler::READ_MASK) == -1)
- ACE_DEBUG ((LM_ERROR,"(%P)remove handler failed for VideoBuffer\n"));
-}
-
-/* size in byte */
-void
-VideoBuffer::VBinitBuf (int size)
-{
- bufsize = size - sizeof(struct header);
- buf = creat_shared_mem(size);
- head = &((struct header *)buf)->h;
- tail = &((struct header *)buf)->t;
- buf += sizeof(struct header);
- sid = creat_semaphore();
- countid = creat_semaphore();
- enter_cs(countid);
- *head = *tail = (struct block *)buf;
- (*tail)->full = 0;
- (*tail)->next = NULL;
- (*tail)->shcode = SHCODE;
-}
-
-/* block version */
-char*
-VideoBuffer::VBgetBuf (int size)
-{
- return 0;
-}
-
-/* non-block check, return True/False*/
-int
-VideoBuffer::VBcheckBuf (int size)
-{
- return 0;
-}
-
-void
-VideoBuffer::VBputMsg (char * msgPtr)
-{
-}
-
-/* block version */
-char *
-VideoBuffer::VBgetMsg ()
-{
- char *vb_ptr;
-
-#ifdef STAT
- if (shared->collectStat && *head == *tail)
- shared->stat.VBemptyTimes ++;
-#endif
- // ACE_DEBUG ((LM_DEBUG,"(%P)waiting for countid\n"));
- enter_cs(countid);
- enter_cs(sid);
- while (*tail != *head && (*tail)->full == 0)
- *tail = (*tail)->next;
- leave_cs(sid);
- if (*head == *tail)
- {
- fprintf(stderr, "VB: getMsg run out of msg unexpectedly.\n");
- ACE_OS::exit (1);
- }
- vb_ptr = ((char*)*tail)+sizeof(**tail)+sizeof(VideoMessage);
-
- // fprintf(stderr,"VBgetMsg: buf:%x, msg:%x\n", (int)buf, (int)vb_ptr);
-
- return vb_ptr;
-}
-
-/* non-block check, return Number of Msgs in buffer */
-int
-VideoBuffer::VBcheckMsg ()
-{
- return get_semval(countid);
-}
-
-int
-VideoBuffer::VBbufEmpty (void)
-{
- /*
- Fprintf(stderr, "VB countid %d\n", get_semval(countid));
- */
- return get_semval(countid) <= 0;
-}
-
-void
-VideoBuffer::VBreclaimMsg (char * msgPtr)
-{
- enter_cs(sid);
- *tail = (*tail)->next;
- leave_cs(sid);
-}
-
-void
-VideoBuffer::VBdeleteBuf (void)
-{
- remove_shared_mem (buf - sizeof(struct header));
-}
-
-void
-VideoBuffer::VBdeleteSem (void)
-{
- remove_semaphore(sid);
- remove_semaphore(countid);
-}
-
-int
-VideoBuffer::VBprocess (int init_socket, int normal_socket)
-{
- this->initSocket = init_socket;
- this->normalSocket = normal_socket;
- msgsn = -1;
- dataSocket = initSocket;
- exit_tag = 0;
- conn_tag = shared->videoMaxPktSize;
- savedSocket = normalSocket;
-
- // ACE_DEBUG ((LM_DEBUG,"VideoBuffer::VBProcess ()\n"));
- /* buffer big enough for discard mode packet stream */
- if (conn_tag < 0)
- {
- tmp_buf = (char *)ACE_OS::malloc(-conn_tag);
- if (tmp_buf == NULL) {
- fprintf(stderr, "AB failed to allocate %d bytes");
- ACE_OS::perror ("of tmp_buf");
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
- }
-
- ACE_NEW_RETURN (this->handler_,
- Video_Notification_Handler (),
- -1);
-
- // Register the notification handler with the reactor.
- int result = ACE_Reactor::instance ()->register_handler (this->handler_,
- ACE_Event_Handler::READ_MASK);
- if (result != 0)
- return result;
-
- result = ACE_Reactor::instance ()->register_handler (this,
- ACE_Event_Handler::READ_MASK);
-
- if (result != 0)
- return result;
-
- this->state_ = READ_HEADER;
- temp = (char *)&msghd;
- bytes = sizeof (msghd);
- return 0;
-}
-
-ACE_HANDLE
-VideoBuffer::get_handle (void) const
-{
- if (this->socket_flag_)
- return this->normalSocket;
- else
- return this->initSocket;
-}
-
-int
-VideoBuffer::handle_input (ACE_HANDLE fd)
-{
- // ACE_DEBUG ((LM_DEBUG,"VideoBuffer::handle_input:state = %d\n",this->state_));
- switch (this->state_)
- {
- case READ_NEXT_HEADER:
- case READ_HEADER:
- {
- if (conn_tag >= 0)
- len = ACE_OS::read (dataSocket,temp,bytes);
- else
- {
- len = ACE_OS::read (dataSocket,tmp_buf,-conn_tag);
- // fprintf (stderr,"VB read packet len = %d\n",len);
- ACE_OS::memcpy ((char *)&msghd, tmp_buf, sizeof(msghd));
- }
- if (len == -1) {
- if (errno == EWOULDBLOCK || errno == EAGAIN) {
- perror("VB sleep for 10ms");
- usleep(10000);
- // set the pointers before going into the next loop.
- temp = (char *)&msghd;
- bytes = sizeof (msghd);
- return 0;
- }
- ACE_OS::perror ("VB ACE_OS::read () data");
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
- if (len == 0) { /* EOF, connection closed by peer */
- fprintf(stderr, "Error: VB found dataSocket broken\n");
- for (;;) {
- usleep(1000000);
- }
- }
- if (conn_tag >= 0)
- {
- temp += len;
- bytes -= len;
- if (bytes != 0)
- return 0;
- else
- len = sizeof (msghd);
- }
- if (len < sizeof(msghd))
- {
- // go back to reading the next header.
- temp = (char *)&msghd;
- bytes = sizeof (msghd);
- fprintf(stderr, "VD warn: PEEK1ed %dB < expected %dB\n",len, sizeof(msghd));
- // continue;
- }
-#ifdef NeedByteOrderConversion
- msghd.packetsn = ntohl(msghd.packetsn);
- msghd.packetSize = ntohl(msghd.packetSize);
- msghd.msgsn = ntohl(msghd.msgsn);
- msghd.msgOffset = ntohl(msghd.msgOffset);
- msghd.msgSize = ntohl(msghd.msgSize);
-#endif
-
- if (this->state_ == READ_NEXT_HEADER)
- {
-#ifdef STAT
- {
- int gap = msghd.msgsn - gap_msgsn;
- gap = (gap >MSGGAP_MAX) ? MSGGAP_MAX : gap < MSGGAP_MIN ? MSGGAP_MIN : gap;
- shared->stat.VBmsgGaps[gap - MSGGAP_MIN] ++;
- if (gap >0) gap_msgsn = msghd.msgsn;
- }
-#endif
- if (msghd.msgsn <= msgsn)
- { /* outdated message, wait for next one */
-
- fprintf(stderr, "VB discard outdated or dup msgsn %d, pktsn %d\n",
- msghd.msgsn, msghd.packetsn);
-
- this->state_ = SKIP_NEXT_MESSAGE;
- bytes = msghd.msgSize;
- // skip_message(dataSocket, &msghd);
- // continue;
- return 0;
- }
-
- if ((msghd.msgsn > msgsn + 1) || (msghd.msgOffset == 0))
- {
- /* message out of order, abandon current packet */
- /*
- fprintf(stderr, "VB msg out of order for current packet, discard it.\n");
- */
-#ifdef STAT
- to_count = 0;
-#endif
- }
- else
- {
- // ACE_DEBUG ((LM_DEBUG,"assigning next msgsn %d\n",msghd.msgsn));
- msgsn = msghd.msgsn;
- this->state_ = READ_MESSAGE;
- temp = ptr +sizeof (msghd);
- bytes = msghd.msgSize;
- // make a recursive call as we just have to do a memcpy from the buffer.
- this->handle_input (dataSocket);
- return 0;
- }
- }
-
- // fprintf(stderr, "VB PEEK1 a msg sn-%d, size-%d, pkt-%d, pktsize-%d\n",msghd.msgsn, msghd.msgSize, msghd.packetsn, msghd.packetSize);
-
-#ifdef STAT
- if (to_count) {
- int gap = msghd.msgsn - gap_msgsn;
- gap = (gap >MSGGAP_MAX) ? MSGGAP_MAX : gap < MSGGAP_MIN ? MSGGAP_MIN : gap;
- shared->stat.VBmsgGaps[gap - MSGGAP_MIN] ++;
- if (gap >0) gap_msgsn = msghd.msgsn;
- }
- to_count = 1;
-#endif
- if (msghd.msgsn <= msgsn) /* outdated msg */
- {
- fprintf(stderr, "VB discard outdated msgsn %d, pktsn %d when expecting first %d\n",
- msghd.msgsn, msghd.packetsn,msgsn);
- this->state_ = SKIP_MESSAGE;
- bytes = msghd.msgSize;
- // skip_message(dataSocket, &msghd);
- // continue;
- return 0;
- }
- else if (msghd.msgOffset != 0) /* not first msg of a packet */
- {
-
- /*
- Fprintf(stderr, "VB discard non-first msg msgsn %d, pktsn %d\n",
- msghd.msgsn, msghd.packetsn);
- */
- this->state_ = SKIP_MESSAGE;
- bytes = msghd.msgSize;
- // skip_message(dataSocket, &msghd);
- // continue;
- return 0;
- }
- else
- {
- // ACE_DEBUG ((LM_DEBUG,"assigning msgsn %d\n",msghd.msgsn));
- msgsn = msghd.msgsn;
- }
-
- /* allocate packet for the incoming msg */
- bsize = msghd.packetSize + sizeof(**head)*2 + sizeof(msghd);
- bsize = ((bsize+3)>>2)<<2;
- enter_cs(sid);
- if (*head >= *tail)
- {
- if (bufsize - (int)((char*)*head - buf) >= bsize )
- msg =(VideoMessage *)((char*)*head + sizeof(**head));
- else if ((int)((char*)*tail - buf) >= bsize)
- {
- (*head)->next = (struct block *)buf;
- (*head)->full = 0;
- *head = (struct block *)buf;
- msg = (VideoMessage *)(buf + sizeof(**head));
- *head = (struct block *)buf;
- (*head)->shcode = SHCODE;
- }
- else /* not enough buffer, discard current message */
- {
- leave_cs(sid);
-#ifdef STAT
- if (shared->collectStat)
- shared->stat.VBdroppedFrames ++;
-#endif
- /*
- Fprintf(stderr, "VB not enough space 1, drop msg.sn %d pktsn %d\n",
- msghd.msgsn, msghd.packetsn);
- */
- this->state_ = SKIP_MESSAGE;
- bytes = msghd.msgSize;
- // skip_message(dataSocket, &msghd);
- // continue;
- return 0;
- }
- }
- else /* *head < *tail */
- if ((char*)*tail - (char*)*head >= bsize)
- msg = (VideoMessage *)((char*)*head + sizeof(**head));
- else /* not enough buffer, abandon current message */
- {
- leave_cs(sid);
-#ifdef STAT
- if (shared->collectStat)
- shared->stat.VBdroppedFrames ++;
-#endif
- /*
- Fprintf(stderr, "VB not enough space 1, drop msg.sn %d pktsn %d\n",
- msghd.msgsn, msghd.packetsn);
-
- */
- this->state_ = SKIP_MESSAGE;
- bytes = msghd.msgSize;
- // skip_message(dataSocket, &msghd);
- // continue;
- return 0;
- }
- leave_cs(sid);
-
- //fprintf(stderr, "VB allocated a buffer for comming packet.\n");
-
- psize = msghd.packetSize;
- poffset = 0;
- packet = (VideoPacket *)((char*)msg + sizeof(msghd));
- *(((int*)packet)+(msghd.packetSize>>2)) = 0;
- /* clear the last no more than three bytes, for
- proper detecting the end of packet by VD */
- ptr = (char*)msg;
- this->state_ = READ_MESSAGE;
- temp = ptr +sizeof (msghd);
- bytes = msghd.msgSize;
- }
- break;
- case SKIP_NEXT_MESSAGE:
- case SKIP_MESSAGE:
- {
- char buffer[BUFSIZ];
- if (conn_tag >= 0)
- {
- int size = bytes > BUFSIZ ? BUFSIZ : bytes;
- int res =ACE_OS::read (dataSocket, buffer, size);
- bytes -= res;
-
- if (bytes != 0)
- return 0;
- }
- if (this->state_ == SKIP_MESSAGE)
- this->state_ = READ_HEADER;
- else if (this->state_ == SKIP_NEXT_MESSAGE)
- this->state_ = READ_NEXT_HEADER;
- temp = (char *)&msghd;
- bytes = sizeof (msghd);
- break;
- }
- case READ_MESSAGE:
- {
- if (conn_tag >= 0)
- {
- int val;
- val = ACE_OS::read (dataSocket,temp,bytes);
-
- if (val == -1 && (errno == EINTR || errno == EAGAIN | errno == EWOULDBLOCK))
- { /* interrupted or need to wait, try again */
- if (errno == EAGAIN | errno == EWOULDBLOCK) usleep(10000);
- errno = 0;
- return 0;
- }
- if (val == -1)
- {
- ACE_OS::perror ("Error -- Read from socket");
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
- if (val == 0) /* EOF encountered */
- {
- ACE_DEBUG ((LM_DEBUG, "Error -- EOF reached while trying to read %d bytes.\n"));
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
- temp += val;
- bytes -= val;
- if (bytes < 0) /* weird thing is happening */
- {
- ACE_DEBUG ((LM_DEBUG, "Error: read too much from socket, %d out of %d bytes.\n"));
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
- if (bytes != 0)
- return 0;
- }
- else
- memcpy(temp, tmp_buf + sizeof(msghd), bytes);
- poffset += msghd.msgSize;
- psize -= msghd.msgSize;
- ptr += msghd.msgSize;
-
- // fprintf(stderr, "VB packet remain size %d\n", psize);
-
- if (psize == 0)
- {
- // ACE_DEBUG ((LM_DEBUG,"finished receiving current packet\n"));
- /* finished receiving the current packet */
-#ifdef NeedByteOrderConversion
- packet->cmd = ntohl(packet->cmd);
- packet->cmdsn = ntohl(packet->cmdsn);
- packet->sh = ntohl(packet->sh);
- packet->gop = ntohl(packet->gop);
- packet->frame = ntohl(packet->frame);
- packet->display = ntohl(packet->display);
- packet->future = ntohl(packet->future);
- packet->past = ntohl(packet->past);
- packet->currentUPF = ntohl(packet->currentUPF);
- packet->dataBytes = ntohl(packet->dataBytes);
-#endif
- pcmdsn = packet->cmdsn;
- pcmd = packet->cmd;
- pfid = packet->frame;
- pgop = packet->gop;
- shared->VBheadFrame = (pcmd == CmdPLAY) ? pfid : pgop;
-
- shared->currentUPF = packet->currentUPF;
- enter_cs(sid);
- (*head)->full = 1;
- psize = sizeof(**head) + sizeof(*msg) + msghd.packetSize;
- psize = ((psize+3)>>2)<<2;
- ptr = (char*)*head + psize;
- (*head)->next = (struct block *) ptr;
- (*head) = (struct block *)ptr;
- (*head)->shcode = SHCODE;
- leave_cs(countid);
- leave_cs(sid);
-
- /* VB receives all frame except for the INIT one through normalSocket */
- if (dataSocket != normalSocket)
- {
- this->socket_flag_ = 1;
- fprintf(stderr, "VB got INIT frame.\n");
- ACE_OS::write (initSocket, (char *)&initSocket, 1); /* write a garbage byte */
- // ACE_OS::close (initSocket);
- // dataSocket = normalSocket;
- // int result = ACE_Reactor::instance ()->remove_handler (this,ACE_Event_Handler::READ_MASK);
- // if (result != 0)
- // ACE_DEBUG ((LM_DEBUG,"remove handler failed for read_mask\n"));
-
- }
-
- /* following is synchronization feedback algorithm */
- this->sync_feedback ();
- if (dataSocket != normalSocket)
- {
- dataSocket = normalSocket;
- int result = ACE_Reactor::instance ()->register_handler (this,ACE_Event_Handler::READ_MASK);
- if (result != 0)
- ACE_DEBUG ((LM_DEBUG,"register handler failed for read_mask after datasocket change\n"));
- return -1;
- }
- // return 0;
- break; /* got the whole packet, break to the out-most loop for next packet */
- } /* end if (psize == 0) */
- else if (psize < 0)
- {
- fprintf(stderr, "VB error: received too many msgs for a packet.\n");
- ACE_Reactor::instance ()->end_event_loop ();
- return -1;
- }
- this->state_ = READ_NEXT_HEADER;
- temp = (char *)&msghd;
- bytes = sizeof (msghd);
- break;
- }
- }
- return 0;
-}
-
-int
-VideoBuffer::handle_output (ACE_HANDLE fd)
-{
- if ((this->state_ == WRITE_FEEDBACK1) || (this->state_ == WRITE_FEEDBACK2))
- {
- // send the feedback to the server.
- VideoFeedBackPara para;
- para.cmdsn = htonl(shared->cmdsn);
- para.addUsecPerFrame = htonl(fb_addupf);
- para.addFrames = htonl(fb_addf);
- para.needHeader = htonl(shared->needHeader);
- shared->needHeader = 0;
- para.frameRateLimit1000 =
- htonl((long)(shared->frameRateLimit * 1000.0));
- para.sendPatternGops = htonl(shared->sendPatternGops);
- ACE_OS::memcpy (para.sendPattern, shared->sendPattern, PATTERN_SIZE);
-
- // fprintf(stderr, "VB to send a fb packet...");
-
- int res;
- if (conn_tag != 0)
- { /* packet stream */
- if (temp == 0)
- {
- temp = (char *)&para;
- bytes = sizeof (para);
- }
- res = ACE_OS::write (dataSocket, temp, bytes);
- if (res == -1)
- {
- if (errno == EINTR)
- return 0;
- if (errno == ENOBUFS) {
- perror("VB Warning, fb packet discarded for");
- // Here we should handle the return -1 case!
- fb_state = 4;
- }
- else
- {
- ACE_OS::perror ("VB error, fb packet sending failed");
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
- }
- else if (res == 0)
- {
- ACE_OS::perror ("VB error, sending fb,socket closed");
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
- else
- {
- temp += res;
- bytes -= res;
- if (bytes != 0)
- return 0;
- }
- }
- else
- {
- res = ACE_OS::write (dataSocket, (char *)&para, sizeof(para));
- if (res == -1)
- {
- ACE_OS::perror ("VB error, fb packet sending failed");
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
- if (res < sizeof(para))
- {
- fprintf(stderr, "VB send_feedback() warn: res %dB < sizeof(para) %dB\n",
- res, sizeof(para));
- }
- }
- if (errno != ENOBUFS) // fb_state == 4;
- qosRecomputes = len;
- ACE_Reactor::instance ()->remove_handler (this,
- ACE_Event_Handler::WRITE_MASK);
- if (this->state_ == WRITE_FEEDBACK2)
- {
- if (fb_state == 6)
- { /* record the time if an action packet is
- successfully send, and indicate that an
- feedback action leads to state 6, which after
- delay sometime leads to state 3.
- The action_delay should have been related
- to round-trip time. */
- action_time = get_usec();
- action_delay = shared->usecPerFrame * 100;
- not_action = 0;
- }
- }
- else if (this->state_ == WRITE_FEEDBACK2)
- {
- this->feedback_action ();
- }
-#ifdef STAT
- {
- int i;
- if ((i = shared->stat.fbPacketNumber) < MAX_FB_PACKETS) {
- shared->stat.fbPackets[i].frameId = shared->nextFrame;
- shared->stat.fbPackets[i].addUsecPerFrame = addupf;
- shared->stat.fbPackets[i].addFrames = addf;
- shared->stat.fbPackets[i].frames = shared->sendPatternGops *
- shared->patternSize;
- shared->stat.fbPackets[i].framesDropped = shared->framesDropped;
- shared->stat.fbPackets[i].frameRateLimit = shared->frameRateLimit;
- shared->stat.fbPackets[i].advance = advance;
- }
- shared->stat.fbPacketNumber ++;
- }
-#endif
- // Now return to the reading header position.
- this->state_ = READ_HEADER;
- temp = (char *)&msghd;
- bytes = sizeof (msghd);
- int result = ACE_Reactor::instance ()->remove_handler (this,
- ACE_Event_Handler::WRITE_MASK);
- if (result != 0)
- ACE_DEBUG ((LM_DEBUG,"remove_handler failed for write"));
- }
- return 0;
-}
-
-int
-VideoBuffer::sync_feedback (void)
-{
- int result;
- if (shared->config.syncEffective)
- {
- if (fb_state > 1 && fb_state != 4 && (len = shared->qosRecomputes) != qosRecomputes)
- {
- /* QoS feedback packet is sent if at any time send pattern is
- recomputed, and sync feedback is not in active fb_state*/
- this->state_ = WRITE_FEEDBACK1;
- result = ACE_Reactor::instance ()->register_handler (this,ACE_Event_Handler::WRITE_MASK);
- if (result != 0)
- return result;
- fb_addupf = 0;
- fb_addf = 0;
- fb_advance = advance;
- temp = 0;
- return 0;
- }
- this->feedback_action ();
- } /* end if (shared->config.syncEffective) */
- else
- fb_state = 0;
- return 0;
-}
-
-int
-VideoBuffer::feedback_action (void)
-{
- switch (fb_state)
- {
- case 4: /* active */
- if (pcmdsn != cmdsn)
- {
- cmdsn = pcmdsn;
- if (!(pcmd == CmdPLAY || pcmd == CmdFF || pcmd == CmdFB))
- fb_state = 0;
- else
- fb_state = 1;
- break;
- }
- if (pcmd == CmdPLAY && shared->usecPerFrame != upf)
- {
- /* jump to fb_state 5 if speed changes */
- fb_state = 5;
- break;
- }
- {
- int interval = shared->usecPerFrame;
- double val = (double)(pcmd == CmdPLAY ?
- pfid - shared->nextFrame :
- (pcmd == CmdFF ?
- pgop - shared->nextGroup :
- shared->nextGroup - pgop));
- fv = DoFilter(f, val); /* get average #frames in the whole client
- pipeline, including all stages */
- val = val - fv;
- fav = DoFilter(fa, val >= 0.0 ? val : -val);
- /* get average #frames jitter in the whole client pipeline */
-
- val = fav * interval * 6;
- /* convert deviation in frame into microseconds, 6 is a magic number */
-
- /* tries to recompute advance (in microseconds), and med/high/low
- in adaptation to current jitter level */
- if ((val > advance && !reach_limit) ||
- (advance > min_advance && val < advance >> 3)) {
- advance = (int) max(2 * val, min_advance);
- med = advance / interval;
- /*
- if (pcmd == CmdPLAY) {
- if (med < shared->VDframeNumber) {
- med = shared->VDframeNumber;
- }
- }
- else
- */
- if (med < 2 ) { /* but keep minimum buffer fill level */
- med = 2;
- }
- if (med > (VB_BUF_SIZE / shared->averageFrameSize) / 2) {
- reach_limit = 1;
- med = (VB_BUF_SIZE / shared->averageFrameSize) / 2;
- Fprintf(stderr,
- "VB VSadvance control: VBbuf limit reached, med %d.\n", med);
- }
- else reach_limit = 0;
- high = med + med / 2;
- low = med - med / 2;
- period = med * MAX_CLOCK_DRIFT;
- Fprintf(stderr,
- "VB: VS advance control: fav %5.2f, med %d, advance %d at nextFrame %d\n",
- fav, med, advance, shared->nextFrame);
- }
- }
- /* record the current time (fid or gopid) if buffer fill level drift
- only very little */
- if ((int)fv == med)
- {
- init_fv = fv;
- init_pos = pcmd == CmdPLAY ? pfid : pgop;
- break;
- }
- /*
- fprintf(stderr, "VB fb: fv %lf\n", fv);
- */
- /* try send action if low/high water mark is passed, or qos recomputed */
- /* There is problem here, the deltas of upf are sent, instead of upf
- and frame themself. This scheme is not robust in case case feedback
- packets are lost, and get resent */
- len = shared->qosRecomputes;
- if (fv >= high || fv <= low || len != qosRecomputes)
- {
- int addupf, addf;
- int pos = pcmd == CmdPLAY ? pfid : pgop;
- int dist = (int)(pcmd == CmdFB ? init_pos - pos : pos - init_pos);
- if (fv >= high || fv <= low)
- {
- if (dist < period)
- { /* try skip or stall */
- addf = (int)(med - fv);
- addupf = 0;
- }
- else
- { /* try adjust VS clock rate */
- int added = (int)((double)cupf * (fv - (double)med) / (double) dist);
- addf = (int)(med - fv);
- addupf = added;
- cupf += added;
- }
- fb_state = 6;
- }
- else
- { /* fb only recomputed sendpattern, no fb_state change */
- addupf = 0;
- addf = 0;
- }
-
- /* tries to send a feedback packet. */
- if (shared->live)
- /* no sync feedback with live video */
- qosRecomputes = len;
- else
- {
- this->state_ = WRITE_FEEDBACK2;
- int result = ACE_Reactor::instance ()->register_handler (this,ACE_Event_Handler::WRITE_MASK);
- if (result != 0)
- return result;
- fb_addupf = addupf;
- fb_addf = addf;
- fb_advance = advance;
- return 0;
- }
- if (fb_state == 6)
- { /* record the time if an action packet is
- successfully send, and indicate that an
- feedback action leads to fb_state 6, which after
- delay sometime leads to fb_state 3.
- The action_delay should have been related
- to round-trip time. */
- action_time = get_usec();
- action_delay = shared->usecPerFrame * 100;
- not_action = 0;
- }
- }
- break;
- case 6: /* reset after action */
- if (pcmdsn != cmdsn)
- {
- cmdsn = pcmdsn;
- if (!(pcmd == CmdPLAY || pcmd == CmdFF || pcmd == CmdFB))
- fb_state = 0;
- else
- fb_state = 1;
- break;
- }
- if (pcmd == CmdPLAY && shared->usecPerFrame != upf)
- {
- fb_state = 5;
- break;
- }
- /* Jump to transition fb_state 3 only after delay for some
- time, when feedback action has been taken, and the
- effect has been propogated back to the client */
- if (get_duration(action_time, get_usec()) >= action_delay)
- fb_state = 3;
- break;
- case 5: /* reset after speed change, feedback stays in this
- fb_state as long as play speed is changing */
- if (pcmdsn != cmdsn)
- {
- cmdsn = pcmdsn;
- if (!(pcmd == CmdPLAY || pcmd == CmdFF || pcmd == CmdFB))
- fb_state = 0;
- else
- fb_state = 1;
- break;
- }
- /* Jump to transition fb_state 3, indicating that the transition
- is not caused by feedback action */
- if (shared->currentUPF == shared->usecPerFrame)
- {
- not_action = 1;
- fb_state = 3;
- }
- break;
- case 2: /* delay after start, this delay is for avoiding feedback
- action during server fast start-up. */
- if (pcmdsn != cmdsn) {
- cmdsn = pcmdsn;
- if (!(pcmd == CmdPLAY || pcmd == CmdFF || pcmd == CmdFB)) {
- fb_state = 0;
- }
- else {
- fb_state = 1;
- }
- break;
- }
- if (pcmd == CmdPLAY) {
- if (pfid - startpos >= delay) {
- advance = shared->VStimeAdvance;
- fb_state = 3;
- }
- }
- else if (pcmd == CmdFF) {
- if (pgop - startpos >= delay) {
- advance = shared->VStimeAdvance;
- fb_state = 3;
- }
- }
- else { /* CmdFB */
- if (startpos - pgop >= delay) {
- advance = shared->VStimeAdvance;
- fb_state = 3;
- }
- }
- break;
- case 3: /* transient fb_state, entered after start-up delay,
- action-delay, or play-speed change */
- if (pcmdsn != cmdsn) {
- cmdsn = pcmdsn;
- if (!(pcmd == CmdPLAY || pcmd == CmdFF || pcmd == CmdFB)) {
- fb_state = 0;
- }
- else {
- fb_state = 1;
- }
- break;
- }
- /* Initialize both buffer-fill-level and jitter filters */
- if (f == NULL) {
- f = NewFilter(FILTER_LOWPASS, shared->config.filterPara >= 1 ?
- shared->config.filterPara : 100);
- }
- else {
- f = ResetFilter(f, shared->config.filterPara >= 1 ?
- shared->config.filterPara : 100);
- }
- if (fa == NULL) {
- fa = NewFilter(FILTER_LOWPASS, shared->config.filterPara >= 1 ?
- shared->config.filterPara : 100);
- DoFilter(fa, 0.0);
- }
- else if (not_action) { /* reset jitter level filter only
- if entering this fb_state is not
- cause by feedback action */
- fa = ResetFilter(fa, shared->config.filterPara >= 1 ?
- shared->config.filterPara : 100);
- }
- if (f == NULL || fa == NULL) {
- ACE_OS::perror ("VB failed to allocate space for filters");
- fb_state = 0;
- }
- else {
- init_fv =
- DoFilter(f, (double)(pcmd == CmdPLAY ?
- pfid - shared->nextFrame :
- (pcmd == CmdFF ?
- pgop - shared->nextGroup :
- shared->nextGroup - pgop)));
- init_pos = pcmd == CmdPLAY ? pfid : pgop;
- upf = shared->currentUPF;
- cupf = upf;
- {
- int interval = shared->usecPerFrame;
-
- /* upon speed change, 'advance', in microseconds, will
- not change, but med/high/low will be updated. This
- may suggest that in the new toolkit version of the
- same feedback systems, the buffer-fill level and
- jitter is measured directly in microseconds, not in
- #frames then converting to microseconds. */
- med = advance / interval;
- /*
- if (pcmd == CmdPLAY) {
- if (med < shared->VDframeNumber) {
- med = shared->VDframeNumber;
- min_advance = advance = med * interval;
- }
- }
- else
- */
- if (med < 2 ) {
- med = 2;
- advance = med * interval;
- min_advance = max(advance, shared->VStimeAdvance);
- }
- else min_advance = shared->VStimeAdvance;
- if (med > (VB_BUF_SIZE / shared->averageFrameSize) / 2) {
- reach_limit = 1;
- med = (VB_BUF_SIZE / shared->averageFrameSize) / 2;
- if (not_action) {
- Fprintf(stderr,
- "VB start/speed-change: VBbuf limit reached, med %d.\n", med);
- }
- }
- else reach_limit = 0;
- }
- high = med + med / 2;
- low = med - med / 2;
- delay = med * (SPEEDUP_INV_SCALE - 1); /* this delay is to avoid
- feedback when VS is in
- fast-start period */
- period = med * MAX_CLOCK_DRIFT;
- if (not_action) {
- Fprintf(stderr,
- "VB start/speed-change: med %d, advance %d at nextFrame %d\n",
- med, advance, shared->nextFrame);
- }
- fb_state = 4;
- }
- break;
- case 0: /* idle */
- if (pcmd == CmdPLAY || pcmd == CmdFF || pcmd == CmdFB) {
- cmdsn = pcmdsn;
- fb_state = 1;
- }
- break;
- case 1: /* start */
- if (pcmdsn != cmdsn) {
- cmdsn = pcmdsn;
- if (!(pcmd == CmdPLAY || pcmd == CmdFF || pcmd == CmdFB)) {
- fb_state = 0;
- }
- break;
- }
- startpos = pcmd == CmdPLAY ? pfid : pgop;
- advance = shared->VStimeAdvance;
-
- /* following from vs.c:
- timerAdjust = (VStimeAdvance * SPEEDUP_INV_SCALE) / currentUPF;
- */
- med = advance / shared->usecPerFrame;
- /*
- if (pcmd != CmdPLAY) med /= shared->patternSize;
- */
- delay = med * (SPEEDUP_INV_SCALE - 1); /* this delay is to avoid
- feedback when VS is in
- fast-start period */
- not_action = 1;
- fb_state = 2;
- break;
- default:
- fprintf(stderr, "VB: unknown Feedback fb_state %d reached.\n", fb_state);
- fb_state = 0;
- break;
- }
-}
-
-ACE_HANDLE
-Notification_Handler::get_handle (void) const
-{
- return -1;
-}
-
-int
-Notification_Handler::handle_input (ACE_HANDLE fd)
-{
- ACE_DEBUG ((LM_DEBUG,"Notification_Handler::handle_input"));
- char message[BUFSIZ];
- message [0] = 0;
- // used to indicate that we should exit.
- int result =
- ACE_OS::read (fd,message,BUFSIZ);
-
- ACE_DEBUG ((LM_DEBUG,"result:%d,message[0]:%d\n",result,message[0]));
- if (result == 0)
- {
- ACE_DEBUG ((LM_DEBUG,"AB process exiting, notification socket eof while reading\n"));
- ACE_Reactor::instance ()->end_event_loop ();
- return -1;
- }
- if (result == -1)
- {
- ACE_DEBUG ((LM_DEBUG,"AB process exiting, notification socket error while reading\n"));
- ACE_Reactor::instance ()->end_event_loop ();
- return -1;
- }
- ACE_DEBUG ((LM_DEBUG," %d %d\n",result,message[0]));
-
- switch (message[0])
- {
- case EXIT:
- ACE_DEBUG ((LM_DEBUG,"VB process exiting because of exit signal\n"));
- set_exit_routine_tag(0);
- VideoBuffer::VBdeleteBuf();
- ACE_Reactor::instance ()->end_event_loop ();
- return -1;
- default:
- break;
- }
- return 0;
-}
-
-ACE_HANDLE
-Video_Notification_Handler::get_handle (void) const
-{
- return vsp[1];
-}