summaryrefslogtreecommitdiff
path: root/ACE/examples/APG/Streams
diff options
context:
space:
mode:
Diffstat (limited to 'ACE/examples/APG/Streams')
-rw-r--r--ACE/examples/APG/Streams/.cvsignore2
-rw-r--r--ACE/examples/APG/Streams/Answerer.cpp403
-rw-r--r--ACE/examples/APG/Streams/BasicTask.h143
-rw-r--r--ACE/examples/APG/Streams/Command.h40
-rw-r--r--ACE/examples/APG/Streams/CommandModule.cpp20
-rw-r--r--ACE/examples/APG/Streams/CommandModule.h27
-rw-r--r--ACE/examples/APG/Streams/CommandStream.cpp97
-rw-r--r--ACE/examples/APG/Streams/CommandStream.h44
-rw-r--r--ACE/examples/APG/Streams/CommandTask.cpp153
-rw-r--r--ACE/examples/APG/Streams/CommandTask.h39
-rw-r--r--ACE/examples/APG/Streams/CommandTasks.cpp221
-rw-r--r--ACE/examples/APG/Streams/CommandTasks.h108
-rw-r--r--ACE/examples/APG/Streams/EndTask.h27
-rw-r--r--ACE/examples/APG/Streams/Makefile.am52
-rw-r--r--ACE/examples/APG/Streams/Message.h92
-rw-r--r--ACE/examples/APG/Streams/MessageInfo.h100
-rw-r--r--ACE/examples/APG/Streams/RecordingDevice.h119
-rw-r--r--ACE/examples/APG/Streams/RecordingDeviceFactory.cpp25
-rw-r--r--ACE/examples/APG/Streams/RecordingDeviceFactory.h22
-rw-r--r--ACE/examples/APG/Streams/RecordingDevice_QC.h5
-rw-r--r--ACE/examples/APG/Streams/RecordingDevice_Text.cpp197
-rw-r--r--ACE/examples/APG/Streams/RecordingDevice_Text.h84
-rw-r--r--ACE/examples/APG/Streams/RecordingDevice_USRVM.h5
-rw-r--r--ACE/examples/APG/Streams/Util.h92
-rw-r--r--ACE/examples/APG/Streams/streams.mpc17
25 files changed, 2134 insertions, 0 deletions
diff --git a/ACE/examples/APG/Streams/.cvsignore b/ACE/examples/APG/Streams/.cvsignore
new file mode 100644
index 00000000000..ff318c0de98
--- /dev/null
+++ b/ACE/examples/APG/Streams/.cvsignore
@@ -0,0 +1,2 @@
+Answerer
+Answerer
diff --git a/ACE/examples/APG/Streams/Answerer.cpp b/ACE/examples/APG/Streams/Answerer.cpp
new file mode 100644
index 00000000000..507b6172108
--- /dev/null
+++ b/ACE/examples/APG/Streams/Answerer.cpp
@@ -0,0 +1,403 @@
+/**
+ * $Id$
+ *
+ * Streams Listing 01
+ *
+ * An answering machine based on a one-way ACE_Stream
+ */
+
+#include "ace/OS_NS_string.h"
+#include "ace/Stream.h"
+#include "ace/Message_Block.h"
+#include "ace/FILE_IO.h"
+
+#include "MessageInfo.h"
+#include "Message.h"
+#include "BasicTask.h"
+#include "EndTask.h"
+#include "Util.h"
+#include "RecordingDevice.h"
+
+// Listing 21 code/ch18
+class AnswerIncomingCall : public BasicTask
+{
+protected:
+ virtual int process (Message *message)
+ {
+ ACE_TRACE (ACE_TEXT ("AnswerIncomingCall::process()"));
+
+ if (message->recorder ()->answer_call () < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("AnswerIncomingCall")),
+ -1);
+ return 0;
+ }
+};
+// Listing 21
+
+// Listing 22 code/ch18
+class GetCallerId : public BasicTask
+{
+protected:
+ virtual int process (Message *message)
+ {
+ ACE_TRACE (ACE_TEXT ("GetCallerId::process()"));
+
+ CallerId *id;
+ id = message->recorder ()->retrieve_callerId ();
+ if (!id)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("GetCallerId")),
+ -1);
+
+ message->caller_id (id);
+ return 0;
+ }
+};
+// Listing 22
+
+// Listing 23 code/ch18
+class PlayOutgoingMessage : public BasicTask
+{
+protected:
+ virtual int process (Message *message)
+ {
+ ACE_TRACE (ACE_TEXT ("PlayOutgoingMessage::process()"));
+
+ ACE_FILE_Addr outgoing_message =
+ this->get_outgoing_message (message);
+
+ int pmrv =
+ message->recorder ()->play_message (outgoing_message);
+ if (pmrv < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("PlayOutgoingMessage")),
+ -1);
+ return 0;
+ }
+
+ ACE_FILE_Addr get_outgoing_message (Message *)
+ {
+ // Exclude 23
+ return ACE_FILE_Addr (ACE_TEXT ("/tmp/outgoing_message"));
+ // Exclude 23
+ }
+};
+// Listing 23
+
+// Listing 24 code/ch18
+class RecordIncomingMessage : public BasicTask
+{
+protected:
+ virtual int process (Message *message)
+ {
+ ACE_TRACE (ACE_TEXT ("RecordIncomingMessage::process()"));
+
+ ACE_FILE_Addr incoming_message =
+ this->get_incoming_message_queue ();
+
+ MessageType *type =
+ message->recorder ()->record_message (incoming_message);
+ if (!type)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("RecordIncomingMessage")),
+ -1);
+ message->incoming_message (incoming_message, type);
+ return 0;
+ }
+
+ ACE_FILE_Addr get_incoming_message_queue (void)
+ {
+ // Exclude 24
+ return ACE_FILE_Addr (ACE_TEXT ("/tmp/incoming_message"));
+ // Exclude 24
+ }
+};
+// Listing 24
+
+// Listing 25 code/ch18
+class ReleaseDevice : public BasicTask
+{
+protected:
+ virtual int process (Message *message)
+ {
+ ACE_TRACE (ACE_TEXT ("ReleaseDevice::process()"));
+ message->recorder ()->release ();
+ return 0;
+ }
+};
+// Listing 25
+
+// Listing 26 code/ch18
+class EncodeMessage : public BasicTask
+{
+protected:
+ virtual int process (Message *message)
+ {
+ ACE_TRACE (ACE_TEXT ("ReleaseDevice::process()"));
+
+ ACE_FILE_Addr &incoming = message->addr ();
+ ACE_FILE_Addr addr = this->get_message_destination (message);
+
+ if (message->is_text ())
+ Util::convert_to_unicode (incoming, addr);
+ else if (message->is_audio ())
+ Util::convert_to_mp3 (incoming, addr);
+ else if (message->is_video ())
+ Util::convert_to_mpeg (incoming, addr);
+
+ message->addr (addr);
+ return 0;
+ }
+
+ ACE_FILE_Addr get_message_destination (Message *)
+ {
+ // Exclude 26
+ return ACE_FILE_Addr (ACE_TEXT ("/tmp/encoded_message"));
+ // Exclude 26
+ }
+};
+// Listing 26
+
+// Listing 27 code/ch18
+class SaveMetaData : public BasicTask
+{
+protected:
+ virtual int process (Message *message)
+ {
+ ACE_TRACE (ACE_TEXT ("SaveMetaData::process()"));
+
+ ACE_TString path (message->addr ().get_path_name ());
+ path += ACE_TEXT (".xml");
+
+ ACE_FILE_Connector connector;
+ ACE_FILE_IO file;
+ ACE_FILE_Addr addr (path.c_str ());
+ if (connector.connect (file, addr) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("create meta-data file")),
+ 0);
+
+ file.truncate (0);
+ this->write (file, "<Message>\n");
+ // ...
+ this->write (file, "</Message>\n");
+ file.close ();
+ return 0;
+ }
+
+private:
+ int write (ACE_FILE_IO &file, const char *str)
+ {
+ return file.send (str, ACE_OS::strlen (str));
+ }
+};
+// Listing 27
+
+// Listing 28 code/ch18
+class NotifySomeone : public BasicTask
+{
+protected:
+ virtual int process (Message *message)
+ {
+ ACE_TRACE (ACE_TEXT ("NotifySomeone::process()"));
+
+ // Format an email to tell someone about the
+ // newly received message.
+ // ...
+
+ // Display message information in the logfile
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("New message from %s ")
+ ACE_TEXT ("received and stored at %s\n"),
+ message->caller_id ()->string (),
+ message->addr ().get_path_name ()));
+ return 0;
+ }
+};
+// Listing 28
+
+// Listing 10 code/ch18
+class RecordingStream : public ACE_Stream<ACE_MT_SYNCH>
+{
+public:
+ typedef ACE_Stream<ACE_MT_SYNCH> inherited;
+ typedef ACE_Module<ACE_MT_SYNCH> Module;
+
+ RecordingStream () : inherited()
+ { }
+// Listing 10
+
+ // Listing 1000 code/ch18
+ virtual int open (void *arg,
+ Module *head = 0, Module *tail = 0)
+ {
+ if (tail == 0)
+ ACE_NEW_RETURN (tail,
+ Module (ACE_TEXT ("End Module"), new EndTask ()),
+ -1);
+ this->inherited::open (arg, head, tail);
+ // Listing 1000
+
+ // Listing 1001 code/ch18
+ Module *answerIncomingCallModule;
+ ACE_NEW_RETURN (answerIncomingCallModule,
+ Module (ACE_TEXT ("Answer Incoming Call"),
+ new AnswerIncomingCall ()),
+ -1);
+
+ // Listing 11 code/ch18
+ Module *getCallerIdModule;
+ ACE_NEW_RETURN (getCallerIdModule,
+ Module (ACE_TEXT ("Get Caller ID"), new GetCallerId ()),
+ -1);
+ // Listing 11
+
+ Module *playOGMModule;
+ ACE_NEW_RETURN (playOGMModule,
+ Module (ACE_TEXT ("Play Outgoing Message"),
+ new PlayOutgoingMessage ()),
+ -1);
+
+ Module *recordModule;
+ ACE_NEW_RETURN (recordModule,
+ Module (ACE_TEXT ("Record Incoming Message"),
+ new RecordIncomingMessage ()),
+ -1);
+
+ Module *releaseModule;
+ ACE_NEW_RETURN (releaseModule,
+ Module (ACE_TEXT ("Release Device"),
+ new ReleaseDevice ()),
+ -1);
+
+ Module *conversionModule;
+ ACE_NEW_RETURN (conversionModule,
+ Module (ACE_TEXT ("Encode Message"),
+ new EncodeMessage ()),
+ -1);
+
+ Module *saveMetaDataModule;
+ ACE_NEW_RETURN (saveMetaDataModule,
+ Module (ACE_TEXT ("Save Meta-Data"),
+ new SaveMetaData ()),
+ -1);
+
+ Module *notificationModule;
+ ACE_NEW_RETURN (notificationModule,
+ Module (ACE_TEXT ("Notify Someone"),
+ new NotifySomeone ()),
+ -1);
+ // Listing 1001
+
+ // Listing 12 code/ch18
+ if (this->push (notificationModule) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Failed to push %p\n"),
+ ACE_TEXT ("notificationModule")),
+ -1);
+ if (this->push (saveMetaDataModule) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Failed to push %p\n"),
+ ACE_TEXT ("saveMetaDataModule")),
+ -1);
+ if (this->push (conversionModule) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Failed to push %p\n"),
+ ACE_TEXT ("conversionModule")),
+ -1);
+ if (this->push (releaseModule) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Failed to push %p\n"),
+ ACE_TEXT ("releaseModule")),
+ -1);
+ if (this->push (recordModule) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Failed to push %p\n"),
+ ACE_TEXT ("recordModule")),
+ -1);
+ if (this->push (playOGMModule) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Failed to push %p\n"),
+ ACE_TEXT ("playOGMModule")),
+ -1);
+ if (this->push (getCallerIdModule) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Failed to push %p\n"),
+ ACE_TEXT ("getCallerIdModule")),
+ -1);
+ if (this->push (answerIncomingCallModule) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Failed to push %p\n")
+ ACE_TEXT ("answerIncomingCallModule")),
+ -1);
+ // Listing 12
+
+ return 0;
+ }
+
+ // Listing 13 code/ch18
+ int record (RecordingDevice *recorder)
+ {
+ ACE_Message_Block * mb;
+ ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(Message)), -1);
+
+ Message *message = (Message *)mb->wr_ptr ();
+ mb->wr_ptr (sizeof(Message));
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("RecordingStream::record() - ")
+ ACE_TEXT ("message->recorder(recorder)\n")));
+ message->recorder (recorder);
+
+ int rval = this->put (mb);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("RecordingStream::record() - ")
+ ACE_TEXT ("this->put() returns %d\n"),
+ rval));
+ return rval;
+ }
+ // Listing 13
+};
+
+
+// Listing 1 code/ch18
+int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
+{
+ RecordingDevice *recorder =
+ RecordingDeviceFactory::instantiate (argc, argv);
+ // Listing 1
+
+ // Listing 2 code/ch18
+ RecordingStream *recording_stream;
+ ACE_NEW_RETURN (recording_stream, RecordingStream, -1);
+
+ if (recording_stream->open (0) < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("RecordingStream->open()")),
+ 0);
+ // Listing 2
+
+ // Listing 3 code/ch18
+ for (;;)
+ {
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("Waiting for incoming message\n")));
+ RecordingDevice *activeRecorder =
+ recorder->wait_for_activity ();
+
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("Initiating recording process\n")));
+
+ recording_stream->record (activeRecorder);
+ }
+ // Listing 3
+
+ return 0;
+}
diff --git a/ACE/examples/APG/Streams/BasicTask.h b/ACE/examples/APG/Streams/BasicTask.h
new file mode 100644
index 00000000000..edebc397998
--- /dev/null
+++ b/ACE/examples/APG/Streams/BasicTask.h
@@ -0,0 +1,143 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef BASIC_TASK_H
+#define BASIC_TASK_H
+
+#include "ace/Task_T.h"
+#include "ace/ace_wchar.h"
+
+// Listing 100 code/ch18
+class BasicTask : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ typedef ACE_Task<ACE_MT_SYNCH> inherited;
+
+ BasicTask () : inherited()
+ { }
+
+ virtual int open (void * = 0)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("BasicTask::open() starting ")
+ ACE_TEXT ("%d threads\n"),
+ this->desired_threads ()));
+
+ return this->activate (THR_NEW_LWP | THR_JOINABLE,
+ this->desired_threads ());
+ }
+ // Listing 100
+
+ // Listing 101 code/ch18
+ int put (ACE_Message_Block *message,
+ ACE_Time_Value *timeout)
+ {
+ return this->putq (message, timeout);
+ }
+ // Listing 101
+
+ // Listing 1020 code/ch18
+ virtual int svc (void)
+ {
+ for (ACE_Message_Block *message = 0; ; )
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("BasicTask::svc() - ")
+ ACE_TEXT ("waiting for work\n" )));
+
+ if (this->getq (message) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("getq")),
+ -1);
+ // Listing 1020
+
+ // Listing 1021 code/ch18
+ if (message->msg_type () == ACE_Message_Block::MB_HANGUP)
+ {
+ if (this->putq (message) == -1)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Task::svc() putq")));
+ message->release ();
+ }
+ break;
+ }
+ // Listing 1021
+
+ // Listing 1022 code/ch18
+ Message *recordedMessage =
+ (Message *)message->rd_ptr ();
+
+ if (this->process (recordedMessage) == -1)
+ {
+ message->release ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("process")),
+ -1);
+ }
+ // Listing 1022
+
+ // Listing 1023 code/ch18
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("BasicTask::svc() - ")
+ ACE_TEXT ("Continue to next stage\n" )));
+ if (this->next_step (message) < 0)
+ {
+ ACE_ERROR ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("put_next failed")));
+ message->release ();
+ break;
+ }
+ // Listing 1023
+ }
+
+ return 0;
+ }
+
+ // Listing 103 code/ch18
+ virtual int close (u_long flags)
+ {
+ int rval = 0;
+
+ if (flags == 1)
+ {
+ ACE_Message_Block *hangup = new ACE_Message_Block ();
+ hangup->msg_type (ACE_Message_Block::MB_HANGUP);
+ if (this->putq (hangup) == -1)
+ {
+ hangup->release ();
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Task::close() putq")),
+ -1);
+ }
+
+ rval = this->wait ();
+ }
+
+ return rval;
+ }
+ // Listing 103
+
+ // Listing 105 code/ch18
+protected:
+ virtual int next_step (ACE_Message_Block *message_block)
+ {
+ return this->put_next (message_block);
+ }
+ // Listing 105
+
+ // Listing 104 code/ch18
+ virtual int process (Message *message) = 0;
+
+ virtual int desired_threads (void)
+ {
+ return 1;
+ }
+};
+// Listing 104
+
+#endif /* BASIC_TASK_H */
diff --git a/ACE/examples/APG/Streams/Command.h b/ACE/examples/APG/Streams/Command.h
new file mode 100644
index 00000000000..eae0f5ecb5f
--- /dev/null
+++ b/ACE/examples/APG/Streams/Command.h
@@ -0,0 +1,40 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef COMMAND_H
+#define COMMAND_H
+
+#include "ace/SString.h"
+#include "ace/Message_Block.h"
+
+// Listing 01 code/ch18
+class Command : public ACE_Data_Block
+{
+public:
+ // Result Values
+ enum {
+ RESULT_PASS = 1,
+ RESULT_SUCCESS = 0,
+ RESULT_FAILURE = -1
+ };
+
+ // Commands
+ enum {
+ CMD_UNKNOWN = -1,
+ CMD_ANSWER_CALL = 10,
+ CMD_RETRIEVE_CALLER_ID,
+ CMD_PLAY_MESSAGE,
+ CMD_RECORD_MESSAGE
+ } commands;
+
+ int flags_;
+ int command_;
+
+ void *extra_data_;
+
+ int numeric_result_;
+ ACE_TString result_;
+};
+// Listing 01
+
+#endif /* COMMAND_H */
diff --git a/ACE/examples/APG/Streams/CommandModule.cpp b/ACE/examples/APG/Streams/CommandModule.cpp
new file mode 100644
index 00000000000..9ee5a92918a
--- /dev/null
+++ b/ACE/examples/APG/Streams/CommandModule.cpp
@@ -0,0 +1,20 @@
+// $Id$
+
+#include "CommandModule.h"
+
+// Listing 01 code/ch18
+CommandModule::CommandModule (const ACE_TCHAR *module_name,
+ CommandTask *writer,
+ CommandTask *reader,
+ ACE_SOCK_Stream *peer)
+ : inherited(module_name, writer, reader, peer)
+{ }
+// Listing 01
+
+// Listing 02 code/ch18
+ACE_SOCK_Stream &CommandModule::peer (void)
+{
+ ACE_SOCK_Stream *peer = (ACE_SOCK_Stream *)this->arg ();
+ return *peer;
+}
+// Listing 02
diff --git a/ACE/examples/APG/Streams/CommandModule.h b/ACE/examples/APG/Streams/CommandModule.h
new file mode 100644
index 00000000000..7fe65b81f78
--- /dev/null
+++ b/ACE/examples/APG/Streams/CommandModule.h
@@ -0,0 +1,27 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef COMMAND_MODULE_H
+#define COMMAND_MODULE_H
+
+#include "ace/Module.h"
+#include "ace/SOCK_Stream.h"
+#include "CommandTask.h"
+
+// Listing 01 code/ch18
+class CommandModule : public ACE_Module<ACE_MT_SYNCH>
+{
+public:
+ typedef ACE_Module<ACE_MT_SYNCH> inherited;
+ typedef ACE_Task<ACE_MT_SYNCH> Task;
+
+ CommandModule (const ACE_TCHAR *module_name,
+ CommandTask *writer,
+ CommandTask *reader,
+ ACE_SOCK_Stream *peer);
+
+ ACE_SOCK_Stream &peer (void);
+};
+// Listing 01
+
+#endif /* COMMAND_MODULE_H */
diff --git a/ACE/examples/APG/Streams/CommandStream.cpp b/ACE/examples/APG/Streams/CommandStream.cpp
new file mode 100644
index 00000000000..def3f123d77
--- /dev/null
+++ b/ACE/examples/APG/Streams/CommandStream.cpp
@@ -0,0 +1,97 @@
+// $Id$
+
+#include "ace/Log_Msg.h"
+#include "ace/OS_Memory.h"
+#include "CommandStream.h"
+#include "Command.h"
+#include "CommandModule.h"
+#include "CommandTasks.h"
+
+// Gotcha: superclass' open() won't open head/tail modules
+// Gotcha!! Must open the stream before pushing modules!
+
+// Listing 01 code/ch18
+int CommandStream::open (void *arg,
+ ACE_Module<ACE_MT_SYNCH> *head,
+ ACE_Module<ACE_MT_SYNCH> *tail)
+{
+ ACE_TRACE (ACE_TEXT ("CommandStream::open(peer)"));
+
+ if (this->inherited::open (arg, head, tail) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Failed to open superclass")),
+ -1);
+ // Listing 01
+
+ // Listing 02 code/ch18
+ CommandModule *answerCallModule;
+ ACE_NEW_RETURN (answerCallModule,
+ AnswerCallModule (this->peer_),
+ -1);
+
+ CommandModule *retrieveCallerIdModule;
+ ACE_NEW_RETURN (retrieveCallerIdModule,
+ RetrieveCallerIdModule (this->peer_),
+ -1);
+
+ CommandModule *playMessageModule;
+ ACE_NEW_RETURN (playMessageModule,
+ PlayMessageModule (this->peer_),
+ -1);
+
+ CommandModule *recordMessageModule;
+ ACE_NEW_RETURN (recordMessageModule,
+ RecordMessageModule (this->peer_),
+ -1);
+ // Listing 02
+
+ // Listing 03 code/ch18
+ if (this->push (answerCallModule) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Failed to push %p\n"),
+ answerCallModule->name()),
+ -1);
+
+ if (this->push (retrieveCallerIdModule) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Failed to push %p\n"),
+ retrieveCallerIdModule->name()),
+ -1);
+
+ if (this->push (playMessageModule) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Failed to push %p\n"),
+ playMessageModule->name()),
+ -1);
+
+ if (this->push (recordMessageModule) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Failed to push %p\n"),
+ recordMessageModule->name()),
+ -1);
+ // Listing 03
+ return 0;
+}
+
+// Listing 04 code/ch18
+Command *CommandStream::execute (Command *command)
+{
+ ACE_Message_Block *mb;
+ ACE_NEW_RETURN (mb, ACE_Message_Block (command), 0);
+ if (this->put (mb) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("Fail on put command %d: %p\n"),
+ command->command_,
+ ACE_TEXT ("")),
+ 0);
+
+ this->get (mb);
+ command = (Command *)mb->data_block ();
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Command (%d) returns (%d)\n"),
+ command->command_,
+ command->numeric_result_));
+
+ return command;
+}
+// Listing 04
diff --git a/ACE/examples/APG/Streams/CommandStream.h b/ACE/examples/APG/Streams/CommandStream.h
new file mode 100644
index 00000000000..97e9e673f7c
--- /dev/null
+++ b/ACE/examples/APG/Streams/CommandStream.h
@@ -0,0 +1,44 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef COMMAND_STREAM_H
+#define COMMAND_STREAM_H
+
+#include "ace/Module.h"
+#include "ace/Stream.h"
+#include "ace/SOCK_Stream.h"
+#include "ace/Synch_Traits.h"
+
+#include "Command.h"
+
+// A CommandStream is a bidirectional ACE_Stream implementing a chain
+// of commands. A message will move down the stream until a
+// CommandModule is capable of processing it. After processing, it
+// will move on down the stream to the end. Data received from the
+// tail will likewise move up the stream until the downstream's
+// partner module is encoutered. The retrieved data will be processed
+// and continue on up the stream.
+
+// Listing 01 code/ch18
+class CommandStream : public ACE_Stream<ACE_MT_SYNCH>
+{
+public:
+ typedef ACE_Stream<ACE_MT_SYNCH> inherited;
+
+ CommandStream (ACE_SOCK_Stream *peer)
+ : inherited (), peer_(peer) { }
+
+ virtual int open (void *arg,
+ ACE_Module<ACE_MT_SYNCH> *head = 0,
+ ACE_Module<ACE_MT_SYNCH> *tail = 0);
+
+ Command *execute (Command *command);
+
+private:
+ CommandStream () { }
+
+ ACE_SOCK_Stream *peer_;
+};
+// Listing 01
+
+#endif /* COMMAND_STREAM_H */
diff --git a/ACE/examples/APG/Streams/CommandTask.cpp b/ACE/examples/APG/Streams/CommandTask.cpp
new file mode 100644
index 00000000000..7ad63166ffd
--- /dev/null
+++ b/ACE/examples/APG/Streams/CommandTask.cpp
@@ -0,0 +1,153 @@
+// $Id$
+
+#include "CommandTask.h"
+
+// Listing 01 code/ch18
+CommandTask::CommandTask (int command)
+ : inherited (), command_(command)
+{ }
+// Listing 01
+
+// Listing 02 code/ch18
+int CommandTask::open (void *)
+{
+ return this->activate ();
+}
+// Listing 02
+
+// Listing 03 code/ch18
+int CommandTask::put (ACE_Message_Block *message,
+ ACE_Time_Value *timeout)
+{
+ return this->putq (message, timeout);
+}
+// Listing 03
+
+// Listing 04 code/ch18
+int CommandTask::process (Command *)
+{
+ ACE_TRACE (ACE_TEXT ("CommandTask::process()"));
+ return Command::RESULT_FAILURE;
+}
+// Listing 04
+
+// Listing 05 code/ch18
+int CommandTask::close (u_long flags)
+{
+ int rval = 0;
+ if (flags == 1)
+ {
+ ACE_Message_Block *hangup = new ACE_Message_Block;
+ hangup->msg_type (ACE_Message_Block::MB_HANGUP);
+ if (this->putq (hangup->duplicate ()) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Task::close() putq")),
+ -1);
+ }
+
+ hangup->release ();
+ rval = this->wait ();
+ }
+
+ return rval;
+}
+// Listing 05
+
+// Listing 06 code/ch18
+// Listing 061 code/ch18
+int CommandTask::svc (void)
+{
+ ACE_Message_Block *message;
+
+ for (;;)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("CommandTask::svc() - ")
+ ACE_TEXT ("%s waiting for work\n"),
+ this->module ()->name ()));
+
+ if (this->getq (message) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("getq")),
+ -1);
+
+ if (message->msg_type () == ACE_Message_Block::MB_HANGUP)
+ {
+ if (this->putq (message->duplicate ()) == -1)
+ {
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("Task::svc() putq")),
+ -1);
+ }
+
+ message->release ();
+ break;
+ }
+ // Listing 061
+
+ // Listing 062 code/ch18
+ Command *command = (Command *)message->data_block ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("CommandTask::svc() - ")
+ ACE_TEXT ("%s got work request %d\n"),
+ this->module ()->name (),
+ command->command_));
+
+ if (command->command_ != this->command_)
+ {
+ this->put_next (message->duplicate ());
+ }
+ // Listing 062
+ // Listing 063 code/ch18
+ else
+ {
+ int result = this->process (command);
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("CommandTask::svc() - ")
+ ACE_TEXT ("%s work request %d result is %d\n"),
+ this->module ()->name (),
+ command->command_,
+ result));
+
+ if (result == Command::RESULT_FAILURE)
+ {
+ command->numeric_result_ = -1;
+ }
+ // Listing 063
+ // Listing 064 code/ch18
+ else if (result == Command::RESULT_PASS)
+ {
+ this->put_next (message->duplicate ());
+ }
+ // Listing 064
+ // Listing 065 code/ch18
+ else // result == Command::RESULT_SUCCESS
+ {
+ if (this->is_writer ())
+ {
+ this->sibling ()->putq
+ (message->duplicate ());
+ }
+ // Listing 065
+ // Listing 066 code/ch18
+ else // this->is_reader ()
+ {
+ this->put_next (message->duplicate ());
+ }
+ // Listing 066
+ } // result == ...
+ } // command->command_ ? = this->command_
+
+ // Listing 067 code/ch18
+ message->release ();
+ } // for (;;)
+
+ return 0;
+}
+// Listing 067
+// Listing 06
diff --git a/ACE/examples/APG/Streams/CommandTask.h b/ACE/examples/APG/Streams/CommandTask.h
new file mode 100644
index 00000000000..ae78017b0f9
--- /dev/null
+++ b/ACE/examples/APG/Streams/CommandTask.h
@@ -0,0 +1,39 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef COMMAND_TASK_H
+#define COMMAND_TASK_H
+
+#include "ace/Task.h"
+#include "ace/Module.h"
+
+#include "Command.h"
+
+// Listing 01 code/ch18
+class CommandTask : public ACE_Task<ACE_MT_SYNCH>
+{
+public:
+ typedef ACE_Task<ACE_MT_SYNCH> inherited;
+
+ virtual ~CommandTask () { }
+
+ virtual int open (void * = 0 );
+
+ int put (ACE_Message_Block *message,
+ ACE_Time_Value *timeout);
+
+ virtual int svc (void);
+
+ virtual int close (u_long flags);
+
+protected:
+ CommandTask (int command);
+
+ virtual int process (Command *message);
+
+ int command_;
+};
+// Listing 01
+
+
+#endif /* COMMAND_TASK_H */
diff --git a/ACE/examples/APG/Streams/CommandTasks.cpp b/ACE/examples/APG/Streams/CommandTasks.cpp
new file mode 100644
index 00000000000..78a5e2de451
--- /dev/null
+++ b/ACE/examples/APG/Streams/CommandTasks.cpp
@@ -0,0 +1,221 @@
+// $Id$
+
+#include "ace/FILE_Addr.h"
+#include "ace/FILE_Connector.h"
+#include "ace/FILE_IO.h"
+
+#include "Command.h"
+#include "CommandTasks.h"
+#include "RecordingDevice_Text.h"
+
+// Listing 011 code/ch18
+AnswerCallModule::AnswerCallModule (ACE_SOCK_Stream *peer)
+ : CommandModule (ACE_TEXT ("AnswerCall Module"),
+ new AnswerCallDownstreamTask (),
+ new AnswerCallUpstreamTask (),
+ peer)
+{ }
+// Listing 011
+// Listing 012 code/ch18
+AnswerCallDownstreamTask::AnswerCallDownstreamTask (void)
+ : CommandTask(Command::CMD_ANSWER_CALL)
+{ }
+// Listing 012
+// Listing 013 code/ch18
+int AnswerCallDownstreamTask::process (Command *command)
+{
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Answer Call (downstream)\n")));
+
+ TextListenerAcceptor *acceptor =
+ (TextListenerAcceptor *)command->extra_data_;
+
+ CommandModule *module =
+ (CommandModule*)this->module ();
+
+ command->numeric_result_ =
+ acceptor->accept (module->peer ());
+
+ acceptor->release ();
+ return Command::RESULT_SUCCESS;
+}
+// Listing 013
+// Listing 014 code/ch18
+AnswerCallUpstreamTask::AnswerCallUpstreamTask (void)
+ : CommandTask(Command::CMD_ANSWER_CALL)
+{ }
+// Listing 014
+// Listing 015 code/ch18
+int AnswerCallUpstreamTask::process (Command *)
+{
+ ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Answer Call (upstream)\n")));
+
+ return Command::RESULT_SUCCESS;
+}
+// Listing 015
+
+// Listing 021 code/ch18
+RetrieveCallerIdModule::RetrieveCallerIdModule
+ (ACE_SOCK_Stream *peer)
+ : CommandModule (ACE_TEXT ("RetrieveCallerId Module"),
+ new RetrieveCallerIdDownstreamTask (),
+ new RetrieveCallerIdUpstreamTask (),
+ peer)
+{ }
+// Listing 021
+// Listing 022 code/ch18
+RetrieveCallerIdDownstreamTask::RetrieveCallerIdDownstreamTask
+ (void)
+ : CommandTask(Command::CMD_RETRIEVE_CALLER_ID)
+{ }
+
+int RetrieveCallerIdDownstreamTask::process (Command *)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Retrieving Caller ID data\n")));
+
+ return Command::RESULT_SUCCESS;
+}
+// Listing 022
+// Listing 023 code/ch18
+RetrieveCallerIdUpstreamTask::RetrieveCallerIdUpstreamTask
+ (void)
+ : CommandTask(Command::CMD_RETRIEVE_CALLER_ID)
+{ }
+
+int RetrieveCallerIdUpstreamTask::process (Command *command)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Returning Caller ID data\n")));
+
+ ACE_INET_Addr remote_addr;
+
+ CommandModule *module =
+ (CommandModule*)this->module ();
+
+ module->peer ().get_remote_addr (remote_addr);
+ ACE_TCHAR remote_addr_str[256];
+ remote_addr.addr_to_string (remote_addr_str, 256);
+ command->result_ = ACE_TString (remote_addr_str);
+
+ return Command::RESULT_SUCCESS;
+}
+// Listing 023
+
+PlayMessageModule::PlayMessageModule (ACE_SOCK_Stream *peer)
+ : CommandModule (ACE_TEXT ("PlayMessage Module"),
+ new PlayMessageDownstreamTask (),
+ new PlayMessageUpstreamTask (),
+ peer)
+{ }
+
+PlayMessageDownstreamTask::PlayMessageDownstreamTask (void)
+ : CommandTask(Command::CMD_PLAY_MESSAGE)
+{ }
+// Listing 032 code/ch18
+int PlayMessageDownstreamTask::process (Command *command)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Play Outgoing Message\n")));
+
+ ACE_FILE_Connector connector;
+ ACE_FILE_IO file;
+
+ ACE_FILE_Addr *addr =
+ (ACE_FILE_Addr *)command->extra_data_;
+
+ if (connector.connect (file, *addr) == -1)
+ {
+ command->numeric_result_ = -1;
+ }
+ else
+ {
+ command->numeric_result_ = 0;
+
+ CommandModule *module =
+ (CommandModule*)this->module ();
+
+ char rwbuf[512];
+ ssize_t rwbytes;
+ while ((rwbytes = file.recv (rwbuf, 512)) > 0)
+ {
+ module->peer ().send_n (rwbuf, rwbytes);
+ }
+ }
+
+ return Command::RESULT_SUCCESS;
+}
+// Listing 032
+PlayMessageUpstreamTask::PlayMessageUpstreamTask (void)
+ : CommandTask(Command::CMD_PLAY_MESSAGE)
+{ }
+
+int PlayMessageUpstreamTask::process (Command *command)
+{
+ ACE_FILE_Addr * addr =
+ (ACE_FILE_Addr *)command->extra_data_;
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Outgoing message (%s) sent\n"),
+ addr->get_path_name ()));
+
+ return Command::RESULT_SUCCESS;
+}
+
+RecordMessageModule::RecordMessageModule (ACE_SOCK_Stream *peer)
+ : CommandModule (ACE_TEXT ("RecordMessage Module"),
+ new RecordMessageDownstreamTask (),
+ new RecordMessageUpstreamTask (),
+ peer)
+{ }
+
+RecordMessageDownstreamTask::RecordMessageDownstreamTask (void)
+ : CommandTask(Command::CMD_RECORD_MESSAGE)
+{ }
+
+int RecordMessageDownstreamTask::process (Command *)
+{
+ return Command::RESULT_SUCCESS;
+}
+
+RecordMessageUpstreamTask::RecordMessageUpstreamTask (void)
+ : CommandTask(Command::CMD_RECORD_MESSAGE)
+{ }
+// Listing 033 code/ch18
+int RecordMessageUpstreamTask::process (Command *command)
+{
+ // Collect whatever the peer sends and write into the
+ // specified file.
+ ACE_FILE_Connector connector;
+ ACE_FILE_IO file;
+
+ ACE_FILE_Addr *addr =
+ (ACE_FILE_Addr *)command->extra_data_;
+
+ if (connector.connect (file, *addr) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("create file")),
+ Command::RESULT_FAILURE);
+ file.truncate (0);
+
+ CommandModule *module =
+ (CommandModule*)this->module ();
+
+ ssize_t total_bytes = 0;
+ char rwbuf[512];
+ ssize_t rwbytes;
+ while ((rwbytes = module->peer ().recv (rwbuf, 512)) > 0)
+ {
+ total_bytes += file.send_n (rwbuf, rwbytes);
+ }
+
+ file.close ();
+
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("RecordMessageUpstreamTask ")
+ ACE_TEXT ("- recorded %d byte message\n"),
+ total_bytes));
+
+ return Command::RESULT_SUCCESS;
+}
+// Listing 033
diff --git a/ACE/examples/APG/Streams/CommandTasks.h b/ACE/examples/APG/Streams/CommandTasks.h
new file mode 100644
index 00000000000..0d55d4da07b
--- /dev/null
+++ b/ACE/examples/APG/Streams/CommandTasks.h
@@ -0,0 +1,108 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef COMMAND_TASKS_H
+#define COMMAND_TASKS_H
+
+#include "ace/SOCK_Stream.h"
+
+#include "Command.h"
+#include "CommandTask.h"
+#include "CommandModule.h"
+
+// CommandModule and CommandTask objects that implement the command
+// stream functions.
+
+// Listing 011 code/ch18
+class AnswerCallModule : public CommandModule
+{
+public:
+ AnswerCallModule (ACE_SOCK_Stream * peer);
+};
+// Listing 011
+// Listing 012 code/ch18
+class AnswerCallDownstreamTask : public CommandTask
+{
+public:
+ AnswerCallDownstreamTask ();
+protected:
+ virtual int process (Command *command);
+};
+// Listing 012
+// Listing 013 code/ch18
+class AnswerCallUpstreamTask : public CommandTask
+{
+public:
+ AnswerCallUpstreamTask ();
+protected:
+ virtual int process (Command *command);
+};
+// Listing 013
+
+// Listing 02 code/ch18
+class RetrieveCallerIdModule : public CommandModule
+{
+public:
+ RetrieveCallerIdModule (ACE_SOCK_Stream *peer);
+};
+class RetrieveCallerIdDownstreamTask : public CommandTask
+{
+public:
+ RetrieveCallerIdDownstreamTask ();
+protected:
+ virtual int process (Command *command);
+};
+class RetrieveCallerIdUpstreamTask : public CommandTask
+{
+public:
+ RetrieveCallerIdUpstreamTask ();
+protected:
+ virtual int process (Command *command);
+};
+// Listing 02
+
+// Listing 03 code/ch18
+class PlayMessageModule : public CommandModule
+{
+public:
+ PlayMessageModule (ACE_SOCK_Stream *peer);
+};
+class PlayMessageDownstreamTask : public CommandTask
+{
+public:
+ PlayMessageDownstreamTask ();
+protected:
+ virtual int process (Command *command);
+};
+class PlayMessageUpstreamTask : public CommandTask
+{
+public:
+ PlayMessageUpstreamTask ();
+protected:
+ virtual int process (Command *command);
+};
+// Listing 03
+
+// Listing 04 code/ch18
+class RecordMessageModule : public CommandModule
+{
+public:
+ RecordMessageModule (ACE_SOCK_Stream *peer);
+};
+class RecordMessageDownstreamTask : public CommandTask
+{
+public:
+ RecordMessageDownstreamTask ();
+protected:
+ virtual int process (Command *command);
+};
+class RecordMessageUpstreamTask : public CommandTask
+{
+public:
+ RecordMessageUpstreamTask ();
+protected:
+ virtual int process (Command *command);
+};
+// Listing 04
+
+#endif /* COMMAND_TASKS_H */
diff --git a/ACE/examples/APG/Streams/EndTask.h b/ACE/examples/APG/Streams/EndTask.h
new file mode 100644
index 00000000000..a42eca655d9
--- /dev/null
+++ b/ACE/examples/APG/Streams/EndTask.h
@@ -0,0 +1,27 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef END_TASK_H
+#define END_TASK_H
+
+// Listing 1 code/ch18
+class EndTask : public BasicTask {
+protected:
+ virtual int process (Message *)
+ {
+ ACE_TRACE (ACE_TEXT ("EndTask::process()"));
+ return 0;
+ }
+
+ virtual int next_step (ACE_Message_Block *mb)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("EndTask::next_step() - ")
+ ACE_TEXT ("end of the line.\n")));
+ mb->release ();
+ return 0;
+ }
+};
+// Listing 1
+
+#endif /* END_TASK_H */
diff --git a/ACE/examples/APG/Streams/Makefile.am b/ACE/examples/APG/Streams/Makefile.am
new file mode 100644
index 00000000000..e23f2f22143
--- /dev/null
+++ b/ACE/examples/APG/Streams/Makefile.am
@@ -0,0 +1,52 @@
+## Process this file with automake to create Makefile.in
+##
+## $Id$
+##
+## This file was generated by MPC. Any changes made directly to
+## this file will be lost the next time it is generated.
+##
+## MPC Command:
+## /acebuilds/ACE_wrappers-repository/bin/mwc.pl -include /acebuilds/MPC/config -include /acebuilds/MPC/templates -feature_file /acebuilds/ACE_wrappers-repository/local.features -noreldefs -type automake -exclude build,Kokyu
+
+ACE_BUILDDIR = $(top_builddir)
+ACE_ROOT = $(top_srcdir)
+
+
+## Makefile.Answerer.am
+
+if BUILD_THREADS
+if !BUILD_ACE_FOR_TAO
+noinst_PROGRAMS = Answerer
+
+Answerer_CPPFLAGS = \
+ -I$(ACE_ROOT) \
+ -I$(ACE_BUILDDIR)
+
+Answerer_SOURCES = \
+ Answerer.cpp \
+ CommandModule.cpp \
+ CommandStream.cpp \
+ CommandTask.cpp \
+ CommandTasks.cpp \
+ RecordingDeviceFactory.cpp \
+ RecordingDevice_Text.cpp \
+ CommandModule.h \
+ CommandStream.h \
+ CommandTask.h \
+ CommandTasks.h \
+ RecordingDeviceFactory.h \
+ RecordingDevice_Text.h
+
+Answerer_LDADD = \
+ $(ACE_BUILDDIR)/ace/libACE.la
+
+endif !BUILD_ACE_FOR_TAO
+endif BUILD_THREADS
+
+## Clean up template repositories, etc.
+clean-local:
+ -rm -f *~ *.bak *.rpo *.sym lib*.*_pure_* core core.*
+ -rm -f gcctemp.c gcctemp so_locations *.ics
+ -rm -rf cxx_repository ptrepository ti_files
+ -rm -rf templateregistry ir.out
+ -rm -rf ptrepository SunWS_cache Templates.DB
diff --git a/ACE/examples/APG/Streams/Message.h b/ACE/examples/APG/Streams/Message.h
new file mode 100644
index 00000000000..29ddd30d5a1
--- /dev/null
+++ b/ACE/examples/APG/Streams/Message.h
@@ -0,0 +1,92 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef MESSAGE_H
+#define MESSAGE_H
+
+class RecordingDevice;
+
+class Message
+{
+public:
+ Message () : device_(0), type_(0), id_(0)
+ { }
+
+ ~Message ()
+ { }
+
+ RecordingDevice *recorder (void)
+ {
+ return this->device_;
+ }
+
+ void recorder (RecordingDevice *device)
+ {
+ this->device_ = device;
+ }
+
+ void type (MessageType *type)
+ {
+ this->type_ = type;
+ }
+
+ MessageType *type (void)
+ {
+ return this->type_;
+ }
+
+ void caller_id (CallerId *id)
+ {
+ this->id_ = id;
+ }
+
+ CallerId *caller_id (void)
+ {
+ return this->id_;
+ }
+
+ void addr (ACE_FILE_Addr &addr)
+ {
+ this->addr_ = addr;
+ }
+
+ void incoming_message (ACE_FILE_Addr &addr, MessageType *type)
+ {
+ this->addr_ = addr;
+ this->type_ = type;
+ }
+
+ ACE_FILE_Addr &addr (void)
+ {
+ return this->addr_;
+ }
+
+ int is_text (void)
+ {
+ return this->type_->is_text ();
+ }
+
+ int is_audio (void)
+ {
+ return this->type_->is_audio ();
+ }
+
+ int is_video (void)
+ {
+ return this->type_->is_video ();
+ }
+
+private:
+ RecordingDevice *device_;
+ MessageType *type_;
+ CallerId *id_;
+ ACE_FILE_Addr addr_;
+};
+
+class AudioMessage : public Message
+{ };
+
+class VideoMessage : public Message
+{ };
+
+#endif /* MESSAGE_H */
diff --git a/ACE/examples/APG/Streams/MessageInfo.h b/ACE/examples/APG/Streams/MessageInfo.h
new file mode 100644
index 00000000000..0f0f1bc60dc
--- /dev/null
+++ b/ACE/examples/APG/Streams/MessageInfo.h
@@ -0,0 +1,100 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef MESSAGE_INFO_H
+#define MESSAGE_INFO_H
+
+#include "ace/FILE_Addr.h"
+#include "ace/SString.h"
+
+/* Opaque class that represents a caller's ID */
+class CallerId
+{
+public:
+ CallerId () : id_ (ACE_TEXT ("UNKNOWN"))
+ { }
+
+ CallerId (ACE_TString id) : id_(id)
+ { }
+
+ const ACE_TCHAR * string(void)
+ {
+ return this->id_.c_str ();
+ }
+
+private:
+ ACE_TString id_;
+};
+
+class MessageType
+{
+public:
+ enum {
+ // Known video codecs
+ FIRST_VIDEO_CODEC = 1,
+
+ DIVX,
+ // ...
+ LAST_VIDEO_CODEC,
+
+ // Known audio codecs
+ FIRST_AUDIO_CODEC,
+
+ MP3,
+ RAWPCM,
+ // ...
+ LAST_AUDIO_CODEC,
+
+ // Known text codecs
+ FIRST_TEXT_CODEC,
+
+ RAWTEXT,
+ XML,
+
+ // ...
+ LAST_TEXT_CODEC,
+
+ LAST_CODEC
+ };
+
+ MessageType (int codec, ACE_FILE_Addr addr)
+ : codec_(codec), addr_(addr)
+ { }
+
+ int get_codec (void)
+ {
+ return this->codec_;
+ }
+
+ ACE_FILE_Addr &get_addr (void)
+ {
+ return this->addr_;
+ }
+
+ int is_video (void)
+ {
+ return
+ this->get_codec () > FIRST_VIDEO_CODEC &&
+ this->get_codec () < LAST_VIDEO_CODEC;
+ }
+
+ int is_audio (void)
+ {
+ return
+ this->get_codec () > FIRST_AUDIO_CODEC &&
+ this->get_codec () < LAST_AUDIO_CODEC ;
+ }
+
+ int is_text (void)
+ {
+ return
+ this->get_codec () > FIRST_TEXT_CODEC &&
+ this->get_codec () < LAST_TEXT_CODEC ;
+ }
+
+private:
+ int codec_;
+ ACE_FILE_Addr addr_;
+};
+
+# endif /* MESSAGE_INFO_H */
diff --git a/ACE/examples/APG/Streams/RecordingDevice.h b/ACE/examples/APG/Streams/RecordingDevice.h
new file mode 100644
index 00000000000..cee3d7154de
--- /dev/null
+++ b/ACE/examples/APG/Streams/RecordingDevice.h
@@ -0,0 +1,119 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef RECORDING_DEVICE_H
+#define RECORDING_DEVICE_H
+
+#include "ace/FILE_Addr.h"
+#include "ace/Event_Handler.h"
+#include "ace/Log_Msg.h"
+#include "ace/Reactor.h"
+#include "ace/Semaphore.h"
+
+class CallerId;
+class MessageType;
+
+class RecordingDevice
+{
+public:
+ RecordingDevice ()
+ {
+ // Initialize the semaphore so that we don't block on the
+ // first call to wait_for_activity().
+ }
+
+ virtual ~RecordingDevice ()
+ {
+ }
+
+ virtual const ACE_TCHAR *get_name (void) const
+ {
+ return ACE_TEXT ("UNKNOWN");
+ }
+
+ virtual int init (int, ACE_TCHAR *[])
+ {
+ return 0;
+ }
+
+ // Answer the incoming call
+ virtual int answer_call (void) = 0;
+
+ // Fetch some form of caller identification at the hardware level.
+ virtual CallerId *retrieve_callerId (void) = 0;
+
+ // Fetch the message at the location specified by 'addr' and play
+ // it for the caller.
+ virtual int play_message (ACE_FILE_Addr &addr) = 0;
+
+ // Record data from our physical device into the file at the
+ // specified address. Return the number of bytes recorded.
+ virtual MessageType *record_message (ACE_FILE_Addr &addr) = 0;
+
+ // Release the RecordingDevice to accept another incoming call
+ virtual void release (void)
+ {
+ this->release_semaphore ();
+ }
+
+ // Get the handler of the device so that wait_for_activity() can
+ // wait for data to arrive.
+ virtual ACE_Event_Handler *get_handler (void) const
+ {
+ return 0;
+ }
+
+ virtual RecordingDevice *wait_for_activity (void)
+ {
+ // Block on a semaphore until it says we're ready to do
+ // work.
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Waiting for semaphore\n")));
+ this->acquire_semaphore ();
+
+ // Use the reactor to wait for activity on our handle
+ ACE_Reactor reactor;
+
+ ACE_Event_Handler *handler = this->get_handler ();
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Handler is %@\n"),
+ (void *)handler));
+
+ reactor.register_handler (this->get_handler (),
+ ACE_Event_Handler::READ_MASK);
+
+ reactor.handle_events ();
+ // Error-check this...
+
+ // Leave the semaphore locked so that we'll block until
+ // recording_complete() is invoked.
+
+ return this;
+ }
+
+protected:
+ void acquire_semaphore (void)
+ {
+ this->semaphore_.acquire ();
+ }
+
+ void release_semaphore (void)
+ {
+ // Reset the semaphore so that wait_for_activity() will
+ // unblock.
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("Releasing semaphore\n")));
+ this->semaphore_.release ();
+ }
+
+private:
+ ACE_Semaphore semaphore_;
+};
+
+#include "RecordingDevice_Text.h"
+#include "RecordingDevice_USRVM.h"
+#include "RecordingDevice_QC.h"
+
+#include "RecordingDeviceFactory.h"
+
+#endif /* RECORDING_DEVICE_H */
diff --git a/ACE/examples/APG/Streams/RecordingDeviceFactory.cpp b/ACE/examples/APG/Streams/RecordingDeviceFactory.cpp
new file mode 100644
index 00000000000..f5585e1ec0a
--- /dev/null
+++ b/ACE/examples/APG/Streams/RecordingDeviceFactory.cpp
@@ -0,0 +1,25 @@
+// $Id$
+
+#include "RecordingDevice.h"
+#include "RecordingDeviceFactory.h"
+#include "RecordingDevice_Text.h"
+
+RecordingDevice *RecordingDeviceFactory::instantiate (int argc,
+ ACE_TCHAR *argv[])
+{
+ RecordingDevice * device = 0;
+
+ // Determine the implementation based on the values of argv
+ // Exclude 2
+ device = new TextListenerAcceptor ();
+ // Exclude 2
+
+ // Initialize the device with the remaining parameters.
+ if (device->init (argc, argv) < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("RecordingDeviceFactory::instantiate() - ")
+ ACE_TEXT ("%s->init(argc, argv)"),
+ device->get_name()),
+ 0);
+ return device;
+}
diff --git a/ACE/examples/APG/Streams/RecordingDeviceFactory.h b/ACE/examples/APG/Streams/RecordingDeviceFactory.h
new file mode 100644
index 00000000000..13485b20947
--- /dev/null
+++ b/ACE/examples/APG/Streams/RecordingDeviceFactory.h
@@ -0,0 +1,22 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef RECORDING_DEVICE_FACTORY_H
+#define RECORDING_DEVICE_FACTORY_H
+
+class RecordingDevice;
+
+/*
+ * A factory class that creates an appropriate RecordingDevice
+ * derivative based on command-line parameters.
+ */
+class RecordingDeviceFactory
+{
+public:
+
+ // Instantiate the appropriate RecordingDevice implementation
+ static RecordingDevice *instantiate (int argc, ACE_TCHAR *argv[]);
+};
+
+#endif /* RECORDING_DEVICE_FACTORY_H */
+
diff --git a/ACE/examples/APG/Streams/RecordingDevice_QC.h b/ACE/examples/APG/Streams/RecordingDevice_QC.h
new file mode 100644
index 00000000000..356d70afc5c
--- /dev/null
+++ b/ACE/examples/APG/Streams/RecordingDevice_QC.h
@@ -0,0 +1,5 @@
+// $Id$
+
+class QuickCam : public RecordingDevice
+ {
+ };
diff --git a/ACE/examples/APG/Streams/RecordingDevice_Text.cpp b/ACE/examples/APG/Streams/RecordingDevice_Text.cpp
new file mode 100644
index 00000000000..01720bb2470
--- /dev/null
+++ b/ACE/examples/APG/Streams/RecordingDevice_Text.cpp
@@ -0,0 +1,197 @@
+/*
+ * $Id$
+ *
+ * A RecordingDevice that listens to a socket and collects text.
+ */
+
+#include "MessageInfo.h"
+#include "RecordingDevice.h"
+#include "RecordingDevice_Text.h"
+#include "Util.h"
+
+TextListenerAcceptor::TextListenerAcceptor (void)
+ : ACE_Event_Handler(), RecordingDevice()
+{ }
+
+// ACE_Event_Handler interface
+
+int TextListenerAcceptor::open (ACE_INET_Addr &addr)
+{
+ if (this->acceptor_.open (addr, 1) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("acceptor open")),
+ -1);
+ return 0;
+}
+
+ACE_HANDLE TextListenerAcceptor::get_handle (void) const
+{
+ return this->acceptor_.get_handle ();
+}
+
+int TextListenerAcceptor::handle_input (ACE_HANDLE)
+{
+ ACE_DEBUG ((LM_INFO,
+ ACE_TEXT ("TextListenerAcceptor - connection request\n" )));
+ return 0;
+}
+
+int TextListenerAcceptor::accept (ACE_SOCK_Stream &peer)
+{
+ return this->acceptor_.accept (peer);
+}
+
+// RecordingDevice interface
+
+// Open a listening socket on the port specified by argv.
+int TextListenerAcceptor::init (int argc, ACE_TCHAR *argv[])
+{
+ ACE_UNUSED_ARG(argc);
+
+ ACE_INET_Addr addr (argv[1]);
+
+ if (this->open (addr) < 0)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("TextListener - open")),
+ -1);
+ return 0;
+}
+
+ACE_Event_Handler *TextListenerAcceptor::get_handler (void) const
+{
+ return (ACE_Event_Handler *)this;
+}
+
+RecordingDevice *TextListenerAcceptor::wait_for_activity (void)
+{
+ this->RecordingDevice::wait_for_activity ();
+ return new TextListener (this);
+}
+
+int TextListenerAcceptor::answer_call (void)
+{
+ return -1;
+}
+
+CallerId *TextListenerAcceptor::retrieve_callerId (void)
+{
+ return 0;
+}
+
+int TextListenerAcceptor::play_message (ACE_FILE_Addr &addr)
+{
+ ACE_UNUSED_ARG(addr);
+ return 0;
+}
+
+MessageType *TextListenerAcceptor::record_message (ACE_FILE_Addr &addr)
+{
+ ACE_UNUSED_ARG(addr);
+ return 0;
+}
+
+
+// Listing 01 code/ch18
+TextListener::TextListener (TextListenerAcceptor *acceptor)
+ : acceptor_(acceptor)
+{
+ ACE_TRACE ("TextListener ctor");
+
+ ACE_NEW (this->command_stream_, CommandStream (&(this->peer_)));
+ this->command_stream_->open (0);
+}
+// Listing 01
+
+const ACE_TCHAR *TextListener::get_name (void) const
+{
+ return ACE_TEXT ("TextListener");
+}
+
+// Listing 02 code/ch18
+int TextListener::answer_call (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TextListener::answer_call()\n")));
+
+ Command *c = new Command ();
+ c->command_ = Command::CMD_ANSWER_CALL;
+ c->extra_data_ = this->acceptor_;
+
+ c = this->command_stream_->execute (c);
+
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TextListener::answer_call() ")
+ ACE_TEXT ("result is %d\n"),
+ c->numeric_result_));
+
+ return c->numeric_result_;
+}
+// Listing 02
+
+// Listing 03 code/ch18
+CallerId *TextListener::retrieve_callerId (void)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ ACE_TEXT ("TextListener::retrieve_callerId()\n")));
+
+ Command *c = new Command ();
+ c->command_ = Command::CMD_RETRIEVE_CALLER_ID;
+
+ c = this->command_stream_->execute (c);
+
+ CallerId *caller_id = new CallerId (c->result_);
+ return caller_id;
+}
+// Listing 03
+
+// Listing 04 code/ch18
+int TextListener::play_message (ACE_FILE_Addr &addr)
+{
+ MessageType *type = Util::identify_message (addr);
+ if (type->is_text ())
+ {
+ Command *c = new Command ();
+ c->command_ = Command::CMD_PLAY_MESSAGE;
+ c->extra_data_ = &addr;
+ c = this->command_stream_->execute (c);
+ return c->numeric_result_;
+ }
+
+ ACE_FILE_Addr temp (ACE_TEXT ("/tmp/outgoing_message.text"));
+ ACE_FILE_IO *file;
+ if (type->is_audio ())
+ file = Util::audio_to_text (addr, temp);
+ else if (type->is_video ())
+ file = Util::video_to_text (addr, temp);
+ else
+ ACE_ERROR_RETURN
+ ((LM_ERROR, ACE_TEXT ("Invalid message type %d\n"),
+ type->get_codec ()), -1);
+ int rval = this->play_message (temp);
+ file->remove ();
+ return rval;
+}
+// Listing 04
+
+// Listing 05 code/ch18
+MessageType *TextListener::record_message (ACE_FILE_Addr &addr)
+{
+ Command *c = new Command ();
+ c->command_ = Command::CMD_RECORD_MESSAGE;
+ c->extra_data_ = &addr;
+ c = this->command_stream_->execute (c);
+ if (c->numeric_result_ == -1)
+ return 0;
+
+ return new MessageType (MessageType::RAWTEXT, addr);
+}
+// Listing 05
+
+// Listing 06 code/ch18
+void TextListener::release (void)
+{
+ delete this;
+}
+// Listing 06
diff --git a/ACE/examples/APG/Streams/RecordingDevice_Text.h b/ACE/examples/APG/Streams/RecordingDevice_Text.h
new file mode 100644
index 00000000000..a49f400d922
--- /dev/null
+++ b/ACE/examples/APG/Streams/RecordingDevice_Text.h
@@ -0,0 +1,84 @@
+/* -*- C++ -*- */
+/*
+ * $Id$
+ *
+ * A RecordingDevice that listens to a socket and collects text.
+ */
+
+#ifndef RECORDING_DEVICE_TEXT_H
+#define RECORDING_DEVICE_TEXT_H
+
+#include "ace/FILE_IO.h"
+#include "ace/FILE_Connector.h"
+#include "ace/SOCK_Stream.h"
+#include "ace/SOCK_Acceptor.h"
+
+#include "CommandStream.h"
+#include "MessageInfo.h"
+#include "RecordingDevice.h"
+
+class TextListenerAcceptor :
+ public ACE_Event_Handler,
+ public RecordingDevice
+{
+public:
+ TextListenerAcceptor ();
+
+ // ACE_Event_Handler interface
+
+ int open (ACE_INET_Addr &addr);
+
+ ACE_HANDLE get_handle (void) const;
+
+ int handle_input (ACE_HANDLE);
+
+ int accept (ACE_SOCK_Stream &peer);
+
+ // RecordingDevice interface
+
+ // Open a listening socket on the port specified by argv.
+ int init (int argc, ACE_TCHAR *argv[]);
+
+ ACE_Event_Handler *get_handler (void) const;
+
+ virtual RecordingDevice *wait_for_activity (void);
+
+ virtual int answer_call (void);
+
+ virtual CallerId *retrieve_callerId (void);
+
+ virtual int play_message (ACE_FILE_Addr &addr);
+
+ virtual MessageType *record_message (ACE_FILE_Addr &addr);
+
+private:
+ ACE_SOCK_Acceptor acceptor_;
+};
+
+// Listing 01 code/ch18
+class TextListener : public RecordingDevice
+{
+public:
+ TextListener (TextListenerAcceptor *acceptor);
+
+ virtual const ACE_TCHAR *get_name (void) const;
+
+ int answer_call (void);
+
+ CallerId *retrieve_callerId (void);
+
+ int play_message (ACE_FILE_Addr &addr);
+
+ MessageType *record_message (ACE_FILE_Addr &addr);
+
+ virtual void release (void);
+ // Listing 01
+ // Listing 02 code/ch18
+private:
+ CommandStream *command_stream_;
+ TextListenerAcceptor *acceptor_;
+ ACE_SOCK_Stream peer_;
+};
+// Listing 02
+
+#endif /* RECORDING_DEVICE_TEXT_H */
diff --git a/ACE/examples/APG/Streams/RecordingDevice_USRVM.h b/ACE/examples/APG/Streams/RecordingDevice_USRVM.h
new file mode 100644
index 00000000000..7519f7c1c84
--- /dev/null
+++ b/ACE/examples/APG/Streams/RecordingDevice_USRVM.h
@@ -0,0 +1,5 @@
+// $Id$
+
+class USRoboticsVoiceModem : public RecordingDevice
+ {
+ };
diff --git a/ACE/examples/APG/Streams/Util.h b/ACE/examples/APG/Streams/Util.h
new file mode 100644
index 00000000000..d47a699aee7
--- /dev/null
+++ b/ACE/examples/APG/Streams/Util.h
@@ -0,0 +1,92 @@
+/* -*- C++ -*- */
+// $Id$
+
+#ifndef UTIL_H
+#define UTIL_H
+
+#include "ace/FILE_Addr.h"
+#include "ace/FILE_Connector.h"
+#include "ace/FILE_IO.h"
+
+class Util
+{
+public:
+ static MessageType *identify_message (ACE_FILE_Addr &src)
+ {
+ // Determine the contents of the specified file
+ return new MessageType (MessageType::RAWTEXT, src);
+ }
+
+ static ACE_FILE_IO *audio_to_text (ACE_FILE_Addr &, ACE_FILE_Addr &dest)
+ {
+ ACE_FILE_Connector connector;
+ ACE_FILE_IO *file = 0;
+ if (connector.connect (*file, dest) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("create file")),
+ 0);
+
+ // Convert audio data to printable text
+
+ return file;
+ }
+
+ static ACE_FILE_IO *video_to_text (ACE_FILE_Addr &, ACE_FILE_Addr &dest)
+ {
+ ACE_FILE_Connector connector;
+ ACE_FILE_IO *file = 0;
+ if (connector.connect (*file, dest) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("create file")),
+ 0);
+
+ // Extract audio data from video file and convert to printable text
+ return file;
+ }
+
+ static int convert_to_unicode (ACE_FILE_Addr &src, ACE_FILE_Addr &dest)
+ {
+ ACE_FILE_Connector connector;
+ ACE_FILE_IO input;
+ if (connector.connect (input, src) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("read file")),
+ 0);
+ ACE_FILE_IO output;
+ if (connector.connect (output, dest) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR,
+ ACE_TEXT ("%p\n"),
+ ACE_TEXT ("create file")),
+ 0);
+
+ char rwbuf[512];
+ ssize_t rwbytes;
+ while ((rwbytes = input.recv (rwbuf, 512)) > 0)
+ {
+ output.send_n (rwbuf, rwbytes);
+ }
+
+ input.close ();
+ output.close ();
+
+ // Convert arbirary text to unicode
+ return 0;
+ }
+
+ static int convert_to_mp3 (ACE_FILE_Addr &, ACE_FILE_Addr &)
+ {
+ // Convert arbitrary audio data to some standard mp3 format
+ return 0;
+ }
+
+ static int convert_to_mpeg (ACE_FILE_Addr &, ACE_FILE_Addr &)
+ {
+ // Convert arbitrary vidio data to some standard mpeg format
+ return 0;
+ }
+};
+
+#endif /* UTIL_H */
diff --git a/ACE/examples/APG/Streams/streams.mpc b/ACE/examples/APG/Streams/streams.mpc
new file mode 100644
index 00000000000..df74b446031
--- /dev/null
+++ b/ACE/examples/APG/Streams/streams.mpc
@@ -0,0 +1,17 @@
+// -*- MPC -*-
+// $Id$
+
+project(Answerer) : aceexe {
+ avoids += ace_for_tao
+ exename = Answerer
+ requires += threads
+ Source_Files {
+ Answerer.cpp
+ CommandModule.cpp
+ CommandStream.cpp
+ CommandTask.cpp
+ CommandTasks.cpp
+ RecordingDeviceFactory.cpp
+ RecordingDevice_Text.cpp
+ }
+}