summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog-98a9
-rw-r--r--ace/Env_Value_T.cpp2
-rw-r--r--ace/Env_Value_T.h45
-rw-r--r--ace/Env_Value_T.i37
-rw-r--r--ace/OS.h29
-rw-r--r--ace/OS.i162
-rw-r--r--ace/Synch.h8
-rw-r--r--ace/Synch.i7
-rw-r--r--apps/Gateway/Gateway/Config_Files.cpp135
-rw-r--r--apps/Gateway/Gateway/Config_Files.h17
-rw-r--r--apps/Gateway/Gateway/File_Parser.cpp30
-rw-r--r--apps/Gateway/Gateway/File_Parser.h7
-rw-r--r--apps/Gateway/Gateway/Options.cpp17
-rw-r--r--apps/Gateway/Gateway/Options.h9
-rw-r--r--apps/Gateway/Gateway/consumer_config15
-rw-r--r--apps/Gateway/Gateway/proxy_config15
-rw-r--r--tests/Reader_Writer_Test.cpp159
17 files changed, 430 insertions, 273 deletions
diff --git a/ChangeLog-98a b/ChangeLog-98a
index f2d06d41051..aefa6554c46 100644
--- a/ChangeLog-98a
+++ b/ChangeLog-98a
@@ -1,3 +1,12 @@
+Fri Jan 2 00:17:04 1998 Douglas C. Schmidt <schmidt@tango.cs.wustl.edu>
+
+ * Reader_Writer_Test.cpp: Changed from an ACE_RW_Mutex to an
+ ACE_RW_Thread_Mutex so that we can use the new
+ tryacquire_write_upgrade() method.
+
+ * ace/Env_Value_T.h: Reformatted the file a bit to conform to
+ ACE coding conventions.
+
Thu Jan 1 12:04:29 1998 Douglas C. Schmidt <schmidt@flamenco.cs.wustl.edu>
* apps/Gateway/Peer/Options.cpp (parse_args): Changed the option
diff --git a/ace/Env_Value_T.cpp b/ace/Env_Value_T.cpp
index 25a2efed4d1..758f3203137 100644
--- a/ace/Env_Value_T.cpp
+++ b/ace/Env_Value_T.cpp
@@ -8,5 +8,5 @@
template void ACE_Convert(const char*, char*&);
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate void ACE_Convert (const char*, char*&)
-#endif
+#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
diff --git a/ace/Env_Value_T.h b/ace/Env_Value_T.h
index dbce975361e..5a2b88ea434 100644
--- a/ace/Env_Value_T.h
+++ b/ace/Env_Value_T.h
@@ -1,4 +1,4 @@
-// This may look like C, but it's really -*- C++ -*-
+/* This may look like C, but it's really -*- C++ -*- */
// $Id$
// ============================================================================
@@ -16,31 +16,29 @@
// ============================================================================
#if !defined (ACE_ENV_VALUE_T_H)
-# define ACE_ENV_VALUE_T_H
+#define ACE_ENV_VALUE_T_H
template <class T>
class ACE_Env_Value
-//
-// = TITLE
-// Enviroment Variable Value
-//
-// = DESCRIPTION
-// Reads a variable from the user enviroment, providing a default
-// value.
-//
-// = AUTHOR
-// Chris Cleeland, Carlos O'Ryan
-//
{
+ // = TITLE
+ // Enviroment Variable Value
+ //
+ // = DESCRIPTION
+ // Reads a variable from the user enviroment, providing a default
+ // value.
+ //
+ // = AUTHOR
+ // Chris Cleeland, Carlos O'Ryan
public:
ACE_Env_Value (void);
// Default constructor which isn't bound to a specific environment
// variable name or a default value. Before being useful it must
// <open()>ed.
- ACE_Env_Value (const char* varname,
- const T& vardefault);
- // Constructor which calls <open()>.
+ ACE_Env_Value (const char *varname,
+ const T &vardefault);
+ // Constructor that calls <open>.
~ACE_Env_Value (void);
// Destroy the value.
@@ -48,26 +46,25 @@ public:
operator const T (void) const;
// Returns the value as type T.
- void open (const char* varname, const T& defval);
+ void open (const char *varname, const T &defval);
// The constructor, read <varname> from the enviroment, using
// <vardefault> as its value if it is not defined.
- const char* varname (void) const;
+ const char *varname (void) const;
// Returns the name of the variable being tracked.
private:
- ACE_UNIMPLEMENTED_FUNC(ACE_Env_Value (const ACE_Env_Value&))
- ACE_UNIMPLEMENTED_FUNC(ACE_Env_Value& operator= (const ACE_Env_Value&))
+ ACE_UNIMPLEMENTED_FUNC (ACE_Env_Value (const ACE_Env_Value &))
+ ACE_UNIMPLEMENTED_FUNC (ACE_Env_Value &operator= (const ACE_Env_Value &))
// Disallow copying and assignment.
-private:
void fetch_value (void);
- const char* varname_;
+ const char *varname_;
T value_;
};
-template <class T> void ACE_Convert (const char* s, T& t);
+template <class T> void ACE_Convert (const char *s, T &t);
// Function to convert a string <s> into type <T>.
#if defined (__ACE_INLINE__)
@@ -82,4 +79,4 @@ template <class T> void ACE_Convert (const char* s, T& t);
#pragma implementation ("Env_Value_T.cpp")
#endif /* ACE_TEMPLATES_REQUIRE_PRAGMA */
-#endif
+#endif /* ACE_ENV_VALUE_T_H */
diff --git a/ace/Env_Value_T.i b/ace/Env_Value_T.i
index 0770f86c79c..31828bfe385 100644
--- a/ace/Env_Value_T.i
+++ b/ace/Env_Value_T.i
@@ -9,18 +9,22 @@ ACE_Env_Value<T>::operator const T (void) const
template <class T> ACE_INLINE
ACE_Env_Value<T>::ACE_Env_Value (void)
: varname_ (0)
-{}
+{
+}
template <class T> ACE_INLINE
-ACE_Env_Value<T>::ACE_Env_Value (const char* varname, const T& defval)
- : varname_ (varname), value_(defval)
+ACE_Env_Value<T>::ACE_Env_Value (const char *varname,
+ const T &defval)
+ : varname_ (varname),
+ value_(defval)
{
this->fetch_value ();
}
template <class T> ACE_INLINE void
-ACE_Env_Value<T>::open (const char* varname, const T& defval)
+ACE_Env_Value<T>::open (const char *varname,
+ const T &defval)
{
varname_ = varname;
value_ = defval;
@@ -28,20 +32,19 @@ ACE_Env_Value<T>::open (const char* varname, const T& defval)
}
template <class T> ACE_INLINE void
-ACE_Env_Value<T>::fetch_value ()
+ACE_Env_Value<T>::fetch_value (void)
{
- const char* env = ACE_OS::getenv (varname_);
+ const char *env = ACE_OS::getenv (varname_);
+
if (env != 0)
ACE_Convert (env, value_);
}
-
template <class T> ACE_INLINE
ACE_Env_Value<T>::~ACE_Env_Value (void)
{
}
-
// Default calls a CTOR on type T of the form 'T::T(const char*)', but
// users can feel free to create their own specialized conversion
// functions if necessary, as shown below. Note that for 'char*' the
@@ -49,43 +52,43 @@ ACE_Env_Value<T>::~ACE_Env_Value (void)
// conversion will be necessary.
template <class T> ACE_INLINE void
-ACE_Convert (const char* s, T& t)
+ACE_Convert (const char *s, T &t)
{
- t = T(s);
+ t = T (s);
}
ACE_INLINE void
-ACE_Convert (const char* s, char*& v)
+ACE_Convert (const char *s, char *&v)
{
- v = (char*)s;
+ v = (char *) s;
}
ACE_INLINE void
-ACE_Convert (const char* s, short& si)
+ACE_Convert (const char *s, short &si)
{
si = ACE_OS::strtol (s, 0, 10);
}
ACE_INLINE void
-ACE_Convert (const char* s, long& l)
+ACE_Convert (const char *s, long &l)
{
l = ACE_OS::strtol (s, 0, 10);
}
ACE_INLINE void
-ACE_Convert (const char* s, int& i)
+ACE_Convert (const char *s, int &i)
{
i = ACE_OS::strtol (s, 0, 10);
}
ACE_INLINE void
-ACE_Convert (const char* s, unsigned long& ul)
+ACE_Convert (const char *s, u_long &ul)
{
ul = ACE_OS::strtoul (s, 0, 10);
}
ACE_INLINE void
-ACE_Convert (const char* s, double& d)
+ACE_Convert (const char *s, double &d)
{
d = ACE_OS::strtod (s, 0);
}
diff --git a/ace/OS.h b/ace/OS.h
index 8046686dfee..a663512f9b0 100644
--- a/ace/OS.h
+++ b/ace/OS.h
@@ -171,31 +171,13 @@
#define ACE_DEFAULT_THR_LOGGING_SERVER_PORT_STR "10008"
#endif /* ACE_DEFAULT_THR_LOGGING_SERVER_PORT_STR */
-// Used for the gateway server.
-#if !defined (ACE_DEFAULT_GATEWAY_SERVER_PORT)
-#define ACE_DEFAULT_GATEWAY_SERVER_PORT 10009
-#endif /* ACE_DEFAULT_GATEWAY_SERVER_PORT */
-
-#if !defined (ACE_DEFAULT_GATEWAY_SERVER_PORT_STR)
-#define ACE_DEFAULT_GATEWAY_SERVER_PORT_STR "10009"
-#endif /* ACE_DEFAULT_GATEWAY_SERVER_PORT_STR */
-
-// Used for the peer server.
-#if !defined (ACE_DEFAULT_PEER_SERVER_PORT)
-#define ACE_DEFAULT_PEER_SERVER_PORT 10010
-#endif /* ACE_DEFAULT_PEER_SERVER_PORT */
-
-#if !defined (ACE_DEFAULT_PEER_SERVER_PORT_STR)
-#define ACE_DEFAULT_PEER_SERVER_PORT_STR "10010"
-#endif /* ACE_DEFAULT_PEER_SERVER_PORT_STR */
-
// Used for the time server.
#if !defined (ACE_DEFAULT_TIME_SERVER_PORT)
-#define ACE_DEFAULT_TIME_SERVER_PORT 10011
+#define ACE_DEFAULT_TIME_SERVER_PORT 10009
#endif /* ACE_DEFAULT_TIME_SERVER_PORT */
#if !defined (ACE_DEFAULT_TIME_SERVER_PORT_STR)
-#define ACE_DEFAULT_TIME_SERVER_PORT_STR "10011"
+#define ACE_DEFAULT_TIME_SERVER_PORT_STR "10009"
#endif /* ACE_DEFAULT_TIME_SERVER_PORT_STR */
#if !defined (ACE_DEFAULT_TIME_SERVER_STR)
@@ -207,7 +189,7 @@
#if defined (ACE_HAS_STREAM_PIPES)
#define ACE_DEFAULT_RENDEZVOUS "/tmp/fifo.ace"
#else
-#define ACE_DEFAULT_RENDEZVOUS "localhost:10012"
+#define ACE_DEFAULT_RENDEZVOUS "localhost:10010"
#endif /* ACE_LACKS_FIFO */
#endif /* ACE_DEFAULT_RENDEZVOUS */
@@ -215,7 +197,7 @@
#if defined (ACE_HAS_STREAM_PIPES)
#define ACE_DEFAULT_LOGGER_KEY "/tmp/server_daemon"
#else
-#define ACE_DEFAULT_LOGGER_KEY "localhost:10013"
+#define ACE_DEFAULT_LOGGER_KEY "localhost:10010"
#endif /* ACE_HAS_STREAM_PIPES */
#endif /* ACE_DEFAULT_LOGGER_KEY */
@@ -4285,10 +4267,11 @@ public:
void *arg = 0);
static int rwlock_destroy (ACE_rwlock_t *rw);
static int rw_rdlock (ACE_rwlock_t *rw);
+ static int rw_wrlock (ACE_rwlock_t *rw);
static int rw_tryrdlock (ACE_rwlock_t *rw);
static int rw_trywrlock (ACE_rwlock_t *rw);
+ static int rw_trywrlock_upgrade (ACE_rwlock_t *rw);
static int rw_unlock (ACE_rwlock_t *rw);
- static int rw_wrlock (ACE_rwlock_t *rw);
// = A set of wrappers for auto-reset and manuaevents.
static int event_init (ACE_event_t *event,
diff --git a/ace/OS.i b/ace/OS.i
index 59d5494886f..08e42815c93 100644
--- a/ace/OS.i
+++ b/ace/OS.i
@@ -3134,49 +3134,6 @@ ACE_OS::cond_wait (ACE_cond_t *cv,
#endif /* ACE_LACKS_COND_T */
ACE_INLINE int
-ACE_OS::rw_rdlock (ACE_rwlock_t *rw)
-{
- // ACE_TRACE ("ACE_OS::rw_rdlock");
-#if defined (ACE_HAS_THREADS)
-#if defined (ACE_HAS_STHREADS) || !defined (ACE_LACKS_RWLOCK_T)
- ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::rw_rdlock (rw), ace_result_), int, -1);
-#else /* NT, POSIX, and VxWorks don't support this natively. */
-#if defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)
- ACE_PTHREAD_CLEANUP_PUSH (&rw->lock_);
-#endif /* ACE_HAS_DCETHREADS */
- int result = 0;
- if (ACE_OS::mutex_lock (&rw->lock_) == -1)
- result = -1; // -1 means didn't get the mutex.
- else
- {
- // Give preference to writers who are waiting.
- while (rw->ref_count_ < 0 || rw->num_waiting_writers_ > 0)
- {
- rw->num_waiting_readers_++;
- if (ACE_OS::cond_wait (&rw->waiting_readers_, &rw->lock_) == -1)
- {
- result = -2; // -2 means that we need to release the mutex.
- break;
- }
- rw->num_waiting_readers_--;
- }
- }
- if (result == 0)
- rw->ref_count_++;
- if (result != -1)
- ACE_OS::mutex_unlock (&rw->lock_);
-#if defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)
- ACE_PTHREAD_CLEANUP_POP (0);
-#endif /* defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) */
- return 0;
-#endif /* ACE_HAS_STHREADS */
-#else
- ACE_UNUSED_ARG (rw);
- ACE_NOTSUP_RETURN (-1);
-#endif /* ACE_HAS_THREADS */
-}
-
-ACE_INLINE int
ACE_OS::rw_tryrdlock (ACE_rwlock_t *rw)
{
// ACE_TRACE ("ACE_OS::rw_tryrdlock");
@@ -3249,6 +3206,94 @@ ACE_OS::rw_trywrlock (ACE_rwlock_t *rw)
}
ACE_INLINE int
+ACE_OS::rw_rdlock (ACE_rwlock_t *rw)
+{
+ // ACE_TRACE ("ACE_OS::rw_rdlock");
+#if defined (ACE_HAS_THREADS)
+#if defined (ACE_HAS_STHREADS) || !defined (ACE_LACKS_RWLOCK_T)
+ ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::rw_rdlock (rw), ace_result_), int, -1);
+#else /* NT, POSIX, and VxWorks don't support this natively. */
+#if defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)
+ ACE_PTHREAD_CLEANUP_PUSH (&rw->lock_);
+#endif /* ACE_HAS_DCETHREADS */
+ int result = 0;
+ if (ACE_OS::mutex_lock (&rw->lock_) == -1)
+ result = -1; // -1 means didn't get the mutex.
+ else
+ {
+ // Give preference to writers who are waiting.
+ while (rw->ref_count_ < 0 || rw->num_waiting_writers_ > 0)
+ {
+ rw->num_waiting_readers_++;
+ if (ACE_OS::cond_wait (&rw->waiting_readers_, &rw->lock_) == -1)
+ {
+ result = -2; // -2 means that we need to release the mutex.
+ break;
+ }
+ rw->num_waiting_readers_--;
+ }
+ }
+ if (result == 0)
+ rw->ref_count_++;
+ if (result != -1)
+ ACE_OS::mutex_unlock (&rw->lock_);
+#if defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)
+ ACE_PTHREAD_CLEANUP_POP (0);
+#endif /* defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) */
+ return 0;
+#endif /* ACE_HAS_STHREADS */
+#else
+ ACE_UNUSED_ARG (rw);
+ ACE_NOTSUP_RETURN (-1);
+#endif /* ACE_HAS_THREADS */
+}
+
+ACE_INLINE int
+ACE_OS::rw_wrlock (ACE_rwlock_t *rw)
+{
+ // ACE_TRACE ("ACE_OS::rw_wrlock");
+#if defined (ACE_HAS_THREADS)
+#if defined (ACE_HAS_STHREADS) || !defined (ACE_LACKS_RWLOCK_T)
+ ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::rw_wrlock (rw), ace_result_), int, -1);
+#else /* NT, POSIX, and VxWorks don't support this natively. */
+#if defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)
+ ACE_PTHREAD_CLEANUP_PUSH (&rw->lock_);
+#endif /* defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) */
+ int result = 0;
+
+ if (ACE_OS::mutex_lock (&rw->lock_) == -1)
+ result = -1; // -1 means didn't get the mutex.
+ else
+ {
+ while (rw->ref_count_ != 0)
+ {
+ rw->num_waiting_writers_++;
+
+ if (ACE_OS::cond_wait (&rw->waiting_writers_, &rw->lock_) == -1)
+ {
+ result = -2; // -2 means we need to release the mutex.
+ break;
+ }
+
+ rw->num_waiting_writers_--;
+ }
+ }
+ if (result == 0)
+ rw->ref_count_ = -1;
+ if (result != -1)
+ ACE_OS::mutex_unlock (&rw->lock_);
+#if defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)
+ ACE_PTHREAD_CLEANUP_POP (0);
+#endif /* defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) */
+ return 0;
+#endif /* ACE_HAS_STHREADS */
+#else
+ ACE_UNUSED_ARG (rw);
+ ACE_NOTSUP_RETURN (-1);
+#endif /* ACE_HAS_THREADS */
+}
+
+ACE_INLINE int
ACE_OS::rw_unlock (ACE_rwlock_t *rw)
{
// ACE_TRACE ("ACE_OS::rw_unlock");
@@ -3264,7 +3309,7 @@ ACE_OS::rw_unlock (ACE_rwlock_t *rw)
else if (rw->ref_count_ == -1) // Releasing a writer.
rw->ref_count_ = 0;
else
- assert (!"count should not be 0!\n");
+ ACE_ASSERT (!"count should not be 0!\n");
int result;
int error = 0;
@@ -3293,37 +3338,36 @@ ACE_OS::rw_unlock (ACE_rwlock_t *rw)
#endif /* ACE_HAS_THREADS */
}
+// Note that the caller of this method *must* already possess this
+// lock as a read lock.
+
ACE_INLINE int
-ACE_OS::rw_wrlock (ACE_rwlock_t *rw)
+ACE_OS::rw_trywrlock_upgrade (ACE_rwlock_t *rw)
{
// ACE_TRACE ("ACE_OS::rw_wrlock");
#if defined (ACE_HAS_THREADS)
#if defined (ACE_HAS_STHREADS) || !defined (ACE_LACKS_RWLOCK_T)
- ACE_OSCALL_RETURN (ACE_ADAPT_RETVAL (::rw_wrlock (rw), ace_result_), int, -1);
+ // Solaris rwlocks don't support the upgrade feature...
+ ACE_NOTSUP_RETURN (-1);
#else /* NT, POSIX, and VxWorks don't support this natively. */
#if defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)
ACE_PTHREAD_CLEANUP_PUSH (&rw->lock_);
#endif /* defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS) */
int result = 0;
+
if (ACE_OS::mutex_lock (&rw->lock_) == -1)
result = -1; // -1 means didn't get the mutex.
- else
+ else if (rw->ref_count_ != 1)
{
- while (rw->ref_count_ != 0)
- {
- rw->num_waiting_writers_++;
-
- if (ACE_OS::cond_wait (&rw->waiting_writers_, &rw->lock_) == -1)
- {
- result = -2; // -2 means we need to release the mutex.
- break;
- }
-
- rw->num_waiting_writers_--;
- }
+ // There were other readers, so we'll have to bail out.
+ error = EBUSY;
+ result = -1;
}
if (result == 0)
+ // We force the lock to become a write lock, thereby putting
+ // ourselves ahead of all other waiting writers.
rw->ref_count_ = -1;
+
if (result != -1)
ACE_OS::mutex_unlock (&rw->lock_);
#if defined (ACE_HAS_DCETHREADS) || defined (ACE_HAS_PTHREADS)
diff --git a/ace/Synch.h b/ace/Synch.h
index 4801867327e..92c2642425f 100644
--- a/ace/Synch.h
+++ b/ace/Synch.h
@@ -1191,6 +1191,14 @@ public:
ACE_RW_Thread_Mutex (LPCTSTR name = 0,
void *arg = 0);
+ int tryacquire_write_upgrade (void);
+ // Conditionally upgrade a read lock to a write lock. This only
+ // works if there are no other readers present, in which case the
+ // method returns 0. Otherwise, the method returns -1 and sets
+ // <errno> to <EBUSY>. Note that the caller of this method *must*
+ // already possess this lock as a read lock (but this condition is
+ // not checked by the current implementation).
+
void dump (void) const;
// Dump the state of an object.
diff --git a/ace/Synch.i b/ace/Synch.i
index e8658950e39..2b9e9da5e26 100644
--- a/ace/Synch.i
+++ b/ace/Synch.i
@@ -137,6 +137,13 @@ ACE_RW_Mutex::release (void)
return ACE_OS::rw_unlock (&this->lock_);
}
+ACE_INLINE int
+ACE_RW_Thread_Mutex::tryacquire_write_upgrade (void)
+{
+// ACE_TRACE ("ACE_RW_Thread_Mutex::tryacquire_write_upgrade");
+ return ACE_OS::rw_trywrlock_upgrade (&this->lock_);
+}
+
ACE_INLINE int
ACE_Mutex::acquire_read (void)
{
diff --git a/apps/Gateway/Gateway/Config_Files.cpp b/apps/Gateway/Gateway/Config_Files.cpp
index 68d53f4862e..bfc0ae93aaa 100644
--- a/apps/Gateway/Gateway/Config_Files.cpp
+++ b/apps/Gateway/Gateway/Config_Files.cpp
@@ -11,41 +11,42 @@ FP_RETURN_TYPE
Consumer_Config_File_Parser::read_entry (Consumer_Config_Info &entry,
int &line_number)
{
- FP_RETURN_TYPE read_result;
+ FP_RETURN_TYPE result;
// Increment the line count.
line_number++;
// Ignore comments, check for EOF and EOLINE if this succeeds, we
// have our connection id.
- while ((read_result = this->getint (entry.proxy_id_)) != FP::SUCCESS)
- {
- if (read_result == FP::EOFILE)
- return FP::EOFILE;
- else if (read_result == FP::EOLINE
- || read_result == FP::COMMENT)
- line_number++;
- }
+
+ while ((result = this->getint (entry.proxy_id_)) != FP::SUCCESS)
+ if (result == FP::EOFILE)
+ return FP::EOFILE;
+ else if (result == FP::EOLINE
+ || result == FP::COMMENT)
+ line_number++;
// Get the supplier id.
- if ((read_result = this->getint (entry.supplier_id_)) != FP::SUCCESS)
- return read_result;
+ result = this->getint (entry.supplier_id_);
+ if (result != FP::SUCCESS)
+ return result;
// Get the payload type.
- if ((read_result = this->getint (entry.type_)) != FP::SUCCESS)
- return read_result;
+ result = this->getint (entry.type_);
+ if (result != FP::SUCCESS)
+ return result;
// get all the consumers.
entry.total_consumers_ = 0;
- while ((read_result = this->getint (entry.consumers_[entry.total_consumers_]))
- == FP::SUCCESS)
+ while ((result = this->getint
+ (entry.consumers_[entry.total_consumers_])) == FP::SUCCESS)
++entry.total_consumers_; // do nothing (should check against max...)
- if (read_result == FP::EOLINE || read_result == FP::EOFILE)
+ if (result == FP::EOLINE || result == FP::EOFILE)
return FP::SUCCESS;
else
- return read_result;
+ return result;
}
FP_RETURN_TYPE
@@ -53,54 +54,81 @@ Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry,
int &line_number)
{
char buf[BUFSIZ];
- FP_RETURN_TYPE read_result;
- // increment the line count
+ FP_RETURN_TYPE result;
+
+ // Increment the line count.
line_number++;
- // Ignore comments, check for EOF and EOLINE
- // if this succeeds, we have our connection id
- while ((read_result = this->getint (entry.proxy_id_)) != FP::SUCCESS)
- {
- if (read_result == FP::EOFILE)
- return FP::EOFILE;
- else if (read_result == FP::EOLINE
- || read_result == FP::COMMENT)
- line_number++;
- }
+ // Ignore comments, check for EOF and EOLINE if this succeeds, we
+ // have our connection id
+
+ while ((result = this->getint (entry.proxy_id_)) != FP::SUCCESS)
+ if (result == FP::EOFILE)
+ return FP::EOFILE;
+ else if (result == FP::EOLINE
+ || result == FP::COMMENT)
+ line_number++;
- // get the hostname
- if ((read_result = this->getword (entry.host_)) != FP::SUCCESS)
- return read_result;
+ // Get the hostname.
+ result = this->getword (entry.host_);
+ if (result != FP::SUCCESS)
+ return result;
ACE_INT32 port;
// Get the port number.
- if ((read_result = this->getint (port)) != FP::SUCCESS)
- return read_result;
- else
- entry.remote_port_ = (u_short) port;
+ result = this->getint (port);
+ if (result == FP::DEFAULT)
+ {
+ // Get the proxy role, i.e., 'C' (Consumer) or 'S' (Supplier).
+ result = this->getword (buf);
+ if (result != FP::SUCCESS)
+ return result;
+ else
+ entry.proxy_role_ = buf[0];
- // Get the proxy role.
- if ((read_result = this->getword (buf)) != FP::SUCCESS)
- return read_result;
+ if (entry.proxy_role_ == 'C')
+ entry.remote_port_ = Options::instance ()->consumer_connector_port ();
+ else (entry.proxy_role_ == 'S')
+ entry.remote_port_ = Options::instance ()->supplier_connector_port ();
+ else
+ // Yikes, this is a *weird* error!
+ entry.remote_port_ = 0;
+ }
+ else if (result != FP::SUCCESS)
+ return result;
else
- entry.proxy_role_ = buf[0];
+ {
+ entry.remote_port_ = u_short (port);
+
+ // Get the proxy role, i.e., 'C' (Consumer) or 'S' (Supplier).
+ result = this->getword (buf);
+ if (result != FP::SUCCESS)
+ return result;
+ else
+ entry.proxy_role_ = buf[0];
+ }
// Get the max retry delay.
- if ((read_result = this->getint (entry.max_retry_timeout_)) != FP::SUCCESS)
- return read_result;
+ result = this->getint (entry.max_retry_timeout_);
+ if (result == FP::DEFAULT)
+ entry.max_retry_timeout_ = Options::instance ()->max_timeout ();
+ else if (result != FP::SUCCESS)
+ return result;
// Get the local port number.
- if ((read_result = this->getint (port)) != FP::SUCCESS)
- return read_result;
+ result = this->getint (port);
+ if (result != FP::SUCCESS)
+ return result;
else
- entry.local_port_ = (u_short) port;
+ entry.local_port_ = u_short (port);
ACE_INT32 priority;
- // Get the priority
- if ((read_result = this->getint (priority)) != FP::SUCCESS)
- return read_result;
+ // Get the priority.
+ result = this->getint (priority);
+ if (result != FP::SUCCESS)
+ return result;
else
entry.priority_ = priority;
@@ -110,11 +138,12 @@ Proxy_Config_File_Parser::read_entry (Proxy_Config_Info &entry,
#if defined (DEBUGGING)
int main (int argc, char *argv[])
{
- if (argc != 4) {
-// ACE_ERROR_RETURN ((LM_ERROR, "%s filename\n", argv[0]), -1);
- cerr << argv[0] << " CCfilename filename Mapfilename.\n";
- exit (1);
- }
+ if (argc != 4)
+ {
+ // ACE_ERROR_RETURN ((LM_ERROR, "%s filename\n", argv[0]), -1);
+ cerr << argv[0] << " CCfilename filename Mapfilename.\n";
+ exit (1);
+ }
FP_RETURN_TYPE result;
Proxy_Config_Info entry;
Proxy_Config_File_Parser CCfile;
diff --git a/apps/Gateway/Gateway/Config_Files.h b/apps/Gateway/Gateway/Config_Files.h
index fcbe2c5cbb0..2f1f1280bc2 100644
--- a/apps/Gateway/Gateway/Config_Files.h
+++ b/apps/Gateway/Gateway/Config_Files.h
@@ -60,8 +60,10 @@ class Proxy_Config_File_Parser : public File_Parser<Proxy_Config_Info>
// Parser for the Proxy_Handler Connection file.
{
public:
- virtual FP::Return_Type
- read_entry (Proxy_Config_Info &entry, int &line_number);
+ virtual FP::Return_Type read_entry (Proxy_Config_Info &entry,
+ int &line_number);
+ // Read in a <Proxy_Config_Info> entry.
+
};
class Consumer_Config_Info
@@ -69,8 +71,10 @@ class Consumer_Config_Info
// Stores the information in a Consumer Map entry.
{
public:
- enum {
- MAX_CONSUMERS = 1000 // Total number of multicast consumers.
+ enum
+ {
+ MAX_CONSUMERS = 1000
+ // Total number of multicast consumers.
};
ACE_INT32 proxy_id_;
@@ -94,8 +98,9 @@ class Consumer_Config_File_Parser : public File_Parser<Consumer_Config_Info>
// Parser for the Consumer Map file.
{
public:
- virtual FP::Return_Type
- read_entry (Consumer_Config_Info &entry, int &line_number);
+ virtual FP::Return_Type read_entry (Consumer_Config_Info &entry,
+ int &line_number);
+ // Read in a <Consumer_Config_Info> entry.
};
#endif /* _CONFIG_FILES */
diff --git a/apps/Gateway/Gateway/File_Parser.cpp b/apps/Gateway/Gateway/File_Parser.cpp
index 4d7c004e94c..bfde7a6911e 100644
--- a/apps/Gateway/Gateway/File_Parser.cpp
+++ b/apps/Gateway/Gateway/File_Parser.cpp
@@ -36,24 +36,32 @@ File_Parser<ENTRY>::getword (char buf[])
// Get the next string from the file via this->readword()
// Check make sure the string forms a valid number.
+
template <class ENTRY> FP_RETURN_TYPE
File_Parser<ENTRY>::getint (ACE_INT32 &value)
{
char buf[BUFSIZ];
- FP_RETURN_TYPE read_result = this->readword(buf);
+ FP_RETURN_TYPE read_result = this->readword (buf);
+
if (read_result == FP::SUCCESS)
{
- // ptr is used for error checking with ACE_OS::strtol
- char *ptr;
-
- // try to convert the buf to a decimal number
- value = ACE_OS::strtol (buf, &ptr, 10);
-
- // check if the buf is a decimal or not
- if (value == 0 && ptr == buf)
- return FP::PARSE_ERROR;
+ // Check to see if this is the "use the default value" symbol?
+ if (buf[0] == '*')
+ return FP::DEFAULT;
else
- return FP::SUCCESS;
+ {
+ // ptr is used for error checking with ACE_OS::strtol.
+ char *ptr;
+
+ // try to convert the buf to a decimal number
+ value = ACE_OS::strtol (buf, &ptr, 10);
+
+ // check if the buf is a decimal or not
+ if (value == 0 && ptr == buf)
+ return FP::PARSE_ERROR;
+ else
+ return FP::SUCCESS;
+ }
}
else
return read_result;
diff --git a/apps/Gateway/Gateway/File_Parser.h b/apps/Gateway/Gateway/File_Parser.h
index bb8da87c424..64b4d49db59 100644
--- a/apps/Gateway/Gateway/File_Parser.h
+++ b/apps/Gateway/Gateway/File_Parser.h
@@ -30,6 +30,7 @@ public:
EOFILE,
SUCCESS,
COMMENT,
+ DEFAULT,
PARSE_ERROR
};
};
@@ -45,8 +46,10 @@ public:
int open (const char filename[]);
int close (void);
- virtual FP::Return_Type read_entry (ENTRY &, int &line_number) = 0;
- // Implementations use protected methods to fill in the entry.
+ virtual FP::Return_Type read_entry (ENTRY &entry,
+ int &line_number) = 0;
+ // Pure virtual hook that subclasses override and use the protected
+ // methods to fill in the <entry>.
protected:
FP::Return_Type getword (char buf[]);
diff --git a/apps/Gateway/Gateway/Options.cpp b/apps/Gateway/Gateway/Options.cpp
index f96fc44436d..d4ee425caf3 100644
--- a/apps/Gateway/Gateway/Options.cpp
+++ b/apps/Gateway/Gateway/Options.cpp
@@ -27,9 +27,9 @@ Options::Options (void)
threading_strategy_ (REACTIVE),
options_ (0),
supplier_acceptor_port_ (DEFAULT_PEER_SUPPLIER_PORT),
- consumer_acceptor_port_ (DEFAULT_PEER_CONSUMER_PORT),
+ consumer_connector_port_ (DEFAULT_GATEWAY_CONSUMER_PORT),
supplier_connector_port_ (DEFAULT_GATEWAY_SUPPLIER_PORT),
- consumer_connector_port_ (DEFAULT_GATEWAY_CONSUMER_PORT)
+ max_timeout_ (MAX_TIMEOUT)
{
ACE_OS::strcpy (this->proxy_config_file_, "proxy_config");
ACE_OS::strcpy (this->consumer_config_file_, "consumer_config");
@@ -64,7 +64,13 @@ Options::performance_window (void) const
return this->performance_window_;
}
-int
+long
+Options::max_timeout (void) const
+{
+ return this->max_timeout_;
+}
+
+int
Options::blocking_semantics (void) const
{
return this->blocking_semantics_;
@@ -124,7 +130,10 @@ int
Options::parse_args (int argc, char *argv[])
{
// Assign defaults.
- ACE_Get_Opt get_opt (argc, argv, "a:bC:c:dP:p:q:t:vw:", 0);
+ ACE_Get_Opt get_opt (argc,
+ argv,
+ "a:bC:c:dP:p:q:r:t:vw:",
+ 0);
for (int c; (c = get_opt ()) != EOF; )
{
diff --git a/apps/Gateway/Gateway/Options.h b/apps/Gateway/Gateway/Options.h
index 4adf19dfaa3..9b83e5ba7a3 100644
--- a/apps/Gateway/Gateway/Options.h
+++ b/apps/Gateway/Gateway/Options.h
@@ -39,6 +39,9 @@ public:
CONSUMER_ACCEPTOR = 010,
SUPPLIER_CONNECTOR = 020,
CONSUMER_CONNECTOR = 040
+
+ DEFAULT_TIMEOUT = 32
+ // The maximum timeout for trying to re-establish connections.
};
static Options *instance (void);
@@ -105,6 +108,9 @@ public:
const char *consumer_config_file (void) const;
// Name of the consumer map configuration file.
+ long max_timeout (void) const;
+ // The maximum retry timeout delay.
+
private:
Options (void);
// Initialization.
@@ -152,6 +158,9 @@ private:
// The connector port number, i.e., the one that we use to actively
// establish connections with a gatewayd and create a Consumer.
+ long max_timeout_;
+ // The maximum retry timeout delay.
+
char proxy_config_file_[MAXPATHLEN + 1];
// Name of the connection configuration file.
diff --git a/apps/Gateway/Gateway/consumer_config b/apps/Gateway/Gateway/consumer_config
index 5f05972e5aa..7d20a50a579 100644
--- a/apps/Gateway/Gateway/consumer_config
+++ b/apps/Gateway/Gateway/consumer_config
@@ -1,11 +1,12 @@
-# Configuration file for specifying which Consumers will receive
-# events from which Suppliers. For now, the Gateway only allows
-# Consumers to "subscribe" to receive events from particular
-# Suppliers. A more flexible implementation will allow Consumers to
-# subscribe to particular types of events, as well.
+# Configuration file that the gatewayd process uses to determine which
+# Consumers will receive events from which Suppliers. For now, the
+# Gateway only allows Consumers to "subscribe" to receive events from
+# particular Suppliers. A more flexible implementation will allow
+# Consumers to subscribe to particular types of events, as well.
#
-# Here's an explanation of the fields in this file, and how they
-# relate to fields in the "proxy_config" file.
+# The following provides an explanation for the fields in this file,
+# and how they relate to fields in the corresponding "proxy_config"
+# file.
#
# 1. Proxy ID -- Each Proxy is given a unique ID that is used
# in the "consumer_config" file to specify to which Consumers
diff --git a/apps/Gateway/Gateway/proxy_config b/apps/Gateway/Gateway/proxy_config
index f68cb556353..cdc7d07ffa3 100644
--- a/apps/Gateway/Gateway/proxy_config
+++ b/apps/Gateway/Gateway/proxy_config
@@ -1,8 +1,9 @@
-# Configuration file for specifying connection information about
-# proxies.
+# Configuration file that the gatewayd process uses to determine
+# connection information about proxies.
#
-# Here's an explanation of the fields in this file, and how they
-# relate to fields in the "consumer_config" file.
+# The following provides an explanation for the fields in this file,
+# and how they relate to fields in the corresponding "consumer_config"
+# file.
#
# 1. Proxy ID -- Each Proxy is given a unique ID that is used
# in the "consumer_config" file to specify to which Consumers
@@ -16,12 +17,18 @@
#
# 3. Remote Port -- The port number where the remote
# Supplier/Consumer peerd process is listening on.
+# If this is a '*' character it is an indication to the
+# Gateway to use the "default value," e.g., which can be provided
+# on the command-line, etc.
#
# 4. Proxy Role -- i.e., Consumer ('C') or Supplier ('S')
#
# 5. Max Retry Timeout -- The maximum amount of time that we'll
# wait between retry attempts (these start at 1 second and
# double until they reach the Max Retry Timeout).
+# If this is a '*' character it is an indication to the
+# Gateway to use the "default value," e.g., which can be provided
+# on the command-line, etc.
#
# 6. Local Port -- The port number that we want to use for
# our local Proxy connection. If this is the value 0, then
diff --git a/tests/Reader_Writer_Test.cpp b/tests/Reader_Writer_Test.cpp
index b9e58cf8bfe..4b85fdabf5a 100644
--- a/tests/Reader_Writer_Test.cpp
+++ b/tests/Reader_Writer_Test.cpp
@@ -42,19 +42,19 @@ static size_t n_writers = 4;
static ACE_thread_t shared_data;
// Lock for shared_data.
-static ACE_RW_Mutex rw_mutex;
+static ACE_RW_Thread_Mutex rw_mutex;
// Count of the number of readers and writers.
-static ACE_Atomic_Op<ACE_Thread_Mutex, int> current_readers;
-static ACE_Atomic_Op<ACE_Thread_Mutex, int> current_writers;
+static ACE_Atomic_Op<ACE_Thread_Mutex, int> current_readers = 0;
+static ACE_Atomic_Op<ACE_Thread_Mutex, int> current_writers = 0;
// Explain usage and exit.
static void
print_usage_and_die (void)
{
- ACE_DEBUG ((LM_DEBUG,
- "usage: %n [-r n_readers] [-w n_writers] [-n iteration_count]\n"));
- ACE_OS::exit (1);
+ ACE_ERROR ((LM_ERROR,
+ "usage: %n [-r n_readers] [-w n_writers] [-n iteration_count]\n%a",
+ 1));
}
static void
@@ -88,21 +88,23 @@ parse_args (int argc, char *argv[])
static void *
reader (void *)
{
- ACE_DEBUG ((LM_DEBUG, " (%t) reader starting\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) reader starting\n"));
// We use a random pause, around 2msec with 1msec jittering.
- int usecs = 1000 + ACE_OS::rand() % 2000;
- ACE_Time_Value pause(0, usecs);
+ int usecs = 1000 + ACE_OS::rand () % 2000;
+ ACE_Time_Value pause (0, usecs);
- for (size_t iterations = 1; iterations <= n_iterations; iterations++)
+ for (size_t iterations = 1;
+ iterations <= n_iterations;
+ iterations++)
{
- ACE_OS::sleep(pause);
+ ACE_OS::sleep (pause);
ACE_Read_Guard<ACE_RW_Mutex> g (rw_mutex);
- // int n = ++current_readers;
- // ACE_DEBUG ((LM_DEBUG, " (%t) I'm reader number %d\n", n));
if (current_writers > 0)
- ACE_DEBUG ((LM_DEBUG, " (%t) writers found!!!\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) writers found!!!\n"));
ACE_thread_t data = shared_data;
@@ -110,73 +112,106 @@ reader (void *)
{
ACE_Thread::yield ();
- if (!ACE_OS::thr_equal (shared_data, data))
+ if (ACE_OS::thr_equal (shared_data, data) == 0)
ACE_DEBUG ((LM_DEBUG,
- " (%t) somebody changed %d to %d\n",
- data, shared_data));
+ "(%t) somebody changed %d to %d\n",
+ data,
+ shared_data));
+ }
+
+ if (rw_mutex.tryacquire_write_upgrade () == 0)
+ {
+ current_writers++;
+
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) upgraded to write lock!\n"));
+
+ ACE_thread_t self = ACE_Thread::self ();
+
+ shared_data = self;
+
+ for (size_t loop = 1; loop <= n_loops; loop++)
+ {
+ if (ACE_OS::thr_equal (shared_data, data) == 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) somebody changed %d to %d\n",
+ data,
+ shared_data));
+ }
+
+ --current_writers;
}
--current_readers;
- //ACE_DEBUG ((LM_DEBUG, " (%t) done with reading guarded data\n"));
- ACE_DEBUG((LM_DEBUG, " (%t) read %d done at %T\n", iterations));
- // ACE_Thread::yield ();
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) read %d done at %T\n",
+ iterations));
}
return 0;
}
-// Iterate <n_iterations> each time modifying the global data
-// and checking that nobody steps on it while we can write it.
+// Iterate <n_iterations> each time modifying the global data and
+// checking that nobody steps on it while we can write it.
static void *
writer (void *)
{
- ACE_DEBUG ((LM_DEBUG, " (%t) writer starting\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) writer starting\n"));
// We use a random pause, around 2msec with 1msec jittering.
- int usecs = 1000 + ACE_OS::rand() % 2000;
- ACE_Time_Value pause(0, usecs);
+ int usecs = 1000 + ACE_OS::rand () % 2000;
+ ACE_Time_Value pause (0, usecs);
- for (size_t iterations = 1; iterations <= n_iterations; iterations++)
+ for (size_t iterations = 1;
+ iterations <= n_iterations;
+ iterations++)
{
- ACE_OS::sleep(pause);
-
- ACE_Write_Guard<ACE_RW_Mutex> g (rw_mutex);
+ ACE_OS::sleep (pause);
- ++current_writers;
- //ACE_DEBUG ((LM_DEBUG, " (%t) writing to guarded data\n"));
+ {
+ // Add an additional scope here to bound the duration that the
+ // write lock is held.
+ ACE_Write_Guard<ACE_RW_Mutex> g (rw_mutex);
- if (current_writers > 1)
- ACE_DEBUG ((LM_DEBUG, " (%t) other writers found!!!\n"));
+ ++current_writers;
- if (current_readers > 0)
- ACE_DEBUG ((LM_DEBUG, " (%t) readers found!!!\n"));
+ if (current_writers > 1)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) other writers found!!!\n"));
- ACE_thread_t self = ACE_Thread::self ();
+ if (current_readers > 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) readers found!!!\n"));
- shared_data = self;
+ ACE_thread_t self = ACE_Thread::self ();
- for (size_t loop = 1; loop <= n_loops; loop++)
- {
- ACE_Thread::yield ();
+ shared_data = self;
- if (!ACE_OS::thr_equal (shared_data, self))
- ACE_DEBUG ((LM_DEBUG,
- " (%t) somebody wrote on my data %d\n",
- shared_data));
- }
+ for (size_t loop = 1;
+ loop <= n_loops;
+ loop++)
+ {
+ ACE_Thread::yield ();
- --current_writers;
+ if (ACE_OS::thr_equal (shared_data, self) == 0)
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) somebody wrote on my data %d\n",
+ shared_data));
+ }
- //ACE_DEBUG ((LM_DEBUG, " (%t) done with guarded data\n"));
+ --current_writers;
+ }
- ACE_DEBUG((LM_DEBUG, " (%t) write %d done at %T\n", iterations));
- // ACE_Thread::yield ();
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) write %d done at %T\n",
+ iterations));
}
+
return 0;
}
-
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Atomic_Op<ACE_Thread_Mutex, int>;
template class ACE_Read_Guard<ACE_RW_Mutex>;
@@ -200,29 +235,29 @@ int main (int argc, char *argv[])
#if defined (ACE_HAS_THREADS)
parse_args (argc, argv);
- current_readers = 0; // Possibly already done
- current_writers = 0; // Possibly already done
-
- ACE_DEBUG ((LM_DEBUG, " (%t) main thread starting\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) main thread starting\n"));
if (ACE_Thread_Manager::instance ()->spawn_n (n_readers,
- ACE_THR_FUNC (reader),
- 0,
- THR_NEW_LWP) == -1)
+ ACE_THR_FUNC (reader),
+ 0,
+ THR_NEW_LWP) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn_n"), 1);
else if (ACE_Thread_Manager::instance ()->spawn_n (n_writers,
- ACE_THR_FUNC (writer),
- 0,
- THR_NEW_LWP) == -1)
+ ACE_THR_FUNC (writer),
+ 0,
+ THR_NEW_LWP) == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn_n"), 1);
ACE_Thread_Manager::instance ()->wait ();
- ACE_DEBUG ((LM_DEBUG, " (%t) exiting main thread\n"));
+ ACE_DEBUG ((LM_DEBUG,
+ "(%t) exiting main thread\n"));
#else
ACE_UNUSED_ARG (argc);
ACE_UNUSED_ARG (argv);
- ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
+ ACE_ERROR ((LM_ERROR,
+ "threads not supported on this platform\n"));
#endif /* ACE_HAS_THREADS */
ACE_END_TEST;
return 0;