diff options
47 files changed, 2188 insertions, 570 deletions
diff --git a/ChangeLog-96b b/ChangeLog-96b index c47050767a9..38dd2bd7d7a 100644 --- a/ChangeLog-96b +++ b/ChangeLog-96b @@ -1,9 +1,37 @@ +Sun Dec 29 18:38:03 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + * apps/Gateway/Gateway/File_Parser.h: Removed the endofline() + method declaration -- it doesn't seem to be defined anywhere. + + * ace/OS.h: Added an ACE_INT32 to complement the ACE_UINT32. + + * netsvcs/lib: Cleaned up all the ACE network services by removing + their SIGINT signal handler. This was interferring with the + main event loop's ability to shutdown... + + * apps/Gateway/Gateway: Once again changed the name of + *IO_Handler* to *Proxy_Handler* since these things are really + proxies, in the COS sense! + + * ace/Service_Record.cpp: Tidied up the implementation of + ACE_Module_Type::fini() so that it doesn't try to call fini() on + NULL pointers. Also, rather than explicitly deleting the reader + and writer Tasks, we call ACE_Module<>::close(), which knows how + to take care of all this stuff. + + * ace/Module.cpp: Added an extra parameter to close_i() so that we + can correctly pass the value of "flags" from close() in order to + prevent deleting tasks when we don't want to do this. + + * ace/Module.cpp: There was a bug in the open() method + since we were potentially deleting reader_q and writer_q twice + if memory allocation failed. + Sat Dec 28 19:02:13 1996 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu> * ace/ACE.cpp: Changed the implementation of ldfind() so that it - doesn't try to add the DLL prefix (e.g., "lib") unless given a - relative pathname. In other words, we never try to modify the - prefix for an absolute pathname. This is necessary to keep lots + doesn't try to add the DLL prefix (e.g., "lib") unless it + doesn't match filename. This is necessary to keep lots of existing svc.conf files from breaking. * ace/Event_Handler.h: Added a new ACCEPT_MASK for use with the diff --git a/ace/ACE.cpp b/ace/ACE.cpp index 353f0ebcb0c..817bd107d9c 100644 --- a/ace/ACE.cpp +++ b/ace/ACE.cpp @@ -93,6 +93,23 @@ ACE::hash_pjw (const char *str) return hash; } +size_t +ACE::strrepl (char *s, char search, char replace) +{ + ACE_TRACE ("ACE::strrepl"); + + size_t replaced = 0; + + for (size_t i = 0; s[i] != '\0'; i++) + if (s[i] == search) + { + s[i] = replace; + replaced++; + } + + return replaced; +} + char * ACE::strenvdup (const char *str) { @@ -136,7 +153,7 @@ ACE::ldfind (const char filename[], char tempfilename[MAXPATHLEN]; char searchfilename[MAXPATHLEN]; - // Create a working copy of filename to mess with + // Create a copy of filename to work with. if (ACE_OS::strlen (filename) + 1 > sizeof tempcopy) { errno = ENOMEM; @@ -148,17 +165,12 @@ ACE::ldfind (const char filename[], // Insert canonical directory separators. char *separator_ptr; - for (separator_ptr = tempcopy; - *separator_ptr != '\0'; - separator_ptr++) - if (*separator_ptr == '\\' - || *separator_ptr == ACE_DIRECTORY_SEPARATOR_CHAR) - *separator_ptr = '/'; - - int got_prefix = 0; + if (ACE_DIRECTORY_SEPARATOR_CHAR != '/') + // Make all the directory separators ``canonical'' to simplify + // subsequent code. + ACE::strrepl (tempcopy, ACE_DIRECTORY_SEPARATOR_CHAR, '/'); // Separate filename from pathname. - separator_ptr = ACE_OS::strrchr (tempcopy, '/'); // This is a relative path. @@ -166,13 +178,6 @@ ACE::ldfind (const char filename[], { searchpathname[0] = '\0'; ACE_OS::strcpy (tempfilename, tempcopy); - - // Note that we only try to set the prefix if we're dealing with - // a relative path. - if (ACE_OS::strlen (ACE_DLL_PREFIX) == 0 - || ACE_OS::strncmp (tempfilename, ACE_DLL_PREFIX, - ACE_OS::strlen (ACE_DLL_PREFIX) == 0)) - got_prefix = 1; } else // This is an absolute path. { @@ -181,37 +186,32 @@ ACE::ldfind (const char filename[], ACE_OS::strcpy (searchpathname, tempcopy); } - // Determine how the filename needs to be decorated. - int got_suffix = 0; // Check to see if this has an appropriate DLL suffix for the OS // platform. char *s = ACE_OS::strrchr (tempfilename, '.'); - if (s != 0 && ACE_OS::strcmp (s + 1, ACE_DLL_SUFFIX)) - got_suffix = 1; - // Create the properly decorated filename. + if (s != 0) + { + // Check whether this matches the appropriate platform-specific suffix. + if (ACE_OS::strcmp (s + 1, ACE_DLL_SUFFIX) == 0) + got_suffix = 1; + else + ACE_ERROR ((LM_WARNING, + "Warning: improper suffix for a shared library on this platform: %s\n", + s)); + } + + // Make sure we've got enough space in tempfilename. if (ACE_OS::strlen (tempfilename) + - got_prefix ? 0 : ACE_OS::strlen (ACE_DLL_PREFIX) + + ACE_OS::strlen (ACE_DLL_PREFIX) + got_suffix ? 0 : ACE_OS::strlen (ACE_DLL_SUFFIX) >= sizeof searchfilename) { errno = ENOMEM; return -1; } - else - ::sprintf (searchfilename, "%s%s%s", - got_prefix ? "" : ACE_DLL_PREFIX, - tempfilename, - got_suffix ? "" : ACE_DLL_SUFFIX); - - if (ACE_OS::strcmp (searchfilename - + ACE_OS::strlen (searchfilename) - ACE_OS::strlen (ACE_DLL_SUFFIX), - ACE_DLL_SUFFIX)) - ACE_ERROR ((LM_NOTICE, - "CAUTION: improper name for a shared library on this patform: %s\n", - searchfilename)); - + // Use absolute pathname if there is one. if (ACE_OS::strlen (searchpathname) > 0) { @@ -223,16 +223,27 @@ ACE::ldfind (const char filename[], } else { - - // Revert to native path name separators - for (separator_ptr = searchpathname; - *separator_ptr != '\0'; - separator_ptr++) - if (*separator_ptr == '/') - *separator_ptr = ACE_DIRECTORY_SEPARATOR_CHAR; - - ::sprintf (pathname, "%s%s", searchpathname, searchfilename); - return 0; + if (ACE_DIRECTORY_SEPARATOR_CHAR != '/') + // Revert to native path name separators + ACE::strrepl (searchpathname, '/', ACE_DIRECTORY_SEPARATOR_CHAR); + + // First, try matching the filename *without* adding a + // prefix. + ::sprintf (pathname, "%s%s%s", + searchpathname, + tempfilename, + got_suffix ? "" : ACE_DLL_SUFFIX); + if (ACE_OS::access (pathname, F_OK) == 0) + return 0; + + // Second, try matching the filename *with* adding a prefix. + ::sprintf (pathname, "%s%s%s%s", + searchpathname, + ACE_DLL_PREFIX, + tempfilename, + got_suffix ? "" : ACE_DLL_SUFFIX); + if (ACE_OS::access (pathname, F_OK) == 0) + return 0; } } @@ -259,15 +270,28 @@ ACE::ldfind (const char filename[], result = -1; break; } - ACE_OS::sprintf (pathname, "%s%c%s", + + // First, try matching the filename *without* adding a + // prefix. + ACE_OS::sprintf (pathname, "%s%c%s", path_entry, ACE_DIRECTORY_SEPARATOR_CHAR, searchfilename); + if (ACE_OS::access (pathname, F_OK) == 0) + break; - if (ACE_OS::access (pathname, R_OK) == 0) + // Second, try matching the filename *with* adding a + // prefix. + ACE_OS::sprintf (pathname, "%s%c%s%s", + path_entry, + ACE_DIRECTORY_SEPARATOR_CHAR, + ACE_DLL_PREFIX, + searchfilename); + if (ACE_OS::access (pathname, F_OK) == 0) break; - path_entry = ACE_OS::strtok (0, - ACE_LD_SEARCH_PATH_SEPARATOR_STR); + + path_entry = ACE_OS::strtok + (0, ACE_LD_SEARCH_PATH_SEPARATOR_STR); } ACE_OS::free ((void *) ld_path); diff --git a/ace/ACE.h b/ace/ACE.h index 3bd4202f45f..0d698ecca77 100644 --- a/ace/ACE.h +++ b/ace/ACE.h @@ -223,6 +223,10 @@ public: // Copies <t> to <s>, returning a pointer to the end of the copied // region (rather than the beginning, a la <strcpy>. + static size_t strrepl (char *s, char search, char replace); + // Replace all instances of <search> in <s> with <replace>. Returns + // the number of replacements made. + static const char *execname (const char *pathname); // On Win32 returns <pathname> if it already ends in ".exe," // otherwise returns a dynamically allocated buffer containing @@ -254,9 +258,9 @@ public: // a relative path in conjunction with ACE_LD_SEARCH_PATH (e.g., // $LD_LIBRARY_PATH on UNIX or $PATH on Win32). This function will // add appropriate suffix (e.g., .dll on Win32 or .so on UNIX) - // according to the OS platform. In addition, if a relative path is - // used, this function will prefix the appropriate prefix (e.g., - // "lib" on UNIX and "" on Win32). + // according to the OS platform. In addition, this function will + // apply the appropriate prefix (e.g., "lib" on UNIX and "" on + // Win32) if the <filename> doesn't match directly. static FILE *ldopen (const char *filename, const char *type); // Uses <ldopen> to locate and open the appropriate <filename> and diff --git a/ace/Module.cpp b/ace/Module.cpp index d38df5b7a7f..c0d30dc1ccf 100644 --- a/ace/Module.cpp +++ b/ace/Module.cpp @@ -27,13 +27,13 @@ ACE_Module<ACE_SYNCH_2>::writer (ACE_Task<ACE_SYNCH_2> *q, ACE_TRACE ("ACE_Module<ACE_SYNCH_2>::writer"); // Close and maybe delete old writer - this->close_i (1); + this->close_i (1, flags); this->q_pair_[1] = q; if (q != 0) ACE_CLR_BITS (q->flags_, ACE_Task_Flags::ACE_READER); - // don't allow the caller to change the reader status + // Don't allow the caller to change the reader status. ACE_SET_BITS (flags_, (flags & M_DELETE_WRITER)); } @@ -44,7 +44,7 @@ ACE_Module<ACE_SYNCH_2>::reader (ACE_Task<ACE_SYNCH_2> *q, ACE_TRACE ("ACE_Module<ACE_SYNCH_2>::reader"); // Close and maybe delete old writer - this->close_i (0); + this->close_i (0, flags); this->q_pair_[0] = q; if (q != 0) @@ -76,23 +76,24 @@ ACE_Module<ACE_SYNCH_2>::open (const char *mod_name, this->name (mod_name); this->arg_ = arg; - // we may already have readers and/or writers + // We may already have readers and/or writers. if (this->reader ()) - this->close_i (0); + this->close_i (0, M_DELETE_READER); if (this->writer ()) - this->close_i (1); - + this->close_i (1, M_DELETE_WRITER); - if (writer_q == 0) { - writer_q = new ACE_Thru_Task<ACE_SYNCH_2>; - ACE_SET_BITS (flags, M_DELETE_WRITER); - } + if (writer_q == 0) + { + writer_q = new ACE_Thru_Task<ACE_SYNCH_2>; + ACE_SET_BITS (flags, M_DELETE_WRITER); + } - if (reader_q == 0) { - reader_q = new ACE_Thru_Task<ACE_SYNCH_2>; - ACE_SET_BITS (flags, M_DELETE_READER); - } + if (reader_q == 0) + { + reader_q = new ACE_Thru_Task<ACE_SYNCH_2>; + ACE_SET_BITS (flags, M_DELETE_READER); + } this->reader (reader_q); this->writer (writer_q); @@ -102,21 +103,20 @@ ACE_Module<ACE_SYNCH_2>::open (const char *mod_name, writer_q->mod_ = this; // Save the flags - ACE_SET_BITS (flags_, flags); + this->flags_ = flags; // Make sure that the memory is allocated before proceding. if (writer_q == 0 || reader_q == 0) { - this->close_i (0); - this->close_i (1); + // These calls will delete writer_q and/or reader_q, if + // necessary. + this->close_i (0, M_DELETE_READER); + this->close_i (1, M_DELETE_WRITER); // Reset back pointers. - reader_q->mod_ = NULL; - writer_q->mod_ = NULL; + reader_q->mod_ = 0; + writer_q->mod_ = 0; - delete writer_q; - delete reader_q; - errno = ENOMEM; return -1; } @@ -140,6 +140,7 @@ ACE_Module<ACE_SYNCH_2>::sibling (ACE_Task<ACE_SYNCH_2> *orig) template <ACE_SYNCH_1> ACE_Module<ACE_SYNCH_2>::ACE_Module (void) + : flags_ (0) { ACE_TRACE ("ACE_Module<ACE_SYNCH_2>::ACE_Module"); this->name ("<unknown>"); @@ -164,6 +165,7 @@ ACE_Module<ACE_SYNCH_2>::ACE_Module (const char *mod_name, ACE_Task<ACE_SYNCH_2> *reader_q, void *args, int flags /* = M_DELETE */) + : flags_ (0) { ACE_TRACE ("ACE_Module<ACE_SYNCH_2>::ACE_Module"); @@ -183,21 +185,22 @@ ACE_Module<ACE_SYNCH_2>::close (int flags /* = M_DELETE_NONE */) ACE_SET_BITS (flags_, flags); - if (this->close_i (0) == -1) + if (this->close_i (0, flags) == -1) result = -1; - if (this->close_i (1) == -1) + if (this->close_i (1, flags) == -1) result = -1; return result; } template <ACE_SYNCH_1> int -ACE_Module<ACE_SYNCH_2>::close_i (int which) +ACE_Module<ACE_SYNCH_2>::close_i (int which, + int flags) { ACE_TRACE ("ACE_Module<ACE_SYNCH_2>::close_i"); - if (this->q_pair_[which] == NULL) + if (this->q_pair_[which] == 0) return 0; // Copy task pointer to prevent problems when ACE_Task::close @@ -206,26 +209,28 @@ ACE_Module<ACE_SYNCH_2>::close_i (int which) // Change so that close doesn't get called again from the task base. - // Now close the task + // Now close the task. int result = 0; - if (task->module_closed() == -1) + if (task->module_closed () == -1) result = -1; task->flush (); task->next (0); // Should we also delete it ? - if (ACE_BIT_ENABLED (flags_, which+1) ) { - // Only delete the Tasks if there aren't any more threads - // running in them. - if (task->thr_count() == 0) - delete task; - } - - // Set the tasks pointer to NULL so that we don't try to close() + if (flags != M_DELETE_NONE + && ACE_BIT_ENABLED (flags_, which + 1)) + { + // Only delete the Tasks if there aren't any more threads + // running in them. + if (task->thr_count () == 0) + delete task; + } + + // Set the tasks pointer to 0 so that we don't try to close() // this object again if the destructor gets called. - this->q_pair_[which] = NULL; + this->q_pair_[which] = 0; // Finally remove the delete bit. ACE_CLR_BITS (flags_, which + 1); diff --git a/ace/Module.h b/ace/Module.h index f587ef7744f..d2bd2c608f0 100644 --- a/ace/Module.h +++ b/ace/Module.h @@ -49,7 +49,8 @@ public: M_DELETE = 3 // Indicates that close() deletes the Tasks. Don't change this // value without updating the same enum in class ACE_Stream... - // The above flags may be or'ed together. + // The <M_DELETE_READER> and <M_DELETE_WRITER> flags may be or'ed + // together. }; // = Initialization and termination methods. @@ -63,7 +64,7 @@ public: ACE_Task<ACE_SYNCH_2> *writer = 0, ACE_Task<ACE_SYNCH_2> *reader = 0, void *args = 0, - int flags = M_DELETE); + int flags = M_DELETE); // Create an initialized module with <module_name> as its identity // and <reader> and <writer> as its tasks. @@ -71,20 +72,19 @@ public: ACE_Task<ACE_SYNCH_2> *writer = 0, ACE_Task<ACE_SYNCH_2> *reader = 0, void *a = 0, - int flags = M_DELETE); + int flags = M_DELETE); // Create an initialized module with <module_name> as its identity - // and <reader> and <writer> as its tasks. - // Previously register reader or writers or closed down and deleted - // according to the value of flags_. - // Should not be called from within ACE_Task::module_closed() + // and <reader> and <writer> as its tasks. Previously register + // reader or writers or closed down and deleted according to the + // value of flags_. Should not be called from within + // ACE_Task::module_closed(). int close (int flags = M_DELETE_NONE); // Close down the Module and its Tasks. The flags argument can be - // used to override the default behaviour, which depends on - // previous <flags> values in calls to c'tor(), open(), reader() and - // writer(). - // A previous value M_DELETE[_XXX] can not be overridden. - // Should not be called from within ACE_Task::module_closed() + // used to override the default behaviour, which depends on previous + // <flags> values in calls to c'tor(), open(), reader() and + // writer(). A previous value M_DELETE[_XXX] can not be overridden. + // Should not be called from within ACE_Task::module_closed(). // = ACE_Task manipulation routines ACE_Task<ACE_SYNCH_2> *writer (void); @@ -92,23 +92,23 @@ public: void writer (ACE_Task<ACE_SYNCH_2> *q, int flags = M_DELETE_WRITER); // Set the writer task. <flags> can be used to indicate that the - // module should delete the writer during a call to close or - // to the destructor. If a previous writer exists, it is closed. - // It may also be deleted, depending on the old flags_ value. - // Should not be called from within ACE_Task::module_closed() + // module should delete the writer during a call to close or to the + // destructor. If a previous writer exists, it is closed. It may + // also be deleted, depending on the old flags_ value. Should not + // be called from within ACE_Task::module_closed(). ACE_Task<ACE_SYNCH_2> *reader (void); // Get the reader task. void reader (ACE_Task<ACE_SYNCH_2> *q, int flags = M_DELETE_READER); // Set the reader task. <flags> can be used to indicate that the - // module should delete the reader during a call to close or - // to the destructor. If a previous reader exists, it is closed. - // It may also be deleted, depending on the old flags_ value. - // Should not be called from within ACE_Task::module_closed() + // module should delete the reader during a call to close or to the + // destructor. If a previous reader exists, it is closed. It may + // also be deleted, depending on the old flags_ value. Should not + // be called from within ACE_Task::module_closed() ACE_Task<ACE_SYNCH_2> *sibling (ACE_Task<ACE_SYNCH_2> *orig); - // Set and get pointer to sibling ACE_Task in ACE_Module + // Set and get pointer to sibling ACE_Task in ACE_Module. // = Identify the module const char *name (void) const; @@ -139,9 +139,9 @@ public: // Declare the dynamic allocation hooks. private: - int close_i(int which); - // Implements the close operation for either the reader - // or the writer task (depending on <which>) + int close_i (int which, int flags); + // Implements the close operation for either the reader or the + // writer task (depending on <which>). ACE_Task<ACE_SYNCH_2> *q_pair_[2]; // Pair of Tasks that form the "read-side" and "write-side" of the @@ -1699,8 +1699,10 @@ typedef fd_set ACE_FD_SET_TYPE; // Necessary to support the Alphas, which have 64 bit longs and 32 bit // ints... typedef u_int ACE_UINT32; +typedef int ACE_INT32; #else typedef u_long ACE_UINT32; +typedef long ACE_INT32; #endif /* ACE_HAS_64BIT_LONGS */ #if !defined (ETIMEDOUT) && defined (ETIME) diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp index 6e0f4438ad5..bfa8ddbd2f6 100644 --- a/ace/Proactor.cpp +++ b/ace/Proactor.cpp @@ -446,6 +446,7 @@ ACE_Proactor::cancel_io (ACE_Event_Handler *handler) #if defined (ACE_WIN32) && defined (ACE_HAS_CANCEL_IO) return ::CancelIO (handler->get_handle ()) ? -1 : 0; #else + ACE_UNUSED_ARG(handler); return 0; #endif /* ACE_WIN32 */ } diff --git a/ace/ReactorEx.h b/ace/ReactorEx.h index 0cf84d67de4..e5903fb5eeb 100644 --- a/ace/ReactorEx.h +++ b/ace/ReactorEx.h @@ -14,8 +14,8 @@ // // ============================================================================ -#if !defined (ACE_ReactorEx_H) -#define ACE_ReactorEx_H +#if !defined (ACE_REACTOREX_H) +#define ACE_REACTOREX_H #include "ace/Time_Value.h" #include "ace/Timer_Queue.h" @@ -291,4 +291,4 @@ public: #if defined (__ACE_INLINE__) #include "ace/ReactorEx.i" #endif /* __ACE_INLINE__ */ -#endif /* ACE_ReactorEx_H */ +#endif /* ACE_REACTOREX_H */ diff --git a/ace/Service_Main.cpp b/ace/Service_Main.cpp index b96c06fe7db..14e8fdbcd0b 100644 --- a/ace/Service_Main.cpp +++ b/ace/Service_Main.cpp @@ -26,8 +26,7 @@ sc_main (int argc, char *argv[]) // Run forever, performing the configured services until we are shut // down by a SIGINT/SIGQUIT signal. - while (daemon.reactor_event_loop_done () == 0) - daemon.run_reactor_event_loop (); + daemon.run_reactor_event_loop (); return 0; } diff --git a/ace/Service_Record.cpp b/ace/Service_Record.cpp index 53271cf49a1..21cd5462e71 100644 --- a/ace/Service_Record.cpp +++ b/ace/Service_Record.cpp @@ -103,15 +103,20 @@ int ACE_Module_Type::fini (void) const { ACE_TRACE ("ACE_Module_Type::fini"); + const void *obj = this->object (); MT_Module *mod = (MT_Module *) obj; MT_Task *reader = mod->reader (); MT_Task *writer = mod->writer (); - reader->fini (); - writer->fini (); - delete reader; - delete writer; + if (reader != 0) + reader->fini (); + + if (writer != 0) + writer->fini (); + + // Close the module and delete the memory. + mod->close (MT_Module::M_DELETE); return ACE_Service_Type::fini (); } @@ -227,7 +232,7 @@ ACE_Stream_Type::fini (void) const return ACE_Service_Type::fini (); } -// Locate and remove MOD_NAME from the ACE_Stream. +// Locate and remove <mod_name> from the ACE_Stream. int ACE_Stream_Type::remove (ACE_Module_Type *mod) @@ -337,6 +342,15 @@ ACE_Service_Record::resume (void) const this->type_->resume (); } +int +ACE_Service_Object_Type::fini (void) const +{ + ACE_TRACE ("ACE_Service_Object_Type::fini"); + ACE_Service_Object *so = (ACE_Service_Object *) this->object (); + so->fini (); + return ACE_Service_Type::fini (); +} + #if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) template class ACE_Module<ACE_SYNCH>; template class ACE_Stream<ACE_SYNCH>; diff --git a/ace/Service_Record.i b/ace/Service_Record.i index bd6d667a593..0afc4051ee2 100644 --- a/ace/Service_Record.i +++ b/ace/Service_Record.i @@ -18,15 +18,6 @@ ACE_Service_Object_Type::resume (void) const } ACE_INLINE int -ACE_Service_Object_Type::fini (void) const -{ - ACE_TRACE ("ACE_Service_Object_Type::fini"); - ACE_Service_Object *so = (ACE_Service_Object *) this->object (); - so->fini (); - return ACE_Service_Type::fini (); -} - -ACE_INLINE int ACE_Service_Object_Type::info (char **str, size_t len) const { ACE_TRACE ("ACE_Service_Object_Type::info"); diff --git a/ace/Stream.cpp b/ace/Stream.cpp index 13a1874ed51..d345c0f7662 100644 --- a/ace/Stream.cpp +++ b/ace/Stream.cpp @@ -23,7 +23,9 @@ ACE_Stream<ACE_SYNCH_2>::dump (void) const ACE_TRACE ("ACE_Stream<ACE_SYNCH_2>::dump"); ACE_DEBUG ((LM_DEBUG, "-------- module links --------\n")); - for (ACE_Module<ACE_SYNCH_2> *mp = this->stream_head_; ; mp = mp->next ()) + for (ACE_Module<ACE_SYNCH_2> *mp = this->stream_head_; + ; + mp = mp->next ()) { ACE_DEBUG ((LM_DEBUG, "module name = %s\n", mp->name ())); if (mp == this->stream_tail_) @@ -34,13 +36,16 @@ ACE_Stream<ACE_SYNCH_2>::dump (void) const ACE_Task<ACE_SYNCH_2> *tp; - for (tp = this->stream_head_->writer (); ; tp = tp->next ()) + for (tp = this->stream_head_->writer (); + ; + tp = tp->next ()) { ACE_DEBUG ((LM_DEBUG, "writer queue name = %s\n", tp->name ())); tp->dump (); ACE_DEBUG ((LM_DEBUG, "-------\n")); if (tp == this->stream_tail_->writer () - || (this->linked_us_ && tp == this->linked_us_->stream_head_->reader ())) + || (this->linked_us_ + && tp == this->linked_us_->stream_head_->reader ())) break; } @@ -51,7 +56,8 @@ ACE_Stream<ACE_SYNCH_2>::dump (void) const tp->dump (); ACE_DEBUG ((LM_DEBUG, "-------\n")); if (tp == this->stream_head_->reader () - || (this->linked_us_ && tp == this->linked_us_->stream_head_->writer ())) + || (this->linked_us_ + && tp == this->linked_us_->stream_head_->writer ())) break; } } @@ -157,12 +163,13 @@ ACE_Stream<ACE_SYNCH_2>::remove (const char *name, else prev->link (mod->next ()); - // Close down the module and release the memory. - mod->close (flags); - // Don't delete the Module unless the flags request this. if (flags != ACE_Module<ACE_SYNCH_2>::M_DELETE_NONE) - delete mod; + { + // Close down the module and release the memory. + mod->close (flags); + delete mod; + } return 0; } @@ -228,8 +235,8 @@ ACE_Stream<ACE_SYNCH_2>::push_module (ACE_Module<ACE_SYNCH_2> *new_top, #if 0 int ACE_Stream<ACE_SYNCH_2>::open (void *a, - ACE_Multiplexor &muxer, - ACE_Module<ACE_SYNCH_2> *head) + ACE_Multiplexor &muxer, + ACE_Module<ACE_SYNCH_2> *head) { ACE_TRACE ("ACE_Stream<ACE_SYNCH_2>::open"); this->stream_head_ = head == 0 diff --git a/ace/Stream.h b/ace/Stream.h index cd50967ceb7..25e756c4ddf 100644 --- a/ace/Stream.h +++ b/ace/Stream.h @@ -1,7 +1,6 @@ /* -*- C++ -*- */ // $Id$ - // ============================================================================ // // = LIBRARY diff --git a/ace/Svc_Handler.cpp b/ace/Svc_Handler.cpp index 922b6b89eaa..67855016c1a 100644 --- a/ace/Svc_Handler.cpp +++ b/ace/Svc_Handler.cpp @@ -137,6 +137,7 @@ template <PR_ST_1, ACE_SYNCH_1> void ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::shutdown (void) { ACE_TRACE ("ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::shutdown"); + // Deregister this handler with the ACE_Reactor. if (this->reactor ()) { @@ -145,7 +146,7 @@ ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::shutdown (void) ACE_Event_Handler::DONT_CALL; // Make sure there are no timers. - this->reactor ()->cancel_timer( this ); + this->reactor ()->cancel_timer (this); // Remove self from reactor. this->reactor ()->remove_handler (this, mask); diff --git a/ace/Task.cpp b/ace/Task.cpp index 3046b40a9c3..a1217fff683 100644 --- a/ace/Task.cpp +++ b/ace/Task.cpp @@ -232,11 +232,12 @@ ACE_Task_Base::svc_run (void *args) /* NOTREACHED */ } -// Forward the call to close() so that existing -// applications don't break. +// Forward the call to close() so that existing applications don't +// break. + int ACE_Task_Base::module_closed (void) { - return this->close (1); + return this->close (1); } diff --git a/apps/Gateway/Gateway/Concurrency_Strategies.h b/apps/Gateway/Gateway/Concurrency_Strategies.h index e2fbc934c93..8d1b2979a49 100644 --- a/apps/Gateway/Gateway/Concurrency_Strategies.h +++ b/apps/Gateway/Gateway/Concurrency_Strategies.h @@ -29,7 +29,7 @@ typedef ACE_Null_Mutex MAP_MUTEX; #else /* ACE_HAS_THREADS */ // Note that we only need to make the ACE_Task thread-safe if we are -// using the multi-threaded Thr_Consumer_Handler... +// using the multi-threaded Thr_Consumer_Proxy... #if defined (USE_OUTPUT_MT) #define SYNCH_STRATEGY ACE_MT_SYNCH #else @@ -37,7 +37,7 @@ typedef ACE_Null_Mutex MAP_MUTEX; #endif /* USE_OUTPUT_MT || USE_INPUT_MT */ // Note that we only need to make the ACE_Map_Manager thread-safe if -// we are using the multi-threaded Thr_Supplier_Handler. In this +// we are using the multi-threaded Thr_Supplier_Proxy. In this // case, we use an RW_Mutex since we'll lookup Consumers far more // often than we'll update them. #if defined (USE_INPUT_MT) @@ -48,27 +48,27 @@ typedef ACE_Null_Mutex MAP_MUTEX; #endif /* ACE_HAS_THREADS */ // = Forward decls -class Thr_Consumer_Handler; -class Thr_Supplier_Handler; -class Consumer_Handler; -class Supplier_Handler; +class Thr_Consumer_Proxy; +class Thr_Supplier_Proxy; +class Consumer_Proxy; +class Supplier_Proxy; #if defined (ACE_HAS_THREADS) && (defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT)) #if defined (USE_OUTPUT_MT) -typedef Thr_Consumer_Handler CONSUMER_HANDLER; +typedef Thr_Consumer_Proxy CONSUMER_HANDLER; #else -typedef Consumer_Handler CONSUMER_HANDLER; +typedef Consumer_Proxy CONSUMER_HANDLER; #endif /* USE_OUTPUT_MT */ #if defined (USE_INPUT_MT) -typedef Thr_Supplier_Handler SUPPLIER_HANDLER; +typedef Thr_Supplier_Proxy SUPPLIER_HANDLER; #else -typedef Supplier_Handler SUPPLIER_HANDLER; +typedef Supplier_Proxy SUPPLIER_HANDLER; #endif /* USE_INPUT_MT */ #else // Instantiate a non-multi-threaded Gateway. -typedef Supplier_Handler SUPPLIER_HANDLER; -typedef Consumer_Handler CONSUMER_HANDLER; +typedef Supplier_Proxy SUPPLIER_HANDLER; +typedef Consumer_Proxy CONSUMER_HANDLER; #endif /* ACE_HAS_THREADS */ #endif /* _CONCURRENCY_STRATEGIES */ diff --git a/apps/Gateway/Gateway/Config_Files.cpp b/apps/Gateway/Gateway/Config_Files.cpp index 4c2648addf0..7e99902b0db 100644 --- a/apps/Gateway/Gateway/Config_Files.cpp +++ b/apps/Gateway/Gateway/Config_Files.cpp @@ -28,11 +28,11 @@ Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry, } // Get the logic id. - if ((read_result = this->getint (entry.logical_id_)) != FP::SUCCESS) + if ((read_result = this->getint (entry.supplier_id_)) != FP::SUCCESS) return read_result; // Get the payload type. - if ((read_result = this->getint (entry.payload_type_)) != FP::SUCCESS) + if ((read_result = this->getint (entry.type_)) != FP::SUCCESS) return read_result; // get all the destinations. @@ -104,7 +104,7 @@ int main (int argc, char *argv[]) { if (argc != 4) { // ACE_ERROR_RETURN ((LM_ERROR, "%s filename\n", argv[0]), -1); - cerr << argv[0] << " CCfilename RTfilename Mapfilename.\n"; + cerr << argv[0] << " CCfilename filename Mapfilename.\n"; exit (1); } FP_RETURN_TYPE result; @@ -130,30 +130,30 @@ int main (int argc, char *argv[]) } CCfile.close(); - Consumer_Config_File_Entry RTentry; - Consumer_Config_File_Parser RTfile; + Consumer_Config_File_Entry entry; + Consumer_Config_File_Parser file; - RTfile.open (argv[2]); + file.open (argv[2]); line_number = 0; printf ("\nConnID\tLogic\tPayload\tDestinations\n"); // Read config file line at a time. - while ((result = RTfile.read_entry (RTentry, line_number)) != EOF) + while ((result = file.read_entry (entry, line_number)) != EOF) { if (result != FP::SUCCESS) cerr << "Error at line " << line_number << endl; else { printf ("%d\t%d\t%d\t%d\t", - RTentry.conn_id_, RTentry.logical_id_, RTentry.payload_type_); - while (--RTentry.total_destinations_ >= 0) - printf ("%d,", RTentry.destinations_[RTentry.total_destinations_]); + entry.conn_id_, entry.supplier_id_, entry.type_); + while (--entry.total_destinations_ >= 0) + printf ("%d,", entry.destinations_[entry.total_destinations_]); printf ("\n"); } } - RTfile.close(); + file.close(); return 0; } diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h index 145c3233bae..2620301e25b 100644 --- a/apps/Gateway/Gateway/Config_Files.h +++ b/apps/Gateway/Gateway/Config_Files.h @@ -22,11 +22,11 @@ class Connection_Config_File_Entry // = TITLE - // Stores the IO_Handler entry for connection configuration. + // Stores the Proxy_Handler entry for connection configuration. { public: int conn_id_; - // Connection id for this IO_Handler. + // Connection id for this Proxy_Handler. char host_[BUFSIZ]; // Host to connect with. @@ -46,7 +46,7 @@ public: class Connection_Config_File_Parser : public File_Parser<Connection_Config_File_Entry> // = TITLE - // Parser for the IO_Handler Connection file. + // Parser for the Proxy_Handler Connection file. { public: virtual FP::Return_Type @@ -65,10 +65,10 @@ public: int conn_id_; // Connection id for this channel. - int logical_id_; + int supplier_id_; // Logical routing id for this channel. - int payload_type_; + int type_; // Type of payload in the message. int destinations_[MAX_DESTINATIONS]; diff --git a/apps/Gateway/Gateway/Dispatch_Set.h b/apps/Gateway/Gateway/Dispatch_Set.h new file mode 100644 index 00000000000..a867f1ca5ff --- /dev/null +++ b/apps/Gateway/Gateway/Dispatch_Set.h @@ -0,0 +1,28 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Dispatch_Set.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_DISPATCH_SET) +#define _DISPATCH_SET + +#include "ace/Set.h" + +// Forward reference. +class Proxy_Handler; + +typedef ACE_Unbounded_Set<Proxy_Handler *> Dispatch_Set; +typedef ACE_Unbounded_Set_Iterator<Proxy_Handler *> Dispatch_Set_Iterator; + +#endif /* _DISPATCH_SET */ diff --git a/apps/Gateway/Gateway/Event.h b/apps/Gateway/Gateway/Event.h index a8a9374be3c..24881c3e85b 100644 --- a/apps/Gateway/Gateway/Event.h +++ b/apps/Gateway/Gateway/Event.h @@ -17,33 +17,45 @@ #if !defined (EVENT) #define EVENT +#include "ace/OS.h" + // This is the unique connection identifier that denotes a particular -// IO_Handler in the Gateway. -typedef short CONN_ID; +// Proxy_Handler in the Gateway. +typedef ACE_INT32 ACE_INT32; class Event_Addr // = TITLE // Address used to identify the source/destination of an event. + // + // = DESCRIPTION + // This is really a "virtual forwarding address" thatis used to + // decouple the filtering and forwarding logic of the Event + // Channel from the format of the data. { public: - Event_Addr (CONN_ID cid = -1, unsigned char lid = 0, unsigned char pay = 0) - : conn_id_ (cid), logical_id_ (lid), payload_ (pay) {} - - int operator== (const Event_Addr &pa) const + Event_Addr (ACE_INT32 cid = -1, + u_char sid = 0, + u_char type = 0) + : conn_id_ (cid), + supplier_id_ (sid), + type_ (type) {} + + int operator== (const Event_Addr &event_addr) const { - return this->conn_id_ == pa.conn_id_ - && this->logical_id_ == pa.logical_id_ - && this->payload_ == pa.payload_; + return this->conn_id_ == event_addr.conn_id_ + && this->supplier_id_ == event_addr.supplier_id_ + && this->type_ == event_addr.type_; } - CONN_ID conn_id_; - // Unique connection identifier that denotes a particular IO_Handler. + ACE_INT32 conn_id_; + // Unique connection identifier that denotes a particular + // Proxy_Handler. - unsigned char logical_id_; + ACE_INT32 supplier_id_; // Logical ID. - unsigned char payload_; - // Payload type. + ACE_INT32 type_; + // Event type. }; @@ -52,24 +64,27 @@ class Event_Header // Fixed sized header. { public: - typedef unsigned short SUPPLIER_ID; - // Type used to route messages from gatewayd. + typedef ACE_INT32 SUPPLIER_ID; + // Type used to forward events from gatewayd. enum { INVALID_ID = -1 // No peer can validly use this number. }; - SUPPLIER_ID routing_id_; + SUPPLIER_ID supplier_id_; // Source ID. + ACE_INT32 type_; + // Event type. + size_t len_; - // Length of the message in bytes. + // Length of the entire event (including data payload) in bytes. }; class Event // = TITLE - // Variable-sized message (buf_ may be variable-sized between + // Variable-sized event (data_ may be variable-sized between // 0 and MAX_PAYLOAD_SIZE). { public: @@ -77,10 +92,10 @@ public: // The maximum size of an Event. Event_Header header_; - // Message header. + // Event header. - char buf_[MAX_PAYLOAD_SIZE]; - // Message payload. + char data_[MAX_PAYLOAD_SIZE]; + // Event data. }; #endif /* EVENT */ diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp index 815755216c7..d146ddfb362 100644 --- a/apps/Gateway/Gateway/Event_Channel.cpp +++ b/apps/Gateway/Gateway/Event_Channel.cpp @@ -1,9 +1,10 @@ /* -*- C++ -*- */ // $Id$ +#define ACE_BUILD_SVC_DLL #include "ace/Get_Opt.h" #include "Config_Files.h" -#include "IO_Handler_Connector.h" +#include "Proxy_Handler_Connector.h" #include "Event_Channel.h" #if !defined (ACE_EVENT_CHANNEL_C) @@ -46,18 +47,18 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &, size_t total_bytes_in = 0; size_t total_bytes_out = 0; - // Iterate through the consumer map connecting all the IO_Handlers. + // Iterate through the consumer map connecting all the Proxy_Handlers. for (CONNECTION_MAP_ENTRY *me = 0; cti.next (me) != 0; cti.advance ()) { - IO_Handler *io_handler = me->int_id_; + Proxy_Handler *proxy_handler = me->int_id_; - if (io_handler->direction () == 'C') - total_bytes_out += io_handler->total_bytes (); - else // io_handler->direction () == 'S' - total_bytes_in += io_handler->total_bytes (); + if (proxy_handler->direction () == 'C') + total_bytes_out += proxy_handler->total_bytes (); + else // proxy_handler->direction () == 'S' + total_bytes_in += proxy_handler->total_bytes (); } #if defined (ACE_NLOGGING) @@ -108,16 +109,16 @@ ACE_Event_Channel<SH, CH>::initiate_connections (void) else synch_options = ACE_Synch_Options::synch; - // Iterate through the Consumer Map connecting all the IO_Handlers. + // Iterate through the Consumer Map connecting all the Proxy_Handlers. for (CONNECTION_MAP_ENTRY *me = 0; cti.next (me) != 0; cti.advance ()) { - IO_Handler *io_handler = me->int_id_; + Proxy_Handler *proxy_handler = me->int_id_; if (this->connector_->initiate_connection - (io_handler, synch_options) == -1) + (proxy_handler, synch_options) == -1) continue; } @@ -125,7 +126,7 @@ ACE_Event_Channel<SH, CH>::initiate_connections (void) } // This method gracefully shuts down all the Handlers in the -// IO_Handler Connection Map. +// Proxy_Handler Connection Map. template <class SH, class CH> int ACE_Event_Channel<SH, CH>::close (void) @@ -136,26 +137,26 @@ ACE_Event_Channel<SH, CH>::close (void) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1); #endif /* USE_INPUT_MT || USE_OUTPUT_MT */ - CONNECTION_MAP_ITERATOR cti (this->connection_map_); + CONNECTION_MAP_ITERATOR cmi (this->connection_map_); // Iterate over all the handlers and shut them down. for (CONNECTION_MAP_ENTRY *me; - cti.next (me) != 0; - cti.advance ()) + cmi.next (me) != 0; + cmi.advance ()) { - IO_Handler *io_handler = me->int_id_; + Proxy_Handler *proxy_handler = me->int_id_; ACE_DEBUG ((LM_DEBUG, "(%t) closing down route %d\n", - io_handler->id ())); + proxy_handler->id ())); - if (io_handler->state () != IO_Handler::IDLE) - // Mark IO_Handler as DISCONNECTING so we don't try to + if (proxy_handler->state () != Proxy_Handler::IDLE) + // Mark Proxy_Handler as DISCONNECTING so we don't try to // reconnect... - io_handler->state (IO_Handler::DISCONNECTING); + proxy_handler->state (Proxy_Handler::DISCONNECTING); - // Deallocate IO_Handler resources. - io_handler->destroy (); // Will trigger a delete. + // Deallocate Proxy_Handler resources. + proxy_handler->destroy (); // Will trigger a delete. } // Free up the resources allocated dynamically by the ACE_Connector. @@ -168,7 +169,10 @@ ACE_Event_Channel<SH, CH>::open (int argc, char *argv[]) { this->parse_args (argc, argv); - ACE_NEW_RETURN (this->connector_, IO_Handler_Connector (), -1); + ACE_NEW_RETURN (this->connector_, Proxy_Handler_Connector (), -1); + + // Ignore SIPPIPE so each Consumer_Proxy can handle it. + ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); if (this->active_connector_role_) { @@ -226,38 +230,38 @@ ACE_Event_Channel<SH, CH>::parse_connection_config_file (void) entry.max_retry_delay_, entry.local_poconsumer_)); - IO_Handler *io_handler = 0; + Proxy_Handler *proxy_handler = 0; // The next few lines of code are dependent on whether we are - // making an Supplier_Handler or an Consumer_Handler. + // making an Supplier_Proxy or an Consumer_Proxy. - if (entry.direction_ == 'C') // Configure a Consumer_Handler. - ACE_NEW_RETURN (io_handler, - CONSUMER_HANDLER (&this->consumer_map_, + if (entry.direction_ == 'C') // Configure a Consumer_Proxy. + ACE_NEW_RETURN (proxy_handler, + CONSUMER_HANDLER (&this->efd_, this->connector_, ACE_Service_Config::thr_mgr (), this->socket_queue_size_), -1); - else /* direction == 'S' */ // Configure a Supplier_Handler. - ACE_NEW_RETURN (io_handler, - SUPPLIER_HANDLER (&this->consumer_map_, + else /* direction == 'S' */ // Configure a Supplier_Proxy. + ACE_NEW_RETURN (proxy_handler, + SUPPLIER_HANDLER (&this->efd_, this->connector_, ACE_Service_Config::thr_mgr (), this->socket_queue_size_), -1); - // The following code is common to both Supplier_Handlers_ and - // Consumer_Handlers. + // The following code is common to both Supplier_Proxys_ and + // Consumer_Proxys. // Initialize the routing entry's peer addressing info. - io_handler->bind (ACE_INET_Addr (entry.remote_poconsumer_, entry.host_), + proxy_handler->bind (ACE_INET_Addr (entry.remote_poconsumer_, entry.host_), ACE_INET_Addr (entry.local_poconsumer_), entry.conn_id_); // Initialize max timeout. - io_handler->max_timeout (entry.max_retry_delay_); + proxy_handler->max_timeout (entry.max_retry_delay_); - // Try to bind the new IO_Handler to the connection ID. - switch (this->connection_map_.bind (entry.conn_id_, io_handler)) + // Try to bind the new Proxy_Handler to the connection ID. + switch (this->connection_map_.bind (entry.conn_id_, proxy_handler)) { case -1: ACE_ERROR_RETURN ((LM_ERROR, @@ -277,7 +281,7 @@ ACE_Event_Channel<SH, CH>::parse_connection_config_file (void) if (file_empty) ACE_ERROR ((LM_WARNING, - "warning: connection io_handler configuration file was empty\n")); + "warning: connection proxy_handler configuration file was empty\n")); return 0; } @@ -303,43 +307,37 @@ ACE_Event_Channel<SH, CH>::parse_consumer_config_file (void) ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, " "number of destinations = %d\n", entry.conn_id_, - entry.logical_id_, - entry.payload_type_, + entry.supplier_id_, + entry.type_, entry.total_destinations_)); for (int i = 0; i < entry.total_destinations_; i++) ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n", i, entry.destinations_[i])); } - Consumer_Entry *re; - ACE_NEW_RETURN (re, Consumer_Entry, -1); - - Consumer_Entry::ENTRY_SET *io_handler_set; - ACE_NEW_RETURN (io_handler_set, Consumer_Entry::ENTRY_SET, -1); + Dispatch_Set *dispatch_set; + ACE_NEW_RETURN (dispatch_set, Dispatch_Set, -1); Event_Addr event_addr (entry.conn_id_, - entry.logical_id_, - entry.payload_type_); + entry.supplier_id_, + entry.type_); // Add the destinations to the Routing Entry. for (int i = 0; i < entry.total_destinations_; i++) { - IO_Handler *io_handler = 0; + Proxy_Handler *proxy_handler = 0; - // Lookup destination and add to Consumer_Entry set if found. + // Lookup destination and add to Dispatch_Set set if found. if (this->connection_map_.find (entry.destinations_[i], - io_handler) != -1) - io_handler_set->insert (io_handler); + proxy_handler) != -1) + dispatch_set->insert (proxy_handler); else ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n", i, entry.destinations_[i])); } - // Attach set of destination io_handlers to routing entry. - re->destinations (io_handler_set); - // Bind with consumer map, keyed by peer address. - switch (this->consumer_map_.bind (event_addr, re)) + switch (this->efd_.bind (event_addr, dispatch_set)) { case -1: ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n", diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h index 0cb928d7884..4e7afc5d328 100644 --- a/apps/Gateway/Gateway/Event_Channel.h +++ b/apps/Gateway/Gateway/Event_Channel.h @@ -17,7 +17,7 @@ #if !defined (ACE_EVENT_CHANNEL) #define ACE_EVENT_CHANNEL -#include "IO_Handler_Connector.h" +#include "Proxy_Handler_Connector.h" template <class SUPPLIER_HANDLER, class CONSUMER_HANDLER> class ACE_Svc_Export ACE_Event_Channel : public ACE_Event_Handler @@ -70,22 +70,22 @@ private: int debug_; // Are we debugging? - IO_Handler_Connector *connector_; + Proxy_Handler_Connector *connector_; // This is used to establish the connections actively. int socket_queue_size_; // Size of the socket queue (0 means "use default"). // = Make life easier by defining typedefs. - typedef ACE_Map_Manager<CONN_ID, IO_Handler *, MAP_MUTEX> CONNECTION_MAP; - typedef ACE_Map_Iterator<CONN_ID, IO_Handler *, MAP_MUTEX> CONNECTION_MAP_ITERATOR; - typedef ACE_Map_Entry<CONN_ID, IO_Handler *> CONNECTION_MAP_ENTRY; + typedef ACE_Map_Manager<ACE_INT32, Proxy_Handler *, MAP_MUTEX> CONNECTION_MAP; + typedef ACE_Map_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX> CONNECTION_MAP_ITERATOR; + typedef ACE_Map_Entry<ACE_INT32, Proxy_Handler *> CONNECTION_MAP_ENTRY; CONNECTION_MAP connection_map_; - // Table that maps Connection IDs to IO_Handler *'s. + // Table that maps Connection IDs to Proxy_Handler *'s. - Consumer_Map consumer_map_; - // Map that associates event addresses to a set of Consumer_Handler + Event_Forwarding_Discriminator efd_; + // Map that associates event addresses to a set of Consumer_Proxy // *'s. }; diff --git a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp new file mode 100644 index 00000000000..8261ea13eb2 --- /dev/null +++ b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp @@ -0,0 +1,61 @@ +/* -*- C++ -*- */ +// $Id$ + +#if !defined (_CONSUMER_MAP_C) +#define _CONSUMER_MAP_C + +#include "Event_Forwarding_Discriminator.h" + +// Bind the Event_Addr to the INT_ID. + +int +Event_Forwarding_Discriminator::bind (Event_Addr event_addr, + Dispatch_Set *Dispatch_Set) +{ + return this->map_.bind (event_addr, Dispatch_Set); +} + +// Find the Dispatch_Set corresponding to the Event_Addr. + +int +Event_Forwarding_Discriminator::find (Event_Addr event_addr, + Dispatch_Set *&Dispatch_Set) +{ + return this->map_.find (event_addr, Dispatch_Set); +} + +// Unbind (remove) the Event_Addr from the map. + +int +Event_Forwarding_Discriminator::unbind (Event_Addr event_addr) +{ + return this->map_.unbind (event_addr); +} + +Event_Forwarding_Discriminator_Iterator::Event_Forwarding_Discriminator_Iterator (Event_Forwarding_Discriminator &rt) + : map_iter_ (rt.map_) +{ +} + +int +Event_Forwarding_Discriminator_Iterator::next (Dispatch_Set *&ss) +{ + // Loop in order to skip over inactive entries if necessary. + + for (ACE_Map_Entry<Event_Addr, Dispatch_Set *> *temp = 0; + this->map_iter_.next (temp) != 0; + this->advance ()) + { + // Otherwise, return the next item. + ss = temp->int_id_; + return 1; + } + return 0; +} + +int +Event_Forwarding_Discriminator_Iterator::advance (void) +{ + return this->map_iter_.advance (); +} +#endif /* _CONSUMER_MAP_C */ diff --git a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h new file mode 100644 index 00000000000..35a594b61b5 --- /dev/null +++ b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h @@ -0,0 +1,62 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Event_Forwarding_Discriminator.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_CONSUMER_MAP_H) +#define _CONSUMER_MAP_H + +#include "ace/Map_Manager.h" +#include "Concurrency_Strategies.h" +#include "Event.h" +#include "Dispatch_Set.h" + +class Event_Forwarding_Discriminator +{ + // = TITLE + // Define a generic consumer map based on the ACE Map_Manager. + // + // = DESCRIPTION + // This class makes it easier to use the Map_Manager. +public: + int bind (Event_Addr event, Dispatch_Set *Dispatch_Set); + // Associate Event with the Dispatch_Set. + + int find (Event_Addr event, Dispatch_Set *&Dispatch_Set); + // Break any association of EXID. + + int unbind (Event_Addr event); + // Locate EXID and pass out parameter via INID. If found, + // return 0, else -1. + +public: + ACE_Map_Manager<Event_Addr, Dispatch_Set *, MAP_MUTEX> map_; + // Map that associates Event Addrs (external ids) with Dispatch_Set *'s + // <internal IDs>. +}; + +class Event_Forwarding_Discriminator_Iterator +{ + // = TITLE + // Define an iterator for the Consumer Map. +public: + Event_Forwarding_Discriminator_Iterator (Event_Forwarding_Discriminator &mm); + int next (Dispatch_Set *&); + int advance (void); + +private: + ACE_Map_Iterator<Event_Addr, Dispatch_Set *, MAP_MUTEX> map_iter_; + // Map we are iterating over. +}; +#endif /* _CONSUMER_MAP_H */ diff --git a/apps/Gateway/Gateway/File_Parser.h b/apps/Gateway/Gateway/File_Parser.h index 776d1b2f338..80b768aff84 100644 --- a/apps/Gateway/Gateway/File_Parser.h +++ b/apps/Gateway/Gateway/File_Parser.h @@ -57,7 +57,6 @@ protected: FP::Return_Type readword (char buf[]); int delimiter (char ch); - int endofline (char ch); int comments (char ch); int skipline (void); diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp index 82666406070..2c963ff3d7f 100644 --- a/apps/Gateway/Gateway/Gateway.cpp +++ b/apps/Gateway/Gateway/Gateway.cpp @@ -6,11 +6,14 @@ #include "Gateway.h" class Gateway : public ACE_Service_Object + // = TITLE + // Integrates the whole Gateway application. + // + // = DESCRIPTION + // This implementation uses the <ACE_Event_Channel> as the basis + // for the <Gateway> routing. { public: - // = Initialization method. - Gateway (void); - // = Service configurator hooks. virtual int init (int argc, char *argv[]); // Perform initialization. @@ -34,6 +37,7 @@ protected: // file. ACE_Event_Channel<SUPPLIER_HANDLER, CONSUMER_HANDLER> event_channel_; + // The Event Channel routes events from Supplier(s) to Consumer(s). }; // Convenient shorthands. @@ -46,8 +50,6 @@ Gateway::handle_signal (int signum, siginfo_t *, ucontext_t *) if (signum > 0) ACE_DEBUG ((LM_DEBUG, "(%t) %S\n", signum)); - this->event_channel_.close (); - // Shut down the main event loop. ACE_Service_Config::end_reactor_event_loop (); return 0; @@ -68,22 +70,12 @@ Gateway::handle_input (ACE_HANDLE h) return this->handle_signal (h); } -// Give default values to data members. - - -Gateway::Gateway (void) -{ -} - int Gateway::init (int argc, char *argv[]) { if (this->event_channel_.open (argc, argv) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "open"), -1); - // Ignore SIPPIPE so each Consumer_Handler can handle it. - ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); - ACE_Sig_Set sig_set; sig_set.sig_add (SIGINT); sig_set.sig_add (SIGQUIT); @@ -91,13 +83,12 @@ Gateway::init (int argc, char *argv[]) // Register ourselves to receive SIGINT and SIGQUIT // so we can shut down gracefully via signals. - if (ACE_Service_Config::reactor ()->register_handler (sig_set, - this) == -1) + if (ACE_Service_Config::reactor ()->register_handler + (sig_set, this) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); - if (ACE_Service_Config::reactor ()->register_handler (0, - this, - ACE_Event_Handler::READ_MASK) == -1) + if (ACE_Service_Config::reactor ()->register_handler + (0, this, ACE_Event_Handler::READ_MASK) == -1) ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); return 0; } diff --git a/apps/Gateway/Gateway/Makefile b/apps/Gateway/Gateway/Makefile index f81df3d73af..0f5ddc07eb0 100644 --- a/apps/Gateway/Gateway/Makefile +++ b/apps/Gateway/Gateway/Makefile @@ -1,7 +1,7 @@ #---------------------------------------------------------------------------- # @(#)Makefile 1.1 10/18/96 # -# Makefile for the Gateway prototype. +# Makefile for the Gateway. #---------------------------------------------------------------------------- #---------------------------------------------------------------------------- @@ -12,15 +12,14 @@ BIN = gatewayd LIB = libGateway.a SHLIB = libGateway.so -FILES = Event_Channel \ - IO_Handler \ - IO_Handler_Connector \ - Config_Files \ +FILES = Config_Files \ File_Parser \ Gateway \ - Consumer_Entry \ - Consumer_Map \ - Thr_IO_Handler + Event_Channel \ + Event_Forwarding_Discriminator \ + Proxy_Handler \ + Proxy_Handler_Connector \ + Thr_Proxy_Handler LSRC = $(addsuffix .cpp,$(FILES)) LOBJ = $(addsuffix .o,$(FILES)) @@ -52,7 +51,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU # Default behavior is to use single-threading. See the README # file for information on how to configure this with multiple # strategies for threading the input and output channels. -DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT +# DEFFLAGS += -DUSE_OUTPUT_MT -DUSE_INPUT_MT #---------------------------------------------------------------------------- # Dependencies @@ -61,9 +60,7 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT # DO NOT DELETE THIS LINE -- g++dep uses it. # DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. -.obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp \ - $(WRAPPER_ROOT)/ace/Get_Opt.h \ - $(WRAPPER_ROOT)/ace/ACE.h \ +.obj/Config_Files.o .shobj/Config_Files.so: Config_Files.cpp \ $(WRAPPER_ROOT)/ace/OS.h \ $(WRAPPER_ROOT)/ace/Time_Value.h \ $(WRAPPER_ROOT)/ace/config.h \ @@ -72,13 +69,38 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Log_Msg.h \ $(WRAPPER_ROOT)/ace/Log_Record.h \ $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/ACE.i \ $(WRAPPER_ROOT)/ace/Log_Record.i \ + Config_Files.h File_Parser.h +.obj/File_Parser.o .shobj/File_Parser.so: File_Parser.cpp \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/stdcpp.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/ACE.i \ - Config_Files.h File_Parser.h IO_Handler_Connector.h \ - $(WRAPPER_ROOT)/ace/Connector.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + File_Parser.h +.obj/Gateway.o .shobj/Gateway.so: Gateway.cpp \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/stdcpp.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/ACE.i \ $(WRAPPER_ROOT)/ace/Event_Handler.h \ $(WRAPPER_ROOT)/ace/Thread_Manager.h \ $(WRAPPER_ROOT)/ace/Thread.h \ @@ -119,19 +141,21 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ + Event_Channel.h Proxy_Handler_Connector.h \ + $(WRAPPER_ROOT)/ace/Connector.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ $(WRAPPER_ROOT)/ace/Svc_Handler.h \ $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ $(WRAPPER_ROOT)/ace/Connector.i \ - Thr_IO_Handler.h IO_Handler.h \ + Thr_Proxy_Handler.h Proxy_Handler.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ - Consumer_Map.h Concurrency_Strategies.h Event.h Consumer_Entry.h \ - Event_Channel.h -.obj/IO_Handler.o .shobj/IO_Handler.so: IO_Handler.cpp Consumer_Entry.h \ - $(WRAPPER_ROOT)/ace/Set.h \ + Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \ + Dispatch_Set.h Gateway.h +.obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp \ + $(WRAPPER_ROOT)/ace/Get_Opt.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ $(WRAPPER_ROOT)/ace/Time_Value.h \ @@ -143,7 +167,7 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Log_Priority.h \ $(WRAPPER_ROOT)/ace/Log_Record.i \ $(WRAPPER_ROOT)/ace/ACE.i \ - IO_Handler_Connector.h \ + Config_Files.h File_Parser.h Proxy_Handler_Connector.h \ $(WRAPPER_ROOT)/ace/Connector.h \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ @@ -158,6 +182,7 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ $(WRAPPER_ROOT)/ace/Timer_Queue.h \ @@ -193,16 +218,14 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ $(WRAPPER_ROOT)/ace/Connector.i \ - Thr_IO_Handler.h IO_Handler.h \ + Thr_Proxy_Handler.h Proxy_Handler.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ - Consumer_Map.h Concurrency_Strategies.h Event.h -.obj/IO_Handler_Connector.o .shobj/IO_Handler_Connector.so: IO_Handler_Connector.cpp \ - IO_Handler_Connector.h \ - $(WRAPPER_ROOT)/ace/Connector.h \ - $(WRAPPER_ROOT)/ace/Service_Config.h \ - $(WRAPPER_ROOT)/ace/Service_Object.h \ - $(WRAPPER_ROOT)/ace/Shared_Object.h \ + Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \ + Dispatch_Set.h Event_Channel.h +.obj/Event_Forwarding_Discriminator.o .shobj/Event_Forwarding_Discriminator.so: Event_Forwarding_Discriminator.cpp \ + Event_Forwarding_Discriminator.h \ + $(WRAPPER_ROOT)/ace/Map_Manager.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ $(WRAPPER_ROOT)/ace/Time_Value.h \ @@ -214,6 +237,34 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Log_Priority.h \ $(WRAPPER_ROOT)/ace/Log_Record.i \ $(WRAPPER_ROOT)/ace/ACE.i \ + Concurrency_Strategies.h \ + $(WRAPPER_ROOT)/ace/Synch.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ + $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ + $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Event_Handler.h \ + Event.h Dispatch_Set.h \ + $(WRAPPER_ROOT)/ace/Set.h +.obj/Proxy_Handler.o .shobj/Proxy_Handler.so: Proxy_Handler.cpp Dispatch_Set.h \ + $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/ACE.h \ + $(WRAPPER_ROOT)/ace/OS.h \ + $(WRAPPER_ROOT)/ace/Time_Value.h \ + $(WRAPPER_ROOT)/ace/config.h \ + $(WRAPPER_ROOT)/ace/stdcpp.h \ + $(WRAPPER_ROOT)/ace/Trace.h \ + $(WRAPPER_ROOT)/ace/Log_Msg.h \ + $(WRAPPER_ROOT)/ace/Log_Record.h \ + $(WRAPPER_ROOT)/ace/Log_Priority.h \ + $(WRAPPER_ROOT)/ace/Log_Record.i \ + $(WRAPPER_ROOT)/ace/ACE.i \ + Proxy_Handler_Connector.h \ + $(WRAPPER_ROOT)/ace/Connector.h \ + $(WRAPPER_ROOT)/ace/Service_Config.h \ + $(WRAPPER_ROOT)/ace/Service_Object.h \ + $(WRAPPER_ROOT)/ace/Shared_Object.h \ $(WRAPPER_ROOT)/ace/Event_Handler.h \ $(WRAPPER_ROOT)/ace/Thread_Manager.h \ $(WRAPPER_ROOT)/ace/Thread.h \ @@ -224,7 +275,6 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ $(WRAPPER_ROOT)/ace/Signal.h \ - $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ $(WRAPPER_ROOT)/ace/Timer_Queue.h \ @@ -260,37 +310,13 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ $(WRAPPER_ROOT)/ace/Connector.i \ - Thr_IO_Handler.h IO_Handler.h \ + Thr_Proxy_Handler.h Proxy_Handler.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ - Consumer_Map.h Concurrency_Strategies.h Event.h Consumer_Entry.h -.obj/Config_Files.o .shobj/Config_Files.so: Config_Files.cpp \ - $(WRAPPER_ROOT)/ace/OS.h \ - $(WRAPPER_ROOT)/ace/Time_Value.h \ - $(WRAPPER_ROOT)/ace/config.h \ - $(WRAPPER_ROOT)/ace/stdcpp.h \ - $(WRAPPER_ROOT)/ace/Trace.h \ - $(WRAPPER_ROOT)/ace/Log_Msg.h \ - $(WRAPPER_ROOT)/ace/Log_Record.h \ - $(WRAPPER_ROOT)/ace/Log_Priority.h \ - $(WRAPPER_ROOT)/ace/ACE.h \ - $(WRAPPER_ROOT)/ace/ACE.i \ - $(WRAPPER_ROOT)/ace/Log_Record.i \ - Config_Files.h File_Parser.h -.obj/File_Parser.o .shobj/File_Parser.so: File_Parser.cpp \ - $(WRAPPER_ROOT)/ace/OS.h \ - $(WRAPPER_ROOT)/ace/Time_Value.h \ - $(WRAPPER_ROOT)/ace/config.h \ - $(WRAPPER_ROOT)/ace/stdcpp.h \ - $(WRAPPER_ROOT)/ace/Trace.h \ - $(WRAPPER_ROOT)/ace/Log_Msg.h \ - $(WRAPPER_ROOT)/ace/Log_Record.h \ - $(WRAPPER_ROOT)/ace/Log_Priority.h \ - $(WRAPPER_ROOT)/ace/ACE.h \ - $(WRAPPER_ROOT)/ace/ACE.i \ - $(WRAPPER_ROOT)/ace/Log_Record.i \ - File_Parser.h -.obj/Gateway.o .shobj/Gateway.so: Gateway.cpp \ + Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h +.obj/Proxy_Handler_Connector.o .shobj/Proxy_Handler_Connector.so: Proxy_Handler_Connector.cpp \ + Proxy_Handler_Connector.h \ + $(WRAPPER_ROOT)/ace/Connector.h \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -345,36 +371,19 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ - Gateway.h -.obj/Consumer_Entry.o .shobj/Consumer_Entry.so: Consumer_Entry.cpp Consumer_Entry.h \ - $(WRAPPER_ROOT)/ace/Set.h \ - $(WRAPPER_ROOT)/ace/ACE.h \ - $(WRAPPER_ROOT)/ace/OS.h \ - $(WRAPPER_ROOT)/ace/Time_Value.h \ - $(WRAPPER_ROOT)/ace/config.h \ - $(WRAPPER_ROOT)/ace/stdcpp.h \ - $(WRAPPER_ROOT)/ace/Trace.h \ - $(WRAPPER_ROOT)/ace/Log_Msg.h \ - $(WRAPPER_ROOT)/ace/Log_Record.h \ - $(WRAPPER_ROOT)/ace/Log_Priority.h \ - $(WRAPPER_ROOT)/ace/Log_Record.i \ - $(WRAPPER_ROOT)/ace/ACE.i -.obj/Consumer_Map.o .shobj/Consumer_Map.so: Consumer_Map.cpp Consumer_Map.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ - $(WRAPPER_ROOT)/ace/ACE.h \ - $(WRAPPER_ROOT)/ace/OS.h \ - $(WRAPPER_ROOT)/ace/Time_Value.h \ - $(WRAPPER_ROOT)/ace/config.h \ - $(WRAPPER_ROOT)/ace/stdcpp.h \ - $(WRAPPER_ROOT)/ace/Trace.h \ - $(WRAPPER_ROOT)/ace/Log_Msg.h \ - $(WRAPPER_ROOT)/ace/Log_Record.h \ - $(WRAPPER_ROOT)/ace/Log_Priority.h \ - $(WRAPPER_ROOT)/ace/Log_Record.i \ - $(WRAPPER_ROOT)/ace/ACE.i \ - Concurrency_Strategies.h Event.h Consumer_Entry.h \ - $(WRAPPER_ROOT)/ace/Set.h -.obj/Thr_IO_Handler.o .shobj/Thr_IO_Handler.so: Thr_IO_Handler.cpp Thr_IO_Handler.h IO_Handler.h \ + $(WRAPPER_ROOT)/ace/Svc_Handler.h \ + $(WRAPPER_ROOT)/ace/Synch_Options.h \ + $(WRAPPER_ROOT)/ace/Task.h \ + $(WRAPPER_ROOT)/ace/Task_T.h \ + $(WRAPPER_ROOT)/ace/Connector.i \ + Thr_Proxy_Handler.h Proxy_Handler.h \ + $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ + $(WRAPPER_ROOT)/ace/SOCK_Connector.i \ + Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \ + Dispatch_Set.h +.obj/Thr_Proxy_Handler.o .shobj/Thr_Proxy_Handler.so: Thr_Proxy_Handler.cpp Thr_Proxy_Handler.h \ + Proxy_Handler.h \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ @@ -435,10 +444,10 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ - Consumer_Map.h \ + Event_Forwarding_Discriminator.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ - Concurrency_Strategies.h Event.h Consumer_Entry.h \ - IO_Handler_Connector.h \ + Concurrency_Strategies.h Event.h Dispatch_Set.h \ + Proxy_Handler_Connector.h \ $(WRAPPER_ROOT)/ace/Connector.h \ $(WRAPPER_ROOT)/ace/Connector.i diff --git a/apps/Gateway/Gateway/Proxy_Handler.cpp b/apps/Gateway/Gateway/Proxy_Handler.cpp new file mode 100644 index 00000000000..86e0fff8e41 --- /dev/null +++ b/apps/Gateway/Gateway/Proxy_Handler.cpp @@ -0,0 +1,698 @@ +// $Id$ + +#include "Dispatch_Set.h" +#include "Proxy_Handler_Connector.h" + +// Convenient short-hands. +#define CO CONDITION +#define MU MAP_MUTEX + +// The total number of bytes sent/received on this Proxy. + +size_t +Proxy_Handler::total_bytes (void) +{ + return this->total_bytes_; +} + +void +Proxy_Handler::total_bytes (size_t bytes) +{ + this->total_bytes_ += bytes; +} + +Proxy_Handler::Proxy_Handler (Event_Forwarding_Discriminator *efd, + Proxy_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> (thr_mgr), + efd_ (efd), + id_ (-1), + total_bytes_ (0), + state_ (Proxy_Handler::IDLE), + connector_ (ioc), + timeout_ (1), + max_timeout_ (Proxy_Handler::MAX_RETRY_TIMEOUT), + socket_queue_size_ (socket_queue_size) +{ +} + +// Set the direction. + +void +Proxy_Handler::direction (char d) +{ + this->direction_ = d; +} + +// Get the direction. + +char +Proxy_Handler::direction (void) +{ + return this->direction_; +} + +// Sets the timeout delay. + +void +Proxy_Handler::timeout (int to) +{ + if (to > this->max_timeout_) + to = this->max_timeout_; + + this->timeout_ = to; +} + +// Recalculate the current retry timeout delay using exponential +// backoff. Returns the original timeout (i.e., before the +// recalculation). + +int +Proxy_Handler::timeout (void) +{ + int old_timeout = this->timeout_; + this->timeout_ *= 2; + + if (this->timeout_ > this->max_timeout_) + this->timeout_ = this->max_timeout_; + + return old_timeout; +} + +// Sets the max timeout delay. + +void +Proxy_Handler::max_timeout (int mto) +{ + this->max_timeout_ = mto; +} + +// Gets the max timeout delay. + +int +Proxy_Handler::max_timeout (void) +{ + return this->max_timeout_; +} + +// Restart connection asynchronously when timeout occurs. + +int +Proxy_Handler::handle_timeout (const ACE_Time_Value &, const void *) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) attempting to reconnect Proxy_Handler %d with timeout = %d\n", + this->id (), this->timeout_)); + return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch); +} + +// Restart connection (blocking_semantics dicates whether we +// restart synchronously or asynchronously). + +int +Proxy_Handler::reinitiate_connection (void) +{ + // Skip over deactivated descriptors. + if (this->get_handle () != ACE_INVALID_HANDLE) + { + // Make sure to close down peer to reclaim descriptor. + this->peer ().close (); + + ACE_DEBUG ((LM_DEBUG, + "(%t) scheduling reinitiation of Proxy_Handler %d\n", + this->id ())); + + // Reschedule ourselves to try and connect again. + if (ACE_Service_Config::reactor ()->schedule_timer + (this, 0, this->timeout ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "schedule_timer"), -1); + } + return 0; +} + +// Handle shutdown of the Proxy_Handler object. + +int +Proxy_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask) +{ + ACE_DEBUG ((LM_DEBUG, + "(%t) shutting down Proxy_Handler %d on handle %d\n", + this->id (), this->get_handle ())); + + return this->reinitiate_connection (); +} + +// Set the state of the Proxy. + +void +Proxy_Handler::state (Proxy_Handler::State s) +{ + this->state_ = s; +} + +// Perform the first-time initiation of a connection to the peer. + +int +Proxy_Handler::initialize_connection (void) +{ + this->state_ = Proxy_Handler::ESTABLISHED; + + // Restart the timeout to 1. + this->timeout (1); + + // Action that sends the connection id to the peerd. + + ACE_INT32 id = htonl (this->id ()); + + ssize_t n = this->peer ().send ((const void *) &id, sizeof id); + + if (n != sizeof id) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + n == 0 ? "peer has closed down unexpectedly" : "send"), + -1); + return 0; +} + +// Set the size of the socket queue. + +void +Proxy_Handler::socket_queue_size (void) +{ + if (this->socket_queue_size_ > 0) + { + int option = this->direction_ == 'S' ? SO_RCVBUF : SO_SNDBUF; + + if (this->peer ().set_option (SOL_SOCKET, option, + &this->socket_queue_size_, + sizeof (int)) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option")); + } +} + +// Upcall from the ACE_Acceptor::handle_input() that +// delegates control to our application-specific Proxy_Handler. + +int +Proxy_Handler::open (void *a) +{ + ACE_DEBUG ((LM_DEBUG, "(%t) Proxy_Handler's fd = %d\n", + this->peer ().get_handle ())); + + // Set the size of the socket queue. + this->socket_queue_size (); + + // Turn on non-blocking I/O. + if (this->peer ().enable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + + // Call down to the base class to activate and register this handler. + if (this->ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>::open (a) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1); + + return this->initialize_connection (); +} + +// Return the current state of the Proxy. + +Proxy_Handler::State +Proxy_Handler::state (void) +{ + return this->state_; +} + +void +Proxy_Handler::id (ACE_INT32 id) +{ + this->id_ = id; +} + +ACE_INT32 +Proxy_Handler::id (void) +{ + return this->id_; +} + +// Set the peer's address information. +int +Proxy_Handler::bind (const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32 id) +{ + this->remote_addr_ = remote_addr; + this->local_addr_ = local_addr; + this->id_ = id; + return 0; +} + +ACE_INET_Addr & +Proxy_Handler::remote_addr (void) +{ + return this->remote_addr_; +} + +ACE_INET_Addr & +Proxy_Handler::local_addr (void) +{ + return this->local_addr_; +} + +// Constructor sets the consumer map pointer. + +Consumer_Proxy::Consumer_Proxy (Event_Forwarding_Discriminator *efd, + Proxy_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : Proxy_Handler (efd, ioc, thr_mgr, socket_queue_size) +{ + this->direction_ = 'C'; + this->msg_queue ()->high_water_mark (Consumer_Proxy::MAX_QUEUE_SIZE); +} + +// This method should be called only when the Consumer shuts down +// unexpectedly. This method simply marks the Proxy_Handler as having +// failed so that handle_close () can reconnect. + +int +Consumer_Proxy::handle_input (ACE_HANDLE) +{ + char buf[1]; + + this->state (Proxy_Handler::FAILED); + + switch (this->peer ().recv (buf, sizeof buf)) + { + case -1: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has failed unexpectedly for Consumer_Proxy %d\n", + this->id ()), -1); + /* NOTREACHED */ + case 0: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has shutdown unexpectedly for Consumer_Proxy %d\n", + this->id ()), -1); + /* NOTREACHED */ + default: + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Consumer is erroneously sending input to Consumer_Proxy %d\n", + this->id ()), -1); + /* NOTREACHED */ + } +} + +// Perform a non-blocking put() of event. If we are unable to send +// the entire event the remainder is re-queued at the *front* of the +// Event_List. + +int +Consumer_Proxy::nonblk_put (ACE_Message_Block *event) +{ + // Try to send the event. If we don't send it all (e.g., due to + // flow control), then re-queue the remainder at the head of the + // Event_List and ask the ACE_Reactor to inform us (via + // handle_output()) when it is possible to try again. + + ssize_t n = this->send (event); + + if (n == -1) + { + // Things have gone wrong, let's try to close down and set up a + // new reconnection by calling handle_close(). + this->state (Proxy_Handler::FAILED); + this->handle_close (); + return -1; + } + else if (errno == EWOULDBLOCK) // Didn't manage to send everything. + { + ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n", + this->get_handle (), this->id ())); + + // ACE_Queue in *front* of the list to preserve order. + if (this->msg_queue ()->enqueue_head + (event, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1); + + // Tell ACE_Reactor to call us back when we can send again. + else if (ACE_Service_Config::reactor ()->schedule_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1); + return 0; + } + else + return n; +} + +ssize_t +Consumer_Proxy::send (ACE_Message_Block *event) +{ + ssize_t len = event->length (); + ssize_t n = this->peer ().send (event->rd_ptr (), len); + + if (n <= 0) + return errno == EWOULDBLOCK ? 0 : n; + else if (n < len) + // Re-adjust pointer to skip over the part we did send. + event->rd_ptr (n); + else /* if (n == length) */ + { + // The whole event is sent, we can now safely deallocate the + // buffer. Note that this should decrement a reference count... + delete event; + errno = 0; + } + this->total_bytes (n); + return n; +} + +// Finish sending an event when flow control conditions abate. +// This method is automatically called by the ACE_Reactor. + +int +Consumer_Proxy::handle_output (ACE_HANDLE) +{ + ACE_Message_Block *event = 0; + + ACE_DEBUG ((LM_DEBUG, + "(%t) in handle_output on handle %d\n", + this->get_handle ())); + // The list had better not be empty, otherwise there's a bug! + + if (this->msg_queue ()->dequeue_head + (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1) + { + switch (this->nonblk_put (event)) + { + case 0: // Partial send. + ACE_ASSERT (errno == EWOULDBLOCK); + // Didn't write everything this time, come back later... + break; + + case -1: + // We are responsible for freeing an ACE_Message_Block if + // failures occur. + delete event; + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure")); + + /* FALLTHROUGH */ + default: // Sent the whole thing. + + // If we succeed in writing the entire event (or we did not + // fail due to EWOULDBLOCK) then check if there are more + // events on the Message_Queue. If there aren't, tell the + // ACE_Reactor not to notify us anymore (at least until + // there are new events queued up). + + if (this->msg_queue ()->is_empty ()) + { + ACE_DEBUG ((LM_DEBUG, + "(%t) queueing deactivated on handle %d to routing id %d\n", + this->get_handle (), this->id ())); + + + if (ACE_Service_Config::reactor ()->cancel_wakeup + (this, ACE_Event_Handler::WRITE_MASK) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup")); + } + } + } + else + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head")); + return 0; +} + +// Send an event to a Consumer (may queue if necessary). + +int +Consumer_Proxy::put (ACE_Message_Block *event, ACE_Time_Value *) +{ + if (this->msg_queue ()->is_empty ()) + // Try to send the event *without* blocking! + return this->nonblk_put (event); + else + // If we have queued up events due to flow control then just + // enqueue and return. + return this->msg_queue ()->enqueue_tail + (event, (ACE_Time_Value *) &ACE_Time_Value::zero); +} + +// Constructor sets the consumer map pointer and the connector +// pointer. + +Supplier_Proxy::Supplier_Proxy (Event_Forwarding_Discriminator *efd, + Proxy_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : msg_frag_ (0), + Proxy_Handler (efd, ioc, thr_mgr, socket_queue_size) +{ + this->direction_ = 'S'; + this->msg_queue ()->high_water_mark (0); +} + +// Receive an Event from a Supplier. Handles fragmentation. +// +// The event returned from recv consists of two parts: +// +// 1. The Address part, contains the "virtual" routing id. +// +// 2. The Data part, which contains the actual data to be forwarded. +// +// The reason for having two parts is to shield the higher layers +// of software from knowledge of the event structure. + +int +Supplier_Proxy::recv (ACE_Message_Block *&forward_addr) +{ + if (this->msg_frag_ == 0) + // No existing fragment... + ACE_NEW_RETURN (this->msg_frag_, + ACE_Message_Block (sizeof (Event)), + -1); + + Event *event = (Event *) this->msg_frag_->rd_ptr (); + ssize_t header_received = 0; + + const ssize_t HEADER_SIZE = sizeof (Event_Header); + ssize_t header_bytes_left_to_read = + HEADER_SIZE - this->msg_frag_->length (); + + if (header_bytes_left_to_read > 0) + { + header_received = this->peer ().recv + (this->msg_frag_->wr_ptr (), header_bytes_left_to_read); + + if (header_received == -1 /* error */ + || header_received == 0 /* EOF */) + { + ACE_ERROR ((LM_ERROR, "%p\n", + "Recv error during header read ")); + ACE_DEBUG ((LM_DEBUG, + "attempted to read %d\n", + header_bytes_left_to_read)); + delete this->msg_frag_; + this->msg_frag_ = 0; + return header_received; + } + + // Bump the write pointer by the amount read. + this->msg_frag_->wr_ptr (header_received); + + // At this point we may or may not have the ENTIRE header. + if (this->msg_frag_->length () < HEADER_SIZE) + { + ACE_DEBUG ((LM_DEBUG, + "Partial header received: only %d bytes\n", + this->msg_frag_->length ())); + // Notify the caller that we didn't get an entire event. + errno = EWOULDBLOCK; + return -1; + } + } + + // At this point there is a complete, valid header in msg_frag_ + ssize_t data_bytes_left_to_read = + sizeof (Event) - this->msg_frag_->length (); + + ssize_t data_received = + this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read); + + // Try to receive the remainder of the event. + + switch (data_received) + { + case -1: + if (errno == EWOULDBLOCK) + // This might happen if only the header came through. + return -1; + else + /* FALLTHROUGH */; + + case 0: // Premature EOF. + delete this->msg_frag_; + this->msg_frag_ = 0; + return 0; + + default: + // Set the write pointer at 1 past the end of the event. + this->msg_frag_->wr_ptr (data_received); + + if (data_received != data_bytes_left_to_read) + { + errno = EWOULDBLOCK; + // Inform caller that we didn't get the whole event. + return -1; + } + else + { + // Set the read pointer to the beginning of the event. + this->msg_frag_->rd_ptr (this->msg_frag_->base ()); + + // Allocate an event forwarding header and chain the data + // portion onto its continuation field. + forward_addr = new ACE_Message_Block (sizeof (Event_Addr), + ACE_Message_Block::MB_PROTO, + this->msg_frag_); + if (forward_addr == 0) + { + delete this->msg_frag_; + this->msg_frag_ = 0; + errno = ENOMEM; + return -1; + } + + Event_Addr event_addr (this->id (), + event->header_.supplier_id_, + event->header_.type_); + // Copy the forwarding address from the Event_Addr into + // forward_addr. + forward_addr->copy ((char *) &event_addr, sizeof (Event_Addr)); + + // Reset the pointer to indicate we've got an entire event. + this->msg_frag_ = 0; + } + + this->total_bytes (data_received + header_received); +#if defined (VERBOSE) + ACE_DEBUG ((LM_DEBUG, "(%t) connection id = %d, supplier id = %d, len = %d, payload = %*s", + event_addr.conn_id_, event->header_.supplier_id_, event->header_.len_, + event->header_.len_, event->data_)); +#else + ACE_DEBUG ((LM_DEBUG, "(%t) supplier id = %d, cur len = %d, total bytes read = %d\n", + event->header_.supplier_id_, event->header_.len_, this->total_bytes ())); +#endif + return data_received + header_received; + } +} + +// Receive various types of input (e.g., Peer event from the +// gatewayd, as well as stdio). + +int +Supplier_Proxy::handle_input (ACE_HANDLE) +{ + ACE_Message_Block *forward_addr = 0; + + switch (this->recv (forward_addr)) + { + case 0: + // Note that a peer should never initiate a shutdown by closing + // the connection. Instead, it should reconnect. + this->state (Proxy_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, + "(%t) Peer has closed down unexpectedly for Input Proxy_Handler %d\n", + this->id ()), -1); + /* NOTREACHED */ + case -1: + if (errno == EWOULDBLOCK) + // A short-read, we'll come back and finish it up later on! + return 0; + else // A weird problem occurred, shut down and start again. + { + this->state (Proxy_Handler::FAILED); + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Proxy_Handler %d\n", + "Peer has failed unexpectedly", + this->id ()), -1); + } + /* NOTREACHED */ + default: + return this->forward (forward_addr); + } +} + +// Forward an event to its appropriate Consumer(s). + +int +Supplier_Proxy::forward (ACE_Message_Block *forward_addr) +{ + // We got a valid event, so determine its virtual forwarding + // address, which is stored in the first of the two event blocks, + // which are chained together by this->recv(). + + Event_Addr *forwarding_addr = (Event_Addr *) forward_addr->rd_ptr (); + + // Skip over the address portion and get the data. + const ACE_Message_Block *const data = forward_addr->cont (); + + // <dispatch_set> points to the set of Consumers associated with + // this forwarding address. + Dispatch_Set *dispatch_set = 0; + + if (this->efd_->find (*forwarding_addr, dispatch_set) != -1) + { + // Check to see if there are any destinations. + if (dispatch_set->size () == 0) + ACE_DEBUG ((LM_WARNING, + "there are no active destinations for this event currently\n")); + + else // There are destinations, so forward the event. + { + Dispatch_Set_Iterator dsi (*dispatch_set); + + for (Proxy_Handler **proxy_handler = 0; + dsi.next (proxy_handler) != 0; + dsi.advance ()) + { + // Only process active proxy_handlers. + if ((*proxy_handler)->state () == Proxy_Handler::ESTABLISHED) + { + // Clone the event portion (should be doing + // reference counting here...) + ACE_Message_Block *newmsg = data->clone (); + + ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n", + (*proxy_handler)->id ())); + + if ((*proxy_handler)->put (newmsg) == -1) + { + if (errno == EWOULDBLOCK) // The queue has filled up! + ACE_ERROR ((LM_ERROR, "(%t) %p\n", + "gateway is flow controlled, so we're dropping events")); + else + ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n", + "put", (*proxy_handler)->id ())); + + // We are responsible for freeing a + // ACE_Message_Block if failures occur. + delete newmsg; + } + } + } + // Will become superfluous once we have reference + // counting... + delete forward_addr; + return 0; + } + } + delete forward_addr; + // Failure return. + ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n", + forwarding_addr->conn_id_, forwarding_addr->supplier_id_, forwarding_addr->type_)); + return 0; +} + +#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION) +template class ACE_Map_Manager<Event_Addr, Dispatch_Set *, MAP_MUTEX>; +template class ACE_Map_Iterator<Event_Addr, Dispatch_Set *, MAP_MUTEX>; +template class ACE_Map_Entry<Event_Addr, Dispatch_Set *>; +#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */ diff --git a/apps/Gateway/Gateway/Proxy_Handler.h b/apps/Gateway/Gateway/Proxy_Handler.h new file mode 100644 index 00000000000..d91fa3108ff --- /dev/null +++ b/apps/Gateway/Gateway/Proxy_Handler.h @@ -0,0 +1,215 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Proxy_Handler.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_PROXY_HANDLER) +#define _PROXY_HANDLER + +#include "ace/Service_Config.h" +#include "ace/SOCK_Connector.h" +#include "ace/Svc_Handler.h" +#include "Event_Forwarding_Discriminator.h" +#include "Dispatch_Set.h" +#include "Event.h" + +// Forward declaration. +class Proxy_Handler_Connector; + +class Proxy_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> + // = TITLE + // Proxy_Handler contains info about connection state and addressing. + // + // = DESCRIPTION + // The Proxy_Handler classes process events sent to the Event + // Channel from Suppliers and forward them to Consumers. +{ +public: + Proxy_Handler (Event_Forwarding_Discriminator *, + Proxy_Handler_Connector *, + ACE_Thread_Manager * = 0, + int socket_queue_size = 0); + + virtual int open (void * = 0); + // Initialize and activate a single-threaded Proxy_Handler (called by + // ACE_Connector::handle_output()). + + int bind (const ACE_INET_Addr &remote_addr, + const ACE_INET_Addr &local_addr, + ACE_INT32); + // Set the peer's addressing and routing information. + + ACE_INET_Addr &remote_addr (void); + // Returns the peer's routing address. + + ACE_INET_Addr &local_addr (void); + // Returns our local address. + + // = Set/get routing id. + ACE_INT32 id (void); + void id (ACE_INT32); + + // = The current state of the Proxy_Handler. + enum State + { + IDLE = 1, // Prior to initialization. + CONNECTING, // During connection establishment. + ESTABLISHED, // Proxy_Handler is established and active. + DISCONNECTING, // Proxy_Handler is in the process of connecting. + FAILED // Proxy_Handler has failed. + }; + + // = Set/get the current state. + void state (State); + State state (void); + + // = Set/get the current retry timeout delay. + void timeout (int); + int timeout (void); + + // = Set/get the maximum retry timeout delay. + void max_timeout (int); + int max_timeout (void); + + // = Set/get direction (i.e., 'S' for Supplier and 'C' for Consumer + // (necessary for error checking). + void direction (char); + char direction (void); + + // = The total number of bytes sent/received on this channel. + size_t total_bytes (void); + void total_bytes (size_t bytes); + // Increment count by <bytes>. + + virtual int handle_timeout (const ACE_Time_Value &, const void *arg); + // Perform timer-based Proxy_Handler reconnection. + +protected: + enum + { + MAX_RETRY_TIMEOUT = 300 // 5 minutes is the maximum timeout. + }; + + int initialize_connection (void); + // Perform the first-time initiation of a connection to the peer. + + int reinitiate_connection (void); + // Reinitiate a connection asynchronously when peers fail. + + void socket_queue_size (void); + // Set the socket queue size. + + virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE, + ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK); + // Perform Proxy_Handler termination. + + Event_Forwarding_Discriminator *efd_; + // Maps Events to a set of Consumers. + + ACE_INET_Addr remote_addr_; + // Address of peer. + + ACE_INET_Addr local_addr_; + // Address of us. + + ACE_INT32 id_; + // The assigned routing ID of this entry. + + size_t total_bytes_; + // The total number of bytes sent/received on this channel. + + State state_; + // The current state of the channel. + + Proxy_Handler_Connector *connector_; + // Back pointer to Proxy_Handler_Connector to reestablish broken + // connections. + + int timeout_; + // Amount of time to wait between reconnection attempts. + + int max_timeout_; + // Maximum amount of time to wait between reconnection attempts. + + char direction_; + // Indicates which direction data flows through the channel ('S' == + // Supplier and 'C' == Consumer). + + int socket_queue_size_; + // Size of the socket queue (0 means "use default"). +}; + +class Supplier_Proxy : public Proxy_Handler + // = TITLE + // Handles reception of Events from Suppliers + // + // = DESCRIPTION + // Performs framing and error checking. +{ +public: + Supplier_Proxy (Event_Forwarding_Discriminator *, + Proxy_Handler_Connector *, + ACE_Thread_Manager * = 0, + int socket_queue_size = 0); + // Constructor sets the consumer map pointer. + +protected: + virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE); + // Receive and process peer events. + + virtual int recv (ACE_Message_Block *&); + // Receive an event from a Supplier. + + int forward (ACE_Message_Block *event); + // Forward the Event to a Consumer. + + ACE_Message_Block *msg_frag_; + // Keep track of event fragment to handle non-blocking recv's from + // Suppliers. +}; + +class Consumer_Proxy : public Proxy_Handler + // = TITLE + // Handles transmission of events to Consumers. + // + // = DESCRIPTION + // Uses a single-threaded approach. +{ +public: + Consumer_Proxy (Event_Forwarding_Discriminator *, + Proxy_Handler_Connector *, + ACE_Thread_Manager * = 0, + int socket_queue_size = 0); + + virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); + // Send an event to a Consumer (may be queued if necessary). + +protected: + // = We'll allow up to 16 megabytes to be queued per-output channel. + enum {MAX_QUEUE_SIZE = 1024 * 1024 * 16}; + + virtual int handle_output (ACE_HANDLE); + // Finish sending event when flow control conditions abate. + + int nonblk_put (ACE_Message_Block *mb); + // Perform a non-blocking put(). + + virtual ssize_t send (ACE_Message_Block *); + // Send an event to a Consumer. + + virtual int handle_input (ACE_HANDLE); + // Receive and process shutdowns from a Consumer. +}; + +#endif /* _PROXY_HANDLER */ diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp new file mode 100644 index 00000000000..7ac0a77a2d4 --- /dev/null +++ b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp @@ -0,0 +1,92 @@ +#include "Proxy_Handler_Connector.h" +// $Id$ + + +Proxy_Handler_Connector::Proxy_Handler_Connector (void) +{ +} + +// Override the connection-failure method to add timer support. +// Note that these timers perform "expoential backoff" to +// avoid rapidly trying to reestablish connections when a link +// goes down. + +int +Proxy_Handler_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask) +{ + ACE_Connector<Proxy_Handler, ACE_SOCK_CONNECTOR>::AST *stp = 0; + + // Locate the ACE_Svc_Handler corresponding to the socket descriptor. + if (this->handler_map_.find (sd, stp) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate channel %d in map, %p\n", + sd, "find"), -1); + + Proxy_Handler *channel = stp->svc_handler (); + + // Schedule a reconnection request at some point in the future + // (note that channel uses an exponential backoff scheme). + if (ACE_Service_Config::reactor ()->schedule_timer (channel, 0, + channel->timeout ()) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "schedule_timer"), -1); + return 0; +} + +// Initiate (or reinitiate) a connection to the Proxy_Handler. + +int +Proxy_Handler_Connector::initiate_connection (Proxy_Handler *channel, + ACE_Synch_Options &synch_options) +{ + char buf[MAXHOSTNAMELEN]; + + // Mark ourselves as idle so that the various iterators + // will ignore us until we are reconnected. + channel->state (Proxy_Handler::IDLE); + + if (channel->remote_addr ().addr_to_string (buf, sizeof buf) == -1 + || channel->local_addr ().addr_to_string (buf, sizeof buf) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "can't obtain peer's address"), -1); + + // Try to connect to the Peer. + + if (this->connect (channel, channel->remote_addr (), + synch_options, channel->local_addr ()) == -1) + { + if (errno != EWOULDBLOCK) + { + channel->state (Proxy_Handler::FAILED); + ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n", + "connect", buf)); + + // Reschedule ourselves to try and connect again. + if (synch_options[ACE_Synch_Options::USE_REACTOR]) + { + if (ACE_Service_Config::reactor ()->schedule_timer + (channel, 0, channel->timeout ()) == 0) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "schedule_timer"), -1); + } + else + // Failures on synchronous connects are reported as errors + // so that the caller can decide how to proceed. + return -1; + } + else + { + channel->state (Proxy_Handler::CONNECTING); + ACE_DEBUG ((LM_DEBUG, + "(%t) in the process of connecting %s to %s\n", + synch_options[ACE_Synch_Options::USE_REACTOR] + ? "asynchronously" : "synchronously", buf)); + } + } + else + { + channel->state (Proxy_Handler::ESTABLISHED); + ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n", + buf, channel->get_handle ())); + } + return 0; +} diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.h b/apps/Gateway/Gateway/Proxy_Handler_Connector.h new file mode 100644 index 00000000000..3baea75934a --- /dev/null +++ b/apps/Gateway/Gateway/Proxy_Handler_Connector.h @@ -0,0 +1,40 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Proxy_Handler_Connector.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_IO_HANDLER_CONNECTOR) +#define _IO_HANDLER_CONNECTOR + +#include "ace/Connector.h" +#include "Thr_Proxy_Handler.h" + +class Proxy_Handler_Connector : public ACE_Connector<Proxy_Handler, ACE_SOCK_CONNECTOR> + // = TITLE + // A concrete factory class that setups connections to peerds + // and produces a new Proxy_Handler object to do the dirty work... +{ +public: + Proxy_Handler_Connector (void); + + // Initiate (or reinitiate) a connection on the Proxy_Handler. + int initiate_connection (Proxy_Handler *, + ACE_Synch_Options & = ACE_Synch_Options::synch); + +protected: + // Override the connection-failure method to add timer support. + virtual int handle_close (ACE_HANDLE sd, ACE_Reactor_Mask); +}; + +#endif /* _IO_HANDLER_CONNECTOR */ diff --git a/apps/Gateway/Gateway/README b/apps/Gateway/Gateway/README index 4e986354aaa..e64ad26b568 100644 --- a/apps/Gateway/Gateway/README +++ b/apps/Gateway/Gateway/README @@ -2,15 +2,15 @@ This application illustrates an application-level Gateway which routes messages between Consumer and Suppliers in a distributed environment. The default configuration is single-threaded, i.e., all -Supplier_Handlers and Consumer_Handlers are multiplexed via the ACE +Supplier_Proxys and Consumer_Proxys are multiplexed via the ACE Reactor within a single thread of control. To obtain a version that -multi-threads both Consumer_Handlers and Supplier_Handlers simply set +multi-threads both Consumer_Proxys and Supplier_Proxys simply set the following flag in the Makefile: DEFFLAGS += -DUSE_OUTPUT_MT -DUSE_INPUT_MT -To get a version that uses single-threading for all Supplier_Handlers, -but a separate thread per-Consumer_Handler set the following flag in +To get a version that uses single-threading for all Supplier_Proxys, +but a separate thread per-Consumer_Proxy set the following flag in the Makefile: DEFFLAGS += -DUSE_OUTPUT_MT diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp new file mode 100644 index 00000000000..98722a96295 --- /dev/null +++ b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp @@ -0,0 +1,204 @@ +#include "Thr_Proxy_Handler.h" +// $Id$ + +#include "Proxy_Handler_Connector.h" + +#if defined (ACE_HAS_THREADS) +Thr_Consumer_Proxy::Thr_Consumer_Proxy (Event_Forwarding_Discriminator *efd, + Proxy_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : Consumer_Proxy (efd, ioc, thr_mgr, socket_queue_size) +{ +} + +// This method should be called only when the peer shuts down +// unexpectedly. This method marks the Proxy_Handler as having failed and +// deactivates the ACE_Message_Queue (to wake up the thread blocked on +// <dequeue_head> in svc()). Thr_Output_Handler::handle_close () will +// eventually try to reconnect... + +int +Thr_Consumer_Proxy::handle_input (ACE_HANDLE h) +{ + this->Consumer_Proxy::handle_input (h); + ACE_Service_Config::reactor ()->remove_handler (h, + ACE_Event_Handler::RWE_MASK + | ACE_Event_Handler::DONT_CALL); + // Deactivate the queue while we try to get reconnected. + this->msg_queue ()->deactivate (); + return 0; +} + +// Initialize the threaded Consumer_Proxy object and spawn a new +// thread. + +int +Thr_Consumer_Proxy::open (void *) +{ + // Set the size of the socket queue. + this->socket_queue_size (); + + // Turn off non-blocking I/O. + if (this->peer ().disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + + // Register ourselves to receive input events (which indicate that + // the Peer has shut down unexpectedly). + if (ACE_Service_Config::reactor ()->register_handler (this, + ACE_Event_Handler::READ_MASK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1); + + if (this->initialize_connection ()) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "initialize_connection"), -1); + + // Reactivate message queue. If it was active then this is the + // first time in and we need to spawn a thread, otherwise the queue + // was inactive due to some problem and we've already got a thread. + if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) + { + ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); + // Become an active object by spawning a new thread to transmit + // messages to peers. + return this->activate (THR_NEW_LWP | THR_DETACHED); + } + else + { + ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); + return 0; + } +} + +// ACE_Queue up a message for transmission (must not block since all +// Supplier_Proxys are single-threaded). + +int +Thr_Consumer_Proxy::put (ACE_Message_Block *mb, ACE_Time_Value *) +{ + // Perform non-blocking enqueue. + return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero); +} + +// Transmit messages to the peer (note simplification resulting from +// threads...) + +int +Thr_Consumer_Proxy::svc (void) +{ + for (;;) + { + ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Consumer_Proxy's fd = %d\n", + this->peer ().get_handle ())); + + // Since this method runs in its own thread it is OK to block on + // output. + + for (ACE_Message_Block *mb = 0; + this->msg_queue ()->dequeue_head (mb) != -1; ) + if (this->send (mb) == -1) + ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed")); + + ACE_ASSERT (errno == ESHUTDOWN); + + ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n", + this->id (), this->get_handle ())); + + this->peer ().close (); + + for (this->timeout (1); + // Default is to reconnect synchronously. + this->connector_->initiate_connection (this) == -1; ) + { + ACE_Time_Value tv (this->timeout ()); + ACE_ERROR ((LM_ERROR, + "(%t) reattempting connection, sec = %d\n", + tv.sec ())); + ACE_OS::sleep (tv); + } + } + + return 0; +} + +Thr_Supplier_Proxy::Thr_Supplier_Proxy (Event_Forwarding_Discriminator *efd, + Proxy_Handler_Connector *ioc, + ACE_Thread_Manager *thr_mgr, + int socket_queue_size) + : Supplier_Proxy (efd, ioc, thr_mgr, socket_queue_size) +{ +} + +int +Thr_Supplier_Proxy::open (void *) +{ + // Set the size of the socket queue. + this->socket_queue_size (); + + // Turn off non-blocking I/O. + if (this->peer ().disable (ACE_NONBLOCK) == -1) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1); + + if (this->initialize_connection ()) + ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", + "initialize_connection"), -1); + + // Reactivate message queue. If it was active then this is the + // first time in and we need to spawn a thread, otherwise the queue + // was inactive due to some problem and we've already got a thread. + if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE) + { + ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n")); + // Become an active object by spawning a new thread to transmit + // messages to peers. + return this->activate (THR_NEW_LWP | THR_DETACHED); + } + else + { + ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n")); + return 0; + } +} + +// Receive messages from a Peer in a separate thread (note reuse of +// existing code!). + +int +Thr_Supplier_Proxy::svc (void) +{ + for (;;) + { + ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Supplier_Proxy's fd = %d\n", + this->peer ().get_handle ())); + + // Since this method runs in its own thread and processes + // messages for one connection it is OK to block on input and + // output. + + while (this->handle_input () != -1) + continue; + + ACE_DEBUG ((LM_DEBUG, + "(%t) shutting down threaded Supplier_Proxy %d on handle %d\n", + this->id (), + this->get_handle ())); + + this->peer ().close (); + + // Deactivate the queue while we try to get reconnected. + this->msg_queue ()->deactivate (); + + for (this->timeout (1); + // Default is to reconnect synchronously. + this->connector_->initiate_connection (this) == -1; ) + { + ACE_Time_Value tv (this->timeout ()); + ACE_ERROR ((LM_ERROR, + "(%t) reattempting connection, sec = %d\n", tv.sec ())); + ACE_OS::sleep (tv); + } + } + return 0; +} + +#endif /* ACE_HAS_THREADS */ diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.h b/apps/Gateway/Gateway/Thr_Proxy_Handler.h new file mode 100644 index 00000000000..8ecced3805c --- /dev/null +++ b/apps/Gateway/Gateway/Thr_Proxy_Handler.h @@ -0,0 +1,64 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Thr_Proxy_Handler.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (_THR_IO_HANDLER) +#define _THR_IO_HANDLER + +#include "Proxy_Handler.h" + +#if defined (ACE_HAS_THREADS) +class Thr_Consumer_Proxy : public Consumer_Proxy + // = TITLE + // Runs each Output Proxy_Handler in a separate thread. +{ +public: + Thr_Consumer_Proxy (Event_Forwarding_Discriminator *, + Proxy_Handler_Connector *, + ACE_Thread_Manager *, + int socket_queue_size); + + virtual int open (void *); + // Initialize the threaded Consumer_Proxy object and spawn a new + // thread. + + virtual int handle_input (ACE_HANDLE); + // Called when Peer shutdown unexpectedly. + + virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); + // Send a message to a peer. + + virtual int svc (void); + // Transmit peer messages. +}; + +class Thr_Supplier_Proxy : public Supplier_Proxy + // = TITLE + // Runs each Input Proxy_Handler in a separate thread. +{ +public: + Thr_Supplier_Proxy (Event_Forwarding_Discriminator *, + Proxy_Handler_Connector *, + ACE_Thread_Manager *, + int socket_queue_size); + + virtual int open (void *); + // Initialize the object and spawn a new thread. + + virtual int svc (void); + // Transmit peer messages. +}; +#endif /* ACE_HAS_THREADS */ +#endif /* _THR_IO_HANDLER */ diff --git a/apps/Gateway/Gateway/consumer_config b/apps/Gateway/Gateway/consumer_config index d33469ee157..58884340e61 100644 --- a/apps/Gateway/Gateway/consumer_config +++ b/apps/Gateway/Gateway/consumer_config @@ -1,8 +1,8 @@ -# Consumer map configuration file -# Conn ID Logical ID Payload Destinations -# ------- ---------- ------- ------------ -# 1 1 0 3,4,5 - 1 1 0 3 - 3 1 0 3 -# 4 1 0 4 -# 5 1 0 5 +# Consumer configuration file +# Conn ID Supplier ID Type Consumers +# ------- ----------- ------- ------------ +# 1 1 0 3,4,5 + 1 1 0 3 + 3 1 0 3 +# 4 1 0 4 +# 5 1 0 5 diff --git a/apps/Gateway/Peer/Event.h b/apps/Gateway/Peer/Event.h new file mode 100644 index 00000000000..24881c3e85b --- /dev/null +++ b/apps/Gateway/Peer/Event.h @@ -0,0 +1,101 @@ +/* -*- C++ -*- */ +// $Id$ + +// ============================================================================ +// +// = LIBRARY +// apps +// +// = FILENAME +// Event.h +// +// = AUTHOR +// Doug Schmidt +// +// ============================================================================ + +#if !defined (EVENT) +#define EVENT + +#include "ace/OS.h" + +// This is the unique connection identifier that denotes a particular +// Proxy_Handler in the Gateway. +typedef ACE_INT32 ACE_INT32; + +class Event_Addr + // = TITLE + // Address used to identify the source/destination of an event. + // + // = DESCRIPTION + // This is really a "virtual forwarding address" thatis used to + // decouple the filtering and forwarding logic of the Event + // Channel from the format of the data. +{ +public: + Event_Addr (ACE_INT32 cid = -1, + u_char sid = 0, + u_char type = 0) + : conn_id_ (cid), + supplier_id_ (sid), + type_ (type) {} + + int operator== (const Event_Addr &event_addr) const + { + return this->conn_id_ == event_addr.conn_id_ + && this->supplier_id_ == event_addr.supplier_id_ + && this->type_ == event_addr.type_; + } + + ACE_INT32 conn_id_; + // Unique connection identifier that denotes a particular + // Proxy_Handler. + + ACE_INT32 supplier_id_; + // Logical ID. + + ACE_INT32 type_; + // Event type. +}; + + +class Event_Header + // = TITLE + // Fixed sized header. +{ +public: + typedef ACE_INT32 SUPPLIER_ID; + // Type used to forward events from gatewayd. + + enum + { + INVALID_ID = -1 // No peer can validly use this number. + }; + + SUPPLIER_ID supplier_id_; + // Source ID. + + ACE_INT32 type_; + // Event type. + + size_t len_; + // Length of the entire event (including data payload) in bytes. +}; + +class Event + // = TITLE + // Variable-sized event (data_ may be variable-sized between + // 0 and MAX_PAYLOAD_SIZE). +{ +public: + enum { MAX_PAYLOAD_SIZE = 1024 }; + // The maximum size of an Event. + + Event_Header header_; + // Event header. + + char data_[MAX_PAYLOAD_SIZE]; + // Event data. +}; + +#endif /* EVENT */ diff --git a/apps/Gateway/Peer/Makefile b/apps/Gateway/Peer/Makefile index 9909eb2ef2a..3176a62b455 100644 --- a/apps/Gateway/Peer/Makefile +++ b/apps/Gateway/Peer/Makefile @@ -1,7 +1,7 @@ #---------------------------------------------------------------------------- # @(#)Makefile 1.1 10/18/96 # -# Makefile for the peer portion of the communication gateway +# Makefile for the Peer portion of the Gateway application #---------------------------------------------------------------------------- #---------------------------------------------------------------------------- @@ -10,7 +10,7 @@ BIN = peerd -FILES = Gateway_Handler +FILES = Peer LSRC = $(addsuffix .cpp,$(FILES)) LOBJ = $(addsuffix .o,$(FILES)) @@ -46,7 +46,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU # DO NOT DELETE THIS LINE -- g++dep uses it. # DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY. -.obj/Gateway_Handler.o .shobj/Gateway_Handler.so: Gateway_Handler.cpp \ +.obj/Peer.o .shobj/Peer.so: Peer.cpp \ $(WRAPPER_ROOT)/ace/Get_Opt.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ @@ -59,7 +59,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Log_Priority.h \ $(WRAPPER_ROOT)/ace/Log_Record.i \ $(WRAPPER_ROOT)/ace/ACE.i \ - Gateway_Handler.h \ + Peer.h \ $(WRAPPER_ROOT)/ace/Service_Config.h \ $(WRAPPER_ROOT)/ace/Service_Object.h \ $(WRAPPER_ROOT)/ace/Shared_Object.h \ diff --git a/apps/Gateway/Peer/peerd.cpp b/apps/Gateway/Peer/peerd.cpp index 7836e9892f8..3b7bdb0cb2d 100644 --- a/apps/Gateway/Peer/peerd.cpp +++ b/apps/Gateway/Peer/peerd.cpp @@ -1,11 +1,10 @@ -/* Driver for the peer daemon (peerd). Note that this // $Id$ - is completely generic code due to the Service Configurator - framework! */ +// Driver for the peer daemon (peerd). Note that this is completely +// generic code due to the Service Configurator framework! #include "ace/Service_Config.h" -#include "Gateway_Handler.h" +#include "Peer.h" int main (int argc, char *argv[]) @@ -20,7 +19,7 @@ main (int argc, char *argv[]) { static char *l_argv[3] = { "-d", "-p", "10002" }; - ACE_Service_Object *so = _alloc_peerd (); + ACE_Service_Object *so = _make_Peer_Acceptor (); if (so->init (3, l_argv) == -1) @@ -28,7 +27,19 @@ main (int argc, char *argv[]) } } - /* Run forever, performing the configured services (until SIGINT/SIGQUIT occurs) */ + // Create an adapter to end the event loop. + ACE_Sig_Adapter sa ((ACE_Sig_Handler_Ex) ACE_Service_Config::end_reactor_event_loop); + + ACE_Sig_Set sig_set; + sig_set.sig_add (SIGINT); + sig_set.sig_add (SIGQUIT); + + // Register ourselves to receive SIGINT and SIGQUIT so we can shut + // down gracefully via signals. + ACE_Service_Config::reactor ()->register_handler (sig_set, &sa); + + // Run forever, performing the configured services until we are shut + // down by a SIGINT/SIGQUIT signal. daemon.run_reactor_event_loop (); diff --git a/apps/Gateway/Peer/svc.conf b/apps/Gateway/Peer/svc.conf index 3bd0cd55891..6c9bc3bc3d7 100644 --- a/apps/Gateway/Peer/svc.conf +++ b/apps/Gateway/Peer/svc.conf @@ -1,3 +1,3 @@ #static Svc_Manager "-d -p 291" -dynamic Peer1 Service_Object *.shobj/Gateway_Handler:_alloc_peerd() active "-p 10004" -#dynamic Peer2 Service_Object *.shobj/Gateway_Handler:_alloc_peerd() active "-p 10003" +dynamic Peer1 Service_Object *.shobj/Peer:_make_Peer_Acceptor() active "-p 10004" +#dynamic Peer2 Service_Object *.shobj/Peer:_make_Peer_Acceptor() active "-p 10003" diff --git a/examples/ASX/CCM_App/svc.conf b/examples/ASX/CCM_App/svc.conf index b083976607e..f77d9b14d9d 100644 --- a/examples/ASX/CCM_App/svc.conf +++ b/examples/ASX/CCM_App/svc.conf @@ -1,4 +1,4 @@ -static ACE_Service_Manager "-d -p 3911" +static ACE_Service_Manager "-d -p 4911" dynamic My_Task Service_Object *.shobj/CCM_App:make_task() "-p 3000" diff --git a/examples/Logger/Acceptor-server/server_loggerd.cpp b/examples/Logger/Acceptor-server/server_loggerd.cpp index 21776447b73..20522d9168c 100644 --- a/examples/Logger/Acceptor-server/server_loggerd.cpp +++ b/examples/Logger/Acceptor-server/server_loggerd.cpp @@ -1,6 +1,6 @@ -// This server daemon collects, formats, and displays logging // $Id$ +// This server daemon collects, formats, and displays logging // information forwarded from client daemons running on other hosts in // the network. In addition, this example illustrates how to use the // ACE_Reactor, ACE_Acceptor, ACE_Singleton, and ACE_Test_and_Set diff --git a/netsvcs/lib/Client_Logging_Handler.cpp b/netsvcs/lib/Client_Logging_Handler.cpp index 719e56abd00..20212064353 100644 --- a/netsvcs/lib/Client_Logging_Handler.cpp +++ b/netsvcs/lib/Client_Logging_Handler.cpp @@ -1,10 +1,10 @@ -// Client_Logging_Handler.cpp // $Id$ +// Client_Logging_Handler.cpp + #define ACE_BUILD_SVC_DLL #include "ace/Service_Config.h" #include "ace/Connector.h" - #include "ace/Get_Opt.h" #include "ace/SOCK_Connector.h" #include "ace/SOCK_Stream.h" @@ -16,9 +16,6 @@ class ACE_Svc_Export ACE_Client_Logging_Handler : public ACE_Svc_Handler<ACE_SOC // This client logging daemon is a mediator that receives logging // records from local applications processes and forwards them to // the server logging daemon running on another host. - // - // = DESCRIPTION - // { public: // = Initialization and termination. @@ -34,11 +31,11 @@ public: // Return the handle of the message_fifo_; virtual int close (u_long); - // Called when object is removed from the ACE_Reactor + // Called when object is removed from the ACE_Reactor. protected: virtual int handle_signal (int signum, siginfo_t *, ucontext_t *); - // Handle SIGINT. + // Handle SIGPIPE. virtual int handle_input (ACE_HANDLE); // Receive logging records from applications. @@ -69,10 +66,13 @@ ACE_Client_Logging_Handler::ACE_Client_Logging_Handler (const char rendezvous[]) { if (ACE_OS::unlink (rendezvous) == -1 && errno == EACCES) ACE_ERROR ((LM_ERROR, "%p\n", "unlink")); + else if (this->message_fifo_.open (rendezvous) == -1) ACE_ERROR ((LM_ERROR, "%p\n", "open")); - // Register message FIFO to receive input from clients. Note that we need to - // put the EXCEPT_MASK here to deal with SVR4 MSG_BAND data correctly... + + // Register message FIFO to receive input from clients. Note that + // we need to put the EXCEPT_MASK here to deal with SVR4 MSG_BAND + // data correctly... else if (ACE_Service_Config::reactor ()->register_handler (this->message_fifo_.get_handle (), this, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::EXCEPT_MASK) == -1) @@ -90,7 +90,6 @@ int ACE_Client_Logging_Handler::handle_signal (int, siginfo_t *, ucontext_t *) { ACE_TRACE ("ACE_Client_Logging_Connector::handle_signal"); -// return 0; return -1; } @@ -126,7 +125,6 @@ ACE_Client_Logging_Handler::get_handle (void) const return this->message_fifo_.get_handle (); } - // Receive a logging record from an application. int @@ -170,7 +168,15 @@ int ACE_Client_Logging_Handler::close (u_long) { ACE_DEBUG ((LM_DEBUG, "shutting down!!!\n")); + + if (ACE_Service_Config::reactor ()->remove_handler + (this->message_fifo_.get_handle (), + ACE_Event_Handler::READ_MASK | ACE_Event_Handler::EXCEPT_MASK | ACE_Event_Handler::DONT_CALL) == -1) + ACE_ERROR ((LM_ERROR, "%n: %p\n", + "remove_handler (message_fifo)")); + this->message_fifo_.close (); + this->destroy (); return 0; } @@ -207,7 +213,6 @@ ACE_Client_Logging_Handler::send (ACE_Log_Record &log_record) return 0; } - class ACE_Client_Logging_Connector : public ACE_Connector<ACE_Client_Logging_Handler, ACE_SOCK_CONNECTOR> // = TITLE // This class contains the service-specific methods that can't @@ -228,9 +233,6 @@ protected: virtual int suspend (void); virtual int resume (void); - virtual int handle_signal (int signum, siginfo_t *, ucontext_t *); - // Handle SIGINT. - private: int parse_args (int argc, char *argv[]); // Parse svc.conf arguments. @@ -256,7 +258,7 @@ private: int ACE_Client_Logging_Connector::fini (void) { - this->handler_->destroy (); + this->handler_->close (0); return 0; } @@ -285,12 +287,6 @@ ACE_Client_Logging_Connector::init (int argc, char *argv[]) // options. this->parse_args (argc, argv); - // Register ourselves to receive SIGINT so we can shutdown - // gracefully. - if (ACE_Service_Config::reactor ()->register_handler (SIGINT, this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%n: %p\n", - "register_handler (SIGINT)"), -1); - ACE_NEW_RETURN (this->handler_, ACE_Client_Logging_Handler (this->rendezvous_key_), -1); @@ -352,16 +348,6 @@ ACE_Client_Logging_Connector::resume (void) return 0; } -// Signal the server to shutdown gracefully. - -int -ACE_Client_Logging_Connector::handle_signal (int, siginfo_t *, ucontext_t *) -{ - ACE_TRACE ("ACE_Client_Logging_Connector::handle_signal"); - ACE_Service_Config::end_reactor_event_loop (); - return 0; -} - // The following is a "Factory" used by the ACE_Service_Config and // svc.conf file to dynamically initialize the state of the // single-threaded logging server. diff --git a/netsvcs/lib/Makefile b/netsvcs/lib/Makefile index 8e97448a749..7f2caec6a8c 100644 --- a/netsvcs/lib/Makefile +++ b/netsvcs/lib/Makefile @@ -67,19 +67,11 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ - $(WRAPPER_ROOT)/ace/Proactor.h \ - $(WRAPPER_ROOT)/ace/Message_Block.h \ - $(WRAPPER_ROOT)/ace/Malloc.h \ - $(WRAPPER_ROOT)/ace/Malloc_T.h \ - $(WRAPPER_ROOT)/ace/Memory_Pool.h \ $(WRAPPER_ROOT)/ace/Signal.h \ - $(WRAPPER_ROOT)/ace/Mem_Map.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ - $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Pipe.h \ $(WRAPPER_ROOT)/ace/Pipe.i \ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ @@ -93,14 +85,22 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/INET_Addr.h \ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ $(WRAPPER_ROOT)/ace/Svc_Handler.h \ $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ - $(WRAPPER_ROOT)/ace/Message_Queue.h \ - $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ - $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Acceptor.i \ $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \ $(WRAPPER_ROOT)/ace/Time_Request_Reply.h \ @@ -129,20 +129,12 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ - $(WRAPPER_ROOT)/ace/Set.h \ - $(WRAPPER_ROOT)/ace/Proactor.h \ - $(WRAPPER_ROOT)/ace/Message_Block.h \ - $(WRAPPER_ROOT)/ace/Malloc.h \ - $(WRAPPER_ROOT)/ace/Malloc_T.h \ - $(WRAPPER_ROOT)/ace/Memory_Pool.h \ $(WRAPPER_ROOT)/ace/Signal.h \ - $(WRAPPER_ROOT)/ace/Mem_Map.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ - $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Pipe.h \ $(WRAPPER_ROOT)/ace/Pipe.i \ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ @@ -156,6 +148,17 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/INET_Addr.h \ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ $(WRAPPER_ROOT)/ace/Connector.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ @@ -163,9 +166,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ - $(WRAPPER_ROOT)/ace/Message_Queue.h \ - $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ - $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Connector.i \ $(WRAPPER_ROOT)/ace/Get_Opt.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ @@ -197,20 +197,12 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ - $(WRAPPER_ROOT)/ace/Set.h \ - $(WRAPPER_ROOT)/ace/Proactor.h \ - $(WRAPPER_ROOT)/ace/Message_Block.h \ - $(WRAPPER_ROOT)/ace/Malloc.h \ - $(WRAPPER_ROOT)/ace/Malloc_T.h \ - $(WRAPPER_ROOT)/ace/Memory_Pool.h \ $(WRAPPER_ROOT)/ace/Signal.h \ - $(WRAPPER_ROOT)/ace/Mem_Map.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ - $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Pipe.h \ $(WRAPPER_ROOT)/ace/Pipe.i \ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ @@ -224,6 +216,17 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/INET_Addr.h \ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ $(WRAPPER_ROOT)/ace/Connector.h \ $(WRAPPER_ROOT)/ace/Map_Manager.h \ @@ -231,9 +234,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ - $(WRAPPER_ROOT)/ace/Message_Queue.h \ - $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ - $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Connector.i \ $(WRAPPER_ROOT)/ace/Get_Opt.h \ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \ @@ -285,22 +285,25 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ $(WRAPPER_ROOT)/ace/Proactor.h \ $(WRAPPER_ROOT)/ace/Message_Block.h \ $(WRAPPER_ROOT)/ace/Malloc.h \ $(WRAPPER_ROOT)/ace/Malloc_T.h \ $(WRAPPER_ROOT)/ace/Memory_Pool.h \ - $(WRAPPER_ROOT)/ace/Signal.h \ $(WRAPPER_ROOT)/ace/Mem_Map.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ - $(WRAPPER_ROOT)/ace/Reactor.h \ - $(WRAPPER_ROOT)/ace/Handle_Set.h \ - $(WRAPPER_ROOT)/ace/Pipe.h \ - $(WRAPPER_ROOT)/ace/Pipe.i \ - $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Name_Request_Reply.h \ @@ -309,9 +312,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Svc_Handler.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ - $(WRAPPER_ROOT)/ace/Message_Queue.h \ - $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ - $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Acceptor.i \ $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \ Name_Handler.h @@ -358,31 +358,31 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/Shared_Object.h \ $(WRAPPER_ROOT)/ace/Thread_Manager.h \ $(WRAPPER_ROOT)/ace/Thread.h \ + $(WRAPPER_ROOT)/ace/Signal.h \ $(WRAPPER_ROOT)/ace/Set.h \ + $(WRAPPER_ROOT)/ace/Reactor.h \ + $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Pipe.h \ + $(WRAPPER_ROOT)/ace/Pipe.i \ + $(WRAPPER_ROOT)/ace/Reactor.i \ $(WRAPPER_ROOT)/ace/Proactor.h \ $(WRAPPER_ROOT)/ace/Message_Block.h \ $(WRAPPER_ROOT)/ace/Malloc.h \ $(WRAPPER_ROOT)/ace/Malloc_T.h \ $(WRAPPER_ROOT)/ace/Memory_Pool.h \ - $(WRAPPER_ROOT)/ace/Signal.h \ $(WRAPPER_ROOT)/ace/Mem_Map.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ - $(WRAPPER_ROOT)/ace/Reactor.h \ - $(WRAPPER_ROOT)/ace/Handle_Set.h \ - $(WRAPPER_ROOT)/ace/Pipe.h \ - $(WRAPPER_ROOT)/ace/Pipe.i \ - $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ $(WRAPPER_ROOT)/ace/Svc_Handler.h \ $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ - $(WRAPPER_ROOT)/ace/Message_Queue.h \ - $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ - $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Acceptor.i \ Server_Logging_Handler.h .obj/Token_Handler.o .shobj/Token_Handler.so: Token_Handler.cpp \ @@ -411,20 +411,12 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \ $(WRAPPER_ROOT)/ace/Synch_T.h \ - $(WRAPPER_ROOT)/ace/Set.h \ - $(WRAPPER_ROOT)/ace/Proactor.h \ - $(WRAPPER_ROOT)/ace/Message_Block.h \ - $(WRAPPER_ROOT)/ace/Malloc.h \ - $(WRAPPER_ROOT)/ace/Malloc_T.h \ - $(WRAPPER_ROOT)/ace/Memory_Pool.h \ $(WRAPPER_ROOT)/ace/Signal.h \ - $(WRAPPER_ROOT)/ace/Mem_Map.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.h \ - $(WRAPPER_ROOT)/ace/Timer_Queue.i \ - $(WRAPPER_ROOT)/ace/ReactorEx.h \ - $(WRAPPER_ROOT)/ace/Token.h \ + $(WRAPPER_ROOT)/ace/Set.h \ $(WRAPPER_ROOT)/ace/Reactor.h \ $(WRAPPER_ROOT)/ace/Handle_Set.h \ + $(WRAPPER_ROOT)/ace/Timer_Queue.h \ + $(WRAPPER_ROOT)/ace/Token.h \ $(WRAPPER_ROOT)/ace/Pipe.h \ $(WRAPPER_ROOT)/ace/Pipe.i \ $(WRAPPER_ROOT)/ace/SOCK_Stream.h \ @@ -438,14 +430,22 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/INET_Addr.h \ $(WRAPPER_ROOT)/ace/SOCK_Stream.i \ $(WRAPPER_ROOT)/ace/Reactor.i \ + $(WRAPPER_ROOT)/ace/Proactor.h \ + $(WRAPPER_ROOT)/ace/Message_Block.h \ + $(WRAPPER_ROOT)/ace/Malloc.h \ + $(WRAPPER_ROOT)/ace/Malloc_T.h \ + $(WRAPPER_ROOT)/ace/Memory_Pool.h \ + $(WRAPPER_ROOT)/ace/Mem_Map.h \ + $(WRAPPER_ROOT)/ace/ReactorEx.h \ + $(WRAPPER_ROOT)/ace/Message_Queue.h \ + $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ + $(WRAPPER_ROOT)/ace/Strategies.h \ + $(WRAPPER_ROOT)/ace/Strategies_T.h \ $(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \ $(WRAPPER_ROOT)/ace/Svc_Handler.h \ $(WRAPPER_ROOT)/ace/Synch_Options.h \ $(WRAPPER_ROOT)/ace/Task.h \ $(WRAPPER_ROOT)/ace/Task_T.h \ - $(WRAPPER_ROOT)/ace/Message_Queue.h \ - $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \ - $(WRAPPER_ROOT)/ace/Strategies.h \ $(WRAPPER_ROOT)/ace/Acceptor.i \ $(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \ $(WRAPPER_ROOT)/ace/Token_Request_Reply.h \ @@ -456,6 +456,8 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU $(WRAPPER_ROOT)/ace/SString.h \ Token_Handler.h .obj/Logging_Strategy.o .shobj/Logging_Strategy.so: Logging_Strategy.cpp \ + /pkg/gnu/lib/g++-include/iostream.h \ + /pkg/gnu/lib/g++-include/fstream.h \ $(WRAPPER_ROOT)/ace/Get_Opt.h \ $(WRAPPER_ROOT)/ace/ACE.h \ $(WRAPPER_ROOT)/ace/OS.h \ diff --git a/netsvcs/lib/Name_Handler.cpp b/netsvcs/lib/Name_Handler.cpp index f2ddb2e950f..fc4a8b9cb46 100644 --- a/netsvcs/lib/Name_Handler.cpp +++ b/netsvcs/lib/Name_Handler.cpp @@ -174,21 +174,12 @@ public: int parse_args (int argc, char *argv[]); // Parse svc.conf arguments. - int handle_signal (int, siginfo_t *, ucontext_t *); - private: ACE_Schedule_All_Reactive_Strategy<ACE_Name_Handler> scheduling_strategy_; // The scheduling strategy is designed for Reactive services. }; int -ACE_Name_Acceptor::handle_signal (int, siginfo_t *, ucontext_t *) -{ - ACE_DEBUG ((LM_DEBUG, "ACE_Name_Acceptor::handle_signal got called\n")); - return 0; -} - -int ACE_Name_Acceptor::parse_args (int argc, char *argv[]) { ACE_TRACE ("ACE_Name_Acceptor::parse_args"); @@ -237,12 +228,6 @@ ACE_Name_Acceptor::init (int argc, char *argv[]) "acceptor::open failed", this->service_addr_.get_port_number ()), -1); - // Register ourselves to receive SIGINT so we can shutdown - // gracefully. - if (this->reactor ()->register_handler (SIGINT, this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%n: %p\n", - "register_handler (SIGINT)"), -1); - // Ignore SIGPIPE so that each <SVC_HANDLER> can handle this on its // own. ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); diff --git a/netsvcs/lib/Server_Logging_Handler.cpp b/netsvcs/lib/Server_Logging_Handler.cpp index 5f18494fb3c..67c67ee4e74 100644 --- a/netsvcs/lib/Server_Logging_Handler.cpp +++ b/netsvcs/lib/Server_Logging_Handler.cpp @@ -131,12 +131,6 @@ ACE_Server_Logging_Acceptor::init (int argc, "acceptor::open failed", this->service_addr_.get_port_number ()), -1); - // Register ourselves to receive SIGINT so we can shutdown - // gracefully. - if (this->reactor ()->register_handler (SIGINT, this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%n: %p\n", - "register_handler (SIGINT)"), -1); - // Ignore SIGPIPE so that each <SVC_HANDLER> can handle this on its // own. ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); @@ -364,12 +358,6 @@ ACE_Thr_Server_Logging_Acceptor::init (int argc, "acceptor::open failed", this->service_addr_.get_port_number ()), -1); - // Register ourselves to receive SIGINT so we can shutdown - // gracefully. - if (this->reactor ()->register_handler (SIGINT, this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%n: %p\n", - "register_handler (SIGINT)"), -1); - // Ignore SIGPIPE so that each <SVC_HANDLER> can handle this on its // own. ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); diff --git a/netsvcs/lib/TS_Clerk_Handler.cpp b/netsvcs/lib/TS_Clerk_Handler.cpp index 533e98291bb..e0142256ab0 100644 --- a/netsvcs/lib/TS_Clerk_Handler.cpp +++ b/netsvcs/lib/TS_Clerk_Handler.cpp @@ -94,7 +94,7 @@ public: protected: virtual int handle_signal (int signum, siginfo_t *, ucontext_t *); - // Handle SIGINT. + // Handle SIGPIPE. static void stderr_output (int = 0); @@ -147,7 +147,7 @@ class ACE_TS_Clerk_Processor : public ACE_Connector <ACE_TS_Clerk_Handler, ACE_S // computing a synchronized system time. { public: - ACE_TS_Clerk_Processor (); + ACE_TS_Clerk_Processor (void); // Default constructor virtual int handle_timeout (const ACE_Time_Value &tv, @@ -172,9 +172,6 @@ protected: virtual int suspend (void); virtual int resume (void); - virtual int handle_signal (int signum, siginfo_t *, ucontext_t *); - // Handle SIGINT. - private: int parse_args (int argc, char *argv[]); // Parse svc.conf arguments. @@ -234,14 +231,6 @@ ACE_TS_Clerk_Handler::ACE_TS_Clerk_Handler (ACE_TS_Clerk_Processor *processor, this->time_info_.sequence_num_ = 0; } -// This is called when a <send> to a server fails... -int -ACE_TS_Clerk_Handler::handle_signal (int, siginfo_t *, ucontext_t *) -{ - ACE_TRACE ("ACE_TS_Clerk_Handler::handle_signal"); - return 0; -} - // Set the connection state void ACE_TS_Clerk_Handler::state (ACE_TS_Clerk_Handler::State state) @@ -285,6 +274,15 @@ ACE_TS_Clerk_Handler::timeout (void) return old_timeout; } +// This is called when a <send> to the logging server fails... + +int +ACE_TS_Clerk_Handler::handle_signal (int, siginfo_t *, ucontext_t *) +{ + ACE_TRACE ("ACE_TS_Clerk_Handler::handle_signal"); + return -1; +} + // Set the max timeout delay. void ACE_TS_Clerk_Handler::max_timeout (int mto) @@ -651,14 +649,8 @@ ACE_TS_Clerk_Processor::init (int argc, char *argv[]) #if !defined (ACE_WIN32) // Ignore SIPPIPE so each Output_Channel can handle it. ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE); +#endif /* ACE_WIN32 */ - // Register ourselves to receive SIGINT and SIGPIPE - // so we can shut down gracefully via signals. - if (ACE_Service_Config::reactor ()->register_handler (SIGINT, - this) == -1) - ACE_ERROR_RETURN ((LM_ERROR, "%n: %p\n", - "register_handler"), -1); -#endif ACE_Synch_Options &synch_options = this->blocking_semantics_ == 0 ? ACE_Synch_Options::asynch : ACE_Synch_Options::synch; @@ -803,16 +795,6 @@ ACE_TS_Clerk_Processor::resume (void) return 0; } -// Signal the server to shutdown gracefully. - -int -ACE_TS_Clerk_Processor::handle_signal (int, siginfo_t *, ucontext_t *) -{ - ACE_TRACE ("ACE_TS_Clerk_Processor::handle_signal"); - ACE_Service_Config::end_reactor_event_loop (); - return 0; -} - // The following is a "Factory" used by the ACE_Service_Config and // svc.conf file to dynamically initialize the state of the TS_Clerk. diff --git a/netsvcs/servers/main.cpp b/netsvcs/servers/main.cpp index c072d012211..130b7f1ab30 100644 --- a/netsvcs/servers/main.cpp +++ b/netsvcs/servers/main.cpp @@ -1,6 +1,6 @@ -#include "ace/Service_Config.h" // $Id$ +#include "ace/Service_Config.h" #include "TS_Clerk_Handler.h" #include "TS_Server_Handler.h" #include "Client_Logging_Handler.h" @@ -69,6 +69,17 @@ main (int argc, char *argv[]) } } + // Create an adapter to end the event loop. + ACE_Sig_Adapter sa ((ACE_Sig_Handler_Ex) ACE_Service_Config::end_reactor_event_loop); + + ACE_Sig_Set sig_set; + sig_set.sig_add (SIGINT); + sig_set.sig_add (SIGQUIT); + + // Register ourselves to receive SIGINT and SIGQUIT so we can shut + // down gracefully via signals. + ACE_Service_Config::reactor ()->register_handler (sig_set, &sa); + // Run forever, performing the configured services until we are shut // down by a SIGINT/SIGQUIT signal. |