summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/Options.cpp6
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp34
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp18
-rw-r--r--cpp/src/qpid/cluster/Cpg.h5
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp4
-rw-r--r--cpp/src/qpid/log/Logger.cpp11
-rw-r--r--cpp/src/qpid/sys/Time.h48
7 files changed, 73 insertions, 53 deletions
diff --git a/cpp/src/qpid/Options.cpp b/cpp/src/qpid/Options.cpp
index 18a3f726f9..7f4536ff7b 100644
--- a/cpp/src/qpid/Options.cpp
+++ b/cpp/src/qpid/Options.cpp
@@ -27,12 +27,13 @@ namespace qpid {
using namespace std;
namespace {
-const std::string prefix("QPID_");
+
char env2optchar(char env) { return (env=='_') ? '-' : tolower(env); }
struct Mapper {
Mapper(const Options& o) : opts(o) {}
string operator()(const string& env) {
+ static const std::string prefix("QPID_");
if (env.substr(0, prefix.size()) == prefix) {
string opt = env.substr(prefix.size());
transform(opt.begin(), opt.end(), opt.begin(), env2optchar);
@@ -58,7 +59,8 @@ void Options::parse(int argc, char** argv, const std::string& configFile)
try {
po::variables_map vm;
parsing="command line options";
- po::store(po::parse_command_line(argc, argv, *this), vm);
+ if (argc > 0 && argv != 0)
+ po::store(po::parse_command_line(argc, argv, *this), vm);
parsing="environment variables";
po::store(po::parse_environment(*this, Mapper(*this)), vm);
po::notify(vm); // configFile may be updated from arg/env options.
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 256378ccd5..b59bfe878d 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -115,13 +115,14 @@ Cluster::MemberList Cluster::getMembers() const {
}
void Cluster::deliver(
- cpg_handle_t /*handle*/,
- struct cpg_name* /* group */,
- uint32_t nodeid,
- uint32_t pid,
- void* msg,
- int msg_len)
+ cpg_handle_t /*handle*/,
+ cpg_name* group,
+ uint32_t nodeid,
+ uint32_t pid,
+ void* msg,
+ int msg_len)
{
+ assert(name == *group);
Id from(nodeid, pid);
Buffer buf(static_cast<char*>(msg), msg_len);
SessionFrame frame;
@@ -149,26 +150,27 @@ void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
ClusterNotifyBody* notifyIn=
dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
assert(notifyIn);
- MemberList list;
- {
- Mutex::ScopedLock l(lock);
+ MemberList list;
+ {
+ Mutex::ScopedLock l(lock);
shared_ptr<Member>& member=members[from];
if (!member)
member.reset(new Member(notifyIn->getUrl()));
- else
+ else
member->url = notifyIn->getUrl();
- lock.notifyAll();
+ lock.notifyAll();
QPID_LOG(trace, *this << ": members joined: " << members);
- }
+ }
}
void Cluster::configChange(
cpg_handle_t /*handle*/,
- struct cpg_name */*group*/,
- struct cpg_address */*current*/, int /*nCurrent*/,
- struct cpg_address *left, int nLeft,
- struct cpg_address *joined, int nJoined)
+ cpg_name *group,
+ cpg_address */*current*/, int /*nCurrent*/,
+ cpg_address *left, int nLeft,
+ cpg_address *joined, int nJoined)
{
+ assert(name == *group);
bool newMembers=false;
MemberList updated;
{
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 3148e52789..87e483141e 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -90,25 +90,22 @@ Cpg::Cpg(Handler& h) : handler(h) {
cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
handles.put(handle, &handler);
+ QPID_LOG(debug, "Initialize CPG handle " << handle);
}
Cpg::~Cpg() {
try {
shutdown();
} catch (const std::exception& e) {
- QPID_LOG(error, string("Exception in Cpg destructor: ")+e.what());
+ QPID_LOG(error, "Exception in Cpg destructor: " << e.what());
}
}
-struct Cpg::ClearHandleOnExit {
- ClearHandleOnExit(cpg_handle_t h) : handle(h) {}
- ~ClearHandleOnExit() { Cpg::handles.put(handle, 0); }
- cpg_handle_t handle;
-};
-
void Cpg::shutdown() {
+ QPID_LOG(debug, "Shutdown CPG handle " << handle);
if (handles.get(handle)) {
- ClearHandleOnExit guard(handle); // Exception safe
+ QPID_LOG(debug, "Finalize CPG handle " << handle);
+ handles.put(handle, 0);
check(cpg_finalize(handle), "Error in shutdown of CPG");
}
}
@@ -173,8 +170,11 @@ ostream& operator <<(ostream& out, const Cpg::Id& id) {
return out << ":" << id.pid();
}
+ostream& operator <<(ostream& out, const cpg_name& name) {
+ return out << string(name.value, name.length);
+}
-}} // namespace qpid::cpg
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index d616be74e2..351b65d56a 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -170,9 +170,14 @@ class Cpg : public Dispatchable {
Handler& handler;
};
+std::ostream& operator <<(std::ostream& out, const cpg_name& name);
std::ostream& operator <<(std::ostream& out, const Cpg::Id& id);
std::ostream& operator <<(std::ostream& out, const std::pair<cpg_address*,int> addresses);
+inline bool operator==(const cpg_name& a, const cpg_name& b) {
+ return a.length==b.length && strncmp(a.value, b.value, a.length) == 0;
+}
+inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b); }
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index 13f1d3cece..778c9ab505 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -33,8 +33,8 @@ namespace framing {
AMQP_MethodVersionMap AMQFrame::versionMap;
-AMQFrame::AMQFrame(ProtocolVersion _version):
-version(_version)
+AMQFrame::AMQFrame(ProtocolVersion _version)
+ : channel(0), type(0), version(_version)
{
assert(version != ProtocolVersion(0,0));
}
diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp
index ceb3977106..6e8f3a59cc 100644
--- a/cpp/src/qpid/log/Logger.cpp
+++ b/cpp/src/qpid/log/Logger.cpp
@@ -85,7 +85,15 @@ Logger& Logger::instance() {
return boost::details::pool::singleton_default<Logger>::instance();
}
-Logger::Logger() : flags(0) {}
+Logger::Logger() : flags(0) {
+ // Initialize myself from env variables so all programs
+ // (e.g. tests) can use logging even if they don't parse
+ // command line args.
+ Options opts;
+ opts.parse(0, 0);
+ configure(opts,"");
+}
+
Logger::~Logger() {}
void Logger::select(const Selector& s) {
@@ -190,6 +198,7 @@ void Logger::remove(Statement& s) {
void Logger::configure(const Options& opts, const std::string& prog)
{
+ clear();
Options o(opts);
if (o.trace)
o.selectors.push_back("trace+");
diff --git a/cpp/src/qpid/sys/Time.h b/cpp/src/qpid/sys/Time.h
index 25b1606844..cff5b70d8e 100644
--- a/cpp/src/qpid/sys/Time.h
+++ b/cpp/src/qpid/sys/Time.h
@@ -32,41 +32,43 @@ class Duration;
/** Times in nanoseconds */
class AbsTime {
+ static int64_t max() { return std::numeric_limits<int64_t>::max(); }
int64_t time_ns;
-
- friend class Duration;
+
+ friend class Duration;
-public:
- inline AbsTime() {}
- inline AbsTime(const AbsTime& time0, const Duration& duration);
- // Default asignment operation fine
- // Default copy constructor fine
+ public:
+ inline AbsTime() {}
+ inline AbsTime(const AbsTime& time0, const Duration& duration);
+ // Default asignment operation fine
+ // Default copy constructor fine
- static AbsTime now();
- inline static AbsTime FarFuture();
+ static AbsTime now();
+ inline static AbsTime FarFuture();
- friend bool operator<(const AbsTime& a, const AbsTime& b);
- friend bool operator>(const AbsTime& a, const AbsTime& b);
+ friend bool operator<(const AbsTime& a, const AbsTime& b);
+ friend bool operator>(const AbsTime& a, const AbsTime& b);
};
class Duration {
+ static int64_t max() { return std::numeric_limits<int64_t>::max(); }
int64_t nanosecs;
- friend class AbsTime;
+ friend class AbsTime;
-public:
- inline Duration(int64_t time0);
- inline explicit Duration(const AbsTime& time0);
- inline explicit Duration(const AbsTime& start, const AbsTime& finish);
- inline operator int64_t() const;
+ public:
+ inline Duration(int64_t time0);
+ inline explicit Duration(const AbsTime& time0);
+ inline explicit Duration(const AbsTime& start, const AbsTime& finish);
+ inline operator int64_t() const;
};
-AbsTime::AbsTime(const AbsTime& time0, const Duration& duration0) :
- time_ns(time0.time_ns+duration0.nanosecs)
+AbsTime::AbsTime(const AbsTime& t, const Duration& d) :
+ time_ns(d == Duration::max() ? max() : t.time_ns+d.nanosecs)
{}
-AbsTime AbsTime::FarFuture() { AbsTime ff; ff.time_ns = std::numeric_limits<int64_t>::max(); return ff;}
+AbsTime AbsTime::FarFuture() { AbsTime ff; ff.time_ns = max(); return ff;}
inline AbsTime now() { return AbsTime::now(); }
@@ -74,15 +76,15 @@ inline bool operator<(const AbsTime& a, const AbsTime& b) { return a.time_ns < b
inline bool operator>(const AbsTime& a, const AbsTime& b) { return a.time_ns > b.time_ns; }
Duration::Duration(int64_t time0) :
- nanosecs(time0)
+ nanosecs(time0)
{}
Duration::Duration(const AbsTime& time0) :
- nanosecs(time0.time_ns)
+ nanosecs(time0.time_ns)
{}
Duration::Duration(const AbsTime& start, const AbsTime& finish) :
- nanosecs(finish.time_ns - start.time_ns)
+ nanosecs(finish.time_ns - start.time_ns)
{}
Duration::operator int64_t() const