diff options
Diffstat (limited to 'qpid/cpp/src/qpid')
85 files changed, 1361 insertions, 1120 deletions
diff --git a/qpid/cpp/src/qpid/acl/AclData.cpp b/qpid/cpp/src/qpid/acl/AclData.cpp index 81519c3311..5d7a028736 100644 --- a/qpid/cpp/src/qpid/acl/AclData.cpp +++ b/qpid/cpp/src/qpid/acl/AclData.cpp @@ -18,7 +18,8 @@ #include "qpid/acl/AclData.h" #include "qpid/log/Statement.h" - +#include "qpid/sys/IntegerTypes.h" +#include <boost/lexical_cast.hpp> namespace qpid { namespace acl { @@ -57,14 +58,15 @@ AclResult AclData::lookup(const std::string& id, const Action& action, const Obj const std::string& name, std::map<Property, std::string>* params) { QPID_LOG(debug, "ACL: Lookup for id:" << id << " action:" << AclHelper::getActionStr((Action) action) - << " objectType:" << AclHelper::getObjectTypeStr((ObjectType) objType) << " name:" << name - << " with params " << AclHelper::propertyMapToString(params)); + << " objectType:" << AclHelper::getObjectTypeStr((ObjectType) objType) << " name:" << name + << " with params " << AclHelper::propertyMapToString(params)); AclResult aclresult = decisionMode; if (actionList[action] && actionList[action][objType]) { AclData::actObjItr itrRule = actionList[action][objType]->find(id); if (itrRule == actionList[action][objType]->end()) itrRule = actionList[action][objType]->find("*"); + if (itrRule != actionList[action][objType]->end()) { QPID_LOG(debug, "ACL: checking the following rules for : " << itrRule->first ); @@ -79,25 +81,48 @@ AclResult AclData::lookup(const std::string& id, const Action& action, const Obj if (pMItr->first == acl::PROP_NAME) { if (matchProp(pMItr->second, name)){ QPID_LOG(debug, "ACL: name '" << name << "' matched with name '" - << pMItr->second << "' given in the rule"); - }else{ + << pMItr->second << "' given in the rule"); + }else{ match = false; QPID_LOG(debug, "ACL: name '" << name << "' didn't match with name '" - << pMItr->second << "' given in the rule"); + << pMItr->second << "' given in the rule"); } } else if (params) { //match pMItr against params propertyMapItr paramItr = params->find(pMItr->first); if (paramItr == params->end()) { match = false; QPID_LOG(debug, "ACL: the given parameter map in lookup doesn't contain the property '" - << AclHelper::getPropertyStr(pMItr->first) << "'"); - } else if (!matchProp(pMItr->second, paramItr->second)) { + << AclHelper::getPropertyStr(pMItr->first) << "'"); + }else if ( pMItr->first == acl::PROP_MAXQUEUECOUNT || pMItr->first == acl::PROP_MAXQUEUESIZE ) { + if ( pMItr->first == paramItr->first ) { + uint64_t aclMax = boost::lexical_cast<uint64_t>(pMItr->second); + uint64_t paramMax = boost::lexical_cast<uint64_t>(paramItr->second); + QPID_LOG(debug, "ACL: Numeric comparison for property " << + AclHelper::getPropertyStr(paramItr->first) << + " (value given in lookup = " << + boost::lexical_cast<std::string>(paramItr->second) << + ", value give in rule = " << + boost::lexical_cast<std::string>(pMItr->second) << " )"); + if (( aclMax ) && ( paramMax == 0 || paramMax > aclMax)){ + match = decisionMode == qpid::acl::ALLOW ; + QPID_LOG(debug, "ACL: Limit exceeded and match=" << + (match ? "true": "false") << + " as decision mode is " << AclHelper::getAclResultStr(decisionMode)); + } + } + }else if (matchProp(pMItr->second, paramItr->second)) { + QPID_LOG(debug, "ACL: the pair(" + << AclHelper::getPropertyStr(paramItr->first) << "," << paramItr->second + << ") given in lookup matched the pair(" + << AclHelper::getPropertyStr(pMItr->first) << "," << pMItr->second << ") given in the rule"); + } else { QPID_LOG(debug, "ACL: the pair(" - << AclHelper::getPropertyStr(paramItr->first) << "," << paramItr->second - << ") given in lookup doesn't match the pair(" - << AclHelper::getPropertyStr(pMItr->first) << "," << pMItr->second << ") given in the rule"); + << AclHelper::getPropertyStr(paramItr->first) << "," << paramItr->second + << ") given in lookup doesn't match the pair(" + << AclHelper::getPropertyStr(pMItr->first) << "," << pMItr->second << ") given in the rule"); match = false; - } + + } } } if (match) @@ -116,37 +141,63 @@ AclResult AclData::lookup(const std::string& id, const Action& action, const Obj AclResult AclData::lookup(const std::string& id, const Action& action, const ObjectType& objType, const std::string& /*Exchange*/ name, const std::string& RoutingKey) { - AclResult aclresult = decisionMode; + + QPID_LOG(debug, "ACL: Lookup for id:" << id << " action:" << AclHelper::getActionStr((Action) action) + << " objectType:" << AclHelper::getObjectTypeStr((ObjectType) objType) << " exchange name:" << name + << " with routing key " << RoutingKey); + + AclResult aclresult = decisionMode; - if (actionList[action] && actionList[action][objType]){ - AclData::actObjItr itrRule = actionList[action][objType]->find(id); - if (itrRule == actionList[action][objType]->end()) + if (actionList[action] && actionList[action][objType]){ + AclData::actObjItr itrRule = actionList[action][objType]->find(id); + + if (itrRule == actionList[action][objType]->end()) itrRule = actionList[action][objType]->find("*"); + if (itrRule != actionList[action][objType]->end() ) { + QPID_LOG(debug, "ACL: checking the following rules for : " << itrRule->first ); + //loop the vector - for (ruleSetItr i=itrRule->second.begin(); i<itrRule->second.end(); i++) { - + for (ruleSetItr i=itrRule->second.begin(); i<itrRule->second.end(); i++) { + QPID_LOG(debug, "ACL: checking rule " << i->toString()); + // loop the names looking for match bool match =true; for (propertyMapItr pMItr = i->props.begin(); (pMItr != i->props.end()) && match; pMItr++) { - //match name is exists first + //match name is exists first if (pMItr->first == acl::PROP_NAME){ - if (!matchProp(pMItr->second, name)){ - match= false; - } + if (matchProp(pMItr->second, name)){ + QPID_LOG(debug, "ACL: name '" << name << "' matched with name '" + << pMItr->second << "' given in the rule"); + + }else{ + match= false; + QPID_LOG(debug, "ACL: name '" << name << "' didn't match with name '" + << pMItr->second << "' given in the rule"); + } }else if (pMItr->first == acl::PROP_ROUTINGKEY){ - if (!matchProp(pMItr->second, RoutingKey)){ - match= false; - } + if (matchProp(pMItr->second, RoutingKey)){ + QPID_LOG(debug, "ACL: name '" << name << "' matched with routing_key '" + << pMItr->second << "' given in the rule"); + }else{ + match= false; + QPID_LOG(debug, "ACL: name '" << name << "' didn't match with routing_key '" + << pMItr->second << "' given in the rule"); + } } } - if (match) return getACLResult(i->logOnly, i->log); - } + if (match){ + aclresult = getACLResult(i->logOnly, i->log); + QPID_LOG(debug,"Successful match, the decision is:" << AclHelper::getAclResultStr(aclresult)); + return aclresult; + } + } } - } - return aclresult; + } + QPID_LOG(debug,"No successful match, defaulting to the decision mode " << AclHelper::getAclResultStr(aclresult)); + return aclresult; } diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 093e9cea32..f9f39316e2 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -78,7 +78,7 @@ ManagementAgent* ManagementAgent::Singleton::getInstance() const string ManagementAgentImpl::storeMagicNumber("MA02"); ManagementAgentImpl::ManagementAgentImpl() : - interval(10), extThread(false), + interval(10), extThread(false), pipeHandle(0), initialized(false), connected(false), lastFailure("never connected"), clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0), assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), @@ -89,13 +89,12 @@ ManagementAgentImpl::ManagementAgentImpl() : ManagementAgentImpl::~ManagementAgentImpl() { + // shutdown & cleanup all threads connThreadBody.close(); + pubThreadBody.close(); - // If the thread is doing work on the connection, we must wait for it to - // complete before shutting down. - if (!connThreadBody.isSleeping()) { - connThread.join(); - } + connThread.join(); + pubThread.join(); // Release the memory associated with stored management objects. { @@ -777,6 +776,7 @@ void ManagementAgentImpl::ConnectionThread::run() static const int delayFactor(2); int delay(delayMin); string dest("qmfagent"); + ConnectionThread::shared_ptr tmp; sessionId.generate(); queueName << "qmfagent-" << sessionId; @@ -787,7 +787,7 @@ void ManagementAgentImpl::ConnectionThread::run() QPID_LOG(debug, "QMF Agent attempting to connect to the broker..."); connection.open(agent.connectionSettings); session = connection.newSession(queueName.str()); - subscriptions = new client::SubscriptionManager(session); + subscriptions.reset(new client::SubscriptionManager(session)); session.queueDeclare(arg::queue=queueName.str(), arg::autoDelete=true, arg::exclusive=true); @@ -811,11 +811,12 @@ void ManagementAgentImpl::ConnectionThread::run() operational = false; agent.connected = false; + tmp = subscriptions; + subscriptions.reset(); } + tmp.reset(); // frees the subscription outside the lock delay = delayMin; connection.close(); - delete subscriptions; - subscriptions = 0; } } catch (exception &e) { if (delay < delayMax) @@ -824,14 +825,19 @@ void ManagementAgentImpl::ConnectionThread::run() } { + // sleep for "delay" seconds, but peridically check if the + // agent is shutting down so we don't hang for up to delayMax + // seconds during agent shutdown Mutex::ScopedLock _lock(connLock); if (shutdown) return; sleeping = true; - { - Mutex::ScopedUnlock _unlock(connLock); - ::sleep(delay); - } + int totalSleep = 0; + do { + Mutex::ScopedUnlock _unlock(connLock); + ::sleep(delayMin); + totalSleep += delayMin; + } while (totalSleep < delay && !shutdown); sleeping = false; if (shutdown) return; @@ -848,10 +854,12 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, const string& exchange, const string& routingKey) { + ConnectionThread::shared_ptr s; { Mutex::ScopedLock _lock(connLock); if (!operational) return; + s = subscriptions; } Message msg; @@ -866,8 +874,8 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, } catch(exception& e) { QPID_LOG(error, "Exception caught in sendBuffer: " << e.what()); // Bounce the connection - if (subscriptions) - subscriptions->stop(); + if (s) + s->stop(); } } @@ -881,12 +889,14 @@ void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint void ManagementAgentImpl::ConnectionThread::close() { + ConnectionThread::shared_ptr s; { Mutex::ScopedLock _lock(connLock); shutdown = true; + s = subscriptions; } - if (subscriptions) - subscriptions->stop(); + if (s) + s->stop(); } bool ManagementAgentImpl::ConnectionThread::isSleeping() const @@ -898,8 +908,13 @@ bool ManagementAgentImpl::ConnectionThread::isSleeping() const void ManagementAgentImpl::PublishThread::run() { - while (true) { + uint16_t totalSleep; + + while (!shutdown) { agent.periodicProcessing(); - ::sleep(agent.getInterval()); + totalSleep = 0; + while (totalSleep++ < agent.getInterval() && !shutdown) { + ::sleep(1); + } } } diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h index f9cad9ebf5..a876496e98 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -163,12 +163,14 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen friend class ConnectionThread; class ConnectionThread : public sys::Runnable { + typedef boost::shared_ptr<client::SubscriptionManager> shared_ptr; + bool operational; ManagementAgentImpl& agent; framing::Uuid sessionId; client::Connection connection; client::Session session; - client::SubscriptionManager* subscriptions; + ConnectionThread::shared_ptr subscriptions; std::stringstream queueName; mutable sys::Mutex connLock; bool shutdown; @@ -176,7 +178,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void run(); public: ConnectionThread(ManagementAgentImpl& _agent) : - operational(false), agent(_agent), subscriptions(0), + operational(false), agent(_agent), shutdown(false), sleeping(false) {} ~ConnectionThread(); void sendBuffer(qpid::framing::Buffer& buf, @@ -192,8 +194,11 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen { ManagementAgentImpl& agent; void run(); + bool shutdown; public: - PublishThread(ManagementAgentImpl& _agent) : agent(_agent) {} + PublishThread(ManagementAgentImpl& _agent) : + agent(_agent), shutdown(false) {} + void close() { shutdown = true; } }; ConnectionThread connThreadBody; diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp index 96d5146c30..bf2e7d5713 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -31,7 +31,7 @@ namespace amqp_0_10 { using sys::Mutex; Connection::Connection(sys::OutputControl& o, const std::string& id, bool _isClient) - : frameQueueClosed(false), output(o), identifier(id), initialized(false), + : pushClosed(false), popClosed(false), output(o), identifier(id), initialized(false), isClient(_isClient), buffered(0), version(0,10) {} @@ -61,19 +61,23 @@ size_t Connection::decode(const char* buffer, size_t size) { } bool Connection::canEncode() { - if (!frameQueueClosed) connection->doOutput(); Mutex::ScopedLock l(frameQueueLock); - return (!isClient && !initialized) || !frameQueue.empty(); + if (!popClosed) { + Mutex::ScopedUnlock u(frameQueueLock); + connection->doOutput(); + } + return !popClosed && ((!isClient && !initialized) || !frameQueue.empty()); } bool Connection::isClosed() const { Mutex::ScopedLock l(frameQueueLock); - return frameQueueClosed; + return pushClosed && popClosed; } size_t Connection::encode(const char* buffer, size_t size) { { // Swap frameQueue data into workQueue to avoid holding lock while we encode. Mutex::ScopedLock l(frameQueueLock); + if (popClosed) return 0; // Can't pop any more frames. assert(workQueue.empty()); workQueue.swap(frameQueue); } @@ -102,6 +106,8 @@ size_t Connection::encode(const char* buffer, size_t size) { // Put back any frames we did not encode. frameQueue.insert(frameQueue.begin(), workQueue.begin(), workQueue.end()); workQueue.clear(); + if (frameQueue.empty() && pushClosed) + popClosed = true; } return out.getPosition(); } @@ -111,9 +117,10 @@ void Connection::activateOutput() { output.activateOutput(); } void Connection::giveReadCredit(int32_t credit) { output.giveReadCredit(credit); } void Connection::close() { - // Close the output queue. + // No more frames can be pushed onto the queue. + // Frames aleady on the queue can be popped. Mutex::ScopedLock l(frameQueueLock); - frameQueueClosed = true; + pushClosed = true; } void Connection::closed() { @@ -123,7 +130,7 @@ void Connection::closed() { void Connection::send(framing::AMQFrame& f) { { Mutex::ScopedLock l(frameQueueLock); - if (!frameQueueClosed) + if (!pushClosed) frameQueue.push_back(f); buffered += f.encodedSize(); } diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/amqp_0_10/Connection.h index 6fd51381fc..995d824796 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.h +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.h @@ -47,7 +47,7 @@ class Connection : public sys::ConnectionCodec, FrameQueue frameQueue; FrameQueue workQueue; - bool frameQueueClosed; + bool pushClosed, popClosed; mutable sys::Mutex frameQueueLock; sys::OutputControl& output; std::auto_ptr<sys::ConnectionInputHandler> connection; diff --git a/qpid/cpp/src/qpid/broker/AclModule.h b/qpid/cpp/src/qpid/broker/AclModule.h index 536fa21b2b..2f4f7eaacc 100644 --- a/qpid/cpp/src/qpid/broker/AclModule.h +++ b/qpid/cpp/src/qpid/broker/AclModule.h @@ -40,7 +40,8 @@ enum Action {ACT_CONSUME, ACT_PUBLISH, ACT_CREATE, ACT_ACCESS, ACT_BIND, enum Property {PROP_NAME, PROP_DURABLE, PROP_OWNER, PROP_ROUTINGKEY, PROP_PASSIVE, PROP_AUTODELETE, PROP_EXCLUSIVE, PROP_TYPE, PROP_ALTERNATE, PROP_QUEUENAME, PROP_SCHEMAPACKAGE, - PROP_SCHEMACLASS}; + PROP_SCHEMACLASS, PROP_POLICYTYPE, PROP_MAXQUEUESIZE, + PROP_MAXQUEUECOUNT}; enum AclResult {ALLOW, ALLOWLOG, DENY, DENYLOG}; } // namespace acl @@ -132,6 +133,9 @@ class AclHelper { if (str.compare("queuename") == 0) return PROP_QUEUENAME; if (str.compare("schemapackage") == 0) return PROP_SCHEMAPACKAGE; if (str.compare("schemaclass") == 0) return PROP_SCHEMACLASS; + if (str.compare("policytype") == 0) return PROP_POLICYTYPE; + if (str.compare("maxqueuesize") == 0) return PROP_MAXQUEUESIZE; + if (str.compare("maxqueuecount") == 0) return PROP_MAXQUEUECOUNT; throw str; } static inline std::string getPropertyStr(const Property p) { @@ -148,6 +152,9 @@ class AclHelper { case PROP_QUEUENAME: return "queuename"; case PROP_SCHEMAPACKAGE: return "schemapackage"; case PROP_SCHEMACLASS: return "schemaclass"; + case PROP_POLICYTYPE: return "policytype"; + case PROP_MAXQUEUESIZE: return "maxqueuesize"; + case PROP_MAXQUEUECOUNT: return "maxqueuecount"; default: assert(false); // should never get here } return ""; @@ -217,11 +224,14 @@ class AclHelper { // == Queues == propSetPtr p4(new propSet); - p3->insert(PROP_ALTERNATE); - p3->insert(PROP_PASSIVE); - p3->insert(PROP_DURABLE); - p3->insert(PROP_EXCLUSIVE); - p3->insert(PROP_AUTODELETE); + p4->insert(PROP_ALTERNATE); + p4->insert(PROP_PASSIVE); + p4->insert(PROP_DURABLE); + p4->insert(PROP_EXCLUSIVE); + p4->insert(PROP_AUTODELETE); + p4->insert(PROP_POLICYTYPE); + p4->insert(PROP_MAXQUEUESIZE); + p4->insert(PROP_MAXQUEUECOUNT); actionMapPtr a1(new actionMap); a1->insert(actionPair(ACT_ACCESS, p0)); diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 13cf88fb11..4259bb2f31 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -91,7 +91,8 @@ Broker::Options::Options(const std::string& name) : queueLimit(100*1048576/*100M default limit*/), tcpNoDelay(false), requireEncrypted(false), - maxSessionRate(0) + maxSessionRate(0), + asyncQueueEvents(true) { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -121,7 +122,8 @@ Broker::Options::Options(const std::string& name) : ("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections") ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted") ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)") - ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)"); + ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)") + ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication"); } const std::string empty; @@ -150,7 +152,7 @@ Broker::Broker(const Broker::Options& conf) : *this), managementAgent(conf.enableMgmt ? new ManagementAgent() : 0), queueCleaner(queues, timer), - queueEvents(poller), + queueEvents(poller,!conf.asyncQueueEvents), recovery(true), expiryPolicy(new ExpiryPolicy), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) @@ -208,8 +210,10 @@ Broker::Broker(const Broker::Options& conf) : (*i)->earlyInitialize(*this); // If no plugin store module registered itself, set up the null store. - if (store.get() == 0) - setStore (new NullMessageStore()); + if (store.get() == 0) { + boost::shared_ptr<MessageStore> p(new NullMessageStore()); + setStore (p); + } exchanges.declare(empty, DirectExchange::typeName); // Default exchange. @@ -296,7 +300,7 @@ boost::intrusive_ptr<Broker> Broker::create(const Options& opts) return boost::intrusive_ptr<Broker>(new Broker(opts)); } -void Broker::setStore (MessageStore* _store) +void Broker::setStore (boost::shared_ptr<MessageStore>& _store) { store.reset(new MessageStoreModule (_store)); queues.setStore (store.get()); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 0517ceca95..5ca01e0867 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -111,6 +111,7 @@ public: bool requireEncrypted; std::string knownHosts; uint32_t maxSessionRate; + bool asyncQueueEvents; private: std::string getHome(); @@ -171,7 +172,7 @@ public: /** Shut down the broker */ virtual void shutdown(); - QPID_BROKER_EXTERN void setStore (MessageStore*); + QPID_BROKER_EXTERN void setStore (boost::shared_ptr<MessageStore>& store); MessageStore& getStore() { return *store; } void setAcl (AclModule* _acl) {acl = _acl;} AclModule* getAcl() { return acl; } diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index b9f24dee5f..29fe47beac 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -145,39 +145,12 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) { PreRoute pr(msg, this); - Queues::ConstPtr p; + ConstBindingList b; { Mutex::ScopedLock l(lock); - p = bindings[routingKey].queues.snapshot(); - } - int count(0); - - if (p) { - for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) { - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched(); - } - } - - if(!count){ - QPID_LOG(info, "DirectExchange " << getName() << " could not route message with key " << routingKey - << "; no matching binding found"); - if (mgmtExchange != 0) { - mgmtExchange->inc_msgDrops(); - mgmtExchange->inc_byteDrops(msg.contentSize()); - } - } else { - if (mgmtExchange != 0) { - mgmtExchange->inc_msgRoutes(count); - mgmtExchange->inc_byteRoutes(count * msg.contentSize()); - } - } - - if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives(); - mgmtExchange->inc_byteReceives(msg.contentSize()); + b = bindings[routingKey].queues.snapshot(); } + doRoute(msg, b); } diff --git a/qpid/cpp/src/qpid/broker/DtxAck.cpp b/qpid/cpp/src/qpid/broker/DtxAck.cpp index b189ef4cdb..bca3f90bbe 100644 --- a/qpid/cpp/src/qpid/broker/DtxAck.cpp +++ b/qpid/cpp/src/qpid/broker/DtxAck.cpp @@ -48,12 +48,26 @@ bool DtxAck::prepare(TransactionContext* ctxt) throw() void DtxAck::commit() throw() { - for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::committed)); - pending.clear(); + try { + for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::committed)); + pending.clear(); + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to commit: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to commit (unknown error)"); + } + } void DtxAck::rollback() throw() { - for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::requeue)); - pending.clear(); + try { + for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::requeue)); + pending.clear(); + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to complete rollback: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to complete rollback (unknown error)"); + } + } diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 90d81b81c6..757127eef2 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -76,6 +76,40 @@ Exchange::PreRoute::~PreRoute(){ } } +void Exchange::doRoute(Deliverable& msg, ConstBindingList b) +{ + int count = 0; + + if (b.get()) { + // Block the content release if the message is transient AND there is more than one binding + if (!msg.getMessage().isPersistent() && b->size() > 1) + msg.getMessage().blockContentRelease(); + + for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) { + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched(); + } + } + + if (mgmtExchange != 0) + { + mgmtExchange->inc_msgReceives (); + mgmtExchange->inc_byteReceives (msg.contentSize ()); + if (count == 0) + { + //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found"); + mgmtExchange->inc_msgDrops (); + mgmtExchange->inc_byteDrops (msg.contentSize ()); + } + else + { + mgmtExchange->inc_msgRoutes (count); + mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); + } + } +} + void Exchange::routeIVE(){ if (ive && lastMsg.get()){ DeliverableMessage dmsg(lastMsg); diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index c1e878200f..9bea376c28 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -79,6 +79,9 @@ protected: Exchange* parent; }; + typedef boost::shared_ptr<const std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > ConstBindingList; + typedef boost::shared_ptr< std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList; + void doRoute(Deliverable& msg, ConstBindingList b); void routeIVE(); diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp index e9007ba682..b7d46a33fe 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp @@ -106,36 +106,12 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons return true; } -void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ +void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/) +{ PreRoute pr(msg, this); - uint32_t count(0); - - BindingsArray::ConstPtr p = bindings.snapshot(); - if (p.get()){ - for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){ - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); - } - } - - if (mgmtExchange != 0) - { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - if (count == 0) - { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - else - { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } + doRoute(msg, bindings.snapshot()); } - + bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const) { BindingsArray::ConstPtr ptr = bindings.snapshot(); diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index c628c44909..a7c90156e1 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -104,7 +104,8 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, } -void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ +void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args) +{ if (!args) { //can't match if there were no headers passed in if (mgmtExchange != 0) { @@ -118,31 +119,17 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons PreRoute pr(msg, this); - uint32_t count(0); - - Bindings::ConstPtr p = bindings.snapshot(); - if (p.get()){ + ConstBindingList p = bindings.snapshot(); + BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); + if (p.get()) + { for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) { if (match((*i)->args, *args)) { - msg.deliverTo((*i)->queue); - count++; - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched(); + b->push_back(*i); } } } - - if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives(); - mgmtExchange->inc_byteReceives(msg.contentSize()); - if (count == 0) { - mgmtExchange->inc_msgDrops(); - mgmtExchange->inc_byteDrops(msg.contentSize()); - } else { - mgmtExchange->inc_msgRoutes(count); - mgmtExchange->inc_byteRoutes(count * msg.contentSize()); - } - } + doRoute(msg, b); } diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 7360010192..e2799b0bff 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -49,7 +49,7 @@ TransferAdapter Message::TRANSFER; Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {} + expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {} Message::~Message() { @@ -98,7 +98,7 @@ const FieldTable* Message::getApplicationHeaders() const return getAdapter().getApplicationHeaders(frames); } -bool Message::isPersistent() +bool Message::isPersistent() const { return (getAdapter().isPersistent(frames) || forcePersistentPolicy); } @@ -108,12 +108,16 @@ bool Message::requiresAccept() return getAdapter().requiresAccept(frames); } -uint32_t Message::getRequiredCredit() const +uint32_t Message::getRequiredCredit() { - //add up payload for all header and content frames in the frameset - SumBodySize sum; - frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>()); - return sum.getSize(); + sys::Mutex::ScopedLock l(lock); + if (!requiredCredit) { + //add up payload for all header and content frames in the frameset + SumBodySize sum; + frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>()); + requiredCredit = sum.getSize(); + } + return requiredCredit; } void Message::encode(framing::Buffer& buffer) const @@ -181,17 +185,31 @@ void Message::decodeContent(framing::Buffer& buffer) loaded = true; } -void Message::releaseContent(MessageStore* _store) +void Message::tryReleaseContent() { - if (!store) { - store = _store; + if (checkContentReleasable()) { + releaseContent(); } +} + +void Message::releaseContent(MessageStore* s) +{ + //deprecated, use setStore(store); releaseContent(); instead + if (!store) setStore(s); + releaseContent(); +} + +void Message::releaseContent() +{ + sys::Mutex::ScopedLock l(lock); if (store) { if (!getPersistenceId()) { intrusive_ptr<PersistableMessage> pmsg(this); store->stage(pmsg); staged = true; } + //ensure required credit is cached before content frames are released + getRequiredCredit(); //remove any content frames from the frameset frames.remove(TypeFilter<CONTENT_BODY>()); setContentReleased(); @@ -211,31 +229,29 @@ void Message::destroy() bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const { - if (isContentReleased()) { - intrusive_ptr<const PersistableMessage> pmsg(this); - - bool done = false; - string& data = frame.castBody<AMQContentBody>()->getData(); - store->loadContent(queue, pmsg, data, offset, maxContentSize); - done = data.size() < maxContentSize; - frame.setBof(false); - frame.setEof(true); - QPID_LOG(debug, "loaded frame" << frame); - if (offset > 0) { - frame.setBos(false); - } - if (!done) { - frame.setEos(false); - } else return false; - return true; + intrusive_ptr<const PersistableMessage> pmsg(this); + + bool done = false; + string& data = frame.castBody<AMQContentBody>()->getData(); + store->loadContent(queue, pmsg, data, offset, maxContentSize); + done = data.size() < maxContentSize; + frame.setBof(false); + frame.setEof(true); + QPID_LOG(debug, "loaded frame" << frame); + if (offset > 0) { + frame.setBos(false); } - else return false; + if (!done) { + frame.setEos(false); + } else return false; + return true; } void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const { + sys::Mutex::ScopedLock l(lock); if (isContentReleased() && !frames.isComplete()) { - + sys::Mutex::ScopedUnlock u(lock); uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) @@ -373,28 +389,36 @@ void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Que } void Message::allEnqueuesComplete() { - MessageCallback* cb = 0; - { - sys::Mutex::ScopedLock l(lock); - std::swap(cb, enqueueCallback); - } + sys::Mutex::ScopedLock l(callbackLock); + MessageCallback* cb = enqueueCallback; if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); } void Message::allDequeuesComplete() { - MessageCallback* cb = 0; - { - sys::Mutex::ScopedLock l(lock); - std::swap(cb, dequeueCallback); - } + sys::Mutex::ScopedLock l(callbackLock); + MessageCallback* cb = dequeueCallback; if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); } -void Message::setEnqueueCompleteCallback(MessageCallback& cb) { enqueueCallback = &cb; } -void Message::resetEnqueueCompleteCallback() { enqueueCallback = 0; } +void Message::setEnqueueCompleteCallback(MessageCallback& cb) { + sys::Mutex::ScopedLock l(callbackLock); + enqueueCallback = &cb; +} + +void Message::resetEnqueueCompleteCallback() { + sys::Mutex::ScopedLock l(callbackLock); + enqueueCallback = 0; +} -void Message::setDequeueCompleteCallback(MessageCallback& cb) { dequeueCallback = &cb; } -void Message::resetDequeueCompleteCallback() { dequeueCallback = 0; } +void Message::setDequeueCompleteCallback(MessageCallback& cb) { + sys::Mutex::ScopedLock l(callbackLock); + dequeueCallback = &cb; +} + +void Message::resetDequeueCompleteCallback() { + sys::Mutex::ScopedLock l(callbackLock); + dequeueCallback = 0; +} framing::FieldTable& Message::getOrInsertHeaders() { diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index e4d09b1042..3894960c95 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -74,7 +74,7 @@ public: bool isImmediate() const; QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const; framing::FieldTable& getOrInsertHeaders(); - QPID_BROKER_EXTERN bool isPersistent(); + QPID_BROKER_EXTERN bool isPersistent() const; bool requiresAccept(); QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e); @@ -108,7 +108,7 @@ public: return frames.isA<T>(); } - uint32_t getRequiredCredit() const; + uint32_t getRequiredCredit(); void encode(framing::Buffer& buffer) const; void encodeContent(framing::Buffer& buffer) const; @@ -129,12 +129,9 @@ public: QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer); QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer); - /** - * Releases the in-memory content data held by this - * message. Must pass in a store from which the data can - * be reloaded. - */ - void releaseContent(MessageStore* store); + void QPID_BROKER_EXTERN tryReleaseContent(); + void releaseContent(); + void releaseContent(MessageStore* s);//deprecated, use 'setStore(store); releaseContent();' instead void destroy(); bool getContentFrame(const Queue& queue, framing::AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const; @@ -187,8 +184,12 @@ public: mutable Replacement replacement; mutable boost::intrusive_ptr<Message> empty; + + sys::Mutex callbackLock; MessageCallback* enqueueCallback; MessageCallback* dequeueCallback; + + uint32_t requiredCredit; static std::string updateDestination; }; diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp index 14b233fd6c..b1a2b77b05 100644 --- a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp @@ -80,7 +80,7 @@ void MessageBuilder::handle(AMQFrame& frame) && !NullMessageStore::isNullStore(store) && message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */) { - message->releaseContent(store); + message->releaseContent(); staging = true; } } @@ -96,6 +96,7 @@ void MessageBuilder::end() void MessageBuilder::start(const SequenceNumber& id) { message = intrusive_ptr<Message>(new Message(id)); + message->setStore(store); state = METHOD; staging = false; } diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp index 0b8a5db1c7..5f7cceebd3 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -32,11 +32,11 @@ using qpid::framing::FieldTable; namespace qpid { namespace broker { -MessageStoreModule::MessageStoreModule(MessageStore* _store) : store(_store) {} +MessageStoreModule::MessageStoreModule(boost::shared_ptr<MessageStore>& _store) + : store(_store) {} MessageStoreModule::~MessageStoreModule() { - delete store; } bool MessageStoreModule::init(const Options*) { return true; } @@ -173,7 +173,7 @@ void MessageStoreModule::collectPreparedXids(std::set<std::string>& xids) bool MessageStoreModule::isNull() const { - return NullMessageStore::isNullStore(store); + return NullMessageStore::isNullStore(store.get()); } }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/MessageStoreModule.h b/qpid/cpp/src/qpid/broker/MessageStoreModule.h index 02cbd13cf1..56b5a3c1ae 100644 --- a/qpid/cpp/src/qpid/broker/MessageStoreModule.h +++ b/qpid/cpp/src/qpid/broker/MessageStoreModule.h @@ -26,6 +26,7 @@ #include "qpid/broker/RecoveryManager.h" #include <boost/intrusive_ptr.hpp> +#include <boost/shared_ptr.hpp> namespace qpid { namespace broker { @@ -35,9 +36,9 @@ namespace broker { */ class MessageStoreModule : public MessageStore { - MessageStore* store; + boost::shared_ptr<MessageStore> store; public: - MessageStoreModule(MessageStore* store); + MessageStoreModule(boost::shared_ptr<MessageStore>& store); bool init(const Options* options); void truncateInit(const bool pushDownStoreFiles = false); diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp index 2ef223aa81..303a0501f4 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp @@ -36,7 +36,6 @@ PersistableMessage::~PersistableMessage() {} PersistableMessage::PersistableMessage() : asyncEnqueueCounter(0), asyncDequeueCounter(0), - contentReleased(false), store(0) {} @@ -59,9 +58,15 @@ void PersistableMessage::flush() } } -void PersistableMessage::setContentReleased() {contentReleased = true; } +void PersistableMessage::setContentReleased() +{ + contentReleaseState.released = true; +} -bool PersistableMessage::isContentReleased()const { return contentReleased; } +bool PersistableMessage::isContentReleased() const +{ + return contentReleaseState.released; +} bool PersistableMessage::isEnqueueComplete() { sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); @@ -153,6 +158,26 @@ void PersistableMessage::dequeueAsync() { asyncDequeueCounter++; } +PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {} + +void PersistableMessage::setStore(MessageStore* s) +{ + store = s; +} + +void PersistableMessage::requestContentRelease() +{ + contentReleaseState.requested = true; +} +void PersistableMessage::blockContentRelease() +{ + contentReleaseState.blocked = true; +} +bool PersistableMessage::checkContentReleasable() +{ + return contentReleaseState.requested && !contentReleaseState.blocked; +} + }} diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h index 0274b41375..2576e266d2 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.h +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h @@ -68,8 +68,16 @@ class PersistableMessage : public Persistable void enqueueAsync(); void dequeueAsync(); - bool contentReleased; syncList synclist; + struct ContentReleaseState + { + bool blocked; + bool requested; + bool released; + + ContentReleaseState(); + }; + ContentReleaseState contentReleaseState; protected: /** Called when all enqueues are complete for this message. */ @@ -96,8 +104,15 @@ class PersistableMessage : public Persistable void flush(); - bool isContentReleased() const; - + bool QPID_BROKER_EXTERN isContentReleased() const; + + void QPID_BROKER_EXTERN setStore(MessageStore*); + void requestContentRelease(); + void blockContentRelease(); + bool checkContentReleasable(); + + virtual QPID_BROKER_EXTERN bool isPersistent() const = 0; + QPID_BROKER_EXTERN bool isEnqueueComplete(); QPID_BROKER_EXTERN void enqueueComplete(); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index b2a8e223c5..86de96468d 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -181,6 +181,8 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ void Queue::recover(boost::intrusive_ptr<Message>& msg){ + if (policy.get()) policy->recoverEnqueued(msg); + push(msg, true); if (store){ // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure @@ -206,11 +208,10 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ } void Queue::requeue(const QueuedMessage& msg){ - if (!isEnqueued(msg)) return; - QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); + if (!isEnqueued(msg)) return; msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); listeners.populate(copy); @@ -563,16 +564,10 @@ void Queue::popMsg(QueuedMessage& qmsg) } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ - Messages dequeues; QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); - if (policy.get()) { - policy->tryEnqueue(qm); - //depending on policy, may have some dequeues - if (!isRecovery) pendingDequeues.swap(dequeues); - } if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); LVQ::iterator i; @@ -606,12 +601,11 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ if (eventMgr) eventMgr->enqueued(qm); else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); } + if (policy.get()) { + policy->enqueued(qm); + } } copy.notify(); - if (!dequeues.empty()) { - //depending on policy, may have some dequeues - for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); - } } QueuedMessage Queue::getFront() @@ -697,8 +691,19 @@ void Queue::setLastNodeFailure() } // return true if store exists, -bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) +bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck) { + if (policy.get() && !suppressPolicyCheck) { + Messages dequeues; + { + Mutex::ScopedLock locker(messageLock); + policy->tryEnqueue(msg); + policy->getPendingDequeues(dequeues); + } + //depending on policy, may have some dequeues that need to performed without holding the lock + for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + } + if (inLastNodeFailure && persistLastNode){ msg->forcePersistent(); } @@ -707,15 +712,27 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) msg->addTraceId(traceId); } - if (msg->isPersistent() && store) { + if ((msg->isPersistent() || msg->checkContentReleasable()) && store) { msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); store->enqueue(ctxt, pmsg, *this); return true; } + if (!store) { + //Messages enqueued on a transient queue should be prevented + //from having their content released as it may not be + //recoverable by these queue for delivery + msg->blockContentRelease(); + } return false; } +void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg) +{ + Mutex::ScopedLock locker(messageLock); + if (policy.get()) policy->enqueueAborted(msg); +} + // return true if store exists, bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { @@ -726,7 +743,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) dequeued(msg); } } - if (msg.payload->isPersistent() && store) { + if ((msg.payload->isPersistent() || msg.payload->checkContentReleasable()) && store) { msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); store->dequeue(ctxt, pmsg, *this); @@ -781,22 +798,37 @@ void Queue::create(const FieldTable& _settings) void Queue::configure(const FieldTable& _settings, bool recovering) { - setPolicy(QueuePolicy::createQueuePolicy(_settings)); + + eventMode = _settings.getAsInt(qpidQueueEventGeneration); + + if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && + (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) { + if ( NullMessageStore::isNullStore(store)) { + QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName()); + } else if (eventMgr && !eventMgr->isSync() ) { + QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName()); + } + FieldTable copy(_settings); + copy.erase(QueuePolicy::typeKey); + setPolicy(QueuePolicy::createQueuePolicy(getName(), copy)); + } else { + setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings)); + } //set this regardless of owner to allow use of no-local with exclusive consumers also noLocal = _settings.get(qpidNoLocal); - QPID_LOG(debug, "Configured queue with no-local=" << noLocal); + QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal); lastValueQueue= _settings.get(qpidLastValueQueue); - if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue"); + if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName()); lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse); if (lastValueQueueNoBrowse){ - QPID_LOG(debug, "Configured queue as Last Value Queue No Browse"); + QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName()); lastValueQueue = lastValueQueueNoBrowse; } persistLastNode= _settings.get(qpidPersistLastNode); - if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node"); + if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName()); traceId = _settings.getAsString(qpidTraceIdentity); std::string excludeList = _settings.getAsString(qpidTraceExclude); @@ -806,8 +838,6 @@ void Queue::configure(const FieldTable& _settings, bool recovering) QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); - eventMode = _settings.getAsInt(qpidQueueEventGeneration); - FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers); if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>()); @@ -975,19 +1005,6 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { } } -bool Queue::releaseMessageContent(const QueuedMessage& m) -{ - if (store && !NullMessageStore::isNullStore(store)) { - QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory"); - m.payload->releaseContent(store); - return true; - } else { - QPID_LOG(warning, "Message " << m.position << " on " << name - << " cannot be released from memory as the queue is not durable"); - return false; - } -} - ManagementObject* Queue::GetManagementObject (void) const { return (ManagementObject*) mgmtObject; @@ -1044,11 +1061,12 @@ void Queue::insertSequenceNumbers(const std::string& key) void Queue::enqueued(const QueuedMessage& m) { if (m.payload) { - if (policy.get()) policy->tryEnqueue(m); - mgntEnqStats(m.payload); - if (m.payload->isPersistent()) { - enqueue ( 0, m.payload ); + if (policy.get()) { + policy->recoverEnqueued(m.payload); + policy->enqueued(m); } + mgntEnqStats(m.payload); + enqueue ( 0, m.payload, true ); } else { QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } @@ -1059,10 +1077,4 @@ bool Queue::isEnqueued(const QueuedMessage& msg) return !policy.get() || policy->isEnqueued(msg); } -void Queue::addPendingDequeue(const QueuedMessage& msg) -{ - //assumes lock is held - true at present but rather nasty as this is a public method - pendingDequeues.push_back(msg); -} - QueueListeners& Queue::getListeners() { return listeners; } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 77799fd967..661e46f619 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -239,7 +239,8 @@ namespace qpid { QPID_BROKER_EXTERN void setLastNodeFailure(); QPID_BROKER_EXTERN void clearLastNodeFailure(); - bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg); + bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck = false); + void enqueueAborted(boost::intrusive_ptr<Message> msg); /** * dequeue from store (only done once messages is acknowledged) */ @@ -315,8 +316,6 @@ namespace qpid { bindings.eachBinding(f); } - bool releaseMessageContent(const QueuedMessage&); - void popMsg(QueuedMessage& qmsg); /** Set the position sequence number for the next message on the queue. @@ -335,18 +334,6 @@ namespace qpid { */ void recoveryComplete(); - /** - * This is a hack to avoid deadlocks in durable ring - * queues. It is used for dequeueing messages in response - * to an enqueue while avoid holding lock over call to - * store. - * - * Assumes messageLock is held - true for curent use case - * (QueuePolicy::tryEnqueue()) but rather nasty as this is a public - * method - **/ - void addPendingDequeue(const QueuedMessage &msg); - // For cluster update QueueListeners& getListeners(); }; diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.cpp b/qpid/cpp/src/qpid/broker/QueueEvents.cpp index 6df869673d..bba054b0b8 100644 --- a/qpid/cpp/src/qpid/broker/QueueEvents.cpp +++ b/qpid/cpp/src/qpid/broker/QueueEvents.cpp @@ -25,25 +25,41 @@ namespace qpid { namespace broker { -QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) : - eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true) +QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync) : + eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true), sync(isSync) { - eventQueue.start(); + if (!sync) eventQueue.start(); } QueueEvents::~QueueEvents() { - eventQueue.stop(); + if (!sync) eventQueue.stop(); } void QueueEvents::enqueued(const QueuedMessage& m) { - if (enabled) eventQueue.push(Event(ENQUEUE, m)); + if (enabled) { + Event enq(ENQUEUE, m); + if (sync) { + for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) + j->second(enq); + } else { + eventQueue.push(enq); + } + } } void QueueEvents::dequeued(const QueuedMessage& m) { - if (enabled) eventQueue.push(Event(DEQUEUE, m)); + if (enabled) { + Event deq(DEQUEUE, m); + if (sync) { + for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) + j->second(deq); + } else { + eventQueue.push(Event(DEQUEUE, m)); + } + } } void QueueEvents::registerListener(const std::string& id, const EventListener& listener) @@ -70,15 +86,16 @@ QueueEvents::EventQueue::Batch::const_iterator QueueEvents::handle(const EventQueue::Batch& events) { qpid::sys::Mutex::ScopedLock l(lock); for (EventQueue::Batch::const_iterator i = events.begin(); i != events.end(); ++i) { - for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) - j->second(*i); + for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) { + j->second(*i); + } } return events.end(); } void QueueEvents::shutdown() { - if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown(); + if (!sync && !eventQueue.empty() && !listeners.empty()) eventQueue.shutdown(); } void QueueEvents::enable() @@ -93,6 +110,12 @@ void QueueEvents::disable() QPID_LOG(debug, "Queue events disabled"); } +bool QueueEvents::isSync() +{ + return sync; +} + + QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {} diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.h b/qpid/cpp/src/qpid/broker/QueueEvents.h index 6826c6e79a..c42752133e 100644 --- a/qpid/cpp/src/qpid/broker/QueueEvents.h +++ b/qpid/cpp/src/qpid/broker/QueueEvents.h @@ -54,7 +54,7 @@ class QueueEvents typedef boost::function<void (Event)> EventListener; - QPID_BROKER_EXTERN QueueEvents(const boost::shared_ptr<sys::Poller>& poller); + QPID_BROKER_EXTERN QueueEvents(const boost::shared_ptr<sys::Poller>& poller, bool isSync = false); QPID_BROKER_EXTERN ~QueueEvents(); QPID_BROKER_EXTERN void enqueued(const QueuedMessage&); QPID_BROKER_EXTERN void dequeued(const QueuedMessage&); @@ -65,6 +65,7 @@ class QueueEvents void disable(); //process all outstanding events QPID_BROKER_EXTERN void shutdown(); + QPID_BROKER_EXTERN bool isSync(); private: typedef qpid::sys::PollableQueue<Event> EventQueue; typedef std::map<std::string, EventListener> Listeners; @@ -73,6 +74,7 @@ class QueueEvents Listeners listeners; volatile bool enabled; qpid::sys::Mutex lock;//protect listeners from concurrent access + bool sync; EventQueue::Batch::const_iterator handle(const EventQueue::Batch& e); diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index 39afe90134..a8aa674c53 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -28,8 +28,8 @@ using namespace qpid::broker; using namespace qpid::framing; -QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : - maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {} +QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : + maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {} void QueuePolicy::enqueued(uint64_t _size) { @@ -39,18 +39,15 @@ void QueuePolicy::enqueued(uint64_t _size) void QueuePolicy::dequeued(uint64_t _size) { - //Note: underflow detection is not reliable in the face of - //concurrent updates (at present locking in Queue.cpp prevents - //these anyway); updates are atomic and are safe regardless. if (maxCount) { - if (count.get() > 0) { + if (count > 0) { --count; } else { throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this)); } } if (maxSize) { - if (_size > size.get()) { + if (_size > size) { throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this)); } else { size -= _size; @@ -58,47 +55,47 @@ void QueuePolicy::dequeued(uint64_t _size) } } -bool QueuePolicy::checkLimit(const QueuedMessage& m) +bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) { - bool sizeExceeded = maxSize && (size.get() + m.payload->contentSize()) > maxSize; - bool countExceeded = maxCount && (count.get() + 1) > maxCount; + bool sizeExceeded = maxSize && (size + m->contentSize()) > maxSize; + bool countExceeded = maxCount && (count + 1) > maxCount; bool exceeded = sizeExceeded || countExceeded; if (exceeded) { if (!policyExceeded) { - policyExceeded = true; - if (m.queue) { - if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << m.queue->getName()); - if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << m.queue->getName()); - } + policyExceeded = true; + if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << name); + if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << name); } } else { if (policyExceeded) { policyExceeded = false; - if (m.queue) { - QPID_LOG(info, "Queue cumulative message size and message count within policy for " << m.queue->getName()); - } + QPID_LOG(info, "Queue cumulative message size and message count within policy for " << name); } } return !exceeded; } -void QueuePolicy::tryEnqueue(const QueuedMessage& m) +void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m) { if (checkLimit(m)) { - enqueued(m); + enqueued(m->contentSize()); } else { - std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue"); - throw ResourceLimitExceededException( - QPID_MSG("Policy exceeded on " << queue << " by message " << m.position - << " of size " << m.payload->contentSize() << " , policy: " << *this)); + throw ResourceLimitExceededException(QPID_MSG("Policy exceeded on " << name << ", policy: " << *this)); } } -void QueuePolicy::enqueued(const QueuedMessage& m) +void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m) { - enqueued(m.payload->contentSize()); + enqueued(m->contentSize()); } +void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m) +{ + dequeued(m->contentSize()); +} + +void QueuePolicy::enqueued(const QueuedMessage&) {} + void QueuePolicy::dequeued(const QueuedMessage& m) { dequeued(m.payload->contentSize()); @@ -132,7 +129,7 @@ std::string QueuePolicy::getType(const FieldTable& settings) std::transform(t.begin(), t.end(), t.begin(), tolower); if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t; } - return FLOW_TO_DISK; + return REJECT; } void QueuePolicy::setDefaultMaxSize(uint64_t s) @@ -140,6 +137,7 @@ void QueuePolicy::setDefaultMaxSize(uint64_t s) defaultMaxSize = s; } +void QueuePolicy::getPendingDequeues(Messages&) {} @@ -148,8 +146,8 @@ void QueuePolicy::encode(Buffer& buffer) const { buffer.putLong(maxCount); buffer.putLongLong(maxSize); - buffer.putLong(count.get()); - buffer.putLongLong(size.get()); + buffer.putLong(count); + buffer.putLongLong(size); } void QueuePolicy::decode ( Buffer& buffer ) @@ -179,16 +177,18 @@ const std::string QueuePolicy::RING("ring"); const std::string QueuePolicy::RING_STRICT("ring_strict"); uint64_t QueuePolicy::defaultMaxSize(0); -FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) : - QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {} +FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize) : + QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK) {} -bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m) +bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m) { - return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m); + if (!QueuePolicy::checkLimit(m)) m->requestContentRelease(); + return true; } -RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : - QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {} +RingQueuePolicy::RingQueuePolicy(const std::string& _name, + uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : + QueuePolicy(_name, _maxCount, _maxSize, _type), strict(_type == RING_STRICT) {} bool before(const QueuedMessage& a, const QueuedMessage& b) { @@ -197,15 +197,12 @@ bool before(const QueuedMessage& a, const QueuedMessage& b) void RingQueuePolicy::enqueued(const QueuedMessage& m) { - QueuePolicy::enqueued(m); - qpid::sys::Mutex::ScopedLock l(lock); //need to insert in correct location based on position queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m); } void RingQueuePolicy::dequeued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); //find and remove m from queue if (find(m, pendingDequeues, true) || find(m, queue, true)) { //now update count and size @@ -215,49 +212,32 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m) bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); //for non-strict ring policy, a message can be replaced (and //therefore dequeued) before it is accepted or released by //subscriber; need to detect this return find(m, pendingDequeues, false) || find(m, queue, false); } -bool RingQueuePolicy::checkLimit(const QueuedMessage& m) +bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) { if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept QueuedMessage oldest; - { - qpid::sys::Mutex::ScopedLock l(lock); - if (queue.empty()) { - QPID_LOG(debug, "Message too large for ring queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) - << " [" << *this << "] " - << ": message size = " << m.payload->contentSize() << " bytes"); - return false; - } - oldest = queue.front(); + if (queue.empty()) { + QPID_LOG(debug, "Message too large for ring queue " << name + << " [" << *this << "] " + << ": message size = " << m->contentSize() << " bytes"); + return false; } + oldest = queue.front(); if (oldest.queue->acquire(oldest) || !strict) { - { - //TODO: fix this! In the current code, this method is - //only ever called with the Queue lock already taken. This - //should not be relied upon going forward however and - //clearly the locking in this class is insufficient as - //there is no guarantee that the message previously atthe - //front is still there. - qpid::sys::Mutex::ScopedLock l(lock); - queue.pop_front(); - pendingDequeues.push_back(oldest); - } - oldest.queue->addPendingDequeue(oldest); - QPID_LOG(debug, "Ring policy triggered in queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) - << ": removed message " << oldest.position << " to make way for " << m.position); + queue.pop_front(); + pendingDequeues.push_back(oldest); + QPID_LOG(debug, "Ring policy triggered in " << name + << ": removed message " << oldest.position << " to make way for new message"); return true; } else { - QPID_LOG(debug, "Ring policy could not be triggered in queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) + QPID_LOG(debug, "Ring policy could not be triggered in " << name << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued"); //in strict mode, if oldest message has been delivered (hence //cannot be acquired) but not yet acked, it should not be @@ -266,6 +246,11 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m) } } +void RingQueuePolicy::getPendingDequeues(Messages& result) +{ + result = pendingDequeues; +} + bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove) { for (Messages::iterator i = q.begin(); i != q.end(); i++) { @@ -277,25 +262,36 @@ bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove) return false; } +std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type) +{ + return createQueuePolicy("<unspecified>", maxCount, maxSize, type); +} + std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings) { + return createQueuePolicy("<unspecified>", settings); +} + +std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings) +{ uint32_t maxCount = getInt(settings, maxCountKey, 0); uint32_t maxSize = getInt(settings, maxSizeKey, defaultMaxSize); if (maxCount || maxSize) { - return createQueuePolicy(maxCount, maxSize, getType(settings)); + return createQueuePolicy(name, maxCount, maxSize, getType(settings)); } else { return std::auto_ptr<QueuePolicy>(); } } -std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type) +std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, + uint32_t maxCount, uint64_t maxSize, const std::string& type) { if (type == RING || type == RING_STRICT) { - return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(maxCount, maxSize, type)); + return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type)); } else if (type == FLOW_TO_DISK) { - return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(maxCount, maxSize)); + return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize)); } else { - return std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize, type)); + return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type)); } } @@ -305,10 +301,10 @@ namespace qpid { std::ostream& operator<<(std::ostream& out, const QueuePolicy& p) { - if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size.get(); + if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size; else out << "size: unlimited"; out << "; "; - if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get(); + if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count; else out << "count: unlimited"; out << "; type=" << p.type; return out; diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.h b/qpid/cpp/src/qpid/broker/QueuePolicy.h index 54745876d5..b2937e94c7 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.h +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.h @@ -40,14 +40,14 @@ class QueuePolicy uint32_t maxCount; uint64_t maxSize; const std::string type; - qpid::sys::AtomicValue<uint32_t> count; - qpid::sys::AtomicValue<uint64_t> size; + uint32_t count; + uint64_t size; bool policyExceeded; static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue); - static std::string getType(const qpid::framing::FieldTable& settings); public: + typedef std::deque<QueuedMessage> Messages; static QPID_BROKER_EXTERN const std::string maxCountKey; static QPID_BROKER_EXTERN const std::string maxSizeKey; static QPID_BROKER_EXTERN const std::string typeKey; @@ -57,27 +57,34 @@ class QueuePolicy static QPID_BROKER_EXTERN const std::string RING_STRICT; virtual ~QueuePolicy() {} - QPID_BROKER_EXTERN void tryEnqueue(const QueuedMessage&); + QPID_BROKER_EXTERN void tryEnqueue(boost::intrusive_ptr<Message> msg); + QPID_BROKER_EXTERN void recoverEnqueued(boost::intrusive_ptr<Message> msg); + QPID_BROKER_EXTERN void enqueueAborted(boost::intrusive_ptr<Message> msg); + virtual void enqueued(const QueuedMessage&); virtual void dequeued(const QueuedMessage&); virtual bool isEnqueued(const QueuedMessage&); - virtual bool checkLimit(const QueuedMessage&); QPID_BROKER_EXTERN void update(qpid::framing::FieldTable& settings); uint32_t getMaxCount() const { return maxCount; } uint64_t getMaxSize() const { return maxSize; } void encode(framing::Buffer& buffer) const; void decode ( framing::Buffer& buffer ); uint32_t encodedSize() const; + virtual void getPendingDequeues(Messages& result); - + static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings); + static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + static std::string getType(const qpid::framing::FieldTable& settings); static void setDefaultMaxSize(uint64_t); friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueuePolicy&); protected: - QueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + const std::string name; - virtual void enqueued(const QueuedMessage&); + QueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); + + virtual bool checkLimit(boost::intrusive_ptr<Message> msg); void enqueued(uint64_t size); void dequeued(uint64_t size); }; @@ -86,21 +93,20 @@ class QueuePolicy class FlowToDiskPolicy : public QueuePolicy { public: - FlowToDiskPolicy(uint32_t maxCount, uint64_t maxSize); - bool checkLimit(const QueuedMessage&); + FlowToDiskPolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize); + bool checkLimit(boost::intrusive_ptr<Message> msg); }; class RingQueuePolicy : public QueuePolicy { public: - RingQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = RING); + RingQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = RING); void enqueued(const QueuedMessage&); void dequeued(const QueuedMessage&); bool isEnqueued(const QueuedMessage&); - bool checkLimit(const QueuedMessage&); + bool checkLimit(boost::intrusive_ptr<Message> msg); + void getPendingDequeues(Messages& result); private: - typedef std::deque<QueuedMessage> Messages; - qpid::sys::Mutex lock; Messages pendingDequeues; Messages queue; const bool strict; diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index bdd5f33601..7e3090bf17 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -65,7 +65,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) tagGenerator("sgen"), dtxSelected(false), authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()), - userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))) + userID(getSession().getConnection().getUserId()) { acl = getSession().getBroker().getAcl(); } @@ -302,6 +302,18 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) return !blocked; } +namespace { +struct ConsumerName { + const SemanticState::ConsumerImpl& consumer; + ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {} +}; + +ostream& operator<<(ostream& o, const ConsumerName& pc) { + return o << pc.consumer.getName() << " on " + << pc.consumer.getParent().getSession().getSessionId(); +} +} + void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) { uint32_t originalMsgCredit = msgCredit; @@ -312,7 +324,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) if (byteCredit != 0xFFFFFFFF) { byteCredit -= msg->getRequiredCredit(); } - QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent + QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this) << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit << " now bytes: " << byteCredit << " msgs: " << msgCredit); @@ -320,15 +332,13 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) { - if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) { - QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent - << ", bytes: " << byteCredit << " msgs: " << msgCredit); - return false; - } else { - QPID_LOG(debug, "Credit available for '" << name << "' on " << parent - << " bytes: " << byteCredit << " msgs: " << msgCredit); - return true; - } + bool enoughCredit = msgCredit > 0 && + (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit()); + QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient credit for ") + << ConsumerName(*this) + << ", have bytes: " << byteCredit << " msgs: " << msgCredit + << ", need " << msg->getRequiredCredit() << " bytes"); + return enoughCredit; } SemanticState::ConsumerImpl::~ConsumerImpl() {} @@ -356,6 +366,9 @@ void SemanticState::handle(intrusive_ptr<Message> msg) { } else { DeliverableMessage deliverable(msg); route(msg, deliverable); + if (msg->checkContentReleasable()) { + msg->releaseContent(); + } } } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index da8383fc12..89fe7b83dd 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -129,6 +129,7 @@ class SemanticState : private boost::noncopyable { const framing::FieldTable& getArguments() const { return arguments; } SemanticState& getParent() { return *parent; } + const SemanticState& getParent() const { return *parent; } }; private: @@ -163,6 +164,7 @@ class SemanticState : private boost::noncopyable { ~SemanticState(); SessionContext& getSession() { return session; } + const SessionContext& getSession() const { return session; } ConsumerImpl& find(const std::string& destination); diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index a1ad5a0a30..2ac6d66e62 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -337,6 +337,10 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE))); params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE))); params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE))); + params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type"))); + params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count")))); + params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size")))); + if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) throw NotAllowedException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId())); } @@ -472,8 +476,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, AclModule* acl = getBroker().getAcl(); if (acl) - { - // add flags as needed + { if (!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL) ) throw NotAllowedException(QPID_MSG("ACL denied Queue subscribe request from " << getConnection().getUserId())); } diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h index cfdbd100c3..afbbb2cc22 100644 --- a/qpid/cpp/src/qpid/broker/SessionContext.h +++ b/qpid/cpp/src/qpid/broker/SessionContext.h @@ -28,7 +28,7 @@ #include "qpid/sys/OutputControl.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/OwnershipToken.h" - +#include "qpid/SessionId.h" #include <boost/noncopyable.hpp> @@ -45,6 +45,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl virtual framing::AMQP_ClientProxy& getProxy() = 0; virtual Broker& getBroker() = 0; virtual uint16_t getChannel() const = 0; + virtual const SessionId& getSessionId() const = 0; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 67fd4f4f38..eade93ddaa 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -118,6 +118,8 @@ class SessionState : public qpid::SessionState, bool processSendCredit(uint32_t msgs); + const SessionId& getSessionId() const { return getId(); } + private: void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); diff --git a/qpid/cpp/src/qpid/broker/SignalHandler.cpp b/qpid/cpp/src/qpid/broker/SignalHandler.cpp index f4a3822554..b565cfd419 100644 --- a/qpid/cpp/src/qpid/broker/SignalHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SignalHandler.cpp @@ -38,6 +38,8 @@ void SignalHandler::setBroker(const boost::intrusive_ptr<Broker>& b) { signal(SIGCHLD,SIG_IGN); } +void SignalHandler::shutdown() { shutdownHandler(0); } + void SignalHandler::shutdownHandler(int) { if (broker.get()) { broker->shutdown(); diff --git a/qpid/cpp/src/qpid/broker/SignalHandler.h b/qpid/cpp/src/qpid/broker/SignalHandler.h index d2cdfae07c..bbe831b61d 100644 --- a/qpid/cpp/src/qpid/broker/SignalHandler.h +++ b/qpid/cpp/src/qpid/broker/SignalHandler.h @@ -38,6 +38,9 @@ class SignalHandler /** Set the broker to be shutdown on signals */ static void setBroker(const boost::intrusive_ptr<Broker>& broker); + /** Initiate shut-down of broker */ + static void shutdown(); + private: static void shutdownHandler(int); static boost::intrusive_ptr<Broker> broker; diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index 6bf0b104ea..cb04742677 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -293,44 +293,23 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern) return q != qv.end(); } -void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ +void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/) +{ Binding::vector mb; + BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); PreRoute pr(msg, this); - uint32_t count(0); - { - RWlock::ScopedRlock l(lock); - for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (match(i->first, routingKey)) { - Binding::vector& qv(i->second.bindingVector); - for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){ - mb.push_back(*j); + RWlock::ScopedRlock l(lock); + for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { + if (match(i->first, routingKey)) { + Binding::vector& qv(i->second.bindingVector); + for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){ + b->push_back(*j); + } } } } - } - - for (Binding::vector::iterator j = mb.begin(); j != mb.end(); ++j) { - msg.deliverTo((*j)->queue); - if ((*j)->mgmtBinding != 0) - (*j)->mgmtBinding->inc_msgMatched (); - } - - if (mgmtExchange != 0) - { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - if (count == 0) - { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - else - { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } + doRoute(msg, b); } bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) diff --git a/qpid/cpp/src/qpid/broker/TxAccept.cpp b/qpid/cpp/src/qpid/broker/TxAccept.cpp index e47ac84990..928ac12c10 100644 --- a/qpid/cpp/src/qpid/broker/TxAccept.cpp +++ b/qpid/cpp/src/qpid/broker/TxAccept.cpp @@ -88,7 +88,13 @@ bool TxAccept::prepare(TransactionContext* ctxt) throw() void TxAccept::commit() throw() { - ops.commit(); + try { + ops.commit(); + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to commit: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to commit (unknown error)"); + } } void TxAccept::rollback() throw() {} diff --git a/qpid/cpp/src/qpid/broker/TxPublish.cpp b/qpid/cpp/src/qpid/broker/TxPublish.cpp index 17b99fd883..4b083033ea 100644 --- a/qpid/cpp/src/qpid/broker/TxPublish.cpp +++ b/qpid/cpp/src/qpid/broker/TxPublish.cpp @@ -26,9 +26,14 @@ using namespace qpid::broker; TxPublish::TxPublish(intrusive_ptr<Message> _msg) : msg(_msg) {} -bool TxPublish::prepare(TransactionContext* ctxt) throw(){ +bool TxPublish::prepare(TransactionContext* ctxt) throw() +{ try{ - for_each(queues.begin(), queues.end(), Prepare(ctxt, msg)); + while (!queues.empty()) { + prepare(ctxt, queues.front()); + prepared.push_back(queues.front()); + queues.pop_front(); + } return true; }catch(const std::exception& e){ QPID_LOG(error, "Failed to prepare: " << e.what()); @@ -38,11 +43,30 @@ bool TxPublish::prepare(TransactionContext* ctxt) throw(){ return false; } -void TxPublish::commit() throw(){ - for_each(queues.begin(), queues.end(), Commit(msg)); +void TxPublish::commit() throw() +{ + try { + for_each(prepared.begin(), prepared.end(), Commit(msg)); + if (msg->checkContentReleasable()) { + msg->releaseContent(); + } + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to commit: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to commit (unknown error)"); + } } -void TxPublish::rollback() throw(){ +void TxPublish::rollback() throw() +{ + try { + for_each(prepared.begin(), prepared.end(), Rollback(msg)); + } catch (const std::exception& e) { + QPID_LOG(error, "Failed to complete rollback: " << e.what()); + } catch(...) { + QPID_LOG(error, "Failed to complete rollback (unknown error)"); + } + } void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){ @@ -54,16 +78,14 @@ void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){ } } -TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& _msg) - : ctxt(_ctxt), msg(_msg){} - -void TxPublish::Prepare::operator()(const boost::shared_ptr<Queue>& queue){ +void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue) +{ if (!queue->enqueue(ctxt, msg)){ /** - * if not store then mark message for ack and deleivery once - * commit happens, as async IO will never set it when no store - * exists - */ + * if not store then mark message for ack and deleivery once + * commit happens, as async IO will never set it when no store + * exists + */ msg->enqueueComplete(); } } @@ -74,6 +96,12 @@ void TxPublish::Commit::operator()(const boost::shared_ptr<Queue>& queue){ queue->process(msg); } +TxPublish::Rollback::Rollback(intrusive_ptr<Message>& _msg) : msg(_msg){} + +void TxPublish::Rollback::operator()(const boost::shared_ptr<Queue>& queue){ + queue->enqueueAborted(msg); +} + uint64_t TxPublish::contentSize () { return msg->contentSize (); diff --git a/qpid/cpp/src/qpid/broker/TxPublish.h b/qpid/cpp/src/qpid/broker/TxPublish.h index d5cf5639c4..b6ab9767ab 100644 --- a/qpid/cpp/src/qpid/broker/TxPublish.h +++ b/qpid/cpp/src/qpid/broker/TxPublish.h @@ -47,23 +47,25 @@ namespace qpid { * dispatch or to be added to the in-memory queue. */ class TxPublish : public TxOp, public Deliverable{ - class Prepare{ - TransactionContext* ctxt; + + class Commit{ boost::intrusive_ptr<Message>& msg; public: - Prepare(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg); + Commit(boost::intrusive_ptr<Message>& msg); void operator()(const boost::shared_ptr<Queue>& queue); }; - - class Commit{ + class Rollback{ boost::intrusive_ptr<Message>& msg; public: - Commit(boost::intrusive_ptr<Message>& msg); + Rollback(boost::intrusive_ptr<Message>& msg); void operator()(const boost::shared_ptr<Queue>& queue); }; boost::intrusive_ptr<Message> msg; std::list<Queue::shared_ptr> queues; + std::list<Queue::shared_ptr> prepared; + + void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>); public: QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg); diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp index 9b2f662c8e..bb348675c6 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp @@ -257,6 +257,7 @@ void ConnectionHandler::openOk ( const Array& knownBrokers ) knownBrokersUrls.push_back(Url((*i)->get<std::string>())); if (sasl.get()) { securityLayer = sasl->getSecurityLayer(maxFrameSize); + operUserId = sasl->getUserId(); } setState(OPEN); QPID_LOG(debug, "Known-brokers for connection: " << log::formatList(knownBrokersUrls)); diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.h b/qpid/cpp/src/qpid/client/ConnectionHandler.h index b1fd5be7c3..e9cc5194ae 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.h @@ -71,6 +71,7 @@ class ConnectionHandler : private StateManager, std::auto_ptr<Sasl> sasl; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; boost::intrusive_ptr<qpid::sys::TimerTask> rcvTimeoutTask; + std::string operUserId; void checkState(STATES s, const std::string& msg); @@ -120,6 +121,7 @@ public: std::vector<Url> knownBrokersUrls; static framing::connection::CloseCode convert(uint16_t replyCode); + const std::string& getUserId() const { return operUserId; } }; }} diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index 45ad819ebd..c56d6a6807 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -151,6 +151,12 @@ void ConnectionImpl::open() handler.waitForOpen(); + // If the SASL layer has provided an "operational" userId for the connection, + // put it in the negotiated settings. + const std::string& userId(handler.getUserId()); + if (!userId.empty()) + handler.username = userId; + //enable security layer if one has been negotiated: std::auto_ptr<SecurityLayer> securityLayer = handler.getSecurityLayer(); if (securityLayer.get()) { diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp index f69032b26d..fbb571d40a 100644 --- a/qpid/cpp/src/qpid/client/Connector.cpp +++ b/qpid/cpp/src/qpid/client/Connector.cpp @@ -51,10 +51,10 @@ using boost::str; // Stuff for the registry of protocol connectors (maybe should be moved to its own file) namespace { typedef std::map<std::string, Connector::Factory*> ProtocolRegistry; - + ProtocolRegistry& theProtocolRegistry() { static ProtocolRegistry protocolRegistry; - + return protocolRegistry; } } @@ -93,7 +93,7 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable size_t lastEof; // Position after last EOF in frames uint64_t currentSize; Bounds* bounds; - + framing::ProtocolVersion version; bool initiated; bool closed; @@ -118,16 +118,17 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable void run(); void handleClosed(); bool closeInternal(); - + + void connected(const Socket&); + void connectFailed(const std::string& msg); bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); void writebuff(qpid::sys::AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); void eof(qpid::sys::AsynchIO&); boost::weak_ptr<ConnectionImpl> impl; - + void connect(const std::string& host, int port); - void init(); void close(); void send(framing::AMQFrame& frame); void abort(); @@ -142,7 +143,6 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable size_t decode(const char* buffer, size_t size); size_t encode(const char* buffer, size_t size); bool canEncode(); - public: TCPConnector(framing::ProtocolVersion pVersion, @@ -163,6 +163,11 @@ namespace { } init; } +struct TCPConnector::Buff : public AsynchIO::BufferBase { + Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} + ~Buff() { delete [] bytes;} +}; + TCPConnector::TCPConnector(ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) @@ -189,15 +194,19 @@ TCPConnector::~TCPConnector() { void TCPConnector::connect(const std::string& host, int port){ Mutex::ScopedLock l(lock); assert(closed); - try { - socket.connect(host, port); - } catch (const std::exception& /*e*/) { - socket.close(); - throw; - } - - identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); + assert(joined); poller = Poller::shared_ptr(new Poller); + AsynchConnector::create(socket, + poller, + host, port, + boost::bind(&TCPConnector::connected, this, _1), + boost::bind(&TCPConnector::connectFailed, this, _3)); + closed = false; + joined = false; + receiver = Thread(this); +} + +void TCPConnector::connected(const Socket&) { aio = AsynchIO::create(socket, boost::bind(&TCPConnector::readbuff, this, _1, _2), boost::bind(&TCPConnector::eof, this, _1), @@ -205,16 +214,23 @@ void TCPConnector::connect(const std::string& host, int port){ 0, // closed 0, // nobuffs boost::bind(&TCPConnector::writebuff, this, _1)); - closed = false; -} + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff(maxFrameSize)); + } + aio->start(poller); -void TCPConnector::init(){ - Mutex::ScopedLock l(lock); - assert(joined); + identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); ProtocolInitiation init(version); writeDataBlock(init); - joined = false; - receiver = Thread(this); +} + +void TCPConnector::connectFailed(const std::string& msg) { + QPID_LOG(warning, "Connecting failed: " << msg); + closed = true; + poller->shutdown(); + closeInternal(); + if (shutdownHandler) + shutdownHandler->shutdown(); } bool TCPConnector::closeInternal() { @@ -235,7 +251,7 @@ bool TCPConnector::closeInternal() { receiver.join(); return ret; } - + void TCPConnector::close() { closeInternal(); } @@ -243,7 +259,13 @@ void TCPConnector::close() { void TCPConnector::abort() { // Can't abort a closed connection if (!closed) { - aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1)); + if (aio) { + // Established connection + aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1)); + } else { + // We're still connecting + connectFailed("Connection timedout"); + } } } @@ -288,18 +310,13 @@ void TCPConnector::handleClosed() { shutdownHandler->shutdown(); } -struct TCPConnector::Buff : public AsynchIO::BufferBase { - Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - void TCPConnector::writebuff(AsynchIO& /*aio*/) { Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; if (codec->canEncode()) { std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer()); if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize)); - + size_t encoded = codec->encode(buffer->bytes, buffer->byteCount); buffer->dataStart = 0; @@ -395,11 +412,6 @@ void TCPConnector::run() { try { Dispatcher d(poller); - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } - - aio->start(poller); d.run(); } catch (const std::exception& e) { QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what())); diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp index 0aefcc04cf..0692c3d85c 100644 --- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp +++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp @@ -167,20 +167,9 @@ void RdmaConnector::connect(const std::string& host, int port){ assert(joined); poller = Poller::shared_ptr(new Poller); - // This stuff needs to abstracted out of here to a platform specific file - ::addrinfo *res; - ::addrinfo hints; - hints.ai_flags = 0; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - hints.ai_protocol = 0; - int n = ::getaddrinfo(host.c_str(), boost::lexical_cast<std::string>(port).c_str(), &hints, &res); - if (n<0) { - throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n))); - } - + SocketAddress sa(host, boost::lexical_cast<std::string>(port)); Rdma::Connector* c = new Rdma::Connector( - *res->ai_addr, + sa, Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&RdmaConnector::connected, this, poller, _1, _2), boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2), diff --git a/qpid/cpp/src/qpid/client/Sasl.h b/qpid/cpp/src/qpid/client/Sasl.h index 9dc5817f3d..d773609655 100644 --- a/qpid/cpp/src/qpid/client/Sasl.h +++ b/qpid/cpp/src/qpid/client/Sasl.h @@ -45,6 +45,7 @@ class Sasl virtual std::string start(const std::string& mechanisms) = 0; virtual std::string step(const std::string& challenge) = 0; virtual std::string getMechanism() = 0; + virtual std::string getUserId() = 0; virtual std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer(uint16_t maxFrameSize) = 0; virtual ~Sasl() {} }; diff --git a/qpid/cpp/src/qpid/client/SaslFactory.cpp b/qpid/cpp/src/qpid/client/SaslFactory.cpp index 884f527f01..2258163ec8 100644 --- a/qpid/cpp/src/qpid/client/SaslFactory.cpp +++ b/qpid/cpp/src/qpid/client/SaslFactory.cpp @@ -82,6 +82,7 @@ class CyrusSasl : public Sasl std::string start(const std::string& mechanisms); std::string step(const std::string& challenge); std::string getMechanism(); + std::string getUserId(); std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize); private: sasl_conn_t* conn; @@ -266,6 +267,18 @@ std::string CyrusSasl::getMechanism() return mechanism; } +std::string CyrusSasl::getUserId() +{ + int propResult; + const void* operName; + + propResult = sasl_getprop(conn, SASL_USERNAME, &operName); + if (propResult == SASL_OK) + return std::string((const char*) operName); + + return std::string(); +} + void CyrusSasl::interact(sasl_interact_t* client_interact) { diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp index 8ead44a172..32541dceac 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -64,7 +64,8 @@ SessionImpl::SessionImpl(const std::string& name, boost::shared_ptr<ConnectionIm proxy(ioHandler), nextIn(0), nextOut(0), - sendMsgCredit(0) + sendMsgCredit(0), + doClearDeliveryPropertiesExchange(true) { channel.next = connectionShared.get(); } @@ -396,11 +397,16 @@ void SessionImpl::sendContent(const MethodContent& content) { AMQFrame header(content.getHeader()); - // Client is not allowed to set the delivery-properties.exchange. - AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody()); - if (headerp && headerp->get<DeliveryProperties>()) - headerp->get<DeliveryProperties>(true)->clearExchangeFlag(); - + // doClearDeliveryPropertiesExchange is set by cluster update client so + // it can send messages with delivery-properties.exchange set. + // + if (doClearDeliveryPropertiesExchange) { + // Normal client is not allowed to set the delivery-properties.exchange + // so clear it here. + AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody()); + if (headerp && headerp->get<DeliveryProperties>()) + headerp->get<DeliveryProperties>(true)->clearExchangeFlag(); + } header.setFirstSegment(false); uint64_t data_length = content.getData().length(); if(data_length > 0){ diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h index 49d268c44d..0624bb8b3c 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/SessionImpl.h @@ -130,6 +130,8 @@ public: */ boost::shared_ptr<ConnectionImpl> getConnection(); + void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; } + private: enum State { INACTIVE, @@ -243,6 +245,8 @@ private: // Only keep track of message credit sys::Semaphore* sendMsgCredit; + bool doClearDeliveryPropertiesExchange; + friend class client::SessionHandler; }; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 9b9f06ec57..f51a96efd9 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -362,21 +362,12 @@ void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, Ou void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {} -template <class T> void encode(qpid::messaging::Message& from) -{ - T codec; - from.encode(codec); - from.setContentType(T::contentType); -} - void translate(const Variant::Map& from, FieldTable& to);//implementation in Codecs.cpp void convert(qpid::messaging::Message& from, qpid::client::Message& to) { //TODO: need to avoid copying as much as possible - if (from.getContent().isList()) encode<ListCodec>(from); - if (from.getContent().isMap()) encode<MapCodec>(from); - to.setData(from.getBytes()); + to.setData(from.getContent()); to.getDeliveryProperties().setRoutingKey(from.getSubject()); //TODO: set other delivery properties to.getMessageProperties().setContentType(from.getContentType()); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index d22208368b..8e060c62d7 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -269,18 +269,9 @@ void populate(qpid::messaging::Message& message, FrameSet& command) //e.g. for rejecting. MessageImplAccess::get(message).setInternalId(command.getId()); - command.getContent(message.getBytes()); + command.getContent(message.getContent()); populateHeaders(message, command.getHeaders()); - - //decode content if necessary - if (message.getContentType() == ListCodec::contentType) { - ListCodec codec; - message.decode(codec); - } else if (message.getContentType() == MapCodec::contentType) { - MapCodec codec; - message.decode(codec); - } } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp index 716f955f98..cbc95b44fb 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -33,24 +33,11 @@ namespace amqp0_10 { using qpid::messaging::Address; using qpid::messaging::MessageImplAccess; -template <class T> void encode(const qpid::messaging::Message& from, qpid::client::Message& to) -{ - T codec; - MessageImplAccess::get(from).getEncodedContent(codec, to.getData()); - to.getMessageProperties().setContentType(T::contentType); -} - void OutgoingMessage::convert(const qpid::messaging::Message& from) { //TODO: need to avoid copying as much as possible - if (from.getContent().isList()) { - encode<ListCodec>(from, message); - } else if (from.getContent().isMap()) { - encode<MapCodec>(from, message); - } else { - message.setData(from.getBytes()); - message.getMessageProperties().setContentType(from.getContentType()); - } + message.setData(from.getContent()); + message.getMessageProperties().setContentType(from.getContentType()); const Address& address = from.getReplyTo(); if (!address.value.empty()) { message.getMessageProperties().setReplyTo(AddressResolution::convert(address)); diff --git a/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp b/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp index 58956609a4..3a662463c1 100644 --- a/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp +++ b/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp @@ -43,6 +43,7 @@ class WindowsSasl : public Sasl std::string start(const std::string& mechanisms); std::string step(const std::string& challenge); std::string getMechanism(); + std::string getUserId(); std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize); private: ConnectionSettings settings; @@ -131,6 +132,11 @@ std::string WindowsSasl::getMechanism() return mechanism; } +std::string WindowsSasl::getUserId() +{ + return std::string(); // TODO - when GSSAPI is supported, return userId for connection. +} + std::auto_ptr<SecurityLayer> WindowsSasl::getSecurityLayer(uint16_t maxFrameSize) { return std::auto_ptr<SecurityLayer>(0); diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index e35d3e4175..0706fc72e8 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -99,6 +99,7 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/SessionState.h" +#include "qpid/broker/SignalHandler.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" @@ -120,7 +121,6 @@ #include "qpid/management/ManagementAgent.h" #include "qpid/memory.h" #include "qpid/sys/Thread.h" -#include "qpid/sys/LatencyTracker.h" #include <boost/shared_ptr.hpp> #include <boost/bind.hpp> @@ -144,12 +144,16 @@ using qpid::management::Manageable; using qpid::management::Args; namespace _qmf = ::qmf::org::apache::qpid::cluster; -/** NOTE: increment this number whenever any incompatible changes in +/** + * NOTE: must increment this number whenever any incompatible changes in * cluster protocol/behavior are made. It allows early detection and * sensible reporting of an attempt to mix different versions in a * cluster. + * + * Currently use SVN revision to avoid clashes with versions from + * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 2; +const uint32_t Cluster::CLUSTER_VERSION = 820783; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -308,7 +312,7 @@ void Cluster::leave(Lock&) { // Finalize connections now now to avoid problems later in destructor. LEAVE_TRY(localConnections.clear()); LEAVE_TRY(connections.clear()); - LEAVE_TRY(broker.shutdown()); + LEAVE_TRY(broker::SignalHandler::shutdown()); } } @@ -324,20 +328,14 @@ void Cluster::deliver( MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); - LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish()); deliverEvent(e); } -LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");) - LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");) - - void Cluster::deliverEvent(const Event& e) { - LATENCY_TRACK(eventQueueLatencyTracker.start(e.getData());) - deliverEventQueue.push(e); +void Cluster::deliverEvent(const Event& e) { + deliverEventQueue.push(e); } void Cluster::deliverFrame(const EventFrame& e) { - LATENCY_TRACK(frameQueueLatencyTracker.start(e.frame.getBody())); deliverFrameQueue.push(e); } @@ -350,7 +348,6 @@ const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { // Handler for deliverEventQueue. // This thread decodes frames from events. void Cluster::deliveredEvent(const Event& e) { - LATENCY_TRACK(eventQueueLatencyTracker.finish(e.getData())); if (e.isCluster()) { QPID_LOG(trace, *this << " DLVR: " << e); EventFrame ef(e, e.getFrame()); @@ -396,13 +393,9 @@ void Cluster::flagError( error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg); } -LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");) - // Handler for deliverFrameQueue. // This thread executes the main logic. - void Cluster::deliveredFrame(const EventFrame& efConst) { - LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody())); - LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody())); +void Cluster::deliveredFrame(const EventFrame& efConst) { Mutex::ScopedLock l(lock); if (state == LEFT) return; EventFrame e(efConst); @@ -434,7 +427,6 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) { throw Exception(QPID_MSG("Invalid cluster control")); } else if (state >= CATCHUP) { - LATENCY_TRACK(LatencyScope ls(processLatency)); map.incrementFrameSeq(); ConnectionPtr connection = getConnection(e, l); if (connection) { diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp index 35be055d06..5b7011047b 100644 --- a/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp +++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp @@ -45,7 +45,8 @@ ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) { } void ErrorCheck::error( - Connection& c, ErrorType t, framing::SequenceNumber seq, const MemberSet& ms, const std::string& msg) + Connection& c, ErrorType t, framing::SequenceNumber seq, const MemberSet& ms, + const std::string& msg) { // Detected a local error, inform cluster and set error state. assert(t != ERROR_TYPE_NONE); // Must be an error. @@ -54,10 +55,11 @@ void ErrorCheck::error( unresolved = ms; frameSeq = seq; connection = &c; - QPID_LOG(error, cluster - << (type == ERROR_TYPE_SESSION ? " channel" : " connection") - << " error " << frameSeq << " on " << c << ": " << msg - << " must be resolved with: " << unresolved); + message = msg; + QPID_LOG(debug, cluster<< (type == ERROR_TYPE_SESSION ? " channel" : " connection") + << " error " << frameSeq << " on " << c + << " must be resolved with: " << unresolved + << ": " << message); mcast.mcastControl( ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId()); // If there are already frames queued up by a previous error, review @@ -84,13 +86,15 @@ ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& if (errorCheck->getFrameSeq() == frameSeq) { // Addresses current error next = frames.erase(i); // Drop matching error check controls if (errorCheck->getType() < type) { // my error is worse than his - QPID_LOG(critical, cluster << " error " << frameSeq - << " did not occur on " << i->getMemberId()); - throw Exception(QPID_MSG("Error " << frameSeq - << " did not occur on all members")); + QPID_LOG(critical, cluster + << " local error " << frameSeq << " did not occur on member " + << i->getMemberId() + << ": " << message); + throw Exception( + QPID_MSG("local error did not occur on all cluster members " << ": " << message)); } else { // his error is worse/same as mine. - QPID_LOG(info, cluster << " error " << frameSeq + QPID_LOG(debug, cluster << " error " << frameSeq << " resolved with " << i->getMemberId()); unresolved.erase(i->getMemberId()); checkResolved(); @@ -128,10 +132,10 @@ ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& void ErrorCheck::checkResolved() { if (unresolved.empty()) { // No more potentially conflicted members, we're clear. type = ERROR_TYPE_NONE; - QPID_LOG(info, cluster << " error " << frameSeq << " resolved."); + QPID_LOG(debug, cluster << " error " << frameSeq << " resolved."); } else - QPID_LOG(info, cluster << " error " << frameSeq + QPID_LOG(debug, cluster << " error " << frameSeq << " must be resolved with " << unresolved); } @@ -146,7 +150,7 @@ void ErrorCheck::respondNone(const MemberId& from, uint8_t type, framing::Sequen // Don't respond to non-errors or to my own errors. if (type == ERROR_TYPE_NONE || from == cluster.getId()) return; - QPID_LOG(info, cluster << " error " << frameSeq << " did not occur locally."); + QPID_LOG(debug, cluster << " error " << frameSeq << " did not occur locally."); mcast.mcastControl( ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), cluster.getId() diff --git a/qpid/cpp/src/qpid/cluster/ErrorCheck.h b/qpid/cpp/src/qpid/cluster/ErrorCheck.h index 09028391ac..c975b9af64 100644 --- a/qpid/cpp/src/qpid/cluster/ErrorCheck.h +++ b/qpid/cpp/src/qpid/cluster/ErrorCheck.h @@ -84,6 +84,7 @@ class ErrorCheck SequenceNumber frameSeq; ErrorType type; Connection* connection; + std::string message; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Event.cpp b/qpid/cpp/src/qpid/cluster/Event.cpp index 30866d3154..4831e7eabe 100644 --- a/qpid/cpp/src/qpid/cluster/Event.cpp +++ b/qpid/cpp/src/qpid/cluster/Event.cpp @@ -38,9 +38,6 @@ const size_t EventHeader::HEADER_SIZE = sizeof(uint8_t) + // type sizeof(uint64_t) + // connection pointer only, CPG provides member ID. sizeof(uint32_t) // payload size -#ifdef QPID_LATENCY_METRIC - + sizeof(int64_t) // timestamp -#endif ; EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s) @@ -61,9 +58,6 @@ void EventHeader::decode(const MemberId& m, framing::Buffer& buf) { throw Exception("Invalid multicast event type"); connectionId = ConnectionId(m, buf.getLongLong()); size = buf.getLong(); -#ifdef QPID_LATENCY_METRIC - latency_metric_timestamp = buf.getLongLong(); -#endif } Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) { @@ -97,9 +91,6 @@ void EventHeader::encode(Buffer& b) const { b.putOctet(type); b.putLongLong(connectionId.getNumber()); b.putLong(size); -#ifdef QPID_LATENCY_METRIC - b.putLongLong(latency_metric_timestamp); -#endif } // Encode my header in my buffer. diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/Multicaster.cpp index 7e97963318..72fc1533f8 100644 --- a/qpid/cpp/src/qpid/cluster/Multicaster.cpp +++ b/qpid/cpp/src/qpid/cluster/Multicaster.cpp @@ -31,9 +31,6 @@ namespace cluster { Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller, boost::function<void()> onError_) : -#if defined (QPID_LATENCY_TRACKER) - cpgLatency("CPG"), -#endif onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), holding(true) @@ -61,7 +58,6 @@ void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& void Multicaster::mcast(const Event& e) { { sys::Mutex::ScopedLock l(lock); - LATENCY_TRACK(cpgLatency.start()); if (e.isConnection() && holding) { holdingQueue.push_back(e); return; diff --git a/qpid/cpp/src/qpid/cluster/Multicaster.h b/qpid/cpp/src/qpid/cluster/Multicaster.h index f2ee5099bb..c1a0ddffc6 100644 --- a/qpid/cpp/src/qpid/cluster/Multicaster.h +++ b/qpid/cpp/src/qpid/cluster/Multicaster.h @@ -26,7 +26,6 @@ #include "qpid/cluster/Event.h" #include "qpid/sys/PollableQueue.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/LatencyTracker.h" #include <boost/shared_ptr.hpp> #include <deque> @@ -58,8 +57,6 @@ class Multicaster /** End holding mode, held events are mcast */ void release(); - LATENCY_TRACK(sys::LatencyCounter cpgLatency;) - private: typedef sys::PollableQueue<Event> PollableEventQueue; typedef std::deque<Event> PlainEventQueue; diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp index cb8f01386c..cb75fe5561 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -24,7 +24,6 @@ #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/framing/AMQFrame.h" #include "qpid/log/Statement.h" -#include "qpid/sys/LatencyTracker.h" #include <boost/current_function.hpp> @@ -40,16 +39,9 @@ OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler : parent(p), closing(false), next(&h), sendMax(1), sent(0), sentDoOutput(false) {} -#if defined QPID_LATENCY_TRACKER -extern sys::LatencyTracker<const AMQBody*> doOutputTracker; -#endif - void OutputInterceptor::send(framing::AMQFrame& f) { - LATENCY_TRACK(doOutputTracker.finish(f.getBody())); - { - sys::Mutex::ScopedLock l(lock); - next->send(f); - } + sys::Mutex::ScopedLock l(lock); + next->send(f); } void OutputInterceptor::activateOutput() { diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 2e557f2ab6..d6df8bd5ac 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -209,9 +209,16 @@ class MessageUpdater { ClusterConnectionProxy(session).expiryId(*expiryId); } + // We can't send a broker::Message via the normal client API, + // and it would be expensive to copy it into a client::Message + // so we go a bit under the client API covers here. + // SessionBase_0_10Access sb(session); + // Disable client code that clears the delivery-properties.exchange + sb.get()->setDoClearDeliveryPropertiesExchange(false); framing::MessageTransferBody transfer( - framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); + framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, + message::ACQUIRE_MODE_PRE_ACQUIRED); sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased()); if (message.payload->isContentReleased()){ diff --git a/qpid/cpp/src/qpid/cluster/UpdateExchange.h b/qpid/cpp/src/qpid/cluster/UpdateExchange.h index 194a3d386d..00a92c7f1e 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateExchange.h +++ b/qpid/cpp/src/qpid/cluster/UpdateExchange.h @@ -30,7 +30,7 @@ namespace qpid { namespace cluster { /** - * A keyless exchange (like fanout exchange) that does not modify deliver-properties.exchange + * A keyless exchange (like fanout exchange) that does not modify delivery-properties.exchange * on messages. */ class UpdateExchange : public broker::FanOutExchange diff --git a/qpid/cpp/src/qpid/messaging/ListContent.cpp b/qpid/cpp/src/qpid/messaging/ListContent.cpp new file mode 100644 index 0000000000..0c3ca5fc62 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/ListContent.cpp @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/ListContent.h" +#include "qpid/messaging/Message.h" +#include "qpid/client/amqp0_10/Codecs.h" + +namespace qpid { +namespace messaging { + +class ListContentImpl : public Variant +{ + Message* msg; + public: + ListContentImpl(Message& m) : Variant(Variant::List()), msg(&m) + { + if (msg->getContent().size()) { + qpid::client::amqp0_10::ListCodec codec; + codec.decode(msg->getContent(), *this); + } + } + + void encode() + { + qpid::client::amqp0_10::ListCodec codec; + codec.encode(*this, msg->getContent()); + } +}; + +ListContent::ListContent(Message& m) : impl(new ListContentImpl(m)) {} +ListContent::~ListContent() { delete impl; } +ListContent& ListContent::operator=(const ListContent& l) { *impl = *l.impl; return *this; } + +ListContent::const_iterator ListContent::begin() const { return impl->asList().begin(); } +ListContent::const_iterator ListContent::end() const { return impl->asList().end(); } +ListContent::const_reverse_iterator ListContent::rbegin() const { return impl->asList().rbegin(); } +ListContent::const_reverse_iterator ListContent::rend() const { return impl->asList().rend(); } + +ListContent::iterator ListContent::begin() { return impl->asList().begin(); } +ListContent::iterator ListContent::end() { return impl->asList().end(); } +ListContent::reverse_iterator ListContent::rbegin() { return impl->asList().rbegin(); } +ListContent::reverse_iterator ListContent::rend() { return impl->asList().rend(); } + +bool ListContent::empty() const { return impl->asList().empty(); } +size_t ListContent::size() const { return impl->asList().size(); } + +const Variant& ListContent::front() const { return impl->asList().front(); } +Variant& ListContent::front() { return impl->asList().front(); } +const Variant& ListContent::back() const { return impl->asList().back(); } +Variant& ListContent::back() { return impl->asList().back(); } + +void ListContent::push_front(const Variant& v) { impl->asList().push_front(v); } +void ListContent::push_back(const Variant& v) { impl->asList().push_back(v); } + +void ListContent::pop_front() { impl->asList().pop_front(); } +void ListContent::pop_back() { impl->asList().pop_back(); } + +ListContent::iterator ListContent::insert(iterator position, const Variant& v) +{ + return impl->asList().insert(position, v); +} +void ListContent::insert(iterator position, size_t n, const Variant& v) +{ + impl->asList().insert(position, n, v); +} +ListContent::iterator ListContent::erase(iterator position) { return impl->asList().erase(position); } +ListContent::iterator ListContent::erase(iterator first, iterator last) { return impl->asList().erase(first, last); } +void ListContent::clear() { impl->asList().clear(); } + +void ListContent::encode() { impl->encode(); } + +const Variant::List& ListContent::asList() const { return impl->asList(); } +Variant::List& ListContent::asList() { return impl->asList(); } + +std::ostream& operator<<(std::ostream& out, const ListContent& m) +{ + out << m.asList(); + return out; +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/ListView.cpp b/qpid/cpp/src/qpid/messaging/ListView.cpp new file mode 100644 index 0000000000..b717d157fa --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/ListView.cpp @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/ListView.h" +#include "qpid/messaging/Message.h" +#include "qpid/client/amqp0_10/Codecs.h" + +namespace qpid { +namespace messaging { + +class ListViewImpl : public Variant +{ + public: + ListViewImpl(const Message& msg) : Variant(Variant::List()) + { + if (msg.getContent().size()) { + qpid::client::amqp0_10::ListCodec codec; + codec.decode(msg.getContent(), *this); + } + } +}; + +ListView::ListView(const Message& m) :impl(new ListViewImpl(m)) {} +ListView::~ListView() { delete impl; } +ListView& ListView::operator=(const ListView& l) { *impl = *l.impl; return *this; } + +ListView::const_iterator ListView::begin() const { return impl->asList().begin(); } +ListView::const_iterator ListView::end() const { return impl->asList().end(); } +ListView::const_reverse_iterator ListView::rbegin() const { return impl->asList().rbegin(); } +ListView::const_reverse_iterator ListView::rend() const { return impl->asList().rend(); } + +bool ListView::empty() const { return impl->asList().empty(); } +size_t ListView::size() const { return impl->asList().size(); } + +const Variant& ListView::front() const { return impl->asList().front(); } +const Variant& ListView::back() const { return impl->asList().back(); } + +const Variant::List& ListView::asList() const { return impl->asList(); } + +std::ostream& operator<<(std::ostream& out, const ListView& m) +{ + out << m.asList(); + return out; +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/MapContent.cpp b/qpid/cpp/src/qpid/messaging/MapContent.cpp new file mode 100644 index 0000000000..c653561fc9 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/MapContent.cpp @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/MapContent.h" +#include "qpid/messaging/Message.h" +#include "qpid/client/amqp0_10/Codecs.h" + +namespace qpid { +namespace messaging { + +class MapContentImpl : public Variant +{ + Message* msg; + public: + MapContentImpl(Message& m) : Variant(Variant::Map()), msg(&m) + { + if (msg->getContent().size()) { + qpid::client::amqp0_10::MapCodec codec; + codec.decode(msg->getContent(), *this); + } + } + + void encode() + { + qpid::client::amqp0_10::MapCodec codec; + codec.encode(*this, msg->getContent()); + } +}; + +MapContent::MapContent(Message& m) : impl(new MapContentImpl(m)) {} +MapContent::~MapContent() { delete impl; } +MapContent& MapContent::operator=(const MapContent& m) { *impl = *m.impl; return *this; } + +MapContent::const_iterator MapContent::begin() const { return impl->asMap().begin(); } +MapContent::const_iterator MapContent::end() const { return impl->asMap().end(); } +MapContent::const_reverse_iterator MapContent::rbegin() const { return impl->asMap().rbegin(); } +MapContent::const_reverse_iterator MapContent::rend() const { return impl->asMap().rend(); } +MapContent::iterator MapContent::begin() { return impl->asMap().begin(); } +MapContent::iterator MapContent::end() { return impl->asMap().end(); } +MapContent::reverse_iterator MapContent::rbegin() { return impl->asMap().rbegin(); } +MapContent::reverse_iterator MapContent::rend() { return impl->asMap().rend(); } + +bool MapContent::empty() const { return impl->asMap().empty(); } +size_t MapContent::size() const { return impl->asMap().size(); } + +MapContent::const_iterator MapContent::find(const key_type& key) const { return impl->asMap().find(key); } +MapContent::iterator MapContent::find(const key_type& key) { return impl->asMap().find(key); } +const Variant& MapContent::operator[](const key_type& key) const { return impl->asMap()[key]; } +Variant& MapContent::operator[](const key_type& key) { return impl->asMap()[key]; } + +std::pair<MapContent::iterator,bool> MapContent::insert(const value_type& item) { return impl->asMap().insert(item); } +MapContent::iterator MapContent::insert(iterator position, const value_type& item) { return impl->asMap().insert(position, item); } +void MapContent::erase(iterator position) { impl->asMap().erase(position); } +void MapContent::erase(iterator first, iterator last) { impl->asMap().erase(first, last); } +size_t MapContent::erase(const key_type& key) { return impl->asMap().erase(key); } +void MapContent::clear() { impl->asMap().clear(); } + +void MapContent::encode() { impl->encode(); } + +const std::map<MapContent::key_type, Variant>& MapContent::asMap() const { return impl->asMap(); } +std::map<MapContent::key_type, Variant>& MapContent::asMap() { return impl->asMap(); } + + +std::ostream& operator<<(std::ostream& out, const MapContent& m) +{ + out << m.asMap(); + return out; +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/MapView.cpp b/qpid/cpp/src/qpid/messaging/MapView.cpp new file mode 100644 index 0000000000..ffa6e91a16 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/MapView.cpp @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/MapView.h" +#include "qpid/messaging/Message.h" +#include "qpid/client/amqp0_10/Codecs.h" + +namespace qpid { +namespace messaging { + +class MapViewImpl : public Variant +{ + public: + MapViewImpl(const Message& msg) : Variant(Variant::Map()) + { + if (msg.getContent().size()) { + qpid::client::amqp0_10::MapCodec codec; + codec.decode(msg.getContent(), *this); + } + } +}; + +MapView::MapView(const Message& m) : impl(new MapViewImpl(m)) {} +MapView::~MapView() { delete impl; } +MapView& MapView::operator=(const MapView& m) { *impl = *m.impl; return *this; } + +MapView::const_iterator MapView::begin() const { return impl->asMap().begin(); } +MapView::const_iterator MapView::end() const { return impl->asMap().end(); } +MapView::const_reverse_iterator MapView::rbegin() const { return impl->asMap().rbegin(); } +MapView::const_reverse_iterator MapView::rend() const { return impl->asMap().rend(); } + +bool MapView::empty() const { return impl->asMap().empty(); } +size_t MapView::size() const { return impl->asMap().size(); } + +MapView::const_iterator MapView::find(const key_type& key) const { return impl->asMap().find(key); } +const Variant& MapView::operator[](const key_type& key) const { return impl->asMap()[key]; } + +const std::map<MapView::key_type, Variant>& MapView::asMap() const { return impl->asMap(); } + +std::ostream& operator<<(std::ostream& out, const MapView& m) +{ + out << m.asMap(); + return out; +} + +}} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/Message.cpp b/qpid/cpp/src/qpid/messaging/Message.cpp index 1d844b3027..fb4e800eaa 100644 --- a/qpid/cpp/src/qpid/messaging/Message.cpp +++ b/qpid/cpp/src/qpid/messaging/Message.cpp @@ -27,7 +27,7 @@ namespace messaging { Message::Message(const std::string& bytes) : impl(new MessageImpl(bytes)) {} Message::Message(const char* bytes, size_t count) : impl(new MessageImpl(bytes, count)) {} -Message::Message(const Message& m) : impl(new MessageImpl(m.getBytes())) {} +Message::Message(const Message& m) : impl(new MessageImpl(m.getContent())) {} Message::~Message() { delete impl; } Message& Message::operator=(const Message& m) { *impl = *m.impl; return *this; } @@ -44,27 +44,15 @@ const std::string& Message::getContentType() const { return impl->getContentType const VariantMap& Message::getHeaders() const { return impl->getHeaders(); } VariantMap& Message::getHeaders() { return impl->getHeaders(); } -void Message::setBytes(const std::string& c) { impl->setBytes(c); } -void Message::setBytes(const char* chars, size_t count) { impl->setBytes(chars, count); } -const std::string& Message::getBytes() const { return impl->getBytes(); } -std::string& Message::getBytes() { return impl->getBytes(); } +void Message::setContent(const std::string& c) { impl->setBytes(c); } +void Message::setContent(const char* chars, size_t count) { impl->setBytes(chars, count); } +const std::string& Message::getContent() const { return impl->getBytes(); } +std::string& Message::getContent() { return impl->getBytes(); } -const char* Message::getRawContent() const { return impl->getBytes().data(); } -size_t Message::getContentSize() const { return impl->getBytes().size(); } - -MessageContent& Message::getContent() { return *impl; } -const MessageContent& Message::getContent() const { return *impl; } -void Message::setContent(const std::string& s) { *impl = s; } -void Message::setContent(const Variant::Map& m) { *impl = m; } -void Message::setContent(const Variant::List& l) { *impl = l; } - -void Message::encode(Codec& codec) { impl->encode(codec); } - -void Message::decode(Codec& codec) { impl->decode(codec); } - -std::ostream& operator<<(std::ostream& out, const MessageContent& content) +void Message::getContent(std::pair<const char*, size_t>& content) const { - return content.print(out); + content.first = impl->getBytes().data(); + content.second = impl->getBytes().size(); } }} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp index 5df9218e03..e17fccd64f 100644 --- a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp +++ b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp @@ -28,8 +28,8 @@ namespace { const std::string EMPTY_STRING = ""; } -MessageImpl::MessageImpl(const std::string& c) : bytes(c), type(VAR_VOID), internalId(0) {} -MessageImpl::MessageImpl(const char* chars, size_t count) : bytes(chars, count), type(VAR_VOID), internalId(0) {} +MessageImpl::MessageImpl(const std::string& c) : bytes(c), internalId(0) {} +MessageImpl::MessageImpl(const char* chars, size_t count) : bytes(chars, count), internalId(0) {} void MessageImpl::setReplyTo(const Address& d) { replyTo = d; } const Address& MessageImpl::getReplyTo() const { return replyTo; } @@ -44,155 +44,14 @@ const VariantMap& MessageImpl::getHeaders() const { return headers; } VariantMap& MessageImpl::getHeaders() { return headers; } //should these methods be on MessageContent? -void MessageImpl::setBytes(const std::string& c) { clear(); bytes = c; } -void MessageImpl::setBytes(const char* chars, size_t count) { clear(); bytes.assign(chars, count); } +void MessageImpl::setBytes(const std::string& c) { bytes = c; } +void MessageImpl::setBytes(const char* chars, size_t count) { bytes.assign(chars, count); } const std::string& MessageImpl::getBytes() const { return bytes; } std::string& MessageImpl::getBytes() { return bytes; } - -Variant& MessageImpl::operator[](const std::string& key) { return asMap()[key]; } - -std::ostream& MessageImpl::print(std::ostream& out) const -{ - if (type == VAR_MAP) { - return out << content.asMap(); - } else if (type == VAR_LIST) { - return out << content.asList(); - } else { - return out << bytes; - } -} - -template <class T> MessageContent& MessageImpl::append(T& t) -{ - if (type == VAR_VOID) { - //TODO: this is inefficient, probably want to hold on to the stream object - std::stringstream s; - s << bytes; - s << t; - bytes = s.str(); - } else if (type == VAR_LIST) { - content.asList().push_back(Variant(t)); - } else { - throw InvalidConversion("<< operator only valid on strings and lists"); - } - return *this; -} - -MessageContent& MessageImpl::operator<<(const std::string& v) { return append(v); } -MessageContent& MessageImpl::operator<<(const char* v) { return append(v); } -MessageContent& MessageImpl::operator<<(bool v) { return append(v); } -MessageContent& MessageImpl::operator<<(int8_t v) { return append(v); } -MessageContent& MessageImpl::operator<<(int16_t v) { return append(v); } -MessageContent& MessageImpl::operator<<(int32_t v) { return append(v); } -MessageContent& MessageImpl::operator<<(int64_t v) { return append(v); } -MessageContent& MessageImpl::operator<<(uint8_t v) { return append(v); } -MessageContent& MessageImpl::operator<<(uint16_t v) { return append(v); } -MessageContent& MessageImpl::operator<<(uint32_t v) { return append(v); } -MessageContent& MessageImpl::operator<<(uint64_t v) { return append(v); } -MessageContent& MessageImpl::operator<<(double v) { return append(v); } -MessageContent& MessageImpl::operator<<(float v) { return append(v); } -MessageContent& MessageImpl::operator=(const std::string& s) -{ - type = VAR_VOID; - bytes = s; - return *this; -} -MessageContent& MessageImpl::operator=(const char* c) -{ - type = VAR_VOID; - bytes = c; - return *this; -} -MessageContent& MessageImpl::operator=(const Variant::Map& m) -{ - type = VAR_MAP; - content = m; - return *this; -} - -MessageContent& MessageImpl::operator=(const Variant::List& l) -{ - type = VAR_LIST; - content = l; - return *this; -} - -void MessageImpl::encode(Codec& codec) -{ - if (content.getType() != VAR_VOID) { - bytes = EMPTY_STRING; - codec.encode(content, bytes); - } -} - -void MessageImpl::getEncodedContent(Codec& codec, std::string& out) const -{ - if (content.getType() != VAR_VOID) { - codec.encode(content, out); - } else { - out = bytes; - } -} - -void MessageImpl::decode(Codec& codec) -{ - codec.decode(bytes, content); - if (content.getType() == VAR_MAP) type = VAR_MAP; - else if (content.getType() == VAR_LIST) type = VAR_LIST; - else type = VAR_VOID;//TODO: what if codec set some type other than map or list?? -} - void MessageImpl::setInternalId(qpid::framing::SequenceNumber i) { internalId = i; } qpid::framing::SequenceNumber MessageImpl::getInternalId() { return internalId; } -bool MessageImpl::isVoid() const { return type == VAR_VOID; } - -const std::string& MessageImpl::asString() const -{ - if (isVoid()) return getBytes(); - else return content.getString();//will throw an error -} -std::string& MessageImpl::asString() -{ - if (isVoid()) return getBytes(); - else return content.getString();//will throw an error -} - -const char* MessageImpl::asChars() const -{ - if (!isVoid()) throw InvalidConversion("Content is of structured type."); - return bytes.data(); -} -size_t MessageImpl::size() const -{ - return bytes.size(); -} - -const Variant::Map& MessageImpl::asMap() const { return content.asMap(); } -Variant::Map& MessageImpl::asMap() -{ - if (isVoid()) { - content = Variant::Map(); - type = VAR_MAP; - } - return content.asMap(); -} -bool MessageImpl::isMap() const { return type == VAR_MAP; } - -const Variant::List& MessageImpl::asList() const { return content.asList(); } -Variant::List& MessageImpl::asList() -{ - if (isVoid()) { - content = Variant::List(); - type = VAR_LIST; - } - return content.asList(); -} -bool MessageImpl::isList() const { return type == VAR_LIST; } - -void MessageImpl::clear() { bytes = EMPTY_STRING; content.reset(); type = VAR_VOID; } - MessageImpl& MessageImplAccess::get(Message& msg) { return *msg.impl; diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.h b/qpid/cpp/src/qpid/messaging/MessageImpl.h index 1173e7570a..4939cdc5cc 100644 --- a/qpid/cpp/src/qpid/messaging/MessageImpl.h +++ b/qpid/cpp/src/qpid/messaging/MessageImpl.h @@ -22,15 +22,13 @@ * */ #include "qpid/messaging/Address.h" -#include "qpid/messaging/Codec.h" -#include "qpid/messaging/MessageContent.h" #include "qpid/messaging/Variant.h" #include "qpid/framing/SequenceNumber.h" namespace qpid { namespace messaging { -struct MessageImpl : MessageContent +struct MessageImpl { Address replyTo; std::string subject; @@ -38,8 +36,6 @@ struct MessageImpl : MessageContent Variant::Map headers; std::string bytes; - Variant content;//used only for LIST and MAP - VariantType type;//if LIST, MAP content holds the value; if VOID bytes holds the value qpid::framing::SequenceNumber internalId; @@ -66,54 +62,6 @@ struct MessageImpl : MessageContent void setInternalId(qpid::framing::SequenceNumber id); qpid::framing::SequenceNumber getInternalId(); - bool isVoid() const; - - const std::string& asString() const; - std::string& asString(); - - const char* asChars() const; - size_t size() const; - - const Variant::Map& asMap() const; - Variant::Map& asMap(); - bool isMap() const; - - const Variant::List& asList() const; - Variant::List& asList(); - bool isList() const; - - void clear(); - - void getEncodedContent(Codec& codec, std::string&) const; - void encode(Codec& codec); - void decode(Codec& codec); - - Variant& operator[](const std::string&); - - std::ostream& print(std::ostream& out) const; - - //operator<< for variety of types... - MessageContent& operator<<(const std::string&); - MessageContent& operator<<(const char*); - MessageContent& operator<<(bool); - MessageContent& operator<<(int8_t); - MessageContent& operator<<(int16_t); - MessageContent& operator<<(int32_t); - MessageContent& operator<<(int64_t); - MessageContent& operator<<(uint8_t); - MessageContent& operator<<(uint16_t); - MessageContent& operator<<(uint32_t); - MessageContent& operator<<(uint64_t); - MessageContent& operator<<(double); - MessageContent& operator<<(float); - - //assignment from string, map and list - MessageContent& operator=(const std::string&); - MessageContent& operator=(const char*); - MessageContent& operator=(const Variant::Map&); - MessageContent& operator=(const Variant::List&); - - template <class T> MessageContent& append(T& t); }; class Message; diff --git a/qpid/cpp/src/qpid/sys/AsynchIO.h b/qpid/cpp/src/qpid/sys/AsynchIO.h index fb02183359..419770568a 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIO.h +++ b/qpid/cpp/src/qpid/sys/AsynchIO.h @@ -57,7 +57,7 @@ public: class AsynchConnector { public: typedef boost::function1<void, const Socket&> ConnectedCallback; - typedef boost::function2<void, int, std::string> FailedCallback; + typedef boost::function3<void, const Socket&, int, const std::string&> FailedCallback; // Call create() to allocate a new AsynchConnector object with the // specified poller, addressing, and callbacks. @@ -70,7 +70,7 @@ public: std::string hostname, uint16_t port, ConnectedCallback connCb, - FailedCallback failCb = 0); + FailedCallback failCb); protected: AsynchConnector() {} @@ -108,7 +108,7 @@ class AsynchIO { public: typedef AsynchIOBufferBase BufferBase; - typedef boost::function2<bool, AsynchIO&, BufferBase*> ReadCallback; + typedef boost::function2<void, AsynchIO&, BufferBase*> ReadCallback; typedef boost::function1<void, AsynchIO&> EofCallback; typedef boost::function1<void, AsynchIO&> DisconnectCallback; typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback; diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp index 8094abd43d..eb0f213547 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -103,10 +103,31 @@ void AsynchIOHandler::giveReadCredit(int32_t credit) { aio->startReading(); } -bool AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { +void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { if (readError) { - return false; + return; + } + + // Check here for read credit + if (readCredit.get() != InfiniteCredit) { + if (readCredit.get() == 0) { + // FIXME aconway 2009-10-01: Workaround to avoid "false wakeups". + // readbuff is sometimes called with no credit. + // This should be fixed somewhere else to avoid such calls. + aio->unread(buff); + return; + } + // TODO In theory should be able to use an atomic operation before taking the lock + // but in practice there seems to be an unexplained race in that case + ScopedLock<Mutex> l(creditLock); + if (--readCredit == 0) { + assert(readCredit.get() >= 0); + if (readCredit.get() == 0) { + aio->stopReading(); + } + } } + size_t decoded = 0; if (codec) { // Already initiated try { @@ -149,20 +170,6 @@ bool AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { // Give whole buffer back to aio subsystem aio->queueReadBuffer(buff); } - // Check here for read credit - if (readCredit.get() != InfiniteCredit) { - // TODO In theory should be able to use an atomic operation before taking the lock - // but in practice there seems to be an unexplained race in that case - ScopedLock<Mutex> l(creditLock); - if (--readCredit == 0) { - assert(readCredit.get() >= 0); - if (readCredit.get() == 0) { - aio->stopReading(); - return false; - } - } - } - return true; } void AsynchIOHandler::eof(AsynchIO&) { diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h index 9785f445a4..e1885bac79 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h @@ -65,7 +65,7 @@ class AsynchIOHandler : public OutputControl { QPID_COMMON_EXTERN void giveReadCredit(int32_t credit); // Input side - QPID_COMMON_EXTERN bool readbuff(AsynchIO& aio, AsynchIOBufferBase* buff); + QPID_COMMON_EXTERN void readbuff(AsynchIO& aio, AsynchIOBufferBase* buff); QPID_COMMON_EXTERN void eof(AsynchIO& aio); QPID_COMMON_EXTERN void disconnect(AsynchIO& aio); diff --git a/qpid/cpp/src/qpid/sys/LatencyTracker.h b/qpid/cpp/src/qpid/sys/LatencyTracker.h deleted file mode 100644 index 3294528ff6..0000000000 --- a/qpid/cpp/src/qpid/sys/LatencyTracker.h +++ /dev/null @@ -1,157 +0,0 @@ -#ifndef QPID_SYS_LATENCYTRACKER_H -#define QPID_SYS_LATENCYTRACKER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Time.h" -#include <string> -#include <limits> -#include <map> - -namespace qpid { -namespace sys { - -/**@file Tools for measuring latency. NOT SUITABLE FOR PROUDCTION BUILDS. - * Uses should be compiled only if QPID_LATENCY_TRACKER is defined. - * See the convenience macros at the end of this file. - */ - -/** Used by LatencyCounter and LatencyTracker below */ -class LatencyStatistic { - public: - LatencyStatistic(std::string name_) : name(name_), count(0), total(0), min(std::numeric_limits<int64_t>::max()), max(0) {} - ~LatencyStatistic() { print(); } - - void record(Duration d) { - total += d; - ++count; - if (d > max) max=d; - if (d < min) min=d; - } - - void print() { - if (count) { - double meanMsec = (double(total)/count)/TIME_MSEC; - printf("\n==== Latency metric %s: samples=%lu mean=%fms (%f-%f)\n", name.c_str(), count, meanMsec, double(min)/TIME_MSEC, double(max)/TIME_MSEC); - } - else - printf("\n==== Latency metric %s: no samples.\n", name.c_str()); - } - - private: - std::string name; - unsigned long count; - int64_t total, min, max; -}; - -/** Measure delay between seeing the same value at start and finish. */ -template <class T> class LatencyTracker { - public: - LatencyTracker(std::string name) : measuring(false), stat(name) {} - - void start(T value) { - sys::Mutex::ScopedLock l(lock); - if (!measuring) { - measureAt = value; - measuring = true; - startTime = AbsTime::now(); - } - } - - void finish(T value) { - sys::Mutex::ScopedLock l(lock); - if(measuring && measureAt == value) { - stat.record(Duration(startTime, AbsTime::now())); - measuring = false; - } - } - - private: - sys::Mutex lock; - bool measuring; - T measureAt; - AbsTime startTime; - LatencyStatistic stat; -}; - - -/** Measures delay between the nth call to start and the nth call to finish. - * E.g. to measure latency between sending & receiving an ordered stream of messages. - */ -class LatencyCounter { - public: - LatencyCounter(std::string name) : measuring(false), startCount(0), finishCount(0), stat(name) {} - - void start() { - sys::Mutex::ScopedLock l(lock); - if (!measuring) { - measureAt = startCount; - measuring = true; - startTime = AbsTime::now(); - } - ++startCount; - } - - void finish() { - sys::Mutex::ScopedLock l(lock); - if (measuring && measureAt == finishCount) { - stat.record(Duration(startTime, AbsTime::now())); - measuring = false; - } - ++finishCount; - } - - private: - sys::Mutex lock; - bool measuring; - uint64_t startCount, finishCount, measureAt; - AbsTime startTime; - LatencyStatistic stat; -}; - -/** Measures time spent in a scope. */ -class LatencyScope { - public: - LatencyScope(LatencyStatistic& s) : stat(s), startTime(AbsTime::now()) {} - - ~LatencyScope() { - sys::Mutex::ScopedLock l(lock); - stat.record(Duration(startTime, AbsTime::now())); - } - - private: - sys::Mutex lock; - LatencyStatistic& stat; - AbsTime startTime; -}; - - -/** Macros to wrap latency tracking so disabled unless QPID_LATENCY_TRACKER is defined */ - -#if defined(QPID_LATENCY_TRACKER) -#define LATENCY_TRACK(X) X -#else -#define LATENCY_TRACK(X) -#endif -}} // namespace qpid::sys - -#endif /*!QPID_SYS_LATENCYTRACKER_H*/ diff --git a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp index 6eafb6cf0b..28ff140237 100644 --- a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -29,6 +29,7 @@ #include "qpid/sys/OutputControl.h" #include <boost/bind.hpp> +#include <boost/lexical_cast.hpp> #include <memory> #include <netdb.h> @@ -304,8 +305,9 @@ void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::F sin.sin_port = htons(listeningPort); sin.sin_addr.s_addr = INADDR_ANY; + SocketAddress sa("",boost::lexical_cast<std::string>(listeningPort)); listener.reset( - new Rdma::Listener((const sockaddr&)(sin), + new Rdma::Listener(sa, Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&RdmaIOProtocolFactory::established, this, poller, _1), boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2), @@ -331,24 +333,14 @@ void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connectio void RdmaIOProtocolFactory::connect( Poller::shared_ptr poller, - const std::string& host, int16_t p, + const std::string& host, int16_t port, ConnectionCodec::Factory* f, ConnectFailedCallback failed) { - ::addrinfo *res; - ::addrinfo hints = {}; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - stringstream ss; ss << p; - string port = ss.str(); - int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res); - if (n<0) { - throw Exception(QPID_MSG("Rdma: Cannot resolve " << host << ": " << ::gai_strerror(n))); - } - + SocketAddress sa(host, boost::lexical_cast<std::string>(port)); Rdma::Connector* c = new Rdma::Connector( - *res->ai_addr, + sa, Rdma::ConnectionParams(8000, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&RdmaIOProtocolFactory::connected, this, poller, _1, _2, f), boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2), diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h index f389e99cb8..d108402682 100644 --- a/qpid/cpp/src/qpid/sys/Socket.h +++ b/qpid/cpp/src/qpid/sys/Socket.h @@ -31,6 +31,7 @@ namespace qpid { namespace sys { class Duration; +class SocketAddress; class Socket : public IOHandle { @@ -48,6 +49,7 @@ public: void setNonblocking() const; QPID_COMMON_EXTERN void connect(const std::string& host, uint16_t port) const; + QPID_COMMON_EXTERN void connect(const SocketAddress&) const; QPID_COMMON_EXTERN void close() const; diff --git a/qpid/cpp/src/qpid/sys/SocketAddress.h b/qpid/cpp/src/qpid/sys/SocketAddress.h new file mode 100644 index 0000000000..fcb9c81d43 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/SocketAddress.h @@ -0,0 +1,51 @@ +#ifndef _sys_SocketAddress_h +#define _sys_SocketAddress_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/IntegerTypes.h" +#include "qpid/CommonImportExport.h" +#include <string> + +struct addrinfo; + +namespace qpid { +namespace sys { + +class SocketAddress { + friend const ::addrinfo& getAddrInfo(const SocketAddress&); + +public: + /** Create a SocketAddress from hostname and port*/ + QPID_COMMON_EXTERN SocketAddress(const std::string& host, const std::string& port); + QPID_COMMON_EXTERN ~SocketAddress(); + + std::string asString() const; + +private: + std::string host; + std::string port; + ::addrinfo* addrInfo; +}; + +}} +#endif /*!_sys_SocketAddress_h*/ diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp index b456beb098..3377be98f1 100644 --- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -46,7 +46,7 @@ class AsynchIOProtocolFactory : public ProtocolFactory { void accept(Poller::shared_ptr, ConnectionCodec::Factory*); void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*, - boost::function2<void, int, std::string> failed); + ConnectFailedCallback); uint16_t getPort() const; std::string getHost() const; @@ -54,6 +54,7 @@ class AsynchIOProtocolFactory : public ProtocolFactory { private: void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, bool isClient); + void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback); }; // Static instance to initialise plugin @@ -118,6 +119,15 @@ void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, acceptor->start(poller); } +void AsynchIOProtocolFactory::connectFailed( + const Socket& s, int ec, const std::string& emsg, + ConnectFailedCallback failedCb) +{ + failedCb(ec, emsg); + s.close(); + delete &s; +} + void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, const std::string& host, int16_t port, @@ -131,13 +141,14 @@ void AsynchIOProtocolFactory::connect( // is no longer needed. Socket* socket = new Socket(); - AsynchConnector::create (*socket, - poller, - host, - port, - boost::bind(&AsynchIOProtocolFactory::established, - this, poller, _1, fact, true), - failed); + AsynchConnector::create(*socket, + poller, + host, + port, + boost::bind(&AsynchIOProtocolFactory::established, + this, poller, _1, fact, true), + boost::bind(&AsynchIOProtocolFactory::connectFailed, + this, _1, _2, _3, failed)); } }} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index 8545ebd9cb..c042dcef01 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -21,6 +21,7 @@ #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/Poller.h" #include "qpid/sys/DispatchHandle.h" #include "qpid/sys/Time.h" @@ -37,6 +38,7 @@ #include <string.h> #include <boost/bind.hpp> +#include <boost/lexical_cast.hpp> using namespace qpid::sys; @@ -161,11 +163,12 @@ class AsynchConnector : public qpid::sys::AsynchConnector, private: void connComplete(DispatchHandle& handle); - void failure(int, std::string); + void failure(int, const std::string&); private: ConnectedCallback connCallback; FailedCallback failCallback; + std::string errMsg; const Socket& socket; public: @@ -174,7 +177,7 @@ public: std::string hostname, uint16_t port, ConnectedCallback connCb, - FailedCallback failCb = 0); + FailedCallback failCb); }; AsynchConnector::AsynchConnector(const Socket& s, @@ -192,12 +195,17 @@ AsynchConnector::AsynchConnector(const Socket& s, socket(s) { socket.setNonblocking(); + SocketAddress sa(hostname, boost::lexical_cast<std::string>(port)); try { - socket.connect(hostname, port); - startWatch(poller); + socket.connect(sa); } catch(std::exception& e) { - failure(-1, std::string(e.what())); + // Defer reporting failure + startWatch(poller); + errMsg = e.what(); + DispatchHandle::call(boost::bind(&AsynchConnector::failure, this, -1, errMsg)); + return; } + startWatch(poller); } void AsynchConnector::connComplete(DispatchHandle& h) @@ -209,17 +217,13 @@ void AsynchConnector::connComplete(DispatchHandle& h) connCallback(socket); DispatchHandle::doDelete(); } else { - failure(errCode, std::string(strError(errCode))); + failure(errCode, strError(errCode)); } } -void AsynchConnector::failure(int errCode, std::string message) +void AsynchConnector::failure(int errCode, const std::string& message) { - if (failCallback) - failCallback(errCode, message); - - socket.close(); - delete &socket; + failCallback(socket, errCode, message); DispatchHandle::doDelete(); } @@ -467,7 +471,8 @@ void AsynchIO::readable(DispatchHandle& h) { threadReadTotal += rc; readTotal += rc; - if (!readCallback(*this, buff)) { + readCallback(*this, buff); + if (readingStopped) { // We have been flow controlled. break; } diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp index 31044be9ca..02004b1999 100644 --- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp @@ -21,6 +21,7 @@ #include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/posix/check.h" #include "qpid/sys/posix/PrivatePosix.h" @@ -36,6 +37,7 @@ #include <iostream> #include <boost/format.hpp> +#include <boost/lexical_cast.hpp> namespace qpid { namespace sys { @@ -126,42 +128,23 @@ void Socket::setNonblocking() const { QPID_POSIX_CHECK(::fcntl(impl->fd, F_SETFL, O_NONBLOCK)); } -namespace { -const char* h_errstr(int e) { - switch (e) { - case HOST_NOT_FOUND: return "Host not found"; - case NO_ADDRESS: return "Name does not have an IP address"; - case TRY_AGAIN: return "A temporary error occurred on an authoritative name server."; - case NO_RECOVERY: return "Non-recoverable name server error"; - default: return "Unknown error"; - } -} +void Socket::connect(const std::string& host, uint16_t port) const +{ + SocketAddress sa(host, boost::lexical_cast<std::string>(port)); + connect(sa); } -void Socket::connect(const std::string& host, uint16_t p) const +void Socket::connect(const SocketAddress& addr) const { - std::stringstream portstream; - portstream << p; - std::string port = portstream.str(); - connectname = host + ":" + port; + connectname = addr.asString(); const int& socket = impl->fd; - ::addrinfo *res; - ::addrinfo hints; - ::memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well - hints.ai_socktype = SOCK_STREAM; - int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res); - if (n != 0) - throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n))); // TODO the correct thing to do here is loop on failure until you've used all the returned addresses - if ((::connect(socket, res->ai_addr, res->ai_addrlen) < 0) && + if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) && (errno != EINPROGRESS)) { - ::freeaddrinfo(res); - throw qpid::Exception(QPID_MSG(strError(errno) << ": " << host << ":" << port)); + throw Exception(QPID_MSG(strError(errno) << ": " << connectname)); } - ::freeaddrinfo(res); } void @@ -178,15 +161,14 @@ int Socket::listen(uint16_t port, int backlog) const const int& socket = impl->fd; int yes=1; QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); - struct sockaddr_in name; - name.sin_family = AF_INET; - name.sin_port = htons(port); - name.sin_addr.s_addr = 0; - if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0) + + SocketAddress sa("", boost::lexical_cast<std::string>(port)); + if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0) throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno))); if (::listen(socket, backlog) < 0) throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(errno))); - + + struct sockaddr_in name; socklen_t namelen = sizeof(name); if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0) throw QPID_POSIX_ERROR(errno); @@ -226,9 +208,10 @@ std::string Socket::getPeername() const std::string Socket::getPeerAddress() const { - if (!connectname.empty()) - return std::string (connectname); - return getName(impl->fd, false, true); + if (connectname.empty()) { + connectname = getName(impl->fd, false, true); + } + return connectname; } std::string Socket::getLocalAddress() const diff --git a/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp new file mode 100644 index 0000000000..fe8812299c --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/SocketAddress.h" + +#include "qpid/sys/posix/check.h" + +#include <sys/socket.h> +#include <string.h> +#include <netdb.h> + +namespace qpid { +namespace sys { + +SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) : + host(host0), + port(port0), + addrInfo(0) +{ + ::addrinfo hints; + ::memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well + hints.ai_socktype = SOCK_STREAM; + + const char* node = 0; + if (host.empty()) { + hints.ai_flags |= AI_PASSIVE; + } else { + node = host.c_str(); + } + const char* service = port.empty() ? "0" : port.c_str(); + + int n = ::getaddrinfo(node, service, &hints, &addrInfo); + if (n != 0) + throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n))); +} + +SocketAddress::~SocketAddress() +{ + ::freeaddrinfo(addrInfo); +} + +std::string SocketAddress::asString() const +{ + return host + ":" + port; +} + +const ::addrinfo& getAddrInfo(const SocketAddress& sa) +{ + return *sa.addrInfo; +} + +}} diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp index 9da6c835ce..d39f7885a5 100644 --- a/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp @@ -40,6 +40,7 @@ using std::rand; using qpid::sys::Poller; using qpid::sys::Dispatcher; +using qpid::sys::SocketAddress; using qpid::sys::AbsTime; using qpid::sys::Duration; using qpid::sys::TIME_SEC; @@ -154,18 +155,8 @@ using namespace qpid::tests; int main(int argc, char* argv[]) { vector<string> args(&argv[0], &argv[argc]); - ::addrinfo *res; - ::addrinfo hints = {}; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; + string host = args[1]; string port = (args.size() < 3) ? "20079" : args[2]; - int n = ::getaddrinfo(args[1].c_str(), port.c_str(), &hints, &res); - if (n<0) { - cerr << "Can't find information for: " << args[1] << "\n"; - return 1; - } else { - cout << "Connecting to: " << args[1] << ":" << port <<"\n"; - } if (args.size() > 3) msgsize = atoi(args[3].c_str()); @@ -181,8 +172,10 @@ int main(int argc, char* argv[]) { boost::shared_ptr<Poller> p(new Poller()); Dispatcher d(p); + SocketAddress sa(host, port); + cout << "Connecting to: " << sa.asString() <<"\n"; Rdma::Connector c( - *res->ai_addr, + sa, Rdma::ConnectionParams(msgsize, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&connected, p, _1, _2), boost::bind(&connectionError, p, _1, _2), diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 491c1612fd..8d06fccba1 100644 --- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -26,6 +26,7 @@ #include <iostream> #include <boost/bind.hpp> +using qpid::sys::SocketAddress; using qpid::sys::DispatchHandle; using qpid::sys::Poller; @@ -461,7 +462,7 @@ namespace Rdma { } Listener::Listener( - const sockaddr& src, + const SocketAddress& src, const ConnectionParams& cp, EstablishedCallback ec, ErrorCallback errc, @@ -541,7 +542,7 @@ namespace Rdma { } Connector::Connector( - const sockaddr& dst, + const SocketAddress& dst, const ConnectionParams& cp, ConnectedCallback cc, ErrorCallback errc, diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h index 697d9387ce..12a1b98d24 100644 --- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -27,6 +27,7 @@ #include "qpid/sys/Dispatcher.h" #include "qpid/sys/DispatchHandle.h" #include "qpid/sys/Mutex.h" +#include "qpid/sys/SocketAddress.h" #include <netinet/in.h> @@ -173,14 +174,14 @@ namespace Rdma { class Listener : public ConnectionManager { - sockaddr src_addr; + qpid::sys::SocketAddress src_addr; ConnectionParams checkConnectionParams; ConnectionRequestCallback connectionRequestCallback; EstablishedCallback establishedCallback; public: Listener( - const sockaddr& src, + const qpid::sys::SocketAddress& src, const ConnectionParams& cp, EstablishedCallback ec, ErrorCallback errc, @@ -198,14 +199,14 @@ namespace Rdma { class Connector : public ConnectionManager { - sockaddr dst_addr; + qpid::sys::SocketAddress dst_addr; ConnectionParams connectionParams; RejectedCallback rejectedCallback; ConnectedCallback connectedCallback; public: Connector( - const sockaddr& dst, + const qpid::sys::SocketAddress& dst, const ConnectionParams& cp, ConnectedCallback cc, ErrorCallback errc, diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp index 07d6379362..4c11ba23eb 100644 --- a/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp +++ b/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp @@ -35,6 +35,7 @@ using std::string; using std::cout; using std::cerr; +using qpid::sys::SocketAddress; using qpid::sys::Poller; using qpid::sys::Dispatcher; @@ -144,20 +145,15 @@ using namespace qpid::tests; int main(int argc, char* argv[]) { vector<string> args(&argv[0], &argv[argc]); - ::sockaddr_in sin; - - int port = (args.size() < 2) ? 20079 : atoi(args[1].c_str()); + std::string port = (args.size() < 2) ? "20079" : args[1]; cout << "Listening on port: " << port << "\n"; - sin.sin_family = AF_INET; - sin.sin_port = htons(port); - sin.sin_addr.s_addr = INADDR_ANY; - try { boost::shared_ptr<Poller> p(new Poller()); Dispatcher d(p); - Rdma::Listener a((const sockaddr&)(sin), + SocketAddress sa("", port); + Rdma::Listener a(sa, Rdma::ConnectionParams(16384, Rdma::DEFAULT_WR_ENTRIES), boost::bind(connected, p, _1), connectionError, diff --git a/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h index aa2e516e6b..e11497dc02 100644 --- a/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h +++ b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h @@ -350,9 +350,9 @@ namespace Rdma { return ConnectionEvent(e); } - void bind(sockaddr& src_addr) const { + void bind(qpid::sys::SocketAddress& src_addr) const { assert(id.get()); - CHECK(::rdma_bind_addr(id.get(), &src_addr)); + CHECK(::rdma_bind_addr(id.get(), getAddrInfo(src_addr).ai_addr)); } void listen(int backlog = DEFAULT_BACKLOG) const { @@ -361,12 +361,11 @@ namespace Rdma { } void resolve_addr( - sockaddr& dst_addr, - sockaddr* src_addr = 0, + qpid::sys::SocketAddress& dst_addr, int timeout_ms = DEFAULT_TIMEOUT) const { assert(id.get()); - CHECK(::rdma_resolve_addr(id.get(), src_addr, &dst_addr, timeout_ms)); + CHECK(::rdma_resolve_addr(id.get(), 0, getAddrInfo(dst_addr).ai_addr, timeout_ms)); } void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const { diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp index 8905b87838..475b18600d 100644 --- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -634,7 +634,7 @@ void AsynchIO::readComplete(AsynchReadResult *result) { if (status == 0 && bytes > 0) { bool restartRead = true; // May not if receiver doesn't want more if (readCallback) - restartRead = readCallback(*this, result->getBuff()); + readCallback(*this, result->getBuff()); if (restartRead) startReading(); } diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.cpp b/qpid/cpp/src/qpid/xml/XmlExchange.cpp index 8a1ef6149e..472ca28954 100644 --- a/qpid/cpp/src/qpid/xml/XmlExchange.cpp +++ b/qpid/cpp/src/qpid/xml/XmlExchange.cpp @@ -206,45 +206,22 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT PreRoute pr(msg, this); try { XmlBinding::vector::ConstPtr p; - { + BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); + { RWlock::ScopedRlock l(lock); - p = bindingsMap[routingKey].snapshot(); - if (!p) return; - } - int count(0); + p = bindingsMap[routingKey].snapshot(); + if (!p.get()) return; + } for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) { if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) { - msg.deliverTo((*i)->queue); - count++; - QPID_LOG(trace, "Delivered to queue" ); - - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); + b->push_back(*i); } - } - if (!count) { - QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey); - if (mgmtExchange != 0) { - mgmtExchange->inc_msgDrops (); - mgmtExchange->inc_byteDrops (msg.contentSize ()); - } - } else { - if (mgmtExchange != 0) { - mgmtExchange->inc_msgRoutes (count); - mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); - } - } - - if (mgmtExchange != 0) { - mgmtExchange->inc_msgReceives (); - mgmtExchange->inc_byteReceives (msg.contentSize ()); - } + } + doRoute(msg, b); } catch (...) { QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey); } - - } |