summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorschmidt <douglascraigschmidt@users.noreply.github.com>1996-12-30 06:50:29 +0000
committerschmidt <douglascraigschmidt@users.noreply.github.com>1996-12-30 06:50:29 +0000
commita5c4d8047ab58df5c45092e18ae503ad40518f0e (patch)
treedaa1ac46caf738dfe049fd5ca5eafc0eb84a0eb0
parent645a9c1a4457349320e62d85af49f62e80da53c0 (diff)
downloadATCD-a5c4d8047ab58df5c45092e18ae503ad40518f0e.tar.gz
foo
-rw-r--r--ChangeLog-96b34
-rw-r--r--ace/ACE.cpp124
-rw-r--r--ace/ACE.h10
-rw-r--r--ace/Module.cpp81
-rw-r--r--ace/Module.h48
-rw-r--r--ace/OS.h2
-rw-r--r--ace/Proactor.cpp1
-rw-r--r--ace/ReactorEx.h6
-rw-r--r--ace/Service_Main.cpp3
-rw-r--r--ace/Service_Record.cpp24
-rw-r--r--ace/Service_Record.i9
-rw-r--r--ace/Stream.cpp27
-rw-r--r--ace/Stream.h1
-rw-r--r--ace/Svc_Handler.cpp3
-rw-r--r--ace/Task.cpp7
-rw-r--r--apps/Gateway/Gateway/Concurrency_Strategies.h24
-rw-r--r--apps/Gateway/Gateway/Config_Files.cpp22
-rw-r--r--apps/Gateway/Gateway/Config_Files.h10
-rw-r--r--apps/Gateway/Gateway/Dispatch_Set.h28
-rw-r--r--apps/Gateway/Gateway/Event.h59
-rw-r--r--apps/Gateway/Gateway/Event_Channel.cpp106
-rw-r--r--apps/Gateway/Gateway/Event_Channel.h16
-rw-r--r--apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp61
-rw-r--r--apps/Gateway/Gateway/Event_Forwarding_Discriminator.h62
-rw-r--r--apps/Gateway/Gateway/File_Parser.h1
-rw-r--r--apps/Gateway/Gateway/Gateway.cpp31
-rw-r--r--apps/Gateway/Gateway/Makefile189
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler.cpp698
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler.h215
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler_Connector.cpp92
-rw-r--r--apps/Gateway/Gateway/Proxy_Handler_Connector.h40
-rw-r--r--apps/Gateway/Gateway/README8
-rw-r--r--apps/Gateway/Gateway/Thr_Proxy_Handler.cpp204
-rw-r--r--apps/Gateway/Gateway/Thr_Proxy_Handler.h64
-rw-r--r--apps/Gateway/Gateway/consumer_config16
-rw-r--r--apps/Gateway/Peer/Event.h101
-rw-r--r--apps/Gateway/Peer/Makefile8
-rw-r--r--apps/Gateway/Peer/peerd.cpp23
-rw-r--r--apps/Gateway/Peer/svc.conf4
-rw-r--r--examples/ASX/CCM_App/svc.conf2
-rw-r--r--examples/Logger/Acceptor-server/server_loggerd.cpp2
-rw-r--r--netsvcs/lib/Client_Logging_Handler.cpp50
-rw-r--r--netsvcs/lib/Makefile160
-rw-r--r--netsvcs/lib/Name_Handler.cpp15
-rw-r--r--netsvcs/lib/Server_Logging_Handler.cpp12
-rw-r--r--netsvcs/lib/TS_Clerk_Handler.cpp42
-rw-r--r--netsvcs/servers/main.cpp13
47 files changed, 2188 insertions, 570 deletions
diff --git a/ChangeLog-96b b/ChangeLog-96b
index c47050767a9..38dd2bd7d7a 100644
--- a/ChangeLog-96b
+++ b/ChangeLog-96b
@@ -1,9 +1,37 @@
+Sun Dec 29 18:38:03 1996 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * apps/Gateway/Gateway/File_Parser.h: Removed the endofline()
+ method declaration -- it doesn't seem to be defined anywhere.
+
+ * ace/OS.h: Added an ACE_INT32 to complement the ACE_UINT32.
+
+ * netsvcs/lib: Cleaned up all the ACE network services by removing
+ their SIGINT signal handler. This was interferring with the
+ main event loop's ability to shutdown...
+
+ * apps/Gateway/Gateway: Once again changed the name of
+ *IO_Handler* to *Proxy_Handler* since these things are really
+ proxies, in the COS sense!
+
+ * ace/Service_Record.cpp: Tidied up the implementation of
+ ACE_Module_Type::fini() so that it doesn't try to call fini() on
+ NULL pointers. Also, rather than explicitly deleting the reader
+ and writer Tasks, we call ACE_Module<>::close(), which knows how
+ to take care of all this stuff.
+
+ * ace/Module.cpp: Added an extra parameter to close_i() so that we
+ can correctly pass the value of "flags" from close() in order to
+ prevent deleting tasks when we don't want to do this.
+
+ * ace/Module.cpp: There was a bug in the open() method
+ since we were potentially deleting reader_q and writer_q twice
+ if memory allocation failed.
+
Sat Dec 28 19:02:13 1996 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu>
* ace/ACE.cpp: Changed the implementation of ldfind() so that it
- doesn't try to add the DLL prefix (e.g., "lib") unless given a
- relative pathname. In other words, we never try to modify the
- prefix for an absolute pathname. This is necessary to keep lots
+ doesn't try to add the DLL prefix (e.g., "lib") unless it
+ doesn't match filename. This is necessary to keep lots
of existing svc.conf files from breaking.
* ace/Event_Handler.h: Added a new ACCEPT_MASK for use with the
diff --git a/ace/ACE.cpp b/ace/ACE.cpp
index 353f0ebcb0c..817bd107d9c 100644
--- a/ace/ACE.cpp
+++ b/ace/ACE.cpp
@@ -93,6 +93,23 @@ ACE::hash_pjw (const char *str)
return hash;
}
+size_t
+ACE::strrepl (char *s, char search, char replace)
+{
+ ACE_TRACE ("ACE::strrepl");
+
+ size_t replaced = 0;
+
+ for (size_t i = 0; s[i] != '\0'; i++)
+ if (s[i] == search)
+ {
+ s[i] = replace;
+ replaced++;
+ }
+
+ return replaced;
+}
+
char *
ACE::strenvdup (const char *str)
{
@@ -136,7 +153,7 @@ ACE::ldfind (const char filename[],
char tempfilename[MAXPATHLEN];
char searchfilename[MAXPATHLEN];
- // Create a working copy of filename to mess with
+ // Create a copy of filename to work with.
if (ACE_OS::strlen (filename) + 1 > sizeof tempcopy)
{
errno = ENOMEM;
@@ -148,17 +165,12 @@ ACE::ldfind (const char filename[],
// Insert canonical directory separators.
char *separator_ptr;
- for (separator_ptr = tempcopy;
- *separator_ptr != '\0';
- separator_ptr++)
- if (*separator_ptr == '\\'
- || *separator_ptr == ACE_DIRECTORY_SEPARATOR_CHAR)
- *separator_ptr = '/';
-
- int got_prefix = 0;
+ if (ACE_DIRECTORY_SEPARATOR_CHAR != '/')
+ // Make all the directory separators ``canonical'' to simplify
+ // subsequent code.
+ ACE::strrepl (tempcopy, ACE_DIRECTORY_SEPARATOR_CHAR, '/');
// Separate filename from pathname.
-
separator_ptr = ACE_OS::strrchr (tempcopy, '/');
// This is a relative path.
@@ -166,13 +178,6 @@ ACE::ldfind (const char filename[],
{
searchpathname[0] = '\0';
ACE_OS::strcpy (tempfilename, tempcopy);
-
- // Note that we only try to set the prefix if we're dealing with
- // a relative path.
- if (ACE_OS::strlen (ACE_DLL_PREFIX) == 0
- || ACE_OS::strncmp (tempfilename, ACE_DLL_PREFIX,
- ACE_OS::strlen (ACE_DLL_PREFIX) == 0))
- got_prefix = 1;
}
else // This is an absolute path.
{
@@ -181,37 +186,32 @@ ACE::ldfind (const char filename[],
ACE_OS::strcpy (searchpathname, tempcopy);
}
- // Determine how the filename needs to be decorated.
-
int got_suffix = 0;
// Check to see if this has an appropriate DLL suffix for the OS
// platform.
char *s = ACE_OS::strrchr (tempfilename, '.');
- if (s != 0 && ACE_OS::strcmp (s + 1, ACE_DLL_SUFFIX))
- got_suffix = 1;
- // Create the properly decorated filename.
+ if (s != 0)
+ {
+ // Check whether this matches the appropriate platform-specific suffix.
+ if (ACE_OS::strcmp (s + 1, ACE_DLL_SUFFIX) == 0)
+ got_suffix = 1;
+ else
+ ACE_ERROR ((LM_WARNING,
+ "Warning: improper suffix for a shared library on this platform: %s\n",
+ s));
+ }
+
+ // Make sure we've got enough space in tempfilename.
if (ACE_OS::strlen (tempfilename) +
- got_prefix ? 0 : ACE_OS::strlen (ACE_DLL_PREFIX) +
+ ACE_OS::strlen (ACE_DLL_PREFIX) +
got_suffix ? 0 : ACE_OS::strlen (ACE_DLL_SUFFIX) >= sizeof searchfilename)
{
errno = ENOMEM;
return -1;
}
- else
- ::sprintf (searchfilename, "%s%s%s",
- got_prefix ? "" : ACE_DLL_PREFIX,
- tempfilename,
- got_suffix ? "" : ACE_DLL_SUFFIX);
-
- if (ACE_OS::strcmp (searchfilename
- + ACE_OS::strlen (searchfilename) - ACE_OS::strlen (ACE_DLL_SUFFIX),
- ACE_DLL_SUFFIX))
- ACE_ERROR ((LM_NOTICE,
- "CAUTION: improper name for a shared library on this patform: %s\n",
- searchfilename));
-
+
// Use absolute pathname if there is one.
if (ACE_OS::strlen (searchpathname) > 0)
{
@@ -223,16 +223,27 @@ ACE::ldfind (const char filename[],
}
else
{
-
- // Revert to native path name separators
- for (separator_ptr = searchpathname;
- *separator_ptr != '\0';
- separator_ptr++)
- if (*separator_ptr == '/')
- *separator_ptr = ACE_DIRECTORY_SEPARATOR_CHAR;
-
- ::sprintf (pathname, "%s%s", searchpathname, searchfilename);
- return 0;
+ if (ACE_DIRECTORY_SEPARATOR_CHAR != '/')
+ // Revert to native path name separators
+ ACE::strrepl (searchpathname, '/', ACE_DIRECTORY_SEPARATOR_CHAR);
+
+ // First, try matching the filename *without* adding a
+ // prefix.
+ ::sprintf (pathname, "%s%s%s",
+ searchpathname,
+ tempfilename,
+ got_suffix ? "" : ACE_DLL_SUFFIX);
+ if (ACE_OS::access (pathname, F_OK) == 0)
+ return 0;
+
+ // Second, try matching the filename *with* adding a prefix.
+ ::sprintf (pathname, "%s%s%s%s",
+ searchpathname,
+ ACE_DLL_PREFIX,
+ tempfilename,
+ got_suffix ? "" : ACE_DLL_SUFFIX);
+ if (ACE_OS::access (pathname, F_OK) == 0)
+ return 0;
}
}
@@ -259,15 +270,28 @@ ACE::ldfind (const char filename[],
result = -1;
break;
}
- ACE_OS::sprintf (pathname, "%s%c%s",
+
+ // First, try matching the filename *without* adding a
+ // prefix.
+ ACE_OS::sprintf (pathname, "%s%c%s",
path_entry,
ACE_DIRECTORY_SEPARATOR_CHAR,
searchfilename);
+ if (ACE_OS::access (pathname, F_OK) == 0)
+ break;
- if (ACE_OS::access (pathname, R_OK) == 0)
+ // Second, try matching the filename *with* adding a
+ // prefix.
+ ACE_OS::sprintf (pathname, "%s%c%s%s",
+ path_entry,
+ ACE_DIRECTORY_SEPARATOR_CHAR,
+ ACE_DLL_PREFIX,
+ searchfilename);
+ if (ACE_OS::access (pathname, F_OK) == 0)
break;
- path_entry = ACE_OS::strtok (0,
- ACE_LD_SEARCH_PATH_SEPARATOR_STR);
+
+ path_entry = ACE_OS::strtok
+ (0, ACE_LD_SEARCH_PATH_SEPARATOR_STR);
}
ACE_OS::free ((void *) ld_path);
diff --git a/ace/ACE.h b/ace/ACE.h
index 3bd4202f45f..0d698ecca77 100644
--- a/ace/ACE.h
+++ b/ace/ACE.h
@@ -223,6 +223,10 @@ public:
// Copies <t> to <s>, returning a pointer to the end of the copied
// region (rather than the beginning, a la <strcpy>.
+ static size_t strrepl (char *s, char search, char replace);
+ // Replace all instances of <search> in <s> with <replace>. Returns
+ // the number of replacements made.
+
static const char *execname (const char *pathname);
// On Win32 returns <pathname> if it already ends in ".exe,"
// otherwise returns a dynamically allocated buffer containing
@@ -254,9 +258,9 @@ public:
// a relative path in conjunction with ACE_LD_SEARCH_PATH (e.g.,
// $LD_LIBRARY_PATH on UNIX or $PATH on Win32). This function will
// add appropriate suffix (e.g., .dll on Win32 or .so on UNIX)
- // according to the OS platform. In addition, if a relative path is
- // used, this function will prefix the appropriate prefix (e.g.,
- // "lib" on UNIX and "" on Win32).
+ // according to the OS platform. In addition, this function will
+ // apply the appropriate prefix (e.g., "lib" on UNIX and "" on
+ // Win32) if the <filename> doesn't match directly.
static FILE *ldopen (const char *filename, const char *type);
// Uses <ldopen> to locate and open the appropriate <filename> and
diff --git a/ace/Module.cpp b/ace/Module.cpp
index d38df5b7a7f..c0d30dc1ccf 100644
--- a/ace/Module.cpp
+++ b/ace/Module.cpp
@@ -27,13 +27,13 @@ ACE_Module<ACE_SYNCH_2>::writer (ACE_Task<ACE_SYNCH_2> *q,
ACE_TRACE ("ACE_Module<ACE_SYNCH_2>::writer");
// Close and maybe delete old writer
- this->close_i (1);
+ this->close_i (1, flags);
this->q_pair_[1] = q;
if (q != 0)
ACE_CLR_BITS (q->flags_, ACE_Task_Flags::ACE_READER);
- // don't allow the caller to change the reader status
+ // Don't allow the caller to change the reader status.
ACE_SET_BITS (flags_, (flags & M_DELETE_WRITER));
}
@@ -44,7 +44,7 @@ ACE_Module<ACE_SYNCH_2>::reader (ACE_Task<ACE_SYNCH_2> *q,
ACE_TRACE ("ACE_Module<ACE_SYNCH_2>::reader");
// Close and maybe delete old writer
- this->close_i (0);
+ this->close_i (0, flags);
this->q_pair_[0] = q;
if (q != 0)
@@ -76,23 +76,24 @@ ACE_Module<ACE_SYNCH_2>::open (const char *mod_name,
this->name (mod_name);
this->arg_ = arg;
- // we may already have readers and/or writers
+ // We may already have readers and/or writers.
if (this->reader ())
- this->close_i (0);
+ this->close_i (0, M_DELETE_READER);
if (this->writer ())
- this->close_i (1);
-
+ this->close_i (1, M_DELETE_WRITER);
- if (writer_q == 0) {
- writer_q = new ACE_Thru_Task<ACE_SYNCH_2>;
- ACE_SET_BITS (flags, M_DELETE_WRITER);
- }
+ if (writer_q == 0)
+ {
+ writer_q = new ACE_Thru_Task<ACE_SYNCH_2>;
+ ACE_SET_BITS (flags, M_DELETE_WRITER);
+ }
- if (reader_q == 0) {
- reader_q = new ACE_Thru_Task<ACE_SYNCH_2>;
- ACE_SET_BITS (flags, M_DELETE_READER);
- }
+ if (reader_q == 0)
+ {
+ reader_q = new ACE_Thru_Task<ACE_SYNCH_2>;
+ ACE_SET_BITS (flags, M_DELETE_READER);
+ }
this->reader (reader_q);
this->writer (writer_q);
@@ -102,21 +103,20 @@ ACE_Module<ACE_SYNCH_2>::open (const char *mod_name,
writer_q->mod_ = this;
// Save the flags
- ACE_SET_BITS (flags_, flags);
+ this->flags_ = flags;
// Make sure that the memory is allocated before proceding.
if (writer_q == 0 || reader_q == 0)
{
- this->close_i (0);
- this->close_i (1);
+ // These calls will delete writer_q and/or reader_q, if
+ // necessary.
+ this->close_i (0, M_DELETE_READER);
+ this->close_i (1, M_DELETE_WRITER);
// Reset back pointers.
- reader_q->mod_ = NULL;
- writer_q->mod_ = NULL;
+ reader_q->mod_ = 0;
+ writer_q->mod_ = 0;
- delete writer_q;
- delete reader_q;
-
errno = ENOMEM;
return -1;
}
@@ -140,6 +140,7 @@ ACE_Module<ACE_SYNCH_2>::sibling (ACE_Task<ACE_SYNCH_2> *orig)
template <ACE_SYNCH_1>
ACE_Module<ACE_SYNCH_2>::ACE_Module (void)
+ : flags_ (0)
{
ACE_TRACE ("ACE_Module<ACE_SYNCH_2>::ACE_Module");
this->name ("<unknown>");
@@ -164,6 +165,7 @@ ACE_Module<ACE_SYNCH_2>::ACE_Module (const char *mod_name,
ACE_Task<ACE_SYNCH_2> *reader_q,
void *args,
int flags /* = M_DELETE */)
+ : flags_ (0)
{
ACE_TRACE ("ACE_Module<ACE_SYNCH_2>::ACE_Module");
@@ -183,21 +185,22 @@ ACE_Module<ACE_SYNCH_2>::close (int flags /* = M_DELETE_NONE */)
ACE_SET_BITS (flags_, flags);
- if (this->close_i (0) == -1)
+ if (this->close_i (0, flags) == -1)
result = -1;
- if (this->close_i (1) == -1)
+ if (this->close_i (1, flags) == -1)
result = -1;
return result;
}
template <ACE_SYNCH_1> int
-ACE_Module<ACE_SYNCH_2>::close_i (int which)
+ACE_Module<ACE_SYNCH_2>::close_i (int which,
+ int flags)
{
ACE_TRACE ("ACE_Module<ACE_SYNCH_2>::close_i");
- if (this->q_pair_[which] == NULL)
+ if (this->q_pair_[which] == 0)
return 0;
// Copy task pointer to prevent problems when ACE_Task::close
@@ -206,26 +209,28 @@ ACE_Module<ACE_SYNCH_2>::close_i (int which)
// Change so that close doesn't get called again from the task base.
- // Now close the task
+ // Now close the task.
int result = 0;
- if (task->module_closed() == -1)
+ if (task->module_closed () == -1)
result = -1;
task->flush ();
task->next (0);
// Should we also delete it ?
- if (ACE_BIT_ENABLED (flags_, which+1) ) {
- // Only delete the Tasks if there aren't any more threads
- // running in them.
- if (task->thr_count() == 0)
- delete task;
- }
-
- // Set the tasks pointer to NULL so that we don't try to close()
+ if (flags != M_DELETE_NONE
+ && ACE_BIT_ENABLED (flags_, which + 1))
+ {
+ // Only delete the Tasks if there aren't any more threads
+ // running in them.
+ if (task->thr_count () == 0)
+ delete task;
+ }
+
+ // Set the tasks pointer to 0 so that we don't try to close()
// this object again if the destructor gets called.
- this->q_pair_[which] = NULL;
+ this->q_pair_[which] = 0;
// Finally remove the delete bit.
ACE_CLR_BITS (flags_, which + 1);
diff --git a/ace/Module.h b/ace/Module.h
index f587ef7744f..d2bd2c608f0 100644
--- a/ace/Module.h
+++ b/ace/Module.h
@@ -49,7 +49,8 @@ public:
M_DELETE = 3
// Indicates that close() deletes the Tasks. Don't change this
// value without updating the same enum in class ACE_Stream...
- // The above flags may be or'ed together.
+ // The <M_DELETE_READER> and <M_DELETE_WRITER> flags may be or'ed
+ // together.
};
// = Initialization and termination methods.
@@ -63,7 +64,7 @@ public:
ACE_Task<ACE_SYNCH_2> *writer = 0,
ACE_Task<ACE_SYNCH_2> *reader = 0,
void *args = 0,
- int flags = M_DELETE);
+ int flags = M_DELETE);
// Create an initialized module with <module_name> as its identity
// and <reader> and <writer> as its tasks.
@@ -71,20 +72,19 @@ public:
ACE_Task<ACE_SYNCH_2> *writer = 0,
ACE_Task<ACE_SYNCH_2> *reader = 0,
void *a = 0,
- int flags = M_DELETE);
+ int flags = M_DELETE);
// Create an initialized module with <module_name> as its identity
- // and <reader> and <writer> as its tasks.
- // Previously register reader or writers or closed down and deleted
- // according to the value of flags_.
- // Should not be called from within ACE_Task::module_closed()
+ // and <reader> and <writer> as its tasks. Previously register
+ // reader or writers or closed down and deleted according to the
+ // value of flags_. Should not be called from within
+ // ACE_Task::module_closed().
int close (int flags = M_DELETE_NONE);
// Close down the Module and its Tasks. The flags argument can be
- // used to override the default behaviour, which depends on
- // previous <flags> values in calls to c'tor(), open(), reader() and
- // writer().
- // A previous value M_DELETE[_XXX] can not be overridden.
- // Should not be called from within ACE_Task::module_closed()
+ // used to override the default behaviour, which depends on previous
+ // <flags> values in calls to c'tor(), open(), reader() and
+ // writer(). A previous value M_DELETE[_XXX] can not be overridden.
+ // Should not be called from within ACE_Task::module_closed().
// = ACE_Task manipulation routines
ACE_Task<ACE_SYNCH_2> *writer (void);
@@ -92,23 +92,23 @@ public:
void writer (ACE_Task<ACE_SYNCH_2> *q, int flags = M_DELETE_WRITER);
// Set the writer task. <flags> can be used to indicate that the
- // module should delete the writer during a call to close or
- // to the destructor. If a previous writer exists, it is closed.
- // It may also be deleted, depending on the old flags_ value.
- // Should not be called from within ACE_Task::module_closed()
+ // module should delete the writer during a call to close or to the
+ // destructor. If a previous writer exists, it is closed. It may
+ // also be deleted, depending on the old flags_ value. Should not
+ // be called from within ACE_Task::module_closed().
ACE_Task<ACE_SYNCH_2> *reader (void);
// Get the reader task.
void reader (ACE_Task<ACE_SYNCH_2> *q, int flags = M_DELETE_READER);
// Set the reader task. <flags> can be used to indicate that the
- // module should delete the reader during a call to close or
- // to the destructor. If a previous reader exists, it is closed.
- // It may also be deleted, depending on the old flags_ value.
- // Should not be called from within ACE_Task::module_closed()
+ // module should delete the reader during a call to close or to the
+ // destructor. If a previous reader exists, it is closed. It may
+ // also be deleted, depending on the old flags_ value. Should not
+ // be called from within ACE_Task::module_closed()
ACE_Task<ACE_SYNCH_2> *sibling (ACE_Task<ACE_SYNCH_2> *orig);
- // Set and get pointer to sibling ACE_Task in ACE_Module
+ // Set and get pointer to sibling ACE_Task in ACE_Module.
// = Identify the module
const char *name (void) const;
@@ -139,9 +139,9 @@ public:
// Declare the dynamic allocation hooks.
private:
- int close_i(int which);
- // Implements the close operation for either the reader
- // or the writer task (depending on <which>)
+ int close_i (int which, int flags);
+ // Implements the close operation for either the reader or the
+ // writer task (depending on <which>).
ACE_Task<ACE_SYNCH_2> *q_pair_[2];
// Pair of Tasks that form the "read-side" and "write-side" of the
diff --git a/ace/OS.h b/ace/OS.h
index 73213a5ee63..393793e3a38 100644
--- a/ace/OS.h
+++ b/ace/OS.h
@@ -1699,8 +1699,10 @@ typedef fd_set ACE_FD_SET_TYPE;
// Necessary to support the Alphas, which have 64 bit longs and 32 bit
// ints...
typedef u_int ACE_UINT32;
+typedef int ACE_INT32;
#else
typedef u_long ACE_UINT32;
+typedef long ACE_INT32;
#endif /* ACE_HAS_64BIT_LONGS */
#if !defined (ETIMEDOUT) && defined (ETIME)
diff --git a/ace/Proactor.cpp b/ace/Proactor.cpp
index 6e0f4438ad5..bfa8ddbd2f6 100644
--- a/ace/Proactor.cpp
+++ b/ace/Proactor.cpp
@@ -446,6 +446,7 @@ ACE_Proactor::cancel_io (ACE_Event_Handler *handler)
#if defined (ACE_WIN32) && defined (ACE_HAS_CANCEL_IO)
return ::CancelIO (handler->get_handle ()) ? -1 : 0;
#else
+ ACE_UNUSED_ARG(handler);
return 0;
#endif /* ACE_WIN32 */
}
diff --git a/ace/ReactorEx.h b/ace/ReactorEx.h
index 0cf84d67de4..e5903fb5eeb 100644
--- a/ace/ReactorEx.h
+++ b/ace/ReactorEx.h
@@ -14,8 +14,8 @@
//
// ============================================================================
-#if !defined (ACE_ReactorEx_H)
-#define ACE_ReactorEx_H
+#if !defined (ACE_REACTOREX_H)
+#define ACE_REACTOREX_H
#include "ace/Time_Value.h"
#include "ace/Timer_Queue.h"
@@ -291,4 +291,4 @@ public:
#if defined (__ACE_INLINE__)
#include "ace/ReactorEx.i"
#endif /* __ACE_INLINE__ */
-#endif /* ACE_ReactorEx_H */
+#endif /* ACE_REACTOREX_H */
diff --git a/ace/Service_Main.cpp b/ace/Service_Main.cpp
index b96c06fe7db..14e8fdbcd0b 100644
--- a/ace/Service_Main.cpp
+++ b/ace/Service_Main.cpp
@@ -26,8 +26,7 @@ sc_main (int argc, char *argv[])
// Run forever, performing the configured services until we are shut
// down by a SIGINT/SIGQUIT signal.
- while (daemon.reactor_event_loop_done () == 0)
- daemon.run_reactor_event_loop ();
+ daemon.run_reactor_event_loop ();
return 0;
}
diff --git a/ace/Service_Record.cpp b/ace/Service_Record.cpp
index 53271cf49a1..21cd5462e71 100644
--- a/ace/Service_Record.cpp
+++ b/ace/Service_Record.cpp
@@ -103,15 +103,20 @@ int
ACE_Module_Type::fini (void) const
{
ACE_TRACE ("ACE_Module_Type::fini");
+
const void *obj = this->object ();
MT_Module *mod = (MT_Module *) obj;
MT_Task *reader = mod->reader ();
MT_Task *writer = mod->writer ();
- reader->fini ();
- writer->fini ();
- delete reader;
- delete writer;
+ if (reader != 0)
+ reader->fini ();
+
+ if (writer != 0)
+ writer->fini ();
+
+ // Close the module and delete the memory.
+ mod->close (MT_Module::M_DELETE);
return ACE_Service_Type::fini ();
}
@@ -227,7 +232,7 @@ ACE_Stream_Type::fini (void) const
return ACE_Service_Type::fini ();
}
-// Locate and remove MOD_NAME from the ACE_Stream.
+// Locate and remove <mod_name> from the ACE_Stream.
int
ACE_Stream_Type::remove (ACE_Module_Type *mod)
@@ -337,6 +342,15 @@ ACE_Service_Record::resume (void) const
this->type_->resume ();
}
+int
+ACE_Service_Object_Type::fini (void) const
+{
+ ACE_TRACE ("ACE_Service_Object_Type::fini");
+ ACE_Service_Object *so = (ACE_Service_Object *) this->object ();
+ so->fini ();
+ return ACE_Service_Type::fini ();
+}
+
#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
template class ACE_Module<ACE_SYNCH>;
template class ACE_Stream<ACE_SYNCH>;
diff --git a/ace/Service_Record.i b/ace/Service_Record.i
index bd6d667a593..0afc4051ee2 100644
--- a/ace/Service_Record.i
+++ b/ace/Service_Record.i
@@ -18,15 +18,6 @@ ACE_Service_Object_Type::resume (void) const
}
ACE_INLINE int
-ACE_Service_Object_Type::fini (void) const
-{
- ACE_TRACE ("ACE_Service_Object_Type::fini");
- ACE_Service_Object *so = (ACE_Service_Object *) this->object ();
- so->fini ();
- return ACE_Service_Type::fini ();
-}
-
-ACE_INLINE int
ACE_Service_Object_Type::info (char **str, size_t len) const
{
ACE_TRACE ("ACE_Service_Object_Type::info");
diff --git a/ace/Stream.cpp b/ace/Stream.cpp
index 13a1874ed51..d345c0f7662 100644
--- a/ace/Stream.cpp
+++ b/ace/Stream.cpp
@@ -23,7 +23,9 @@ ACE_Stream<ACE_SYNCH_2>::dump (void) const
ACE_TRACE ("ACE_Stream<ACE_SYNCH_2>::dump");
ACE_DEBUG ((LM_DEBUG, "-------- module links --------\n"));
- for (ACE_Module<ACE_SYNCH_2> *mp = this->stream_head_; ; mp = mp->next ())
+ for (ACE_Module<ACE_SYNCH_2> *mp = this->stream_head_;
+ ;
+ mp = mp->next ())
{
ACE_DEBUG ((LM_DEBUG, "module name = %s\n", mp->name ()));
if (mp == this->stream_tail_)
@@ -34,13 +36,16 @@ ACE_Stream<ACE_SYNCH_2>::dump (void) const
ACE_Task<ACE_SYNCH_2> *tp;
- for (tp = this->stream_head_->writer (); ; tp = tp->next ())
+ for (tp = this->stream_head_->writer ();
+ ;
+ tp = tp->next ())
{
ACE_DEBUG ((LM_DEBUG, "writer queue name = %s\n", tp->name ()));
tp->dump ();
ACE_DEBUG ((LM_DEBUG, "-------\n"));
if (tp == this->stream_tail_->writer ()
- || (this->linked_us_ && tp == this->linked_us_->stream_head_->reader ()))
+ || (this->linked_us_
+ && tp == this->linked_us_->stream_head_->reader ()))
break;
}
@@ -51,7 +56,8 @@ ACE_Stream<ACE_SYNCH_2>::dump (void) const
tp->dump ();
ACE_DEBUG ((LM_DEBUG, "-------\n"));
if (tp == this->stream_head_->reader ()
- || (this->linked_us_ && tp == this->linked_us_->stream_head_->writer ()))
+ || (this->linked_us_
+ && tp == this->linked_us_->stream_head_->writer ()))
break;
}
}
@@ -157,12 +163,13 @@ ACE_Stream<ACE_SYNCH_2>::remove (const char *name,
else
prev->link (mod->next ());
- // Close down the module and release the memory.
- mod->close (flags);
-
// Don't delete the Module unless the flags request this.
if (flags != ACE_Module<ACE_SYNCH_2>::M_DELETE_NONE)
- delete mod;
+ {
+ // Close down the module and release the memory.
+ mod->close (flags);
+ delete mod;
+ }
return 0;
}
@@ -228,8 +235,8 @@ ACE_Stream<ACE_SYNCH_2>::push_module (ACE_Module<ACE_SYNCH_2> *new_top,
#if 0
int
ACE_Stream<ACE_SYNCH_2>::open (void *a,
- ACE_Multiplexor &muxer,
- ACE_Module<ACE_SYNCH_2> *head)
+ ACE_Multiplexor &muxer,
+ ACE_Module<ACE_SYNCH_2> *head)
{
ACE_TRACE ("ACE_Stream<ACE_SYNCH_2>::open");
this->stream_head_ = head == 0
diff --git a/ace/Stream.h b/ace/Stream.h
index cd50967ceb7..25e756c4ddf 100644
--- a/ace/Stream.h
+++ b/ace/Stream.h
@@ -1,7 +1,6 @@
/* -*- C++ -*- */
// $Id$
-
// ============================================================================
//
// = LIBRARY
diff --git a/ace/Svc_Handler.cpp b/ace/Svc_Handler.cpp
index 922b6b89eaa..67855016c1a 100644
--- a/ace/Svc_Handler.cpp
+++ b/ace/Svc_Handler.cpp
@@ -137,6 +137,7 @@ template <PR_ST_1, ACE_SYNCH_1> void
ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::shutdown (void)
{
ACE_TRACE ("ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::shutdown");
+
// Deregister this handler with the ACE_Reactor.
if (this->reactor ())
{
@@ -145,7 +146,7 @@ ACE_Svc_Handler<PR_ST_2, ACE_SYNCH_2>::shutdown (void)
ACE_Event_Handler::DONT_CALL;
// Make sure there are no timers.
- this->reactor ()->cancel_timer( this );
+ this->reactor ()->cancel_timer (this);
// Remove self from reactor.
this->reactor ()->remove_handler (this, mask);
diff --git a/ace/Task.cpp b/ace/Task.cpp
index 3046b40a9c3..a1217fff683 100644
--- a/ace/Task.cpp
+++ b/ace/Task.cpp
@@ -232,11 +232,12 @@ ACE_Task_Base::svc_run (void *args)
/* NOTREACHED */
}
-// Forward the call to close() so that existing
-// applications don't break.
+// Forward the call to close() so that existing applications don't
+// break.
+
int
ACE_Task_Base::module_closed (void)
{
- return this->close (1);
+ return this->close (1);
}
diff --git a/apps/Gateway/Gateway/Concurrency_Strategies.h b/apps/Gateway/Gateway/Concurrency_Strategies.h
index e2fbc934c93..8d1b2979a49 100644
--- a/apps/Gateway/Gateway/Concurrency_Strategies.h
+++ b/apps/Gateway/Gateway/Concurrency_Strategies.h
@@ -29,7 +29,7 @@ typedef ACE_Null_Mutex MAP_MUTEX;
#else /* ACE_HAS_THREADS */
// Note that we only need to make the ACE_Task thread-safe if we are
-// using the multi-threaded Thr_Consumer_Handler...
+// using the multi-threaded Thr_Consumer_Proxy...
#if defined (USE_OUTPUT_MT)
#define SYNCH_STRATEGY ACE_MT_SYNCH
#else
@@ -37,7 +37,7 @@ typedef ACE_Null_Mutex MAP_MUTEX;
#endif /* USE_OUTPUT_MT || USE_INPUT_MT */
// Note that we only need to make the ACE_Map_Manager thread-safe if
-// we are using the multi-threaded Thr_Supplier_Handler. In this
+// we are using the multi-threaded Thr_Supplier_Proxy. In this
// case, we use an RW_Mutex since we'll lookup Consumers far more
// often than we'll update them.
#if defined (USE_INPUT_MT)
@@ -48,27 +48,27 @@ typedef ACE_Null_Mutex MAP_MUTEX;
#endif /* ACE_HAS_THREADS */
// = Forward decls
-class Thr_Consumer_Handler;
-class Thr_Supplier_Handler;
-class Consumer_Handler;
-class Supplier_Handler;
+class Thr_Consumer_Proxy;
+class Thr_Supplier_Proxy;
+class Consumer_Proxy;
+class Supplier_Proxy;
#if defined (ACE_HAS_THREADS) && (defined (USE_OUTPUT_MT) || defined (USE_INPUT_MT))
#if defined (USE_OUTPUT_MT)
-typedef Thr_Consumer_Handler CONSUMER_HANDLER;
+typedef Thr_Consumer_Proxy CONSUMER_HANDLER;
#else
-typedef Consumer_Handler CONSUMER_HANDLER;
+typedef Consumer_Proxy CONSUMER_HANDLER;
#endif /* USE_OUTPUT_MT */
#if defined (USE_INPUT_MT)
-typedef Thr_Supplier_Handler SUPPLIER_HANDLER;
+typedef Thr_Supplier_Proxy SUPPLIER_HANDLER;
#else
-typedef Supplier_Handler SUPPLIER_HANDLER;
+typedef Supplier_Proxy SUPPLIER_HANDLER;
#endif /* USE_INPUT_MT */
#else
// Instantiate a non-multi-threaded Gateway.
-typedef Supplier_Handler SUPPLIER_HANDLER;
-typedef Consumer_Handler CONSUMER_HANDLER;
+typedef Supplier_Proxy SUPPLIER_HANDLER;
+typedef Consumer_Proxy CONSUMER_HANDLER;
#endif /* ACE_HAS_THREADS */
#endif /* _CONCURRENCY_STRATEGIES */
diff --git a/apps/Gateway/Gateway/Config_Files.cpp b/apps/Gateway/Gateway/Config_Files.cpp
index 4c2648addf0..7e99902b0db 100644
--- a/apps/Gateway/Gateway/Config_Files.cpp
+++ b/apps/Gateway/Gateway/Config_Files.cpp
@@ -28,11 +28,11 @@ Consumer_Config_File_Parser::read_entry (Consumer_Config_File_Entry &entry,
}
// Get the logic id.
- if ((read_result = this->getint (entry.logical_id_)) != FP::SUCCESS)
+ if ((read_result = this->getint (entry.supplier_id_)) != FP::SUCCESS)
return read_result;
// Get the payload type.
- if ((read_result = this->getint (entry.payload_type_)) != FP::SUCCESS)
+ if ((read_result = this->getint (entry.type_)) != FP::SUCCESS)
return read_result;
// get all the destinations.
@@ -104,7 +104,7 @@ int main (int argc, char *argv[])
{
if (argc != 4) {
// ACE_ERROR_RETURN ((LM_ERROR, "%s filename\n", argv[0]), -1);
- cerr << argv[0] << " CCfilename RTfilename Mapfilename.\n";
+ cerr << argv[0] << " CCfilename filename Mapfilename.\n";
exit (1);
}
FP_RETURN_TYPE result;
@@ -130,30 +130,30 @@ int main (int argc, char *argv[])
}
CCfile.close();
- Consumer_Config_File_Entry RTentry;
- Consumer_Config_File_Parser RTfile;
+ Consumer_Config_File_Entry entry;
+ Consumer_Config_File_Parser file;
- RTfile.open (argv[2]);
+ file.open (argv[2]);
line_number = 0;
printf ("\nConnID\tLogic\tPayload\tDestinations\n");
// Read config file line at a time.
- while ((result = RTfile.read_entry (RTentry, line_number)) != EOF)
+ while ((result = file.read_entry (entry, line_number)) != EOF)
{
if (result != FP::SUCCESS)
cerr << "Error at line " << line_number << endl;
else
{
printf ("%d\t%d\t%d\t%d\t",
- RTentry.conn_id_, RTentry.logical_id_, RTentry.payload_type_);
- while (--RTentry.total_destinations_ >= 0)
- printf ("%d,", RTentry.destinations_[RTentry.total_destinations_]);
+ entry.conn_id_, entry.supplier_id_, entry.type_);
+ while (--entry.total_destinations_ >= 0)
+ printf ("%d,", entry.destinations_[entry.total_destinations_]);
printf ("\n");
}
}
- RTfile.close();
+ file.close();
return 0;
}
diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h
index 145c3233bae..2620301e25b 100644
--- a/apps/Gateway/Gateway/Config_Files.h
+++ b/apps/Gateway/Gateway/Config_Files.h
@@ -22,11 +22,11 @@
class Connection_Config_File_Entry
// = TITLE
- // Stores the IO_Handler entry for connection configuration.
+ // Stores the Proxy_Handler entry for connection configuration.
{
public:
int conn_id_;
- // Connection id for this IO_Handler.
+ // Connection id for this Proxy_Handler.
char host_[BUFSIZ];
// Host to connect with.
@@ -46,7 +46,7 @@ public:
class Connection_Config_File_Parser : public File_Parser<Connection_Config_File_Entry>
// = TITLE
- // Parser for the IO_Handler Connection file.
+ // Parser for the Proxy_Handler Connection file.
{
public:
virtual FP::Return_Type
@@ -65,10 +65,10 @@ public:
int conn_id_;
// Connection id for this channel.
- int logical_id_;
+ int supplier_id_;
// Logical routing id for this channel.
- int payload_type_;
+ int type_;
// Type of payload in the message.
int destinations_[MAX_DESTINATIONS];
diff --git a/apps/Gateway/Gateway/Dispatch_Set.h b/apps/Gateway/Gateway/Dispatch_Set.h
new file mode 100644
index 00000000000..a867f1ca5ff
--- /dev/null
+++ b/apps/Gateway/Gateway/Dispatch_Set.h
@@ -0,0 +1,28 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// Dispatch_Set.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (_DISPATCH_SET)
+#define _DISPATCH_SET
+
+#include "ace/Set.h"
+
+// Forward reference.
+class Proxy_Handler;
+
+typedef ACE_Unbounded_Set<Proxy_Handler *> Dispatch_Set;
+typedef ACE_Unbounded_Set_Iterator<Proxy_Handler *> Dispatch_Set_Iterator;
+
+#endif /* _DISPATCH_SET */
diff --git a/apps/Gateway/Gateway/Event.h b/apps/Gateway/Gateway/Event.h
index a8a9374be3c..24881c3e85b 100644
--- a/apps/Gateway/Gateway/Event.h
+++ b/apps/Gateway/Gateway/Event.h
@@ -17,33 +17,45 @@
#if !defined (EVENT)
#define EVENT
+#include "ace/OS.h"
+
// This is the unique connection identifier that denotes a particular
-// IO_Handler in the Gateway.
-typedef short CONN_ID;
+// Proxy_Handler in the Gateway.
+typedef ACE_INT32 ACE_INT32;
class Event_Addr
// = TITLE
// Address used to identify the source/destination of an event.
+ //
+ // = DESCRIPTION
+ // This is really a "virtual forwarding address" thatis used to
+ // decouple the filtering and forwarding logic of the Event
+ // Channel from the format of the data.
{
public:
- Event_Addr (CONN_ID cid = -1, unsigned char lid = 0, unsigned char pay = 0)
- : conn_id_ (cid), logical_id_ (lid), payload_ (pay) {}
-
- int operator== (const Event_Addr &pa) const
+ Event_Addr (ACE_INT32 cid = -1,
+ u_char sid = 0,
+ u_char type = 0)
+ : conn_id_ (cid),
+ supplier_id_ (sid),
+ type_ (type) {}
+
+ int operator== (const Event_Addr &event_addr) const
{
- return this->conn_id_ == pa.conn_id_
- && this->logical_id_ == pa.logical_id_
- && this->payload_ == pa.payload_;
+ return this->conn_id_ == event_addr.conn_id_
+ && this->supplier_id_ == event_addr.supplier_id_
+ && this->type_ == event_addr.type_;
}
- CONN_ID conn_id_;
- // Unique connection identifier that denotes a particular IO_Handler.
+ ACE_INT32 conn_id_;
+ // Unique connection identifier that denotes a particular
+ // Proxy_Handler.
- unsigned char logical_id_;
+ ACE_INT32 supplier_id_;
// Logical ID.
- unsigned char payload_;
- // Payload type.
+ ACE_INT32 type_;
+ // Event type.
};
@@ -52,24 +64,27 @@ class Event_Header
// Fixed sized header.
{
public:
- typedef unsigned short SUPPLIER_ID;
- // Type used to route messages from gatewayd.
+ typedef ACE_INT32 SUPPLIER_ID;
+ // Type used to forward events from gatewayd.
enum
{
INVALID_ID = -1 // No peer can validly use this number.
};
- SUPPLIER_ID routing_id_;
+ SUPPLIER_ID supplier_id_;
// Source ID.
+ ACE_INT32 type_;
+ // Event type.
+
size_t len_;
- // Length of the message in bytes.
+ // Length of the entire event (including data payload) in bytes.
};
class Event
// = TITLE
- // Variable-sized message (buf_ may be variable-sized between
+ // Variable-sized event (data_ may be variable-sized between
// 0 and MAX_PAYLOAD_SIZE).
{
public:
@@ -77,10 +92,10 @@ public:
// The maximum size of an Event.
Event_Header header_;
- // Message header.
+ // Event header.
- char buf_[MAX_PAYLOAD_SIZE];
- // Message payload.
+ char data_[MAX_PAYLOAD_SIZE];
+ // Event data.
};
#endif /* EVENT */
diff --git a/apps/Gateway/Gateway/Event_Channel.cpp b/apps/Gateway/Gateway/Event_Channel.cpp
index 815755216c7..d146ddfb362 100644
--- a/apps/Gateway/Gateway/Event_Channel.cpp
+++ b/apps/Gateway/Gateway/Event_Channel.cpp
@@ -1,9 +1,10 @@
/* -*- C++ -*- */
// $Id$
+#define ACE_BUILD_SVC_DLL
#include "ace/Get_Opt.h"
#include "Config_Files.h"
-#include "IO_Handler_Connector.h"
+#include "Proxy_Handler_Connector.h"
#include "Event_Channel.h"
#if !defined (ACE_EVENT_CHANNEL_C)
@@ -46,18 +47,18 @@ ACE_Event_Channel<SH, CH>::handle_timeout (const ACE_Time_Value &,
size_t total_bytes_in = 0;
size_t total_bytes_out = 0;
- // Iterate through the consumer map connecting all the IO_Handlers.
+ // Iterate through the consumer map connecting all the Proxy_Handlers.
for (CONNECTION_MAP_ENTRY *me = 0;
cti.next (me) != 0;
cti.advance ())
{
- IO_Handler *io_handler = me->int_id_;
+ Proxy_Handler *proxy_handler = me->int_id_;
- if (io_handler->direction () == 'C')
- total_bytes_out += io_handler->total_bytes ();
- else // io_handler->direction () == 'S'
- total_bytes_in += io_handler->total_bytes ();
+ if (proxy_handler->direction () == 'C')
+ total_bytes_out += proxy_handler->total_bytes ();
+ else // proxy_handler->direction () == 'S'
+ total_bytes_in += proxy_handler->total_bytes ();
}
#if defined (ACE_NLOGGING)
@@ -108,16 +109,16 @@ ACE_Event_Channel<SH, CH>::initiate_connections (void)
else
synch_options = ACE_Synch_Options::synch;
- // Iterate through the Consumer Map connecting all the IO_Handlers.
+ // Iterate through the Consumer Map connecting all the Proxy_Handlers.
for (CONNECTION_MAP_ENTRY *me = 0;
cti.next (me) != 0;
cti.advance ())
{
- IO_Handler *io_handler = me->int_id_;
+ Proxy_Handler *proxy_handler = me->int_id_;
if (this->connector_->initiate_connection
- (io_handler, synch_options) == -1)
+ (proxy_handler, synch_options) == -1)
continue;
}
@@ -125,7 +126,7 @@ ACE_Event_Channel<SH, CH>::initiate_connections (void)
}
// This method gracefully shuts down all the Handlers in the
-// IO_Handler Connection Map.
+// Proxy_Handler Connection Map.
template <class SH, class CH> int
ACE_Event_Channel<SH, CH>::close (void)
@@ -136,26 +137,26 @@ ACE_Event_Channel<SH, CH>::close (void)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "suspend_all"), -1);
#endif /* USE_INPUT_MT || USE_OUTPUT_MT */
- CONNECTION_MAP_ITERATOR cti (this->connection_map_);
+ CONNECTION_MAP_ITERATOR cmi (this->connection_map_);
// Iterate over all the handlers and shut them down.
for (CONNECTION_MAP_ENTRY *me;
- cti.next (me) != 0;
- cti.advance ())
+ cmi.next (me) != 0;
+ cmi.advance ())
{
- IO_Handler *io_handler = me->int_id_;
+ Proxy_Handler *proxy_handler = me->int_id_;
ACE_DEBUG ((LM_DEBUG, "(%t) closing down route %d\n",
- io_handler->id ()));
+ proxy_handler->id ()));
- if (io_handler->state () != IO_Handler::IDLE)
- // Mark IO_Handler as DISCONNECTING so we don't try to
+ if (proxy_handler->state () != Proxy_Handler::IDLE)
+ // Mark Proxy_Handler as DISCONNECTING so we don't try to
// reconnect...
- io_handler->state (IO_Handler::DISCONNECTING);
+ proxy_handler->state (Proxy_Handler::DISCONNECTING);
- // Deallocate IO_Handler resources.
- io_handler->destroy (); // Will trigger a delete.
+ // Deallocate Proxy_Handler resources.
+ proxy_handler->destroy (); // Will trigger a delete.
}
// Free up the resources allocated dynamically by the ACE_Connector.
@@ -168,7 +169,10 @@ ACE_Event_Channel<SH, CH>::open (int argc, char *argv[])
{
this->parse_args (argc, argv);
- ACE_NEW_RETURN (this->connector_, IO_Handler_Connector (), -1);
+ ACE_NEW_RETURN (this->connector_, Proxy_Handler_Connector (), -1);
+
+ // Ignore SIPPIPE so each Consumer_Proxy can handle it.
+ ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
if (this->active_connector_role_)
{
@@ -226,38 +230,38 @@ ACE_Event_Channel<SH, CH>::parse_connection_config_file (void)
entry.max_retry_delay_,
entry.local_poconsumer_));
- IO_Handler *io_handler = 0;
+ Proxy_Handler *proxy_handler = 0;
// The next few lines of code are dependent on whether we are
- // making an Supplier_Handler or an Consumer_Handler.
+ // making an Supplier_Proxy or an Consumer_Proxy.
- if (entry.direction_ == 'C') // Configure a Consumer_Handler.
- ACE_NEW_RETURN (io_handler,
- CONSUMER_HANDLER (&this->consumer_map_,
+ if (entry.direction_ == 'C') // Configure a Consumer_Proxy.
+ ACE_NEW_RETURN (proxy_handler,
+ CONSUMER_HANDLER (&this->efd_,
this->connector_,
ACE_Service_Config::thr_mgr (),
this->socket_queue_size_),
-1);
- else /* direction == 'S' */ // Configure a Supplier_Handler.
- ACE_NEW_RETURN (io_handler,
- SUPPLIER_HANDLER (&this->consumer_map_,
+ else /* direction == 'S' */ // Configure a Supplier_Proxy.
+ ACE_NEW_RETURN (proxy_handler,
+ SUPPLIER_HANDLER (&this->efd_,
this->connector_,
ACE_Service_Config::thr_mgr (),
this->socket_queue_size_),
-1);
- // The following code is common to both Supplier_Handlers_ and
- // Consumer_Handlers.
+ // The following code is common to both Supplier_Proxys_ and
+ // Consumer_Proxys.
// Initialize the routing entry's peer addressing info.
- io_handler->bind (ACE_INET_Addr (entry.remote_poconsumer_, entry.host_),
+ proxy_handler->bind (ACE_INET_Addr (entry.remote_poconsumer_, entry.host_),
ACE_INET_Addr (entry.local_poconsumer_), entry.conn_id_);
// Initialize max timeout.
- io_handler->max_timeout (entry.max_retry_delay_);
+ proxy_handler->max_timeout (entry.max_retry_delay_);
- // Try to bind the new IO_Handler to the connection ID.
- switch (this->connection_map_.bind (entry.conn_id_, io_handler))
+ // Try to bind the new Proxy_Handler to the connection ID.
+ switch (this->connection_map_.bind (entry.conn_id_, proxy_handler))
{
case -1:
ACE_ERROR_RETURN ((LM_ERROR,
@@ -277,7 +281,7 @@ ACE_Event_Channel<SH, CH>::parse_connection_config_file (void)
if (file_empty)
ACE_ERROR ((LM_WARNING,
- "warning: connection io_handler configuration file was empty\n"));
+ "warning: connection proxy_handler configuration file was empty\n"));
return 0;
}
@@ -303,43 +307,37 @@ ACE_Event_Channel<SH, CH>::parse_consumer_config_file (void)
ACE_DEBUG ((LM_DEBUG, "(%t) conn id = %d, logical id = %d, payload = %d, "
"number of destinations = %d\n",
entry.conn_id_,
- entry.logical_id_,
- entry.payload_type_,
+ entry.supplier_id_,
+ entry.type_,
entry.total_destinations_));
for (int i = 0; i < entry.total_destinations_; i++)
ACE_DEBUG ((LM_DEBUG, "(%t) destination[%d] = %d\n",
i, entry.destinations_[i]));
}
- Consumer_Entry *re;
- ACE_NEW_RETURN (re, Consumer_Entry, -1);
-
- Consumer_Entry::ENTRY_SET *io_handler_set;
- ACE_NEW_RETURN (io_handler_set, Consumer_Entry::ENTRY_SET, -1);
+ Dispatch_Set *dispatch_set;
+ ACE_NEW_RETURN (dispatch_set, Dispatch_Set, -1);
Event_Addr event_addr (entry.conn_id_,
- entry.logical_id_,
- entry.payload_type_);
+ entry.supplier_id_,
+ entry.type_);
// Add the destinations to the Routing Entry.
for (int i = 0; i < entry.total_destinations_; i++)
{
- IO_Handler *io_handler = 0;
+ Proxy_Handler *proxy_handler = 0;
- // Lookup destination and add to Consumer_Entry set if found.
+ // Lookup destination and add to Dispatch_Set set if found.
if (this->connection_map_.find (entry.destinations_[i],
- io_handler) != -1)
- io_handler_set->insert (io_handler);
+ proxy_handler) != -1)
+ dispatch_set->insert (proxy_handler);
else
ACE_ERROR ((LM_ERROR, "(%t) not found: destination[%d] = %d\n",
i, entry.destinations_[i]));
}
- // Attach set of destination io_handlers to routing entry.
- re->destinations (io_handler_set);
-
// Bind with consumer map, keyed by peer address.
- switch (this->consumer_map_.bind (event_addr, re))
+ switch (this->efd_.bind (event_addr, dispatch_set))
{
case -1:
ACE_ERROR_RETURN ((LM_ERROR, "(%t) bind failed for connection %d\n",
diff --git a/apps/Gateway/Gateway/Event_Channel.h b/apps/Gateway/Gateway/Event_Channel.h
index 0cb928d7884..4e7afc5d328 100644
--- a/apps/Gateway/Gateway/Event_Channel.h
+++ b/apps/Gateway/Gateway/Event_Channel.h
@@ -17,7 +17,7 @@
#if !defined (ACE_EVENT_CHANNEL)
#define ACE_EVENT_CHANNEL
-#include "IO_Handler_Connector.h"
+#include "Proxy_Handler_Connector.h"
template <class SUPPLIER_HANDLER, class CONSUMER_HANDLER>
class ACE_Svc_Export ACE_Event_Channel : public ACE_Event_Handler
@@ -70,22 +70,22 @@ private:
int debug_;
// Are we debugging?
- IO_Handler_Connector *connector_;
+ Proxy_Handler_Connector *connector_;
// This is used to establish the connections actively.
int socket_queue_size_;
// Size of the socket queue (0 means "use default").
// = Make life easier by defining typedefs.
- typedef ACE_Map_Manager<CONN_ID, IO_Handler *, MAP_MUTEX> CONNECTION_MAP;
- typedef ACE_Map_Iterator<CONN_ID, IO_Handler *, MAP_MUTEX> CONNECTION_MAP_ITERATOR;
- typedef ACE_Map_Entry<CONN_ID, IO_Handler *> CONNECTION_MAP_ENTRY;
+ typedef ACE_Map_Manager<ACE_INT32, Proxy_Handler *, MAP_MUTEX> CONNECTION_MAP;
+ typedef ACE_Map_Iterator<ACE_INT32, Proxy_Handler *, MAP_MUTEX> CONNECTION_MAP_ITERATOR;
+ typedef ACE_Map_Entry<ACE_INT32, Proxy_Handler *> CONNECTION_MAP_ENTRY;
CONNECTION_MAP connection_map_;
- // Table that maps Connection IDs to IO_Handler *'s.
+ // Table that maps Connection IDs to Proxy_Handler *'s.
- Consumer_Map consumer_map_;
- // Map that associates event addresses to a set of Consumer_Handler
+ Event_Forwarding_Discriminator efd_;
+ // Map that associates event addresses to a set of Consumer_Proxy
// *'s.
};
diff --git a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp
new file mode 100644
index 00000000000..8261ea13eb2
--- /dev/null
+++ b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.cpp
@@ -0,0 +1,61 @@
+/* -*- C++ -*- */
+// $Id$
+
+#if !defined (_CONSUMER_MAP_C)
+#define _CONSUMER_MAP_C
+
+#include "Event_Forwarding_Discriminator.h"
+
+// Bind the Event_Addr to the INT_ID.
+
+int
+Event_Forwarding_Discriminator::bind (Event_Addr event_addr,
+ Dispatch_Set *Dispatch_Set)
+{
+ return this->map_.bind (event_addr, Dispatch_Set);
+}
+
+// Find the Dispatch_Set corresponding to the Event_Addr.
+
+int
+Event_Forwarding_Discriminator::find (Event_Addr event_addr,
+ Dispatch_Set *&Dispatch_Set)
+{
+ return this->map_.find (event_addr, Dispatch_Set);
+}
+
+// Unbind (remove) the Event_Addr from the map.
+
+int
+Event_Forwarding_Discriminator::unbind (Event_Addr event_addr)
+{
+ return this->map_.unbind (event_addr);
+}
+
+Event_Forwarding_Discriminator_Iterator::Event_Forwarding_Discriminator_Iterator (Event_Forwarding_Discriminator &rt)
+ : map_iter_ (rt.map_)
+{
+}
+
+int
+Event_Forwarding_Discriminator_Iterator::next (Dispatch_Set *&ss)
+{
+ // Loop in order to skip over inactive entries if necessary.
+
+ for (ACE_Map_Entry<Event_Addr, Dispatch_Set *> *temp = 0;
+ this->map_iter_.next (temp) != 0;
+ this->advance ())
+ {
+ // Otherwise, return the next item.
+ ss = temp->int_id_;
+ return 1;
+ }
+ return 0;
+}
+
+int
+Event_Forwarding_Discriminator_Iterator::advance (void)
+{
+ return this->map_iter_.advance ();
+}
+#endif /* _CONSUMER_MAP_C */
diff --git a/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h
new file mode 100644
index 00000000000..35a594b61b5
--- /dev/null
+++ b/apps/Gateway/Gateway/Event_Forwarding_Discriminator.h
@@ -0,0 +1,62 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// Event_Forwarding_Discriminator.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (_CONSUMER_MAP_H)
+#define _CONSUMER_MAP_H
+
+#include "ace/Map_Manager.h"
+#include "Concurrency_Strategies.h"
+#include "Event.h"
+#include "Dispatch_Set.h"
+
+class Event_Forwarding_Discriminator
+{
+ // = TITLE
+ // Define a generic consumer map based on the ACE Map_Manager.
+ //
+ // = DESCRIPTION
+ // This class makes it easier to use the Map_Manager.
+public:
+ int bind (Event_Addr event, Dispatch_Set *Dispatch_Set);
+ // Associate Event with the Dispatch_Set.
+
+ int find (Event_Addr event, Dispatch_Set *&Dispatch_Set);
+ // Break any association of EXID.
+
+ int unbind (Event_Addr event);
+ // Locate EXID and pass out parameter via INID. If found,
+ // return 0, else -1.
+
+public:
+ ACE_Map_Manager<Event_Addr, Dispatch_Set *, MAP_MUTEX> map_;
+ // Map that associates Event Addrs (external ids) with Dispatch_Set *'s
+ // <internal IDs>.
+};
+
+class Event_Forwarding_Discriminator_Iterator
+{
+ // = TITLE
+ // Define an iterator for the Consumer Map.
+public:
+ Event_Forwarding_Discriminator_Iterator (Event_Forwarding_Discriminator &mm);
+ int next (Dispatch_Set *&);
+ int advance (void);
+
+private:
+ ACE_Map_Iterator<Event_Addr, Dispatch_Set *, MAP_MUTEX> map_iter_;
+ // Map we are iterating over.
+};
+#endif /* _CONSUMER_MAP_H */
diff --git a/apps/Gateway/Gateway/File_Parser.h b/apps/Gateway/Gateway/File_Parser.h
index 776d1b2f338..80b768aff84 100644
--- a/apps/Gateway/Gateway/File_Parser.h
+++ b/apps/Gateway/Gateway/File_Parser.h
@@ -57,7 +57,6 @@ protected:
FP::Return_Type readword (char buf[]);
int delimiter (char ch);
- int endofline (char ch);
int comments (char ch);
int skipline (void);
diff --git a/apps/Gateway/Gateway/Gateway.cpp b/apps/Gateway/Gateway/Gateway.cpp
index 82666406070..2c963ff3d7f 100644
--- a/apps/Gateway/Gateway/Gateway.cpp
+++ b/apps/Gateway/Gateway/Gateway.cpp
@@ -6,11 +6,14 @@
#include "Gateway.h"
class Gateway : public ACE_Service_Object
+ // = TITLE
+ // Integrates the whole Gateway application.
+ //
+ // = DESCRIPTION
+ // This implementation uses the <ACE_Event_Channel> as the basis
+ // for the <Gateway> routing.
{
public:
- // = Initialization method.
- Gateway (void);
-
// = Service configurator hooks.
virtual int init (int argc, char *argv[]);
// Perform initialization.
@@ -34,6 +37,7 @@ protected:
// file.
ACE_Event_Channel<SUPPLIER_HANDLER, CONSUMER_HANDLER> event_channel_;
+ // The Event Channel routes events from Supplier(s) to Consumer(s).
};
// Convenient shorthands.
@@ -46,8 +50,6 @@ Gateway::handle_signal (int signum, siginfo_t *, ucontext_t *)
if (signum > 0)
ACE_DEBUG ((LM_DEBUG, "(%t) %S\n", signum));
- this->event_channel_.close ();
-
// Shut down the main event loop.
ACE_Service_Config::end_reactor_event_loop ();
return 0;
@@ -68,22 +70,12 @@ Gateway::handle_input (ACE_HANDLE h)
return this->handle_signal (h);
}
-// Give default values to data members.
-
-
-Gateway::Gateway (void)
-{
-}
-
int
Gateway::init (int argc, char *argv[])
{
if (this->event_channel_.open (argc, argv) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "open"), -1);
- // Ignore SIPPIPE so each Consumer_Handler can handle it.
- ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
-
ACE_Sig_Set sig_set;
sig_set.sig_add (SIGINT);
sig_set.sig_add (SIGQUIT);
@@ -91,13 +83,12 @@ Gateway::init (int argc, char *argv[])
// Register ourselves to receive SIGINT and SIGQUIT
// so we can shut down gracefully via signals.
- if (ACE_Service_Config::reactor ()->register_handler (sig_set,
- this) == -1)
+ if (ACE_Service_Config::reactor ()->register_handler
+ (sig_set, this) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
- if (ACE_Service_Config::reactor ()->register_handler (0,
- this,
- ACE_Event_Handler::READ_MASK) == -1)
+ if (ACE_Service_Config::reactor ()->register_handler
+ (0, this, ACE_Event_Handler::READ_MASK) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
return 0;
}
diff --git a/apps/Gateway/Gateway/Makefile b/apps/Gateway/Gateway/Makefile
index f81df3d73af..0f5ddc07eb0 100644
--- a/apps/Gateway/Gateway/Makefile
+++ b/apps/Gateway/Gateway/Makefile
@@ -1,7 +1,7 @@
#----------------------------------------------------------------------------
# @(#)Makefile 1.1 10/18/96
#
-# Makefile for the Gateway prototype.
+# Makefile for the Gateway.
#----------------------------------------------------------------------------
#----------------------------------------------------------------------------
@@ -12,15 +12,14 @@ BIN = gatewayd
LIB = libGateway.a
SHLIB = libGateway.so
-FILES = Event_Channel \
- IO_Handler \
- IO_Handler_Connector \
- Config_Files \
+FILES = Config_Files \
File_Parser \
Gateway \
- Consumer_Entry \
- Consumer_Map \
- Thr_IO_Handler
+ Event_Channel \
+ Event_Forwarding_Discriminator \
+ Proxy_Handler \
+ Proxy_Handler_Connector \
+ Thr_Proxy_Handler
LSRC = $(addsuffix .cpp,$(FILES))
LOBJ = $(addsuffix .o,$(FILES))
@@ -52,7 +51,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
# Default behavior is to use single-threading. See the README
# file for information on how to configure this with multiple
# strategies for threading the input and output channels.
-DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
+# DEFFLAGS += -DUSE_OUTPUT_MT -DUSE_INPUT_MT
#----------------------------------------------------------------------------
# Dependencies
@@ -61,9 +60,7 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
# DO NOT DELETE THIS LINE -- g++dep uses it.
# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
-.obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp \
- $(WRAPPER_ROOT)/ace/Get_Opt.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
+.obj/Config_Files.o .shobj/Config_Files.so: Config_Files.cpp \
$(WRAPPER_ROOT)/ace/OS.h \
$(WRAPPER_ROOT)/ace/Time_Value.h \
$(WRAPPER_ROOT)/ace/config.h \
@@ -72,13 +69,38 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/Log_Msg.h \
$(WRAPPER_ROOT)/ace/Log_Record.h \
$(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/ACE.i \
$(WRAPPER_ROOT)/ace/Log_Record.i \
+ Config_Files.h File_Parser.h
+.obj/File_Parser.o .shobj/File_Parser.so: File_Parser.cpp \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/stdcpp.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
$(WRAPPER_ROOT)/ace/ACE.i \
- Config_Files.h File_Parser.h IO_Handler_Connector.h \
- $(WRAPPER_ROOT)/ace/Connector.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ File_Parser.h
+.obj/Gateway.o .shobj/Gateway.so: Gateway.cpp \
$(WRAPPER_ROOT)/ace/Service_Config.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/stdcpp.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/ACE.i \
$(WRAPPER_ROOT)/ace/Event_Handler.h \
$(WRAPPER_ROOT)/ace/Thread_Manager.h \
$(WRAPPER_ROOT)/ace/Thread.h \
@@ -119,19 +141,21 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Strategies_T.h \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
+ Event_Channel.h Proxy_Handler_Connector.h \
+ $(WRAPPER_ROOT)/ace/Connector.h \
$(WRAPPER_ROOT)/ace/Map_Manager.h \
$(WRAPPER_ROOT)/ace/Svc_Handler.h \
$(WRAPPER_ROOT)/ace/Synch_Options.h \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
$(WRAPPER_ROOT)/ace/Connector.i \
- Thr_IO_Handler.h IO_Handler.h \
+ Thr_Proxy_Handler.h Proxy_Handler.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.i \
- Consumer_Map.h Concurrency_Strategies.h Event.h Consumer_Entry.h \
- Event_Channel.h
-.obj/IO_Handler.o .shobj/IO_Handler.so: IO_Handler.cpp Consumer_Entry.h \
- $(WRAPPER_ROOT)/ace/Set.h \
+ Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \
+ Dispatch_Set.h Gateway.h
+.obj/Event_Channel.o .shobj/Event_Channel.so: Event_Channel.cpp \
+ $(WRAPPER_ROOT)/ace/Get_Opt.h \
$(WRAPPER_ROOT)/ace/ACE.h \
$(WRAPPER_ROOT)/ace/OS.h \
$(WRAPPER_ROOT)/ace/Time_Value.h \
@@ -143,7 +167,7 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/Log_Priority.h \
$(WRAPPER_ROOT)/ace/Log_Record.i \
$(WRAPPER_ROOT)/ace/ACE.i \
- IO_Handler_Connector.h \
+ Config_Files.h File_Parser.h Proxy_Handler_Connector.h \
$(WRAPPER_ROOT)/ace/Connector.h \
$(WRAPPER_ROOT)/ace/Service_Config.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
@@ -158,6 +182,7 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
$(WRAPPER_ROOT)/ace/Synch_T.h \
$(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
$(WRAPPER_ROOT)/ace/Reactor.h \
$(WRAPPER_ROOT)/ace/Handle_Set.h \
$(WRAPPER_ROOT)/ace/Timer_Queue.h \
@@ -193,16 +218,14 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
$(WRAPPER_ROOT)/ace/Connector.i \
- Thr_IO_Handler.h IO_Handler.h \
+ Thr_Proxy_Handler.h Proxy_Handler.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.i \
- Consumer_Map.h Concurrency_Strategies.h Event.h
-.obj/IO_Handler_Connector.o .shobj/IO_Handler_Connector.so: IO_Handler_Connector.cpp \
- IO_Handler_Connector.h \
- $(WRAPPER_ROOT)/ace/Connector.h \
- $(WRAPPER_ROOT)/ace/Service_Config.h \
- $(WRAPPER_ROOT)/ace/Service_Object.h \
- $(WRAPPER_ROOT)/ace/Shared_Object.h \
+ Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \
+ Dispatch_Set.h Event_Channel.h
+.obj/Event_Forwarding_Discriminator.o .shobj/Event_Forwarding_Discriminator.so: Event_Forwarding_Discriminator.cpp \
+ Event_Forwarding_Discriminator.h \
+ $(WRAPPER_ROOT)/ace/Map_Manager.h \
$(WRAPPER_ROOT)/ace/ACE.h \
$(WRAPPER_ROOT)/ace/OS.h \
$(WRAPPER_ROOT)/ace/Time_Value.h \
@@ -214,6 +237,34 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/Log_Priority.h \
$(WRAPPER_ROOT)/ace/Log_Record.i \
$(WRAPPER_ROOT)/ace/ACE.i \
+ Concurrency_Strategies.h \
+ $(WRAPPER_ROOT)/ace/Synch.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.h \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
+ $(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
+ $(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Event_Handler.h \
+ Event.h Dispatch_Set.h \
+ $(WRAPPER_ROOT)/ace/Set.h
+.obj/Proxy_Handler.o .shobj/Proxy_Handler.so: Proxy_Handler.cpp Dispatch_Set.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/ACE.h \
+ $(WRAPPER_ROOT)/ace/OS.h \
+ $(WRAPPER_ROOT)/ace/Time_Value.h \
+ $(WRAPPER_ROOT)/ace/config.h \
+ $(WRAPPER_ROOT)/ace/stdcpp.h \
+ $(WRAPPER_ROOT)/ace/Trace.h \
+ $(WRAPPER_ROOT)/ace/Log_Msg.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.h \
+ $(WRAPPER_ROOT)/ace/Log_Priority.h \
+ $(WRAPPER_ROOT)/ace/Log_Record.i \
+ $(WRAPPER_ROOT)/ace/ACE.i \
+ Proxy_Handler_Connector.h \
+ $(WRAPPER_ROOT)/ace/Connector.h \
+ $(WRAPPER_ROOT)/ace/Service_Config.h \
+ $(WRAPPER_ROOT)/ace/Service_Object.h \
+ $(WRAPPER_ROOT)/ace/Shared_Object.h \
$(WRAPPER_ROOT)/ace/Event_Handler.h \
$(WRAPPER_ROOT)/ace/Thread_Manager.h \
$(WRAPPER_ROOT)/ace/Thread.h \
@@ -224,7 +275,6 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
$(WRAPPER_ROOT)/ace/Synch_T.h \
$(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Set.h \
$(WRAPPER_ROOT)/ace/Reactor.h \
$(WRAPPER_ROOT)/ace/Handle_Set.h \
$(WRAPPER_ROOT)/ace/Timer_Queue.h \
@@ -260,37 +310,13 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
$(WRAPPER_ROOT)/ace/Connector.i \
- Thr_IO_Handler.h IO_Handler.h \
+ Thr_Proxy_Handler.h Proxy_Handler.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.i \
- Consumer_Map.h Concurrency_Strategies.h Event.h Consumer_Entry.h
-.obj/Config_Files.o .shobj/Config_Files.so: Config_Files.cpp \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/ACE.i \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- Config_Files.h File_Parser.h
-.obj/File_Parser.o .shobj/File_Parser.so: File_Parser.cpp \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/ACE.i \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- File_Parser.h
-.obj/Gateway.o .shobj/Gateway.so: Gateway.cpp \
+ Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h
+.obj/Proxy_Handler_Connector.o .shobj/Proxy_Handler_Connector.so: Proxy_Handler_Connector.cpp \
+ Proxy_Handler_Connector.h \
+ $(WRAPPER_ROOT)/ace/Connector.h \
$(WRAPPER_ROOT)/ace/Service_Config.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -345,36 +371,19 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Strategies_T.h \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
- Gateway.h
-.obj/Consumer_Entry.o .shobj/Consumer_Entry.so: Consumer_Entry.cpp Consumer_Entry.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- $(WRAPPER_ROOT)/ace/ACE.i
-.obj/Consumer_Map.o .shobj/Consumer_Map.so: Consumer_Map.cpp Consumer_Map.h \
$(WRAPPER_ROOT)/ace/Map_Manager.h \
- $(WRAPPER_ROOT)/ace/ACE.h \
- $(WRAPPER_ROOT)/ace/OS.h \
- $(WRAPPER_ROOT)/ace/Time_Value.h \
- $(WRAPPER_ROOT)/ace/config.h \
- $(WRAPPER_ROOT)/ace/stdcpp.h \
- $(WRAPPER_ROOT)/ace/Trace.h \
- $(WRAPPER_ROOT)/ace/Log_Msg.h \
- $(WRAPPER_ROOT)/ace/Log_Record.h \
- $(WRAPPER_ROOT)/ace/Log_Priority.h \
- $(WRAPPER_ROOT)/ace/Log_Record.i \
- $(WRAPPER_ROOT)/ace/ACE.i \
- Concurrency_Strategies.h Event.h Consumer_Entry.h \
- $(WRAPPER_ROOT)/ace/Set.h
-.obj/Thr_IO_Handler.o .shobj/Thr_IO_Handler.so: Thr_IO_Handler.cpp Thr_IO_Handler.h IO_Handler.h \
+ $(WRAPPER_ROOT)/ace/Svc_Handler.h \
+ $(WRAPPER_ROOT)/ace/Synch_Options.h \
+ $(WRAPPER_ROOT)/ace/Task.h \
+ $(WRAPPER_ROOT)/ace/Task_T.h \
+ $(WRAPPER_ROOT)/ace/Connector.i \
+ Thr_Proxy_Handler.h Proxy_Handler.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Connector.h \
+ $(WRAPPER_ROOT)/ace/SOCK_Connector.i \
+ Event_Forwarding_Discriminator.h Concurrency_Strategies.h Event.h \
+ Dispatch_Set.h
+.obj/Thr_Proxy_Handler.o .shobj/Thr_Proxy_Handler.so: Thr_Proxy_Handler.cpp Thr_Proxy_Handler.h \
+ Proxy_Handler.h \
$(WRAPPER_ROOT)/ace/Service_Config.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
@@ -435,10 +444,10 @@ DEFFLAGS += -DASSIGN_SUPPLIER_ID # -DUSE_OUTPUT_MT -DUSE_INPUT_MT
$(WRAPPER_ROOT)/ace/Synch_Options.h \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
- Consumer_Map.h \
+ Event_Forwarding_Discriminator.h \
$(WRAPPER_ROOT)/ace/Map_Manager.h \
- Concurrency_Strategies.h Event.h Consumer_Entry.h \
- IO_Handler_Connector.h \
+ Concurrency_Strategies.h Event.h Dispatch_Set.h \
+ Proxy_Handler_Connector.h \
$(WRAPPER_ROOT)/ace/Connector.h \
$(WRAPPER_ROOT)/ace/Connector.i
diff --git a/apps/Gateway/Gateway/Proxy_Handler.cpp b/apps/Gateway/Gateway/Proxy_Handler.cpp
new file mode 100644
index 00000000000..86e0fff8e41
--- /dev/null
+++ b/apps/Gateway/Gateway/Proxy_Handler.cpp
@@ -0,0 +1,698 @@
+// $Id$
+
+#include "Dispatch_Set.h"
+#include "Proxy_Handler_Connector.h"
+
+// Convenient short-hands.
+#define CO CONDITION
+#define MU MAP_MUTEX
+
+// The total number of bytes sent/received on this Proxy.
+
+size_t
+Proxy_Handler::total_bytes (void)
+{
+ return this->total_bytes_;
+}
+
+void
+Proxy_Handler::total_bytes (size_t bytes)
+{
+ this->total_bytes_ += bytes;
+}
+
+Proxy_Handler::Proxy_Handler (Event_Forwarding_Discriminator *efd,
+ Proxy_Handler_Connector *ioc,
+ ACE_Thread_Manager *thr_mgr,
+ int socket_queue_size)
+ : ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY> (thr_mgr),
+ efd_ (efd),
+ id_ (-1),
+ total_bytes_ (0),
+ state_ (Proxy_Handler::IDLE),
+ connector_ (ioc),
+ timeout_ (1),
+ max_timeout_ (Proxy_Handler::MAX_RETRY_TIMEOUT),
+ socket_queue_size_ (socket_queue_size)
+{
+}
+
+// Set the direction.
+
+void
+Proxy_Handler::direction (char d)
+{
+ this->direction_ = d;
+}
+
+// Get the direction.
+
+char
+Proxy_Handler::direction (void)
+{
+ return this->direction_;
+}
+
+// Sets the timeout delay.
+
+void
+Proxy_Handler::timeout (int to)
+{
+ if (to > this->max_timeout_)
+ to = this->max_timeout_;
+
+ this->timeout_ = to;
+}
+
+// Recalculate the current retry timeout delay using exponential
+// backoff. Returns the original timeout (i.e., before the
+// recalculation).
+
+int
+Proxy_Handler::timeout (void)
+{
+ int old_timeout = this->timeout_;
+ this->timeout_ *= 2;
+
+ if (this->timeout_ > this->max_timeout_)
+ this->timeout_ = this->max_timeout_;
+
+ return old_timeout;
+}
+
+// Sets the max timeout delay.
+
+void
+Proxy_Handler::max_timeout (int mto)
+{
+ this->max_timeout_ = mto;
+}
+
+// Gets the max timeout delay.
+
+int
+Proxy_Handler::max_timeout (void)
+{
+ return this->max_timeout_;
+}
+
+// Restart connection asynchronously when timeout occurs.
+
+int
+Proxy_Handler::handle_timeout (const ACE_Time_Value &, const void *)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) attempting to reconnect Proxy_Handler %d with timeout = %d\n",
+ this->id (), this->timeout_));
+ return this->connector_->initiate_connection (this, ACE_Synch_Options::asynch);
+}
+
+// Restart connection (blocking_semantics dicates whether we
+// restart synchronously or asynchronously).
+
+int
+Proxy_Handler::reinitiate_connection (void)
+{
+ // Skip over deactivated descriptors.
+ if (this->get_handle () != ACE_INVALID_HANDLE)
+ {
+ // Make sure to close down peer to reclaim descriptor.
+ this->peer ().close ();
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) scheduling reinitiation of Proxy_Handler %d\n",
+ this->id ()));
+
+ // Reschedule ourselves to try and connect again.
+ if (ACE_Service_Config::reactor ()->schedule_timer
+ (this, 0, this->timeout ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "schedule_timer"), -1);
+ }
+ return 0;
+}
+
+// Handle shutdown of the Proxy_Handler object.
+
+int
+Proxy_Handler::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
+{
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) shutting down Proxy_Handler %d on handle %d\n",
+ this->id (), this->get_handle ()));
+
+ return this->reinitiate_connection ();
+}
+
+// Set the state of the Proxy.
+
+void
+Proxy_Handler::state (Proxy_Handler::State s)
+{
+ this->state_ = s;
+}
+
+// Perform the first-time initiation of a connection to the peer.
+
+int
+Proxy_Handler::initialize_connection (void)
+{
+ this->state_ = Proxy_Handler::ESTABLISHED;
+
+ // Restart the timeout to 1.
+ this->timeout (1);
+
+ // Action that sends the connection id to the peerd.
+
+ ACE_INT32 id = htonl (this->id ());
+
+ ssize_t n = this->peer ().send ((const void *) &id, sizeof id);
+
+ if (n != sizeof id)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ n == 0 ? "peer has closed down unexpectedly" : "send"),
+ -1);
+ return 0;
+}
+
+// Set the size of the socket queue.
+
+void
+Proxy_Handler::socket_queue_size (void)
+{
+ if (this->socket_queue_size_ > 0)
+ {
+ int option = this->direction_ == 'S' ? SO_RCVBUF : SO_SNDBUF;
+
+ if (this->peer ().set_option (SOL_SOCKET, option,
+ &this->socket_queue_size_,
+ sizeof (int)) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "set_option"));
+ }
+}
+
+// Upcall from the ACE_Acceptor::handle_input() that
+// delegates control to our application-specific Proxy_Handler.
+
+int
+Proxy_Handler::open (void *a)
+{
+ ACE_DEBUG ((LM_DEBUG, "(%t) Proxy_Handler's fd = %d\n",
+ this->peer ().get_handle ()));
+
+ // Set the size of the socket queue.
+ this->socket_queue_size ();
+
+ // Turn on non-blocking I/O.
+ if (this->peer ().enable (ACE_NONBLOCK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
+
+ // Call down to the base class to activate and register this handler.
+ if (this->ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>::open (a) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "activate"), -1);
+
+ return this->initialize_connection ();
+}
+
+// Return the current state of the Proxy.
+
+Proxy_Handler::State
+Proxy_Handler::state (void)
+{
+ return this->state_;
+}
+
+void
+Proxy_Handler::id (ACE_INT32 id)
+{
+ this->id_ = id;
+}
+
+ACE_INT32
+Proxy_Handler::id (void)
+{
+ return this->id_;
+}
+
+// Set the peer's address information.
+int
+Proxy_Handler::bind (const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32 id)
+{
+ this->remote_addr_ = remote_addr;
+ this->local_addr_ = local_addr;
+ this->id_ = id;
+ return 0;
+}
+
+ACE_INET_Addr &
+Proxy_Handler::remote_addr (void)
+{
+ return this->remote_addr_;
+}
+
+ACE_INET_Addr &
+Proxy_Handler::local_addr (void)
+{
+ return this->local_addr_;
+}
+
+// Constructor sets the consumer map pointer.
+
+Consumer_Proxy::Consumer_Proxy (Event_Forwarding_Discriminator *efd,
+ Proxy_Handler_Connector *ioc,
+ ACE_Thread_Manager *thr_mgr,
+ int socket_queue_size)
+ : Proxy_Handler (efd, ioc, thr_mgr, socket_queue_size)
+{
+ this->direction_ = 'C';
+ this->msg_queue ()->high_water_mark (Consumer_Proxy::MAX_QUEUE_SIZE);
+}
+
+// This method should be called only when the Consumer shuts down
+// unexpectedly. This method simply marks the Proxy_Handler as having
+// failed so that handle_close () can reconnect.
+
+int
+Consumer_Proxy::handle_input (ACE_HANDLE)
+{
+ char buf[1];
+
+ this->state (Proxy_Handler::FAILED);
+
+ switch (this->peer ().recv (buf, sizeof buf))
+ {
+ case -1:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer has failed unexpectedly for Consumer_Proxy %d\n",
+ this->id ()), -1);
+ /* NOTREACHED */
+ case 0:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer has shutdown unexpectedly for Consumer_Proxy %d\n",
+ this->id ()), -1);
+ /* NOTREACHED */
+ default:
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Consumer is erroneously sending input to Consumer_Proxy %d\n",
+ this->id ()), -1);
+ /* NOTREACHED */
+ }
+}
+
+// Perform a non-blocking put() of event. If we are unable to send
+// the entire event the remainder is re-queued at the *front* of the
+// Event_List.
+
+int
+Consumer_Proxy::nonblk_put (ACE_Message_Block *event)
+{
+ // Try to send the event. If we don't send it all (e.g., due to
+ // flow control), then re-queue the remainder at the head of the
+ // Event_List and ask the ACE_Reactor to inform us (via
+ // handle_output()) when it is possible to try again.
+
+ ssize_t n = this->send (event);
+
+ if (n == -1)
+ {
+ // Things have gone wrong, let's try to close down and set up a
+ // new reconnection by calling handle_close().
+ this->state (Proxy_Handler::FAILED);
+ this->handle_close ();
+ return -1;
+ }
+ else if (errno == EWOULDBLOCK) // Didn't manage to send everything.
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) queueing activated on handle %d to routing id %d\n",
+ this->get_handle (), this->id ()));
+
+ // ACE_Queue in *front* of the list to preserve order.
+ if (this->msg_queue ()->enqueue_head
+ (event, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enqueue_head"), -1);
+
+ // Tell ACE_Reactor to call us back when we can send again.
+ else if (ACE_Service_Config::reactor ()->schedule_wakeup
+ (this, ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "schedule_wakeup"), -1);
+ return 0;
+ }
+ else
+ return n;
+}
+
+ssize_t
+Consumer_Proxy::send (ACE_Message_Block *event)
+{
+ ssize_t len = event->length ();
+ ssize_t n = this->peer ().send (event->rd_ptr (), len);
+
+ if (n <= 0)
+ return errno == EWOULDBLOCK ? 0 : n;
+ else if (n < len)
+ // Re-adjust pointer to skip over the part we did send.
+ event->rd_ptr (n);
+ else /* if (n == length) */
+ {
+ // The whole event is sent, we can now safely deallocate the
+ // buffer. Note that this should decrement a reference count...
+ delete event;
+ errno = 0;
+ }
+ this->total_bytes (n);
+ return n;
+}
+
+// Finish sending an event when flow control conditions abate.
+// This method is automatically called by the ACE_Reactor.
+
+int
+Consumer_Proxy::handle_output (ACE_HANDLE)
+{
+ ACE_Message_Block *event = 0;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) in handle_output on handle %d\n",
+ this->get_handle ()));
+ // The list had better not be empty, otherwise there's a bug!
+
+ if (this->msg_queue ()->dequeue_head
+ (event, (ACE_Time_Value *) &ACE_Time_Value::zero) != -1)
+ {
+ switch (this->nonblk_put (event))
+ {
+ case 0: // Partial send.
+ ACE_ASSERT (errno == EWOULDBLOCK);
+ // Didn't write everything this time, come back later...
+ break;
+
+ case -1:
+ // We are responsible for freeing an ACE_Message_Block if
+ // failures occur.
+ delete event;
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "transmission failure"));
+
+ /* FALLTHROUGH */
+ default: // Sent the whole thing.
+
+ // If we succeed in writing the entire event (or we did not
+ // fail due to EWOULDBLOCK) then check if there are more
+ // events on the Message_Queue. If there aren't, tell the
+ // ACE_Reactor not to notify us anymore (at least until
+ // there are new events queued up).
+
+ if (this->msg_queue ()->is_empty ())
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) queueing deactivated on handle %d to routing id %d\n",
+ this->get_handle (), this->id ()));
+
+
+ if (ACE_Service_Config::reactor ()->cancel_wakeup
+ (this, ACE_Event_Handler::WRITE_MASK) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "cancel_wakeup"));
+ }
+ }
+ }
+ else
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "dequeue_head"));
+ return 0;
+}
+
+// Send an event to a Consumer (may queue if necessary).
+
+int
+Consumer_Proxy::put (ACE_Message_Block *event, ACE_Time_Value *)
+{
+ if (this->msg_queue ()->is_empty ())
+ // Try to send the event *without* blocking!
+ return this->nonblk_put (event);
+ else
+ // If we have queued up events due to flow control then just
+ // enqueue and return.
+ return this->msg_queue ()->enqueue_tail
+ (event, (ACE_Time_Value *) &ACE_Time_Value::zero);
+}
+
+// Constructor sets the consumer map pointer and the connector
+// pointer.
+
+Supplier_Proxy::Supplier_Proxy (Event_Forwarding_Discriminator *efd,
+ Proxy_Handler_Connector *ioc,
+ ACE_Thread_Manager *thr_mgr,
+ int socket_queue_size)
+ : msg_frag_ (0),
+ Proxy_Handler (efd, ioc, thr_mgr, socket_queue_size)
+{
+ this->direction_ = 'S';
+ this->msg_queue ()->high_water_mark (0);
+}
+
+// Receive an Event from a Supplier. Handles fragmentation.
+//
+// The event returned from recv consists of two parts:
+//
+// 1. The Address part, contains the "virtual" routing id.
+//
+// 2. The Data part, which contains the actual data to be forwarded.
+//
+// The reason for having two parts is to shield the higher layers
+// of software from knowledge of the event structure.
+
+int
+Supplier_Proxy::recv (ACE_Message_Block *&forward_addr)
+{
+ if (this->msg_frag_ == 0)
+ // No existing fragment...
+ ACE_NEW_RETURN (this->msg_frag_,
+ ACE_Message_Block (sizeof (Event)),
+ -1);
+
+ Event *event = (Event *) this->msg_frag_->rd_ptr ();
+ ssize_t header_received = 0;
+
+ const ssize_t HEADER_SIZE = sizeof (Event_Header);
+ ssize_t header_bytes_left_to_read =
+ HEADER_SIZE - this->msg_frag_->length ();
+
+ if (header_bytes_left_to_read > 0)
+ {
+ header_received = this->peer ().recv
+ (this->msg_frag_->wr_ptr (), header_bytes_left_to_read);
+
+ if (header_received == -1 /* error */
+ || header_received == 0 /* EOF */)
+ {
+ ACE_ERROR ((LM_ERROR, "%p\n",
+ "Recv error during header read "));
+ ACE_DEBUG ((LM_DEBUG,
+ "attempted to read %d\n",
+ header_bytes_left_to_read));
+ delete this->msg_frag_;
+ this->msg_frag_ = 0;
+ return header_received;
+ }
+
+ // Bump the write pointer by the amount read.
+ this->msg_frag_->wr_ptr (header_received);
+
+ // At this point we may or may not have the ENTIRE header.
+ if (this->msg_frag_->length () < HEADER_SIZE)
+ {
+ ACE_DEBUG ((LM_DEBUG,
+ "Partial header received: only %d bytes\n",
+ this->msg_frag_->length ()));
+ // Notify the caller that we didn't get an entire event.
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+ }
+
+ // At this point there is a complete, valid header in msg_frag_
+ ssize_t data_bytes_left_to_read =
+ sizeof (Event) - this->msg_frag_->length ();
+
+ ssize_t data_received =
+ this->peer ().recv (this->msg_frag_->wr_ptr (), data_bytes_left_to_read);
+
+ // Try to receive the remainder of the event.
+
+ switch (data_received)
+ {
+ case -1:
+ if (errno == EWOULDBLOCK)
+ // This might happen if only the header came through.
+ return -1;
+ else
+ /* FALLTHROUGH */;
+
+ case 0: // Premature EOF.
+ delete this->msg_frag_;
+ this->msg_frag_ = 0;
+ return 0;
+
+ default:
+ // Set the write pointer at 1 past the end of the event.
+ this->msg_frag_->wr_ptr (data_received);
+
+ if (data_received != data_bytes_left_to_read)
+ {
+ errno = EWOULDBLOCK;
+ // Inform caller that we didn't get the whole event.
+ return -1;
+ }
+ else
+ {
+ // Set the read pointer to the beginning of the event.
+ this->msg_frag_->rd_ptr (this->msg_frag_->base ());
+
+ // Allocate an event forwarding header and chain the data
+ // portion onto its continuation field.
+ forward_addr = new ACE_Message_Block (sizeof (Event_Addr),
+ ACE_Message_Block::MB_PROTO,
+ this->msg_frag_);
+ if (forward_addr == 0)
+ {
+ delete this->msg_frag_;
+ this->msg_frag_ = 0;
+ errno = ENOMEM;
+ return -1;
+ }
+
+ Event_Addr event_addr (this->id (),
+ event->header_.supplier_id_,
+ event->header_.type_);
+ // Copy the forwarding address from the Event_Addr into
+ // forward_addr.
+ forward_addr->copy ((char *) &event_addr, sizeof (Event_Addr));
+
+ // Reset the pointer to indicate we've got an entire event.
+ this->msg_frag_ = 0;
+ }
+
+ this->total_bytes (data_received + header_received);
+#if defined (VERBOSE)
+ ACE_DEBUG ((LM_DEBUG, "(%t) connection id = %d, supplier id = %d, len = %d, payload = %*s",
+ event_addr.conn_id_, event->header_.supplier_id_, event->header_.len_,
+ event->header_.len_, event->data_));
+#else
+ ACE_DEBUG ((LM_DEBUG, "(%t) supplier id = %d, cur len = %d, total bytes read = %d\n",
+ event->header_.supplier_id_, event->header_.len_, this->total_bytes ()));
+#endif
+ return data_received + header_received;
+ }
+}
+
+// Receive various types of input (e.g., Peer event from the
+// gatewayd, as well as stdio).
+
+int
+Supplier_Proxy::handle_input (ACE_HANDLE)
+{
+ ACE_Message_Block *forward_addr = 0;
+
+ switch (this->recv (forward_addr))
+ {
+ case 0:
+ // Note that a peer should never initiate a shutdown by closing
+ // the connection. Instead, it should reconnect.
+ this->state (Proxy_Handler::FAILED);
+ ACE_ERROR_RETURN ((LM_ERROR,
+ "(%t) Peer has closed down unexpectedly for Input Proxy_Handler %d\n",
+ this->id ()), -1);
+ /* NOTREACHED */
+ case -1:
+ if (errno == EWOULDBLOCK)
+ // A short-read, we'll come back and finish it up later on!
+ return 0;
+ else // A weird problem occurred, shut down and start again.
+ {
+ this->state (Proxy_Handler::FAILED);
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p for Input Proxy_Handler %d\n",
+ "Peer has failed unexpectedly",
+ this->id ()), -1);
+ }
+ /* NOTREACHED */
+ default:
+ return this->forward (forward_addr);
+ }
+}
+
+// Forward an event to its appropriate Consumer(s).
+
+int
+Supplier_Proxy::forward (ACE_Message_Block *forward_addr)
+{
+ // We got a valid event, so determine its virtual forwarding
+ // address, which is stored in the first of the two event blocks,
+ // which are chained together by this->recv().
+
+ Event_Addr *forwarding_addr = (Event_Addr *) forward_addr->rd_ptr ();
+
+ // Skip over the address portion and get the data.
+ const ACE_Message_Block *const data = forward_addr->cont ();
+
+ // <dispatch_set> points to the set of Consumers associated with
+ // this forwarding address.
+ Dispatch_Set *dispatch_set = 0;
+
+ if (this->efd_->find (*forwarding_addr, dispatch_set) != -1)
+ {
+ // Check to see if there are any destinations.
+ if (dispatch_set->size () == 0)
+ ACE_DEBUG ((LM_WARNING,
+ "there are no active destinations for this event currently\n"));
+
+ else // There are destinations, so forward the event.
+ {
+ Dispatch_Set_Iterator dsi (*dispatch_set);
+
+ for (Proxy_Handler **proxy_handler = 0;
+ dsi.next (proxy_handler) != 0;
+ dsi.advance ())
+ {
+ // Only process active proxy_handlers.
+ if ((*proxy_handler)->state () == Proxy_Handler::ESTABLISHED)
+ {
+ // Clone the event portion (should be doing
+ // reference counting here...)
+ ACE_Message_Block *newmsg = data->clone ();
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) sending to peer %d\n",
+ (*proxy_handler)->id ()));
+
+ if ((*proxy_handler)->put (newmsg) == -1)
+ {
+ if (errno == EWOULDBLOCK) // The queue has filled up!
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n",
+ "gateway is flow controlled, so we're dropping events"));
+ else
+ ACE_ERROR ((LM_ERROR, "(%t) %p transmission error to peer %d\n",
+ "put", (*proxy_handler)->id ()));
+
+ // We are responsible for freeing a
+ // ACE_Message_Block if failures occur.
+ delete newmsg;
+ }
+ }
+ }
+ // Will become superfluous once we have reference
+ // counting...
+ delete forward_addr;
+ return 0;
+ }
+ }
+ delete forward_addr;
+ // Failure return.
+ ACE_ERROR ((LM_DEBUG, "(%t) find failed on conn id = %d, logical id = %d, payload = %d\n",
+ forwarding_addr->conn_id_, forwarding_addr->supplier_id_, forwarding_addr->type_));
+ return 0;
+}
+
+#if defined (ACE_TEMPLATES_REQUIRE_SPECIALIZATION)
+template class ACE_Map_Manager<Event_Addr, Dispatch_Set *, MAP_MUTEX>;
+template class ACE_Map_Iterator<Event_Addr, Dispatch_Set *, MAP_MUTEX>;
+template class ACE_Map_Entry<Event_Addr, Dispatch_Set *>;
+#endif /* ACE_TEMPLATES_REQUIRE_SPECIALIZATION */
diff --git a/apps/Gateway/Gateway/Proxy_Handler.h b/apps/Gateway/Gateway/Proxy_Handler.h
new file mode 100644
index 00000000000..d91fa3108ff
--- /dev/null
+++ b/apps/Gateway/Gateway/Proxy_Handler.h
@@ -0,0 +1,215 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// Proxy_Handler.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (_PROXY_HANDLER)
+#define _PROXY_HANDLER
+
+#include "ace/Service_Config.h"
+#include "ace/SOCK_Connector.h"
+#include "ace/Svc_Handler.h"
+#include "Event_Forwarding_Discriminator.h"
+#include "Dispatch_Set.h"
+#include "Event.h"
+
+// Forward declaration.
+class Proxy_Handler_Connector;
+
+class Proxy_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, SYNCH_STRATEGY>
+ // = TITLE
+ // Proxy_Handler contains info about connection state and addressing.
+ //
+ // = DESCRIPTION
+ // The Proxy_Handler classes process events sent to the Event
+ // Channel from Suppliers and forward them to Consumers.
+{
+public:
+ Proxy_Handler (Event_Forwarding_Discriminator *,
+ Proxy_Handler_Connector *,
+ ACE_Thread_Manager * = 0,
+ int socket_queue_size = 0);
+
+ virtual int open (void * = 0);
+ // Initialize and activate a single-threaded Proxy_Handler (called by
+ // ACE_Connector::handle_output()).
+
+ int bind (const ACE_INET_Addr &remote_addr,
+ const ACE_INET_Addr &local_addr,
+ ACE_INT32);
+ // Set the peer's addressing and routing information.
+
+ ACE_INET_Addr &remote_addr (void);
+ // Returns the peer's routing address.
+
+ ACE_INET_Addr &local_addr (void);
+ // Returns our local address.
+
+ // = Set/get routing id.
+ ACE_INT32 id (void);
+ void id (ACE_INT32);
+
+ // = The current state of the Proxy_Handler.
+ enum State
+ {
+ IDLE = 1, // Prior to initialization.
+ CONNECTING, // During connection establishment.
+ ESTABLISHED, // Proxy_Handler is established and active.
+ DISCONNECTING, // Proxy_Handler is in the process of connecting.
+ FAILED // Proxy_Handler has failed.
+ };
+
+ // = Set/get the current state.
+ void state (State);
+ State state (void);
+
+ // = Set/get the current retry timeout delay.
+ void timeout (int);
+ int timeout (void);
+
+ // = Set/get the maximum retry timeout delay.
+ void max_timeout (int);
+ int max_timeout (void);
+
+ // = Set/get direction (i.e., 'S' for Supplier and 'C' for Consumer
+ // (necessary for error checking).
+ void direction (char);
+ char direction (void);
+
+ // = The total number of bytes sent/received on this channel.
+ size_t total_bytes (void);
+ void total_bytes (size_t bytes);
+ // Increment count by <bytes>.
+
+ virtual int handle_timeout (const ACE_Time_Value &, const void *arg);
+ // Perform timer-based Proxy_Handler reconnection.
+
+protected:
+ enum
+ {
+ MAX_RETRY_TIMEOUT = 300 // 5 minutes is the maximum timeout.
+ };
+
+ int initialize_connection (void);
+ // Perform the first-time initiation of a connection to the peer.
+
+ int reinitiate_connection (void);
+ // Reinitiate a connection asynchronously when peers fail.
+
+ void socket_queue_size (void);
+ // Set the socket queue size.
+
+ virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
+ ACE_Reactor_Mask = ACE_Event_Handler::RWE_MASK);
+ // Perform Proxy_Handler termination.
+
+ Event_Forwarding_Discriminator *efd_;
+ // Maps Events to a set of Consumers.
+
+ ACE_INET_Addr remote_addr_;
+ // Address of peer.
+
+ ACE_INET_Addr local_addr_;
+ // Address of us.
+
+ ACE_INT32 id_;
+ // The assigned routing ID of this entry.
+
+ size_t total_bytes_;
+ // The total number of bytes sent/received on this channel.
+
+ State state_;
+ // The current state of the channel.
+
+ Proxy_Handler_Connector *connector_;
+ // Back pointer to Proxy_Handler_Connector to reestablish broken
+ // connections.
+
+ int timeout_;
+ // Amount of time to wait between reconnection attempts.
+
+ int max_timeout_;
+ // Maximum amount of time to wait between reconnection attempts.
+
+ char direction_;
+ // Indicates which direction data flows through the channel ('S' ==
+ // Supplier and 'C' == Consumer).
+
+ int socket_queue_size_;
+ // Size of the socket queue (0 means "use default").
+};
+
+class Supplier_Proxy : public Proxy_Handler
+ // = TITLE
+ // Handles reception of Events from Suppliers
+ //
+ // = DESCRIPTION
+ // Performs framing and error checking.
+{
+public:
+ Supplier_Proxy (Event_Forwarding_Discriminator *,
+ Proxy_Handler_Connector *,
+ ACE_Thread_Manager * = 0,
+ int socket_queue_size = 0);
+ // Constructor sets the consumer map pointer.
+
+protected:
+ virtual int handle_input (ACE_HANDLE = ACE_INVALID_HANDLE);
+ // Receive and process peer events.
+
+ virtual int recv (ACE_Message_Block *&);
+ // Receive an event from a Supplier.
+
+ int forward (ACE_Message_Block *event);
+ // Forward the Event to a Consumer.
+
+ ACE_Message_Block *msg_frag_;
+ // Keep track of event fragment to handle non-blocking recv's from
+ // Suppliers.
+};
+
+class Consumer_Proxy : public Proxy_Handler
+ // = TITLE
+ // Handles transmission of events to Consumers.
+ //
+ // = DESCRIPTION
+ // Uses a single-threaded approach.
+{
+public:
+ Consumer_Proxy (Event_Forwarding_Discriminator *,
+ Proxy_Handler_Connector *,
+ ACE_Thread_Manager * = 0,
+ int socket_queue_size = 0);
+
+ virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
+ // Send an event to a Consumer (may be queued if necessary).
+
+protected:
+ // = We'll allow up to 16 megabytes to be queued per-output channel.
+ enum {MAX_QUEUE_SIZE = 1024 * 1024 * 16};
+
+ virtual int handle_output (ACE_HANDLE);
+ // Finish sending event when flow control conditions abate.
+
+ int nonblk_put (ACE_Message_Block *mb);
+ // Perform a non-blocking put().
+
+ virtual ssize_t send (ACE_Message_Block *);
+ // Send an event to a Consumer.
+
+ virtual int handle_input (ACE_HANDLE);
+ // Receive and process shutdowns from a Consumer.
+};
+
+#endif /* _PROXY_HANDLER */
diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp
new file mode 100644
index 00000000000..7ac0a77a2d4
--- /dev/null
+++ b/apps/Gateway/Gateway/Proxy_Handler_Connector.cpp
@@ -0,0 +1,92 @@
+#include "Proxy_Handler_Connector.h"
+// $Id$
+
+
+Proxy_Handler_Connector::Proxy_Handler_Connector (void)
+{
+}
+
+// Override the connection-failure method to add timer support.
+// Note that these timers perform "expoential backoff" to
+// avoid rapidly trying to reestablish connections when a link
+// goes down.
+
+int
+Proxy_Handler_Connector::handle_close (ACE_HANDLE sd, ACE_Reactor_Mask)
+{
+ ACE_Connector<Proxy_Handler, ACE_SOCK_CONNECTOR>::AST *stp = 0;
+
+ // Locate the ACE_Svc_Handler corresponding to the socket descriptor.
+ if (this->handler_map_.find (sd, stp) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) can't locate channel %d in map, %p\n",
+ sd, "find"), -1);
+
+ Proxy_Handler *channel = stp->svc_handler ();
+
+ // Schedule a reconnection request at some point in the future
+ // (note that channel uses an exponential backoff scheme).
+ if (ACE_Service_Config::reactor ()->schedule_timer (channel, 0,
+ channel->timeout ()) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "schedule_timer"), -1);
+ return 0;
+}
+
+// Initiate (or reinitiate) a connection to the Proxy_Handler.
+
+int
+Proxy_Handler_Connector::initiate_connection (Proxy_Handler *channel,
+ ACE_Synch_Options &synch_options)
+{
+ char buf[MAXHOSTNAMELEN];
+
+ // Mark ourselves as idle so that the various iterators
+ // will ignore us until we are reconnected.
+ channel->state (Proxy_Handler::IDLE);
+
+ if (channel->remote_addr ().addr_to_string (buf, sizeof buf) == -1
+ || channel->local_addr ().addr_to_string (buf, sizeof buf) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "can't obtain peer's address"), -1);
+
+ // Try to connect to the Peer.
+
+ if (this->connect (channel, channel->remote_addr (),
+ synch_options, channel->local_addr ()) == -1)
+ {
+ if (errno != EWOULDBLOCK)
+ {
+ channel->state (Proxy_Handler::FAILED);
+ ACE_DEBUG ((LM_DEBUG, "(%t) %p on address %s\n",
+ "connect", buf));
+
+ // Reschedule ourselves to try and connect again.
+ if (synch_options[ACE_Synch_Options::USE_REACTOR])
+ {
+ if (ACE_Service_Config::reactor ()->schedule_timer
+ (channel, 0, channel->timeout ()) == 0)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "schedule_timer"), -1);
+ }
+ else
+ // Failures on synchronous connects are reported as errors
+ // so that the caller can decide how to proceed.
+ return -1;
+ }
+ else
+ {
+ channel->state (Proxy_Handler::CONNECTING);
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) in the process of connecting %s to %s\n",
+ synch_options[ACE_Synch_Options::USE_REACTOR]
+ ? "asynchronously" : "synchronously", buf));
+ }
+ }
+ else
+ {
+ channel->state (Proxy_Handler::ESTABLISHED);
+ ACE_DEBUG ((LM_DEBUG, "(%t) connected to %s on %d\n",
+ buf, channel->get_handle ()));
+ }
+ return 0;
+}
diff --git a/apps/Gateway/Gateway/Proxy_Handler_Connector.h b/apps/Gateway/Gateway/Proxy_Handler_Connector.h
new file mode 100644
index 00000000000..3baea75934a
--- /dev/null
+++ b/apps/Gateway/Gateway/Proxy_Handler_Connector.h
@@ -0,0 +1,40 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// Proxy_Handler_Connector.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (_IO_HANDLER_CONNECTOR)
+#define _IO_HANDLER_CONNECTOR
+
+#include "ace/Connector.h"
+#include "Thr_Proxy_Handler.h"
+
+class Proxy_Handler_Connector : public ACE_Connector<Proxy_Handler, ACE_SOCK_CONNECTOR>
+ // = TITLE
+ // A concrete factory class that setups connections to peerds
+ // and produces a new Proxy_Handler object to do the dirty work...
+{
+public:
+ Proxy_Handler_Connector (void);
+
+ // Initiate (or reinitiate) a connection on the Proxy_Handler.
+ int initiate_connection (Proxy_Handler *,
+ ACE_Synch_Options & = ACE_Synch_Options::synch);
+
+protected:
+ // Override the connection-failure method to add timer support.
+ virtual int handle_close (ACE_HANDLE sd, ACE_Reactor_Mask);
+};
+
+#endif /* _IO_HANDLER_CONNECTOR */
diff --git a/apps/Gateway/Gateway/README b/apps/Gateway/Gateway/README
index 4e986354aaa..e64ad26b568 100644
--- a/apps/Gateway/Gateway/README
+++ b/apps/Gateway/Gateway/README
@@ -2,15 +2,15 @@ This application illustrates an application-level Gateway which routes
messages between Consumer and Suppliers in a distributed environment.
The default configuration is single-threaded, i.e., all
-Supplier_Handlers and Consumer_Handlers are multiplexed via the ACE
+Supplier_Proxys and Consumer_Proxys are multiplexed via the ACE
Reactor within a single thread of control. To obtain a version that
-multi-threads both Consumer_Handlers and Supplier_Handlers simply set
+multi-threads both Consumer_Proxys and Supplier_Proxys simply set
the following flag in the Makefile:
DEFFLAGS += -DUSE_OUTPUT_MT -DUSE_INPUT_MT
-To get a version that uses single-threading for all Supplier_Handlers,
-but a separate thread per-Consumer_Handler set the following flag in
+To get a version that uses single-threading for all Supplier_Proxys,
+but a separate thread per-Consumer_Proxy set the following flag in
the Makefile:
DEFFLAGS += -DUSE_OUTPUT_MT
diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp
new file mode 100644
index 00000000000..98722a96295
--- /dev/null
+++ b/apps/Gateway/Gateway/Thr_Proxy_Handler.cpp
@@ -0,0 +1,204 @@
+#include "Thr_Proxy_Handler.h"
+// $Id$
+
+#include "Proxy_Handler_Connector.h"
+
+#if defined (ACE_HAS_THREADS)
+Thr_Consumer_Proxy::Thr_Consumer_Proxy (Event_Forwarding_Discriminator *efd,
+ Proxy_Handler_Connector *ioc,
+ ACE_Thread_Manager *thr_mgr,
+ int socket_queue_size)
+ : Consumer_Proxy (efd, ioc, thr_mgr, socket_queue_size)
+{
+}
+
+// This method should be called only when the peer shuts down
+// unexpectedly. This method marks the Proxy_Handler as having failed and
+// deactivates the ACE_Message_Queue (to wake up the thread blocked on
+// <dequeue_head> in svc()). Thr_Output_Handler::handle_close () will
+// eventually try to reconnect...
+
+int
+Thr_Consumer_Proxy::handle_input (ACE_HANDLE h)
+{
+ this->Consumer_Proxy::handle_input (h);
+ ACE_Service_Config::reactor ()->remove_handler (h,
+ ACE_Event_Handler::RWE_MASK
+ | ACE_Event_Handler::DONT_CALL);
+ // Deactivate the queue while we try to get reconnected.
+ this->msg_queue ()->deactivate ();
+ return 0;
+}
+
+// Initialize the threaded Consumer_Proxy object and spawn a new
+// thread.
+
+int
+Thr_Consumer_Proxy::open (void *)
+{
+ // Set the size of the socket queue.
+ this->socket_queue_size ();
+
+ // Turn off non-blocking I/O.
+ if (this->peer ().disable (ACE_NONBLOCK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
+
+ // Register ourselves to receive input events (which indicate that
+ // the Peer has shut down unexpectedly).
+ if (ACE_Service_Config::reactor ()->register_handler (this,
+ ACE_Event_Handler::READ_MASK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "register_handler"), -1);
+
+ if (this->initialize_connection ())
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "initialize_connection"), -1);
+
+ // Reactivate message queue. If it was active then this is the
+ // first time in and we need to spawn a thread, otherwise the queue
+ // was inactive due to some problem and we've already got a thread.
+ if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
+ // Become an active object by spawning a new thread to transmit
+ // messages to peers.
+ return this->activate (THR_NEW_LWP | THR_DETACHED);
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
+ return 0;
+ }
+}
+
+// ACE_Queue up a message for transmission (must not block since all
+// Supplier_Proxys are single-threaded).
+
+int
+Thr_Consumer_Proxy::put (ACE_Message_Block *mb, ACE_Time_Value *)
+{
+ // Perform non-blocking enqueue.
+ return this->msg_queue ()->enqueue_tail (mb, (ACE_Time_Value *) &ACE_Time_Value::zero);
+}
+
+// Transmit messages to the peer (note simplification resulting from
+// threads...)
+
+int
+Thr_Consumer_Proxy::svc (void)
+{
+ for (;;)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Consumer_Proxy's fd = %d\n",
+ this->peer ().get_handle ()));
+
+ // Since this method runs in its own thread it is OK to block on
+ // output.
+
+ for (ACE_Message_Block *mb = 0;
+ this->msg_queue ()->dequeue_head (mb) != -1; )
+ if (this->send (mb) == -1)
+ ACE_ERROR ((LM_ERROR, "(%t) %p\n", "send failed"));
+
+ ACE_ASSERT (errno == ESHUTDOWN);
+
+ ACE_DEBUG ((LM_DEBUG, "(%t) shutting down threaded Consumer_Proxy %d on handle %d\n",
+ this->id (), this->get_handle ()));
+
+ this->peer ().close ();
+
+ for (this->timeout (1);
+ // Default is to reconnect synchronously.
+ this->connector_->initiate_connection (this) == -1; )
+ {
+ ACE_Time_Value tv (this->timeout ());
+ ACE_ERROR ((LM_ERROR,
+ "(%t) reattempting connection, sec = %d\n",
+ tv.sec ()));
+ ACE_OS::sleep (tv);
+ }
+ }
+
+ return 0;
+}
+
+Thr_Supplier_Proxy::Thr_Supplier_Proxy (Event_Forwarding_Discriminator *efd,
+ Proxy_Handler_Connector *ioc,
+ ACE_Thread_Manager *thr_mgr,
+ int socket_queue_size)
+ : Supplier_Proxy (efd, ioc, thr_mgr, socket_queue_size)
+{
+}
+
+int
+Thr_Supplier_Proxy::open (void *)
+{
+ // Set the size of the socket queue.
+ this->socket_queue_size ();
+
+ // Turn off non-blocking I/O.
+ if (this->peer ().disable (ACE_NONBLOCK) == -1)
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n", "enable"), -1);
+
+ if (this->initialize_connection ())
+ ACE_ERROR_RETURN ((LM_ERROR, "(%t) %p\n",
+ "initialize_connection"), -1);
+
+ // Reactivate message queue. If it was active then this is the
+ // first time in and we need to spawn a thread, otherwise the queue
+ // was inactive due to some problem and we've already got a thread.
+ if (this->msg_queue ()->activate () == ACE_Message_Queue<SYNCH_STRATEGY>::WAS_ACTIVE)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) spawning new thread\n"));
+ // Become an active object by spawning a new thread to transmit
+ // messages to peers.
+ return this->activate (THR_NEW_LWP | THR_DETACHED);
+ }
+ else
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) reusing existing thread\n"));
+ return 0;
+ }
+}
+
+// Receive messages from a Peer in a separate thread (note reuse of
+// existing code!).
+
+int
+Thr_Supplier_Proxy::svc (void)
+{
+ for (;;)
+ {
+ ACE_DEBUG ((LM_DEBUG, "(%t) connected! Thr_Supplier_Proxy's fd = %d\n",
+ this->peer ().get_handle ()));
+
+ // Since this method runs in its own thread and processes
+ // messages for one connection it is OK to block on input and
+ // output.
+
+ while (this->handle_input () != -1)
+ continue;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) shutting down threaded Supplier_Proxy %d on handle %d\n",
+ this->id (),
+ this->get_handle ()));
+
+ this->peer ().close ();
+
+ // Deactivate the queue while we try to get reconnected.
+ this->msg_queue ()->deactivate ();
+
+ for (this->timeout (1);
+ // Default is to reconnect synchronously.
+ this->connector_->initiate_connection (this) == -1; )
+ {
+ ACE_Time_Value tv (this->timeout ());
+ ACE_ERROR ((LM_ERROR,
+ "(%t) reattempting connection, sec = %d\n", tv.sec ()));
+ ACE_OS::sleep (tv);
+ }
+ }
+ return 0;
+}
+
+#endif /* ACE_HAS_THREADS */
diff --git a/apps/Gateway/Gateway/Thr_Proxy_Handler.h b/apps/Gateway/Gateway/Thr_Proxy_Handler.h
new file mode 100644
index 00000000000..8ecced3805c
--- /dev/null
+++ b/apps/Gateway/Gateway/Thr_Proxy_Handler.h
@@ -0,0 +1,64 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// Thr_Proxy_Handler.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (_THR_IO_HANDLER)
+#define _THR_IO_HANDLER
+
+#include "Proxy_Handler.h"
+
+#if defined (ACE_HAS_THREADS)
+class Thr_Consumer_Proxy : public Consumer_Proxy
+ // = TITLE
+ // Runs each Output Proxy_Handler in a separate thread.
+{
+public:
+ Thr_Consumer_Proxy (Event_Forwarding_Discriminator *,
+ Proxy_Handler_Connector *,
+ ACE_Thread_Manager *,
+ int socket_queue_size);
+
+ virtual int open (void *);
+ // Initialize the threaded Consumer_Proxy object and spawn a new
+ // thread.
+
+ virtual int handle_input (ACE_HANDLE);
+ // Called when Peer shutdown unexpectedly.
+
+ virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);
+ // Send a message to a peer.
+
+ virtual int svc (void);
+ // Transmit peer messages.
+};
+
+class Thr_Supplier_Proxy : public Supplier_Proxy
+ // = TITLE
+ // Runs each Input Proxy_Handler in a separate thread.
+{
+public:
+ Thr_Supplier_Proxy (Event_Forwarding_Discriminator *,
+ Proxy_Handler_Connector *,
+ ACE_Thread_Manager *,
+ int socket_queue_size);
+
+ virtual int open (void *);
+ // Initialize the object and spawn a new thread.
+
+ virtual int svc (void);
+ // Transmit peer messages.
+};
+#endif /* ACE_HAS_THREADS */
+#endif /* _THR_IO_HANDLER */
diff --git a/apps/Gateway/Gateway/consumer_config b/apps/Gateway/Gateway/consumer_config
index d33469ee157..58884340e61 100644
--- a/apps/Gateway/Gateway/consumer_config
+++ b/apps/Gateway/Gateway/consumer_config
@@ -1,8 +1,8 @@
-# Consumer map configuration file
-# Conn ID Logical ID Payload Destinations
-# ------- ---------- ------- ------------
-# 1 1 0 3,4,5
- 1 1 0 3
- 3 1 0 3
-# 4 1 0 4
-# 5 1 0 5
+# Consumer configuration file
+# Conn ID Supplier ID Type Consumers
+# ------- ----------- ------- ------------
+# 1 1 0 3,4,5
+ 1 1 0 3
+ 3 1 0 3
+# 4 1 0 4
+# 5 1 0 5
diff --git a/apps/Gateway/Peer/Event.h b/apps/Gateway/Peer/Event.h
new file mode 100644
index 00000000000..24881c3e85b
--- /dev/null
+++ b/apps/Gateway/Peer/Event.h
@@ -0,0 +1,101 @@
+/* -*- C++ -*- */
+// $Id$
+
+// ============================================================================
+//
+// = LIBRARY
+// apps
+//
+// = FILENAME
+// Event.h
+//
+// = AUTHOR
+// Doug Schmidt
+//
+// ============================================================================
+
+#if !defined (EVENT)
+#define EVENT
+
+#include "ace/OS.h"
+
+// This is the unique connection identifier that denotes a particular
+// Proxy_Handler in the Gateway.
+typedef ACE_INT32 ACE_INT32;
+
+class Event_Addr
+ // = TITLE
+ // Address used to identify the source/destination of an event.
+ //
+ // = DESCRIPTION
+ // This is really a "virtual forwarding address" thatis used to
+ // decouple the filtering and forwarding logic of the Event
+ // Channel from the format of the data.
+{
+public:
+ Event_Addr (ACE_INT32 cid = -1,
+ u_char sid = 0,
+ u_char type = 0)
+ : conn_id_ (cid),
+ supplier_id_ (sid),
+ type_ (type) {}
+
+ int operator== (const Event_Addr &event_addr) const
+ {
+ return this->conn_id_ == event_addr.conn_id_
+ && this->supplier_id_ == event_addr.supplier_id_
+ && this->type_ == event_addr.type_;
+ }
+
+ ACE_INT32 conn_id_;
+ // Unique connection identifier that denotes a particular
+ // Proxy_Handler.
+
+ ACE_INT32 supplier_id_;
+ // Logical ID.
+
+ ACE_INT32 type_;
+ // Event type.
+};
+
+
+class Event_Header
+ // = TITLE
+ // Fixed sized header.
+{
+public:
+ typedef ACE_INT32 SUPPLIER_ID;
+ // Type used to forward events from gatewayd.
+
+ enum
+ {
+ INVALID_ID = -1 // No peer can validly use this number.
+ };
+
+ SUPPLIER_ID supplier_id_;
+ // Source ID.
+
+ ACE_INT32 type_;
+ // Event type.
+
+ size_t len_;
+ // Length of the entire event (including data payload) in bytes.
+};
+
+class Event
+ // = TITLE
+ // Variable-sized event (data_ may be variable-sized between
+ // 0 and MAX_PAYLOAD_SIZE).
+{
+public:
+ enum { MAX_PAYLOAD_SIZE = 1024 };
+ // The maximum size of an Event.
+
+ Event_Header header_;
+ // Event header.
+
+ char data_[MAX_PAYLOAD_SIZE];
+ // Event data.
+};
+
+#endif /* EVENT */
diff --git a/apps/Gateway/Peer/Makefile b/apps/Gateway/Peer/Makefile
index 9909eb2ef2a..3176a62b455 100644
--- a/apps/Gateway/Peer/Makefile
+++ b/apps/Gateway/Peer/Makefile
@@ -1,7 +1,7 @@
#----------------------------------------------------------------------------
# @(#)Makefile 1.1 10/18/96
#
-# Makefile for the peer portion of the communication gateway
+# Makefile for the Peer portion of the Gateway application
#----------------------------------------------------------------------------
#----------------------------------------------------------------------------
@@ -10,7 +10,7 @@
BIN = peerd
-FILES = Gateway_Handler
+FILES = Peer
LSRC = $(addsuffix .cpp,$(FILES))
LOBJ = $(addsuffix .o,$(FILES))
@@ -46,7 +46,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
# DO NOT DELETE THIS LINE -- g++dep uses it.
# DO NOT PUT ANYTHING AFTER THIS LINE, IT WILL GO AWAY.
-.obj/Gateway_Handler.o .shobj/Gateway_Handler.so: Gateway_Handler.cpp \
+.obj/Peer.o .shobj/Peer.so: Peer.cpp \
$(WRAPPER_ROOT)/ace/Get_Opt.h \
$(WRAPPER_ROOT)/ace/ACE.h \
$(WRAPPER_ROOT)/ace/OS.h \
@@ -59,7 +59,7 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Log_Priority.h \
$(WRAPPER_ROOT)/ace/Log_Record.i \
$(WRAPPER_ROOT)/ace/ACE.i \
- Gateway_Handler.h \
+ Peer.h \
$(WRAPPER_ROOT)/ace/Service_Config.h \
$(WRAPPER_ROOT)/ace/Service_Object.h \
$(WRAPPER_ROOT)/ace/Shared_Object.h \
diff --git a/apps/Gateway/Peer/peerd.cpp b/apps/Gateway/Peer/peerd.cpp
index 7836e9892f8..3b7bdb0cb2d 100644
--- a/apps/Gateway/Peer/peerd.cpp
+++ b/apps/Gateway/Peer/peerd.cpp
@@ -1,11 +1,10 @@
-/* Driver for the peer daemon (peerd). Note that this
// $Id$
- is completely generic code due to the Service Configurator
- framework! */
+// Driver for the peer daemon (peerd). Note that this is completely
+// generic code due to the Service Configurator framework!
#include "ace/Service_Config.h"
-#include "Gateway_Handler.h"
+#include "Peer.h"
int
main (int argc, char *argv[])
@@ -20,7 +19,7 @@ main (int argc, char *argv[])
{
static char *l_argv[3] = { "-d", "-p", "10002" };
- ACE_Service_Object *so = _alloc_peerd ();
+ ACE_Service_Object *so = _make_Peer_Acceptor ();
if (so->init (3, l_argv) == -1)
@@ -28,7 +27,19 @@ main (int argc, char *argv[])
}
}
- /* Run forever, performing the configured services (until SIGINT/SIGQUIT occurs) */
+ // Create an adapter to end the event loop.
+ ACE_Sig_Adapter sa ((ACE_Sig_Handler_Ex) ACE_Service_Config::end_reactor_event_loop);
+
+ ACE_Sig_Set sig_set;
+ sig_set.sig_add (SIGINT);
+ sig_set.sig_add (SIGQUIT);
+
+ // Register ourselves to receive SIGINT and SIGQUIT so we can shut
+ // down gracefully via signals.
+ ACE_Service_Config::reactor ()->register_handler (sig_set, &sa);
+
+ // Run forever, performing the configured services until we are shut
+ // down by a SIGINT/SIGQUIT signal.
daemon.run_reactor_event_loop ();
diff --git a/apps/Gateway/Peer/svc.conf b/apps/Gateway/Peer/svc.conf
index 3bd0cd55891..6c9bc3bc3d7 100644
--- a/apps/Gateway/Peer/svc.conf
+++ b/apps/Gateway/Peer/svc.conf
@@ -1,3 +1,3 @@
#static Svc_Manager "-d -p 291"
-dynamic Peer1 Service_Object *.shobj/Gateway_Handler:_alloc_peerd() active "-p 10004"
-#dynamic Peer2 Service_Object *.shobj/Gateway_Handler:_alloc_peerd() active "-p 10003"
+dynamic Peer1 Service_Object *.shobj/Peer:_make_Peer_Acceptor() active "-p 10004"
+#dynamic Peer2 Service_Object *.shobj/Peer:_make_Peer_Acceptor() active "-p 10003"
diff --git a/examples/ASX/CCM_App/svc.conf b/examples/ASX/CCM_App/svc.conf
index b083976607e..f77d9b14d9d 100644
--- a/examples/ASX/CCM_App/svc.conf
+++ b/examples/ASX/CCM_App/svc.conf
@@ -1,4 +1,4 @@
-static ACE_Service_Manager "-d -p 3911"
+static ACE_Service_Manager "-d -p 4911"
dynamic My_Task Service_Object *.shobj/CCM_App:make_task() "-p 3000"
diff --git a/examples/Logger/Acceptor-server/server_loggerd.cpp b/examples/Logger/Acceptor-server/server_loggerd.cpp
index 21776447b73..20522d9168c 100644
--- a/examples/Logger/Acceptor-server/server_loggerd.cpp
+++ b/examples/Logger/Acceptor-server/server_loggerd.cpp
@@ -1,6 +1,6 @@
-// This server daemon collects, formats, and displays logging
// $Id$
+// This server daemon collects, formats, and displays logging
// information forwarded from client daemons running on other hosts in
// the network. In addition, this example illustrates how to use the
// ACE_Reactor, ACE_Acceptor, ACE_Singleton, and ACE_Test_and_Set
diff --git a/netsvcs/lib/Client_Logging_Handler.cpp b/netsvcs/lib/Client_Logging_Handler.cpp
index 719e56abd00..20212064353 100644
--- a/netsvcs/lib/Client_Logging_Handler.cpp
+++ b/netsvcs/lib/Client_Logging_Handler.cpp
@@ -1,10 +1,10 @@
-// Client_Logging_Handler.cpp
// $Id$
+// Client_Logging_Handler.cpp
+
#define ACE_BUILD_SVC_DLL
#include "ace/Service_Config.h"
#include "ace/Connector.h"
-
#include "ace/Get_Opt.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Stream.h"
@@ -16,9 +16,6 @@ class ACE_Svc_Export ACE_Client_Logging_Handler : public ACE_Svc_Handler<ACE_SOC
// This client logging daemon is a mediator that receives logging
// records from local applications processes and forwards them to
// the server logging daemon running on another host.
- //
- // = DESCRIPTION
- //
{
public:
// = Initialization and termination.
@@ -34,11 +31,11 @@ public:
// Return the handle of the message_fifo_;
virtual int close (u_long);
- // Called when object is removed from the ACE_Reactor
+ // Called when object is removed from the ACE_Reactor.
protected:
virtual int handle_signal (int signum, siginfo_t *, ucontext_t *);
- // Handle SIGINT.
+ // Handle SIGPIPE.
virtual int handle_input (ACE_HANDLE);
// Receive logging records from applications.
@@ -69,10 +66,13 @@ ACE_Client_Logging_Handler::ACE_Client_Logging_Handler (const char rendezvous[])
{
if (ACE_OS::unlink (rendezvous) == -1 && errno == EACCES)
ACE_ERROR ((LM_ERROR, "%p\n", "unlink"));
+
else if (this->message_fifo_.open (rendezvous) == -1)
ACE_ERROR ((LM_ERROR, "%p\n", "open"));
- // Register message FIFO to receive input from clients. Note that we need to
- // put the EXCEPT_MASK here to deal with SVR4 MSG_BAND data correctly...
+
+ // Register message FIFO to receive input from clients. Note that
+ // we need to put the EXCEPT_MASK here to deal with SVR4 MSG_BAND
+ // data correctly...
else if (ACE_Service_Config::reactor ()->register_handler
(this->message_fifo_.get_handle (), this,
ACE_Event_Handler::READ_MASK | ACE_Event_Handler::EXCEPT_MASK) == -1)
@@ -90,7 +90,6 @@ int
ACE_Client_Logging_Handler::handle_signal (int, siginfo_t *, ucontext_t *)
{
ACE_TRACE ("ACE_Client_Logging_Connector::handle_signal");
-// return 0;
return -1;
}
@@ -126,7 +125,6 @@ ACE_Client_Logging_Handler::get_handle (void) const
return this->message_fifo_.get_handle ();
}
-
// Receive a logging record from an application.
int
@@ -170,7 +168,15 @@ int
ACE_Client_Logging_Handler::close (u_long)
{
ACE_DEBUG ((LM_DEBUG, "shutting down!!!\n"));
+
+ if (ACE_Service_Config::reactor ()->remove_handler
+ (this->message_fifo_.get_handle (),
+ ACE_Event_Handler::READ_MASK | ACE_Event_Handler::EXCEPT_MASK | ACE_Event_Handler::DONT_CALL) == -1)
+ ACE_ERROR ((LM_ERROR, "%n: %p\n",
+ "remove_handler (message_fifo)"));
+
this->message_fifo_.close ();
+ this->destroy ();
return 0;
}
@@ -207,7 +213,6 @@ ACE_Client_Logging_Handler::send (ACE_Log_Record &log_record)
return 0;
}
-
class ACE_Client_Logging_Connector : public ACE_Connector<ACE_Client_Logging_Handler, ACE_SOCK_CONNECTOR>
// = TITLE
// This class contains the service-specific methods that can't
@@ -228,9 +233,6 @@ protected:
virtual int suspend (void);
virtual int resume (void);
- virtual int handle_signal (int signum, siginfo_t *, ucontext_t *);
- // Handle SIGINT.
-
private:
int parse_args (int argc, char *argv[]);
// Parse svc.conf arguments.
@@ -256,7 +258,7 @@ private:
int
ACE_Client_Logging_Connector::fini (void)
{
- this->handler_->destroy ();
+ this->handler_->close (0);
return 0;
}
@@ -285,12 +287,6 @@ ACE_Client_Logging_Connector::init (int argc, char *argv[])
// options.
this->parse_args (argc, argv);
- // Register ourselves to receive SIGINT so we can shutdown
- // gracefully.
- if (ACE_Service_Config::reactor ()->register_handler (SIGINT, this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%n: %p\n",
- "register_handler (SIGINT)"), -1);
-
ACE_NEW_RETURN (this->handler_,
ACE_Client_Logging_Handler (this->rendezvous_key_),
-1);
@@ -352,16 +348,6 @@ ACE_Client_Logging_Connector::resume (void)
return 0;
}
-// Signal the server to shutdown gracefully.
-
-int
-ACE_Client_Logging_Connector::handle_signal (int, siginfo_t *, ucontext_t *)
-{
- ACE_TRACE ("ACE_Client_Logging_Connector::handle_signal");
- ACE_Service_Config::end_reactor_event_loop ();
- return 0;
-}
-
// The following is a "Factory" used by the ACE_Service_Config and
// svc.conf file to dynamically initialize the state of the
// single-threaded logging server.
diff --git a/netsvcs/lib/Makefile b/netsvcs/lib/Makefile
index 8e97448a749..7f2caec6a8c 100644
--- a/netsvcs/lib/Makefile
+++ b/netsvcs/lib/Makefile
@@ -67,19 +67,11 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
$(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
$(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
$(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Reactor.h \
$(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Pipe.h \
$(WRAPPER_ROOT)/ace/Pipe.i \
$(WRAPPER_ROOT)/ace/SOCK_Stream.h \
@@ -93,14 +85,22 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/INET_Addr.h \
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
$(WRAPPER_ROOT)/ace/Svc_Handler.h \
$(WRAPPER_ROOT)/ace/Synch_Options.h \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Acceptor.i \
$(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \
$(WRAPPER_ROOT)/ace/Time_Request_Reply.h \
@@ -129,20 +129,12 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
$(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
$(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
$(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
$(WRAPPER_ROOT)/ace/Reactor.h \
$(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Pipe.h \
$(WRAPPER_ROOT)/ace/Pipe.i \
$(WRAPPER_ROOT)/ace/SOCK_Stream.h \
@@ -156,6 +148,17 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/INET_Addr.h \
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
$(WRAPPER_ROOT)/ace/Connector.h \
$(WRAPPER_ROOT)/ace/Map_Manager.h \
@@ -163,9 +166,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Synch_Options.h \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Connector.i \
$(WRAPPER_ROOT)/ace/Get_Opt.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.h \
@@ -197,20 +197,12 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
$(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
$(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
$(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
$(WRAPPER_ROOT)/ace/Reactor.h \
$(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Pipe.h \
$(WRAPPER_ROOT)/ace/Pipe.i \
$(WRAPPER_ROOT)/ace/SOCK_Stream.h \
@@ -224,6 +216,17 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/INET_Addr.h \
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
$(WRAPPER_ROOT)/ace/Connector.h \
$(WRAPPER_ROOT)/ace/Map_Manager.h \
@@ -231,9 +234,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Synch_Options.h \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Connector.i \
$(WRAPPER_ROOT)/ace/Get_Opt.h \
$(WRAPPER_ROOT)/ace/SOCK_Connector.h \
@@ -285,22 +285,25 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
$(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
$(WRAPPER_ROOT)/ace/Synch_T.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
$(WRAPPER_ROOT)/ace/Proactor.h \
$(WRAPPER_ROOT)/ace/Message_Block.h \
$(WRAPPER_ROOT)/ace/Malloc.h \
$(WRAPPER_ROOT)/ace/Malloc_T.h \
$(WRAPPER_ROOT)/ace/Memory_Pool.h \
- $(WRAPPER_ROOT)/ace/Signal.h \
$(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
$(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
- $(WRAPPER_ROOT)/ace/Reactor.h \
- $(WRAPPER_ROOT)/ace/Handle_Set.h \
- $(WRAPPER_ROOT)/ace/Pipe.h \
- $(WRAPPER_ROOT)/ace/Pipe.i \
- $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
$(WRAPPER_ROOT)/ace/Synch_Options.h \
$(WRAPPER_ROOT)/ace/Name_Request_Reply.h \
@@ -309,9 +312,6 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Svc_Handler.h \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Acceptor.i \
$(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \
Name_Handler.h
@@ -358,31 +358,31 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/Shared_Object.h \
$(WRAPPER_ROOT)/ace/Thread_Manager.h \
$(WRAPPER_ROOT)/ace/Thread.h \
+ $(WRAPPER_ROOT)/ace/Signal.h \
$(WRAPPER_ROOT)/ace/Set.h \
+ $(WRAPPER_ROOT)/ace/Reactor.h \
+ $(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Pipe.h \
+ $(WRAPPER_ROOT)/ace/Pipe.i \
+ $(WRAPPER_ROOT)/ace/Reactor.i \
$(WRAPPER_ROOT)/ace/Proactor.h \
$(WRAPPER_ROOT)/ace/Message_Block.h \
$(WRAPPER_ROOT)/ace/Malloc.h \
$(WRAPPER_ROOT)/ace/Malloc_T.h \
$(WRAPPER_ROOT)/ace/Memory_Pool.h \
- $(WRAPPER_ROOT)/ace/Signal.h \
$(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
$(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
- $(WRAPPER_ROOT)/ace/Reactor.h \
- $(WRAPPER_ROOT)/ace/Handle_Set.h \
- $(WRAPPER_ROOT)/ace/Pipe.h \
- $(WRAPPER_ROOT)/ace/Pipe.i \
- $(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
$(WRAPPER_ROOT)/ace/Svc_Handler.h \
$(WRAPPER_ROOT)/ace/Synch_Options.h \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Acceptor.i \
Server_Logging_Handler.h
.obj/Token_Handler.o .shobj/Token_Handler.so: Token_Handler.cpp \
@@ -411,20 +411,12 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SV_Semaphore_Simple.i \
$(WRAPPER_ROOT)/ace/SV_Semaphore_Complex.i \
$(WRAPPER_ROOT)/ace/Synch_T.h \
- $(WRAPPER_ROOT)/ace/Set.h \
- $(WRAPPER_ROOT)/ace/Proactor.h \
- $(WRAPPER_ROOT)/ace/Message_Block.h \
- $(WRAPPER_ROOT)/ace/Malloc.h \
- $(WRAPPER_ROOT)/ace/Malloc_T.h \
- $(WRAPPER_ROOT)/ace/Memory_Pool.h \
$(WRAPPER_ROOT)/ace/Signal.h \
- $(WRAPPER_ROOT)/ace/Mem_Map.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.h \
- $(WRAPPER_ROOT)/ace/Timer_Queue.i \
- $(WRAPPER_ROOT)/ace/ReactorEx.h \
- $(WRAPPER_ROOT)/ace/Token.h \
+ $(WRAPPER_ROOT)/ace/Set.h \
$(WRAPPER_ROOT)/ace/Reactor.h \
$(WRAPPER_ROOT)/ace/Handle_Set.h \
+ $(WRAPPER_ROOT)/ace/Timer_Queue.h \
+ $(WRAPPER_ROOT)/ace/Token.h \
$(WRAPPER_ROOT)/ace/Pipe.h \
$(WRAPPER_ROOT)/ace/Pipe.i \
$(WRAPPER_ROOT)/ace/SOCK_Stream.h \
@@ -438,14 +430,22 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/INET_Addr.h \
$(WRAPPER_ROOT)/ace/SOCK_Stream.i \
$(WRAPPER_ROOT)/ace/Reactor.i \
+ $(WRAPPER_ROOT)/ace/Proactor.h \
+ $(WRAPPER_ROOT)/ace/Message_Block.h \
+ $(WRAPPER_ROOT)/ace/Malloc.h \
+ $(WRAPPER_ROOT)/ace/Malloc_T.h \
+ $(WRAPPER_ROOT)/ace/Memory_Pool.h \
+ $(WRAPPER_ROOT)/ace/Mem_Map.h \
+ $(WRAPPER_ROOT)/ace/ReactorEx.h \
+ $(WRAPPER_ROOT)/ace/Message_Queue.h \
+ $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
+ $(WRAPPER_ROOT)/ace/Strategies.h \
+ $(WRAPPER_ROOT)/ace/Strategies_T.h \
$(WRAPPER_ROOT)/ace/Svc_Conf_Tokens.h \
$(WRAPPER_ROOT)/ace/Svc_Handler.h \
$(WRAPPER_ROOT)/ace/Synch_Options.h \
$(WRAPPER_ROOT)/ace/Task.h \
$(WRAPPER_ROOT)/ace/Task_T.h \
- $(WRAPPER_ROOT)/ace/Message_Queue.h \
- $(WRAPPER_ROOT)/ace/IO_Cntl_Msg.h \
- $(WRAPPER_ROOT)/ace/Strategies.h \
$(WRAPPER_ROOT)/ace/Acceptor.i \
$(WRAPPER_ROOT)/ace/SOCK_Acceptor.h \
$(WRAPPER_ROOT)/ace/Token_Request_Reply.h \
@@ -456,6 +456,8 @@ include $(WRAPPER_ROOT)/include/makeinclude/rules.local.GNU
$(WRAPPER_ROOT)/ace/SString.h \
Token_Handler.h
.obj/Logging_Strategy.o .shobj/Logging_Strategy.so: Logging_Strategy.cpp \
+ /pkg/gnu/lib/g++-include/iostream.h \
+ /pkg/gnu/lib/g++-include/fstream.h \
$(WRAPPER_ROOT)/ace/Get_Opt.h \
$(WRAPPER_ROOT)/ace/ACE.h \
$(WRAPPER_ROOT)/ace/OS.h \
diff --git a/netsvcs/lib/Name_Handler.cpp b/netsvcs/lib/Name_Handler.cpp
index f2ddb2e950f..fc4a8b9cb46 100644
--- a/netsvcs/lib/Name_Handler.cpp
+++ b/netsvcs/lib/Name_Handler.cpp
@@ -174,21 +174,12 @@ public:
int parse_args (int argc, char *argv[]);
// Parse svc.conf arguments.
- int handle_signal (int, siginfo_t *, ucontext_t *);
-
private:
ACE_Schedule_All_Reactive_Strategy<ACE_Name_Handler> scheduling_strategy_;
// The scheduling strategy is designed for Reactive services.
};
int
-ACE_Name_Acceptor::handle_signal (int, siginfo_t *, ucontext_t *)
-{
- ACE_DEBUG ((LM_DEBUG, "ACE_Name_Acceptor::handle_signal got called\n"));
- return 0;
-}
-
-int
ACE_Name_Acceptor::parse_args (int argc, char *argv[])
{
ACE_TRACE ("ACE_Name_Acceptor::parse_args");
@@ -237,12 +228,6 @@ ACE_Name_Acceptor::init (int argc, char *argv[])
"acceptor::open failed",
this->service_addr_.get_port_number ()), -1);
- // Register ourselves to receive SIGINT so we can shutdown
- // gracefully.
- if (this->reactor ()->register_handler (SIGINT, this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%n: %p\n",
- "register_handler (SIGINT)"), -1);
-
// Ignore SIGPIPE so that each <SVC_HANDLER> can handle this on its
// own.
ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
diff --git a/netsvcs/lib/Server_Logging_Handler.cpp b/netsvcs/lib/Server_Logging_Handler.cpp
index 5f18494fb3c..67c67ee4e74 100644
--- a/netsvcs/lib/Server_Logging_Handler.cpp
+++ b/netsvcs/lib/Server_Logging_Handler.cpp
@@ -131,12 +131,6 @@ ACE_Server_Logging_Acceptor::init (int argc,
"acceptor::open failed",
this->service_addr_.get_port_number ()), -1);
- // Register ourselves to receive SIGINT so we can shutdown
- // gracefully.
- if (this->reactor ()->register_handler (SIGINT, this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%n: %p\n",
- "register_handler (SIGINT)"), -1);
-
// Ignore SIGPIPE so that each <SVC_HANDLER> can handle this on its
// own.
ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
@@ -364,12 +358,6 @@ ACE_Thr_Server_Logging_Acceptor::init (int argc,
"acceptor::open failed",
this->service_addr_.get_port_number ()), -1);
- // Register ourselves to receive SIGINT so we can shutdown
- // gracefully.
- if (this->reactor ()->register_handler (SIGINT, this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%n: %p\n",
- "register_handler (SIGINT)"), -1);
-
// Ignore SIGPIPE so that each <SVC_HANDLER> can handle this on its
// own.
ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
diff --git a/netsvcs/lib/TS_Clerk_Handler.cpp b/netsvcs/lib/TS_Clerk_Handler.cpp
index 533e98291bb..e0142256ab0 100644
--- a/netsvcs/lib/TS_Clerk_Handler.cpp
+++ b/netsvcs/lib/TS_Clerk_Handler.cpp
@@ -94,7 +94,7 @@ public:
protected:
virtual int handle_signal (int signum, siginfo_t *, ucontext_t *);
- // Handle SIGINT.
+ // Handle SIGPIPE.
static void stderr_output (int = 0);
@@ -147,7 +147,7 @@ class ACE_TS_Clerk_Processor : public ACE_Connector <ACE_TS_Clerk_Handler, ACE_S
// computing a synchronized system time.
{
public:
- ACE_TS_Clerk_Processor ();
+ ACE_TS_Clerk_Processor (void);
// Default constructor
virtual int handle_timeout (const ACE_Time_Value &tv,
@@ -172,9 +172,6 @@ protected:
virtual int suspend (void);
virtual int resume (void);
- virtual int handle_signal (int signum, siginfo_t *, ucontext_t *);
- // Handle SIGINT.
-
private:
int parse_args (int argc, char *argv[]);
// Parse svc.conf arguments.
@@ -234,14 +231,6 @@ ACE_TS_Clerk_Handler::ACE_TS_Clerk_Handler (ACE_TS_Clerk_Processor *processor,
this->time_info_.sequence_num_ = 0;
}
-// This is called when a <send> to a server fails...
-int
-ACE_TS_Clerk_Handler::handle_signal (int, siginfo_t *, ucontext_t *)
-{
- ACE_TRACE ("ACE_TS_Clerk_Handler::handle_signal");
- return 0;
-}
-
// Set the connection state
void
ACE_TS_Clerk_Handler::state (ACE_TS_Clerk_Handler::State state)
@@ -285,6 +274,15 @@ ACE_TS_Clerk_Handler::timeout (void)
return old_timeout;
}
+// This is called when a <send> to the logging server fails...
+
+int
+ACE_TS_Clerk_Handler::handle_signal (int, siginfo_t *, ucontext_t *)
+{
+ ACE_TRACE ("ACE_TS_Clerk_Handler::handle_signal");
+ return -1;
+}
+
// Set the max timeout delay.
void
ACE_TS_Clerk_Handler::max_timeout (int mto)
@@ -651,14 +649,8 @@ ACE_TS_Clerk_Processor::init (int argc, char *argv[])
#if !defined (ACE_WIN32)
// Ignore SIPPIPE so each Output_Channel can handle it.
ACE_Sig_Action sig (ACE_SignalHandler (SIG_IGN), SIGPIPE);
+#endif /* ACE_WIN32 */
- // Register ourselves to receive SIGINT and SIGPIPE
- // so we can shut down gracefully via signals.
- if (ACE_Service_Config::reactor ()->register_handler (SIGINT,
- this) == -1)
- ACE_ERROR_RETURN ((LM_ERROR, "%n: %p\n",
- "register_handler"), -1);
-#endif
ACE_Synch_Options &synch_options = this->blocking_semantics_ == 0
? ACE_Synch_Options::asynch : ACE_Synch_Options::synch;
@@ -803,16 +795,6 @@ ACE_TS_Clerk_Processor::resume (void)
return 0;
}
-// Signal the server to shutdown gracefully.
-
-int
-ACE_TS_Clerk_Processor::handle_signal (int, siginfo_t *, ucontext_t *)
-{
- ACE_TRACE ("ACE_TS_Clerk_Processor::handle_signal");
- ACE_Service_Config::end_reactor_event_loop ();
- return 0;
-}
-
// The following is a "Factory" used by the ACE_Service_Config and
// svc.conf file to dynamically initialize the state of the TS_Clerk.
diff --git a/netsvcs/servers/main.cpp b/netsvcs/servers/main.cpp
index c072d012211..130b7f1ab30 100644
--- a/netsvcs/servers/main.cpp
+++ b/netsvcs/servers/main.cpp
@@ -1,6 +1,6 @@
-#include "ace/Service_Config.h"
// $Id$
+#include "ace/Service_Config.h"
#include "TS_Clerk_Handler.h"
#include "TS_Server_Handler.h"
#include "Client_Logging_Handler.h"
@@ -69,6 +69,17 @@ main (int argc, char *argv[])
}
}
+ // Create an adapter to end the event loop.
+ ACE_Sig_Adapter sa ((ACE_Sig_Handler_Ex) ACE_Service_Config::end_reactor_event_loop);
+
+ ACE_Sig_Set sig_set;
+ sig_set.sig_add (SIGINT);
+ sig_set.sig_add (SIGQUIT);
+
+ // Register ourselves to receive SIGINT and SIGQUIT so we can shut
+ // down gracefully via signals.
+ ACE_Service_Config::reactor ()->register_handler (sig_set, &sa);
+
// Run forever, performing the configured services until we are shut
// down by a SIGINT/SIGQUIT signal.