diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/IList.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/ISList.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/Options.cpp | 50 | ||||
-rw-r--r-- | cpp/src/qpid/amqp_0_10/Holder.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Daemon.cpp | 129 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/framing/SessionState.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/log/Logger.cpp | 2 |
8 files changed, 167 insertions, 52 deletions
diff --git a/cpp/src/qpid/IList.h b/cpp/src/qpid/IList.h index f5c78ced68..6a5299862c 100644 --- a/cpp/src/qpid/IList.h +++ b/cpp/src/qpid/IList.h @@ -38,6 +38,8 @@ template <class Pointer> class IListNode { typedef Pointer pointer; typedef typename Pointee<Pointer>::type NodeType; typedef typename pointer_to_other<Pointer, const NodeType>::type const_pointer; + + pointer prev, next; protected: IListNode() : prev() {} @@ -49,7 +51,6 @@ template <class Pointer> class IListNode { const_pointer getPrev() const { return prev; } private: - pointer prev, next; friend class IList<NodeType>; }; @@ -168,10 +169,14 @@ template<class Node> class IList { template <class U> Iterator( const Iterator<U>& i, typename boost::enable_if_convertible<U*, T*>::type* = 0 - ) : ptr(i.ptr) {} + ) : ptr(i.ptr) {} operator pointer() { return ptr; } operator const_pointer() const { return ptr; } + + + pointer ptr; + private: friend class boost::iterator_core_access; @@ -183,7 +188,6 @@ template<class Node> class IList { void decrement() { ptr = ptr->prev; } bool equal(const Iterator& x) const { return ptr == x.ptr; } - pointer ptr; friend class IList<Node>; }; diff --git a/cpp/src/qpid/ISList.h b/cpp/src/qpid/ISList.h index 96ba3ec726..b0004c9561 100644 --- a/cpp/src/qpid/ISList.h +++ b/cpp/src/qpid/ISList.h @@ -49,13 +49,15 @@ template <class Pointer> class ISListNode { typedef Pointer pointer; typedef typename Pointee<Pointer>::type NodeType; typedef typename pointer_to_other<Pointer, const NodeType>::type const_pointer; + + pointer getNext() { return next; } + pointer * getNextPtr() { return & next; } + const_pointer getNext() const { return next; } protected: ISListNode() : next() {} ISListNode(const ISListNode&) {} // Don't copy the next pointer. - pointer getNext() { return next; } - const_pointer getNext() const { return next; } private: pointer next; @@ -151,6 +153,7 @@ template <class Node> class ISList : private boost::noncopyable { operator pointer() { return *pptr; } operator const_pointer() const { return *pptr; } + pointer* pptr; private: friend class boost::iterator_core_access; @@ -158,10 +161,9 @@ template <class Node> class ISList : private boost::noncopyable { Iterator(const pointer* pp) : pptr(const_cast<pointer*>(pp)) {}; T& dereference() const { return **pptr; } - void increment() { pptr = &(**pptr).next; } + void increment() { pptr = (**pptr).getNextPtr(); } bool equal(const Iterator& x) const { return pptr == x.pptr; } - pointer* pptr; friend class ISList<Node>; }; diff --git a/cpp/src/qpid/Options.cpp b/cpp/src/qpid/Options.cpp index a5d3b54dd6..1628fea0df 100644 --- a/cpp/src/qpid/Options.cpp +++ b/cpp/src/qpid/Options.cpp @@ -50,11 +50,49 @@ struct EnvOptMapper { static const std::string prefix("QPID_"); if (envVar.substr(0, prefix.size()) == prefix) { string env = envVar.substr(prefix.size()); +#if (BOOST_VERSION >= 103300) typedef const std::vector< boost::shared_ptr<po::option_description> > OptDescs; OptDescs::const_iterator i = find_if(opts.options().begin(), opts.options().end(), boost::bind(matchStr, env, _1)); if (i != opts.options().end()) return (*i)->long_name(); +#else + /*=================================================================== + For Boost version 103200 and below. + + In Boost version 103200, the options_description::options member, + used above, is private. So what I will do here is use the + count() funtion, which returns a 1 or 0 indicating presence or + absence of the environment variable. + + If it is present, I will return its name. Env vars do not have + short and long forms, so the name is synonymous with the long + name. (This would not work for command line args.) + And if it's absent -- an empty string. + =====================================================================*/ + + + /*------------------------------------------------------------ + The env vars come in unaltered, i.e. QPID_FOO, but the + options are stored normalized as "qpid-foo". Change the + local variable "env" so it can be found by "opts". + ------------------------------------------------------------*/ + for (std::string::iterator i = env.begin(); i != env.end(); ++i) + { + *i = (*i == '_') + ? '-' + : ::tolower(*i); + } + + if ( opts.count(env.c_str()) > 0 ) + { + return env.c_str(); + } + else + { + return string(); + } +#endif } return string(); } @@ -64,11 +102,19 @@ struct EnvOptMapper { if (pos == string::npos) return string(); string key = line.substr (0, pos); +#if (BOOST_VERSION >= 103300) typedef const std::vector< boost::shared_ptr<po::option_description> > OptDescs; OptDescs::const_iterator i = find_if(opts.options().begin(), opts.options().end(), boost::bind(matchCase, key, _1)); if (i != opts.options().end()) return string (line) + "\n"; +#else + try { + po::option_description desc = opts.find(key.c_str()); + return string (line) + "\n"; + } + catch (const std::exception& e) {} +#endif return string (); } @@ -91,6 +137,7 @@ void Options::parse(int argc, char** argv, const std::string& configFile, bool a parsing="command line options"; if (argc > 0 && argv != 0) { if (allowUnknown) { +#if (BOOST_VERSION >= 103300) // This hideous workaround is required because boost 1.33 has a bug // that causes 'allow_unregistered' to not work. po::command_line_parser clp = po::command_line_parser(argc, const_cast<char**>(argv)). @@ -103,6 +150,7 @@ void Options::parse(int argc, char** argv, const std::string& configFile, bool a if (!i->unregistered) filtopts.options.push_back (*i); po::store(filtopts, vm); +#endif } else po::store(po::parse_command_line(argc, const_cast<char**>(argv), *this), vm); @@ -141,8 +189,10 @@ void Options::parse(int argc, char** argv, const std::string& configFile, bool a catch (const std::exception& e) { ostringstream msg; msg << "Error in " << parsing << ": " << e.what() << endl; +#if (BOOST_VERSION >= 103300) if (find_nothrow("help", false)) msg << "Use --help to see valid options" << endl; +#endif throw Exception(msg.str()); } } diff --git a/cpp/src/qpid/amqp_0_10/Holder.h b/cpp/src/qpid/amqp_0_10/Holder.h index 3c734d967f..8712db6c86 100644 --- a/cpp/src/qpid/amqp_0_10/Holder.h +++ b/cpp/src/qpid/amqp_0_10/Holder.h @@ -70,7 +70,7 @@ class Holder : public framing::Blob<Size, BaseHeld> { template <class S> void serialize(S& s) { s.split(*this); - apply(s, *this->get()); + qpid::amqp_0_10::apply(s, *this->get()); } template <class T> T* getIf() { @@ -92,7 +92,7 @@ class Holder : public framing::Blob<Size, BaseHeld> { template <class D, class B, size_t S> Holder<D,B,S>& Holder<D,B,S>::operator=(const B& rhs) { Assign assign(*this); - apply(assign, rhs); + qpid::amqp_0_10::apply(assign, rhs); return *this; } diff --git a/cpp/src/qpid/broker/Daemon.cpp b/cpp/src/qpid/broker/Daemon.cpp index 3fcc487324..6fd1c3a292 100644 --- a/cpp/src/qpid/broker/Daemon.cpp +++ b/cpp/src/qpid/broker/Daemon.cpp @@ -19,9 +19,6 @@ #include "qpid/log/Statement.h" #include "qpid/Exception.h" -#include <boost/iostreams/stream.hpp> -#include <boost/iostreams/device/file_descriptor.hpp> - #include <errno.h> #include <fcntl.h> #include <signal.h> @@ -33,7 +30,6 @@ namespace qpid { namespace broker { using namespace std; -typedef boost::iostreams::stream<boost::iostreams::file_descriptor> fdstream; namespace { /** Throw an exception containing msg and strerror if throwIf is true. @@ -45,7 +41,11 @@ void throwIf(bool condition, const string& msg, int errNo=errno) { } -struct LockFile : public fdstream { +/*-------------------------------------------------- + Rewritten using low-level IO, for compatibility + with earlier Boost versions, i.e. 103200. +--------------------------------------------------*/ +struct LockFile { LockFile(const std::string& path_, bool create) : path(path_), fd(-1), created(create) @@ -55,13 +55,12 @@ struct LockFile : public fdstream { fd = ::open(path.c_str(), flags, 0644); throwIf(fd < 0,"Cannot open "+path); throwIf(::lockf(fd, F_TLOCK, 0) < 0, "Cannot lock "+path); - open(boost::iostreams::file_descriptor(fd)); } ~LockFile() { if (fd >= 0) { ::lockf(fd, F_ULOCK, 0); - close(); + ::close(fd); } } @@ -87,9 +86,13 @@ string Daemon::pidFile(uint16_t port) { return path.str(); } +/*-------------------------------------------------- + Rewritten using low-level IO, for compatibility + with earlier Boost versions, i.e. 103200. +--------------------------------------------------*/ void Daemon::fork() { - throwIf(pipe(pipeFds) < 0, "Can't create pipe"); + throwIf(::pipe(pipeFds) < 0, "Can't create pipe"); throwIf((pid = ::fork()) < 0, "Daemon fork failed"); if (pid == 0) { // Child try { @@ -115,9 +118,12 @@ void Daemon::fork() } catch (const exception& e) { QPID_LOG(critical, "Daemon startup failed: " << e.what()); - fdstream pipe(pipeFds[1]); - assert(pipe.is_open()); - pipe << "0 " << e.what() << endl; + stringstream pipeFailureMessage; + pipeFailureMessage << "0 " << e.what() << endl; + write ( pipeFds[1], + pipeFailureMessage.str().c_str(), + strlen(pipeFailureMessage.str().c_str()) + ); } } else { // Parent @@ -137,50 +143,101 @@ uint16_t Daemon::wait(int timeout) { // parent waits for child. tv.tv_sec = timeout; tv.tv_usec = 0; + /*-------------------------------------------------- + Rewritten using low-level IO, for compatibility + with earlier Boost versions, i.e. 103200. + --------------------------------------------------*/ fd_set fds; FD_ZERO(&fds); FD_SET(pipeFds[0], &fds); int n=select(FD_SETSIZE, &fds, 0, 0, &tv); throwIf(n==0, "Timed out waiting for daemon"); throwIf(n<0, "Error waiting for daemon"); - fdstream pipe(pipeFds[0]); - pipe.exceptions(ios::failbit|ios::badbit|ios::eofbit); uint16_t port = 0; - try { - pipe >> port; - if (port == 0) { - string errmsg; - pipe >> skipws; - getline(pipe, errmsg); - throw Exception("Daemon startup failed"+ - (errmsg.empty() ? string(".") : ": " + errmsg)); - } - } - catch (const fdstream::failure& e) { - throw Exception(string("Failed to read daemon port: ")+e.what()); + /* + * Read the child's port number from the pipe. + */ + int desired_read = sizeof(uint16_t); + if ( desired_read > ::read(pipeFds[0], & port, desired_read) ) { + throw Exception("Cannot write lock file "+lockFile); } + + /* + * If the port number is 0, the child has put an error message + * on the pipe. Get it and throw it. + */ + if ( 0 == port ) { + // Skip whitespace + char c = ' '; + while ( isspace(c) ) { + if ( 1 > ::read(pipeFds[0], &c, 1) ) + throw Exception("Child port == 0, and no error message on pipe."); + } + + // Get Message + string errmsg; + while ( 1 ) { + if ( 1 > ::read(pipeFds[0], &c, 1) ) + throw Exception("Daemon startup failed"+ + (errmsg.empty() ? string(".") : ": " + errmsg)); + } + } + return port; } + +/* + * When the child is ready, it writes its pid to the + * lockfile and its port number on the pipe back to + * its parent process. This indicates that the + * child has successfully daemonized. When the parent + * hears the good news, it ill exit. + */ void Daemon::ready(uint16_t port) { // child lockFile = pidFile(port); LockFile lf(lockFile, true); - lf << getpid() << endl; - if (lf.fail()) - throw Exception("Cannot write lock file "+lockFile); - fdstream pipe(pipeFds[1]); - QPID_LOG(debug, "Daemon ready on port: " << port); - pipe << port << endl; - throwIf(!pipe.good(), "Error writing to parent"); + + /*--------------------------------------------------- + Rewritten using low-level IO, for compatibility + with earlier Boost versions, i.e. 103200. + ---------------------------------------------------*/ + /* + * Write the PID to the lockfile. + */ + pid_t pid = getpid(); + int desired_write = sizeof(pid_t); + if ( desired_write > ::write(lf.fd, & pid, desired_write) ) { + throw Exception("Cannot write lock file "+lockFile); + } + + /* + * Write the port number to the parent. + */ + desired_write = sizeof(uint16_t); + if ( desired_write > ::write(pipeFds[1], & port, desired_write) ) { + throw Exception("Error writing to parent." ); + } + + QPID_LOG(debug, "Daemon ready on port: " << port); } +/* + * The parent process reads the child's pid + * from the lockfile. + */ pid_t Daemon::getPid(uint16_t port) { string name = pidFile(port); - LockFile lockFile(name, false); + LockFile lf(name, false); pid_t pid; - lockFile >> pid; - if (lockFile.fail()) - throw Exception("Cannot read lock file "+name); + /*--------------------------------------------------- + Rewritten using low-level IO, for compatibility + with earlier Boost versions, i.e. 103200. + ---------------------------------------------------*/ + int desired_read = sizeof(pid_t); + if ( desired_read > ::read(lf.fd, & pid, desired_read) ) { + throw Exception("Cannot read lock file " + name); + } if (kill(pid, 0) < 0 && errno != EPERM) { unlink(name.c_str()); throw Exception("Removing stale lock file "+name); diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 561cdede59..4fd2f1401d 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -75,11 +75,13 @@ public: const framing::FrameSet& getFrames() const { return frames; } template <class T> T* getProperties() { - return frames.getHeaders()->get<T>(true); + qpid::framing::AMQHeaderBody* p = frames.getHeaders(); + return p->get<T>(true); } template <class T> const T* getProperties() const { - return frames.getHeaders()->get<T>(); + qpid::framing::AMQHeaderBody* p = frames.getHeaders(); + return p->get<T>(true); } template <class T> const T* getMethod() const { diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp index f9019b036c..b72bd15803 100644 --- a/cpp/src/qpid/framing/SessionState.cpp +++ b/cpp/src/qpid/framing/SessionState.cpp @@ -35,8 +35,8 @@ namespace framing { SessionState::SessionState(uint32_t ack, bool enableReplay, const Uuid& uuid) : state(ATTACHED), id(uuid), - lastReceived(-1), - lastSent(-1), + lastReceived(u_int32_t(-1)), + lastSent(u_int32_t(-1)), ackInterval(ack), sendAckAt(lastReceived+ackInterval), solicitAckAt(lastSent+ackInterval), @@ -47,8 +47,8 @@ SessionState::SessionState(uint32_t ack, bool enableReplay, const Uuid& uuid) : SessionState::SessionState(const Uuid& uuid) : state(ATTACHED), id(uuid), - lastReceived(-1), - lastSent(-1), + lastReceived(u_int32_t(-1)), + lastSent(u_int32_t(-1)), ackInterval(0), sendAckAt(0), solicitAckAt(0), diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp index c0fc8ac959..f2f86e62fc 100644 --- a/cpp/src/qpid/log/Logger.cpp +++ b/cpp/src/qpid/log/Logger.cpp @@ -144,7 +144,7 @@ void Logger::log(const Statement& s, const std::string& msg) { os << " "; os << msg << endl; std::string formatted=os.str(); - + { ScopedLock l(lock); std::for_each(outputs.begin(), outputs.end(), |