diff options
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_server/Globals.cpp')
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_server/Globals.cpp | 2958 |
1 files changed, 2958 insertions, 0 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_server/Globals.cpp b/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_server/Globals.cpp new file mode 100644 index 00000000000..7f792d32392 --- /dev/null +++ b/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_server/Globals.cpp @@ -0,0 +1,2958 @@ +// $Id$ + +/* Copyright (c) 1995 Oregon Graduate Institute of Science and Technology + * P.O.Box 91000-1000, Portland, OR 97291, USA; + * + * Permission to use, copy, modify, distribute, and sell this software and its + * documentation for any purpose is hereby granted without fee, provided that + * the above copyright notice appear in all copies and that both that + * copyright notice and this permission notice appear in supporting + * documentation, and that the name of O.G.I. not be used in advertising or + * publicity pertaining to distribution of the software without specific, + * written prior permission. O.G.I. makes no representations about the + * suitability of this software for any purpose. It is provided "as is" + * without express or implied warranty. + * + * O.G.I. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING + * ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL + * O.G.I. BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY + * DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN + * AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + * + * Author: Shanwei Cen + * Department of Computer Science and Engineering + * email: scen@cse.ogi.edu + */ + +#include "ace/OS.h" +#include "Globals.h" + +ACE_RCSID(mpeg_server, Globals, "$Id$") + +int Mpeg_Global::parentpid = -1; +int Mpeg_Global::listenSocketIn = -1; +int Mpeg_Global::listenSocketUn = -1; +struct linger Mpeg_Global::linger = {1,1}; +int Mpeg_Global::live_audio = 0; +int Mpeg_Global::live_video = 0; /* 0 - no, 1 - to open, 2 - opened */ +int Mpeg_Global::drift_ppm = 0; /* clock drift in ppm */ +int Mpeg_Global::session_limit = SESSION_NUM; +int Mpeg_Global::session_num = 0; +int Mpeg_Global::rttag = 0; + +int Video_Timer_Global::timerHeader = 0; +int Video_Timer_Global::timerGroup = 0; +int Video_Timer_Global::timerFrame = 0; +int Video_Timer_Global::timerOn = 0; +int Video_Timer_Global::timerAdjust = 0; +int Video_Timer_Global::preTimerVal = 0; + +// Initialize the nasty int's, doubles and their friends. + +Video_Global::Video_Global (void) +{ + data_host = 0; + live_source = 0; + video_format = 0; + + pkts_sent = 0; + start_time = 0; + + conn_tag = -1; + + normalExit = 1; + + serviceSocket = 0; + videoSocket = -1; + + ACE_OS::memset (videoFile, + 0, + PATH_SIZE); + fp = 0; + + needHeader = 0; + + lastRef [0] = lastRef [1] = 0; + + lastRefPtr = 0; + currentUPF = 0; + addedUPF = 0; + addedSignals = 0; + VStimeAdvance = 0; + fps = 0; /* current frames-per-second: playback speed */ + frameRateLimit = 0; + + packet = 0; + packetBufSize = 0; + msgsn = 0; + packetsn = 0; + msgsize = 0; + + precmd = 0; + cmd = 0; + cmdsn = 0; + nextFrame = 0; + nextGroup = 0; + firstPatternSize = 0; + firstSendPattern = 0; + sendPatternGops = 0; + ACE_OS::memset (sendPattern, + 0, + PATTERN_SIZE); + +#ifdef STAT + framesSent = 0; +#endif /* STAT */ + + fileSize = 0; + maxS = 0; + maxG = 0; + maxI = 0; + maxP = 0; + maxB = 0; + minS = 0x7fffffff; + minG = 0x7fffffff; + minI = 0x7fffffff; + minP = 0x7fffffff; + minB = 0x7fffffff; + + numS = 0; + numG = 0; + numF = 0; + numI = 0; + numP = 0; + numB = 0; + + averageFrameSize = 0; + horizontalSize = 0; + verticalSize = 0; + pelAspectRatio = 0; + pictureRate = 0; + vbvBufferSize = 0; + firstGopFrames = 0; + patternSize = 0; + + ACE_OS::memset (pattern, + 0, + PATTERN_SIZE); + + // struct pointers + systemHeader = 0; + + gopTable = 0; + + frameTable = 0; + + // playvideo local vars + + preGroup = -1; + preHeader = -1; + preFrame = -1; + + fast_preGroup = -1; + fast_preHeader= -1; +} + +int +Video_Global::FBread (char *buf, int size) +{ + int res; + + while ((res = (this->conn_tag >= 0 ? wait_read_bytes (this->videoSocket, buf, size) : + read (this->videoSocket, buf, size))) == -1) + { + if (errno == EINTR) {errno = 0; continue; } + if (errno == EPIPE || errno == ECONNRESET) exit (0); + perror ("VS reads Feedback this->packet"); + return -1; + } + + if (res < size) + { + if (res) + // @@ Can you please convert the printfs() and perrors to use + // the appropriate ACE_DEBUG and ACE_ERROR macros? + fprintf (stderr, "VS warn: FBread () res %dB < size %dB\n", res, size); + return -1; + } + return 0; +} + +// send a given this->packet pointed by 'this->packet' to the network. + +int +Video_Global::first_packet_send_to_network (int timeToUse) +{ + int count = 0; + VideoMessage * msghd = (VideoMessage *) (((char *) this->packet) - sizeof (VideoMessage)); + int sent = 0; + int packetSize = ntohl (this->packet->dataBytes); + + msghd->packetsn = htonl (this->packetsn ++); + msghd->packetSize = htonl (packetSize + sizeof (* this->packet)); + + fprintf (stderr, "VS to send FIRST pkt %d of size %d.\n", + ntohl (msghd->packetsn), ntohl (msghd->packetSize)); + + + { + VideoMessage * msg = NULL; + int size = packetSize + sizeof (* this->packet); /* msghd->this->packetSize */ + int offset = 0; + int targetTime; + + if (size > this->msgsize) + { + if (!timeToUse) + { + timeToUse = (this->msgsize + sizeof (*msg) + 28) * 2; + /* + set the max network as 500KB. + 28 - UDP header size + */ + /* + fprintf (stderr, "computed timeToUse %d. ", timeToUse); + */ + } + else + { + timeToUse = (timeToUse * 7) >> 3; + /* + fprintf (stderr, "preset timeToUse %d.", timeToUse); + */ + timeToUse /= (size + this->msgsize - 1) / this->msgsize; + timeToUse = min (timeToUse, (this->msgsize + sizeof (*msg) + 28) * 100); + /* limit min network bandwidth = 10K */ + } + + } + while (size > 0) + { + int segsize, sentsize; + int resent = 0; + + if (msg == NULL) + { /* first message for current this->packet + */ + count = 0; + msg = msghd; + targetTime = get_usec (); + } + else { +#if 0 + /* the select () is not precise enough for being used here*/ + int sleepTime; + targetTime += timeToUse; + sleepTime = get_duration (get_usec (), targetTime); + if (sleepTime >= 5000) { /* resolution of timer is 10,000 usec */ + usleep (sleepTime); /* not first message, wait for a while */ + } +#endif + /* + count ++; + if (! (count % 10)) usleep (10000); + */ + msg = (VideoMessage *) ((char *)msg + this->msgsize); + memcpy ((char *)msg, (char *)msghd, sizeof (* msg)); + } + + msg->msgsn = htonl (this->msgsn++); + msg->msgOffset = htonl (offset); + msg->msgSize = htonl (min (size, this->msgsize)); + // send the header seperately first + segsize = sizeof (*msg); + // ACE_DEBUG ((LM_DEBUG, + // "(%P|%t) Sending the header, of size %d\n", + // segsize)); + + while (write (this->videoSocket, + (char *)msg, + segsize) == -1) + { + if (errno == EINTR) + continue; + if (errno == ENOBUFS) { + if (resent) { + perror ("Warning, pkt discarded because"); + sent = -1; + break; + } + else { + resent = 1; + perror ("VS to sleep 5ms"); + usleep (5000); + continue; + } + } + if (errno != EPIPE) { + fprintf (stderr, "VS error on send this->packet %d of size %d ", + this->msgsn-1, min (size, this->msgsize)+sizeof (*msg)); + perror (""); + } + exit (errno != EPIPE); + } + + + // segsize = min (size, this->msgsize)+sizeof (*msg); + segsize = min (size, this->msgsize); + + if (this->conn_tag != 0) { /* this->packet stream */ + // cerr << "vs sending " << segsize << " on fd = " << this->videoSocket << endl; + // ACE_DEBUG ((LM_DEBUG,"packetsn = %d,msgsn = %d\n", + // msg->packetsn,msg->msgsn)); + + while ((sentsize = write (this->videoSocket, + (char *)msg + sizeof (*msg), + segsize)) == -1) { + if (errno == EINTR) + continue; + if (errno == ENOBUFS) { + if (resent) { + perror ("Warning, pkt discarded because"); + sent = -1; + break; + } + else { + resent = 1; + perror ("VS to sleep 5ms"); + usleep (5000); + continue; + } + } + if (errno != EPIPE) { + fprintf (stderr, "VS error on send this->packet %d of size %d ", + this->msgsn-1, min (size, this->msgsize)+sizeof (*msg)); + perror (""); + } + exit (errno != EPIPE); + } + } + else { + sentsize = wait_write_bytes (this->videoSocket, (char *)msg, segsize); + if (sentsize == -1) { + if (errno != EPIPE) { + fprintf (stderr, "VS error on send this->packet %d of size %d ", + this->msgsn-1, min (size, this->msgsize)+sizeof (*msg)); + perror (""); + } + exit (errno != EPIPE); + } + } + if (sentsize < segsize) { + SFprintf (stderr, "VS warning: message size %dB, sent only %dB\n", + segsize, sentsize); + } + if (sent == -1) + break; + /* + fprintf (stderr, "VS: message %d of size %d sent.\n", + this->msgsn-1, min (size, this->msgsize)+sizeof (*msg)); + */ + size -= this->msgsize; + offset += this->msgsize; + } + } + + fprintf (stderr, "sent = %d\n", sent); + + if (!sent) this->pkts_sent ++; + return sent; +} + +/* + * send a this->packet with given this->systemHeader (optional), gop (optional) and frame. + * + * sh - system header id, if -1, then no system header will be sent. + * otherwise, only when frame == 0, the given system header will be sent. + * gop - group of pictures, gop header will be sent when frame == 0 + * (first I frame ); + * frame - frame to be sent, offset internal to given gop. + */ + +/* returns: 0 - this->packet sent, -1 - this->packet not sent (failed) */ + +int +Video_Global::SendPacket (int shtag, + int gop, + int frame, + int timeToUse, + int first_time) +/* frame maybe out of range (PLAY, STEP), in this case, END_SEQ is sent + to force display of last frame in VD */ +{ + char * buf = ((char *) this->packet) + sizeof (VideoPacket); + int f = this->gopTable[gop].previousFrames + frame; + int sh = this->gopTable[gop].systemHeader; + /* + SFprintf (stderr, "VS to send this->packet gop-%d, frame-%d.\n", gop, frame); + */ + + this->packet->currentUPF = ntohl (this->currentUPF); + + if (frame >= this->gopTable[gop].totalFrames) + { + this->packet->cmd = htonl (this->cmd); + this->packet->cmdsn = htonl (this->cmdsn); + this->packet->sh = htonl (sh); + this->packet->gop = htonl (gop); + this->packet->frame = htonl (this->numF); + this->packet->display = htonl (this->numF-1); + this->packet->future = htonl ((unsigned)-1); + this->packet->past = htonl ((unsigned)-1); + this->packet->dataBytes = htonl (4); + * (int*) ((char*)this->packet + sizeof (*this->packet)) = htonl (SEQ_END_CODE); + + return send_to_network (timeToUse); + } + + if (frame) + shtag = 0; + else if (this->needHeader) + { + shtag = 1; + this->needHeader = 0; + } + + this->packet->cmd = htonl (this->cmd); + this->packet->cmdsn = htonl (this->cmdsn); + this->packet->sh = htonl (sh); + this->packet->gop = htonl (gop); + this->packet->frame = htonl (f); + if (this->frameTable[f].type == 'B') + { + int pre1 = -1, pre2 = -1, i = f; + while (i>0) + if (this->frameTable[--i].type != 'B') + { + pre1 = i; + break; + } + while (i>0) + if (this->frameTable[--i].type != 'B') + { + pre2 = i; + break; + } + if (pre2 == -1) + { + /* + fprintf (stderr, + "frame %d-%d (%d) is a B without past ref, no to be sent.\n", + gop, frame, f); + */ + return -1; + } + if (pre1 != this->lastRef[this->lastRefPtr] || + pre2 != this->lastRef[1 - this->lastRefPtr]) + { + /* + fprintf (stderr, + "send of B frame %d gaveup for past %d/future %d ref not sent.\n", + f, pre2, pre1); + */ + return -1; + } + this->packet->display = htonl (f); + this->packet->future = htonl (pre1); + this->packet->past = htonl (pre2); + } + else + { + int next = f; + int pre = f; + + while (next < this->numF && this->frameTable[++next].type == 'B'); + while (pre > 0 && this->frameTable[--pre].type == 'B'); + if (this->frameTable[f].type == 'P' && pre != this->lastRef[this->lastRefPtr]) + { + /* + fprintf (stderr, + "send of P frame %d gaveup for past ref %d not sent.\n", + f, pre); + fprintf (stderr, "ref0=%d, ref1=%d, ptr=%d.\n", + this->lastRef[0], this->lastRef[1], this->lastRefPtr); + */ + return -1; + } + this->packet->display = htonl (next); + this->packet->future = htonl ((unsigned)-1); + this->packet->past = htonl (this->frameTable[f].type == 'P' ? pre : (unsigned)-1); + } + { + char * ptr = buf; + int size = 0, offset = 0, i; + if (shtag) /* send system header */ + { + size = this->systemHeader[sh].size; + FileRead (this->systemHeader[sh].offset, ptr, size); + ptr += size; + } + if (!frame) /* send gop header */ + { + size = this->gopTable[gop].headerSize; + FileRead (this->gopTable[gop].offset, ptr, size); + ptr += size; + } + size = this->frameTable[f].size; + for (i=this->gopTable[gop].previousFrames; i<f; i++) + offset += this->frameTable[i].size; + FileRead ((this->gopTable[gop].firstIoffset + offset), ptr, size); + ptr += size; + this->packet->dataBytes = htonl (ptr - buf); + } + + { + int sent; + if (first_time == 1) + { + // ACE_DEBUG ((LM_DEBUG, + // "(%P|%t) Sending first frame to client\n")); + sent = first_packet_send_to_network (timeToUse); + } + else + sent = send_to_network (timeToUse); + if (!sent) + { + /* + fprintf (stderr, "%c%d\n", this->frameTable[f].type, f); + fprintf (stderr, "%c frame %d sent.\n", this->frameTable[f].type, f); + */ + if (this->frameTable[f].type != 'B') + { + this->lastRefPtr = 1 - this->lastRefPtr; + this->lastRef[this->lastRefPtr] = f; + } + } + return sent; + } +} + +int +Video_Global::CmdRead (char *buf, int psize) +{ + int res = wait_read_bytes (this->serviceSocket, + buf, + psize); + if (res == 0) return (1); + if (res == -1) { + fprintf (stderr, "VS error on read this->cmdSocket, size %d", psize); + perror (""); + return (-1); + } + return 0; +} + +int +Video_Global::CmdWrite (char *buf, int size) +{ + int res = wait_write_bytes (this->serviceSocket, buf, size); + if (res == -1) { + if (errno != EPIPE) perror ("VS writes to this->serviceSocket"); + return (-1); + } + return 0; +} + +int +Video_Global::PLAYliveVideo (PLAYpara * para) +{ + int doscale; + int count; + int first_frame; + int frame = para->nextFrame; + int nfds = (this->serviceSocket > this->videoSocket ? this->serviceSocket : this->videoSocket) + 1; + fd_set read_mask; + struct timeval tval = {0, 0}; + double ratio; + int result; + + this->currentUPF = (int) (1000000.0 / this->fps); /* ignore para.usecPerFrame */ + if (this->frameRateLimit < this->fps) { + doscale = 1; + ratio = min (this->frameRateLimit, this->fps) / this->fps; + first_frame = frame; + count = 0; + /* + fprintf (stderr, "doscale %d, this->frameRateLimit %5.2f, this->fps %5.2f, ratio %5.2f\n", + doscale, this->frameRateLimit, this->fps, ratio); + */ + } + else doscale = 0; + StartPlayLiveVideo (); + + for (;;) { + + if (doscale) { + for (;;) { + if ((int) ((frame - first_frame) * ratio + 0.5) < count) frame ++; + else break; + } + count ++; + } + SendPicture (&frame); + frame ++; + + FD_ZERO (&read_mask); + FD_SET (this->serviceSocket, &read_mask); + FD_SET (this->videoSocket, &read_mask); + + // @@ Is this code actually used anymore, i.e., do we need to + // ACE-ify it? + +#ifdef _HPUX_SOURCE + if (select (nfds, (int *)&read_mask, NULL, NULL, &tval) == -1) +#else + if (select (nfds, &read_mask, NULL, NULL, &tval) == -1) +#endif + { + if (errno == EINTR) + continue; + perror ("Error - VS select between service and video sockets"); + StopPlayLiveVideo (); + exit (1); + + } + if (FD_ISSET (this->serviceSocket, &read_mask)) /* stop */ + { + u_char tmp; + result = CmdRead ((char *)&tmp, 1); + if (result != 0) + return result; + if (tmp == CmdCLOSE) { + StopPlayLiveVideo (); + exit (0); + } + else if (tmp == CmdSTOP) { + this->cmd = tmp; + /* + fprintf (stderr, "VS: this->CmdSTOP. . .\n"); + */ + result = CmdRead ((char *)&this->cmdsn, sizeof (int)); + if (result != 0) + return result; +#ifdef NeedByteOrderConversion + this->cmdsn = ntohl (this->cmdsn); +#endif + StopPlayLiveVideo (); + break; + } + else if (tmp == CmdSPEED) + { + SPEEDpara speed_para; + /* + fprintf (stderr, "VS: this->CmdSPEED. . .\n"); + */ + result = CmdRead ((char *)&speed_para, sizeof (speed_para)); + if (result != 0) + return result; + /* ignore this thing for live video */ + } + else + { + fprintf (stderr, "VS error (live): this->cmd=%d while expect STOP/SPEED.\n", tmp); + this->normalExit = 0; + StopPlayLiveVideo (); + exit (1); + } + } + if (FD_ISSET (this->videoSocket, &read_mask)) /* feedback, only for frame rate + adjustment */ + { + VideoFeedBackPara fb_para; + if (FBread ((char *)&fb_para, sizeof (fb_para)) == -1 || + ntohl (fb_para.cmdsn) != this->cmdsn) { + /* + SFprintf (stderr, "VS warning: a FB this->packet discarded.\n"); + */ + return 0; + } +#ifdef NeedByteOrderConversion + fb_para.frameRateLimit1000 = ntohl (fb_para.frameRateLimit1000); +#endif + this->frameRateLimit = fb_para.frameRateLimit1000 / 1000.0; + if (this->frameRateLimit < this->fps) { + doscale = 1; + ratio = min (this->frameRateLimit, this->fps) / this->fps; + first_frame = frame; + count = 0; + /* + fprintf (stderr, "doscale %d, this->frameRateLimit %5.2f, this->fps %5.2f, ratio %5.2f\n", + doscale, this->frameRateLimit, this->fps, ratio); + */ + } + else doscale = 0; + } + } + return 0; +} + +void +Video_Global::ComputeFirstSendPattern (float limit) +{ + char * buf = this->firstSendPattern; + int len = this->firstPatternSize; + char * pat = (char *)ACE_OS::malloc (len); + int f; + + if (pat == NULL) { + fprintf (stderr, "VS error on allocating %d bytes for computing first SP", len); + perror (""); + exit (1); + } + for (f = 0; f < len; f ++) { + pat[f] = this->frameTable[f].type; + } + memset (buf, 0, len); + + if (limit <= 0) + limit = 1.0; + + f = (int) ((double)len * + ((double)limit / (1000000.0 / (double)this->currentUPF)) + 0.5); + /* rounded to integer, instead of truncated */ + if (f >= len) + f = len; + else if (f <= 1) + f = 1; + + ComputeSendPattern (pat, buf, len, f); + + /* + f = len - f; + fprintf (stderr, "FirstSendPattern (%d frames dropped): ", f); + { + int i; + for (i = 0; i < len; i ++) + fputc (buf[i] ? pat[i] : '-', stderr); + } + fputc ('\n', stderr); + */ + free (pat); +} + +int +Video_Global::FrameToGroup (int * frame) +{ + int f = * frame; + int i = 0; + while (i < this->numG && this->gopTable[i].previousFrames <= f) i++; + i --; + * frame = f - this->gopTable[i].previousFrames; + return i; +} + +int +Video_Global::SendReferences (int group, int frame) +{ + u_char orgcmd; + int i, base; + int pregroup; + int result; + + if (group < 0 || group >= this->numG) return 0; + if (frame <= 0 || frame >= this->gopTable[group].totalFrames) return 0; + + orgcmd = this->cmd; + this->cmd = CmdREF; + + if (group > 0) { + pregroup = 1; + base = this->gopTable[group].previousFrames; + for (i = 0; i <= frame; i ++) { + if (this->frameTable[i + base].type == 'P') { + pregroup = 0; + break; + } + } + } + else pregroup = 0; + + if (pregroup) { /* reference frame can be in previous group */ + pregroup = group -1; + base = this->gopTable[pregroup].previousFrames; + for (i = 0; i < this->gopTable[pregroup].totalFrames; i ++) { + if (this->frameTable[i + base].type != 'B') { + /* + SFprintf (stderr, "REF group%d, frame%d\n", pregroup, i); + */ + result = SendPacket (i == 0, pregroup, i, 0); + if (result != 0) + return result; + } + } + } + + base = this->gopTable[group].previousFrames; + for (i = 0; i < frame; i ++) { + if (this->frameTable[i + base].type != 'B') { + /* + SFprintf (stderr, "REF group%d, frame%d\n", group, i); + */ + SendPacket (i == 0, group, i, 0); + } + } + this->cmd = orgcmd; +} + +int +Video_Global::GetFeedBack () +{ + VideoFeedBackPara para; + struct itimerval val; + int timerUsec; + + if (FBread ((char *)¶, sizeof (para)) == -1 || + ntohl (para.cmdsn) != this->cmdsn) { + /* + SFprintf (stderr, "VS warning: a FB this->packet discarded.\n"); + */ + return -1; + } +#ifdef NeedByteOrderConversion + para.needHeader = ntohl (para.needHeader); + para.addUsecPerFrame = ntohl (para.addUsecPerFrame); + para.addFrames = ntohl (para.addFrames); + para.sendPatternGops = ntohl (para.sendPatternGops); + para.frameRateLimit1000 = ntohl (para.frameRateLimit1000); +#endif + this->frameRateLimit = para.frameRateLimit1000 / 1000.0; + this->sendPatternGops = para.sendPatternGops; + + if (!Video_Timer_Global::timerOn) return 0; + + this->needHeader = para.needHeader; + memcpy (this->sendPattern, para.sendPattern, PATTERN_SIZE); + if (para.addFrames <= 0 || Video_Timer_Global::timerAdjust < MAX_TIMER_ADJUST) + { + Video_Timer_Global::timerAdjust += para.addFrames * SPEEDUP_INV_SCALE; + Video_Timer_Global::TimerSpeed (); + } + else /* drastic compensation for big gap */ + this->addedSignals += para.addFrames; + if (para.addUsecPerFrame) { + this->addedUPF += para.addUsecPerFrame; + Video_Timer_Global::TimerSpeed (); + } + /* + SFprintf (stderr, "VS fb: addf %d, addupf %d\n", + para.addFrames, para.addUsecPerFrame); + */ + + return 0; +} + +int +Video_Global::SendPicture (int * frame) +{ + int size; + char * buf = ((char *) this->packet) + sizeof (VideoPacket); + /* + SFprintf (stderr, "VS to send picture %d.\n", *frame); + */ + + size = ReadLiveVideoPicture (frame, buf, this->packetBufSize); + + this->packet->currentUPF = ntohl (this->currentUPF); + this->packet->cmd = htonl (this->cmd); + this->packet->cmdsn = htonl (this->cmdsn); + this->packet->sh = this->packet->gop = this->packet->frame = this->packet->display = htonl (*frame); + this->packet->future = htonl ((unsigned)-1); + this->packet->past = htonl ((unsigned)-1); + + this->packet->dataBytes = htonl (size); + + return send_to_network (this->currentUPF); +} + +int +Video_Global::ReadInfoFromFile (void) +{ + int fd = -1, i; + int fnlen = strlen (this->videoFile); + + strcpy (&this->videoFile[fnlen], ".Info"); + fd = open (this->videoFile, O_RDONLY); + if (fd == -1) + { + fprintf (stderr, "Reminder: VS fails to open %s for read, ", this->videoFile); + perror ("try create one"); + goto fail_ReadInfoFromFile; + } + read_int (fd, &i); + if (i != this->fileSize) + { + fprintf (stderr, "Warning: this->fileSize in Info: %d not the same as actual %d.\n", + i, this->fileSize); + goto fail_ReadInfoFromFile; + } + + read_int (fd, &this->maxS); + read_int (fd, &this->maxG); + read_int (fd, &this->maxI); + read_int (fd, &this->maxP); + read_int (fd, &this->maxB); + read_int (fd, &this->minS); + read_int (fd, &this->minG); + read_int (fd, &this->minI); + read_int (fd, &this->minP); + read_int (fd, &this->minB); + read_int (fd, &this->numS); + read_int (fd, &this->numG); + read_int (fd, &this->numF); + read_int (fd, &this->numI); + read_int (fd, &this->numP); + read_int (fd, &this->numB); + read_int (fd, &this->averageFrameSize); + read_int (fd, &this->horizontalSize); + read_int (fd, &this->verticalSize); + read_int (fd, &this->pelAspectRatio); + read_int (fd, &this->pictureRate); + read_int (fd, &this->vbvBufferSize); + read_int (fd, &this->patternSize); + + memset (this->pattern, 0, PATTERN_SIZE); + read_bytes (fd, this->pattern, this->patternSize); +#ifdef STAT + this->framesSent = (char *)ACE_OS::malloc ((this->numF + 7)>>3); + if (this->framesSent == NULL) + { + fprintf (stderr, "Error: VS fails to alloc mem for this->framesSent for %d frames", this->numF); + perror (""); + exit (1); + } +#endif + this->systemHeader = (struct Video_Global::SystemHeader *)ACE_OS::malloc (sizeof (struct Video_Global::SystemHeader) * this->numS); + if (this->systemHeader == NULL) + { + perror ("Error: VS error on ACE_OS::malloc this->SystemHeader"); + exit (1); + } + this->gopTable = (struct Video_Global::GopTable *)ACE_OS::malloc (sizeof (struct Video_Global::GopTable) * this->numG); + if (this->gopTable == NULL) + { + perror ("Error: VS error on ACE_OS::malloc GopHeader"); + exit (1); + } + this->frameTable = (struct Video_Global::FrameTable *)ACE_OS::malloc (sizeof (Video_Global::FrameTable) * this->numF); + if (this->frameTable == NULL) + { + perror ("Error: VS error on ACE_OS::malloc this->frameTable"); + exit (1); + } + this->packetBufSize = this->maxS + this->maxG + max (this->maxI, max (this->maxP, this->maxB)); + this->packet = (VideoPacket *)ACE_OS::malloc (sizeof (VideoMessage) + sizeof (VideoPacket) + + this->packetBufSize); + if (this->packet == NULL) + { + perror ("Error: VS error on ACE_OS::malloc this->packet buffer"); + exit (1); + } + this->packet = (VideoPacket *) ((char *)this->packet + sizeof (VideoMessage)); + + for (i = 0; i < this->numS; i ++) + { + read_int (fd, (int *)&this->systemHeader[i].offset); + read_int (fd, &this->systemHeader[i].size); + } + for (i = 0; i < this->numG; i ++) + { + read_int (fd, &this->gopTable[i].systemHeader); + read_int (fd, (int *)&this->gopTable[i].offset); + read_int (fd, &this->gopTable[i].headerSize); + read_int (fd, &this->gopTable[i].size); + read_int (fd, &this->gopTable[i].totalFrames); + read_int (fd, &this->gopTable[i].previousFrames); + read_int (fd, (int *)&this->gopTable[i].firstIoffset); + } + for (i = 0; i < this->numF; i ++) + { + read_byte (fd, &this->frameTable[i].type); + read_short (fd, (short *)&this->frameTable[i].size); + } + + close (fd); + /* + fprintf (stderr, "Read Info from %s\n", this->videoFile); + */ + this->videoFile[fnlen] = 0; + return 0; +fail_ReadInfoFromFile: + if (fd >= 0) + close (fd); + this->videoFile[fnlen] = 0; + /* + fprintf (stderr, "To scan Info from %s\n", this->videoFile); + */ + return -1; +} + +void +Video_Global::WriteInfoToFile (void) +{ + int fd = -1, i; + int fnlen = strlen (this->videoFile); + + strcpy (&this->videoFile[fnlen], ".Info"); + fd = open (this->videoFile, O_WRONLY | O_CREAT, 0444); + if (fd == -1) + { + fprintf (stderr, "VS fails to open %s for write", this->videoFile); + perror (""); + goto fail_WriteInfoToFile; + } + write_int (fd, this->fileSize); + write_int (fd, this->maxS); + write_int (fd, this->maxG); + write_int (fd, this->maxI); + write_int (fd, this->maxP); + write_int (fd, this->maxB); + write_int (fd, this->minS); + write_int (fd, this->minG); + write_int (fd, this->minI); + write_int (fd, this->minP); + write_int (fd, this->minB); + write_int (fd, this->numS); + write_int (fd, this->numG); + write_int (fd, this->numF); + write_int (fd, this->numI); + write_int (fd, this->numP); + write_int (fd, this->numB); + write_int (fd, this->averageFrameSize); + write_int (fd, this->horizontalSize); + write_int (fd, this->verticalSize); + write_int (fd, this->pelAspectRatio); + write_int (fd, this->pictureRate); + write_int (fd, this->vbvBufferSize); + write_int (fd, this->patternSize); + + write_bytes (fd, this->pattern, this->patternSize); + + for (i = 0; i < this->numS; i ++) + { + write_int (fd, this->systemHeader[i].offset); + write_int (fd, this->systemHeader[i].size); + } + for (i = 0; i < this->numG; i ++) + { + write_int (fd, this->gopTable[i].systemHeader); + write_int (fd, this->gopTable[i].offset); + write_int (fd, this->gopTable[i].headerSize); + write_int (fd, this->gopTable[i].size); + write_int (fd, this->gopTable[i].totalFrames); + write_int (fd, this->gopTable[i].previousFrames); + write_int (fd, this->gopTable[i].firstIoffset); + } + for (i = 0; i < this->numF; i ++) + { + write_byte (fd, this->frameTable[i].type); + write_short (fd, this->frameTable[i].size); + } + + close (fd); + this->videoFile[fnlen] = 0; + return; +fail_WriteInfoToFile: + if (fd >= 0) + close (fd); + this->videoFile[fnlen] = 0; + return; +} + +int +Video_Global::init_MPEG1_video_file (void) +{ + u_char nb; + int state = 0; + u_long fileptr = 0; + u_long i, j, k; + int shptr, gopptr, ftptr; + int inpic = 0; + u_long picptr = 0; + int pictype = 0; + int first = 0; + int failureType = 0; + + this->fp = fopen (this->videoFile, "r"); + if (this->fp == NULL) + { + fprintf (stderr, "error on opening video file %s", this->videoFile); + perror (""); + return 2; + } + if (fseek (this->fp, 0, 2) == -1) + { + fprintf (stderr, "File %s not seekable", this->videoFile); + perror (""); + return 3; + } + this->fileSize = ftell (this->fp); + + fseek (this->fp, 0, 0); + + if (ReadInfoFromFile ()) + { + for (;;) + { + nextByte; + if (state >= 0 && nb == 0x00) + state ++; + else if (state >= 2 && nb == 0x01) + state = -1; + else if (state == -1) + { + if (!first) first ++; + else if (first == 1) first ++; + + switch (nb) + { + case 0xb7: /* seq_end_code */ + goto exit_phase1; + break; + case 0xb3: /* seq_start_code */ + if (first == 1) first = 3; + if (first != 3) + { + fprintf (stderr, "VS error: given file is not in MPEG format.\n"); + return 4; + } + this->numS ++; + break; + case 0xb8: /* gop_start_code */ + this->numG ++; + break; + case 0x00: /* picture_start_code */ + nextByte; + nextByte; + nb &= 0x38; + if (nb == 0x08) + { + this->numI ++; + if (this->numG == 2) + this->pattern[this->patternSize++] = 'I'; + } + else if (nb == 0x10) + { + this->numP ++; + if (this->numG == 2) + this->pattern[this->patternSize++] = 'P'; + } + else if (nb == 0x18) + { + this->numB ++; + if (this->numG == 2) + this->pattern[this->patternSize++] = 'B'; + } + /* + else + fprintf (stderr, "VS error: unkonw picture type %d\n", nb); + */ + break; + default: + break; + } + state = 0; + } + else + state = 0; + } + exit_phase1: + + if (first != 3) + { + fprintf (stderr, "VS error: given file \"%s\" is not of MPEG format.\n", this->videoFile); + return 4; + } + + this->pattern[this->patternSize] = 0; + memset (this->sendPattern, 1, PATTERN_SIZE); + + this->numF = this->numI + this->numP + this->numB; + this->averageFrameSize = fileptr / (unsigned)this->numF; + /* + fprintf (stderr, "Pass one finished, total bytes read: %u, average frame size %d\n", + fileptr, this->averageFrameSize); + fprintf (stderr, "this->numS-%d, this->numG-%d, this->numF-%d, this->numI-%d, this->numP-%d, this->numB-%d\n", + this->numS, this->numG, this->numI, this->numI, this->numP, this->numB); + fprintf (stderr, "this->Pattern detected: %s\n", this->pattern); + */ + if (this->numF > MAX_FRAMES) + { + fprintf (stderr, "VS error: this->Number of frames (%d) is bigger than MAX_FRAMES (%d).\n\ +you need to update the constant definition in common.h and recompile.\n", + this->numF, MAX_FRAMES); + return 5; + } + +#ifdef STAT + this->framesSent = (char *)ACE_OS::malloc ((this->numF + 7)>>3); + if (this->framesSent == NULL) + { + fprintf (stderr, "VS fails to alloc mem for this->framesSent for %d frames", this->numF); + perror (""); + return 6; + } +#endif + + this->systemHeader = (struct Video_Global::SystemHeader *)ACE_OS::malloc (sizeof (struct Video_Global::SystemHeader) * this->numS); + if (this->systemHeader == NULL) + { + perror ("VS error on ACE_OS::malloc this->SystemHeader"); + return 7; + } + this->gopTable = (struct Video_Global::GopTable *)ACE_OS::malloc (sizeof (struct Video_Global::GopTable) * this->numG); + if (this->gopTable == NULL) + { + perror ("VS error on ACE_OS::malloc GopHeader"); + return 8; + } + this->frameTable = (struct Video_Global::FrameTable *)ACE_OS::malloc (sizeof (Video_Global::FrameTable) * this->numF); + if (this->frameTable == NULL) + { + perror ("VS error on ACE_OS::malloc this->frameTable"); + return 9; + } + + rewind (this->fp); + fileptr = 0; + state = 0; + inpic = 0; + shptr = -1; + gopptr = -1; + ftptr = 0; + + for (;;) + { + nextByte; + if (state >= 0 && nb == 0x00) + state ++; + else if (state >= 2 && nb == 0x01) + state = -1; + else if (state == -1) + { + switch (nb) + { + case 0xb7: /* seq_end_code */ + if (gopptr >= 0 && this->gopTable[gopptr].size == 0) + this->gopTable[gopptr].size = fileptr - this->gopTable[gopptr].offset - 4; + computePicSize; + goto exit_phase2; + break; + case 0xb3: /* seq_start_code */ + if (gopptr >= 0 && this->gopTable[gopptr].size == 0) + this->gopTable[gopptr].size = fileptr - this->gopTable[gopptr].offset - 4; + computePicSize; + shptr ++; + this->systemHeader[shptr].offset = fileptr - 4; + this->systemHeader[shptr].size = 0; + break; + case 0xb8: /* gop_start_code */ + if (this->systemHeader[shptr].size == 0) + this->systemHeader[shptr].size =fileptr - this->systemHeader[shptr].offset - 4; + if (gopptr >= 0 && this->gopTable[gopptr].size == 0) + this->gopTable[gopptr].size = fileptr - this->gopTable[gopptr].offset - 4; + computePicSize; + gopptr ++; + this->gopTable[gopptr].systemHeader = shptr; + this->gopTable[gopptr].offset = fileptr - 4; + this->gopTable[gopptr].headerSize = 0; + this->gopTable[gopptr].size = 0; + this->gopTable[gopptr].totalFrames = 0; + this->gopTable[gopptr].previousFrames = gopptr ? + (this->gopTable[gopptr - 1].totalFrames + this->gopTable[gopptr - 1].previousFrames) : 0; + + break; + case 0x00: /* picture_start_code */ + if (this->gopTable[gopptr].headerSize == 0) + { + this->gopTable[gopptr].headerSize = fileptr - this->gopTable[gopptr].offset - 4; + this->gopTable[gopptr].firstIoffset = fileptr - 4; + } + this->gopTable[gopptr].totalFrames ++; + computePicSize; + picptr = fileptr - 4; + nextByte; + nextByte; + nb &= 0x38; + if (nb == 0x08) + { + pictype = 'I'; + inpic = 1; + } + else if (nb == 0x10) + { + pictype = 'P'; + inpic = 1; + } + else if (nb == 0x18) + { + pictype = 'B'; + inpic = 1; + } + break; + default: + + break; + } + state = 0; + } + else + state = 0; + } + + exit_phase2: + for (shptr = 0; shptr<this->numS; shptr++) + { + this->maxS = max (this->maxS, this->systemHeader[shptr].size); + this->minS = min (this->minS, this->systemHeader[shptr].size); + } + for (gopptr = 0; gopptr<this->numG; gopptr++) + { + this->maxG = max (this->maxG, this->gopTable[gopptr].headerSize); + this->minG = min (this->minG, this->gopTable[gopptr].headerSize); + } + this->packetBufSize = this->maxS + this->maxG + max (this->maxI, max (this->maxP, this->maxB)); + this->packet = (VideoPacket *)ACE_OS::malloc (sizeof (VideoMessage) + sizeof (VideoPacket) + + this->packetBufSize); + if (this->packet == NULL) + { + perror ("VS error on ACE_OS::malloc this->packet buffer"); + return 10; + } + this->packet = (VideoPacket *) ((char *)this->packet + sizeof (VideoMessage)); + /* + fprintf (stderr, "Pass 2 finished.\n"); + fprintf (stderr, "this->maxS-%d, this->maxG-%d, this->maxI-%d, this->maxP-%d, this->maxB-%d.\n", this->maxS, this->maxG, this->maxI, this->maxP, this->maxB); + fprintf (stderr, "this->minS-%d, this->minG-%d, this->minI-%d, this->minP-%d, this->minB-%d.\n", this->minS, this->minG, this->minI, this->minP, this->minB); + */ + /* + { + int i; + + fprintf (stderr, "id: offset size -- system header table:\n"); + for (i=0; i<this->numS; i++) + fprintf (stderr, "%-3d %-9u %d\n", i, this->systemHeader[i].offset, this->systemHeader[i].size); + fprintf (stderr, + "id: header offset hdsize totSize frames preframs Ioffset Isize -- GOP\n"); + for (i=0; i<this->numG; i++) + { + fprintf (stderr, "%-4d %-8d %-8u %-8d %-8d %-8d %-8d %-8u %d\n", + i, + this->gopTable[i].systemHeader, + this->gopTable[i].offset, + this->gopTable[i].headerSize, + this->gopTable[i].size, + this->gopTable[i].totalFrames, + this->gopTable[i].previousFrames, + this->gopTable[i].firstIoffset, + this->frameTable[this->gopTable[i].previousFrames].size + ); + } + + fprintf (stderr, "\nframe information:"); + for (i=0; i<this->numF; i++) + fprintf (stderr, "%c%c%-8d", (i%10 ? '\0' : '\n'), this->frameTable[i].type, this->frameTable[i].size); + fprintf (stderr, "\n"); + + } + */ + fseek (this->fp, this->systemHeader[0].offset+4, 0); + nextByte; + this->horizontalSize = ((int)nb <<4) & 0xff0; + nextByte; + this->horizontalSize |= (nb >>4) & 0x0f; + this->verticalSize = ((int)nb <<8) & 0xf00; + nextByte; + this->verticalSize |= (int)nb & 0xff; + nextByte; + this->pelAspectRatio = ((int)nb >> 4) & 0x0f; + this->pictureRate = (int)nb & 0x0f; + nextByte; + nextByte; + nextByte; + this->vbvBufferSize = ((int)nb << 5) & 0x3e0; + nextByte; + this->vbvBufferSize |= ((int)nb >>3) & 0x1f; + /* + fprintf (stderr, "SysHeader info: hsize-%d, vsize-%d, pelAspect-%d, rate-%d, vbv-%d.\n", + this->horizontalSize, this->verticalSize, this->pelAspectRatio, this->pictureRate, this->vbvBufferSize); + */ + WriteInfoToFile (); + } +#if 0 + { + int i, j = 20; + + for (i = this->numG - 1;; i --) { + if (this->gopTable[i].offset < 4235260) { + fprintf (stderr, "group %d: offset %ld\n", i, this->gopTable[i].offset); + if (j -- == 0) break; + } + } + /* + for (i = 0; i < this->numG; i ++) { + if (this->gopTable[i].previousFrames > 1800) { + fprintf (stderr, "group %d: offset %ld pre-frames %d\n", + i, this->gopTable[i].offset, this->gopTable[i].previousFrames); + break; + } + } + */ + } +#endif + { + this->firstPatternSize = this->gopTable[0].totalFrames; + this->firstSendPattern = (char *)ACE_OS::malloc (this->firstPatternSize); + if (this->firstSendPattern == NULL) + { + fprintf (stderr, "VS failed to allocate firstSendPattern for %d frames", + this->firstPatternSize); + perror (""); + return 11; + } + } + this->firstGopFrames = this->gopTable[0].totalFrames; + return 0; +} +int +Video_Global::play_send (int debug) +{ + // ACE_DEBUG ((LM_DEBUG,"play_send: sending the frame \n")); + int curGroup = Video_Timer_Global::timerGroup; + int curFrame = Video_Timer_Global::timerFrame; + int curHeader = Video_Timer_Global::timerHeader; + char * sp; + + if (this->preGroup != curGroup || + curFrame != this->preFrame) + { + int sendStatus = -1; + int frameStep = 1; + if (debug) + cerr << " curgroup = " << curGroup << endl ; + if (curGroup == 0) + { + + int i = curFrame + 1; + while (i < this->firstPatternSize && + !this->firstSendPattern[i]) + { + frameStep ++; + i++; + } + } + else /* (curGroup > 0) */ + { + int i = curFrame + 1; + sp = this->sendPattern + ((curGroup - 1) % this->sendPatternGops) * this->patternSize; + while (i < this->patternSize && !sp[i]) + { + frameStep ++; + i++; + } + } + if (curGroup == 0) + { + if (debug) + cerr << "first : " << + this->firstSendPattern[curFrame] << endl; + if (this->firstSendPattern[curFrame]) + sendStatus = 0; + else /* (!firstSendPattern[curFrame]) */ + { + int i = curFrame - 1; + while (i > 0 && !this->firstSendPattern[i]) + i--; + if (i > this->preFrame) + /* the frame (curGroup, i) hasn't been sent yet */ + { + sendStatus = 0; + curFrame = i; + } + else + sendStatus = -1; + if (debug) + cerr << "SendStatus = " << sendStatus << endl; + } + } + else if (sp[curFrame]) /* curGroup > 0 */ + sendStatus = 0; + else /* (!sp[curFrame]) */ + { + int i = curFrame - 1; + while (i > 0 && !sp[i]) + i--; + if (curGroup == this->preGroup && i > this->preFrame) + /* the frame (curGroup, i) hasn't been sent yet */ + { + sendStatus = 0; + curFrame = i; + } + else + sendStatus = -1; + } + if (!sendStatus) + { + // Send the current video frame, calls send_to_network which + // fragments and sends via blocking write . + sendStatus = this->SendPacket (this->preHeader != curHeader, + curGroup, curFrame, + (this->currentUPF + this->addedUPF) * frameStep); + if (sendStatus == -1) + return -1; + if (!sendStatus) + { + this->preHeader = curHeader; + this->preGroup = curGroup; + this->preFrame = curFrame; +#ifdef STAT + if (this->play_para.collectStat) + { + int f = this->gopTable[curGroup].previousFrames + curFrame; + this->framesSent[f>>3] |= (1 << (f % 8)); + } +#endif + } + } + } + return 0; +} + +int +Video_Global::fast_play_send (void) +{ + if (this->fast_preGroup != Video_Timer_Global::timerGroup) + { + int result; + result = this->SendPacket (this->fast_preHeader != Video_Timer_Global::timerHeader, Video_Timer_Global::timerGroup, 0, + this->fast_para.usecPerFrame * this->patternSize >> 2); + if (result == -1) + return -1; + this->fast_preHeader = Video_Timer_Global::timerHeader; + this->fast_preGroup = Video_Timer_Global::timerGroup; + } + return 0; +} + +int +Video_Global::position (void) +{ + int result; + POSITIONpara pos_para; + /* + fprintf (stderr, "POSITION . . .\n"); + */ + result = CmdRead ((char *)&pos_para, sizeof (pos_para)); + if (result != 0) + return result; + + if (this->live_source) return 0; + +#ifdef NeedByteOrderConversion + pos_para.nextGroup = ntohl (pos_para.nextGroup); + pos_para.sn = ntohl (pos_para.sn); +#endif + + CheckGroupRange (pos_para.nextGroup); + this->cmdsn = pos_para.sn; + result = SendPacket (this->numS>1 || pos_para.nextGroup == 0, pos_para.nextGroup, 0, 0); + return result; +} + +int +Video_Global::step_video () +{ + int group; + STEPpara step_para; + int tag = 0; + int result; + + result = CmdRead ((char *)&step_para, sizeof (step_para)); + if (result != 0) + return result; +#ifdef NeedByteOrderConversion + step_para.sn = ntohl (step_para.sn); + step_para.nextFrame = ntohl (step_para.nextFrame); +#endif + + this->cmdsn = step_para.sn; + + if (!this->live_source) { + if (step_para.nextFrame >= this->numF) /* send SEQ_END */ + { + tag = 1; + step_para.nextFrame --; + } + /* + fprintf (stderr, "STEP . . .frame-%d\n", step_para.nextFrame); + */ + CheckFrameRange (step_para.nextFrame); + group = FrameToGroup (&step_para.nextFrame); + if (this->precmd != CmdSTEP && !tag ) { + result = SendReferences (group, step_para.nextFrame); + if (result < 0 ) + return result; + } + } + if (this->live_source) StartPlayLiveVideo (); + + if (this->live_source) { + SendPicture (&step_para.nextFrame); + } + else if (this->video_format == VIDEO_MPEG1) { + SendPacket (this->numS>1, group, tag ? this->numF : step_para.nextFrame, 0); + } + else { + fprintf (stderr, "VS: wierd1\n"); + } + + if (this->live_source) StopPlayLiveVideo (); + return 0; +} + +int +Video_Global::fast_forward (void) +{ + // return this->init_fast_play () + return 0; +} + +int +Video_Global::fast_backward (void) +{ +// return this->init_fast_play (); + return 0; +} + +int +Video_Global::stat_stream (void) +{ + int i, j = 0; + for (i = 0; i < this->numF; i++) + { + short size = htons (this->frameTable[i].size); + char type = this->frameTable[i].type; + if (i == this->gopTable[j].previousFrames) + { + type = tolower (type); + j ++; + } + CmdWrite ((char *)&type, 1); + CmdWrite ((char *)&size, 2); + } + return 0; +} + +int +Video_Global::stat_sent (void) +{ +#ifdef STAT + CmdWrite ((char *)this->framesSent, (this->numF + 7) / 8); +#else + int i; + char zeroByte = 0; + for (i = 0; i < (this->numF + 7) / 8; i++) + CmdWrite ((char *)&zeroByte, 1); +#endif + return 0; +} + +int +Video_Global::init_play (Video_Control::PLAYpara para, + CORBA::Long_out vts) +{ + // ~~ why do we need the play_para in Video_Global , why can't just use + // the para that's passed. + int result; + + ACE_DEBUG ((LM_DEBUG, + " (%P|%t) Video_Global::init_play ()")); + + // this gets the parameters for the play command + // result = this->CmdRead ((char *)&this->play_para, sizeof (this->play_para)); + // if (result != 0) + // return result; + + // Assign the passed play + this->play_para = para ; +#ifdef NeedByteOrderConversion +// this->play_para.sn = ntohl (this->play_para.sn); +// this->play_para.nextFrame = ntohl (this->play_para.nextFrame); +// this->play_para.usecPerFrame = ntohl (this->play_para.usecPerFrame); +// this->play_para.framesPerSecond = ntohl (this->play_para.framesPerSecond); +// this->play_para.frameRateLimit1000 = ntohl (this->play_para.frameRateLimit1000); +// this->play_para.collectStat = ntohl (this->play_para.collectStat); +// this->play_para.sendPatternGops = ntohl (this->play_para.sendPatternGops); +// this->play_para.VStimeAdvance = ntohl (this->play_para.VStimeAdvance); + + this->play_para.sn = ntohl (play_para.sn); + this->play_para.nextFrame = ntohl (para.nextFrame); + this->play_para.usecPerFrame = ntohl (para.usecPerFrame); + this->play_para.framesPerSecond = ntohl (para.framesPerSecond); + this->play_para.frameRateLimit1000 = ntohl (para.frameRateLimit1000); + this->play_para.collectStat = ntohl (para.collectStat); + this->play_para.sendPatternGops = ntohl (para.sendPatternGops); + this->play_para.VStimeAdvance = ntohl (para.VStimeAdvance); +#endif + +// this->frameRateLimit = this->play_para.frameRateLimit1000 / 1000.0; +// this->cmdsn = this->play_para.sn; +// this->currentUPF = this->play_para.usecPerFrame; +// this->VStimeAdvance = this->play_para.VStimeAdvance; + + this->frameRateLimit = para.frameRateLimit1000 / 1000.0; + this->cmdsn = para.sn; + this->currentUPF = para.usecPerFrame; + this->VStimeAdvance = play_para.VStimeAdvance; + + vts = get_usec (); + // cerr << "vts is " << vts << endl; + // begin evil code + // { + // int vts = get_usec (); + // this->CmdWrite ((char *)&ts, sizeof (int)); + // } + // end evil code + + if (this->live_source || this->video_format != VIDEO_MPEG1) { + PLAYpara live_play_para; // xxx hack to compile the code + if (this->live_source) + this->PLAYliveVideo (&live_play_para); + return 0; + } + + fprintf (stderr, "this->VStimeAdvance from client: %d\n", this->VStimeAdvance); + + this->sendPatternGops = this->play_para.sendPatternGops; + ComputeFirstSendPattern (this->frameRateLimit); +#ifdef STAT + if (this->play_para.collectStat) + memset (this->framesSent, 0, (this->numF + 7)>>3); +#endif + CheckFrameRange (para.nextFrame); + Video_Timer_Global::timerFrame = para.nextFrame; + Video_Timer_Global::timerGroup = FrameToGroup (&Video_Timer_Global::timerFrame); + Video_Timer_Global::timerHeader = this->gopTable[Video_Timer_Global::timerGroup].systemHeader; + // memcpy (this->sendPattern, this->play_para.sendPattern, PATTERN_SIZE); + // Do a sequence copy.. + + for (int i=0; i<PATTERN_SIZE ; i++) + this->sendPattern[i] = this->play_para.sendPattern[i]; + result = SendReferences (Video_Timer_Global::timerGroup, Video_Timer_Global::timerFrame); + if (result < 0) + return result; + Video_Timer_Global::StartTimer (); + + // Sends the first frame of the video... not true anymore since the + // user can position the stream anywhere and then call play. + result = play_send (0); + return 0; +} + +CORBA::Boolean +Video_Global::init_fast_play (const Video_Control::FFpara &ff_para ) +{ + // save the parameters for future reference + this->fast_para = ff_para; + int result; + + // result = CmdRead ((char *)&this->ff_para, sizeof (this->ff_para)); + // if (result != 0) + // return result; + + if (this->live_source) return 0; + + this->VStimeAdvance = ff_para.VStimeAdvance; + /* + fprintf (stderr, "this->VStimeAdvance from client: %d\n", this->VStimeAdvance); + */ + CheckGroupRange (ff_para.nextGroup); + this->cmdsn = ff_para.sn; + Video_Timer_Global::timerGroup = ff_para.nextGroup; + Video_Timer_Global::timerFrame = 0; + Video_Timer_Global::timerHeader = this->gopTable[Video_Timer_Global::timerGroup].systemHeader; + this->currentUPF = ff_para.usecPerFrame; + Video_Timer_Global::StartTimer (); + + fast_play_send (); + return 0; +} + +int +Video_Global::init_video (void) +{ + INITvideoPara para; + int failureType = 0; + int result; + /* + fprintf (stderr, "VS about to read Para.\n"); + */ + result = CmdRead ((char *)¶, sizeof (para)); + if (result != 0) + return result; +#ifdef NeedByteOrderConversion + para.sn = ntohl (para.sn); + para.version = ntohl (para.version); + para.nameLength = ntohl (para.nameLength); +#endif + if (para.nameLength>0) + { + result = CmdRead (this->videoFile, para.nameLength); + if (result != 0) + return result; + } + if (Mpeg_Global::session_num > Mpeg_Global::session_limit || para.version != VERSION) { + char errmsg[128]; + this->cmd = CmdFAIL; + CmdWrite ((char *)&this->cmd, 1); + if (Mpeg_Global::session_num > Mpeg_Global::session_limit) { + sprintf (errmsg, + "Too many sessions being serviced, please try again later.\n"); + } + else { + sprintf (errmsg, "Version # not match, VS %d.%02d, Client %d.%02d", + VERSION / 100, VERSION % 100, + para.version / 100, para.version % 100); + } + write_string (this->serviceSocket, errmsg); + exit (0); + } + this->cmdsn = para.sn; + /* + fprintf (stderr, "MPEG file %s got.\n", this->videoFile); + */ + this->videoFile[para.nameLength] = 0; + + if (!strncasecmp ("LiveVideo", this->videoFile, 9)) { + if (OpenLiveVideo (&this->video_format, &this->horizontalSize, + &this->verticalSize, &this->averageFrameSize, + &this->fps, &this->pelAspectRatio) == -1) { + failureType = 100; + goto failure; + } + if (this->video_format == VIDEO_MPEG2) { + failureType = 101; + goto failure; + } + this->live_source = 1; + + this->fileSize =0x7fffffff; + this->maxS = this->maxG = this->maxI = this->maxP = this->maxB = this->minS = this->minG = this->minI = this->minP = this->minB = 1; + this->numS = this->numG = this->numF = this->numI = 0x7fffffff; + this->numP = this->numB = 0; + this->vbvBufferSize = 1; + this->firstGopFrames = 1; + this->patternSize = 1; + this->pattern[0] = 'I'; + this->pattern[1] = 0; + this->packetBufSize = this->verticalSize * this->horizontalSize * 3; + this->packet = (VideoPacket *)ACE_OS::malloc (sizeof (VideoMessage) + sizeof (VideoPacket) + + this->packetBufSize); + if (this->packet == NULL) + { + perror ("Error: VS error on ACE_OS::malloc this->packet buffer"); + exit (1); + } + this->packet = (VideoPacket *) ((char *)this->packet + sizeof (VideoMessage)); + + } + else { + static double pictureRateTable[] = {23.976, 24, 25, 29.97, 30, 50, 59.94, 60}; + + this->video_format = VIDEO_MPEG1; + failureType = init_MPEG1_video_file (); + if (failureType) goto failure; + this->fps = pictureRateTable[this->pictureRate - 1]; + } + + { + INITvideoReply reply; + + reply.totalHeaders = htonl (this->numS); + reply.totalGroups = htonl (this->numG); + reply.totalFrames = htonl (this->numF); + reply.sizeIFrame = htonl (this->maxI); + reply.sizePFrame = htonl (this->maxP); + reply.sizeBFrame = htonl (this->maxB); + reply.sizeSystemHeader = htonl (this->maxS); + reply.sizeGop = htonl (this->maxG); + reply.averageFrameSize = htonl (this->averageFrameSize); + reply.verticalSize = htonl (this->verticalSize); + reply.horizontalSize = htonl (this->horizontalSize); + reply.pelAspectRatio = htonl (this->pelAspectRatio); + reply.pictureRate1000 = htonl ((int) (this->fps * 1000)); + reply.vbvBufferSize = htonl (this->vbvBufferSize); + reply.firstGopFrames = htonl (this->firstGopFrames); + reply.patternSize = htonl (this->patternSize); + strncpy (reply.pattern, this->pattern, PATTERN_SIZE); + + reply.live = htonl (this->live_source); + reply.format = htonl (this->video_format); + + CmdWrite ((char *)&this->cmd, 1); + + CmdWrite ((char *)&reply, sizeof (reply)); + + /* write the first SH, GOP and IFrame to this->serviceSocket (TCP), + using code for SendPacket () */ + { + int tmpSocket = this->videoSocket; + + if (this->live_source) StartPlayLiveVideo (); + + this->videoSocket = this->serviceSocket; + + if (this->live_source) { + int frame = 0; + SendPicture (&frame); + } + else if (this->video_format == VIDEO_MPEG1) { + SendPacket (1, 0, 0, 0); + } + else { + fprintf (stderr, "VS: this->video_format %d not supported.\n", + this->video_format); + } + this->videoSocket = tmpSocket; + + if (this->live_source) StopPlayLiveVideo (); + } + + return 0; + + } +failure: + { + char * msg; + char errmsg[64]; + this->cmd = CmdFAIL; + sprintf (errmsg, "VS failed to alloc internal buf (type %d)", failureType); + CmdWrite ((char *)&this->cmd, 1); + msg = failureType == 1 ? (char *)"not a complete MPEG stream" : + failureType == 2 ? (char *)"can't open MPEG file" : + failureType == 3 ? (char *)"MPEG file is not seekable" : + failureType == 4 ? (char *)"not an MPEG stream" : + failureType == 5 ? + (char *)"too many frames in MPEG file, need change MAX_FRAMES and recompile VS" : + failureType == 100 ? (char *)"failed to connect to live video source" : + failureType == 101 ? (char *)"live MPEG2 not supported" : + errmsg; + write_string (this->serviceSocket, msg); + exit (0); + } +} + +//-------------------------------------------------------- +// Video_Timer_Global methods +void +Video_Timer_Global::StartTimer (void) +{ + VIDEO_SINGLETON::instance ()->addedUPF = 0; + VIDEO_SINGLETON::instance ()->addedSignals = 0; + timerAdjust = (VIDEO_SINGLETON::instance ()->VStimeAdvance * SPEEDUP_INV_SCALE) / VIDEO_SINGLETON::instance ()->currentUPF; + /* + SFprintf (stderr, "VS StartTimer (): fast-start frames %d\n", + timerAdjust / SPEEDUP_INV_SCALE); + */ + TimerSpeed (); + // setsignal (SIGALRM, timerHandler); + timerOn = 1; + preTimerVal = get_usec (); + /* + fprintf (stderr, "VS: timer started at %d upf.\n", VIDEO_SINGLETON::instance ()->currentUPF + VIDEO_SINGLETON::instance ()->addedUPF); + */ +} + +void +Video_Timer_Global::StopTimer (void) +{ + struct itimerval val; + // ## I have to incorporate this logic into the changed code + // setsignal (SIGALRM, SIG_IGN); + val.it_interval.tv_sec = val.it_value.tv_sec = 0; + val.it_interval.tv_usec = val.it_value.tv_usec = 0; + setitimer (ITIMER_REAL, &val, NULL); + timerOn = 0; + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Video_Timer_Global::StopTimer: timer stopped\n")); +} + +void +Video_Timer_Global::TimerSpeed (void) +{ + struct itimerval val; + int usec = VIDEO_SINGLETON::instance ()->currentUPF + VIDEO_SINGLETON::instance ()->addedUPF; + if (Mpeg_Global::drift_ppm) { + /* + int drift = (double)usec * (double)Mpeg_Global::drift_ppm / 1000000.0; + SFprintf (stderr, "Mpeg_Global::drift_ppm %d, usec %d, drift %d, new usec %d\n", + Mpeg_Global::drift_ppm, usec, drift, usec - drift); + */ + usec -= (int) ((double)usec * (double)Mpeg_Global::drift_ppm / 1000000.0); + } + if (timerAdjust > 1) + usec = (int) (((double)usec * (double) (SPEEDUP_INV_SCALE - 1)) / + (double)SPEEDUP_INV_SCALE); + val.it_interval.tv_sec = val.it_value.tv_sec = usec / 1000000; + val.it_interval.tv_usec = val.it_value.tv_usec = usec % 1000000; + setitimer (ITIMER_REAL, &val, NULL); + /* + SFprintf (stderr, + "VS TimerSpeed () at %s speed, timerAdjust %d VIDEO_SINGLETON::instance ()->addedSignals %d.\n", + (timerAdjust > 1) ? "higher" : "normal", timerAdjust, VIDEO_SINGLETON::instance ()->addedSignals); + */ + +} + +void +Video_Timer_Global::TimerProcessing (void) +{ + /* + fprintf (stderr, "VS: timerHandler...\n"); + */ + if (!timerOn) { + return; + } + if (timerAdjust < 0) + { + timerAdjust += SPEEDUP_INV_SCALE; + return; + } + if (timerAdjust >0) + { + if ((--timerAdjust) == 0) + TimerSpeed (); + } + if (VIDEO_SINGLETON::instance ()->cmd == CmdPLAY) + { + if (timerGroup == VIDEO_SINGLETON::instance ()->numG - 1 && timerFrame >= VIDEO_SINGLETON::instance ()->gopTable[timerGroup].totalFrames - 1) + { + timerFrame ++; /* force sending of END_SEQ when PLAY VIDEO_SINGLETON::instance ()->cmd */ + StopTimer (); + return; + } + else + { + timerFrame ++; + if (timerFrame >= VIDEO_SINGLETON::instance ()->gopTable[timerGroup].totalFrames) + { + timerGroup ++; + timerFrame = 0; + timerHeader = VIDEO_SINGLETON::instance ()->gopTable[timerGroup].systemHeader; + } + } + } + else { + if (VIDEO_SINGLETON::instance ()->cmd == CmdFF) { + if (timerGroup == VIDEO_SINGLETON::instance ()->numG - 1) { + StopTimer (); + return; + } + timerGroup ++; + timerHeader = VIDEO_SINGLETON::instance ()->gopTable[timerGroup].systemHeader; + } + else { + if (timerGroup == 0) { + StopTimer (); + return; + } + timerGroup --; + timerHeader = VIDEO_SINGLETON::instance ()->gopTable[timerGroup].systemHeader; + } + } + +} + +void +Video_Timer_Global::timerHandler (int sig) +{ + // ACE_DEBUG ((LM_DEBUG, + // "Video_Timer_Global::timerHandler\n")); + + int val2, val3; + int usec = VIDEO_SINGLETON::instance ()->currentUPF + VIDEO_SINGLETON::instance ()->addedUPF; + + if (Mpeg_Global::drift_ppm) { + usec -= (int) ((double)usec * (double)Mpeg_Global::drift_ppm / 1000000.0); + } + + if (timerAdjust > 1) + usec = (int) (((double)usec * (double) (SPEEDUP_INV_SCALE - 1)) / + (double)SPEEDUP_INV_SCALE); + val3 = get_duration (preTimerVal, (val2 = get_usec ())); + /* + if (val3 >= usec<< 1)) + fprintf (stderr, "Slower: %d out of VIDEO_SINGLETON::instance ()->currentUPF %d.\n", + val3, usec); + else + fprintf (stderr, "+\n"); + */ + preTimerVal = val2; + if (val3 < 0 || val3 > 100000000) + val3 = usec; + val2 = (val3 + (usec>>1)) / usec; + if (val2 < 0) val2 = 0; + if (val2) { + TimerProcessing (); + val2 --; + } + VIDEO_SINGLETON::instance ()->addedSignals += val2; + + if (VIDEO_SINGLETON::instance ()->addedSignals) { + val2 = timerAdjust; + if (timerAdjust < MAX_TIMER_ADJUST) { + timerAdjust += VIDEO_SINGLETON::instance ()->addedSignals * SPEEDUP_INV_SCALE; + if (val2 < SPEEDUP_INV_SCALE) { + TimerSpeed (); + } + } + else { + /* + fprintf (stderr, "VS timerAdjust %d, VIDEO_SINGLETON::instance ()->addedSignals %d, timerFrame %d\n", + timerAdjust, VIDEO_SINGLETON::instance ()->addedSignals, timerFrame); + */ + for (val3 = 0; val3 < VIDEO_SINGLETON::instance ()->addedSignals; val3 ++) + TimerProcessing (); + } + VIDEO_SINGLETON::instance ()->addedSignals = 0; + } +} + +// send the first packet, given by packet pointed by +// 'this->packet' to the network. +int +Video_Global::send_to_network (int timeToUse) +{ + int count = 0; + VideoMessage * msghd = (VideoMessage *) (((char *) this->packet) - sizeof (VideoMessage)); + int sent = 0; + int packetSize = ntohl (this->packet->dataBytes); + + msghd->packetsn = htonl (this->packetsn ++); + msghd->packetSize = htonl (packetSize + sizeof (* this->packet)); + + // fprintf (stderr, "VS to send pkt %d of size %d.\n", + // ntohl (msghd->packetsn), ntohl (msghd->packetSize)); + + + { + VideoMessage * msg = NULL; + int size = packetSize + sizeof (* this->packet); /* msghd->this->packetSize */ + int offset = 0; + int targetTime; + + if (size > this->msgsize) + { + if (!timeToUse) + { + timeToUse = (this->msgsize + sizeof (*msg) + 28) * 2; + /* + set the max network as 500KB. + 28 - UDP header size + */ + /* + fprintf (stderr, "computed timeToUse %d. ", timeToUse); + */ + } + else + { + timeToUse = (timeToUse * 7) >> 3; + /* + fprintf (stderr, "preset timeToUse %d.", timeToUse); + */ + timeToUse /= (size + this->msgsize - 1) / this->msgsize; + timeToUse = min (timeToUse, (this->msgsize + sizeof (*msg) + 28) * 100); + /* limit min network bandwidth = 10K */ + } + + } + while (size > 0) + { + int segsize, sentsize; + int resent = 0; + + if (msg == NULL) { /* first message for current this->packet */ + count = 0; + msg = msghd; + targetTime = get_usec (); + } + else { +#if 0 + /* the select () is not precise enough for being used here*/ + int sleepTime; + targetTime += timeToUse; + sleepTime = get_duration (get_usec (), targetTime); + if (sleepTime >= 5000) { /* resolution of timer is 10,000 usec */ + usleep (sleepTime); /* not first message, wait for a while */ + } +#endif + /* + count ++; + if (! (count % 10)) usleep (10000); + */ + msg = (VideoMessage *) ((char *)msg + this->msgsize); + memcpy ((char *)msg, (char *)msghd, sizeof (* msg)); + } + msg->msgsn = htonl (this->msgsn++); + msg->msgOffset = htonl (offset); + msg->msgSize = htonl (min (size, this->msgsize)); + + segsize = min (size, this->msgsize)+sizeof (*msg); + if (this->conn_tag != 0) { /* this->packet stream */ + // cerr << "sending " << segsize << " on fd = " << this->videoSocket << endl; + while ((sentsize = write (this->videoSocket, (char *)msg, segsize)) == -1) { + if (errno == EINTR) + continue; + if (errno == ENOBUFS) { + if (resent) { + perror ("Warning, pkt discarded because"); + sent = -1; + break; + } + else { + resent = 1; + perror ("VS to sleep 5ms"); + usleep (5000); + continue; + } + } + if (errno != EPIPE) { + fprintf (stderr, "VS error on send this->packet %d of size %d ", + this->msgsn-1, min (size, this->msgsize)+sizeof (*msg)); + perror (""); + } + exit (errno != EPIPE); + } + } + else { + sentsize = wait_write_bytes (this->videoSocket, (char *)msg, segsize); + if (sentsize == -1) { + if (errno != EPIPE) { + fprintf (stderr, "VS error on send this->packet %d of size %d ", + this->msgsn-1, min (size, this->msgsize)+sizeof (*msg)); + perror (""); + } + exit (errno != EPIPE); + } + } + if (sentsize < segsize) { + SFprintf (stderr, "VS warning: message size %dB, sent only %dB\n", + segsize, sentsize); + } + if (sent == -1) + break; + /* + fprintf (stderr, "VS: message %d of size %d sent.\n", + this->msgsn-1, min (size, this->msgsize)+sizeof (*msg)); + */ + size -= this->msgsize; + offset += this->msgsize; + } + } + /* + fprintf (stderr, "sent = %d\n", sent); + */ + if (!sent) this->pkts_sent ++; + return sent; +} + + +Audio_Global::Audio_Global (void) + :state (AUDIO_WAITING), + addSamples (0), + nextTime (0), + upp (0), + delta_sps (0), + bytes_sent (0), + start_time (0), + conn_tag (0), + serviceSocket (-1), + audioSocket (-1), + fd (0), + totalSamples (0), + fileSize (0), + cmd (0), + live_source (0), + databuf_size (0), + cmdsn (0), + nextsample (0), + sps (0), + spslimit (0), + spp (0), + pktbuf (0), + fbpara (0) +{ +} + +int +Audio_Global::CmdRead(char *buf, int psize) +{ + int res = wait_read_bytes(serviceSocket, buf, psize); + if (res == 0) return (1); + if (res == -1) { + fprintf(stderr, "AS error on read cmdSocket, size %d", psize); + ACE_OS::perror (""); + return (-1); + } + return 0; +} + +void +Audio_Global::CmdWrite(char *buf, int size) +{ + int res = wait_write_bytes(serviceSocket, buf, size); + if (res == -1) { + if (errno != EPIPE)ACE_OS::perror ("AS writes to serviceSocket"); + ACE_OS::exit (errno != EPIPE); + } +} + +int +Audio_Global::INITaudio(void) +{ + int result; + int failureType; /* 0 - can't open file, 1 - can't open live source */ + INITaudioPara para; + + result = CmdRead((char *)¶, sizeof(para)); + if (result != 0) + return result; +#ifdef NeedByteOrderConversion + para.sn = ntohl(para.sn); + para.version = ntohl(para.version); + para.nameLength = ntohl(para.nameLength); + para.para.encodeType = ntohl(para.para.encodeType); + para.para.channels = ntohl(para.para.channels); + para.para.samplesPerSecond = ntohl(para.para.samplesPerSecond); + para.para.bytesPerSample = ntohl(para.para.bytesPerSample); +#endif + if (para.nameLength>0) + result = CmdRead(audioFile, para.nameLength); + if (result != 0) + return result; + if (Mpeg_Global::session_num > Mpeg_Global::session_limit || para.version != VERSION) { + char errmsg[128]; + cmd = CmdFAIL; + CmdWrite((char *)&cmd, 1); + if (Mpeg_Global::session_num > Mpeg_Global::session_limit) { + sprintf(errmsg, + "Too many sessions being serviced, please try again later.\n"); + } + else { + sprintf(errmsg, "Version # not match, AS %d.%02d, Client %d.%02d", + VERSION / 100, VERSION % 100, + para.version / 100, para.version % 100); + } + write_string(serviceSocket, errmsg); + return(1); + } + ACE_OS::memcpy (&audioPara, ¶.para, sizeof(audioPara)); + /* + fprintf(stderr, "Client Audio para: encode %d, ch %d, sps %d, bps %d.\n", + para.para.encodeType, para.para.channels, + para.para.samplesPerSecond, para.para.bytesPerSample); + */ + audioFile[para.nameLength] = 0; + { + int len =ACE_OS::strlen (audioFile); + if (strncasecmp("LiveAudio", audioFile, 9) && + strcasecmp(".au", audioFile+len-3)) { + char errmsg[128]; + cmd = CmdFAIL; + CmdWrite((char *)&cmd, 1); + sprintf(errmsg, "%s without suffix .au", audioFile); + write_string(serviceSocket, errmsg); + return(1); + } + } + /* + fprintf(stderr, "Audio file %s got.\n", audioFile); + */ + + if (!strncasecmp("LiveAudio", audioFile, 9)) { + fd = OpenLiveAudio(&(para.para)); + if (fd == -1) { + failureType = 1; + goto failure; + } + fileSize =0x7fffffff; + totalSamples = fileSize / audioPara.bytesPerSample; + live_source = 1; + } + else { + LeaveLiveAudio(); + fd = open(audioFile, O_RDONLY); + if (fd == -1) + { + fprintf(stderr, "AS error on opening audio file %s", audioFile); + ACE_OS::perror (""); + failureType = 0; + goto failure; + } + + /* Try to get audioFile format audioPara here */ + + /* figure out totalsamples */ + fileSize = lseek(fd, 0L, SEEK_END); + lseek(fd, 0L, SEEK_SET); + totalSamples = fileSize / audioPara.bytesPerSample; + /* + fprintf(stderr, "Total Samples=%d in audio file %ss.\n", totalSamples, audioFile); + */ + } + { + INITaudioReply reply; + + reply.para.encodeType = htonl(audioPara.encodeType); + reply.para.channels = htonl(audioPara.channels); + reply.para.samplesPerSecond = htonl(audioPara.samplesPerSecond); + reply.para.bytesPerSample = htonl(audioPara.bytesPerSample); + reply.totalSamples = htonl(totalSamples); + + reply.live = htonl(live_source); + reply.format = htonl(AUDIO_RAW); + + CmdWrite((char *)&cmd, 1); + CmdWrite((char *)&reply, sizeof(reply)); + } + return 0; + + failure: + { + /* + fprintf(stderr, "AS error: failed initializing audio file.\n"); + */ + cmd = CmdFAIL; + CmdWrite((char *)&cmd, 1); + write_string(serviceSocket, + failureType == 0 ? (char *)"Failed to open audio file for read." : + (char *)"Failed to connect to live audio source."); + return(1); + } +} + +/* send a packet of audio samples to audioSocket + returns: 0 - no more data from audio file: EOF reached; + 1 - More data is available from the audio file */ +int +Audio_Global::send_packet (int firstSample, int samples) +{ + // ACE_DEBUG ((LM_DEBUG,"(%P|%t) send_packet called\n")); + long offset = firstSample * audioPara.bytesPerSample; + int size = samples * audioPara.bytesPerSample; + char * buf = (char *)pktbuf + sizeof(*pktbuf); + int len; + int resent = 0; + int segsize, sentsize; + + if (live_source) { + len = ReadLiveAudioSamples(buf, samples); + len *= audioPara.bytesPerSample; + } + else { + lseek(fd, offset, SEEK_SET); + while ((len = ACE_OS::read (fd, buf, size)) == -1) { + if (errno == EINTR) + continue; /* interrupted */ + ACE_OS::perror ("AS error on read audio file"); + return(-1); + } + if (len < audioPara.bytesPerSample) { + return 0; + } + } + + samples = len / audioPara.bytesPerSample; + len = samples * audioPara.bytesPerSample; + bytes_sent += len; + pktbuf->firstSample = htonl(firstSample); + pktbuf->samples = htonl(samples); + pktbuf->actualSamples = htonl(samples); + pktbuf->dataBytes = htonl(len); + if (spslimit < sps) { /* interpolation needed */ + SFprintf(stderr, "AS audio sample interpolation not available yet.\n"); + } + segsize = sizeof(*pktbuf) + len; + if (conn_tag != 0) { + while ((sentsize = ACE_OS::write (audioSocket, (char *)pktbuf, segsize)) == -1) { + if (errno == EINTR) /* interrupted */ + continue; + if (errno == ENOBUFS) { + if (resent) { + ACE_OS::perror ("AS Warning, pkt discarded because"); + break; + } + else { + resent = 1; + usleep(5000); + continue; + } + } + if (errno != EPIPE) { + fprintf(stderr, "AS error on send audio packet %d(%d):", + firstSample, samples); + perror(""); + } + ACE_OS::exit ((errno != EPIPE)); + } + } + else { + sentsize = wait_write_bytes(audioSocket, (char *)pktbuf, segsize); + if (sentsize == -1) { + if (errno != EPIPE) { + fprintf(stderr, "AS error on send audio packet %d(%d):", + firstSample, samples); + perror(""); + } + ACE_OS::exit ((errno != EPIPE)); + } + } + if (sentsize < segsize) { + SFprintf(stderr, "AS warning: message size %dB, sent only %dB\n", + segsize, sentsize); + } + /* + SFprintf(stderr, "AS sent audio packet %d(%d).\n", + firstSample, samples); + */ + return (len < size ? 0 : 1); +} + +/* send a packet of audio samples to audioSocket + returns: 0 - no more data from audio file: EOF reached; + 1 - More data is available from the audio file */ +int +Audio_Global::SendPacket (void) +{ + int moredata; + pktbuf->cmdsn = htonl(cmdsn); + pktbuf->resend = htonl(0); + pktbuf->samplesPerSecond = htonl(sps); + moredata = send_packet(nextsample, spp); + if (moredata) + { + nextsample += spp; + } + return moredata; +} + +void +Audio_Global::ResendPacket (int firstsample, int samples) +{ + pktbuf->cmdsn = htonl(cmdsn); + pktbuf->resend = htonl(1); + pktbuf->samplesPerSecond = htonl(sps); + while (samples > 0) { + int size = samples < spp ? samples : spp; + send_packet(firstsample, size); + firstsample += size; + samples -= size; + if (samples > 0) { + usleep(10000); + } + } +} + +#if 0 +int +Audio_Global::PLAYaudio(void) +{ + int hasdata = 1; + int addSamples; + int packets = 0; + unsigned nextTime; + int upp; /* micro-seconds per packet */ + int delta_sps = 0; /* compensation for sps from feedback msgs */ + int nfds = (serviceSocket > audioSocket ? serviceSocket : audioSocket) + 1; + int result; + /* + fprintf(stderr, "PLAY . . .\n"); + */ + { + PLAYaudioPara para; + result = CmdRead((char *)¶, sizeof(para)); + if (result != 0) + return result; +#ifdef NeedByteOrderConversion + para.sn = ntohl(para.sn); + para.nextSample = ntohl(para.nextSample); + para.samplesPerSecond = ntohl(para.samplesPerSecond); + para.samplesPerPacket = ntohl(para.samplesPerPacket); + para.ABsamples = ntohl(para.ABsamples); + para.spslimit = ntohl(para.spslimit); +#endif + nextsample = para.nextSample; + cmdsn = para.sn; + sps = para.samplesPerSecond; + spslimit = para.spslimit; + spp = para.samplesPerPacket; + addSamples = para.ABsamples / 2; + if (spp * audioPara.bytesPerSample > databuf_size) { + spp = databuf_size / audioPara.bytesPerSample; + } + /* + SFprintf(stderr, "AS got CmdPLAY: sps %d\n", sps); + */ + } + /* + fprintf(stderr, "AS: nextSampe=%d for PLAY.\n", para.nextSample); + */ + + upp = (int)(1000000.0 / ((double)sps / (double)spp)); + nextTime = get_usec(); + + CmdWrite((char *)&nextTime, sizeof(int)); + + if (live_source) { + StartPlayLiveAudio(); + } + + for (;;) + { + fd_set read_mask, write_mask; + struct timeval tval; + unsigned curTime = get_usec(); + + if (hasdata) { + if (addSamples < - spp) { /* slow down by not sending packets */ + nextTime += upp; + addSamples += spp; + } + else { + int need_sleep = 0; + while (nextTime <= curTime && hasdata) { + if (need_sleep) usleep(5000); + hasdata = SendPacket(); + need_sleep = 1; + packets ++; + nextTime += upp; + if (addSamples > 0 && packets % SPEEDUP_SCALE == 0) { + addSamples -= spp; + usleep(5000); + hasdata = SendPacket(); + packets ++; + } + } + } + } + curTime = nextTime - curTime; + if (curTime > 5000000) curTime = 5000000; /* limit on 5 second weit time + in case error happens */ + tval.tv_sec = curTime / 1000000; + tval.tv_usec = curTime % 1000000; + FD_ZERO(&read_mask); + FD_SET(serviceSocket, &read_mask); + FD_SET(audioSocket, &read_mask); +#ifdef _HPUX_SOURCE + if (select(nfds, (int *)&read_mask, NULL, NULL, hasdata ? &tval : NULL) == -1) +#else + if (select(nfds, &read_mask, NULL, NULL, hasdata ? &tval : NULL) == -1) +#endif + { + if (errno == EINTR) + continue; + ACE_OS::perror ("AS error on select reading or writing"); + return(-1); + } + if (FD_ISSET(serviceSocket, &read_mask)){ /* STOP, SPEED, or CLOSE*/ + unsigned char tmp; + result = CmdRead((char *)&tmp, 1); + if (result != 0) + return result; + switch (tmp) + { + case CmdSPEED: + { + SPEEDaudioPara para; + result = CmdRead((char *)¶, sizeof(para)); + if (result != 0) + return result; +#ifdef NeedByteOrderConversion + para.sn = ntohl(para.sn); + para.samplesPerSecond = ntohl(para.samplesPerSecond); + para.samplesPerPacket = ntohl(para.samplesPerPacket); + para.spslimit = ntohl(para.spslimit); +#endif + sps = para.samplesPerSecond; + spslimit = para.spslimit; + spp = para.samplesPerPacket; + if (spp * audioPara.bytesPerSample > databuf_size) { + spp = databuf_size / audioPara.bytesPerSample; + } + delta_sps = 0; /* reset compensation value */ + upp = (int)(1000000.0 / ((double)sps / (double)spp)); + /* + SFprintf(stderr, "AS got CmdSPEED: sps %d\n", sps); + */ + } + break; + case CmdSTOP: + { + int val; + cmd = tmp; + /* + fprintf(stderr, "AS: CmdSTOP. . .\n"); + */ + result = CmdRead((char *)&val, sizeof(int)); + if (result != 0) + return result; + /* + CmdWrite(AUDIO_STOP_PATTERN,ACE_OS::strlen (AUDIO_STOP_PATTERN)); + */ + if (live_source) { + StopPlayLiveAudio(); + } + return 0; /* return from PLAYaudio() */ + } + case CmdCLOSE: + if (live_source) { + StopPlayLiveAudio(); + } + return(1); /* The whole AS session terminates */ + default: + if (live_source) { + StopPlayLiveAudio(); + } + fprintf(stderr, "AS error: cmd=%d while expects STOP/SPEED/CLOSE.\n", tmp); + return(-1); + } + } + + if (FD_ISSET(audioSocket, &read_mask)){ /* Feedback packet */ + int bytes, len; + for (;;) { + if (conn_tag >= 0) { + len = wait_read_bytes(audioSocket, (char *)fbpara, sizeof(*fbpara)); + if (len == 0) return(1); /* connection broken */ + else if (len < 0) { /* unexpected error */ + ACE_OS::perror ("AS read1 FB"); + return(-1); + } + } + else { /* discard mode packet stream, read the whole packet */ + len = ACE_OS::read (audioSocket, (char *)fbpara, FBBUF_SIZE); + } + if (len == -1) { + if (errno == EINTR) continue; /* interrupt */ + else { + if (errno != EPIPE && errno != ECONNRESET)ACE_OS::perror ("AS failed to ACE_OS::read () fbmsg header"); + break; + } + } + break; + } + if (len < sizeof(*fbpara)) { + if (len > 0) fprintf(stderr, + "AS warn ACE_OS::read () len %dB < sizeof(*fbpara) %dB\n", + len, sizeof(*fbpara)); + continue; + } +#ifdef NeedByteOrderConversion + fbpara->type = ntohl(fbpara->type); +#endif + bytes = (fbpara->type > 0) ? + sizeof(APdescriptor) * (fbpara->type - 1) : + 0; + if (bytes > 0) { + if (conn_tag >= 0) { /* not discard mode packet stream, + read the rest of packet */ + len = wait_read_bytes(audioSocket, + ((char *)fbpara) + sizeof(*fbpara), + bytes); + if (len == 0) return(1); /* connection broken */ + else if (len < 0) { /* unexpected error */ + ACE_OS::perror ("AS read2 FB"); + return(-1); + } + len += sizeof(*fbpara); + } + } + bytes += sizeof(*fbpara); + if (len < bytes) { + if (len > 0) fprintf(stderr, + "AS only read partial FBpacket, %dB out of %dB.\n", + len, bytes); + continue; + } + if (live_source) { /* ignore all feedback messags for live source */ + continue; + } + +#ifdef NeedByteOrderConversion + fbpara->cmdsn = ntohl(fbpara->cmdsn); +#endif + if (len != sizeof(*fbpara) + + (fbpara->type ? (fbpara->type -1) * sizeof(APdescriptor) : 0)) { + /* unknown message, discard */ + SFprintf(stderr, "AS Unkown fb msg: len = %d, type = %d\n", + len, fbpara->type); + continue; + } + if (fbpara->cmdsn != cmdsn) { /* discard the outdated message */ + continue; + } +#ifdef NeedByteOrderConversion + { + int i, * ptr = (int *)fbpara + 2; + for (i = 0; i < (len >> 2) - 2; i++) *ptr = ntohl(*ptr); + } +#endif + if (fbpara->type == 0) { /* feedback message */ + /* + SFprintf(stderr, "AS got fbmsg: addsamples %d, addsps %d\n", + fbpara->data.fb.addSamples, fbpara->data.fb.addsps); + */ + addSamples += fbpara->data.fb.addSamples; + if (fbpara->data.fb.addsps) { + delta_sps += fbpara->data.fb.addsps; + upp = (int)(1000000.0 / ((double)(sps + delta_sps) / (double)spp)); + } + } + else { /* resend requests */ + APdescriptor * req = &(fbpara->data.ap); + int i; + /* + SFprintf(stderr, "AS got %d resend reqs\n", fbpara->type); + */ + for (i = 0; i < fbpara->type; i ++) { + ResendPacket(req->firstSample, req->samples); + req ++; + } + } + } + } +} +#endif + +// our version of play audio. +int +Audio_Global::play_audio(void) +{ + int result; + + ACE_DEBUG ((LM_DEBUG,"(%P|%t) play_audio () called \n")); + + { + PLAYaudioPara para; + result = CmdRead((char *)¶, sizeof(para)); + if (result != 0) + return result; +#ifdef NeedByteOrderConversion + para.sn = ntohl(para.sn); + para.nextSample = ntohl(para.nextSample); + para.samplesPerSecond = ntohl(para.samplesPerSecond); + para.samplesPerPacket = ntohl(para.samplesPerPacket); + para.ABsamples = ntohl(para.ABsamples); + para.spslimit = ntohl(para.spslimit); +#endif + nextsample = para.nextSample; + cmdsn = para.sn; + sps = para.samplesPerSecond; + spslimit = para.spslimit; + spp = para.samplesPerPacket; + addSamples = para.ABsamples / 2; + if (spp * audioPara.bytesPerSample > databuf_size) { + spp = databuf_size / audioPara.bytesPerSample; + } + /* + SFprintf(stderr, "AS got CmdPLAY: sps %d\n", sps); + */ + } + /* + fprintf(stderr, "AS: nextSampe=%d for PLAY.\n", para.nextSample); + */ + + upp = (int)(1000000.0 / ((double)sps / (double)spp)); + nextTime = get_usec(); + + CmdWrite((char *)&nextTime, sizeof(int)); + if (live_source) { + StartPlayLiveAudio(); + } + this->send_audio (); +} + +int +Audio_Global::send_audio (void) +{ + unsigned curTime = get_usec(); + + if (hasdata) { + if (addSamples < - spp) { /* slow down by not sending packets */ + /* ACE_DEBUG ((LM_DEBUG,"(%P|%t) slow down by not sending\n")); */ + nextTime += upp; + addSamples += spp; + } + else { + /* ACE_DEBUG ((LM_DEBUG,"(%P|%t) sending." + "nexttime = %d, curTime = %d, hasdata = %d\n", + nextTime, curTime, hasdata)); */ + int need_sleep = 0; + while ( (nextTime <= curTime) && (hasdata)) { + if (need_sleep) usleep(5000); + hasdata = SendPacket(); + need_sleep = 1; + packets ++; + nextTime += upp; + if (addSamples > 0 && packets % SPEEDUP_SCALE == 0) { + addSamples -= spp; + usleep(5000); + hasdata = SendPacket(); + packets ++; + } + } + } + } + curTime = nextTime - curTime; + if (curTime > 5000000) curTime = 5000000; /* limit on 5 second weit time + in case error happens */ + tval.tv_sec = curTime / 1000000; + tval.tv_usec = curTime % 1000000; + + if (hasdata) + { + // schedule a sigalrm to simulate select timeout. + ACE_Time_Value tv (tval); + ACE_OS::ualarm (tv,0); + } + return 0; +} + + +void +Audio_Global::on_exit_routine(void) +{ + struct sockaddr_in peeraddr_in; + int size = sizeof(peeraddr_in); + + /* + fprintf(stderr, "An AS session terminated\n"); + */ + if (ACE_OS::getpeername(serviceSocket, + (struct sockaddr *)&peeraddr_in, &size) == 0 && + peeraddr_in.sin_family == AF_INET) { + if (strncmp(inet_ntoa(peeraddr_in.sin_addr), "129.95.50", 9)) { + struct hostent *hp; + time_t val =ACE_OS::time (NULL); + char * buf = ACE_OS::ctime (&start_time); + + hp = ACE_OS::gethostbyaddr((char *)&(peeraddr_in.sin_addr), 4, AF_INET); + buf[strlen(buf)-1] = 0; + printf("%s: %s %3dm%02ds %dB %s\n", + buf, + hp == NULL ? inet_ntoa(peeraddr_in.sin_addr) : hp->h_name, + (val - start_time) / 60, (val - start_time) % 60, + bytes_sent, audioFile); + } + } + ComCloseConn(serviceSocket); + ComCloseConn(audioSocket); +} + +#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) +template class ACE_Singleton<Video_Global, TAO_SYNCH_MUTEX>; +template class ACE_Singleton<Audio_Global, TAO_SYNCH_MUTEX>; +#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) +#pragma instantiate ACE_Singleton<Video_Global, TAO_SYNCH_MUTEX> +#pragma instantiate ACE_Singleton<Audio_Global, TAO_SYNCH_MUTEX> +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ |