diff options
-rw-r--r-- | ChangeLog-98a | 9 | ||||
-rw-r--r-- | ace/Env_Value_T.cpp | 2 | ||||
-rw-r--r-- | ace/Env_Value_T.h | 45 | ||||
-rw-r--r-- | ace/Env_Value_T.i | 37 | ||||
-rw-r--r-- | ace/OS.h | 29 | ||||
-rw-r--r-- | ace/OS.i | 162 | ||||
-rw-r--r-- | ace/Synch.h | 8 | ||||
-rw-r--r-- | ace/Synch.i | 7 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Config_Files.cpp | 135 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Config_Files.h | 17 | ||||
-rw-r--r-- | apps/Gateway/Gateway/File_Parser.cpp | 30 | ||||
-rw-r--r-- | apps/Gateway/Gateway/File_Parser.h | 7 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Options.cpp | 17 | ||||
-rw-r--r-- | apps/Gateway/Gateway/Options.h | 9 | ||||
-rw-r--r-- | apps/Gateway/Gateway/consumer_config | 15 | ||||
-rw-r--r-- | apps/Gateway/Gateway/proxy_config | 15 | ||||
-rw-r--r-- | tests/Reader_Writer_Test.cpp | 159 |
17 files changed, 430 insertions, 273 deletions
diff --git a/ChangeLog-98a b/ChangeLog-98a index f2d06d41051..aefa6554c46 100644 --- a/ChangeLog-98a +++ b/ChangeLog-98a @@ -1,3 +1,12 @@ +Fri Jan 2 00:17:04 1998 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu> + + * Reader_Writer_Test.cpp: Changed from an ACE_RW_Mutex to an + ACE_RW_Thread_Mutex so that we can use the new + tryacquire_write_upgrade() method. + + * ace/Env_Value_T.h: Reformatted the file a bit to conform to + ACE coding conventions. + Thu Jan 1 12:04:29 1998 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu> * apps/Gateway/Peer/Options.cpp (parse_args): Changed the option diff --git a/ace/Env_Value_T.cpp b/ace/Env_Value_T.cpp index 25a2efed4d1..758f3203137 100644 --- a/ace/Env_Value_T.cpp +++ b/ace/Env_Value_T.cpp @@ -8,5 +8,5 @@ template void ACE_Convert(const char*, char*&); #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate void ACE_Convert (const char*, char*&) -#endif +#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */ diff --git a/ace/Env_Value_T.h b/ace/Env_Value_T.h index dbce975361e..5a2b88ea434 100644 --- a/ace/Env_Value_T.h +++ b/ace/Env_Value_T.h @@ -1,4 +1,4 @@ -// This may look like C, but it's really -*- C++ -*- +/* This may look like C, but it's really -*- C++ -*- */ // $Id$ // ============================================================================ @@ -16,31 +16,29 @@ // ============================================================================ #if !defined (ACE_ENV_VALUE_T_H) -# define ACE_ENV_VALUE_T_H +#define ACE_ENV_VALUE_T_H template <class T> class ACE_Env_Value -// -// = TITLE -// Enviroment Variable Value -// -// = DESCRIPTION -// Reads a variable from the user enviroment, providing a default -// value. -// -// = AUTHOR -// Chris Cleeland, Carlos O'Ryan -// { + // = TITLE + // Enviroment Variable Value + // + // = DESCRIPTION + // Reads a variable from the user enviroment, providing a default + // value. + // + // = AUTHOR + // Chris Cleeland, Carlos O'Ryan public: ACE_Env_Value (void); // Default constructor which isn't bound to a specific environment // variable name or a default value. Before being useful it must // <open()>ed. - ACE_Env_Value (const char* varname, - const T& vardefault); - // Constructor which calls <open()>. + ACE_Env_Value (const char *varname, + const T &vardefault); + // Constructor that calls <open>. ~ACE_Env_Value (void); // Destroy the value. @@ -48,26 +46,25 @@ public: operator const T (void) const; // Returns the value as type T. - void open (const char* varname, const T& defval); + void open (const char *varname, const T &defval); // The constructor, read <varname> from the enviroment, using // <vardefault> as its value if it is not defined. - const char* varname (void) const; + const char *varname (void) const; // Returns the name of the variable being tracked. private: - ACE_UNIMPLEMENTED_FUNC(ACE_Env_Value (const ACE_Env_Value&)) - ACE_UNIMPLEMENTED_FUNC(ACE_Env_Value& operator= (const ACE_Env_Value&)) + ACE_UNIMPLEMENTED_FUNC (ACE_Env_Value (const ACE_Env_Value &)) + ACE_UNIMPLEMENTED_FUNC (ACE_Env_Value &operator= (const ACE_Env_Value &)) // Disallow copying and assignment. -private: void fetch_value (void); - const char* varname_; + const char *varname_; T value_; }; -template <class T> void ACE_Convert (const char* s, T& t); +template <class T> void ACE_Convert (const char *s, T &t); // Function to convert a string <s> into type <T>. #if defined (__ACE_INLINE__) @@ -82,4 +79,4 @@ template <class T> void ACE_Convert (const char* s, T& t); #pragma implementation ("Env_Value_T.cpp") #endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */ -#endif +#endif /* ACE_ENV_VALUE_T_H */ diff --git a/ace/Env_Value_T.i b/ace/Env_Value_T.i index 0770f86c79c..31828bfe385 100644 --- a/ace/Env_Value_T.i +++ b/ace/Env_Value_T.i @@ -9,18 +9,22 @@ ACE_Env_Value<T>::operator const T (void) const template <class T> ACE_INLINE ACE_Env_Value<T>::ACE_Env_Value (void) : varname_ (0) -{} +{ +} template <class T> ACE_INLINE -ACE_Env_Value<T>::ACE_Env_Value (const char* varname, const T& defval) - : varname_ (varname), value_(defval) +ACE_Env_Value<T>::ACE_Env_Value (const char *varname, + const T &defval) + : varname_ (varname), + value_(defval) { this->fetch_value (); } template <class T> ACE_INLINE void -ACE_Env_Value<T>::open (const char* varname, const T& defval) +ACE_Env_Value<T>::open (const char *varname, + const T &defval) { varname_ = varname; value_ = defval; @@ -28,20 +32,19 @@ ACE_Env_Value<T>::open (const char* varname, const T& defval) } template <class T> ACE_INLINE void -ACE_Env_Value<T>::fetch_value () +ACE_Env_Value<T>::fetch_value (void) { - const char* env = ACE_OS::getenv (varname_); + const char *env = ACE_OS::getenv (varname_); + if (env != 0) ACE_Convert (env, value_); } - template <class T> ACE_INLINE ACE_Env_Value<T>::~ACE_Env_Value (void) { } - // Default calls a CTOR on type T of the form 'T::T(const char*)', but // users can feel free to create their own specialized conversion // functions if necessary, as shown below. Note that for 'char*' the @@ -49,43 +52,43 @@ ACE_Env_Value<T>::~ACE_Env_Value (void) // conversion will be necessary. template <class T> ACE_INLINE void -ACE_Convert (const char* s, T& t) +ACE_Convert (const char *s, T &t) { - t = T(s); + t = T (s); } ACE_INLINE void -ACE_Convert (const char* s, char*& v) +ACE_Convert (const char *s, char *&v) { - v = (char*)s; + v = (char *) s; } ACE_INLINE void -ACE_Convert (const char* s, short& si) +ACE_Convert (const char *s, short &si) { si = ACE_OS::strtol (s, 0, 10); } ACE_INLINE void -ACE_Convert (const char* s, long& l) +ACE_Convert (const char *s, long &l) { l = ACE_OS::strtol (s, 0, 10); } ACE_INLINE void -ACE_Convert (const char* s, int& i) +ACE_Convert (const char *s, int &i) { i = ACE_OS::strtol (s, 0, 10); } ACE_INLINE void -ACE_Convert (const char* s, unsigned long& ul) +ACE_Convert (const char *s, u_long &ul) { ul = ACE_OS::strtoul (s, 0, 10); } ACE_INLINE void -ACE_Convert (const char* s, double& d) +ACE_Convert (const char *s, double &d) { d = ACE_OS::strtod (s, 0); } @@ -171,31 +171,13 @@ #define ACE_DEFAULT_THR_LOGGING_SERVER_PORT_STR "10008" #endif /* ACE_DEFAULT_THR_LOGGING_SERVER_PORT_STR */ -// Used for the gateway server. -#if !defined (ACE_DEFAULT_GATEWAY_SERVER_PORT) -#define ACE_DEFAULT_GATEWAY_SERVER_PORT 10009 -#endif /* ACE_DEFAULT_GATEWAY_SERVER_PORT */ - -#if !defined (ACE_DEFAULT_GATEWAY_SERVER_PORT_STR) -#define ACE_DEFAULT_GATEWAY_SERVER_PORT_STR "10009" -#endif /* ACE_DEFAULT_GATEWAY_SERVER_PORT_STR */ - -// Used for the peer server. -#if !defined (ACE_DEFAULT_PEER_SERVER_PORT) -#define ACE_DEFAULT_PEER_SERVER_PORT 10010 -#endif /* ACE_DEFAULT_PEER_SERVER_PORT */ - -#if !defined (ACE_DEFAULT_PEER_SERVER_PORT_STR) -#define ACE_DEFAULT_PEER_SERVER_PORT_STR "10010" -#endif /* ACE_DEFAULT_PEER_SERVER_PORT_STR */ - // Used for the time server. #if !defined (ACE_DEFAULT_TIME_SERVER_PORT) -#define ACE_DEFAULT_TIME_SERVER_PORT 10011 +#define ACE_DEFAULT_TIME_SERVER_PORT 10009 #endif /* ACE_DEFAULT_TIME_SERVER_PORT */ #if !defined (ACE_DEFAULT_TIME_SERVER_PORT_STR) -#define ACE_DEFAULT_TIME_SERVER_PORT_STR "10011" +#define ACE_DEFAULT_TIME_SERVER_PORT_STR "10009" #endif /* ACE_DEFAULT_TIME_SERVER_PORT_STR */ #if !defined (ACE_DEFAULT_TIME_SERVER_STR) @@ -207,7 +189,7 @@ #if defined (ACE_HAS_STREAM_PIPES) #define ACE_DEFAULT_RENDEZVOUS "/tmp/fifo.ace" #else -#define ACE_DEFAULT_RENDEZVOUS "localhost:10012" +#define ACE_DEFAULT_RENDEZVOUS "localhost:10010" #endif /* ACE_LACKS_FIFO */ #endif /* ACE_DEFAULT_RENDEZVOUS */ @@ -215,7 +197,7 @@ #if defined (ACE_HAS_STREAM_PIPES) #define ACE_DEFAULT_LOGGER_KEY "/tmp/server_daemon" #else -#define ACE_DEFAULT_LOGGER_KEY "localhost:10013" +#define ACE_DEFAULT_LOGGER_KEY "localhost:10010" #endif /* ACE_HAS_STREAM_PIPES */ #endif /* ACE_DEFAULT_LOGGER_KEY */ @@ -4285,10 +4267,11 @@ public: void *arg = 0); static int rwlock_destroy (ACE_rwlock_t *rw); static int rw_rdlock (ACE_rwlock_t *rw); + static int rw_wrlock (ACE_rwlock_t *rw); static int rw_tryrdlock (ACE_rwlock_t *rw); static int rw_trywrlock (ACE_rwlock_t *rw); + static int rw_trywrlock_upgrade (ACE_rwlock_t *rw); static int rw_unlock (ACE_rwlock_t *rw); - static int rw_wrlock (ACE_rwlock_t *rw); // = A set of wrappers for auto-reset and manuaevents. static int event_init (ACE_event_t *event, @@ -3134,49 +3134,6 @@ ACE_OS::cond_wait (ACE_cond_t *cv, #endif /* ACE_LACKS_COND_T */ ACE_INLINE int -ACE_OS::rw_rdlock (ACE_rwlock_t *rw) -{ - // ACE_TRACE ("ACE_OS::rw_rdlock"); -#if defined (ACE_HAS_THREADS) -#if defined (ACE_HAS_STHREADS) || !defined (ACE_LACKS_RWLOCK_T) - ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::rw_rdlock (rw), ace_result_), int, -1); -#else /* NT, POSIX, and VxWorks don't support this natively. */ -#if defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) - ACE_PTHREAD_CLEANUP_PUSH (&rw->lock_); -#endif /* ACE_HAS_DCETHREADS */ - int result = 0; - if (ACE_OS::mutex_lock (&rw->lock_) == -1) - result = -1; // -1 means didn't get the mutex. - else - { - // Give preference to writers who are waiting. - while (rw->ref_count_ < 0 || rw->num_waiting_writers_ > 0) - { - rw->num_waiting_readers_++; - if (ACE_OS::cond_wait (&rw->waiting_readers_, &rw->lock_) == -1) - { - result = -2; // -2 means that we need to release the mutex. - break; - } - rw->num_waiting_readers_--; - } - } - if (result == 0) - rw->ref_count_++; - if (result != -1) - ACE_OS::mutex_unlock (&rw->lock_); -#if defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) - ACE_PTHREAD_CLEANUP_POP (0); -#endif /* defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) */ - return 0; -#endif /* ACE_HAS_STHREADS */ -#else - ACE_UNUSED_ARG (rw); - ACE_NOTSUP_RETURN (-1); -#endif /* ACE_HAS_THREADS */ -} - -ACE_INLINE int ACE_OS::rw_tryrdlock (ACE_rwlock_t *rw) { // ACE_TRACE ("ACE_OS::rw_tryrdlock"); @@ -3249,6 +3206,94 @@ ACE_OS::rw_trywrlock (ACE_rwlock_t *rw) } ACE_INLINE int +ACE_OS::rw_rdlock (ACE_rwlock_t *rw) +{ + // ACE_TRACE ("ACE_OS::rw_rdlock"); +#if defined (ACE_HAS_THREADS) +#if defined (ACE_HAS_STHREADS) || !defined (ACE_LACKS_RWLOCK_T) + ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::rw_rdlock (rw), ace_result_), int, -1); +#else /* NT, POSIX, and VxWorks don't support this natively. */ +#if defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) + ACE_PTHREAD_CLEANUP_PUSH (&rw->lock_); +#endif /* ACE_HAS_DCETHREADS */ + int result = 0; + if (ACE_OS::mutex_lock (&rw->lock_) == -1) + result = -1; // -1 means didn't get the mutex. + else + { + // Give preference to writers who are waiting. + while (rw->ref_count_ < 0 || rw->num_waiting_writers_ > 0) + { + rw->num_waiting_readers_++; + if (ACE_OS::cond_wait (&rw->waiting_readers_, &rw->lock_) == -1) + { + result = -2; // -2 means that we need to release the mutex. + break; + } + rw->num_waiting_readers_--; + } + } + if (result == 0) + rw->ref_count_++; + if (result != -1) + ACE_OS::mutex_unlock (&rw->lock_); +#if defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) + ACE_PTHREAD_CLEANUP_POP (0); +#endif /* defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) */ + return 0; +#endif /* ACE_HAS_STHREADS */ +#else + ACE_UNUSED_ARG (rw); + ACE_NOTSUP_RETURN (-1); +#endif /* ACE_HAS_THREADS */ +} + +ACE_INLINE int +ACE_OS::rw_wrlock (ACE_rwlock_t *rw) +{ + // ACE_TRACE ("ACE_OS::rw_wrlock"); +#if defined (ACE_HAS_THREADS) +#if defined (ACE_HAS_STHREADS) || !defined (ACE_LACKS_RWLOCK_T) + ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::rw_wrlock (rw), ace_result_), int, -1); +#else /* NT, POSIX, and VxWorks don't support this natively. */ +#if defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) + ACE_PTHREAD_CLEANUP_PUSH (&rw->lock_); +#endif /* defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) */ + int result = 0; + + if (ACE_OS::mutex_lock (&rw->lock_) == -1) + result = -1; // -1 means didn't get the mutex. + else + { + while (rw->ref_count_ != 0) + { + rw->num_waiting_writers_++; + + if (ACE_OS::cond_wait (&rw->waiting_writers_, &rw->lock_) == -1) + { + result = -2; // -2 means we need to release the mutex. + break; + } + + rw->num_waiting_writers_--; + } + } + if (result == 0) + rw->ref_count_ = -1; + if (result != -1) + ACE_OS::mutex_unlock (&rw->lock_); +#if defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) + ACE_PTHREAD_CLEANUP_POP (0); +#endif /* defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) */ + return 0; +#endif /* ACE_HAS_STHREADS */ +#else + ACE_UNUSED_ARG (rw); + ACE_NOTSUP_RETURN (-1); +#endif /* ACE_HAS_THREADS */ +} + +ACE_INLINE int ACE_OS::rw_unlock (ACE_rwlock_t *rw) { // ACE_TRACE ("ACE_OS::rw_unlock"); @@ -3264,7 +3309,7 @@ ACE_OS::rw_unlock (ACE_rwlock_t *rw) else if (rw->ref_count_ == -1) // Releasing a writer. rw->ref_count_ = 0; else - assert (!"count should not be 0!\n"); + ACE_ASSERT (!"count should not be 0!\n"); int result; int error = 0; @@ -3293,37 +3338,36 @@ ACE_OS::rw_unlock (ACE_rwlock_t *rw) #endif /* ACE_HAS_THREADS */ } +// Note that the caller of this method *must* already possess this +// lock as a read lock. + ACE_INLINE int -ACE_OS::rw_wrlock (ACE_rwlock_t *rw) +ACE_OS::rw_trywrlock_upgrade (ACE_rwlock_t *rw) { // ACE_TRACE ("ACE_OS::rw_wrlock"); #if defined (ACE_HAS_THREADS) #if defined (ACE_HAS_STHREADS) || !defined (ACE_LACKS_RWLOCK_T) - ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::rw_wrlock (rw), ace_result_), int, -1); + // Solaris rwlocks don't support the upgrade feature... + ACE_NOTSUP_RETURN (-1); #else /* NT, POSIX, and VxWorks don't support this natively. */ #if defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) ACE_PTHREAD_CLEANUP_PUSH (&rw->lock_); #endif /* defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) */ int result = 0; + if (ACE_OS::mutex_lock (&rw->lock_) == -1) result = -1; // -1 means didn't get the mutex. - else + else if (rw->ref_count_ != 1) { - while (rw->ref_count_ != 0) - { - rw->num_waiting_writers_++; - - if (ACE_OS::cond_wait (&rw->waiting_writers_, &rw->lock_) == -1) - { - result = -2; // -2 means we need to release the mutex. - break; - } - - rw->num_waiting_writers_--; - } + // There were other readers, so we'll have to bail out. + error = EBUSY; + result = -1; } if (result == 0) + // We force the lock to become a write lock, thereby putting + // ourselves ahead of all other waiting writers. rw->ref_count_ = -1; + if (result != -1) ACE_OS::mutex_unlock (&rw->lock_); #if defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) diff --git a/ace/Synch.h b/ace/Synch.h index 4801867327e..92c2642425f 100644 --- a/ace/Synch.h +++ b/ace/Synch.h @@ -1191,6 +1191,14 @@ public: ACE_RW_Thread_Mutex (LPCTSTR name = 0, void *arg = 0); + int tryacquire_write_upgrade (void); + // Conditionally upgrade a read lock to a write lock. This only + // works if there are no other readers present, in which case the + // method returns 0. Otherwise, the method returns -1 and sets + // <errno> to <EBUSY>. Note that the caller of this method *must* + // already possess this lock as a read lock (but this condition is + // not checked by the current implementation). + void dump (void) const; // Dump the state of an object. diff --git a/ace/Synch.i b/ace/Synch.i index e8658950e39..2b9e9da5e26 100644 --- a/ace/Synch.i +++ b/ace/Synch.i @@ -137,6 +137,13 @@ ACE_RW_Mutex::release (void) return ACE_OS::rw_unlock (&this->lock_); } +ACE_INLINE int +ACE_RW_Thread_Mutex::tryacquire_write_upgrade (void) +{ +// ACE_TRACE ("ACE_RW_Thread_Mutex::tryacquire_write_upgrade"); + return ACE_OS::rw_trywrlock_upgrade (&this->lock_); +} + ACE_INLINE int ACE_Mutex::acquire_read (void) { diff --git a/apps/Gateway/Gateway/Config_Files.cpp b/apps/Gateway/Gateway/Config_Files.cpp index 68d53f4862e..bfc0ae93aaa 100644 --- a/apps/Gateway/Gateway/Config_Files.cpp +++ b/apps/Gateway/Gateway/Config_Files.cpp @@ -11,41 +11,42 @@ FP_RETURN_TYPE Consumer_Config_File_Parser::read_entry (Consumer_Config_Info &entry, int &line_number) { - FP_RETURN_TYPE read_result; + FP_RETURN_TYPE result; // Increment the line count. line_number++; // Ignore comments, check for EOF and EOLINE if this succeeds, we // have our connection id. - while ((read_result = this->getint (entry.proxy_id_)) != FP::SUCCESS) - { - if (read_result == FP::EOFILE) - return FP::EOFILE; - else if (read_result == FP::EOLINE - || read_result == FP::COMMENT) - line_number++; - } + + while ((result = this->getint (entry.proxy_id_)) != FP::SUCCESS) + if (result == FP::EOFILE) + return FP::EOFILE; + else if (result == FP::EOLINE + || result == FP::COMMENT) + line_number++; // Get the supplier id. - if ((read_result = this->getint (entry.supplier_id_)) != FP::SUCCESS) - return read_result; + result = this->getint (entry.supplier_id_); + if (result != FP::SUCCESS) + return result; // Get the payload type. - if ((read_result = this->getint (entry.type_)) != FP::SUCCESS) - return read_result; + result = this->getint (entry.type_); + if (result != FP::SUCCESS) + return result; // get all the consumers. entry.total_consumers_ = 0; - while ((read_result = this->getint (entry.consumers_[entry.total_consumers_])) - == FP::SUCCESS) + while ((result = this->getint + (entry.consumers_[entry.total_consumers_])) == FP::SUCCESS) ++entry.total_consumers_; // do nothing (should check against max...) - if (read_result == FP::EOLINE || read_result == FP::EOFILE) + if (result == FP::EOLINE || result == FP::EOFILE) return FP::SUCCESS; else - return read_result; + return result; } FP_RETURN_TYPE @@ -53,54 +54,81 @@ Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry, int &line_number) { char buf[BUFSIZ]; - FP_RETURN_TYPE read_result; - // increment the line count + FP_RETURN_TYPE result; + + // Increment the line count. line_number++; - // Ignore comments, check for EOF and EOLINE - // if this succeeds, we have our connection id - while ((read_result = this->getint (entry.proxy_id_)) != FP::SUCCESS) - { - if (read_result == FP::EOFILE) - return FP::EOFILE; - else if (read_result == FP::EOLINE - || read_result == FP::COMMENT) - line_number++; - } + // Ignore comments, check for EOF and EOLINE if this succeeds, we + // have our connection id + + while ((result = this->getint (entry.proxy_id_)) != FP::SUCCESS) + if (result == FP::EOFILE) + return FP::EOFILE; + else if (result == FP::EOLINE + || result == FP::COMMENT) + line_number++; - // get the hostname - if ((read_result = this->getword (entry.host_)) != FP::SUCCESS) - return read_result; + // Get the hostname. + result = this->getword (entry.host_); + if (result != FP::SUCCESS) + return result; ACE_INT32 port; // Get the port number. - if ((read_result = this->getint (port)) != FP::SUCCESS) - return read_result; - else - entry.remote_port_ = (u_short) port; + result = this->getint (port); + if (result == FP::DEFAULT) + { + // Get the proxy role, i.e., 'C' (Consumer) or 'S' (Supplier). + result = this->getword (buf); + if (result != FP::SUCCESS) + return result; + else + entry.proxy_role_ = buf[0]; - // Get the proxy role. - if ((read_result = this->getword (buf)) != FP::SUCCESS) - return read_result; + if (entry.proxy_role_ == 'C') + entry.remote_port_ = Options::instance ()->consumer_connector_port (); + else (entry.proxy_role_ == 'S') + entry.remote_port_ = Options::instance ()->supplier_connector_port (); + else + // Yikes, this is a *weird* error! + entry.remote_port_ = 0; + } + else if (result != FP::SUCCESS) + return result; else - entry.proxy_role_ = buf[0]; + { + entry.remote_port_ = u_short (port); + + // Get the proxy role, i.e., 'C' (Consumer) or 'S' (Supplier). + result = this->getword (buf); + if (result != FP::SUCCESS) + return result; + else + entry.proxy_role_ = buf[0]; + } // Get the max retry delay. - if ((read_result = this->getint (entry.max_retry_timeout_)) != FP::SUCCESS) - return read_result; + result = this->getint (entry.max_retry_timeout_); + if (result == FP::DEFAULT) + entry.max_retry_timeout_ = Options::instance ()->max_timeout (); + else if (result != FP::SUCCESS) + return result; // Get the local port number. - if ((read_result = this->getint (port)) != FP::SUCCESS) - return read_result; + result = this->getint (port); + if (result != FP::SUCCESS) + return result; else - entry.local_port_ = (u_short) port; + entry.local_port_ = u_short (port); ACE_INT32 priority; - // Get the priority - if ((read_result = this->getint (priority)) != FP::SUCCESS) - return read_result; + // Get the priority. + result = this->getint (priority); + if (result != FP::SUCCESS) + return result; else entry.priority_ = priority; @@ -110,11 +138,12 @@ Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry, #if defined (DEBUGGING) int main (int argc, char *argv[]) { - if (argc != 4) { -// ACE_ERROR_RETURN ((LM_ERROR, "%s filename\n", argv[0]), -1); - cerr << argv[0] << " CCfilename filename Mapfilename.\n"; - exit (1); - } + if (argc != 4) + { + // ACE_ERROR_RETURN ((LM_ERROR, "%s filename\n", argv[0]), -1); + cerr << argv[0] << " CCfilename filename Mapfilename.\n"; + exit (1); + } FP_RETURN_TYPE result; Proxy_Config_Info entry; Proxy_Config_File_Parser CCfile; diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h index fcbe2c5cbb0..2f1f1280bc2 100644 --- a/apps/Gateway/Gateway/Config_Files.h +++ b/apps/Gateway/Gateway/Config_Files.h @@ -60,8 +60,10 @@ class Proxy_Config_File_Parser : public File_Parser<Proxy_Config_Info> // Parser for the Proxy_Handler Connection file. { public: - virtual FP::Return_Type - read_entry (Proxy_Config_Info &entry, int &line_number); + virtual FP::Return_Type read_entry (Proxy_Config_Info &entry, + int &line_number); + // Read in a <Proxy_Config_Info> entry. + }; class Consumer_Config_Info @@ -69,8 +71,10 @@ class Consumer_Config_Info // Stores the information in a Consumer Map entry. { public: - enum { - MAX_CONSUMERS = 1000 // Total number of multicast consumers. + enum + { + MAX_CONSUMERS = 1000 + // Total number of multicast consumers. }; ACE_INT32 proxy_id_; @@ -94,8 +98,9 @@ class Consumer_Config_File_Parser : public File_Parser<Consumer_Config_Info> // Parser for the Consumer Map file. { public: - virtual FP::Return_Type - read_entry (Consumer_Config_Info &entry, int &line_number); + virtual FP::Return_Type read_entry (Consumer_Config_Info &entry, + int &line_number); + // Read in a <Consumer_Config_Info> entry. }; #endif /* _CONFIG_FILES */ diff --git a/apps/Gateway/Gateway/File_Parser.cpp b/apps/Gateway/Gateway/File_Parser.cpp index 4d7c004e94c..bfde7a6911e 100644 --- a/apps/Gateway/Gateway/File_Parser.cpp +++ b/apps/Gateway/Gateway/File_Parser.cpp @@ -36,24 +36,32 @@ File_Parser<ENTRY>::getword (char buf[]) // Get the next string from the file via this->readword() // Check make sure the string forms a valid number. + template <class ENTRY> FP_RETURN_TYPE File_Parser<ENTRY>::getint (ACE_INT32 &value) { char buf[BUFSIZ]; - FP_RETURN_TYPE read_result = this->readword(buf); + FP_RETURN_TYPE read_result = this->readword (buf); + if (read_result == FP::SUCCESS) { - // ptr is used for error checking with ACE_OS::strtol - char *ptr; - - // try to convert the buf to a decimal number - value = ACE_OS::strtol (buf, &ptr, 10); - - // check if the buf is a decimal or not - if (value == 0 && ptr == buf) - return FP::PARSE_ERROR; + // Check to see if this is the "use the default value" symbol? + if (buf[0] == '*') + return FP::DEFAULT; else - return FP::SUCCESS; + { + // ptr is used for error checking with ACE_OS::strtol. + char *ptr; + + // try to convert the buf to a decimal number + value = ACE_OS::strtol (buf, &ptr, 10); + + // check if the buf is a decimal or not + if (value == 0 && ptr == buf) + return FP::PARSE_ERROR; + else + return FP::SUCCESS; + } } else return read_result; diff --git a/apps/Gateway/Gateway/File_Parser.h b/apps/Gateway/Gateway/File_Parser.h index bb8da87c424..64b4d49db59 100644 --- a/apps/Gateway/Gateway/File_Parser.h +++ b/apps/Gateway/Gateway/File_Parser.h @@ -30,6 +30,7 @@ public: EOFILE, SUCCESS, COMMENT, + DEFAULT, PARSE_ERROR }; }; @@ -45,8 +46,10 @@ public: int open (const char filename[]); int close (void); - virtual FP::Return_Type read_entry (ENTRY &, int &line_number) = 0; - // Implementations use protected methods to fill in the entry. + virtual FP::Return_Type read_entry (ENTRY &entry, + int &line_number) = 0; + // Pure virtual hook that subclasses override and use the protected + // methods to fill in the <entry>. protected: FP::Return_Type getword (char buf[]); diff --git a/apps/Gateway/Gateway/Options.cpp b/apps/Gateway/Gateway/Options.cpp index f96fc44436d..d4ee425caf3 100644 --- a/apps/Gateway/Gateway/Options.cpp +++ b/apps/Gateway/Gateway/Options.cpp @@ -27,9 +27,9 @@ Options::Options (void) threading_strategy_ (REACTIVE), options_ (0), supplier_acceptor_port_ (DEFAULT_PEER_SUPPLIER_PORT), - consumer_acceptor_port_ (DEFAULT_PEER_CONSUMER_PORT), + consumer_connector_port_ (DEFAULT_GATEWAY_CONSUMER_PORT), supplier_connector_port_ (DEFAULT_GATEWAY_SUPPLIER_PORT), - consumer_connector_port_ (DEFAULT_GATEWAY_CONSUMER_PORT) + max_timeout_ (MAX_TIMEOUT) { ACE_OS::strcpy (this->proxy_config_file_, "proxy_config"); ACE_OS::strcpy (this->consumer_config_file_, "consumer_config"); @@ -64,7 +64,13 @@ Options::performance_window (void) const return this->performance_window_; } -int +long +Options::max_timeout (void) const +{ + return this->max_timeout_; +} + +int Options::blocking_semantics (void) const { return this->blocking_semantics_; @@ -124,7 +130,10 @@ int Options::parse_args (int argc, char *argv[]) { // Assign defaults. - ACE_Get_Opt get_opt (argc, argv, "a:bC:c:dP:p:q:t:vw:", 0); + ACE_Get_Opt get_opt (argc, + argv, + "a:bC:c:dP:p:q:r:t:vw:", + 0); for (int c; (c = get_opt ()) != EOF; ) { diff --git a/apps/Gateway/Gateway/Options.h b/apps/Gateway/Gateway/Options.h index 4adf19dfaa3..9b83e5ba7a3 100644 --- a/apps/Gateway/Gateway/Options.h +++ b/apps/Gateway/Gateway/Options.h @@ -39,6 +39,9 @@ public: CONSUMER_ACCEPTOR = 010, SUPPLIER_CONNECTOR = 020, CONSUMER_CONNECTOR = 040 + + DEFAULT_TIMEOUT = 32 + // The maximum timeout for trying to re-establish connections. }; static Options *instance (void); @@ -105,6 +108,9 @@ public: const char *consumer_config_file (void) const; // Name of the consumer map configuration file. + long max_timeout (void) const; + // The maximum retry timeout delay. + private: Options (void); // Initialization. @@ -152,6 +158,9 @@ private: // The connector port number, i.e., the one that we use to actively // establish connections with a gatewayd and create a Consumer. + long max_timeout_; + // The maximum retry timeout delay. + char proxy_config_file_[MAXPATHLEN + 1]; // Name of the connection configuration file. diff --git a/apps/Gateway/Gateway/consumer_config b/apps/Gateway/Gateway/consumer_config index 5f05972e5aa..7d20a50a579 100644 --- a/apps/Gateway/Gateway/consumer_config +++ b/apps/Gateway/Gateway/consumer_config @@ -1,11 +1,12 @@ -# Configuration file for specifying which Consumers will receive -# events from which Suppliers. For now, the Gateway only allows -# Consumers to "subscribe" to receive events from particular -# Suppliers. A more flexible implementation will allow Consumers to -# subscribe to particular types of events, as well. +# Configuration file that the gatewayd process uses to determine which +# Consumers will receive events from which Suppliers. For now, the +# Gateway only allows Consumers to "subscribe" to receive events from +# particular Suppliers. A more flexible implementation will allow +# Consumers to subscribe to particular types of events, as well. # -# Here's an explanation of the fields in this file, and how they -# relate to fields in the "proxy_config" file. +# The following provides an explanation for the fields in this file, +# and how they relate to fields in the corresponding "proxy_config" +# file. # # 1. Proxy ID -- Each Proxy is given a unique ID that is used # in the "consumer_config" file to specify to which Consumers diff --git a/apps/Gateway/Gateway/proxy_config b/apps/Gateway/Gateway/proxy_config index f68cb556353..cdc7d07ffa3 100644 --- a/apps/Gateway/Gateway/proxy_config +++ b/apps/Gateway/Gateway/proxy_config @@ -1,8 +1,9 @@ -# Configuration file for specifying connection information about -# proxies. +# Configuration file that the gatewayd process uses to determine +# connection information about proxies. # -# Here's an explanation of the fields in this file, and how they -# relate to fields in the "consumer_config" file. +# The following provides an explanation for the fields in this file, +# and how they relate to fields in the corresponding "consumer_config" +# file. # # 1. Proxy ID -- Each Proxy is given a unique ID that is used # in the "consumer_config" file to specify to which Consumers @@ -16,12 +17,18 @@ # # 3. Remote Port -- The port number where the remote # Supplier/Consumer peerd process is listening on. +# If this is a '*' character it is an indication to the +# Gateway to use the "default value," e.g., which can be provided +# on the command-line, etc. # # 4. Proxy Role -- i.e., Consumer ('C') or Supplier ('S') # # 5. Max Retry Timeout -- The maximum amount of time that we'll # wait between retry attempts (these start at 1 second and # double until they reach the Max Retry Timeout). +# If this is a '*' character it is an indication to the +# Gateway to use the "default value," e.g., which can be provided +# on the command-line, etc. # # 6. Local Port -- The port number that we want to use for # our local Proxy connection. If this is the value 0, then diff --git a/tests/Reader_Writer_Test.cpp b/tests/Reader_Writer_Test.cpp index b9e58cf8bfe..4b85fdabf5a 100644 --- a/tests/Reader_Writer_Test.cpp +++ b/tests/Reader_Writer_Test.cpp @@ -42,19 +42,19 @@ static size_t n_writers = 4; static ACE_thread_t shared_data; // Lock for shared_data. -static ACE_RW_Mutex rw_mutex; +static ACE_RW_Thread_Mutex rw_mutex; // Count of the number of readers and writers. -static ACE_Atomic_Op<ACE_Thread_Mutex, int> current_readers; -static ACE_Atomic_Op<ACE_Thread_Mutex, int> current_writers; +static ACE_Atomic_Op<ACE_Thread_Mutex, int> current_readers = 0; +static ACE_Atomic_Op<ACE_Thread_Mutex, int> current_writers = 0; // Explain usage and exit. static void print_usage_and_die (void) { - ACE_DEBUG ((LM_DEBUG, - "usage: %n [-r n_readers] [-w n_writers] [-n iteration_count]\n")); - ACE_OS::exit (1); + ACE_ERROR ((LM_ERROR, + "usage: %n [-r n_readers] [-w n_writers] [-n iteration_count]\n%a", + 1)); } static void @@ -88,21 +88,23 @@ parse_args (int argc, char *argv[]) static void * reader (void *) { - ACE_DEBUG ((LM_DEBUG, " (%t) reader starting\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) reader starting\n")); // We use a random pause, around 2msec with 1msec jittering. - int usecs = 1000 + ACE_OS::rand() % 2000; - ACE_Time_Value pause(0, usecs); + int usecs = 1000 + ACE_OS::rand () % 2000; + ACE_Time_Value pause (0, usecs); - for (size_t iterations = 1; iterations <= n_iterations; iterations++) + for (size_t iterations = 1; + iterations <= n_iterations; + iterations++) { - ACE_OS::sleep(pause); + ACE_OS::sleep (pause); ACE_Read_Guard<ACE_RW_Mutex> g (rw_mutex); - // int n = ++current_readers; - // ACE_DEBUG ((LM_DEBUG, " (%t) I'm reader number %d\n", n)); if (current_writers > 0) - ACE_DEBUG ((LM_DEBUG, " (%t) writers found!!!\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) writers found!!!\n")); ACE_thread_t data = shared_data; @@ -110,73 +112,106 @@ reader (void *) { ACE_Thread::yield (); - if (!ACE_OS::thr_equal (shared_data, data)) + if (ACE_OS::thr_equal (shared_data, data) == 0) ACE_DEBUG ((LM_DEBUG, - " (%t) somebody changed %d to %d\n", - data, shared_data)); + "(%t) somebody changed %d to %d\n", + data, + shared_data)); + } + + if (rw_mutex.tryacquire_write_upgrade () == 0) + { + current_writers++; + + ACE_DEBUG ((LM_DEBUG, + "(%t) upgraded to write lock!\n")); + + ACE_thread_t self = ACE_Thread::self (); + + shared_data = self; + + for (size_t loop = 1; loop <= n_loops; loop++) + { + if (ACE_OS::thr_equal (shared_data, data) == 0) + ACE_DEBUG ((LM_DEBUG, + "(%t) somebody changed %d to %d\n", + data, + shared_data)); + } + + --current_writers; } --current_readers; - //ACE_DEBUG ((LM_DEBUG, " (%t) done with reading guarded data\n")); - ACE_DEBUG((LM_DEBUG, " (%t) read %d done at %T\n", iterations)); - // ACE_Thread::yield (); + ACE_DEBUG ((LM_DEBUG, + "(%t) read %d done at %T\n", + iterations)); } return 0; } -// Iterate <n_iterations> each time modifying the global data -// and checking that nobody steps on it while we can write it. +// Iterate <n_iterations> each time modifying the global data and +// checking that nobody steps on it while we can write it. static void * writer (void *) { - ACE_DEBUG ((LM_DEBUG, " (%t) writer starting\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) writer starting\n")); // We use a random pause, around 2msec with 1msec jittering. - int usecs = 1000 + ACE_OS::rand() % 2000; - ACE_Time_Value pause(0, usecs); + int usecs = 1000 + ACE_OS::rand () % 2000; + ACE_Time_Value pause (0, usecs); - for (size_t iterations = 1; iterations <= n_iterations; iterations++) + for (size_t iterations = 1; + iterations <= n_iterations; + iterations++) { - ACE_OS::sleep(pause); - - ACE_Write_Guard<ACE_RW_Mutex> g (rw_mutex); + ACE_OS::sleep (pause); - ++current_writers; - //ACE_DEBUG ((LM_DEBUG, " (%t) writing to guarded data\n")); + { + // Add an additional scope here to bound the duration that the + // write lock is held. + ACE_Write_Guard<ACE_RW_Mutex> g (rw_mutex); - if (current_writers > 1) - ACE_DEBUG ((LM_DEBUG, " (%t) other writers found!!!\n")); + ++current_writers; - if (current_readers > 0) - ACE_DEBUG ((LM_DEBUG, " (%t) readers found!!!\n")); + if (current_writers > 1) + ACE_DEBUG ((LM_DEBUG, + "(%t) other writers found!!!\n")); - ACE_thread_t self = ACE_Thread::self (); + if (current_readers > 0) + ACE_DEBUG ((LM_DEBUG, + "(%t) readers found!!!\n")); - shared_data = self; + ACE_thread_t self = ACE_Thread::self (); - for (size_t loop = 1; loop <= n_loops; loop++) - { - ACE_Thread::yield (); + shared_data = self; - if (!ACE_OS::thr_equal (shared_data, self)) - ACE_DEBUG ((LM_DEBUG, - " (%t) somebody wrote on my data %d\n", - shared_data)); - } + for (size_t loop = 1; + loop <= n_loops; + loop++) + { + ACE_Thread::yield (); - --current_writers; + if (ACE_OS::thr_equal (shared_data, self) == 0) + ACE_DEBUG ((LM_DEBUG, + "(%t) somebody wrote on my data %d\n", + shared_data)); + } - //ACE_DEBUG ((LM_DEBUG, " (%t) done with guarded data\n")); + --current_writers; + } - ACE_DEBUG((LM_DEBUG, " (%t) write %d done at %T\n", iterations)); - // ACE_Thread::yield (); + ACE_DEBUG ((LM_DEBUG, + "(%t) write %d done at %T\n", + iterations)); } + return 0; } - #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Atomic_Op<ACE_Thread_Mutex, int>; template class ACE_Read_Guard<ACE_RW_Mutex>; @@ -200,29 +235,29 @@ int main (int argc, char *argv[]) #if defined (ACE_HAS_THREADS) parse_args (argc, argv); - current_readers = 0; // Possibly already done - current_writers = 0; // Possibly already done - - ACE_DEBUG ((LM_DEBUG, " (%t) main thread starting\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) main thread starting\n")); if (ACE_Thread_Manager::instance ()->spawn_n (n_readers, - ACE_THR_FUNC (reader), - 0, - THR_NEW_LWP) == -1) + ACE_THR_FUNC (reader), + 0, + THR_NEW_LWP) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn_n"), 1); else if (ACE_Thread_Manager::instance ()->spawn_n (n_writers, - ACE_THR_FUNC (writer), - 0, - THR_NEW_LWP) == -1) + ACE_THR_FUNC (writer), + 0, + THR_NEW_LWP) == -1) ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn_n"), 1); ACE_Thread_Manager::instance ()->wait (); - ACE_DEBUG ((LM_DEBUG, " (%t) exiting main thread\n")); + ACE_DEBUG ((LM_DEBUG, + "(%t) exiting main thread\n")); #else ACE_UNUSED_ARG (argc); ACE_UNUSED_ARG (argv); - ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n")); + ACE_ERROR ((LM_ERROR, + "threads not supported on this platform\n")); #endif /* ACE_HAS_THREADS */ ACE_END_TEST; return 0; |