diff options
Diffstat (limited to 'ACE/examples/APG/Streams')
25 files changed, 2134 insertions, 0 deletions
diff --git a/ACE/examples/APG/Streams/.cvsignore b/ACE/examples/APG/Streams/.cvsignore new file mode 100644 index 00000000000..ff318c0de98 --- /dev/null +++ b/ACE/examples/APG/Streams/.cvsignore @@ -0,0 +1,2 @@ +Answerer +Answerer diff --git a/ACE/examples/APG/Streams/Answerer.cpp b/ACE/examples/APG/Streams/Answerer.cpp new file mode 100644 index 00000000000..507b6172108 --- /dev/null +++ b/ACE/examples/APG/Streams/Answerer.cpp @@ -0,0 +1,403 @@ +/** + * $Id$ + * + * Streams Listing 01 + * + * An answering machine based on a one-way ACE_Stream + */ + +#include "ace/OS_NS_string.h" +#include "ace/Stream.h" +#include "ace/Message_Block.h" +#include "ace/FILE_IO.h" + +#include "MessageInfo.h" +#include "Message.h" +#include "BasicTask.h" +#include "EndTask.h" +#include "Util.h" +#include "RecordingDevice.h" + +// Listing 21 code/ch18 +class AnswerIncomingCall : public BasicTask +{ +protected: + virtual int process (Message *message) + { + ACE_TRACE (ACE_TEXT ("AnswerIncomingCall::process()")); + + if (message->recorder ()->answer_call () < 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("AnswerIncomingCall")), + -1); + return 0; + } +}; +// Listing 21 + +// Listing 22 code/ch18 +class GetCallerId : public BasicTask +{ +protected: + virtual int process (Message *message) + { + ACE_TRACE (ACE_TEXT ("GetCallerId::process()")); + + CallerId *id; + id = message->recorder ()->retrieve_callerId (); + if (!id) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("GetCallerId")), + -1); + + message->caller_id (id); + return 0; + } +}; +// Listing 22 + +// Listing 23 code/ch18 +class PlayOutgoingMessage : public BasicTask +{ +protected: + virtual int process (Message *message) + { + ACE_TRACE (ACE_TEXT ("PlayOutgoingMessage::process()")); + + ACE_FILE_Addr outgoing_message = + this->get_outgoing_message (message); + + int pmrv = + message->recorder ()->play_message (outgoing_message); + if (pmrv < 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("PlayOutgoingMessage")), + -1); + return 0; + } + + ACE_FILE_Addr get_outgoing_message (Message *) + { + // Exclude 23 + return ACE_FILE_Addr (ACE_TEXT ("/tmp/outgoing_message")); + // Exclude 23 + } +}; +// Listing 23 + +// Listing 24 code/ch18 +class RecordIncomingMessage : public BasicTask +{ +protected: + virtual int process (Message *message) + { + ACE_TRACE (ACE_TEXT ("RecordIncomingMessage::process()")); + + ACE_FILE_Addr incoming_message = + this->get_incoming_message_queue (); + + MessageType *type = + message->recorder ()->record_message (incoming_message); + if (!type) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("RecordIncomingMessage")), + -1); + message->incoming_message (incoming_message, type); + return 0; + } + + ACE_FILE_Addr get_incoming_message_queue (void) + { + // Exclude 24 + return ACE_FILE_Addr (ACE_TEXT ("/tmp/incoming_message")); + // Exclude 24 + } +}; +// Listing 24 + +// Listing 25 code/ch18 +class ReleaseDevice : public BasicTask +{ +protected: + virtual int process (Message *message) + { + ACE_TRACE (ACE_TEXT ("ReleaseDevice::process()")); + message->recorder ()->release (); + return 0; + } +}; +// Listing 25 + +// Listing 26 code/ch18 +class EncodeMessage : public BasicTask +{ +protected: + virtual int process (Message *message) + { + ACE_TRACE (ACE_TEXT ("ReleaseDevice::process()")); + + ACE_FILE_Addr &incoming = message->addr (); + ACE_FILE_Addr addr = this->get_message_destination (message); + + if (message->is_text ()) + Util::convert_to_unicode (incoming, addr); + else if (message->is_audio ()) + Util::convert_to_mp3 (incoming, addr); + else if (message->is_video ()) + Util::convert_to_mpeg (incoming, addr); + + message->addr (addr); + return 0; + } + + ACE_FILE_Addr get_message_destination (Message *) + { + // Exclude 26 + return ACE_FILE_Addr (ACE_TEXT ("/tmp/encoded_message")); + // Exclude 26 + } +}; +// Listing 26 + +// Listing 27 code/ch18 +class SaveMetaData : public BasicTask +{ +protected: + virtual int process (Message *message) + { + ACE_TRACE (ACE_TEXT ("SaveMetaData::process()")); + + ACE_TString path (message->addr ().get_path_name ()); + path += ACE_TEXT (".xml"); + + ACE_FILE_Connector connector; + ACE_FILE_IO file; + ACE_FILE_Addr addr (path.c_str ()); + if (connector.connect (file, addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("create meta-data file")), + 0); + + file.truncate (0); + this->write (file, "<Message>\n"); + // ... + this->write (file, "</Message>\n"); + file.close (); + return 0; + } + +private: + int write (ACE_FILE_IO &file, const char *str) + { + return file.send (str, ACE_OS::strlen (str)); + } +}; +// Listing 27 + +// Listing 28 code/ch18 +class NotifySomeone : public BasicTask +{ +protected: + virtual int process (Message *message) + { + ACE_TRACE (ACE_TEXT ("NotifySomeone::process()")); + + // Format an email to tell someone about the + // newly received message. + // ... + + // Display message information in the logfile + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("New message from %s ") + ACE_TEXT ("received and stored at %s\n"), + message->caller_id ()->string (), + message->addr ().get_path_name ())); + return 0; + } +}; +// Listing 28 + +// Listing 10 code/ch18 +class RecordingStream : public ACE_Stream<ACE_MT_SYNCH> +{ +public: + typedef ACE_Stream<ACE_MT_SYNCH> inherited; + typedef ACE_Module<ACE_MT_SYNCH> Module; + + RecordingStream () : inherited() + { } +// Listing 10 + + // Listing 1000 code/ch18 + virtual int open (void *arg, + Module *head = 0, Module *tail = 0) + { + if (tail == 0) + ACE_NEW_RETURN (tail, + Module (ACE_TEXT ("End Module"), new EndTask ()), + -1); + this->inherited::open (arg, head, tail); + // Listing 1000 + + // Listing 1001 code/ch18 + Module *answerIncomingCallModule; + ACE_NEW_RETURN (answerIncomingCallModule, + Module (ACE_TEXT ("Answer Incoming Call"), + new AnswerIncomingCall ()), + -1); + + // Listing 11 code/ch18 + Module *getCallerIdModule; + ACE_NEW_RETURN (getCallerIdModule, + Module (ACE_TEXT ("Get Caller ID"), new GetCallerId ()), + -1); + // Listing 11 + + Module *playOGMModule; + ACE_NEW_RETURN (playOGMModule, + Module (ACE_TEXT ("Play Outgoing Message"), + new PlayOutgoingMessage ()), + -1); + + Module *recordModule; + ACE_NEW_RETURN (recordModule, + Module (ACE_TEXT ("Record Incoming Message"), + new RecordIncomingMessage ()), + -1); + + Module *releaseModule; + ACE_NEW_RETURN (releaseModule, + Module (ACE_TEXT ("Release Device"), + new ReleaseDevice ()), + -1); + + Module *conversionModule; + ACE_NEW_RETURN (conversionModule, + Module (ACE_TEXT ("Encode Message"), + new EncodeMessage ()), + -1); + + Module *saveMetaDataModule; + ACE_NEW_RETURN (saveMetaDataModule, + Module (ACE_TEXT ("Save Meta-Data"), + new SaveMetaData ()), + -1); + + Module *notificationModule; + ACE_NEW_RETURN (notificationModule, + Module (ACE_TEXT ("Notify Someone"), + new NotifySomeone ()), + -1); + // Listing 1001 + + // Listing 12 code/ch18 + if (this->push (notificationModule) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Failed to push %p\n"), + ACE_TEXT ("notificationModule")), + -1); + if (this->push (saveMetaDataModule) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Failed to push %p\n"), + ACE_TEXT ("saveMetaDataModule")), + -1); + if (this->push (conversionModule) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Failed to push %p\n"), + ACE_TEXT ("conversionModule")), + -1); + if (this->push (releaseModule) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Failed to push %p\n"), + ACE_TEXT ("releaseModule")), + -1); + if (this->push (recordModule) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Failed to push %p\n"), + ACE_TEXT ("recordModule")), + -1); + if (this->push (playOGMModule) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Failed to push %p\n"), + ACE_TEXT ("playOGMModule")), + -1); + if (this->push (getCallerIdModule) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Failed to push %p\n"), + ACE_TEXT ("getCallerIdModule")), + -1); + if (this->push (answerIncomingCallModule) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Failed to push %p\n") + ACE_TEXT ("answerIncomingCallModule")), + -1); + // Listing 12 + + return 0; + } + + // Listing 13 code/ch18 + int record (RecordingDevice *recorder) + { + ACE_Message_Block * mb; + ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(Message)), -1); + + Message *message = (Message *)mb->wr_ptr (); + mb->wr_ptr (sizeof(Message)); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("RecordingStream::record() - ") + ACE_TEXT ("message->recorder(recorder)\n"))); + message->recorder (recorder); + + int rval = this->put (mb); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("RecordingStream::record() - ") + ACE_TEXT ("this->put() returns %d\n"), + rval)); + return rval; + } + // Listing 13 +}; + + +// Listing 1 code/ch18 +int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) +{ + RecordingDevice *recorder = + RecordingDeviceFactory::instantiate (argc, argv); + // Listing 1 + + // Listing 2 code/ch18 + RecordingStream *recording_stream; + ACE_NEW_RETURN (recording_stream, RecordingStream, -1); + + if (recording_stream->open (0) < 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("RecordingStream->open()")), + 0); + // Listing 2 + + // Listing 3 code/ch18 + for (;;) + { + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("Waiting for incoming message\n"))); + RecordingDevice *activeRecorder = + recorder->wait_for_activity (); + + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("Initiating recording process\n"))); + + recording_stream->record (activeRecorder); + } + // Listing 3 + + return 0; +} diff --git a/ACE/examples/APG/Streams/BasicTask.h b/ACE/examples/APG/Streams/BasicTask.h new file mode 100644 index 00000000000..edebc397998 --- /dev/null +++ b/ACE/examples/APG/Streams/BasicTask.h @@ -0,0 +1,143 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef BASIC_TASK_H +#define BASIC_TASK_H + +#include "ace/Task_T.h" +#include "ace/ace_wchar.h" + +// Listing 100 code/ch18 +class BasicTask : public ACE_Task<ACE_MT_SYNCH> +{ +public: + typedef ACE_Task<ACE_MT_SYNCH> inherited; + + BasicTask () : inherited() + { } + + virtual int open (void * = 0) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("BasicTask::open() starting ") + ACE_TEXT ("%d threads\n"), + this->desired_threads ())); + + return this->activate (THR_NEW_LWP | THR_JOINABLE, + this->desired_threads ()); + } + // Listing 100 + + // Listing 101 code/ch18 + int put (ACE_Message_Block *message, + ACE_Time_Value *timeout) + { + return this->putq (message, timeout); + } + // Listing 101 + + // Listing 1020 code/ch18 + virtual int svc (void) + { + for (ACE_Message_Block *message = 0; ; ) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("BasicTask::svc() - ") + ACE_TEXT ("waiting for work\n" ))); + + if (this->getq (message) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), + ACE_TEXT ("getq")), + -1); + // Listing 1020 + + // Listing 1021 code/ch18 + if (message->msg_type () == ACE_Message_Block::MB_HANGUP) + { + if (this->putq (message) == -1) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Task::svc() putq"))); + message->release (); + } + break; + } + // Listing 1021 + + // Listing 1022 code/ch18 + Message *recordedMessage = + (Message *)message->rd_ptr (); + + if (this->process (recordedMessage) == -1) + { + message->release (); + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("process")), + -1); + } + // Listing 1022 + + // Listing 1023 code/ch18 + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("BasicTask::svc() - ") + ACE_TEXT ("Continue to next stage\n" ))); + if (this->next_step (message) < 0) + { + ACE_ERROR ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("put_next failed"))); + message->release (); + break; + } + // Listing 1023 + } + + return 0; + } + + // Listing 103 code/ch18 + virtual int close (u_long flags) + { + int rval = 0; + + if (flags == 1) + { + ACE_Message_Block *hangup = new ACE_Message_Block (); + hangup->msg_type (ACE_Message_Block::MB_HANGUP); + if (this->putq (hangup) == -1) + { + hangup->release (); + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Task::close() putq")), + -1); + } + + rval = this->wait (); + } + + return rval; + } + // Listing 103 + + // Listing 105 code/ch18 +protected: + virtual int next_step (ACE_Message_Block *message_block) + { + return this->put_next (message_block); + } + // Listing 105 + + // Listing 104 code/ch18 + virtual int process (Message *message) = 0; + + virtual int desired_threads (void) + { + return 1; + } +}; +// Listing 104 + +#endif /* BASIC_TASK_H */ diff --git a/ACE/examples/APG/Streams/Command.h b/ACE/examples/APG/Streams/Command.h new file mode 100644 index 00000000000..eae0f5ecb5f --- /dev/null +++ b/ACE/examples/APG/Streams/Command.h @@ -0,0 +1,40 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef COMMAND_H +#define COMMAND_H + +#include "ace/SString.h" +#include "ace/Message_Block.h" + +// Listing 01 code/ch18 +class Command : public ACE_Data_Block +{ +public: + // Result Values + enum { + RESULT_PASS = 1, + RESULT_SUCCESS = 0, + RESULT_FAILURE = -1 + }; + + // Commands + enum { + CMD_UNKNOWN = -1, + CMD_ANSWER_CALL = 10, + CMD_RETRIEVE_CALLER_ID, + CMD_PLAY_MESSAGE, + CMD_RECORD_MESSAGE + } commands; + + int flags_; + int command_; + + void *extra_data_; + + int numeric_result_; + ACE_TString result_; +}; +// Listing 01 + +#endif /* COMMAND_H */ diff --git a/ACE/examples/APG/Streams/CommandModule.cpp b/ACE/examples/APG/Streams/CommandModule.cpp new file mode 100644 index 00000000000..9ee5a92918a --- /dev/null +++ b/ACE/examples/APG/Streams/CommandModule.cpp @@ -0,0 +1,20 @@ +// $Id$ + +#include "CommandModule.h" + +// Listing 01 code/ch18 +CommandModule::CommandModule (const ACE_TCHAR *module_name, + CommandTask *writer, + CommandTask *reader, + ACE_SOCK_Stream *peer) + : inherited(module_name, writer, reader, peer) +{ } +// Listing 01 + +// Listing 02 code/ch18 +ACE_SOCK_Stream &CommandModule::peer (void) +{ + ACE_SOCK_Stream *peer = (ACE_SOCK_Stream *)this->arg (); + return *peer; +} +// Listing 02 diff --git a/ACE/examples/APG/Streams/CommandModule.h b/ACE/examples/APG/Streams/CommandModule.h new file mode 100644 index 00000000000..7fe65b81f78 --- /dev/null +++ b/ACE/examples/APG/Streams/CommandModule.h @@ -0,0 +1,27 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef COMMAND_MODULE_H +#define COMMAND_MODULE_H + +#include "ace/Module.h" +#include "ace/SOCK_Stream.h" +#include "CommandTask.h" + +// Listing 01 code/ch18 +class CommandModule : public ACE_Module<ACE_MT_SYNCH> +{ +public: + typedef ACE_Module<ACE_MT_SYNCH> inherited; + typedef ACE_Task<ACE_MT_SYNCH> Task; + + CommandModule (const ACE_TCHAR *module_name, + CommandTask *writer, + CommandTask *reader, + ACE_SOCK_Stream *peer); + + ACE_SOCK_Stream &peer (void); +}; +// Listing 01 + +#endif /* COMMAND_MODULE_H */ diff --git a/ACE/examples/APG/Streams/CommandStream.cpp b/ACE/examples/APG/Streams/CommandStream.cpp new file mode 100644 index 00000000000..def3f123d77 --- /dev/null +++ b/ACE/examples/APG/Streams/CommandStream.cpp @@ -0,0 +1,97 @@ +// $Id$ + +#include "ace/Log_Msg.h" +#include "ace/OS_Memory.h" +#include "CommandStream.h" +#include "Command.h" +#include "CommandModule.h" +#include "CommandTasks.h" + +// Gotcha: superclass' open() won't open head/tail modules +// Gotcha!! Must open the stream before pushing modules! + +// Listing 01 code/ch18 +int CommandStream::open (void *arg, + ACE_Module<ACE_MT_SYNCH> *head, + ACE_Module<ACE_MT_SYNCH> *tail) +{ + ACE_TRACE (ACE_TEXT ("CommandStream::open(peer)")); + + if (this->inherited::open (arg, head, tail) == -1) + ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), + ACE_TEXT ("Failed to open superclass")), + -1); + // Listing 01 + + // Listing 02 code/ch18 + CommandModule *answerCallModule; + ACE_NEW_RETURN (answerCallModule, + AnswerCallModule (this->peer_), + -1); + + CommandModule *retrieveCallerIdModule; + ACE_NEW_RETURN (retrieveCallerIdModule, + RetrieveCallerIdModule (this->peer_), + -1); + + CommandModule *playMessageModule; + ACE_NEW_RETURN (playMessageModule, + PlayMessageModule (this->peer_), + -1); + + CommandModule *recordMessageModule; + ACE_NEW_RETURN (recordMessageModule, + RecordMessageModule (this->peer_), + -1); + // Listing 02 + + // Listing 03 code/ch18 + if (this->push (answerCallModule) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Failed to push %p\n"), + answerCallModule->name()), + -1); + + if (this->push (retrieveCallerIdModule) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Failed to push %p\n"), + retrieveCallerIdModule->name()), + -1); + + if (this->push (playMessageModule) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Failed to push %p\n"), + playMessageModule->name()), + -1); + + if (this->push (recordMessageModule) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Failed to push %p\n"), + recordMessageModule->name()), + -1); + // Listing 03 + return 0; +} + +// Listing 04 code/ch18 +Command *CommandStream::execute (Command *command) +{ + ACE_Message_Block *mb; + ACE_NEW_RETURN (mb, ACE_Message_Block (command), 0); + if (this->put (mb) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("Fail on put command %d: %p\n"), + command->command_, + ACE_TEXT ("")), + 0); + + this->get (mb); + command = (Command *)mb->data_block (); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Command (%d) returns (%d)\n"), + command->command_, + command->numeric_result_)); + + return command; +} +// Listing 04 diff --git a/ACE/examples/APG/Streams/CommandStream.h b/ACE/examples/APG/Streams/CommandStream.h new file mode 100644 index 00000000000..97e9e673f7c --- /dev/null +++ b/ACE/examples/APG/Streams/CommandStream.h @@ -0,0 +1,44 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef COMMAND_STREAM_H +#define COMMAND_STREAM_H + +#include "ace/Module.h" +#include "ace/Stream.h" +#include "ace/SOCK_Stream.h" +#include "ace/Synch_Traits.h" + +#include "Command.h" + +// A CommandStream is a bidirectional ACE_Stream implementing a chain +// of commands. A message will move down the stream until a +// CommandModule is capable of processing it. After processing, it +// will move on down the stream to the end. Data received from the +// tail will likewise move up the stream until the downstream's +// partner module is encoutered. The retrieved data will be processed +// and continue on up the stream. + +// Listing 01 code/ch18 +class CommandStream : public ACE_Stream<ACE_MT_SYNCH> +{ +public: + typedef ACE_Stream<ACE_MT_SYNCH> inherited; + + CommandStream (ACE_SOCK_Stream *peer) + : inherited (), peer_(peer) { } + + virtual int open (void *arg, + ACE_Module<ACE_MT_SYNCH> *head = 0, + ACE_Module<ACE_MT_SYNCH> *tail = 0); + + Command *execute (Command *command); + +private: + CommandStream () { } + + ACE_SOCK_Stream *peer_; +}; +// Listing 01 + +#endif /* COMMAND_STREAM_H */ diff --git a/ACE/examples/APG/Streams/CommandTask.cpp b/ACE/examples/APG/Streams/CommandTask.cpp new file mode 100644 index 00000000000..7ad63166ffd --- /dev/null +++ b/ACE/examples/APG/Streams/CommandTask.cpp @@ -0,0 +1,153 @@ +// $Id$ + +#include "CommandTask.h" + +// Listing 01 code/ch18 +CommandTask::CommandTask (int command) + : inherited (), command_(command) +{ } +// Listing 01 + +// Listing 02 code/ch18 +int CommandTask::open (void *) +{ + return this->activate (); +} +// Listing 02 + +// Listing 03 code/ch18 +int CommandTask::put (ACE_Message_Block *message, + ACE_Time_Value *timeout) +{ + return this->putq (message, timeout); +} +// Listing 03 + +// Listing 04 code/ch18 +int CommandTask::process (Command *) +{ + ACE_TRACE (ACE_TEXT ("CommandTask::process()")); + return Command::RESULT_FAILURE; +} +// Listing 04 + +// Listing 05 code/ch18 +int CommandTask::close (u_long flags) +{ + int rval = 0; + if (flags == 1) + { + ACE_Message_Block *hangup = new ACE_Message_Block; + hangup->msg_type (ACE_Message_Block::MB_HANGUP); + if (this->putq (hangup->duplicate ()) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Task::close() putq")), + -1); + } + + hangup->release (); + rval = this->wait (); + } + + return rval; +} +// Listing 05 + +// Listing 06 code/ch18 +// Listing 061 code/ch18 +int CommandTask::svc (void) +{ + ACE_Message_Block *message; + + for (;;) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("CommandTask::svc() - ") + ACE_TEXT ("%s waiting for work\n"), + this->module ()->name ())); + + if (this->getq (message) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("getq")), + -1); + + if (message->msg_type () == ACE_Message_Block::MB_HANGUP) + { + if (this->putq (message->duplicate ()) == -1) + { + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("Task::svc() putq")), + -1); + } + + message->release (); + break; + } + // Listing 061 + + // Listing 062 code/ch18 + Command *command = (Command *)message->data_block (); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("CommandTask::svc() - ") + ACE_TEXT ("%s got work request %d\n"), + this->module ()->name (), + command->command_)); + + if (command->command_ != this->command_) + { + this->put_next (message->duplicate ()); + } + // Listing 062 + // Listing 063 code/ch18 + else + { + int result = this->process (command); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("CommandTask::svc() - ") + ACE_TEXT ("%s work request %d result is %d\n"), + this->module ()->name (), + command->command_, + result)); + + if (result == Command::RESULT_FAILURE) + { + command->numeric_result_ = -1; + } + // Listing 063 + // Listing 064 code/ch18 + else if (result == Command::RESULT_PASS) + { + this->put_next (message->duplicate ()); + } + // Listing 064 + // Listing 065 code/ch18 + else // result == Command::RESULT_SUCCESS + { + if (this->is_writer ()) + { + this->sibling ()->putq + (message->duplicate ()); + } + // Listing 065 + // Listing 066 code/ch18 + else // this->is_reader () + { + this->put_next (message->duplicate ()); + } + // Listing 066 + } // result == ... + } // command->command_ ? = this->command_ + + // Listing 067 code/ch18 + message->release (); + } // for (;;) + + return 0; +} +// Listing 067 +// Listing 06 diff --git a/ACE/examples/APG/Streams/CommandTask.h b/ACE/examples/APG/Streams/CommandTask.h new file mode 100644 index 00000000000..ae78017b0f9 --- /dev/null +++ b/ACE/examples/APG/Streams/CommandTask.h @@ -0,0 +1,39 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef COMMAND_TASK_H +#define COMMAND_TASK_H + +#include "ace/Task.h" +#include "ace/Module.h" + +#include "Command.h" + +// Listing 01 code/ch18 +class CommandTask : public ACE_Task<ACE_MT_SYNCH> +{ +public: + typedef ACE_Task<ACE_MT_SYNCH> inherited; + + virtual ~CommandTask () { } + + virtual int open (void * = 0 ); + + int put (ACE_Message_Block *message, + ACE_Time_Value *timeout); + + virtual int svc (void); + + virtual int close (u_long flags); + +protected: + CommandTask (int command); + + virtual int process (Command *message); + + int command_; +}; +// Listing 01 + + +#endif /* COMMAND_TASK_H */ diff --git a/ACE/examples/APG/Streams/CommandTasks.cpp b/ACE/examples/APG/Streams/CommandTasks.cpp new file mode 100644 index 00000000000..78a5e2de451 --- /dev/null +++ b/ACE/examples/APG/Streams/CommandTasks.cpp @@ -0,0 +1,221 @@ +// $Id$ + +#include "ace/FILE_Addr.h" +#include "ace/FILE_Connector.h" +#include "ace/FILE_IO.h" + +#include "Command.h" +#include "CommandTasks.h" +#include "RecordingDevice_Text.h" + +// Listing 011 code/ch18 +AnswerCallModule::AnswerCallModule (ACE_SOCK_Stream *peer) + : CommandModule (ACE_TEXT ("AnswerCall Module"), + new AnswerCallDownstreamTask (), + new AnswerCallUpstreamTask (), + peer) +{ } +// Listing 011 +// Listing 012 code/ch18 +AnswerCallDownstreamTask::AnswerCallDownstreamTask (void) + : CommandTask(Command::CMD_ANSWER_CALL) +{ } +// Listing 012 +// Listing 013 code/ch18 +int AnswerCallDownstreamTask::process (Command *command) +{ + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Answer Call (downstream)\n"))); + + TextListenerAcceptor *acceptor = + (TextListenerAcceptor *)command->extra_data_; + + CommandModule *module = + (CommandModule*)this->module (); + + command->numeric_result_ = + acceptor->accept (module->peer ()); + + acceptor->release (); + return Command::RESULT_SUCCESS; +} +// Listing 013 +// Listing 014 code/ch18 +AnswerCallUpstreamTask::AnswerCallUpstreamTask (void) + : CommandTask(Command::CMD_ANSWER_CALL) +{ } +// Listing 014 +// Listing 015 code/ch18 +int AnswerCallUpstreamTask::process (Command *) +{ + ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Answer Call (upstream)\n"))); + + return Command::RESULT_SUCCESS; +} +// Listing 015 + +// Listing 021 code/ch18 +RetrieveCallerIdModule::RetrieveCallerIdModule + (ACE_SOCK_Stream *peer) + : CommandModule (ACE_TEXT ("RetrieveCallerId Module"), + new RetrieveCallerIdDownstreamTask (), + new RetrieveCallerIdUpstreamTask (), + peer) +{ } +// Listing 021 +// Listing 022 code/ch18 +RetrieveCallerIdDownstreamTask::RetrieveCallerIdDownstreamTask + (void) + : CommandTask(Command::CMD_RETRIEVE_CALLER_ID) +{ } + +int RetrieveCallerIdDownstreamTask::process (Command *) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Retrieving Caller ID data\n"))); + + return Command::RESULT_SUCCESS; +} +// Listing 022 +// Listing 023 code/ch18 +RetrieveCallerIdUpstreamTask::RetrieveCallerIdUpstreamTask + (void) + : CommandTask(Command::CMD_RETRIEVE_CALLER_ID) +{ } + +int RetrieveCallerIdUpstreamTask::process (Command *command) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Returning Caller ID data\n"))); + + ACE_INET_Addr remote_addr; + + CommandModule *module = + (CommandModule*)this->module (); + + module->peer ().get_remote_addr (remote_addr); + ACE_TCHAR remote_addr_str[256]; + remote_addr.addr_to_string (remote_addr_str, 256); + command->result_ = ACE_TString (remote_addr_str); + + return Command::RESULT_SUCCESS; +} +// Listing 023 + +PlayMessageModule::PlayMessageModule (ACE_SOCK_Stream *peer) + : CommandModule (ACE_TEXT ("PlayMessage Module"), + new PlayMessageDownstreamTask (), + new PlayMessageUpstreamTask (), + peer) +{ } + +PlayMessageDownstreamTask::PlayMessageDownstreamTask (void) + : CommandTask(Command::CMD_PLAY_MESSAGE) +{ } +// Listing 032 code/ch18 +int PlayMessageDownstreamTask::process (Command *command) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Play Outgoing Message\n"))); + + ACE_FILE_Connector connector; + ACE_FILE_IO file; + + ACE_FILE_Addr *addr = + (ACE_FILE_Addr *)command->extra_data_; + + if (connector.connect (file, *addr) == -1) + { + command->numeric_result_ = -1; + } + else + { + command->numeric_result_ = 0; + + CommandModule *module = + (CommandModule*)this->module (); + + char rwbuf[512]; + ssize_t rwbytes; + while ((rwbytes = file.recv (rwbuf, 512)) > 0) + { + module->peer ().send_n (rwbuf, rwbytes); + } + } + + return Command::RESULT_SUCCESS; +} +// Listing 032 +PlayMessageUpstreamTask::PlayMessageUpstreamTask (void) + : CommandTask(Command::CMD_PLAY_MESSAGE) +{ } + +int PlayMessageUpstreamTask::process (Command *command) +{ + ACE_FILE_Addr * addr = + (ACE_FILE_Addr *)command->extra_data_; + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Outgoing message (%s) sent\n"), + addr->get_path_name ())); + + return Command::RESULT_SUCCESS; +} + +RecordMessageModule::RecordMessageModule (ACE_SOCK_Stream *peer) + : CommandModule (ACE_TEXT ("RecordMessage Module"), + new RecordMessageDownstreamTask (), + new RecordMessageUpstreamTask (), + peer) +{ } + +RecordMessageDownstreamTask::RecordMessageDownstreamTask (void) + : CommandTask(Command::CMD_RECORD_MESSAGE) +{ } + +int RecordMessageDownstreamTask::process (Command *) +{ + return Command::RESULT_SUCCESS; +} + +RecordMessageUpstreamTask::RecordMessageUpstreamTask (void) + : CommandTask(Command::CMD_RECORD_MESSAGE) +{ } +// Listing 033 code/ch18 +int RecordMessageUpstreamTask::process (Command *command) +{ + // Collect whatever the peer sends and write into the + // specified file. + ACE_FILE_Connector connector; + ACE_FILE_IO file; + + ACE_FILE_Addr *addr = + (ACE_FILE_Addr *)command->extra_data_; + + if (connector.connect (file, *addr) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("create file")), + Command::RESULT_FAILURE); + file.truncate (0); + + CommandModule *module = + (CommandModule*)this->module (); + + ssize_t total_bytes = 0; + char rwbuf[512]; + ssize_t rwbytes; + while ((rwbytes = module->peer ().recv (rwbuf, 512)) > 0) + { + total_bytes += file.send_n (rwbuf, rwbytes); + } + + file.close (); + + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("RecordMessageUpstreamTask ") + ACE_TEXT ("- recorded %d byte message\n"), + total_bytes)); + + return Command::RESULT_SUCCESS; +} +// Listing 033 diff --git a/ACE/examples/APG/Streams/CommandTasks.h b/ACE/examples/APG/Streams/CommandTasks.h new file mode 100644 index 00000000000..0d55d4da07b --- /dev/null +++ b/ACE/examples/APG/Streams/CommandTasks.h @@ -0,0 +1,108 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef COMMAND_TASKS_H +#define COMMAND_TASKS_H + +#include "ace/SOCK_Stream.h" + +#include "Command.h" +#include "CommandTask.h" +#include "CommandModule.h" + +// CommandModule and CommandTask objects that implement the command +// stream functions. + +// Listing 011 code/ch18 +class AnswerCallModule : public CommandModule +{ +public: + AnswerCallModule (ACE_SOCK_Stream * peer); +}; +// Listing 011 +// Listing 012 code/ch18 +class AnswerCallDownstreamTask : public CommandTask +{ +public: + AnswerCallDownstreamTask (); +protected: + virtual int process (Command *command); +}; +// Listing 012 +// Listing 013 code/ch18 +class AnswerCallUpstreamTask : public CommandTask +{ +public: + AnswerCallUpstreamTask (); +protected: + virtual int process (Command *command); +}; +// Listing 013 + +// Listing 02 code/ch18 +class RetrieveCallerIdModule : public CommandModule +{ +public: + RetrieveCallerIdModule (ACE_SOCK_Stream *peer); +}; +class RetrieveCallerIdDownstreamTask : public CommandTask +{ +public: + RetrieveCallerIdDownstreamTask (); +protected: + virtual int process (Command *command); +}; +class RetrieveCallerIdUpstreamTask : public CommandTask +{ +public: + RetrieveCallerIdUpstreamTask (); +protected: + virtual int process (Command *command); +}; +// Listing 02 + +// Listing 03 code/ch18 +class PlayMessageModule : public CommandModule +{ +public: + PlayMessageModule (ACE_SOCK_Stream *peer); +}; +class PlayMessageDownstreamTask : public CommandTask +{ +public: + PlayMessageDownstreamTask (); +protected: + virtual int process (Command *command); +}; +class PlayMessageUpstreamTask : public CommandTask +{ +public: + PlayMessageUpstreamTask (); +protected: + virtual int process (Command *command); +}; +// Listing 03 + +// Listing 04 code/ch18 +class RecordMessageModule : public CommandModule +{ +public: + RecordMessageModule (ACE_SOCK_Stream *peer); +}; +class RecordMessageDownstreamTask : public CommandTask +{ +public: + RecordMessageDownstreamTask (); +protected: + virtual int process (Command *command); +}; +class RecordMessageUpstreamTask : public CommandTask +{ +public: + RecordMessageUpstreamTask (); +protected: + virtual int process (Command *command); +}; +// Listing 04 + +#endif /* COMMAND_TASKS_H */ diff --git a/ACE/examples/APG/Streams/EndTask.h b/ACE/examples/APG/Streams/EndTask.h new file mode 100644 index 00000000000..a42eca655d9 --- /dev/null +++ b/ACE/examples/APG/Streams/EndTask.h @@ -0,0 +1,27 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef END_TASK_H +#define END_TASK_H + +// Listing 1 code/ch18 +class EndTask : public BasicTask { +protected: + virtual int process (Message *) + { + ACE_TRACE (ACE_TEXT ("EndTask::process()")); + return 0; + } + + virtual int next_step (ACE_Message_Block *mb) + { + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("EndTask::next_step() - ") + ACE_TEXT ("end of the line.\n"))); + mb->release (); + return 0; + } +}; +// Listing 1 + +#endif /* END_TASK_H */ diff --git a/ACE/examples/APG/Streams/Makefile.am b/ACE/examples/APG/Streams/Makefile.am new file mode 100644 index 00000000000..e23f2f22143 --- /dev/null +++ b/ACE/examples/APG/Streams/Makefile.am @@ -0,0 +1,52 @@ +## Process this file with automake to create Makefile.in +## +## $Id$ +## +## This file was generated by MPC. Any changes made directly to +## this file will be lost the next time it is generated. +## +## MPC Command: +## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu + +ACE_BUILDDIR = $(top_builddir) +ACE_ROOT = $(top_srcdir) + + +## Makefile.Answerer.am + +if BUILD_THREADS +if !BUILD_ACE_FOR_TAO +noinst_PROGRAMS = Answerer + +Answerer_CPPFLAGS = \ + -I$(ACE_ROOT) \ + -I$(ACE_BUILDDIR) + +Answerer_SOURCES = \ + Answerer.cpp \ + CommandModule.cpp \ + CommandStream.cpp \ + CommandTask.cpp \ + CommandTasks.cpp \ + RecordingDeviceFactory.cpp \ + RecordingDevice_Text.cpp \ + CommandModule.h \ + CommandStream.h \ + CommandTask.h \ + CommandTasks.h \ + RecordingDeviceFactory.h \ + RecordingDevice_Text.h + +Answerer_LDADD = \ + $(ACE_BUILDDIR)/ace/libACE.la + +endif !BUILD_ACE_FOR_TAO +endif BUILD_THREADS + +## Clean up template repositories, etc. +clean-local: + -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.* + -rm -f gcctemp.c gcctemp so_locations *.ics + -rm -rf cxx_repository ptrepository ti_files + -rm -rf templateregistry ir.out + -rm -rf ptrepository SunWS_cache Templates.DB diff --git a/ACE/examples/APG/Streams/Message.h b/ACE/examples/APG/Streams/Message.h new file mode 100644 index 00000000000..29ddd30d5a1 --- /dev/null +++ b/ACE/examples/APG/Streams/Message.h @@ -0,0 +1,92 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef MESSAGE_H +#define MESSAGE_H + +class RecordingDevice; + +class Message +{ +public: + Message () : device_(0), type_(0), id_(0) + { } + + ~Message () + { } + + RecordingDevice *recorder (void) + { + return this->device_; + } + + void recorder (RecordingDevice *device) + { + this->device_ = device; + } + + void type (MessageType *type) + { + this->type_ = type; + } + + MessageType *type (void) + { + return this->type_; + } + + void caller_id (CallerId *id) + { + this->id_ = id; + } + + CallerId *caller_id (void) + { + return this->id_; + } + + void addr (ACE_FILE_Addr &addr) + { + this->addr_ = addr; + } + + void incoming_message (ACE_FILE_Addr &addr, MessageType *type) + { + this->addr_ = addr; + this->type_ = type; + } + + ACE_FILE_Addr &addr (void) + { + return this->addr_; + } + + int is_text (void) + { + return this->type_->is_text (); + } + + int is_audio (void) + { + return this->type_->is_audio (); + } + + int is_video (void) + { + return this->type_->is_video (); + } + +private: + RecordingDevice *device_; + MessageType *type_; + CallerId *id_; + ACE_FILE_Addr addr_; +}; + +class AudioMessage : public Message +{ }; + +class VideoMessage : public Message +{ }; + +#endif /* MESSAGE_H */ diff --git a/ACE/examples/APG/Streams/MessageInfo.h b/ACE/examples/APG/Streams/MessageInfo.h new file mode 100644 index 00000000000..0f0f1bc60dc --- /dev/null +++ b/ACE/examples/APG/Streams/MessageInfo.h @@ -0,0 +1,100 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef MESSAGE_INFO_H +#define MESSAGE_INFO_H + +#include "ace/FILE_Addr.h" +#include "ace/SString.h" + +/* Opaque class that represents a caller's ID */ +class CallerId +{ +public: + CallerId () : id_ (ACE_TEXT ("UNKNOWN")) + { } + + CallerId (ACE_TString id) : id_(id) + { } + + const ACE_TCHAR * string(void) + { + return this->id_.c_str (); + } + +private: + ACE_TString id_; +}; + +class MessageType +{ +public: + enum { + // Known video codecs + FIRST_VIDEO_CODEC = 1, + + DIVX, + // ... + LAST_VIDEO_CODEC, + + // Known audio codecs + FIRST_AUDIO_CODEC, + + MP3, + RAWPCM, + // ... + LAST_AUDIO_CODEC, + + // Known text codecs + FIRST_TEXT_CODEC, + + RAWTEXT, + XML, + + // ... + LAST_TEXT_CODEC, + + LAST_CODEC + }; + + MessageType (int codec, ACE_FILE_Addr addr) + : codec_(codec), addr_(addr) + { } + + int get_codec (void) + { + return this->codec_; + } + + ACE_FILE_Addr &get_addr (void) + { + return this->addr_; + } + + int is_video (void) + { + return + this->get_codec () > FIRST_VIDEO_CODEC && + this->get_codec () < LAST_VIDEO_CODEC; + } + + int is_audio (void) + { + return + this->get_codec () > FIRST_AUDIO_CODEC && + this->get_codec () < LAST_AUDIO_CODEC ; + } + + int is_text (void) + { + return + this->get_codec () > FIRST_TEXT_CODEC && + this->get_codec () < LAST_TEXT_CODEC ; + } + +private: + int codec_; + ACE_FILE_Addr addr_; +}; + +# endif /* MESSAGE_INFO_H */ diff --git a/ACE/examples/APG/Streams/RecordingDevice.h b/ACE/examples/APG/Streams/RecordingDevice.h new file mode 100644 index 00000000000..cee3d7154de --- /dev/null +++ b/ACE/examples/APG/Streams/RecordingDevice.h @@ -0,0 +1,119 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef RECORDING_DEVICE_H +#define RECORDING_DEVICE_H + +#include "ace/FILE_Addr.h" +#include "ace/Event_Handler.h" +#include "ace/Log_Msg.h" +#include "ace/Reactor.h" +#include "ace/Semaphore.h" + +class CallerId; +class MessageType; + +class RecordingDevice +{ +public: + RecordingDevice () + { + // Initialize the semaphore so that we don't block on the + // first call to wait_for_activity(). + } + + virtual ~RecordingDevice () + { + } + + virtual const ACE_TCHAR *get_name (void) const + { + return ACE_TEXT ("UNKNOWN"); + } + + virtual int init (int, ACE_TCHAR *[]) + { + return 0; + } + + // Answer the incoming call + virtual int answer_call (void) = 0; + + // Fetch some form of caller identification at the hardware level. + virtual CallerId *retrieve_callerId (void) = 0; + + // Fetch the message at the location specified by 'addr' and play + // it for the caller. + virtual int play_message (ACE_FILE_Addr &addr) = 0; + + // Record data from our physical device into the file at the + // specified address. Return the number of bytes recorded. + virtual MessageType *record_message (ACE_FILE_Addr &addr) = 0; + + // Release the RecordingDevice to accept another incoming call + virtual void release (void) + { + this->release_semaphore (); + } + + // Get the handler of the device so that wait_for_activity() can + // wait for data to arrive. + virtual ACE_Event_Handler *get_handler (void) const + { + return 0; + } + + virtual RecordingDevice *wait_for_activity (void) + { + // Block on a semaphore until it says we're ready to do + // work. + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Waiting for semaphore\n"))); + this->acquire_semaphore (); + + // Use the reactor to wait for activity on our handle + ACE_Reactor reactor; + + ACE_Event_Handler *handler = this->get_handler (); + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Handler is %@\n"), + (void *)handler)); + + reactor.register_handler (this->get_handler (), + ACE_Event_Handler::READ_MASK); + + reactor.handle_events (); + // Error-check this... + + // Leave the semaphore locked so that we'll block until + // recording_complete() is invoked. + + return this; + } + +protected: + void acquire_semaphore (void) + { + this->semaphore_.acquire (); + } + + void release_semaphore (void) + { + // Reset the semaphore so that wait_for_activity() will + // unblock. + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("Releasing semaphore\n"))); + this->semaphore_.release (); + } + +private: + ACE_Semaphore semaphore_; +}; + +#include "RecordingDevice_Text.h" +#include "RecordingDevice_USRVM.h" +#include "RecordingDevice_QC.h" + +#include "RecordingDeviceFactory.h" + +#endif /* RECORDING_DEVICE_H */ diff --git a/ACE/examples/APG/Streams/RecordingDeviceFactory.cpp b/ACE/examples/APG/Streams/RecordingDeviceFactory.cpp new file mode 100644 index 00000000000..f5585e1ec0a --- /dev/null +++ b/ACE/examples/APG/Streams/RecordingDeviceFactory.cpp @@ -0,0 +1,25 @@ +// $Id$ + +#include "RecordingDevice.h" +#include "RecordingDeviceFactory.h" +#include "RecordingDevice_Text.h" + +RecordingDevice *RecordingDeviceFactory::instantiate (int argc, + ACE_TCHAR *argv[]) +{ + RecordingDevice * device = 0; + + // Determine the implementation based on the values of argv + // Exclude 2 + device = new TextListenerAcceptor (); + // Exclude 2 + + // Initialize the device with the remaining parameters. + if (device->init (argc, argv) < 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("RecordingDeviceFactory::instantiate() - ") + ACE_TEXT ("%s->init(argc, argv)"), + device->get_name()), + 0); + return device; +} diff --git a/ACE/examples/APG/Streams/RecordingDeviceFactory.h b/ACE/examples/APG/Streams/RecordingDeviceFactory.h new file mode 100644 index 00000000000..13485b20947 --- /dev/null +++ b/ACE/examples/APG/Streams/RecordingDeviceFactory.h @@ -0,0 +1,22 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef RECORDING_DEVICE_FACTORY_H +#define RECORDING_DEVICE_FACTORY_H + +class RecordingDevice; + +/* + * A factory class that creates an appropriate RecordingDevice + * derivative based on command-line parameters. + */ +class RecordingDeviceFactory +{ +public: + + // Instantiate the appropriate RecordingDevice implementation + static RecordingDevice *instantiate (int argc, ACE_TCHAR *argv[]); +}; + +#endif /* RECORDING_DEVICE_FACTORY_H */ + diff --git a/ACE/examples/APG/Streams/RecordingDevice_QC.h b/ACE/examples/APG/Streams/RecordingDevice_QC.h new file mode 100644 index 00000000000..356d70afc5c --- /dev/null +++ b/ACE/examples/APG/Streams/RecordingDevice_QC.h @@ -0,0 +1,5 @@ +// $Id$ + +class QuickCam : public RecordingDevice + { + }; diff --git a/ACE/examples/APG/Streams/RecordingDevice_Text.cpp b/ACE/examples/APG/Streams/RecordingDevice_Text.cpp new file mode 100644 index 00000000000..01720bb2470 --- /dev/null +++ b/ACE/examples/APG/Streams/RecordingDevice_Text.cpp @@ -0,0 +1,197 @@ +/* + * $Id$ + * + * A RecordingDevice that listens to a socket and collects text. + */ + +#include "MessageInfo.h" +#include "RecordingDevice.h" +#include "RecordingDevice_Text.h" +#include "Util.h" + +TextListenerAcceptor::TextListenerAcceptor (void) + : ACE_Event_Handler(), RecordingDevice() +{ } + +// ACE_Event_Handler interface + +int TextListenerAcceptor::open (ACE_INET_Addr &addr) +{ + if (this->acceptor_.open (addr, 1) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("acceptor open")), + -1); + return 0; +} + +ACE_HANDLE TextListenerAcceptor::get_handle (void) const +{ + return this->acceptor_.get_handle (); +} + +int TextListenerAcceptor::handle_input (ACE_HANDLE) +{ + ACE_DEBUG ((LM_INFO, + ACE_TEXT ("TextListenerAcceptor - connection request\n" ))); + return 0; +} + +int TextListenerAcceptor::accept (ACE_SOCK_Stream &peer) +{ + return this->acceptor_.accept (peer); +} + +// RecordingDevice interface + +// Open a listening socket on the port specified by argv. +int TextListenerAcceptor::init (int argc, ACE_TCHAR *argv[]) +{ + ACE_UNUSED_ARG(argc); + + ACE_INET_Addr addr (argv[1]); + + if (this->open (addr) < 0) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("TextListener - open")), + -1); + return 0; +} + +ACE_Event_Handler *TextListenerAcceptor::get_handler (void) const +{ + return (ACE_Event_Handler *)this; +} + +RecordingDevice *TextListenerAcceptor::wait_for_activity (void) +{ + this->RecordingDevice::wait_for_activity (); + return new TextListener (this); +} + +int TextListenerAcceptor::answer_call (void) +{ + return -1; +} + +CallerId *TextListenerAcceptor::retrieve_callerId (void) +{ + return 0; +} + +int TextListenerAcceptor::play_message (ACE_FILE_Addr &addr) +{ + ACE_UNUSED_ARG(addr); + return 0; +} + +MessageType *TextListenerAcceptor::record_message (ACE_FILE_Addr &addr) +{ + ACE_UNUSED_ARG(addr); + return 0; +} + + +// Listing 01 code/ch18 +TextListener::TextListener (TextListenerAcceptor *acceptor) + : acceptor_(acceptor) +{ + ACE_TRACE ("TextListener ctor"); + + ACE_NEW (this->command_stream_, CommandStream (&(this->peer_))); + this->command_stream_->open (0); +} +// Listing 01 + +const ACE_TCHAR *TextListener::get_name (void) const +{ + return ACE_TEXT ("TextListener"); +} + +// Listing 02 code/ch18 +int TextListener::answer_call (void) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TextListener::answer_call()\n"))); + + Command *c = new Command (); + c->command_ = Command::CMD_ANSWER_CALL; + c->extra_data_ = this->acceptor_; + + c = this->command_stream_->execute (c); + + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TextListener::answer_call() ") + ACE_TEXT ("result is %d\n"), + c->numeric_result_)); + + return c->numeric_result_; +} +// Listing 02 + +// Listing 03 code/ch18 +CallerId *TextListener::retrieve_callerId (void) +{ + ACE_DEBUG ((LM_DEBUG, + ACE_TEXT ("TextListener::retrieve_callerId()\n"))); + + Command *c = new Command (); + c->command_ = Command::CMD_RETRIEVE_CALLER_ID; + + c = this->command_stream_->execute (c); + + CallerId *caller_id = new CallerId (c->result_); + return caller_id; +} +// Listing 03 + +// Listing 04 code/ch18 +int TextListener::play_message (ACE_FILE_Addr &addr) +{ + MessageType *type = Util::identify_message (addr); + if (type->is_text ()) + { + Command *c = new Command (); + c->command_ = Command::CMD_PLAY_MESSAGE; + c->extra_data_ = &addr; + c = this->command_stream_->execute (c); + return c->numeric_result_; + } + + ACE_FILE_Addr temp (ACE_TEXT ("/tmp/outgoing_message.text")); + ACE_FILE_IO *file; + if (type->is_audio ()) + file = Util::audio_to_text (addr, temp); + else if (type->is_video ()) + file = Util::video_to_text (addr, temp); + else + ACE_ERROR_RETURN + ((LM_ERROR, ACE_TEXT ("Invalid message type %d\n"), + type->get_codec ()), -1); + int rval = this->play_message (temp); + file->remove (); + return rval; +} +// Listing 04 + +// Listing 05 code/ch18 +MessageType *TextListener::record_message (ACE_FILE_Addr &addr) +{ + Command *c = new Command (); + c->command_ = Command::CMD_RECORD_MESSAGE; + c->extra_data_ = &addr; + c = this->command_stream_->execute (c); + if (c->numeric_result_ == -1) + return 0; + + return new MessageType (MessageType::RAWTEXT, addr); +} +// Listing 05 + +// Listing 06 code/ch18 +void TextListener::release (void) +{ + delete this; +} +// Listing 06 diff --git a/ACE/examples/APG/Streams/RecordingDevice_Text.h b/ACE/examples/APG/Streams/RecordingDevice_Text.h new file mode 100644 index 00000000000..a49f400d922 --- /dev/null +++ b/ACE/examples/APG/Streams/RecordingDevice_Text.h @@ -0,0 +1,84 @@ +/* -*- C++ -*- */ +/* + * $Id$ + * + * A RecordingDevice that listens to a socket and collects text. + */ + +#ifndef RECORDING_DEVICE_TEXT_H +#define RECORDING_DEVICE_TEXT_H + +#include "ace/FILE_IO.h" +#include "ace/FILE_Connector.h" +#include "ace/SOCK_Stream.h" +#include "ace/SOCK_Acceptor.h" + +#include "CommandStream.h" +#include "MessageInfo.h" +#include "RecordingDevice.h" + +class TextListenerAcceptor : + public ACE_Event_Handler, + public RecordingDevice +{ +public: + TextListenerAcceptor (); + + // ACE_Event_Handler interface + + int open (ACE_INET_Addr &addr); + + ACE_HANDLE get_handle (void) const; + + int handle_input (ACE_HANDLE); + + int accept (ACE_SOCK_Stream &peer); + + // RecordingDevice interface + + // Open a listening socket on the port specified by argv. + int init (int argc, ACE_TCHAR *argv[]); + + ACE_Event_Handler *get_handler (void) const; + + virtual RecordingDevice *wait_for_activity (void); + + virtual int answer_call (void); + + virtual CallerId *retrieve_callerId (void); + + virtual int play_message (ACE_FILE_Addr &addr); + + virtual MessageType *record_message (ACE_FILE_Addr &addr); + +private: + ACE_SOCK_Acceptor acceptor_; +}; + +// Listing 01 code/ch18 +class TextListener : public RecordingDevice +{ +public: + TextListener (TextListenerAcceptor *acceptor); + + virtual const ACE_TCHAR *get_name (void) const; + + int answer_call (void); + + CallerId *retrieve_callerId (void); + + int play_message (ACE_FILE_Addr &addr); + + MessageType *record_message (ACE_FILE_Addr &addr); + + virtual void release (void); + // Listing 01 + // Listing 02 code/ch18 +private: + CommandStream *command_stream_; + TextListenerAcceptor *acceptor_; + ACE_SOCK_Stream peer_; +}; +// Listing 02 + +#endif /* RECORDING_DEVICE_TEXT_H */ diff --git a/ACE/examples/APG/Streams/RecordingDevice_USRVM.h b/ACE/examples/APG/Streams/RecordingDevice_USRVM.h new file mode 100644 index 00000000000..7519f7c1c84 --- /dev/null +++ b/ACE/examples/APG/Streams/RecordingDevice_USRVM.h @@ -0,0 +1,5 @@ +// $Id$ + +class USRoboticsVoiceModem : public RecordingDevice + { + }; diff --git a/ACE/examples/APG/Streams/Util.h b/ACE/examples/APG/Streams/Util.h new file mode 100644 index 00000000000..d47a699aee7 --- /dev/null +++ b/ACE/examples/APG/Streams/Util.h @@ -0,0 +1,92 @@ +/* -*- C++ -*- */ +// $Id$ + +#ifndef UTIL_H +#define UTIL_H + +#include "ace/FILE_Addr.h" +#include "ace/FILE_Connector.h" +#include "ace/FILE_IO.h" + +class Util +{ +public: + static MessageType *identify_message (ACE_FILE_Addr &src) + { + // Determine the contents of the specified file + return new MessageType (MessageType::RAWTEXT, src); + } + + static ACE_FILE_IO *audio_to_text (ACE_FILE_Addr &, ACE_FILE_Addr &dest) + { + ACE_FILE_Connector connector; + ACE_FILE_IO *file = 0; + if (connector.connect (*file, dest) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("create file")), + 0); + + // Convert audio data to printable text + + return file; + } + + static ACE_FILE_IO *video_to_text (ACE_FILE_Addr &, ACE_FILE_Addr &dest) + { + ACE_FILE_Connector connector; + ACE_FILE_IO *file = 0; + if (connector.connect (*file, dest) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("create file")), + 0); + + // Extract audio data from video file and convert to printable text + return file; + } + + static int convert_to_unicode (ACE_FILE_Addr &src, ACE_FILE_Addr &dest) + { + ACE_FILE_Connector connector; + ACE_FILE_IO input; + if (connector.connect (input, src) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("read file")), + 0); + ACE_FILE_IO output; + if (connector.connect (output, dest) == -1) + ACE_ERROR_RETURN ((LM_ERROR, + ACE_TEXT ("%p\n"), + ACE_TEXT ("create file")), + 0); + + char rwbuf[512]; + ssize_t rwbytes; + while ((rwbytes = input.recv (rwbuf, 512)) > 0) + { + output.send_n (rwbuf, rwbytes); + } + + input.close (); + output.close (); + + // Convert arbirary text to unicode + return 0; + } + + static int convert_to_mp3 (ACE_FILE_Addr &, ACE_FILE_Addr &) + { + // Convert arbitrary audio data to some standard mp3 format + return 0; + } + + static int convert_to_mpeg (ACE_FILE_Addr &, ACE_FILE_Addr &) + { + // Convert arbitrary vidio data to some standard mpeg format + return 0; + } +}; + +#endif /* UTIL_H */ diff --git a/ACE/examples/APG/Streams/streams.mpc b/ACE/examples/APG/Streams/streams.mpc new file mode 100644 index 00000000000..df74b446031 --- /dev/null +++ b/ACE/examples/APG/Streams/streams.mpc @@ -0,0 +1,17 @@ +// -*- MPC -*- +// $Id$ + +project(Answerer) : aceexe { + avoids += ace_for_tao + exename = Answerer + requires += threads + Source_Files { + Answerer.cpp + CommandModule.cpp + CommandStream.cpp + CommandTask.cpp + CommandTasks.cpp + RecordingDeviceFactory.cpp + RecordingDevice_Text.cpp + } +} |