summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/IList.h10
-rw-r--r--cpp/src/qpid/ISList.h10
-rw-r--r--cpp/src/qpid/Options.cpp50
-rw-r--r--cpp/src/qpid/amqp_0_10/Holder.h4
-rw-r--r--cpp/src/qpid/broker/Daemon.cpp129
-rw-r--r--cpp/src/qpid/broker/Message.h6
-rw-r--r--cpp/src/qpid/framing/SessionState.cpp8
-rw-r--r--cpp/src/qpid/log/Logger.cpp2
8 files changed, 167 insertions, 52 deletions
diff --git a/cpp/src/qpid/IList.h b/cpp/src/qpid/IList.h
index f5c78ced68..6a5299862c 100644
--- a/cpp/src/qpid/IList.h
+++ b/cpp/src/qpid/IList.h
@@ -38,6 +38,8 @@ template <class Pointer> class IListNode {
typedef Pointer pointer;
typedef typename Pointee<Pointer>::type NodeType;
typedef typename pointer_to_other<Pointer, const NodeType>::type const_pointer;
+
+ pointer prev, next;
protected:
IListNode() : prev() {}
@@ -49,7 +51,6 @@ template <class Pointer> class IListNode {
const_pointer getPrev() const { return prev; }
private:
- pointer prev, next;
friend class IList<NodeType>;
};
@@ -168,10 +169,14 @@ template<class Node> class IList {
template <class U> Iterator(
const Iterator<U>& i,
typename boost::enable_if_convertible<U*, T*>::type* = 0
- ) : ptr(i.ptr) {}
+ ) : ptr(i.ptr) {}
operator pointer() { return ptr; }
operator const_pointer() const { return ptr; }
+
+
+ pointer ptr;
+
private:
friend class boost::iterator_core_access;
@@ -183,7 +188,6 @@ template<class Node> class IList {
void decrement() { ptr = ptr->prev; }
bool equal(const Iterator& x) const { return ptr == x.ptr; }
- pointer ptr;
friend class IList<Node>;
};
diff --git a/cpp/src/qpid/ISList.h b/cpp/src/qpid/ISList.h
index 96ba3ec726..b0004c9561 100644
--- a/cpp/src/qpid/ISList.h
+++ b/cpp/src/qpid/ISList.h
@@ -49,13 +49,15 @@ template <class Pointer> class ISListNode {
typedef Pointer pointer;
typedef typename Pointee<Pointer>::type NodeType;
typedef typename pointer_to_other<Pointer, const NodeType>::type const_pointer;
+
+ pointer getNext() { return next; }
+ pointer * getNextPtr() { return & next; }
+ const_pointer getNext() const { return next; }
protected:
ISListNode() : next() {}
ISListNode(const ISListNode&) {} // Don't copy the next pointer.
- pointer getNext() { return next; }
- const_pointer getNext() const { return next; }
private:
pointer next;
@@ -151,6 +153,7 @@ template <class Node> class ISList : private boost::noncopyable {
operator pointer() { return *pptr; }
operator const_pointer() const { return *pptr; }
+ pointer* pptr;
private:
friend class boost::iterator_core_access;
@@ -158,10 +161,9 @@ template <class Node> class ISList : private boost::noncopyable {
Iterator(const pointer* pp) : pptr(const_cast<pointer*>(pp)) {};
T& dereference() const { return **pptr; }
- void increment() { pptr = &(**pptr).next; }
+ void increment() { pptr = (**pptr).getNextPtr(); }
bool equal(const Iterator& x) const { return pptr == x.pptr; }
- pointer* pptr;
friend class ISList<Node>;
};
diff --git a/cpp/src/qpid/Options.cpp b/cpp/src/qpid/Options.cpp
index a5d3b54dd6..1628fea0df 100644
--- a/cpp/src/qpid/Options.cpp
+++ b/cpp/src/qpid/Options.cpp
@@ -50,11 +50,49 @@ struct EnvOptMapper {
static const std::string prefix("QPID_");
if (envVar.substr(0, prefix.size()) == prefix) {
string env = envVar.substr(prefix.size());
+#if (BOOST_VERSION >= 103300)
typedef const std::vector< boost::shared_ptr<po::option_description> > OptDescs;
OptDescs::const_iterator i =
find_if(opts.options().begin(), opts.options().end(), boost::bind(matchStr, env, _1));
if (i != opts.options().end())
return (*i)->long_name();
+#else
+ /*===================================================================
+ For Boost version 103200 and below.
+
+ In Boost version 103200, the options_description::options member,
+ used above, is private. So what I will do here is use the
+ count() funtion, which returns a 1 or 0 indicating presence or
+ absence of the environment variable.
+
+ If it is present, I will return its name. Env vars do not have
+ short and long forms, so the name is synonymous with the long
+ name. (This would not work for command line args.)
+ And if it's absent -- an empty string.
+ =====================================================================*/
+
+
+ /*------------------------------------------------------------
+ The env vars come in unaltered, i.e. QPID_FOO, but the
+ options are stored normalized as "qpid-foo". Change the
+ local variable "env" so it can be found by "opts".
+ ------------------------------------------------------------*/
+ for (std::string::iterator i = env.begin(); i != env.end(); ++i)
+ {
+ *i = (*i == '_')
+ ? '-'
+ : ::tolower(*i);
+ }
+
+ if ( opts.count(env.c_str()) > 0 )
+ {
+ return env.c_str();
+ }
+ else
+ {
+ return string();
+ }
+#endif
}
return string();
}
@@ -64,11 +102,19 @@ struct EnvOptMapper {
if (pos == string::npos)
return string();
string key = line.substr (0, pos);
+#if (BOOST_VERSION >= 103300)
typedef const std::vector< boost::shared_ptr<po::option_description> > OptDescs;
OptDescs::const_iterator i =
find_if(opts.options().begin(), opts.options().end(), boost::bind(matchCase, key, _1));
if (i != opts.options().end())
return string (line) + "\n";
+#else
+ try {
+ po::option_description desc = opts.find(key.c_str());
+ return string (line) + "\n";
+ }
+ catch (const std::exception& e) {}
+#endif
return string ();
}
@@ -91,6 +137,7 @@ void Options::parse(int argc, char** argv, const std::string& configFile, bool a
parsing="command line options";
if (argc > 0 && argv != 0) {
if (allowUnknown) {
+#if (BOOST_VERSION >= 103300)
// This hideous workaround is required because boost 1.33 has a bug
// that causes 'allow_unregistered' to not work.
po::command_line_parser clp = po::command_line_parser(argc, const_cast<char**>(argv)).
@@ -103,6 +150,7 @@ void Options::parse(int argc, char** argv, const std::string& configFile, bool a
if (!i->unregistered)
filtopts.options.push_back (*i);
po::store(filtopts, vm);
+#endif
}
else
po::store(po::parse_command_line(argc, const_cast<char**>(argv), *this), vm);
@@ -141,8 +189,10 @@ void Options::parse(int argc, char** argv, const std::string& configFile, bool a
catch (const std::exception& e) {
ostringstream msg;
msg << "Error in " << parsing << ": " << e.what() << endl;
+#if (BOOST_VERSION >= 103300)
if (find_nothrow("help", false))
msg << "Use --help to see valid options" << endl;
+#endif
throw Exception(msg.str());
}
}
diff --git a/cpp/src/qpid/amqp_0_10/Holder.h b/cpp/src/qpid/amqp_0_10/Holder.h
index 3c734d967f..8712db6c86 100644
--- a/cpp/src/qpid/amqp_0_10/Holder.h
+++ b/cpp/src/qpid/amqp_0_10/Holder.h
@@ -70,7 +70,7 @@ class Holder : public framing::Blob<Size, BaseHeld> {
template <class S> void serialize(S& s) {
s.split(*this);
- apply(s, *this->get());
+ qpid::amqp_0_10::apply(s, *this->get());
}
template <class T> T* getIf() {
@@ -92,7 +92,7 @@ class Holder : public framing::Blob<Size, BaseHeld> {
template <class D, class B, size_t S>
Holder<D,B,S>& Holder<D,B,S>::operator=(const B& rhs) {
Assign assign(*this);
- apply(assign, rhs);
+ qpid::amqp_0_10::apply(assign, rhs);
return *this;
}
diff --git a/cpp/src/qpid/broker/Daemon.cpp b/cpp/src/qpid/broker/Daemon.cpp
index 3fcc487324..6fd1c3a292 100644
--- a/cpp/src/qpid/broker/Daemon.cpp
+++ b/cpp/src/qpid/broker/Daemon.cpp
@@ -19,9 +19,6 @@
#include "qpid/log/Statement.h"
#include "qpid/Exception.h"
-#include <boost/iostreams/stream.hpp>
-#include <boost/iostreams/device/file_descriptor.hpp>
-
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
@@ -33,7 +30,6 @@ namespace qpid {
namespace broker {
using namespace std;
-typedef boost::iostreams::stream<boost::iostreams::file_descriptor> fdstream;
namespace {
/** Throw an exception containing msg and strerror if throwIf is true.
@@ -45,7 +41,11 @@ void throwIf(bool condition, const string& msg, int errNo=errno) {
}
-struct LockFile : public fdstream {
+/*--------------------------------------------------
+ Rewritten using low-level IO, for compatibility
+ with earlier Boost versions, i.e. 103200.
+--------------------------------------------------*/
+struct LockFile {
LockFile(const std::string& path_, bool create)
: path(path_), fd(-1), created(create)
@@ -55,13 +55,12 @@ struct LockFile : public fdstream {
fd = ::open(path.c_str(), flags, 0644);
throwIf(fd < 0,"Cannot open "+path);
throwIf(::lockf(fd, F_TLOCK, 0) < 0, "Cannot lock "+path);
- open(boost::iostreams::file_descriptor(fd));
}
~LockFile() {
if (fd >= 0) {
::lockf(fd, F_ULOCK, 0);
- close();
+ ::close(fd);
}
}
@@ -87,9 +86,13 @@ string Daemon::pidFile(uint16_t port) {
return path.str();
}
+/*--------------------------------------------------
+ Rewritten using low-level IO, for compatibility
+ with earlier Boost versions, i.e. 103200.
+--------------------------------------------------*/
void Daemon::fork()
{
- throwIf(pipe(pipeFds) < 0, "Can't create pipe");
+ throwIf(::pipe(pipeFds) < 0, "Can't create pipe");
throwIf((pid = ::fork()) < 0, "Daemon fork failed");
if (pid == 0) { // Child
try {
@@ -115,9 +118,12 @@ void Daemon::fork()
}
catch (const exception& e) {
QPID_LOG(critical, "Daemon startup failed: " << e.what());
- fdstream pipe(pipeFds[1]);
- assert(pipe.is_open());
- pipe << "0 " << e.what() << endl;
+ stringstream pipeFailureMessage;
+ pipeFailureMessage << "0 " << e.what() << endl;
+ write ( pipeFds[1],
+ pipeFailureMessage.str().c_str(),
+ strlen(pipeFailureMessage.str().c_str())
+ );
}
}
else { // Parent
@@ -137,50 +143,101 @@ uint16_t Daemon::wait(int timeout) { // parent waits for child.
tv.tv_sec = timeout;
tv.tv_usec = 0;
+ /*--------------------------------------------------
+ Rewritten using low-level IO, for compatibility
+ with earlier Boost versions, i.e. 103200.
+ --------------------------------------------------*/
fd_set fds;
FD_ZERO(&fds);
FD_SET(pipeFds[0], &fds);
int n=select(FD_SETSIZE, &fds, 0, 0, &tv);
throwIf(n==0, "Timed out waiting for daemon");
throwIf(n<0, "Error waiting for daemon");
- fdstream pipe(pipeFds[0]);
- pipe.exceptions(ios::failbit|ios::badbit|ios::eofbit);
uint16_t port = 0;
- try {
- pipe >> port;
- if (port == 0) {
- string errmsg;
- pipe >> skipws;
- getline(pipe, errmsg);
- throw Exception("Daemon startup failed"+
- (errmsg.empty() ? string(".") : ": " + errmsg));
- }
- }
- catch (const fdstream::failure& e) {
- throw Exception(string("Failed to read daemon port: ")+e.what());
+ /*
+ * Read the child's port number from the pipe.
+ */
+ int desired_read = sizeof(uint16_t);
+ if ( desired_read > ::read(pipeFds[0], & port, desired_read) ) {
+ throw Exception("Cannot write lock file "+lockFile);
}
+
+ /*
+ * If the port number is 0, the child has put an error message
+ * on the pipe. Get it and throw it.
+ */
+ if ( 0 == port ) {
+ // Skip whitespace
+ char c = ' ';
+ while ( isspace(c) ) {
+ if ( 1 > ::read(pipeFds[0], &c, 1) )
+ throw Exception("Child port == 0, and no error message on pipe.");
+ }
+
+ // Get Message
+ string errmsg;
+ while ( 1 ) {
+ if ( 1 > ::read(pipeFds[0], &c, 1) )
+ throw Exception("Daemon startup failed"+
+ (errmsg.empty() ? string(".") : ": " + errmsg));
+ }
+ }
+
return port;
}
+
+/*
+ * When the child is ready, it writes its pid to the
+ * lockfile and its port number on the pipe back to
+ * its parent process. This indicates that the
+ * child has successfully daemonized. When the parent
+ * hears the good news, it ill exit.
+ */
void Daemon::ready(uint16_t port) { // child
lockFile = pidFile(port);
LockFile lf(lockFile, true);
- lf << getpid() << endl;
- if (lf.fail())
- throw Exception("Cannot write lock file "+lockFile);
- fdstream pipe(pipeFds[1]);
- QPID_LOG(debug, "Daemon ready on port: " << port);
- pipe << port << endl;
- throwIf(!pipe.good(), "Error writing to parent");
+
+ /*---------------------------------------------------
+ Rewritten using low-level IO, for compatibility
+ with earlier Boost versions, i.e. 103200.
+ ---------------------------------------------------*/
+ /*
+ * Write the PID to the lockfile.
+ */
+ pid_t pid = getpid();
+ int desired_write = sizeof(pid_t);
+ if ( desired_write > ::write(lf.fd, & pid, desired_write) ) {
+ throw Exception("Cannot write lock file "+lockFile);
+ }
+
+ /*
+ * Write the port number to the parent.
+ */
+ desired_write = sizeof(uint16_t);
+ if ( desired_write > ::write(pipeFds[1], & port, desired_write) ) {
+ throw Exception("Error writing to parent." );
+ }
+
+ QPID_LOG(debug, "Daemon ready on port: " << port);
}
+/*
+ * The parent process reads the child's pid
+ * from the lockfile.
+ */
pid_t Daemon::getPid(uint16_t port) {
string name = pidFile(port);
- LockFile lockFile(name, false);
+ LockFile lf(name, false);
pid_t pid;
- lockFile >> pid;
- if (lockFile.fail())
- throw Exception("Cannot read lock file "+name);
+ /*---------------------------------------------------
+ Rewritten using low-level IO, for compatibility
+ with earlier Boost versions, i.e. 103200.
+ ---------------------------------------------------*/
+ int desired_read = sizeof(pid_t);
+ if ( desired_read > ::read(lf.fd, & pid, desired_read) ) {
+ throw Exception("Cannot read lock file " + name);
+ }
if (kill(pid, 0) < 0 && errno != EPERM) {
unlink(name.c_str());
throw Exception("Removing stale lock file "+name);
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 561cdede59..4fd2f1401d 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -75,11 +75,13 @@ public:
const framing::FrameSet& getFrames() const { return frames; }
template <class T> T* getProperties() {
- return frames.getHeaders()->get<T>(true);
+ qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+ return p->get<T>(true);
}
template <class T> const T* getProperties() const {
- return frames.getHeaders()->get<T>();
+ qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+ return p->get<T>(true);
}
template <class T> const T* getMethod() const {
diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp
index f9019b036c..b72bd15803 100644
--- a/cpp/src/qpid/framing/SessionState.cpp
+++ b/cpp/src/qpid/framing/SessionState.cpp
@@ -35,8 +35,8 @@ namespace framing {
SessionState::SessionState(uint32_t ack, bool enableReplay, const Uuid& uuid) :
state(ATTACHED),
id(uuid),
- lastReceived(-1),
- lastSent(-1),
+ lastReceived(u_int32_t(-1)),
+ lastSent(u_int32_t(-1)),
ackInterval(ack),
sendAckAt(lastReceived+ackInterval),
solicitAckAt(lastSent+ackInterval),
@@ -47,8 +47,8 @@ SessionState::SessionState(uint32_t ack, bool enableReplay, const Uuid& uuid) :
SessionState::SessionState(const Uuid& uuid) :
state(ATTACHED),
id(uuid),
- lastReceived(-1),
- lastSent(-1),
+ lastReceived(u_int32_t(-1)),
+ lastSent(u_int32_t(-1)),
ackInterval(0),
sendAckAt(0),
solicitAckAt(0),
diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp
index c0fc8ac959..f2f86e62fc 100644
--- a/cpp/src/qpid/log/Logger.cpp
+++ b/cpp/src/qpid/log/Logger.cpp
@@ -144,7 +144,7 @@ void Logger::log(const Statement& s, const std::string& msg) {
os << " ";
os << msg << endl;
std::string formatted=os.str();
-
+
{
ScopedLock l(lock);
std::for_each(outputs.begin(), outputs.end(),