diff options
Diffstat (limited to 'qpid/cpp/src/qmf')
-rw-r--r-- | qpid/cpp/src/qmf/Agent.cpp | 29 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/AgentImpl.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/AgentSession.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/ConsoleSession.cpp | 32 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/ConsoleSessionImpl.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/DataAddr.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/DataAddrImpl.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/PrivateImplRef.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ResilientConnection.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/SchemaImpl.cpp | 9 |
10 files changed, 41 insertions, 63 deletions
diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp index 684f8e4fba..915f2a1c88 100644 --- a/qpid/cpp/src/qmf/Agent.cpp +++ b/qpid/cpp/src/qmf/Agent.cpp @@ -72,7 +72,7 @@ Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema( AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) : name(n), directSubject(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0), - sender(session.directSender), schemaCache(s.schemaCache) + sender(session.directSender), nextCorrelator(1), schemaCache(s.schemaCache) { } @@ -102,11 +102,12 @@ const Variant& AgentImpl::getAttribute(const string& k) const ConsoleEvent AgentImpl::query(const Query& query, Duration timeout) { boost::shared_ptr<SyncContext> context(new SyncContext()); - uint32_t correlator(session.correlator()); + uint32_t correlator; ConsoleEvent result; { qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; contextMap[correlator] = context; } try { @@ -150,7 +151,12 @@ ConsoleEvent AgentImpl::query(const string& text, Duration timeout) uint32_t AgentImpl::queryAsync(const Query& query) { - uint32_t correlator(session.correlator()); + uint32_t correlator; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + } sendQuery(query, correlator); return correlator; @@ -166,11 +172,12 @@ uint32_t AgentImpl::queryAsync(const string& text) ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& args, const DataAddr& addr, Duration timeout) { boost::shared_ptr<SyncContext> context(new SyncContext()); - uint32_t correlator(session.correlator()); + uint32_t correlator; ConsoleEvent result; { qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; contextMap[correlator] = context; } try { @@ -206,7 +213,12 @@ ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& arg uint32_t AgentImpl::callMethodAsync(const string& method, const Variant::Map& args, const DataAddr& addr) { - uint32_t correlator(session.correlator()); + uint32_t correlator; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + } sendMethod(method, args, addr, correlator); return correlator; @@ -584,7 +596,12 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const void AgentImpl::sendSchemaRequest(const SchemaId& id) { - uint32_t correlator(session.correlator()); + uint32_t correlator; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + } if (capability >= AGENT_CAPABILITY_V2_SCHEMA) { Query query(QUERY_SCHEMA, id); diff --git a/qpid/cpp/src/qmf/AgentImpl.h b/qpid/cpp/src/qmf/AgentImpl.h index 09754a3a7e..7fa4f4373a 100644 --- a/qpid/cpp/src/qmf/AgentImpl.h +++ b/qpid/cpp/src/qmf/AgentImpl.h @@ -99,6 +99,7 @@ namespace qmf { uint32_t capability; qpid::messaging::Sender sender; qpid::types::Variant::Map attributes; + uint32_t nextCorrelator; std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap; boost::shared_ptr<SchemaCache> schemaCache; mutable std::set<std::string> packageSet; diff --git a/qpid/cpp/src/qmf/AgentSession.cpp b/qpid/cpp/src/qmf/AgentSession.cpp index 71d369325f..4c5a72a467 100644 --- a/qpid/cpp/src/qmf/AgentSession.cpp +++ b/qpid/cpp/src/qmf/AgentSession.cpp @@ -72,7 +72,6 @@ namespace qmf { void open(); void close(); bool nextEvent(AgentEvent& e, Duration t); - int pendingEvents() const; void registerSchema(Schema& s); DataAddr addData(Data& d, const string& n, bool persist); @@ -162,7 +161,6 @@ const string& AgentSession::getName() const { return impl->getName(); } void AgentSession::open() { impl->open(); } void AgentSession::close() { impl->close(); } bool AgentSession::nextEvent(AgentEvent& e, Duration t) { return impl->nextEvent(e, t); } -int AgentSession::pendingEvents() const { return impl->pendingEvents(); } void AgentSession::registerSchema(Schema& s) { impl->registerSchema(s); } DataAddr AgentSession::addData(Data& d, const string& n, bool p) { return impl->addData(d, n, p); } void AgentSession::delData(const DataAddr& a) { impl->delData(a); } @@ -320,7 +318,7 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout) uint64_t milliseconds = timeout.getMilliseconds(); qpid::sys::Mutex::ScopedLock l(lock); - if (eventQueue.empty() && milliseconds > 0) + if (eventQueue.empty()) cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); @@ -334,13 +332,6 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout) } -int AgentSessionImpl::pendingEvents() const -{ - qpid::sys::Mutex::ScopedLock l(lock); - return eventQueue.size(); -} - - void AgentSessionImpl::registerSchema(Schema& schema) { if (!schema.isFinalized()) diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp index 7b51d80032..e12c1152f6 100644 --- a/qpid/cpp/src/qmf/ConsoleSession.cpp +++ b/qpid/cpp/src/qmf/ConsoleSession.cpp @@ -54,7 +54,6 @@ void ConsoleSession::setAgentFilter(const string& f) { impl->setAgentFilter(f); void ConsoleSession::open() { impl->open(); } void ConsoleSession::close() { impl->close(); } bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextEvent(e, t); } -int ConsoleSession::pendingEvents() const { return impl->pendingEvents(); } uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); } Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); } Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); } @@ -66,9 +65,9 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s //======================================================================================== ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : - connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), + connection(c), domain("default"), maxAgentAgeMinutes(5), opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), - connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1) + connectedBrokerInAgentList(false), schemaCache(new SchemaCache()) { if (!options.empty()) { qpid::messaging::AddressParser parser(options); @@ -214,7 +213,7 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout) uint64_t milliseconds = timeout.getMilliseconds(); qpid::sys::Mutex::ScopedLock l(lock); - if (eventQueue.empty() && milliseconds > 0) + if (eventQueue.empty()) cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); @@ -228,13 +227,6 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout) } -int ConsoleSessionImpl::pendingEvents() const -{ - qpid::sys::Mutex::ScopedLock l(lock); - return eventQueue.size(); -} - - uint32_t ConsoleSessionImpl::getAgentCount() const { qpid::sys::Mutex::ScopedLock l(lock); @@ -429,23 +421,7 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian iter = content.find("_values"); if (iter == content.end()) return; - const Variant::Map& in_attrs(iter->second.asMap()); - Variant::Map attrs; - - // - // Copy the map from the message to "attrs". Translate any old-style - // keys to their new key values in the process. - // - for (iter = in_attrs.begin(); iter != in_attrs.end(); iter++) { - if (iter->first == "epoch") - attrs[protocol::AGENT_ATTR_EPOCH] = iter->second; - else if (iter->first == "timestamp") - attrs[protocol::AGENT_ATTR_TIMESTAMP] = iter->second; - else if (iter->first == "heartbeat_interval") - attrs[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = iter->second; - else - attrs[iter->first] = iter->second; - } + Variant::Map attrs(iter->second.asMap()); iter = attrs.find(protocol::AGENT_ATTR_EPOCH); if (iter != attrs.end()) diff --git a/qpid/cpp/src/qmf/ConsoleSessionImpl.h b/qpid/cpp/src/qmf/ConsoleSessionImpl.h index 429dfc4881..675c8bcfb5 100644 --- a/qpid/cpp/src/qmf/ConsoleSessionImpl.h +++ b/qpid/cpp/src/qmf/ConsoleSessionImpl.h @@ -58,7 +58,6 @@ namespace qmf { void open(); void close(); bool nextEvent(ConsoleEvent& e, qpid::messaging::Duration t); - int pendingEvents() const; uint32_t getAgentCount() const; Agent getAgent(uint32_t i) const; Agent getConnectedBrokerAgent() const { return connectedBrokerAgent; } @@ -90,8 +89,6 @@ namespace qmf { std::string directBase; std::string topicBase; boost::shared_ptr<SchemaCache> schemaCache; - qpid::sys::Mutex corrlock; - uint32_t nextCorrelator; void enqueueEvent(const ConsoleEvent&); void enqueueEventLH(const ConsoleEvent&); @@ -102,7 +99,6 @@ namespace qmf { void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&); void periodicProcessing(uint64_t); void run(); - uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; } friend class AgentImpl; }; diff --git a/qpid/cpp/src/qmf/DataAddr.cpp b/qpid/cpp/src/qmf/DataAddr.cpp index d16e12062e..fb51d5787f 100644 --- a/qpid/cpp/src/qmf/DataAddr.cpp +++ b/qpid/cpp/src/qmf/DataAddr.cpp @@ -36,9 +36,7 @@ DataAddr::~DataAddr() { PI::dtor(*this); } DataAddr& DataAddr::operator=(const DataAddr& s) { return PI::assign(*this, s); } bool DataAddr::operator==(const DataAddr& o) { return *impl == *o.impl; } -bool DataAddr::operator==(const DataAddr& o) const { return *impl == *o.impl; } bool DataAddr::operator<(const DataAddr& o) { return *impl < *o.impl; } -bool DataAddr::operator<(const DataAddr& o) const { return *impl < *o.impl; } DataAddr::DataAddr(const qpid::types::Variant::Map& m) { PI::ctor(*this, new DataAddrImpl(m)); } DataAddr::DataAddr(const string& n, const string& a, uint32_t e) { PI::ctor(*this, new DataAddrImpl(n, a, e)); } @@ -47,7 +45,7 @@ const string& DataAddr::getAgentName() const { return impl->getAgentName(); } uint32_t DataAddr::getAgentEpoch() const { return impl->getAgentEpoch(); } Variant::Map DataAddr::asMap() const { return impl->asMap(); } -bool DataAddrImpl::operator==(const DataAddrImpl& other) const +bool DataAddrImpl::operator==(const DataAddrImpl& other) { return agentName == other.agentName && @@ -56,7 +54,7 @@ bool DataAddrImpl::operator==(const DataAddrImpl& other) const } -bool DataAddrImpl::operator<(const DataAddrImpl& other) const +bool DataAddrImpl::operator<(const DataAddrImpl& other) { if (agentName < other.agentName) return true; if (agentName > other.agentName) return false; diff --git a/qpid/cpp/src/qmf/DataAddrImpl.h b/qpid/cpp/src/qmf/DataAddrImpl.h index 11d512f0c4..3f9cae9453 100644 --- a/qpid/cpp/src/qmf/DataAddrImpl.h +++ b/qpid/cpp/src/qmf/DataAddrImpl.h @@ -38,8 +38,8 @@ namespace qmf { // // Methods from API handle // - bool operator==(const DataAddrImpl&) const; - bool operator<(const DataAddrImpl&) const; + bool operator==(const DataAddrImpl&); + bool operator<(const DataAddrImpl&); DataAddrImpl(const qpid::types::Variant::Map&); DataAddrImpl(const std::string& _name, const std::string& _agentName, uint32_t _agentEpoch=0) : agentName(_agentName), name(_name), agentEpoch(_agentEpoch) {} diff --git a/qpid/cpp/src/qmf/PrivateImplRef.h b/qpid/cpp/src/qmf/PrivateImplRef.h index 960cbb2e09..8b698c4199 100644 --- a/qpid/cpp/src/qmf/PrivateImplRef.h +++ b/qpid/cpp/src/qmf/PrivateImplRef.h @@ -23,8 +23,8 @@ */ #include "qmf/ImportExport.h" -#include "qpid/RefCounted.h" #include <boost/intrusive_ptr.hpp> +#include "qpid/RefCounted.h" namespace qmf { diff --git a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp index 41dd9ff00c..ab65b8d768 100644 --- a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp +++ b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp @@ -334,7 +334,8 @@ void ResilientConnectionImpl::notify() { if (notifyFd != -1) { - (void) ::write(notifyFd, ".", 1); + int unused_ret; //Suppress warnings about ignoring return value. + unused_ret = ::write(notifyFd, ".", 1); } } @@ -431,7 +432,8 @@ void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind k if (notifyFd != -1) { - (void) ::write(notifyFd, ".", 1); + int unused_ret; //Suppress warnings about ignoring return value. + unused_ret = ::write(notifyFd, ".", 1); } } diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp index f75663e131..e0948a9911 100644 --- a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp +++ b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp @@ -55,12 +55,9 @@ void SchemaHash::update(uint8_t data) void SchemaHash::update(const char* data, uint32_t len) { - union h { - uint8_t b[16]; - uint64_t q[2]; - }* h = reinterpret_cast<union h*>(&hash[0]); - uint64_t* first = &h->q[0]; - uint64_t* second = &h->q[1]; + uint64_t* first = (uint64_t*) hash; + uint64_t* second = (uint64_t*) hash + 1; + for (uint32_t idx = 0; idx < len; idx++) { *first = *first ^ (uint64_t) data[idx]; *second = *second << 1; |