summaryrefslogtreecommitdiff
path: root/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ab.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ab.cpp')
-rw-r--r--TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ab.cpp767
1 files changed, 339 insertions, 428 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ab.cpp b/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ab.cpp
index 2887981ee52..7cbccea9cc1 100644
--- a/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ab.cpp
+++ b/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ab.cpp
@@ -25,51 +25,62 @@
* Department of Computer Science and Engineering
* email: scen@cse.ogi.edu
*/
+#include <stdio.h>
+#include <errno.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <netinet/in.h>
+#include <X11/Xlib.h>
+#include <X11/Xutil.h>
+#include <X11/Intrinsic.h>
+#ifdef __svr4__
+#include <stropts.h>
+#include <sys/conf.h>
+#endif
-extern int asp[2];
-extern void set_exit_routine_tag(int tag);
-
-#include "ab.h"
+#include "include/common.h"
+#include "newproto.h"
+#include "global.h"
+#include "mpeg_shared/filters.h"
+#include "mpeg_shared/fileio.h"
+#include "mpeg_shared/com.h"
ACE_RCSID(mpeg_client, ab, "$Id$")
-AudioBuffer::AudioBuffer (void)
- :temp (0),
- bytes (-1),
- abuf (0),
- sid (-1),
- exit_tag (0),
- savedSocket (-1),
- packet (0),
- pkt_data (0),
- conn_tag (-1),
- fbstate (0),
- waketime (0),
- pcmdsn (-1),
- mode_ (INVALID)
-{
-}
-
-//destructor.
-AudioBuffer::~AudioBuffer (void)
-{
- if (ACE_Reactor::instance ()->remove_handler (this->handler_,ACE_Event_Handler::READ_MASK) == -1)
- ACE_DEBUG ((LM_ERROR,"(%P)remove handler failed for Video_Notification_Handler\n"));
-
- delete this->handler_;
- if (ACE_Reactor::instance ()->remove_handler (this,ACE_Event_Handler::READ_MASK) == -1)
- ACE_DEBUG ((LM_ERROR,"(%P)remove handler failed for VideoBuffer\n"));
-
-}
-
-void
-AudioBuffer::set_silence (char *buf, int samples)
+/* magic number -- deviation is considered
+ caused by clock drift only if rate <= 1/MAX_CLOCK_DRIFT.
+ */
+#define MAX_CLOCK_DRIFT 50
+
+#define max(a,b) ((a)>(b) ? (a) : (b))
+#define min(a,b) ((a)<(b) ? (a) : (b))
+
+typedef struct {
+ int bufsize; /* number of bytes for the buffer pointed by 'buf' */
+ char * buf; /* pointer to the data buffer area */
+ int bps; /* current byte-per-sample */
+ int size; /* number of samples the buffer can hold */
+ int samples; /* number of samples in the buffer; */
+ int stuff; /* number of stuff samples to be read by ABgetSamples() */
+ int ts; /* tail-sample: the next sample to be comsumed by CTR */
+ int hs; /* head-sample: the next sample to be expected from the network */
+ int tind; /* index of the ts-sample in the buf */
+} ABBuffer;
+
+static ABBuffer * abuf;
+static int sid;
+static int exit_tag = 0;
+static int savedSocket;
+
+static void set_silence(char *buf, int samples)
{
memset(buf, 0xff, samples * shared->audioPara.bytesPerSample);
}
-void
-AudioBuffer::ABinitBuf (int size) /* size in bytes */
+void ABinitBuf(int size) /* size in bytes */
{
abuf = (ABBuffer *)creat_shared_mem(size);
abuf->bufsize = size - sizeof(*abuf);
@@ -77,8 +88,7 @@ AudioBuffer::ABinitBuf (int size) /* size in bytes */
sid = creat_semaphore();
}
-void
-AudioBuffer::ABflushBuf (int nextSample) /* flush the whole buffer */
+void ABflushBuf(int nextSample) /* flush the whole buffer */
{
enter_cs(sid);
abuf->bps = shared->audioPara.bytesPerSample;
@@ -91,14 +101,12 @@ AudioBuffer::ABflushBuf (int nextSample) /* flush the whole buffer */
leave_cs(sid);
}
-int
-AudioBuffer::ABcheckSamples (void) /* returns # of samples in ABbuf */
+int ABcheckSamples(void) /* returns # of samples in ABbuf */
{
return abuf->samples;
}
-int
-AudioBuffer::ABgetSamples (char * buf, int samples)
+int ABgetSamples(char * buf, int samples)
/* read at most given number of samples from AB to buf, returns
number of sample actually read */
{
@@ -117,12 +125,12 @@ AudioBuffer::ABgetSamples (char * buf, int samples)
/* there may be fewer samples in abuf */
if (as > 0) {
int part1 = min(as, abuf->size - abuf->tind);
- ACE_OS::memcpy (buf, abuf->buf + (abuf->bps * abuf->tind), part1 * abuf->bps);
+ memcpy(buf, abuf->buf + (abuf->bps * abuf->tind), part1 * abuf->bps);
set_silence(abuf->buf + (abuf->bps * abuf->tind), part1);
if (part1 < as) { /* This read cross the boundary of abuf */
- ACE_OS::memcpy (buf + (part1 * abuf->bps),
- abuf->buf,
- (as - part1) * abuf->bps);
+ memcpy(buf + (part1 * abuf->bps),
+ abuf->buf,
+ (as - part1) * abuf->bps);
set_silence(abuf->buf, as - part1);
}
}
@@ -137,8 +145,7 @@ AudioBuffer::ABgetSamples (char * buf, int samples)
/* if samples < 0; then stuff |samples| silient samples to ABgetSamples(),
otherwise wipe out this number of samples from AB */
-int
-AudioBuffer::ABskipSamples (int samples)
+int ABskipSamples(int samples)
{
enter_cs(sid);
if (samples <= 0) {
@@ -153,423 +160,327 @@ AudioBuffer::ABskipSamples (int samples)
return samples;
}
-void
-AudioBuffer::ABdeleteBuf (void)
+void ABdeleteBuf(void)
{
remove_shared_mem((char *)abuf);
}
-void
-AudioBuffer::ABdeleteSem (void)
+void ABdeleteSem(void)
{
remove_semaphore(sid);
}
/* SIGUSR1 from CTR is for killing this process, without affecting any other ones. */
-void
-AudioBuffer::exit_on_kill (void)
+static void exit_on_kill(void)
{
ACE_DEBUG ((LM_DEBUG,"(%P|%t) ABprocess killed \n"));
extern void set_exit_routine_tag(int tag);
set_exit_routine_tag(0);
// ComCloseConn(savedSocket);
- vbuffer->VBdeleteBuf();
- ACE_OS::exit (0);
+ VBdeleteBuf();
+ exit(0);
}
-ACE_HANDLE
-AudioBuffer::get_handle (void) const
+static void usr1_handler(int sig)
{
- return this->dataSocket;
+ cerr << "ABprocess got sigusr1\n";
+ exit_on_kill ();
}
-int
-AudioBuffer::handle_input (ACE_HANDLE fd)
+
+static void usr2_handler(int sig)
{
- // ACE_DEBUG ((LM_DEBUG,"handle_input:mode = %d\n",this->mode_));
- int len;
- switch (this->mode_)
- {
- case READ_HEADER:
- {
- int len;
- if (conn_tag >= 0)
- {
- // ACE_DEBUG ((LM_DEBUG,"non discard mode: "));
- if (bytes < 0)
- bytes = sizeof(*packet);
- len = ACE_OS::read (dataSocket, (char *)temp, bytes);
- }
- else
- { /* discard mode packet stream, read all bytes */
- // ACE_DEBUG ((LM_DEBUG,"discard mode: "));
- if (bytes < 0)
- bytes = PACKET_SIZE;
- len = ACE_OS::read (dataSocket, (char *)packet, bytes);
- // ACE_DEBUG ((LM_DEBUG,"(%P|%t) ABprocess: got a %d sized packet\n",len));
- }
- if (len == -1)
- {
- if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
- {
- return 0;
- }
- perror("AB ACE_OS::read () audio packet from discard-mode packet stream");
- ACE_Reactor::instance ()->end_event_loop ();
- return -1;
- }
+ Fprintf(stderr, "VB void usr2_handler (supposed for stat).\n");
+}
+
+#define PACKET_SIZE 8192
+#define STARTUP_WAIT 10000000
+#define ACTION_WAIT 5000000
+
+void ABprocess(int dataSocket)
+{
+
+ AudioPacket * packet;
+ char * pkt_data;
+ int conn_tag = shared->audioMaxPktSize;
+
+ /* following are for feedback */
+ int fbstate = 0;
+ unsigned waketime;
+ int pcmdsn = -1; /* previous cmdsn */
+
+ exit_tag = 0;
+
+ savedSocket = dataSocket;
+
+ setsignal(SIGUSR1, usr1_handler);
+ setsignal(SIGUSR2, usr2_handler);
+
+ packet = (AudioPacket *)malloc(PACKET_SIZE);
+ if (packet == NULL) {
+ perror("AB failed to allocate mem for packet buffer");
+ exit(1);
+ }
+ pkt_data = (char *)packet + sizeof(*packet);
- if (len == 0)
- {
- fprintf(stderr, "Error: AB found dataSocket broken\n");
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
- // ACE_DEBUG ((LM_DEBUG,"packet: bytes = %d,len = %d\n",bytes,len));
- if (conn_tag >= 0)
- {
- temp += len;
- bytes -= len;
- if (bytes == 0)
- {
- // header reading is done.
- this->mode_ = READ_DATA;
- bytes = -1;
- len = sizeof (*packet);
- }
- else
- return 0;
- }
- if (len < sizeof (*packet))
- {
- fprintf(stderr, "Warn: AB discard len = %d bytes of supposed header.\n", len);
- return 0;
- }
- // process the header.
+ for (;;) {
+ int len;
+ int bytes;
+ if (conn_tag >= 0) {
+ bytes = sizeof(*packet);
+
+
+ len = wait_read_bytes(dataSocket, (char *)packet, bytes);
+ if (exit_tag) exit_on_kill();
+ }
+ else { /* discard mode packet stream, read all bytes */
+ bytes = PACKET_SIZE;
+ len = read(dataSocket, (char *)packet, bytes);
+ // ACE_DEBUG ((LM_DEBUG,"(%P|%t) ABprocess: got a %d sized packet\n",len));
+ if (exit_tag) exit_on_kill();
+ if (len == -1) {
+ if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
+ usleep(10000);
+ if (exit_tag) exit_on_kill();
+ continue;
+ }
+ perror("AB read() audio packet from discard-mode packet stream");
+ exit(1);
+ }
+ }
+ if (len == 0) {
+ fprintf(stderr, "Error: AB found dataSocket broken\n");
+ exit(1);
+ }
+ if (len < sizeof(*packet)) { /* unknown packet */
+ fprintf(stderr, "Warn: AB discard len = %d bytes of supposed header.\n", len);
+ continue;
+ }
#ifdef NeedByteOrderConversion
- packet->dataBytes = ntohl(packet->dataBytes);
+ packet->dataBytes = ntohl(packet->dataBytes);
#endif
- // ACE_DEBUG ((LM_DEBUG,"(%P|%t) ABprocess: Received %d sized packet\n",len));
- if (packet->dataBytes <= 0)
- {
- fprintf(stderr, "AB Error: pkt->dataBytes %d, len %d\n",
- packet->dataBytes,
- len);
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
- bytes = packet->dataBytes + sizeof(*packet);
+ // ACE_DEBUG ((LM_DEBUG,"(%P|%t) ABprocess: Received %d sized packet\n",len));
+ if (packet->dataBytes <= 0) {
+ fprintf(stderr, "AB Error: pkt->dataBytes %d, len %d\n",
+ packet->dataBytes,
+ len);
+ exit(1);
+ }
+ bytes = packet->dataBytes + sizeof(*packet);
- if (bytes > PACKET_SIZE)
- {
- Fprintf(stderr, "Fatal error: AB packet buf (%dB) too small (%d)\n",
- PACKET_SIZE, bytes);
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
- if (conn_tag > 0)
- {
- temp = (char *)packet + sizeof(*packet);
- bytes = bytes - sizeof(*packet);
- ACE_DEBUG ((LM_DEBUG,"(%P) Ready to read the data part of the packet\n"));
- break;
- }
+ if (bytes > PACKET_SIZE) {
+ Fprintf(stderr, "Fatal error: AB packet buf (%dB) too small (%d)\n",
+ PACKET_SIZE, bytes);
+ exit(1);
+ }
+ if (conn_tag >= 0) {
+ len = wait_read_bytes(dataSocket, (char *)packet + sizeof(*packet),
+ bytes - sizeof(*packet));
+ if (len <= 0) {
+ if (len == -1) perror("AB encounter error on wait_read_bytes()");
+ else fprintf(stderr, "AB encounter EOF on wait_read_bytes()\n");
}
- // fall through and process the header.
- case READ_DATA:
- {
- // code to read the audio packet and buffer it.
- if (conn_tag >= 0)
- {
- len = ACE_OS::read (dataSocket,temp,bytes);
-
- if (len <= 0)
- {
- if (len == -1)ACE_OS::perror ("AB encounter error on wait_read_bytes()");
- else fprintf(stderr, "AB encounter EOF on wait_read_bytes()\n");
- }
- temp +=len;
- bytes -= len;
- if (bytes != 0)
- return 0;
- }
- // set the parameters for the header reading.
- this->mode_ = READ_HEADER;
- bytes = -1;
- temp = (char *)packet;
+ }
#ifdef NeedByteOrderConversion
- packet->cmdsn = ntohl(packet->cmdsn);
- packet->samplesPerSecond = ntohl(packet->samplesPerSecond);
- packet->resend = ntohl(packet->resend);
- packet->firstSample = ntohl(packet->firstSample);
- packet->samples = ntohl(packet->samples);
- packet->actualSamples = ntohl(packet->actualSamples);
- /* dataBytes already byte-reordered */
+ packet->cmdsn = ntohl(packet->cmdsn);
+ packet->samplesPerSecond = ntohl(packet->samplesPerSecond);
+ packet->resend = ntohl(packet->resend);
+ packet->firstSample = ntohl(packet->firstSample);
+ packet->samples = ntohl(packet->samples);
+ packet->actualSamples = ntohl(packet->actualSamples);
+ /* dataBytes already byte-reordered */
#endif
- /*
- Fprintf(stderr, "AB got a packet: %d(%d)\n",
- packet->firstSample, packet->samples);
- */
- /*
- if (packet->firstSample % 10240 && !packet->resend) continue;
- */
- if (packet->samples * abuf->bps > PACKET_SIZE - sizeof(*packet)) {
- fprintf(stderr, "Fatal error: AB has too small packet buffer, %d out of %d\n",
- PACKET_SIZE, packet->samples * abuf->bps + sizeof(*packet));
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
-
- if (packet->cmdsn != shared->cmdsn) { /* outdated packet */
- /*
- Fprintf(stderr, "AB discarded an outdated packet\n");
- */
- return 0;
- }
- enter_cs(sid);
- if (packet->firstSample + packet->samples <= abuf->ts)
- {
- /* all samples too late, discard it */
- abuf->hs = max(abuf->hs, packet->firstSample + packet->samples);
- abuf->samples = abuf->hs - abuf->ts;
- leave_cs(sid);
- /*
- Fprintf(stderr, "AB all sample in packet %d(%d) too late\n",
- packet->firstSample, packet->samples);
- */
- feedback ();
- }
- else if (packet->firstSample >= abuf->ts + abuf->size) {
- /* all samples too early, discard the packet */
- abuf->hs = max(abuf->hs, packet->firstSample + packet->samples);
- abuf->samples = abuf->hs - abuf->ts;
- leave_cs(sid);
- /*
- Fprintf(stderr, "AB all sample in packet %d(%d) too early\n",
- packet->firstSample, packet->samples);
- */
- feedback ();
- }
- else if (packet->samples > packet->actualSamples) {
- leave_cs(sid);
- fprintf(stderr, "Error: AB interpolation not available yet.\n");
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
- else
- {
- int oldhs = abuf->hs;
- int firstSample = max(packet->firstSample, abuf->ts);
- int samples = min(packet->samples -
- (firstSample - packet->firstSample),
- (abuf->ts + abuf->size) - packet->firstSample);
- char * data = pkt_data +
- (firstSample - packet->firstSample) * abuf->bps;
- int dstart = (abuf->tind + (firstSample - abuf->ts)) % abuf->size;
- int part1 = min(samples, abuf->size - dstart);
- ACE_OS::memcpy (abuf->buf + (dstart * abuf->bps), data, part1 * abuf->bps);
- if (part1 < samples) {
- memcpy(abuf->buf, data + part1 * abuf->bps,
- (samples - part1) * abuf->bps);
- }
- abuf->hs = max(abuf->hs, packet->firstSample + packet->samples);
- abuf->samples = abuf->hs - abuf->ts;
- dstart =max(oldhs, abuf->ts);
-
- leave_cs(sid);
-
- part1 = firstSample - dstart;
- if (packet->resend) {
- Fprintf(stderr, "AB got resent %d(%d)\n",
- packet->firstSample, packet->samples);
- }
- else if (part1 > 0) {
- int res;
- AudioFeedBackPara para;
- Fprintf(stderr, "AB found gap %d(%d)\n", dstart, part1);
- para.cmdsn = htonl(shared->cmdsn);
- para.type = htonl(1);
- para.data.ap.firstSample = htonl(dstart);
- para.data.ap.samples = htonl(part1);
- // register ourself for the write handler.
- int result;
- result = ACE_Reactor::instance ()->register_handler (this,ACE_Event_Handler::WRITE_MASK);
- if (result != 0)
- return result;
- this->mode_ = WRITE_FEEDBACK2;
- temp = (char *)&para;
- bytes = sizeof (para);
- }
- }
- }
- break;
- default:
- break;
+ /*
+ Fprintf(stderr, "AB got a packet: %d(%d)\n",
+ packet->firstSample, packet->samples);
+ */
+ /*
+ if (packet->firstSample % 10240 && !packet->resend) continue;
+ */
+ if (packet->samples * abuf->bps > PACKET_SIZE - sizeof(*packet)) {
+ fprintf(stderr, "Fatal error: AB has too small packet buffer, %d out of %d\n",
+ PACKET_SIZE, packet->samples * abuf->bps + sizeof(*packet));
+ exit(1);
}
- return 0;
-}
-int
-AudioBuffer::handle_output (ACE_HANDLE fd)
-{
- ACE_DEBUG ((LM_DEBUG,"handle_output:mode = %d\n",this->mode_));
- int res;
- if ((this->mode_ == WRITE_FEEDBACK1) || (this->mode_ == WRITE_FEEDBACK2))
- {
- // send feedback.
-
- if (conn_tag != 0)
- { /* packet stream */
- res = ACE_OS::write (dataSocket, temp, bytes);
- if (res == -1)
- {
- if (errno == ENOBUFS) {
- ACE_OS::perror ("AB Warning, resend-req packet discarded for");
- return 0;
- }
- ACE_OS::perror ("AB error, resend-req packet sending failed");
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
- }
- else
- {
- res = ACE_OS::write (dataSocket, temp, bytes);
- if (res == -1) {
- ACE_OS::perror ("AB error, resend-req packet sending failed");
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
- }
- if (res == 0)
- {
- ACE_DEBUG ((LM_DEBUG,"(%P|%t)AudioBuffer::handle_output:write failed\n"));
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
- temp += res;
- bytes -= res;
- if (bytes != 0)
- return 0;
- else
- {
- // remove the write handler.
- int result;
- result = ACE_Reactor::instance ()->remove_handler (this,
- ACE_Event_Handler::WRITE_MASK);
- if (result != 0)
- return result;
-
- this->mode_ = READ_HEADER;
- temp = (char *)packet;
- bytes == -1;
- }
+ if (packet->cmdsn != shared->cmdsn) { /* outdated packet */
+ /*
+ Fprintf(stderr, "AB discarded an outdated packet\n");
+ */
+ continue;
+ }
+ enter_cs(sid);
+ if (packet->firstSample + packet->samples <= abuf->ts) {
+ /* all samples too late, discard it */
+ abuf->hs = max(abuf->hs, packet->firstSample + packet->samples);
+ abuf->samples = abuf->hs - abuf->ts;
+ leave_cs(sid);
+ /*
+ Fprintf(stderr, "AB all sample in packet %d(%d) too late\n",
+ packet->firstSample, packet->samples);
+ */
+ goto feedback_code;
+ }
+ if (packet->firstSample >= abuf->ts + abuf->size) {
+ /* all samples too early, discard the packet */
+ abuf->hs = max(abuf->hs, packet->firstSample + packet->samples);
+ abuf->samples = abuf->hs - abuf->ts;
+ leave_cs(sid);
+ /*
+ Fprintf(stderr, "AB all sample in packet %d(%d) too early\n",
+ packet->firstSample, packet->samples);
+ */
+ goto feedback_code;
}
- if (this->mode_ == WRITE_FEEDBACK1)
+
+ if (packet->samples > packet->actualSamples) {
+ leave_cs(sid);
+ fprintf(stderr, "Error: AB interpolation not available yet.\n");
+ exit(1);
+ }
+
{
- // adjust the wakeup time and feedback state.
- waketime = get_usec() + STARTUP_WAIT;
- fbstate = 1;
+ int oldhs = abuf->hs;
+ int firstSample = max(packet->firstSample, abuf->ts);
+ int samples = min(packet->samples -
+ (firstSample - packet->firstSample),
+ (abuf->ts + abuf->size) - packet->firstSample);
+ char * data = pkt_data +
+ (firstSample - packet->firstSample) * abuf->bps;
+ int dstart = (abuf->tind + (firstSample - abuf->ts)) % abuf->size;
+ int part1 = min(samples, abuf->size - dstart);
+ memcpy(abuf->buf + (dstart * abuf->bps), data, part1 * abuf->bps);
+ if (part1 < samples) {
+ memcpy(abuf->buf, data + part1 * abuf->bps,
+ (samples - part1) * abuf->bps);
+ }
+ abuf->hs = max(abuf->hs, packet->firstSample + packet->samples);
+ abuf->samples = abuf->hs - abuf->ts;
+ dstart =max(oldhs, abuf->ts);
+
+ leave_cs(sid);
+
+ part1 = firstSample - dstart;
+ if (packet->resend) {
+ Fprintf(stderr, "AB got resent %d(%d)\n",
+ packet->firstSample, packet->samples);
+ }
+ else if (part1 > 0) {
+ int res;
+ AudioFeedBackPara para;
+ Fprintf(stderr, "AB found gap %d(%d)\n", dstart, part1);
+ para.cmdsn = htonl(shared->cmdsn);
+ para.type = htonl(1);
+ para.data.ap.firstSample = htonl(dstart);
+ para.data.ap.samples = htonl(part1);
+ if (conn_tag != 0) { /* packet stream */
+ while ((res = write(dataSocket, (char *)&para, sizeof(para))) == -1)
+ {
+ if (errno == EINTR)
+ continue;
+ if (errno == ENOBUFS) {
+ perror("AB Warning, resend-req packet discarded for");
+ break;
+ }
+ perror("AB error, resend-req packet sending failed");
+ exit(1);
+ }
+ }
+ else {
+ res = wait_write_bytes(dataSocket, (char *)&para, sizeof(para));
+ if (res == -1) {
+ perror("AB error, resend-req packet sending failed");
+ exit(1);
+ }
+ }
+ if (res < sizeof(para)) {
+ fprintf(stderr, "AB warn: send() for gap res %dB < sizeof(para) %dB\n",
+ res, sizeof(para));
+ }
+ }
}
- return 0;
-}
-
-int
-AudioBuffer::ABprocess (int socket)
-{
- int result;
- ACE_DEBUG ((LM_DEBUG,"(%P|%t)AudioBuffer::ABprocess ()\n"));
- packet = (AudioPacket *)ACE_OS::malloc(PACKET_SIZE);
- if (packet == NULL) {
- ACE_OS::perror ("AB failed to allocate mem for packet buffer");
- ACE_Reactor::instance ()->end_event_loop (); return -1;
- }
-
- this->dataSocket = socket;
- ACE_NEW_RETURN (this->handler_,
- Audio_Notification_Handler,
- -1);
- // Register the notification handler with the reactor.
- result = ACE_Reactor::instance ()->register_handler (this->handler_,
- ACE_Event_Handler::READ_MASK);
- if (result != 0)
- return result;
-
- result = ACE_Reactor::instance ()->register_handler (this,
- ACE_Event_Handler::READ_MASK);
-
- if (result != 0)
- return result;
-
- conn_tag = shared->audioMaxPktSize;
- exit_tag = 0;
-
- this->mode_ = READ_HEADER;
- pkt_data = (char *)packet + sizeof(*packet);
- temp = (char *)packet;
- bytes = -1;
- return 0;
-}
-
-// following is feedback algorithm.
-void
-AudioBuffer::feedback (void)
-{
- if (shared->live || (!shared->config.syncEffective)) return;
- /*
+
+ /* following is feedback algorithm */
+
+ feedback_code:
+
+ if (shared->live || (!shared->config.syncEffective)) continue;
+ /*
fprintf(stderr, "AB fbstate = %d\n", fbstate);
*/
- switch (fbstate) {
- case 0: /* startup init */
- if (pcmdsn != packet->cmdsn) {
- fbstate = 0;
- pcmdsn = packet->cmdsn;
- }
- else {
- fbstate = 1;
- waketime = get_usec() + STARTUP_WAIT;
- }
- break;
- case 1: /* startup wait */
- if (pcmdsn != packet->cmdsn) {
- fbstate = 0;
+ switch (fbstate) {
+ case 0: /* startup init */
+ if (pcmdsn != packet->cmdsn) {
+ fbstate = 0;
+ pcmdsn = packet->cmdsn;
+ }
+ else {
+ fbstate = 1;
+ waketime = get_usec() + STARTUP_WAIT;
+ }
break;
- }
- if (get_usec() >= waketime) {
- fbstate = 2;
- }
- break;
- case 2: /* monitoring */
- if (pcmdsn != packet->cmdsn) {
- fbstate = 0;
+ case 1: /* startup wait */
+ if (pcmdsn != packet->cmdsn) {
+ fbstate = 0;
+ break;
+ }
+ if (get_usec() >= waketime) {
+ fbstate = 2;
+ }
break;
- }
- if (abuf->samples < abuf->size >>2 ||
- abuf->samples > (abuf->size * 3) >> 2)
- {
- /* feedback action needed */
- AudioFeedBackPara para;
- int res;
- para.data.fb.addsps = 0;
- para.data.fb.addSamples = (abuf->size >> 2) - abuf->samples;
+ case 2: /* monitoring */
+ if (pcmdsn != packet->cmdsn) {
+ fbstate = 0;
+ break;
+ }
+ if (abuf->samples < abuf->size >>2 ||
+ abuf->samples > (abuf->size * 3) >> 2) {
+ /* feedback action needed */
+ AudioFeedBackPara para;
+ int res;
+ para.data.fb.addsps = 0;
+ para.data.fb.addSamples = (abuf->size >> 2) - abuf->samples;
+
+ Fprintf(stderr, "AB sends fb: %dsps, %dsamples\n",
+ para.data.fb.addsps,
+ para.data.fb.addSamples);
+ para.cmdsn = htonl(shared->cmdsn);
+ para.type = htonl(0);
+ para.data.fb.addsps = htonl(para.data.fb.addsps);
+ para.data.fb.addSamples = htonl(para.data.fb.addSamples);
+ if (conn_tag != 0) {
+ while ((res = write(dataSocket, (char *)&para, sizeof(para))) == -1)
+ {
+ if (errno == EINTR)
+ continue;
+ if (errno == ENOBUFS) {
+ perror("AB Warning, fb packet discarded for");
+ break;
+ }
+ perror("AB error, fb packet sending failed");
+ exit(1);
+ }
+ }
+ else {
+ res = wait_write_bytes(dataSocket, (char *)&para, sizeof(para));
+ if (res == -1) {
+ perror("AB error, fb packet sending failed");
+ exit(1);
+ }
+ }
+ if (res < sizeof(para)) {
+ fprintf(stderr, "AB warn: send() for sync res %dB < sizeof(para) %dB\n",
+ res, sizeof(para));
+ }
- Fprintf(stderr, "AB sends fb: %dsps, %dsamples\n",
- para.data.fb.addsps,
- para.data.fb.addSamples);
- para.cmdsn = htonl(shared->cmdsn);
- para.type = htonl(0);
- para.data.fb.addsps = htonl(para.data.fb.addsps);
- para.data.fb.addSamples = htonl(para.data.fb.addSamples);
- int result = ACE_Reactor::instance ()->register_handler (this,ACE_Event_Handler::WRITE_MASK);
- if (result != 0)
- ACE_DEBUG ((LM_DEBUG,"register_hanlder for write failed\n"));
- this->mode_ = WRITE_FEEDBACK1;
- temp = (char *)&para;
- bytes = sizeof (para);
- return;
+ waketime = get_usec() + STARTUP_WAIT;
+ fbstate = 1;
}
- break;
- default:
- break;
+ break;
+ default:
+ break;
+ }
}
-}
-
-ACE_HANDLE
-Audio_Notification_Handler::get_handle (void) const
-{
- return asp[1];
-}
+}