diff options
Diffstat (limited to 'TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ctr.cpp')
-rw-r--r-- | TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ctr.cpp | 374 |
1 files changed, 246 insertions, 128 deletions
diff --git a/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ctr.cpp b/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ctr.cpp index 80fa1cedf07..7e4de09bfa8 100644 --- a/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ctr.cpp +++ b/TAO/orbsvcs/tests/AVStreams/mpeg/source/mpeg_client/ctr.cpp @@ -58,21 +58,14 @@ ACE_RCSID(mpeg_client, ctr, "$Id$") #define SPEEDHIST_SIZE 20 -#if defined max -#undef max -#endif #define max(a,b) (a>b ? a : b) - -#if defined min -#undef min -#endif #define min(a,b) (a<b ? a : b) static int usr1_flag = 0; static int rtplay = 1; static int cmdSocket = -1; - static int writeSocket = -1; +static int javaSocket = -1; static int CTRpid = -1, VBpid = -1, VDpid = -1, UIpid = -1; static int ABpid = -1; static int videoSocket = -1; @@ -120,12 +113,6 @@ static int fb_startup = 0; /* Indicate the first feedback action. The first feed feedback action would then adjust server fps linearly adjstep each time */ -AudioBuffer *abuffer; -VideoBuffer *vbuffer; -int asp[2]; -int vsp[2]; -int vdsp[2]; - #define max(a,b) (a>b ? a : b) #define min(a,b) (a<b ? a : b) @@ -140,21 +127,91 @@ OurCmdRead(char *buf, int size) if (size == 0) return 0; if (cmdBytes > 0) { - ACE_OS::memcpy (buf, cmdBuffer, size); + memcpy(buf, cmdBuffer, size); cmdBytes -= size; cmdBuffer += size; return 0; } - while ((val = ACE_OS::read (cmdSocket, (buf), (size))) <= 0) + while ((val = read(cmdSocket, (buf), (size))) <= 0) { if (val == -1 && errno == EINTR) return 1; if (!val) { - ACE_OS::perror ("CTR error, EOF reached unexpected within CmdRead()"); + perror("CTR error, EOF reached unexpected within CmdRead()"); } else { - ACE_OS::perror ("CTR CmdRead() from UI through CmdSocket"); + perror("CTR CmdRead() from UI through CmdSocket"); } - ACE_OS::exit (1); + exit(1); + } + return 0; +} + +void CmdRead(char *buf, int size) +{ + int val; + if (size == 0) return; + if (cmdBytes > 0) + { + memcpy(buf, cmdBuffer, size); + cmdBytes -= size; + cmdBuffer += size; + return; + } + while ((val = read(cmdSocket, (buf), (size))) <= 0) + { + if (val == -1 && errno == EINTR) continue; + if (!val) { + perror("CTR error, EOF reached unexpected within CmdRead()"); + } + else { + perror("CTR CmdRead() from UI through CmdSocket"); + } + exit(1); + } + return; +} + +static int CmdReadNW(char *buf, int size) +{ + struct fd_set read_mask; + int nfds = cmdSocket+1; + int val; + + if (size == 0) return 0; + + if (cmdBytes > 0) + { + memcpy(buf, cmdBuffer, size); + cmdBytes -= size; + cmdBuffer += size; + return 0; + } + + FD_ZERO(&read_mask); + FD_SET(cmdSocket, &read_mask); +#ifdef _HPUX_SOURCE + if (select(nfds, (int *)&read_mask, NULL, NULL, NULL) == -1) +#else + if (select(nfds, &read_mask, NULL, NULL, NULL) == -1) +#endif + { + if (errno == EINTR) + return -1; + perror("CTR CmdReadNW by select"); + exit(1); + } + if ((val = read(cmdSocket, (buf), (size))) < 0) + { + if (errno == EINTR) + return -1; + + perror("CTR CmdReadNW by read"); + exit(1); + } + if (val == 0) + { + fprintf(stderr, "Error: EOF reached unexpectedly within CmdReadNW()."); + exit(1); } return 0; } @@ -171,8 +228,8 @@ static void CmdWrite(unsigned char * buf, int size) { if (errno == EINTR) continue; - ACE_OS::perror ("CTR writes to UI through cmdSocket"); - ACE_OS::exit (1); + perror("CTR writes to UI through cmdSocket"); + exit(1); } } @@ -185,7 +242,7 @@ static void SocketRead(int s, char *buf, int size) // fprintf (stderr, "SocketRead: videoSocket = %d\n",videoSocket); for (;;) { - val = ACE_OS::read (s, ptr, remain); + val = read(s, ptr, remain); // fprintf(stderr, "CTR got from %sSocket %d of %d.\n",s == videoSocket ? "video" : "audio", val, remain); @@ -198,14 +255,14 @@ static void SocketRead(int s, char *buf, int size) { fprintf(stderr, "CTR error read %sSocket, ret=%d(size=%d)", s == videoSocket ? "video" : "audio", size-remain, size); - ACE_OS::perror (""); - ACE_OS::exit (1); + perror(""); + exit(1); } if (val == 0) { fprintf(stderr, "CTR error read %sSocket, EOF met, ret=%d(size=%d).\n", s == videoSocket ? "video" : "audio", size-remain, size); - ACE_OS::exit (1); + exit(1); } ptr += val; remain -= val; @@ -213,23 +270,66 @@ static void SocketRead(int s, char *buf, int size) { fprintf(stderr, "CTR error read %sSocket, read too much, ret=%d(size=%d).\n", s == videoSocket ? "video" : "audio", size-remain, size); - ACE_OS::exit (1); + exit(1); } if (remain == 0) break; } } +#if 0 +static void SocketRecv(int s, char *buf, int size) +{ int val, remain = size; + char * ptr = buf; + for (;;) + { + val = read(s, ptr, remain); + /* + fprintf(stderr, "CTR got from %sSocket %d of %d.\n", + s == videoSocket ? "video" : "audio", val, remain); + */ + if (val == -1 && errno == EINTR) + { + errno = 0; + continue; + } + if (val == -1) + { + fprintf(stderr, "CTR error read %sSocket, ret=%d(size=%d)", + s == videoSocket ? "video" : "audio", size-remain, size); + perror(""); + exit(1); + } + if (val == 0) + { + fprintf(stderr, "CTR error read %sSocket, EOF met, ret=%d(size=%d).\n", + s == videoSocket ? "video" : "audio", size-remain, size); + exit(1); + } + ptr += val; + remain -= val; + if (remain < 0) + { + fprintf(stderr, "CTR error read %sSocket, read too much, ret=%d(size=%d).\n", + s == videoSocket ? "video" : "audio", size-remain, size); + exit(1); + } + if (remain == 0) + break; + } +} +#endif + #define VideoRead(buf, size) SocketRead(videoSocket, buf, size) #define VideoWrite(buf, psize) \ - { int val; while ((val = ACE_OS::write (videoSocket, (buf), (psize))) == -1) \ + { int val; while ((val = write(videoSocket, (buf), (psize))) == -1) \ { if (errno == EINTR) continue; \ - ACE_OS::perror ("CTR writes to VS through videoSocket");\ - ACE_OS::exit (1); \ + perror("CTR writes to VS through videoSocket");\ + exit(1); \ } \ if (val < (int)(psize)) { \ - fprintf(stderr, "CTR bad VideoWrite, size %d, val %d", psize, val);ACE_OS::perror (""); }\ + fprintf(stderr, "CTR bad VideoWrite, size %d, val %d", psize, val); perror(""); }\ } #define AudioRead(buf, size) SocketRead(audioSocket, buf, size) @@ -237,8 +337,8 @@ static void SocketRead(int s, char *buf, int size) #define AudioWrite(buf, size) \ { while (write(audioSocket, (buf), (size)) == -1) \ { if (errno == EINTR) continue; \ - ACE_OS::perror ("CTR writes to AS through audioSocket"); \ - ACE_OS::exit (1); \ + perror("CTR writes to AS through audioSocket"); \ + exit(1); \ } \ } @@ -369,15 +469,15 @@ static void PlayAudioInit(void) * (double)shared->audioPara.samplesPerSecond / 1000.0); if (rawBuf == NULL) { - if ((rawBuf = (char *)ACE_OS::malloc(AUDIO_BUFSIZE)) == NULL) + if ((rawBuf = (char *)malloc(AUDIO_BUFSIZE)) == NULL) { - ACE_OS::perror ("CTR fails to allocate rawBuf for audio channel"); - ACE_OS::exit (1); + perror("CTR fails to allocate rawBuf for audio channel"); + exit(1); } - if ((workBuf = (char *)ACE_OS::malloc(AUDIO_BUFSIZE)) == NULL) + if ((workBuf = (char *)malloc(AUDIO_BUFSIZE)) == NULL) { - ACE_OS::perror ("CTR fails to allocate workBuf for audio channel"); - ACE_OS::exit (1); + perror("CTR fails to allocate workBuf for audio channel"); + exit(1); } } AudioBufSize = (AUDIO_BUFSIZE / shared->audioPara.bytesPerSample) * @@ -441,7 +541,7 @@ static int PlayAudio(void) ((double)timer_count / shared->pictureRate) * shared->audioPara.samplesPerSecond); int skip_samples = next_sample - nextASSample; - abuffer->ABskipSamples(skip_samples); + ABskipSamples(skip_samples); nextAFtime += (unsigned int)(((double)skip_samples * (double)shared->audioPara.samplesPerSecond) / (double)shared->samplesPerSecond); @@ -471,10 +571,10 @@ static int PlayAudio(void) please reduce value of parameter 'Audio timer interval',\n\ or 'Audio buffered intervals' or Frames per audio play',\n\ and try again.\n"); - ACE_OS::exit (1); + exit(1); } - read_samples = abuffer->ABgetSamples(ptr, frame_samples); + read_samples = ABgetSamples(ptr, frame_samples); /* convert and play to AF */ { @@ -497,7 +597,7 @@ static int PlayAudio(void) break; else { - ACE_OS::memcpy (rawBuf, rawBuf + samples * shared->audioPara.bytesPerSample, + memcpy(rawBuf, rawBuf + samples * shared->audioPara.bytesPerSample, left_samples * shared->audioPara.bytesPerSample); } } @@ -549,7 +649,7 @@ static void start_timer (void) if (shared->nextSample < shared->totalSamples) { int samples = videoSocket >= 0 ? 1200 : 1200; - while (abuffer->ABcheckSamples() <= samples) + while (ABcheckSamples() <= samples) { if (get_duration(val1, get_usec()) >= MAX_WAIT_USEC) { @@ -721,6 +821,11 @@ static void usr1_handler(int sig) } } +static void default_usr2_handler(int sig) +{ + Fprintf(stderr, "CTR warning: void SIGUSR2 handler.\n"); +} + static void compute_sendPattern(void) { char buf[PATTERN_SIZE]; @@ -747,7 +852,7 @@ static void compute_sendPattern(void) ComputeSendPattern(pat, buf, len, f); shared->qosRecomputes ++; - ACE_OS::memcpy (shared->sendPattern, buf, PATTERN_SIZE); + memcpy(shared->sendPattern, buf, PATTERN_SIZE); f = len - f; if (shared->config.verbose) { @@ -763,6 +868,32 @@ static void compute_sendPattern(void) } } +/* about automatic experiment plan by software developers or specific users: + to be able to conduct experiment plan, the user need to have a uid defined + by DEVELOPER_UID in "../include/common.h", and he/she needs to prepare a file + EXP_PLAN_FILE, with following format: + + {Delay #seconds | + Expriment } * + EndExpriment + + An experiment command is followed by one or more of following parameters: + + playSpeed #float + frameRateLimit #float + maxSPframes #int + filterPara #int + collectStat 0/1 + qosEffective 0/1 + syncEffective 0/1 + + and the parameter is terminated by an empty line. + + The automatic experiment plan file is opened after init() and experiment + setting read just before calling play(). The Player will be terminated + after all experiments are done. + + */ static void on_exit_routine(void) { @@ -776,30 +907,26 @@ static void on_exit_routine(void) if (audioSocket >= 0) { - char message[BUFSIZ]; - message [0] = EXIT; - ACE_OS::write (asp[0],&message,BUFSIZ); - // ACE_OS::write (audioSocket, &tmp, 1); - // ComCloseConn(audioSocket); + write(audioSocket, &tmp, 1); + ComCloseConn(audioSocket); audioSocket = -1; } if (videoSocket >= 0) { - // ACE_OS::write (videoSocket, &tmp, 1); + // write(videoSocket, &tmp, 1); // ComCloseConn(videoSocket); videoSocket = -1; if (VBpid > 0) { - char message[BUFSIZ]; - message [0] = EXIT; - ACE_OS::write (vsp[0],&message,BUFSIZ); - // ACE_OS::kill (VBpid, SIGUSR1); + kill(VBpid, SIGUSR1); VBpid = -1; } } ComCloseClient(); } +#define EXP_PLAN_FILE "experiment_plan" + int CTRmain(int argc, char **argv) { @@ -809,6 +936,7 @@ int CTRmain(int argc, FILE * fp = NULL; /* file pointer for experiment plan */ set_exit_routine_tag(0); + // setsignal(SIGUSR2, default_usr2_handler); /* allocate shared data structure and initialize it */ shared = (SharedData *) creat_shared_mem(sizeof(*shared)); @@ -850,72 +978,50 @@ int CTRmain(int argc, shared->config.maxSPframes = DEFAULT_maxSPframes; shared->config.audioConn = 0; shared->config.videoConn = 0; - shared->config.verbose = (!getuid()) || ACE_OS::getuid () == DEVELOPER_UID; - - ACE_NEW_RETURN (vbuffer, - VideoBuffer (), - -1); - - ACE_NEW_RETURN (abuffer, - AudioBuffer (), - -1); + shared->config.verbose = (!getuid()) || getuid() == DEVELOPER_UID; /* create all shared buffers: AB-CTR, VB-VD, VD-VP */ - abuffer->ABinitBuf(AB_BUF_SIZE); - vbuffer->VBinitBuf(VB_BUF_SIZE); + ABinitBuf(AB_BUF_SIZE); + VBinitBuf(VB_BUF_SIZE); VDinitBuf(VD_BUF_SIZE); - CTRpid =ACE_OS::getpid (); + CTRpid = getpid(); set_exit_routine_tag(1); - - // create the notification socket pair. - if (ACE_OS::socketpair (AF_UNIX,SOCK_STREAM,0,asp) == -1) - ACE_ERROR_RETURN ((LM_ERROR,"Error in opening notification socket:%p", - "notification socket"),-1); - - // create the notification socket pair. - if (ACE_OS::socketpair (AF_UNIX,SOCK_STREAM,0,vsp) == -1) - ACE_ERROR_RETURN ((LM_ERROR,"Error in opening notification socket:%p", - "notification socket"),-1); - - if (ACE_OS::socketpair (AF_UNIX,SOCK_STREAM,0,vdsp) == -1) - ACE_ERROR_RETURN ((LM_ERROR,"Error in opening notification socket:%p", - "notification socket"),-1); - /* create command socket pair */ if (ACE_OS::socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) { - ACE_OS::perror ("CTR error on open cmd socketpair"); - ACE_OS::exit (1); + perror("CTR error on open cmd socketpair"); + exit(1); } cmdSocket = sv[0]; + cerr << "cmdsocket = " << cmdSocket << endl; /* fork processes: VD, GUI */ - if ((VDpid = ACE_OS::fork ()) == -1) + if ((VDpid = fork()) == -1) { - ACE_OS::perror ("CTR error on fork VD"); - ACE_OS::exit (1); + perror("CTR error on fork VD"); + exit(1); } else if (VDpid == 0) { - abuffer->ABdeleteBuf(); - ACE_OS::close (sv[0]); - ACE_OS::close (sv[1]); + ABdeleteBuf(); + close(sv[0]); + close(sv[1]); if (realTimeFlag >= 3) { if (SetRTpriority("VD", 0)) realTimeFlag = 0; } VDprocess(CTRpid); } - if ((UIpid = ACE_OS::fork ()) == -1) + if ((UIpid = fork()) == -1) { - ACE_OS::perror ("CTR error on fork UI"); - ACE_OS::exit (1); + perror("CTR error on fork UI"); + exit(1); } else if (UIpid == 0) { - vbuffer->VBdeleteBuf(); - abuffer->ABdeleteBuf(); - ACE_OS::close (sv[0]); + VBdeleteBuf(); + ABdeleteBuf(); + close(sv[0]); if (realTimeFlag >= 2) { #ifdef __svr4__ if (SetRTpriority("VB", 0)) realTimeFlag = 0; @@ -925,7 +1031,8 @@ int CTRmain(int argc, } UIprocess(sv[1]); } - ACE_OS::close (sv[1]); + close(sv[1]); + // setsignal(SIGUSR1, usr1_handler); /* initialize Audio device */ if (InitAudioDevice() == 0) @@ -935,30 +1042,32 @@ int CTRmain(int argc, // ComInitClient(VCR_TCP_PORT, VCR_UNIX_PORT, VCR_ATM_PORT); - if ((vh = (char *)ACE_OS::malloc(PATH_SIZE)) == NULL) + if ((vh = (char *)malloc(PATH_SIZE)) == NULL) { - ACE_OS::perror ("CTR failed to allocate space for vh"); - ACE_OS::exit (1); + perror("CTR failed to allocate space for vh"); + exit(1); } - if ((vf = (char *)ACE_OS::malloc(PATH_SIZE)) == NULL) + if ((vf = (char *)malloc(PATH_SIZE)) == NULL) { - ACE_OS::perror ("CTR failed to allocate space for vf"); - ACE_OS::exit (1); + perror("CTR failed to allocate space for vf"); + exit(1); } - if ((ah = (char *)ACE_OS::malloc(PATH_SIZE)) == NULL) + if ((ah = (char *)malloc(PATH_SIZE)) == NULL) { - ACE_OS::perror ("CTR failed to allocate space for ah"); - ACE_OS::exit (1); + perror("CTR failed to allocate space for ah"); + exit(1); } - if ((af = (char *)ACE_OS::malloc(PATH_SIZE)) == NULL) + if ((af = (char *)malloc(PATH_SIZE)) == NULL) { - ACE_OS::perror ("CTR failed to allocate space for af"); - ACE_OS::exit (1); + perror("CTR failed to allocate space for af"); + exit(1); } if (realTimeFlag) { if (SetRTpriority("CTR", 4)) realTimeFlag = 0; } + // atexit(on_exit_routine); + // instantiate our command handler Command_Handler command_handler (cmdSocket); if (command_handler.init (argc,argv) == -1) @@ -973,27 +1082,36 @@ int CTRmain(int argc, "(%P|%t) register_handler for command_handler failed\n"), -1); - int result = command_handler.run (); - if (ABpid == 0) - { - ACE_DEBUG ((LM_DEBUG,"(%d) Restarting the ACE_Reactor::instance ()\n",ACE_OS::getpid ())); - ACE_Reactor::instance ()->run_event_loop (); - int result = TAO_ORB_Core_instance ()->reactor ()->remove_handler (&command_handler, - ACE_Event_Handler::READ_MASK); - if (result == -1) - ACE_DEBUG ((LM_DEBUG,"(%P)Remove handler for Command Handler failed\n")); - } +// // and now instantiate the sig_handler +// Client_Sig_Handler client_sig_handler (&command_handler); - if (VBpid == 0) - { - ACE_DEBUG ((LM_DEBUG,"(%d) Restarting the ACE_Reactor::instance ()\n",ACE_OS::getpid ())); - ACE_Reactor::instance ()->run_event_loop (); - } - - ACE_DEBUG ((LM_DEBUG, - "(%P|%t) Exited the client command handler event loop\n" - "%p\n", - "run_event_loop")); +// // .. and ask it to register itself with the reactor +// if (client_sig_handler.register_handler () < 0) +// ACE_ERROR_RETURN ((LM_ERROR, +// "(%P|%t) register_handler for sig_handler failed\n"), +// -1); + + // and run the event loop + // TAO_ORB_Core_instance ()->reactor ()->run_event_loop (); + // int result; +// while (1) +// { + int result = command_handler.run (); +// cerr << "result " << result << " "; +// // ACE_DEBUG ((LM_DEBUG,"Command_Handler::run result is %s\n",result)); +// if (errno == EINTR) +// { +// cerr << "Interrupted run "; +// continue; +// } +// else +// break; +// } + + ACE_DEBUG ((LM_DEBUG, + "(%P|%t) Exited the client command handler event loop\n" + "%p\n", + "run_event_loop")); return 0; } |