diff options
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ab.cpp')
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ab.cpp | 767 |
1 files changed, 339 insertions, 428 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ab.cpp b/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ab.cpp index 2887981ee52..7cbccea9cc1 100644 --- a/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ab.cpp +++ b/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ab.cpp @@ -25,51 +25,62 @@ * Department of Computer Science and Engineering * email: scen@cse.ogi.edu */ +#include <stdio.h> +#include <errno.h> +#include <signal.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> +#include <stdlib.h> +#include <netinet/in.h> +#include <X11/Xlib.h> +#include <X11/Xutil.h> +#include <X11/Intrinsic.h> +#ifdef __svr4__ +#include <stropts.h> +#include <sys/conf.h> +#endif -extern int asp[2]; -extern void set_exit_routine_tag(int tag); - -#include "ab.h" +#include "include/common.h" +#include "newproto.h" +#include "global.h" +#include "mpeg_shared/filters.h" +#include "mpeg_shared/fileio.h" +#include "mpeg_shared/com.h" ACE_RCSID(mpeg_client, ab, "$Id$") -AudioBuffer::AudioBuffer (void) - :temp (0), - bytes (-1), - abuf (0), - sid (-1), - exit_tag (0), - savedSocket (-1), - packet (0), - pkt_data (0), - conn_tag (-1), - fbstate (0), - waketime (0), - pcmdsn (-1), - mode_ (INVALID) -{ -} - -//destructor. -AudioBuffer::~AudioBuffer (void) -{ - if (ACE_Reactor::instance ()->remove_handler (this->handler_,ACE_Event_Handler::READ_MASK) == -1) - ACE_DEBUG ((LM_ERROR,"(%P)remove handler failed for Video_Notification_Handler\n")); - - delete this->handler_; - if (ACE_Reactor::instance ()->remove_handler (this,ACE_Event_Handler::READ_MASK) == -1) - ACE_DEBUG ((LM_ERROR,"(%P)remove handler failed for VideoBuffer\n")); - -} - -void -AudioBuffer::set_silence (char *buf, int samples) +/* magic number -- deviation is considered + caused by clock drift only if rate <= 1/MAX_CLOCK_DRIFT. + */ +#define MAX_CLOCK_DRIFT 50 + +#define max(a,b) ((a)>(b) ? (a) : (b)) +#define min(a,b) ((a)<(b) ? (a) : (b)) + +typedef struct { + int bufsize; /* number of bytes for the buffer pointed by 'buf' */ + char * buf; /* pointer to the data buffer area */ + int bps; /* current byte-per-sample */ + int size; /* number of samples the buffer can hold */ + int samples; /* number of samples in the buffer; */ + int stuff; /* number of stuff samples to be read by ABgetSamples() */ + int ts; /* tail-sample: the next sample to be comsumed by CTR */ + int hs; /* head-sample: the next sample to be expected from the network */ + int tind; /* index of the ts-sample in the buf */ +} ABBuffer; + +static ABBuffer * abuf; +static int sid; +static int exit_tag = 0; +static int savedSocket; + +static void set_silence(char *buf, int samples) { memset(buf, 0xff, samples * shared->audioPara.bytesPerSample); } -void -AudioBuffer::ABinitBuf (int size) /* size in bytes */ +void ABinitBuf(int size) /* size in bytes */ { abuf = (ABBuffer *)creat_shared_mem(size); abuf->bufsize = size - sizeof(*abuf); @@ -77,8 +88,7 @@ AudioBuffer::ABinitBuf (int size) /* size in bytes */ sid = creat_semaphore(); } -void -AudioBuffer::ABflushBuf (int nextSample) /* flush the whole buffer */ +void ABflushBuf(int nextSample) /* flush the whole buffer */ { enter_cs(sid); abuf->bps = shared->audioPara.bytesPerSample; @@ -91,14 +101,12 @@ AudioBuffer::ABflushBuf (int nextSample) /* flush the whole buffer */ leave_cs(sid); } -int -AudioBuffer::ABcheckSamples (void) /* returns # of samples in ABbuf */ +int ABcheckSamples(void) /* returns # of samples in ABbuf */ { return abuf->samples; } -int -AudioBuffer::ABgetSamples (char * buf, int samples) +int ABgetSamples(char * buf, int samples) /* read at most given number of samples from AB to buf, returns number of sample actually read */ { @@ -117,12 +125,12 @@ AudioBuffer::ABgetSamples (char * buf, int samples) /* there may be fewer samples in abuf */ if (as > 0) { int part1 = min(as, abuf->size - abuf->tind); - ACE_OS::memcpy (buf, abuf->buf + (abuf->bps * abuf->tind), part1 * abuf->bps); + memcpy(buf, abuf->buf + (abuf->bps * abuf->tind), part1 * abuf->bps); set_silence(abuf->buf + (abuf->bps * abuf->tind), part1); if (part1 < as) { /* This read cross the boundary of abuf */ - ACE_OS::memcpy (buf + (part1 * abuf->bps), - abuf->buf, - (as - part1) * abuf->bps); + memcpy(buf + (part1 * abuf->bps), + abuf->buf, + (as - part1) * abuf->bps); set_silence(abuf->buf, as - part1); } } @@ -137,8 +145,7 @@ AudioBuffer::ABgetSamples (char * buf, int samples) /* if samples < 0; then stuff |samples| silient samples to ABgetSamples(), otherwise wipe out this number of samples from AB */ -int -AudioBuffer::ABskipSamples (int samples) +int ABskipSamples(int samples) { enter_cs(sid); if (samples <= 0) { @@ -153,423 +160,327 @@ AudioBuffer::ABskipSamples (int samples) return samples; } -void -AudioBuffer::ABdeleteBuf (void) +void ABdeleteBuf(void) { remove_shared_mem((char *)abuf); } -void -AudioBuffer::ABdeleteSem (void) +void ABdeleteSem(void) { remove_semaphore(sid); } /* SIGUSR1 from CTR is for killing this process, without affecting any other ones. */ -void -AudioBuffer::exit_on_kill (void) +static void exit_on_kill(void) { ACE_DEBUG ((LM_DEBUG,"(%P|%t) ABprocess killed \n")); extern void set_exit_routine_tag(int tag); set_exit_routine_tag(0); // ComCloseConn(savedSocket); - vbuffer->VBdeleteBuf(); - ACE_OS::exit (0); + VBdeleteBuf(); + exit(0); } -ACE_HANDLE -AudioBuffer::get_handle (void) const +static void usr1_handler(int sig) { - return this->dataSocket; + cerr << "ABprocess got sigusr1\n"; + exit_on_kill (); } -int -AudioBuffer::handle_input (ACE_HANDLE fd) + +static void usr2_handler(int sig) { - // ACE_DEBUG ((LM_DEBUG,"handle_input:mode = %d\n",this->mode_)); - int len; - switch (this->mode_) - { - case READ_HEADER: - { - int len; - if (conn_tag >= 0) - { - // ACE_DEBUG ((LM_DEBUG,"non discard mode: ")); - if (bytes < 0) - bytes = sizeof(*packet); - len = ACE_OS::read (dataSocket, (char *)temp, bytes); - } - else - { /* discard mode packet stream, read all bytes */ - // ACE_DEBUG ((LM_DEBUG,"discard mode: ")); - if (bytes < 0) - bytes = PACKET_SIZE; - len = ACE_OS::read (dataSocket, (char *)packet, bytes); - // ACE_DEBUG ((LM_DEBUG,"(%P|%t) ABprocess: got a %d sized packet\n",len)); - } - if (len == -1) - { - if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) - { - return 0; - } - perror("AB ACE_OS::read () audio packet from discard-mode packet stream"); - ACE_Reactor::instance ()->end_event_loop (); - return -1; - } + Fprintf(stderr, "VB void usr2_handler (supposed for stat).\n"); +} + +#define PACKET_SIZE 8192 +#define STARTUP_WAIT 10000000 +#define ACTION_WAIT 5000000 + +void ABprocess(int dataSocket) +{ + + AudioPacket * packet; + char * pkt_data; + int conn_tag = shared->audioMaxPktSize; + + /* following are for feedback */ + int fbstate = 0; + unsigned waketime; + int pcmdsn = -1; /* previous cmdsn */ + + exit_tag = 0; + + savedSocket = dataSocket; + + setsignal(SIGUSR1, usr1_handler); + setsignal(SIGUSR2, usr2_handler); + + packet = (AudioPacket *)malloc(PACKET_SIZE); + if (packet == NULL) { + perror("AB failed to allocate mem for packet buffer"); + exit(1); + } + pkt_data = (char *)packet + sizeof(*packet); - if (len == 0) - { - fprintf(stderr, "Error: AB found dataSocket broken\n"); - ACE_Reactor::instance ()->end_event_loop (); return -1; - } - // ACE_DEBUG ((LM_DEBUG,"packet: bytes = %d,len = %d\n",bytes,len)); - if (conn_tag >= 0) - { - temp += len; - bytes -= len; - if (bytes == 0) - { - // header reading is done. - this->mode_ = READ_DATA; - bytes = -1; - len = sizeof (*packet); - } - else - return 0; - } - if (len < sizeof (*packet)) - { - fprintf(stderr, "Warn: AB discard len = %d bytes of supposed header.\n", len); - return 0; - } - // process the header. + for (;;) { + int len; + int bytes; + if (conn_tag >= 0) { + bytes = sizeof(*packet); + + + len = wait_read_bytes(dataSocket, (char *)packet, bytes); + if (exit_tag) exit_on_kill(); + } + else { /* discard mode packet stream, read all bytes */ + bytes = PACKET_SIZE; + len = read(dataSocket, (char *)packet, bytes); + // ACE_DEBUG ((LM_DEBUG,"(%P|%t) ABprocess: got a %d sized packet\n",len)); + if (exit_tag) exit_on_kill(); + if (len == -1) { + if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) { + usleep(10000); + if (exit_tag) exit_on_kill(); + continue; + } + perror("AB read() audio packet from discard-mode packet stream"); + exit(1); + } + } + if (len == 0) { + fprintf(stderr, "Error: AB found dataSocket broken\n"); + exit(1); + } + if (len < sizeof(*packet)) { /* unknown packet */ + fprintf(stderr, "Warn: AB discard len = %d bytes of supposed header.\n", len); + continue; + } #ifdef NeedByteOrderConversion - packet->dataBytes = ntohl(packet->dataBytes); + packet->dataBytes = ntohl(packet->dataBytes); #endif - // ACE_DEBUG ((LM_DEBUG,"(%P|%t) ABprocess: Received %d sized packet\n",len)); - if (packet->dataBytes <= 0) - { - fprintf(stderr, "AB Error: pkt->dataBytes %d, len %d\n", - packet->dataBytes, - len); - ACE_Reactor::instance ()->end_event_loop (); return -1; - } - bytes = packet->dataBytes + sizeof(*packet); + // ACE_DEBUG ((LM_DEBUG,"(%P|%t) ABprocess: Received %d sized packet\n",len)); + if (packet->dataBytes <= 0) { + fprintf(stderr, "AB Error: pkt->dataBytes %d, len %d\n", + packet->dataBytes, + len); + exit(1); + } + bytes = packet->dataBytes + sizeof(*packet); - if (bytes > PACKET_SIZE) - { - Fprintf(stderr, "Fatal error: AB packet buf (%dB) too small (%d)\n", - PACKET_SIZE, bytes); - ACE_Reactor::instance ()->end_event_loop (); return -1; - } - if (conn_tag > 0) - { - temp = (char *)packet + sizeof(*packet); - bytes = bytes - sizeof(*packet); - ACE_DEBUG ((LM_DEBUG,"(%P) Ready to read the data part of the packet\n")); - break; - } + if (bytes > PACKET_SIZE) { + Fprintf(stderr, "Fatal error: AB packet buf (%dB) too small (%d)\n", + PACKET_SIZE, bytes); + exit(1); + } + if (conn_tag >= 0) { + len = wait_read_bytes(dataSocket, (char *)packet + sizeof(*packet), + bytes - sizeof(*packet)); + if (len <= 0) { + if (len == -1) perror("AB encounter error on wait_read_bytes()"); + else fprintf(stderr, "AB encounter EOF on wait_read_bytes()\n"); } - // fall through and process the header. - case READ_DATA: - { - // code to read the audio packet and buffer it. - if (conn_tag >= 0) - { - len = ACE_OS::read (dataSocket,temp,bytes); - - if (len <= 0) - { - if (len == -1)ACE_OS::perror ("AB encounter error on wait_read_bytes()"); - else fprintf(stderr, "AB encounter EOF on wait_read_bytes()\n"); - } - temp +=len; - bytes -= len; - if (bytes != 0) - return 0; - } - // set the parameters for the header reading. - this->mode_ = READ_HEADER; - bytes = -1; - temp = (char *)packet; + } #ifdef NeedByteOrderConversion - packet->cmdsn = ntohl(packet->cmdsn); - packet->samplesPerSecond = ntohl(packet->samplesPerSecond); - packet->resend = ntohl(packet->resend); - packet->firstSample = ntohl(packet->firstSample); - packet->samples = ntohl(packet->samples); - packet->actualSamples = ntohl(packet->actualSamples); - /* dataBytes already byte-reordered */ + packet->cmdsn = ntohl(packet->cmdsn); + packet->samplesPerSecond = ntohl(packet->samplesPerSecond); + packet->resend = ntohl(packet->resend); + packet->firstSample = ntohl(packet->firstSample); + packet->samples = ntohl(packet->samples); + packet->actualSamples = ntohl(packet->actualSamples); + /* dataBytes already byte-reordered */ #endif - /* - Fprintf(stderr, "AB got a packet: %d(%d)\n", - packet->firstSample, packet->samples); - */ - /* - if (packet->firstSample % 10240 && !packet->resend) continue; - */ - if (packet->samples * abuf->bps > PACKET_SIZE - sizeof(*packet)) { - fprintf(stderr, "Fatal error: AB has too small packet buffer, %d out of %d\n", - PACKET_SIZE, packet->samples * abuf->bps + sizeof(*packet)); - ACE_Reactor::instance ()->end_event_loop (); return -1; - } - - if (packet->cmdsn != shared->cmdsn) { /* outdated packet */ - /* - Fprintf(stderr, "AB discarded an outdated packet\n"); - */ - return 0; - } - enter_cs(sid); - if (packet->firstSample + packet->samples <= abuf->ts) - { - /* all samples too late, discard it */ - abuf->hs = max(abuf->hs, packet->firstSample + packet->samples); - abuf->samples = abuf->hs - abuf->ts; - leave_cs(sid); - /* - Fprintf(stderr, "AB all sample in packet %d(%d) too late\n", - packet->firstSample, packet->samples); - */ - feedback (); - } - else if (packet->firstSample >= abuf->ts + abuf->size) { - /* all samples too early, discard the packet */ - abuf->hs = max(abuf->hs, packet->firstSample + packet->samples); - abuf->samples = abuf->hs - abuf->ts; - leave_cs(sid); - /* - Fprintf(stderr, "AB all sample in packet %d(%d) too early\n", - packet->firstSample, packet->samples); - */ - feedback (); - } - else if (packet->samples > packet->actualSamples) { - leave_cs(sid); - fprintf(stderr, "Error: AB interpolation not available yet.\n"); - ACE_Reactor::instance ()->end_event_loop (); return -1; - } - else - { - int oldhs = abuf->hs; - int firstSample = max(packet->firstSample, abuf->ts); - int samples = min(packet->samples - - (firstSample - packet->firstSample), - (abuf->ts + abuf->size) - packet->firstSample); - char * data = pkt_data + - (firstSample - packet->firstSample) * abuf->bps; - int dstart = (abuf->tind + (firstSample - abuf->ts)) % abuf->size; - int part1 = min(samples, abuf->size - dstart); - ACE_OS::memcpy (abuf->buf + (dstart * abuf->bps), data, part1 * abuf->bps); - if (part1 < samples) { - memcpy(abuf->buf, data + part1 * abuf->bps, - (samples - part1) * abuf->bps); - } - abuf->hs = max(abuf->hs, packet->firstSample + packet->samples); - abuf->samples = abuf->hs - abuf->ts; - dstart =max(oldhs, abuf->ts); - - leave_cs(sid); - - part1 = firstSample - dstart; - if (packet->resend) { - Fprintf(stderr, "AB got resent %d(%d)\n", - packet->firstSample, packet->samples); - } - else if (part1 > 0) { - int res; - AudioFeedBackPara para; - Fprintf(stderr, "AB found gap %d(%d)\n", dstart, part1); - para.cmdsn = htonl(shared->cmdsn); - para.type = htonl(1); - para.data.ap.firstSample = htonl(dstart); - para.data.ap.samples = htonl(part1); - // register ourself for the write handler. - int result; - result = ACE_Reactor::instance ()->register_handler (this,ACE_Event_Handler::WRITE_MASK); - if (result != 0) - return result; - this->mode_ = WRITE_FEEDBACK2; - temp = (char *)¶ - bytes = sizeof (para); - } - } - } - break; - default: - break; + /* + Fprintf(stderr, "AB got a packet: %d(%d)\n", + packet->firstSample, packet->samples); + */ + /* + if (packet->firstSample % 10240 && !packet->resend) continue; + */ + if (packet->samples * abuf->bps > PACKET_SIZE - sizeof(*packet)) { + fprintf(stderr, "Fatal error: AB has too small packet buffer, %d out of %d\n", + PACKET_SIZE, packet->samples * abuf->bps + sizeof(*packet)); + exit(1); } - return 0; -} -int -AudioBuffer::handle_output (ACE_HANDLE fd) -{ - ACE_DEBUG ((LM_DEBUG,"handle_output:mode = %d\n",this->mode_)); - int res; - if ((this->mode_ == WRITE_FEEDBACK1) || (this->mode_ == WRITE_FEEDBACK2)) - { - // send feedback. - - if (conn_tag != 0) - { /* packet stream */ - res = ACE_OS::write (dataSocket, temp, bytes); - if (res == -1) - { - if (errno == ENOBUFS) { - ACE_OS::perror ("AB Warning, resend-req packet discarded for"); - return 0; - } - ACE_OS::perror ("AB error, resend-req packet sending failed"); - ACE_Reactor::instance ()->end_event_loop (); return -1; - } - } - else - { - res = ACE_OS::write (dataSocket, temp, bytes); - if (res == -1) { - ACE_OS::perror ("AB error, resend-req packet sending failed"); - ACE_Reactor::instance ()->end_event_loop (); return -1; - } - } - if (res == 0) - { - ACE_DEBUG ((LM_DEBUG,"(%P|%t)AudioBuffer::handle_output:write failed\n")); - ACE_Reactor::instance ()->end_event_loop (); return -1; - } - temp += res; - bytes -= res; - if (bytes != 0) - return 0; - else - { - // remove the write handler. - int result; - result = ACE_Reactor::instance ()->remove_handler (this, - ACE_Event_Handler::WRITE_MASK); - if (result != 0) - return result; - - this->mode_ = READ_HEADER; - temp = (char *)packet; - bytes == -1; - } + if (packet->cmdsn != shared->cmdsn) { /* outdated packet */ + /* + Fprintf(stderr, "AB discarded an outdated packet\n"); + */ + continue; + } + enter_cs(sid); + if (packet->firstSample + packet->samples <= abuf->ts) { + /* all samples too late, discard it */ + abuf->hs = max(abuf->hs, packet->firstSample + packet->samples); + abuf->samples = abuf->hs - abuf->ts; + leave_cs(sid); + /* + Fprintf(stderr, "AB all sample in packet %d(%d) too late\n", + packet->firstSample, packet->samples); + */ + goto feedback_code; + } + if (packet->firstSample >= abuf->ts + abuf->size) { + /* all samples too early, discard the packet */ + abuf->hs = max(abuf->hs, packet->firstSample + packet->samples); + abuf->samples = abuf->hs - abuf->ts; + leave_cs(sid); + /* + Fprintf(stderr, "AB all sample in packet %d(%d) too early\n", + packet->firstSample, packet->samples); + */ + goto feedback_code; } - if (this->mode_ == WRITE_FEEDBACK1) + + if (packet->samples > packet->actualSamples) { + leave_cs(sid); + fprintf(stderr, "Error: AB interpolation not available yet.\n"); + exit(1); + } + { - // adjust the wakeup time and feedback state. - waketime = get_usec() + STARTUP_WAIT; - fbstate = 1; + int oldhs = abuf->hs; + int firstSample = max(packet->firstSample, abuf->ts); + int samples = min(packet->samples - + (firstSample - packet->firstSample), + (abuf->ts + abuf->size) - packet->firstSample); + char * data = pkt_data + + (firstSample - packet->firstSample) * abuf->bps; + int dstart = (abuf->tind + (firstSample - abuf->ts)) % abuf->size; + int part1 = min(samples, abuf->size - dstart); + memcpy(abuf->buf + (dstart * abuf->bps), data, part1 * abuf->bps); + if (part1 < samples) { + memcpy(abuf->buf, data + part1 * abuf->bps, + (samples - part1) * abuf->bps); + } + abuf->hs = max(abuf->hs, packet->firstSample + packet->samples); + abuf->samples = abuf->hs - abuf->ts; + dstart =max(oldhs, abuf->ts); + + leave_cs(sid); + + part1 = firstSample - dstart; + if (packet->resend) { + Fprintf(stderr, "AB got resent %d(%d)\n", + packet->firstSample, packet->samples); + } + else if (part1 > 0) { + int res; + AudioFeedBackPara para; + Fprintf(stderr, "AB found gap %d(%d)\n", dstart, part1); + para.cmdsn = htonl(shared->cmdsn); + para.type = htonl(1); + para.data.ap.firstSample = htonl(dstart); + para.data.ap.samples = htonl(part1); + if (conn_tag != 0) { /* packet stream */ + while ((res = write(dataSocket, (char *)¶, sizeof(para))) == -1) + { + if (errno == EINTR) + continue; + if (errno == ENOBUFS) { + perror("AB Warning, resend-req packet discarded for"); + break; + } + perror("AB error, resend-req packet sending failed"); + exit(1); + } + } + else { + res = wait_write_bytes(dataSocket, (char *)¶, sizeof(para)); + if (res == -1) { + perror("AB error, resend-req packet sending failed"); + exit(1); + } + } + if (res < sizeof(para)) { + fprintf(stderr, "AB warn: send() for gap res %dB < sizeof(para) %dB\n", + res, sizeof(para)); + } + } } - return 0; -} - -int -AudioBuffer::ABprocess (int socket) -{ - int result; - ACE_DEBUG ((LM_DEBUG,"(%P|%t)AudioBuffer::ABprocess ()\n")); - packet = (AudioPacket *)ACE_OS::malloc(PACKET_SIZE); - if (packet == NULL) { - ACE_OS::perror ("AB failed to allocate mem for packet buffer"); - ACE_Reactor::instance ()->end_event_loop (); return -1; - } - - this->dataSocket = socket; - ACE_NEW_RETURN (this->handler_, - Audio_Notification_Handler, - -1); - // Register the notification handler with the reactor. - result = ACE_Reactor::instance ()->register_handler (this->handler_, - ACE_Event_Handler::READ_MASK); - if (result != 0) - return result; - - result = ACE_Reactor::instance ()->register_handler (this, - ACE_Event_Handler::READ_MASK); - - if (result != 0) - return result; - - conn_tag = shared->audioMaxPktSize; - exit_tag = 0; - - this->mode_ = READ_HEADER; - pkt_data = (char *)packet + sizeof(*packet); - temp = (char *)packet; - bytes = -1; - return 0; -} - -// following is feedback algorithm. -void -AudioBuffer::feedback (void) -{ - if (shared->live || (!shared->config.syncEffective)) return; - /* + + /* following is feedback algorithm */ + + feedback_code: + + if (shared->live || (!shared->config.syncEffective)) continue; + /* fprintf(stderr, "AB fbstate = %d\n", fbstate); */ - switch (fbstate) { - case 0: /* startup init */ - if (pcmdsn != packet->cmdsn) { - fbstate = 0; - pcmdsn = packet->cmdsn; - } - else { - fbstate = 1; - waketime = get_usec() + STARTUP_WAIT; - } - break; - case 1: /* startup wait */ - if (pcmdsn != packet->cmdsn) { - fbstate = 0; + switch (fbstate) { + case 0: /* startup init */ + if (pcmdsn != packet->cmdsn) { + fbstate = 0; + pcmdsn = packet->cmdsn; + } + else { + fbstate = 1; + waketime = get_usec() + STARTUP_WAIT; + } break; - } - if (get_usec() >= waketime) { - fbstate = 2; - } - break; - case 2: /* monitoring */ - if (pcmdsn != packet->cmdsn) { - fbstate = 0; + case 1: /* startup wait */ + if (pcmdsn != packet->cmdsn) { + fbstate = 0; + break; + } + if (get_usec() >= waketime) { + fbstate = 2; + } break; - } - if (abuf->samples < abuf->size >>2 || - abuf->samples > (abuf->size * 3) >> 2) - { - /* feedback action needed */ - AudioFeedBackPara para; - int res; - para.data.fb.addsps = 0; - para.data.fb.addSamples = (abuf->size >> 2) - abuf->samples; + case 2: /* monitoring */ + if (pcmdsn != packet->cmdsn) { + fbstate = 0; + break; + } + if (abuf->samples < abuf->size >>2 || + abuf->samples > (abuf->size * 3) >> 2) { + /* feedback action needed */ + AudioFeedBackPara para; + int res; + para.data.fb.addsps = 0; + para.data.fb.addSamples = (abuf->size >> 2) - abuf->samples; + + Fprintf(stderr, "AB sends fb: %dsps, %dsamples\n", + para.data.fb.addsps, + para.data.fb.addSamples); + para.cmdsn = htonl(shared->cmdsn); + para.type = htonl(0); + para.data.fb.addsps = htonl(para.data.fb.addsps); + para.data.fb.addSamples = htonl(para.data.fb.addSamples); + if (conn_tag != 0) { + while ((res = write(dataSocket, (char *)¶, sizeof(para))) == -1) + { + if (errno == EINTR) + continue; + if (errno == ENOBUFS) { + perror("AB Warning, fb packet discarded for"); + break; + } + perror("AB error, fb packet sending failed"); + exit(1); + } + } + else { + res = wait_write_bytes(dataSocket, (char *)¶, sizeof(para)); + if (res == -1) { + perror("AB error, fb packet sending failed"); + exit(1); + } + } + if (res < sizeof(para)) { + fprintf(stderr, "AB warn: send() for sync res %dB < sizeof(para) %dB\n", + res, sizeof(para)); + } - Fprintf(stderr, "AB sends fb: %dsps, %dsamples\n", - para.data.fb.addsps, - para.data.fb.addSamples); - para.cmdsn = htonl(shared->cmdsn); - para.type = htonl(0); - para.data.fb.addsps = htonl(para.data.fb.addsps); - para.data.fb.addSamples = htonl(para.data.fb.addSamples); - int result = ACE_Reactor::instance ()->register_handler (this,ACE_Event_Handler::WRITE_MASK); - if (result != 0) - ACE_DEBUG ((LM_DEBUG,"register_hanlder for write failed\n")); - this->mode_ = WRITE_FEEDBACK1; - temp = (char *)¶ - bytes = sizeof (para); - return; + waketime = get_usec() + STARTUP_WAIT; + fbstate = 1; } - break; - default: - break; + break; + default: + break; + } } -} - -ACE_HANDLE -Audio_Notification_Handler::get_handle (void) const -{ - return asp[1]; -} +} |