summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid')
-rw-r--r--qpid/cpp/src/qpid/acl/AclData.cpp91
-rw-r--r--qpid/cpp/src/qpid/acl/AclData.h12
-rw-r--r--qpid/cpp/src/qpid/acl/AclReader.cpp227
-rw-r--r--qpid/cpp/src/qpid/broker/AclModule.h17
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp26
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h10
-rw-r--r--qpid/cpp/src/qpid/broker/MessageBuilder.cpp28
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.h2
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp18
-rw-r--r--qpid/cpp/src/qpid/client/Dispatcher.cpp11
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp111
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h85
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp48
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h50
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp53
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h9
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp15
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h3
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp55
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h34
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp55
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h17
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp10
-rw-r--r--qpid/cpp/src/qpid/cluster/Quorum_cman.cpp8
-rw-r--r--qpid/cpp/src/qpid/messaging/Address.cpp3
-rw-r--r--qpid/cpp/src/qpid/messaging/Receiver.cpp3
-rw-r--r--qpid/cpp/src/qpid/messaging/ReceiverImpl.h3
-rw-r--r--qpid/cpp/src/qpid/messaging/Sender.cpp3
-rw-r--r--qpid/cpp/src/qpid/messaging/SenderImpl.h3
-rw-r--r--qpid/cpp/src/qpid/messaging/Session.cpp12
-rw-r--r--qpid/cpp/src/qpid/messaging/SessionImpl.h4
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp11
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp11
-rw-r--r--qpid/cpp/src/qpid/xml/XmlExchange.cpp46
39 files changed, 756 insertions, 352 deletions
diff --git a/qpid/cpp/src/qpid/acl/AclData.cpp b/qpid/cpp/src/qpid/acl/AclData.cpp
index d2a55c0027..81519c3311 100644
--- a/qpid/cpp/src/qpid/acl/AclData.cpp
+++ b/qpid/cpp/src/qpid/acl/AclData.cpp
@@ -53,42 +53,65 @@ bool AclData::matchProp(const std::string & src, const std::string& src1)
}
}
-AclResult AclData::lookup(const std::string& id, const Action& action, const ObjectType& objType, const std::string& name, std::map<Property, std::string>* params)
-{
- AclResult aclresult = decisionMode;
-
- if (actionList[action] && actionList[action][objType]){
- AclData::actObjItr itrRule = actionList[action][objType]->find(id);
- if (itrRule == actionList[action][objType]->end())
- itrRule = actionList[action][objType]->find("*");
- if (itrRule != actionList[action][objType]->end() ) {
-
- //loop the vector
- for (ruleSetItr i=itrRule->second.begin(); i<itrRule->second.end(); i++) {
-
- // loop the names looking for match
- bool match =true;
- for (propertyMapItr pMItr = i->props.begin(); (pMItr != i->props.end()) && match; pMItr++)
- {
- //match name is exists first
- if (pMItr->first == acl::PROP_NAME){
- if (!matchProp(pMItr->second, name)){
- match= false;
- }
- }else if (params){ //match pMItr against params
- propertyMapItr paramItr = params->find (pMItr->first);
- if (paramItr == params->end()){
- match = false;
- }else if (!matchProp(paramItr->second, pMItr->second)){
- match = false;
- }
+AclResult AclData::lookup(const std::string& id, const Action& action, const ObjectType& objType,
+ const std::string& name, std::map<Property, std::string>* params) {
+
+ QPID_LOG(debug, "ACL: Lookup for id:" << id << " action:" << AclHelper::getActionStr((Action) action)
+ << " objectType:" << AclHelper::getObjectTypeStr((ObjectType) objType) << " name:" << name
+ << " with params " << AclHelper::propertyMapToString(params));
+
+ AclResult aclresult = decisionMode;
+ if (actionList[action] && actionList[action][objType]) {
+ AclData::actObjItr itrRule = actionList[action][objType]->find(id);
+ if (itrRule == actionList[action][objType]->end())
+ itrRule = actionList[action][objType]->find("*");
+ if (itrRule != actionList[action][objType]->end()) {
+
+ QPID_LOG(debug, "ACL: checking the following rules for : " << itrRule->first );
+
+ //loop the vector
+ for (ruleSetItr i = itrRule->second.begin(); i < itrRule->second.end(); i++) {
+ QPID_LOG(debug, "ACL: checking rule " << i->toString());
+ // loop the names looking for match
+ bool match = true;
+ for (propertyMapItr pMItr = i->props.begin(); (pMItr != i->props.end()) && match; pMItr++) {
+ //match name is exists first
+ if (pMItr->first == acl::PROP_NAME) {
+ if (matchProp(pMItr->second, name)){
+ QPID_LOG(debug, "ACL: name '" << name << "' matched with name '"
+ << pMItr->second << "' given in the rule");
+ }else{
+ match = false;
+ QPID_LOG(debug, "ACL: name '" << name << "' didn't match with name '"
+ << pMItr->second << "' given in the rule");
+ }
+ } else if (params) { //match pMItr against params
+ propertyMapItr paramItr = params->find(pMItr->first);
+ if (paramItr == params->end()) {
+ match = false;
+ QPID_LOG(debug, "ACL: the given parameter map in lookup doesn't contain the property '"
+ << AclHelper::getPropertyStr(pMItr->first) << "'");
+ } else if (!matchProp(pMItr->second, paramItr->second)) {
+ QPID_LOG(debug, "ACL: the pair("
+ << AclHelper::getPropertyStr(paramItr->first) << "," << paramItr->second
+ << ") given in lookup doesn't match the pair("
+ << AclHelper::getPropertyStr(pMItr->first) << "," << pMItr->second << ") given in the rule");
+ match = false;
}
}
- if (match) return getACLResult(i->logOnly, i->log);
- }
- }
- }
- return aclresult;
+ }
+ if (match)
+ {
+ aclresult = getACLResult(i->logOnly, i->log);
+ QPID_LOG(debug,"Successful match, the decision is:" << AclHelper::getAclResultStr(aclresult));
+ return aclresult;
+ }
+ }
+ }
+ }
+
+ QPID_LOG(debug,"No successful match, defaulting to the decision mode " << AclHelper::getAclResultStr(aclresult));
+ return aclresult;
}
AclResult AclData::lookup(const std::string& id, const Action& action, const ObjectType& objType, const std::string& /*Exchange*/ name, const std::string& RoutingKey)
diff --git a/qpid/cpp/src/qpid/acl/AclData.h b/qpid/cpp/src/qpid/acl/AclData.h
index 249c3523eb..a63afab12b 100644
--- a/qpid/cpp/src/qpid/acl/AclData.h
+++ b/qpid/cpp/src/qpid/acl/AclData.h
@@ -22,7 +22,7 @@
#include "qpid/broker/AclModule.h"
#include <vector>
-
+#include <sstream>
namespace qpid {
namespace acl {
@@ -45,6 +45,16 @@ public:
rule (propertyMap& p):log(false),logOnly(false),props(p) {};
+
+ std::string toString () const {
+ std::ostringstream ruleStr;
+ ruleStr << "[log=" << log << ", logOnly=" << logOnly << " props{";
+ for (propertyMapItr pMItr = props.begin(); pMItr != props.end(); pMItr++) {
+ ruleStr << " " << AclHelper::getPropertyStr((Property) pMItr-> first) << "=" << pMItr->second;
+ }
+ ruleStr << " }]";
+ return ruleStr.str();
+ }
};
typedef std::vector<rule> ruleSet;
typedef ruleSet::const_iterator ruleSetItr;
diff --git a/qpid/cpp/src/qpid/acl/AclReader.cpp b/qpid/cpp/src/qpid/acl/AclReader.cpp
index 8f5e4f5b57..8f419a6f50 100644
--- a/qpid/cpp/src/qpid/acl/AclReader.cpp
+++ b/qpid/cpp/src/qpid/acl/AclReader.cpp
@@ -83,115 +83,142 @@ std::string AclReader::aclRule::toString() {
return oss.str();
}
-void AclReader::loadDecisionData( boost::shared_ptr<AclData> d )
-{
- d->clear();
- QPID_LOG(debug, "ACL Load Rules");
- int cnt = rules.size();
+void AclReader::loadDecisionData(boost::shared_ptr<AclData> d) {
+ d->clear();
+ QPID_LOG(debug, "ACL Load Rules");
+ int cnt = rules.size();
bool foundmode = false;
- for (rlCitr i=rules.end()-1; cnt; i--,cnt--) {
- QPID_LOG(debug, "ACL Processing " << std::setfill(' ') << std::setw(2) << cnt << " " << (*i)->toString());
-
- if (!foundmode && (*i)->actionAll && (*i)->names.size()==1 && (*((*i)->names.begin())).compare("*")==0 ){
- d->decisionMode = (*i)->res;
- QPID_LOG(debug, "ACL FoundMode " << AclHelper::getAclResultStr(d->decisionMode));
- foundmode=true;
- }else{
- AclData::rule rule((*i)->props);
- bool addrule= true;
-
- switch ((*i)->res)
- {
- case qpid::acl::ALLOWLOG:
- rule.log = true;
- if (d->decisionMode == qpid::acl::ALLOW || d->decisionMode == qpid::acl::ALLOWLOG)
- rule.logOnly = true;
+
+ for (rlCitr i = rules.end() - 1; cnt; i--, cnt--) {
+ QPID_LOG(debug, "ACL Processing " << std::setfill(' ') << std::setw(2)
+ << cnt << " " << (*i)->toString());
+
+ if (!foundmode && (*i)->actionAll && (*i)->names.size() == 1
+ && (*((*i)->names.begin())).compare("*") == 0) {
+ d->decisionMode = (*i)->res;
+ QPID_LOG(debug, "ACL FoundMode " << AclHelper::getAclResultStr(
+ d->decisionMode));
+ foundmode = true;
+ } else {
+ AclData::rule rule((*i)->props);
+ bool addrule = true;
+
+ switch ((*i)->res) {
+ case qpid::acl::ALLOWLOG:
+ rule.log = true;
+ if (d->decisionMode == qpid::acl::ALLOW || d->decisionMode
+ == qpid::acl::ALLOWLOG)
+ rule.logOnly = true;
+ break;
+ case qpid::acl::ALLOW:
+ if (d->decisionMode == qpid::acl::ALLOW || d->decisionMode
+ == qpid::acl::ALLOWLOG)
+ addrule = false;
+ break;
+ case qpid::acl::DENYLOG:
+ rule.log = true;
+ if (d->decisionMode == qpid::acl::DENY || d->decisionMode
+ == qpid::acl::DENYLOG)
+ rule.logOnly = true;
+ break;
+ case qpid::acl::DENY:
+ if (d->decisionMode == qpid::acl::DENY || d->decisionMode
+ == qpid::acl::DENYLOG)
+ addrule = false;
break;
- case qpid::acl::ALLOW:
- if (d->decisionMode == qpid::acl::ALLOW || d->decisionMode == qpid::acl::ALLOWLOG)
- addrule = false;
- break;
- case qpid::acl::DENYLOG:
- rule.log = true;
- if (d->decisionMode == qpid::acl::DENY || d->decisionMode == qpid::acl::DENYLOG)
- rule.logOnly = true;
- break;
- case qpid::acl::DENY:
- if (d->decisionMode == qpid::acl::DENY || d->decisionMode == qpid::acl::DENYLOG)
- addrule = false;
- break;
- default:
- throw Exception("Invalid ACL Result loading rules.");
- }
-
-
- // Action -> Object -> map<user -> set<Rule> >
- if (addrule){
- for (int acnt= ((*i)->actionAll?0:(*i)->action);
- acnt< acl::ACTIONSIZE; (*i)->actionAll?acnt++:acnt=acl::ACTIONSIZE ) {
-
- if (acnt == acl::ACT_PUBLISH) d->transferAcl = true; // we have transfer ACL
-
- QPID_LOG(debug, "ACL Adding action:" << AclHelper::getActionStr((Action)acnt) );
-
- //find the Action, create if not exist
- if (d->actionList[acnt]==NULL) {
- d->actionList[acnt] = new AclData::aclAction[qpid::acl::OBJECTSIZE];
- for (int j=0;j<qpid::acl::OBJECTSIZE; j++)
- d->actionList[acnt][j] = NULL;
- }
+ default:
+ throw Exception("Invalid ACL Result loading rules.");
+ }
+
+ // Action -> Object -> map<user -> set<Rule> >
+ if (addrule) {
+ std::ostringstream actionstr;
+ for (int acnt = ((*i)->actionAll ? 0 : (*i)->action); acnt
+ < acl::ACTIONSIZE; (*i)->actionAll ? acnt++ : acnt
+ = acl::ACTIONSIZE) {
+
+ if (acnt == acl::ACT_PUBLISH)
+ d->transferAcl = true; // we have transfer ACL
+
+ actionstr << AclHelper::getActionStr((Action) acnt) << ",";
+
+ //find the Action, create if not exist
+ if (d->actionList[acnt] == NULL) {
+ d->actionList[acnt]
+ = new AclData::aclAction[qpid::acl::OBJECTSIZE];
+ for (int j = 0; j < qpid::acl::OBJECTSIZE; j++)
+ d->actionList[acnt][j] = NULL;
+ }
// optimize this loop to limit to valid options only!!
- for (int ocnt= ((*i)->objStatus!=aclRule::VALUE ?0:(*i)->object);
- ocnt< acl::OBJECTSIZE;
- (*i)->objStatus!=aclRule::VALUE?ocnt++:ocnt=acl::OBJECTSIZE ) {
-
- QPID_LOG(debug, "ACL Adding object:" << AclHelper::getObjectTypeStr((ObjectType)ocnt) );
-
- //find the Object, create if not exist
- if (d->actionList[acnt][ocnt] == NULL)
- d->actionList[acnt][ocnt] = new AclData::actionObject;
-
- // add users and Rule to object set
- bool allNames=false;
- // check to see if names.begin is '*'
- if ( (*(*i)->names.begin()).compare("*")==0 ) allNames = true;
-
- for (nsCitr itr = (allNames?names.begin():(*i)->names.begin());
- itr != (allNames?names.end():(*i)->names.end()); itr++) {
- AclData::actObjItr itrRule = d->actionList[acnt][ocnt]->find(*itr);
- if (itrRule == d->actionList[acnt][ocnt]->end()) {
- QPID_LOG(debug, "ACL Adding rule & user:" << *itr);
- AclData::ruleSet rSet;
- rSet.push_back(rule);
- d->actionList[acnt][ocnt]->insert(make_pair( std::string(*itr) , rSet) );
- }else{
-
- // TODO add code to check for dead rules
- // allow peter create queue name=tmp <-- dead rule!!
- // allow peter create queue
-
- itrRule->second.push_back(rule);
- QPID_LOG(debug, "ACL Adding rule to user:" << *itr);
- }
- }
-
- }
-
- }
- }else{
- QPID_LOG(debug, "ACL Skipping based on Mode:" << AclHelper::getAclResultStr(d->decisionMode) );
- }
- }
-
- }
+ for (int ocnt = ((*i)->objStatus != aclRule::VALUE ? 0
+ : (*i)->object); ocnt < acl::OBJECTSIZE; (*i)->objStatus
+ != aclRule::VALUE ? ocnt++ : ocnt = acl::OBJECTSIZE) {
+
+ //find the Object, create if not exist
+ if (d->actionList[acnt][ocnt] == NULL)
+ d->actionList[acnt][ocnt]
+ = new AclData::actionObject;
+
+ // add users and Rule to object set
+ bool allNames = false;
+ // check to see if names.begin is '*'
+ if ((*(*i)->names.begin()).compare("*") == 0)
+ allNames = true;
+
+ for (nsCitr itr = (allNames ? names.begin()
+ : (*i)->names.begin()); itr
+ != (allNames ? names.end() : (*i)->names.end()); itr++) {
+
+ AclData::actObjItr itrRule =
+ d->actionList[acnt][ocnt]->find(*itr);
+
+ if (itrRule == d->actionList[acnt][ocnt]->end()) {
+ AclData::ruleSet rSet;
+ rSet.push_back(rule);
+ d->actionList[acnt][ocnt]->insert(make_pair(
+ std::string(*itr), rSet));
+ } else {
+
+ // TODO add code to check for dead rules
+ // allow peter create queue name=tmp <-- dead rule!!
+ // allow peter create queue
+
+ itrRule->second.push_back(rule);
+ }
+ }
+
+ }
+ }
+
+ std::ostringstream objstr;
+ for (int ocnt = ((*i)->objStatus != aclRule::VALUE ? 0 : (*i)->object); ocnt < acl::OBJECTSIZE;
+ (*i)->objStatus != aclRule::VALUE ? ocnt++ : ocnt = acl::OBJECTSIZE) {
+ objstr << AclHelper::getObjectTypeStr((ObjectType) ocnt) << ",";
+ }
+
+ bool allNames = ((*(*i)->names.begin()).compare("*") == 0);
+ std::ostringstream userstr;
+ for (nsCitr itr = (allNames ? names.begin() : (*i)->names.begin());
+ itr != (allNames ? names.end() : (*i)->names.end()); itr++) {
+ userstr << *itr << ",";
+ }
+
+ QPID_LOG(debug,"ACL: Adding actions {" << actionstr.str().substr(0,actionstr.str().length()-1)
+ << "} to objects {" << objstr.str().substr(0,objstr.str().length()-1)
+ << "} with props " << AclHelper::propertyMapToString(&rule.props)
+ << " for users {" << userstr.str().substr(0,userstr.str().length()-1) << "}" );
+ } else {
+ QPID_LOG(debug, "ACL Skipping based on Mode:"
+ << AclHelper::getAclResultStr(d->decisionMode));
+ }
+ }
+ }
}
-
-
void AclReader::aclRule::processName(const std::string& name, const groupMap& groups) {
if (name.compare("all") == 0) {
names.insert("*");
diff --git a/qpid/cpp/src/qpid/broker/AclModule.h b/qpid/cpp/src/qpid/broker/AclModule.h
index a78b2d5b4a..536fa21b2b 100644
--- a/qpid/cpp/src/qpid/broker/AclModule.h
+++ b/qpid/cpp/src/qpid/broker/AclModule.h
@@ -26,7 +26,7 @@
#include <map>
#include <set>
#include <string>
-
+#include <sstream>
namespace qpid {
@@ -179,6 +179,8 @@ class AclHelper {
typedef std::map<ObjectType, actionMapPtr> objectMap;
typedef objectMap::const_iterator omCitr;
typedef boost::shared_ptr<objectMap> objectMapPtr;
+ typedef std::map<Property, std::string> propMap;
+ typedef propMap::const_iterator propMapItr;
// This map contains the legal combinations of object/action/properties found in an ACL file
static void loadValidationMap(objectMapPtr& map) {
@@ -248,6 +250,19 @@ class AclHelper {
map->insert(objectPair(OBJ_METHOD, a4));
}
+
+ static std::string propertyMapToString(const std::map<Property, std::string>* params) {
+ std::ostringstream ss;
+ ss << "{";
+ if (params)
+ {
+ for (propMapItr pMItr = params->begin(); pMItr != params->end(); pMItr++) {
+ ss << " " << getPropertyStr((Property) pMItr-> first) << "=" << pMItr->second;
+ }
+ }
+ ss << " }";
+ return ss.str();
+ }
};
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index b1fc1295f3..90d81b81c6 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,7 +38,7 @@ using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::broker;
-namespace
+namespace
{
const std::string qpidMsgSequence("qpid.msg_sequence");
const std::string qpidSequenceCounter("qpid.sequence_counter");
@@ -51,17 +51,19 @@ const std::string fedOpBind("B");
const std::string fedOpUnbind("U");
const std::string fedOpReorigin("R");
const std::string fedOpHello("H");
+
+const std::string QPID_MANAGEMENT("qpid.management");
}
Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) {
if (parent){
if (parent->sequence || parent->ive) parent->sequenceLock.lock();
-
+
if (parent->sequence){
parent->sequenceNo++;
- msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
- }
+ msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+ }
if (parent->ive) {
parent->lastMsg = &( msg.getMessage());
}
@@ -99,11 +101,9 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
}
}
-static const std::string QPID_MANAGEMENT("qpid.management");
-
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent, Broker* b)
- : name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
+ : name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b)
{
if (parent != 0 && broker != 0)
@@ -169,7 +169,7 @@ Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffe
string name;
string type;
FieldTable args;
-
+
buffer.getShortString(name);
bool durable(buffer.getOctet());
buffer.getShortString(type);
@@ -185,7 +185,7 @@ Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffe
}
}
-void Exchange::encode(Buffer& buffer) const
+void Exchange::encode(Buffer& buffer) const
{
buffer.putShortString(name);
buffer.putOctet(durable);
@@ -195,8 +195,8 @@ void Exchange::encode(Buffer& buffer) const
buffer.put(args);
}
-uint32_t Exchange::encodedSize() const
-{
+uint32_t Exchange::encodedSize() const
+{
return name.size() + 1/*short string size*/
+ 1 /*durable*/
+ getType().size() + 1/*short string size*/
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 639f04faa2..7360010192 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -362,7 +362,7 @@ boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor)
Replacement::iterator i = replacement.find(qfor);
if (i != replacement.end()){
return i->second;
- }
+ }
return empty;
}
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index 0024509bc8..e4d09b1042 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -34,12 +34,12 @@
#include <vector>
namespace qpid {
-
+
namespace framing {
class FieldTable;
class SequenceNumber;
}
-
+
namespace broker {
class ConnectionToken;
class Exchange;
@@ -145,9 +145,9 @@ public:
bool isExcluded(const std::vector<std::string>& excludes) const;
void addTraceId(const std::string& id);
-
- void forcePersistent();
- bool isForcedPersistent();
+
+ void forcePersistent();
+ bool isForcedPersistent();
boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const;
void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor);
diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
index e01fd81074..14b233fd6c 100644
--- a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,14 +30,14 @@ using boost::intrusive_ptr;
using namespace qpid::broker;
using namespace qpid::framing;
-namespace
+namespace
{
std::string type_str(uint8_t type);
+ const std::string QPID_MANAGEMENT("qpid.management");
}
-MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) :
- state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {}
-static const std::string QPID_MANAGEMENT("qpid.management");
+MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) :
+ state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {}
void MessageBuilder::handle(AMQFrame& frame)
{
@@ -54,10 +54,10 @@ void MessageBuilder::handle(AMQFrame& frame)
AMQFrame header((AMQHeaderBody()));
header.setBof(false);
header.setEof(false);
- message->getFrames().append(header);
+ message->getFrames().append(header);
} else if (type != HEADER_BODY) {
throw CommandInvalidException(
- QPID_MSG("Invalid frame sequence for message, expected header or content got "
+ QPID_MSG("Invalid frame sequence for message, expected header or content got "
<< type_str(type) << ")"));
}
state = CONTENT;
@@ -74,13 +74,13 @@ void MessageBuilder::handle(AMQFrame& frame)
} else {
message->getFrames().append(frame);
//have we reached the staging limit? if so stage message and release content
- if (state == CONTENT
- && stagingThreshold
+ if (state == CONTENT
+ && stagingThreshold
&& message->getFrames().getContentSize() >= stagingThreshold
&& !NullMessageStore::isNullStore(store)
- && message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */)
+ && message->getExchangeName() != QPID_MANAGEMENT /* don't stage mgnt messages */)
{
- message->releaseContent(store);
+ message->releaseContent(store);
staging = true;
}
}
@@ -108,7 +108,7 @@ const std::string CONTENT_BODY_S = "CONTENT";
const std::string HEARTBEAT_BODY_S = "HEARTBEAT";
const std::string UNKNOWN = "unknown";
-std::string type_str(uint8_t type)
+std::string type_str(uint8_t type)
{
switch(type) {
case METHOD_BODY: return METHOD_BODY_S;
@@ -124,7 +124,7 @@ std::string type_str(uint8_t type)
void MessageBuilder::checkType(uint8_t expected, uint8_t actual)
{
if (expected != actual) {
- throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected "
+ throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected "
<< type_str(expected) << " got " << type_str(actual) << ")"));
}
}
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
index c1f86d4ca4..2ef223aa81 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
@@ -62,7 +62,7 @@ void PersistableMessage::flush()
void PersistableMessage::setContentReleased() {contentReleased = true; }
bool PersistableMessage::isContentReleased()const { return contentReleased; }
-
+
bool PersistableMessage::isEnqueueComplete() {
sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
return asyncEnqueueCounter == 0;
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h
index 05de9ff4c3..0274b41375 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.h
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h
@@ -46,7 +46,7 @@ class PersistableMessage : public Persistable
sys::Mutex asyncEnqueueLock;
sys::Mutex asyncDequeueLock;
sys::Mutex storeLock;
-
+
/**
* Tracks the number of outstanding asynchronous enqueue
* operations. When the message is enqueued asynchronously the
@@ -97,7 +97,7 @@ class PersistableMessage : public Persistable
void flush();
bool isContentReleased() const;
-
+
QPID_BROKER_EXTERN bool isEnqueueComplete();
QPID_BROKER_EXTERN void enqueueComplete();
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 08ee133981..b2a8e223c5 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -597,7 +597,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
Mutex::ScopedUnlock u(messageLock);
dequeue(0, QueuedMessage(qm.queue, old, qm.position));
}
- }
+ }
}else {
messages.push_back(qm);
listeners.populate(copy);
@@ -702,7 +702,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
if (inLastNodeFailure && persistLastNode){
msg->forcePersistent();
}
-
+
if (traceId.size()) {
msg->addTraceId(traceId);
}
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h
index c134f399c8..72a91dff24 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.h
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h
@@ -111,7 +111,7 @@ class QueueRegistry {
/** Call f for each queue in the registry. */
template <class F> void eachQueue(F f) const {
- qpid::sys::RWlock::ScopedWlock l(lock);
+ qpid::sys::RWlock::ScopedRlock l(lock);
for (QueueMap::const_iterator i = queues.begin(); i != queues.end(); ++i)
f(i->second);
}
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 2a19115fd1..5bc4cdf960 100644
--- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -195,7 +195,7 @@ void RecoverableQueueImpl::setPersistenceId(uint64_t id)
{
queue->setPersistenceId(id);
}
-
+
uint64_t RecoverableQueueImpl::getPersistenceId() const
{
return queue->getPersistenceId();
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index af07605552..a1ad5a0a30 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -72,7 +72,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
- throw NotAllowedException(QPID_MSG("ACL denied exhange declare request from " << getConnection().getUserId()));
+ throw NotAllowedException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId()));
}
//TODO: implement autoDelete
@@ -134,7 +134,7 @@ void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifU
AclModule* acl = getBroker().getAcl();
if (acl) {
if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
- throw NotAllowedException(QPID_MSG("ACL denied exhange delete request from " << getConnection().getUserId()));
+ throw NotAllowedException(QPID_MSG("ACL denied exchange delete request from " << getConnection().getUserId()));
}
//TODO: implement unused
@@ -154,7 +154,7 @@ ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& nam
AclModule* acl = getBroker().getAcl();
if (acl) {
if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,name,NULL) )
- throw NotAllowedException(QPID_MSG("ACL denied exhange query request from " << getConnection().getUserId()));
+ throw NotAllowedException(QPID_MSG("ACL denied exchange query request from " << getConnection().getUserId()));
}
try {
@@ -171,8 +171,12 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
{
AclModule* acl = getBroker().getAcl();
if (acl) {
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,routingKey) )
- throw NotAllowedException(QPID_MSG("ACL denied exhange bind request from " << getConnection().getUserId()));
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+ params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
+
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,&params))
+ throw NotAllowedException(QPID_MSG("ACL denied exchange bind request from " << getConnection().getUserId()));
}
Queue::shared_ptr queue = getQueue(queueName);
@@ -234,8 +238,8 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchangeName,&params) )
- throw NotAllowedException(QPID_MSG("ACL denied exhange bound request from " << getConnection().getUserId()));
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchangeName,&params) )
+ throw NotAllowedException(QPID_MSG("ACL denied exchange bound request from " << getConnection().getUserId()));
}
Exchange::shared_ptr exchange;
diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp
index 43cbf3aa4d..a715c623bf 100644
--- a/qpid/cpp/src/qpid/client/Dispatcher.cpp
+++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp
@@ -29,7 +29,14 @@
#include "qpid/client/Message.h"
#include "qpid/client/MessageImpl.h"
-#include <boost/state_saver.hpp>
+#include <boost/version.hpp>
+#if (BOOST_VERSION >= 104000)
+# include <boost/serialization/state_saver.hpp>
+ using boost::serialization::state_saver;
+#else
+# include <boost/state_saver.hpp>
+ using boost::state_saver;
+#endif /* BOOST_VERSION */
using qpid::framing::FrameSet;
using qpid::framing::MessageTransferBody;
@@ -65,7 +72,7 @@ void Dispatcher::run()
Mutex::ScopedLock l(lock);
if (running)
throw Exception("Dispatcher is already running.");
- boost::state_saver<bool> reset(running); // Reset to false on exit.
+ state_saver<bool> reset(running); // Reset to false on exit.
running = true;
try {
while (!queue->isClosed()) {
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
new file mode 100644
index 0000000000..80be5c56f3
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "AcceptTracker.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+void AcceptTracker::State::accept()
+{
+ unconfirmed.add(unaccepted);
+ unaccepted.clear();
+}
+
+void AcceptTracker::State::release()
+{
+ unaccepted.clear();
+}
+
+uint32_t AcceptTracker::State::acceptsPending()
+{
+ return unconfirmed.size();
+}
+
+void AcceptTracker::State::completed(qpid::framing::SequenceSet& set)
+{
+ unconfirmed.remove(set);
+}
+
+void AcceptTracker::delivered(const std::string& destination, const qpid::framing::SequenceNumber& id)
+{
+ aggregateState.unaccepted.add(id);
+ destinationState[destination].unaccepted.add(id);
+}
+
+void AcceptTracker::accept(qpid::client::AsyncSession& session)
+{
+ for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
+ i->second.accept();
+ }
+ Record record;
+ record.status = session.messageAccept(aggregateState.unaccepted);
+ record.accepted = aggregateState.unaccepted;
+ pending.push_back(record);
+ aggregateState.accept();
+}
+
+void AcceptTracker::release(qpid::client::AsyncSession& session)
+{
+ session.messageRelease(aggregateState.unaccepted);
+ for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
+ i->second.release();
+ }
+ aggregateState.release();
+}
+
+uint32_t AcceptTracker::acceptsPending()
+{
+ checkPending();
+ return aggregateState.acceptsPending();
+}
+
+uint32_t AcceptTracker::acceptsPending(const std::string& destination)
+{
+ checkPending();
+ return destinationState[destination].acceptsPending();
+}
+
+void AcceptTracker::reset()
+{
+ destinationState.clear();
+ aggregateState.unaccepted.clear();
+ aggregateState.unconfirmed.clear();
+ pending.clear();
+}
+
+void AcceptTracker::checkPending()
+{
+ while (!pending.empty() && pending.front().status.isComplete()) {
+ completed(pending.front().accepted);
+ pending.pop_front();
+ }
+}
+
+void AcceptTracker::completed(qpid::framing::SequenceSet& set)
+{
+ for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
+ i->second.completed(set);
+ }
+ aggregateState.completed(set);
+}
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
new file mode 100644
index 0000000000..fb58a3a8c8
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
@@ -0,0 +1,85 @@
+#ifndef QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H
+#define QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/Completion.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/SequenceSet.h"
+#include <deque>
+#include <map>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Tracks the set of messages requiring acceptance, and those for
+ * which an accept has been issued but is yet to be confirmed
+ * complete.
+ */
+class AcceptTracker
+{
+ public:
+ void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id);
+ void accept(qpid::client::AsyncSession&);
+ void release(qpid::client::AsyncSession&);
+ uint32_t acceptsPending();
+ uint32_t acceptsPending(const std::string& destination);
+ void reset();
+ private:
+ struct State
+ {
+ /**
+ * ids of messages that have been delivered but not yet
+ * accepted
+ */
+ qpid::framing::SequenceSet unaccepted;
+ /**
+ * ids of messages for which an accpet has been issued but not
+ * yet confirmed as completed
+ */
+ qpid::framing::SequenceSet unconfirmed;
+
+ void accept();
+ void release();
+ uint32_t acceptsPending();
+ void completed(qpid::framing::SequenceSet&);
+ };
+ typedef std::map<std::string, State> StateMap;
+ struct Record
+ {
+ qpid::client::Completion status;
+ qpid::framing::SequenceSet accepted;
+ };
+ typedef std::deque<Record> Records;
+
+ State aggregateState;
+ StateMap destinationState;
+ Records pending;
+
+ void checkPending();
+ void completed(qpid::framing::SequenceSet&);
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H*/
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp
deleted file mode 100644
index 52b623b65c..0000000000
--- a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "CompletionTracker.h"
-
-namespace qpid {
-namespace client {
-namespace amqp0_10 {
-
-using qpid::framing::SequenceNumber;
-
-void CompletionTracker::track(SequenceNumber command, void* token)
-{
- tokens[command] = token;
-}
-
-void CompletionTracker::completedTo(SequenceNumber command)
-{
- Tokens::iterator i = tokens.lower_bound(command);
- if (i != tokens.end()) {
- lastCompleted = i->second;
- tokens.erase(tokens.begin(), ++i);
- }
-}
-
-void* CompletionTracker::getLastCompletedToken()
-{
- return lastCompleted;
-}
-
-}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h
deleted file mode 100644
index 6147c5682e..0000000000
--- a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h
+++ /dev/null
@@ -1,50 +0,0 @@
-#ifndef QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H
-#define QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/framing/SequenceNumber.h"
-#include <map>
-
-namespace qpid {
-namespace client {
-namespace amqp0_10 {
-
-/**
- * Provides a mapping from command ids to application supplied
- * 'tokens', and is used to determine when the sending or
- * acknowledging of a specific message is complete.
- */
-class CompletionTracker
-{
- public:
- void track(qpid::framing::SequenceNumber command, void* token);
- void completedTo(qpid::framing::SequenceNumber command);
- void* getLastCompletedToken();
- private:
- typedef std::map<qpid::framing::SequenceNumber, void*> Tokens;
- Tokens tokens;
- void* lastCompleted;
-};
-}}} // namespace qpid::client::amqp0_10
-
-#endif /*!QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H*/
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index b0a16674e1..d22208368b 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -81,12 +81,31 @@ struct MatchAndTrack
}
}
};
+
+struct Match
+{
+ const std::string destination;
+ uint32_t matched;
+
+ Match(const std::string& d) : destination(d), matched(0) {}
+
+ bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command)
+ {
+ if (command->as<MessageTransferBody>()->getDestination() == destination) {
+ ++matched;
+ return true;
+ } else {
+ return false;
+ }
+ }
+};
}
void IncomingMessages::setSession(qpid::client::AsyncSession s)
{
session = s;
incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault();
+ acceptTracker.reset();
}
bool IncomingMessages::get(Handler& handler, Duration timeout)
@@ -106,8 +125,7 @@ bool IncomingMessages::get(Handler& handler, Duration timeout)
void IncomingMessages::accept()
{
- session.messageAccept(unaccepted);
- unaccepted.clear();
+ acceptTracker.accept(session);
}
void IncomingMessages::releaseAll()
@@ -121,8 +139,7 @@ void IncomingMessages::releaseAll()
GetAny handler;
while (process(&handler, 0)) ;
//now release all messages
- session.messageRelease(unaccepted);
- unaccepted.clear();
+ acceptTracker.release(session);
}
void IncomingMessages::releasePending(const std::string& destination)
@@ -166,6 +183,32 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
return false;
}
+uint32_t IncomingMessages::pendingAccept()
+{
+ return acceptTracker.acceptsPending();
+}
+uint32_t IncomingMessages::pendingAccept(const std::string& destination)
+{
+ return acceptTracker.acceptsPending(destination);
+}
+
+uint32_t IncomingMessages::available()
+{
+ //first pump all available messages from incoming to received...
+ while (process(0, 0)) {}
+ //return the count of received messages
+ return received.size();
+}
+
+uint32_t IncomingMessages::available(const std::string& destination)
+{
+ //first pump all available messages from incoming to received...
+ while (process(0, 0)) {}
+
+ //count all messages for this destination from received list
+ return std::for_each(received.begin(), received.end(), Match(destination)).matched;
+}
+
void populate(qpid::messaging::Message& message, FrameSet& command);
/**
@@ -180,7 +223,7 @@ void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* m
}
const MessageTransferBody* transfer = command->as<MessageTransferBody>();
if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
- unaccepted.add(command->getId());
+ acceptTracker.delivered(transfer->getDestination(), command->getId());
}
session.markCompleted(command->getId(), false, false);
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index 5e28877305..e84cd18892 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -27,6 +27,7 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/sys/BlockingQueue.h"
#include "qpid/sys/Time.h"
+#include "qpid/client/amqp0_10/AcceptTracker.h"
namespace qpid {
@@ -74,13 +75,19 @@ class IncomingMessages
void accept();
void releaseAll();
void releasePending(const std::string& destination);
+
+ uint32_t pendingAccept();
+ uint32_t pendingAccept(const std::string& destination);
+
+ uint32_t available();
+ uint32_t available(const std::string& destination);
private:
typedef std::deque<FrameSetPtr> FrameSetQueue;
qpid::client::AsyncSession session;
- qpid::framing::SequenceSet unaccepted;
boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming;
FrameSetQueue received;
+ AcceptTracker acceptTracker;
bool process(Handler*, qpid::sys::Duration);
void retrieve(FrameSetPtr, qpid::messaging::Message*);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index 31efff38a6..da91c4a160 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -120,6 +120,21 @@ qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener;
const std::string& ReceiverImpl::getName() const { return destination; }
+uint32_t ReceiverImpl::getCapacity()
+{
+ return capacity;
+}
+
+uint32_t ReceiverImpl::available()
+{
+ return parent.available(destination);
+}
+
+uint32_t ReceiverImpl::pendingAck()
+{
+ return parent.pendingAck(destination);
+}
+
ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
const qpid::messaging::Address& a,
const qpid::messaging::Filter* f,
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
index 509c784513..b941348fc8 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
@@ -62,6 +62,9 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
void stop();
const std::string& getName() const;
void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t available();
+ uint32_t pendingAck();
void setListener(qpid::messaging::MessageListener* listener);
qpid::messaging::MessageListener* getListener();
void received(qpid::messaging::Message& message);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index c619d1226a..4cd2dc0521 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -32,11 +32,12 @@ SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
const qpid::messaging::Address& _address,
const qpid::messaging::Variant::Map& _options) :
parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED),
- capacity(50), window(0) {}
+ capacity(50), window(0), flushed(false) {}
-void SenderImpl::send(const qpid::messaging::Message& m)
+void SenderImpl::send(const qpid::messaging::Message& message)
{
- execute1<Send>(&m);
+ Send f(*this, &message);
+ while (f.repeat) parent.execute(f);
}
void SenderImpl::cancel()
@@ -44,6 +45,20 @@ void SenderImpl::cancel()
execute<Cancel>();
}
+void SenderImpl::setCapacity(uint32_t c)
+{
+ bool flush = c < capacity;
+ capacity = c;
+ execute1<CheckPendingSends>(flush);
+}
+uint32_t SenderImpl::getCapacity() { return capacity; }
+uint32_t SenderImpl::pending()
+{
+ CheckPendingSends f(*this, false);
+ parent.execute(f);
+ return f.pending;
+}
+
void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
{
session = s;
@@ -60,18 +75,31 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
}
}
+void SenderImpl::waitForCapacity()
+{
+ //TODO: add option to throw exception rather than blocking?
+ if (capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) {
+ //Initial implementation is very basic. As outgoing is
+ //currently only reduced on receiving completions and we are
+ //blocking anyway we may as well sync(). If successful that
+ //should clear all outstanding sends.
+ session.sync();
+ checkPendingSends(false);
+ }
+ //flush periodically and check for conmpleted sends
+ if (++window > (capacity / 4)) {//TODO: make this configurable?
+ checkPendingSends(true);
+ window = 0;
+ }
+}
+
void SenderImpl::sendImpl(const qpid::messaging::Message& m)
{
- //TODO: make recoding for replay optional
+ //TODO: make recording for replay optional (would still want to track completion however)
std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
msg->convert(m);
outgoing.push_back(msg.release());
sink->send(session, name, outgoing.back());
- if (++window > (capacity / 2)) {//TODO: make this configurable?
- session.flush();
- checkPendingSends();
- window = 0;
- }
}
void SenderImpl::replay()
@@ -81,11 +109,18 @@ void SenderImpl::replay()
}
}
-void SenderImpl::checkPendingSends()
+uint32_t SenderImpl::checkPendingSends(bool flush)
{
+ if (flush) {
+ session.flush();
+ flushed = true;
+ } else {
+ flushed = false;
+ }
while (!outgoing.empty() && outgoing.front().status.isComplete()) {
outgoing.pop_front();
}
+ return outgoing.size();
}
void SenderImpl::cancelImpl()
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
index 4ba793d71c..028d26bda7 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
@@ -51,6 +51,9 @@ class SenderImpl : public qpid::messaging::SenderImpl
const qpid::messaging::Variant::Map& options);
void send(const qpid::messaging::Message&);
void cancel();
+ void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t pending();
void init(qpid::client::AsyncSession, AddressResolution&);
private:
@@ -69,14 +72,17 @@ class SenderImpl : public qpid::messaging::SenderImpl
OutgoingMessages outgoing;
uint32_t capacity;
uint32_t window;
+ bool flushed;
- void checkPendingSends();
+ uint32_t checkPendingSends(bool flush);
void replay();
+ void waitForCapacity();
//logic for application visible methods:
void sendImpl(const qpid::messaging::Message&);
void cancelImpl();
+
//functors for application visible methods (allowing locking and
//retry to be centralised):
struct Command
@@ -89,9 +95,17 @@ class SenderImpl : public qpid::messaging::SenderImpl
struct Send : Command
{
const qpid::messaging::Message* message;
-
- Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {}
- void operator()() { impl.sendImpl(*message); }
+ bool repeat;
+
+ Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m), repeat(true) {}
+ void operator()()
+ {
+ impl.waitForCapacity();
+ //from this point message will be recorded if there is any
+ //failure (and replayed) so need not repeat the call
+ repeat = false;
+ impl.sendImpl(*message);
+ }
};
struct Cancel : Command
@@ -100,6 +114,14 @@ class SenderImpl : public qpid::messaging::SenderImpl
void operator()() { impl.cancelImpl(); }
};
+ struct CheckPendingSends : Command
+ {
+ bool flush;
+ uint32_t pending;
+ CheckPendingSends(SenderImpl& i, bool f) : Command(i), flush(f), pending(0) {}
+ void operator()() { pending = impl.checkPendingSends(flush); }
+ };
+
//helper templates for some common patterns
template <class F> void execute()
{
@@ -107,10 +129,10 @@ class SenderImpl : public qpid::messaging::SenderImpl
parent.execute(f);
}
- template <class F, class P> void execute1(P p)
+ template <class F, class P> bool execute1(P p)
{
F f(*this, p);
- parent.execute(f);
+ return parent.execute(f);
}
};
}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 0e6c430d89..bc6289d84b 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -298,6 +298,61 @@ bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration t
}
}
+uint32_t SessionImpl::available()
+{
+ return get1<Available, uint32_t>((const std::string*) 0);
+}
+uint32_t SessionImpl::available(const std::string& destination)
+{
+ return get1<Available, uint32_t>(&destination);
+}
+
+struct SessionImpl::Available : Command
+{
+ const std::string* destination;
+ uint32_t result;
+
+ Available(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
+ void operator()() { result = impl.availableImpl(destination); }
+};
+
+uint32_t SessionImpl::availableImpl(const std::string* destination)
+{
+ if (destination) {
+ return incoming.available(*destination);
+ } else {
+ return incoming.available();
+ }
+}
+
+uint32_t SessionImpl::pendingAck()
+{
+ return get1<PendingAck, uint32_t>((const std::string*) 0);
+}
+
+uint32_t SessionImpl::pendingAck(const std::string& destination)
+{
+ return get1<PendingAck, uint32_t>(&destination);
+}
+
+struct SessionImpl::PendingAck : Command
+{
+ const std::string* destination;
+ uint32_t result;
+
+ PendingAck(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
+ void operator()() { result = impl.pendingAckImpl(destination); }
+};
+
+uint32_t SessionImpl::pendingAckImpl(const std::string* destination)
+{
+ if (destination) {
+ return incoming.pendingAccept(*destination);
+ } else {
+ return incoming.pendingAccept();
+ }
+}
+
void SessionImpl::syncImpl()
{
session.sync();
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index 1c7db17bbb..b453f3f08f 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -83,6 +83,12 @@ class SessionImpl : public qpid::messaging::SessionImpl
void receiverCancelled(const std::string& name);
void senderCancelled(const std::string& name);
+ uint32_t available();
+ uint32_t available(const std::string& destination);
+
+ uint32_t pendingAck();
+ uint32_t pendingAck(const std::string& destination);
+
void setSession(qpid::client::Session);
template <class T> bool execute(T& f)
@@ -128,6 +134,8 @@ class SessionImpl : public qpid::messaging::SessionImpl
qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address,
const qpid::messaging::Filter* filter,
const qpid::messaging::VariantMap& options);
+ uint32_t availableImpl(const std::string* destination);
+ uint32_t pendingAckImpl(const std::string* destination);
//functors for public facing methods (allows locking and retry
//logic to be centralised)
@@ -178,6 +186,8 @@ class SessionImpl : public qpid::messaging::SessionImpl
struct CreateSender;
struct CreateReceiver;
+ struct PendingAck;
+ struct Available;
//helper templates for some common patterns
template <class F> bool execute()
@@ -196,6 +206,13 @@ class SessionImpl : public qpid::messaging::SessionImpl
F f(*this, p);
return execute(f);
}
+
+ template <class F, class R, class P> R get1(P p)
+ {
+ F f(*this, p);
+ while (!execute(f)) {}
+ return f.result;
+ }
};
}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 4cc977d14a..6873827b81 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -156,8 +156,17 @@ bool Connection::checkUnsupported(const AMQBody& body) {
return !message.empty();
}
+struct GiveReadCreditOnExit {
+ Connection& connection;
+ int credit;
+ GiveReadCreditOnExit(Connection& connection_, int credit_) :
+ connection(connection_), credit(credit_) {}
+ ~GiveReadCreditOnExit() { connection.giveReadCredit(credit); }
+};
+
// Called in delivery thread, in cluster order.
void Connection::deliveredFrame(const EventFrame& f) {
+ GiveReadCreditOnExit gc(*this, f.readCredit);
assert(!catchUp);
currentChannel = f.frame.getChannel();
if (f.frame.getBody() // frame can be emtpy with just readCredit
@@ -171,7 +180,6 @@ void Connection::deliveredFrame(const EventFrame& f) {
if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
}
}
- giveReadCredit(f.readCredit);
}
// A local connection is closed by the network layer.
diff --git a/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp b/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
index 277adaf7b1..507d9649b9 100644
--- a/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
+++ b/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,8 +33,8 @@ namespace {
boost::function<void()> errorFn;
-void cmanCallbackFn(cman_handle_t handle, void */*privdata*/, int reason, int arg) {
- if (reason == CMAN_REASON_STATECHANGE && arg == 0) {
+void cmanCallbackFn(cman_handle_t handle, void */*privdata*/, int reason, int /*arg*/) {
+ if (reason == CMAN_REASON_STATECHANGE && !cman_is_quorate(handle)) {
QPID_LOG(critical, "Lost contact with cluster quorum.");
if (errorFn) errorFn();
cman_stop_notification(handle);
diff --git a/qpid/cpp/src/qpid/messaging/Address.cpp b/qpid/cpp/src/qpid/messaging/Address.cpp
index ed35054a00..813a8e1377 100644
--- a/qpid/cpp/src/qpid/messaging/Address.cpp
+++ b/qpid/cpp/src/qpid/messaging/Address.cpp
@@ -21,9 +21,6 @@
#include "qpid/messaging/Address.h"
namespace qpid {
-namespace client {
-}
-
namespace messaging {
Address::Address() {}
diff --git a/qpid/cpp/src/qpid/messaging/Receiver.cpp b/qpid/cpp/src/qpid/messaging/Receiver.cpp
index 2e8b89d27f..3290ea98ac 100644
--- a/qpid/cpp/src/qpid/messaging/Receiver.cpp
+++ b/qpid/cpp/src/qpid/messaging/Receiver.cpp
@@ -45,6 +45,9 @@ Message Receiver::fetch(qpid::sys::Duration timeout) { return impl->fetch(timeou
void Receiver::start() { impl->start(); }
void Receiver::stop() { impl->stop(); }
void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); }
+uint32_t Receiver::getCapacity() { return impl->getCapacity(); }
+uint32_t Receiver::available() { return impl->available(); }
+uint32_t Receiver::pendingAck() { return impl->pendingAck(); }
void Receiver::cancel() { impl->cancel(); }
void Receiver::setListener(MessageListener* listener) { impl->setListener(listener); }
diff --git a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
index 77697b730c..7db20acc29 100644
--- a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
+++ b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h
@@ -44,6 +44,9 @@ class ReceiverImpl : public virtual qpid::RefCounted
virtual void start() = 0;
virtual void stop() = 0;
virtual void setCapacity(uint32_t) = 0;
+ virtual uint32_t getCapacity() = 0;
+ virtual uint32_t available() = 0;
+ virtual uint32_t pendingAck() = 0;
virtual void cancel() = 0;
virtual void setListener(MessageListener*) = 0;
};
diff --git a/qpid/cpp/src/qpid/messaging/Sender.cpp b/qpid/cpp/src/qpid/messaging/Sender.cpp
index 8db700b060..62b2944701 100644
--- a/qpid/cpp/src/qpid/messaging/Sender.cpp
+++ b/qpid/cpp/src/qpid/messaging/Sender.cpp
@@ -40,5 +40,8 @@ Sender::~Sender() { PI::dtor(*this); }
Sender& Sender::operator=(const Sender& s) { return PI::assign(*this, s); }
void Sender::send(const Message& message) { impl->send(message); }
void Sender::cancel() { impl->cancel(); }
+void Sender::setCapacity(uint32_t c) { impl->setCapacity(c); }
+uint32_t Sender::getCapacity() { return impl->getCapacity(); }
+uint32_t Sender::pending() { return impl->pending(); }
}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/SenderImpl.h b/qpid/cpp/src/qpid/messaging/SenderImpl.h
index 77d2cfaeaf..fa3794ca4e 100644
--- a/qpid/cpp/src/qpid/messaging/SenderImpl.h
+++ b/qpid/cpp/src/qpid/messaging/SenderImpl.h
@@ -37,6 +37,9 @@ class SenderImpl : public virtual qpid::RefCounted
virtual ~SenderImpl() {}
virtual void send(const Message& message) = 0;
virtual void cancel() = 0;
+ virtual void setCapacity(uint32_t) = 0;
+ virtual uint32_t getCapacity() = 0;
+ virtual uint32_t pending() = 0;
private:
};
}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/Session.cpp b/qpid/cpp/src/qpid/messaging/Session.cpp
index 284b20dacc..62b1ca0dcf 100644
--- a/qpid/cpp/src/qpid/messaging/Session.cpp
+++ b/qpid/cpp/src/qpid/messaging/Session.cpp
@@ -103,15 +103,7 @@ bool Session::dispatch(qpid::sys::Duration timeout)
{
return impl->dispatch(timeout);
}
-
-void* Session::getLastConfirmedSent()
-{
- return impl->getLastConfirmedSent();
-}
-
-void* Session::getLastConfirmedAcknowledged()
-{
- return impl->getLastConfirmedAcknowledged();
-}
+uint32_t Session::available() { return impl->available(); }
+uint32_t Session::pendingAck() { return impl->pendingAck(); }
}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/messaging/SessionImpl.h b/qpid/cpp/src/qpid/messaging/SessionImpl.h
index 9b122a24bc..0933cea9c8 100644
--- a/qpid/cpp/src/qpid/messaging/SessionImpl.h
+++ b/qpid/cpp/src/qpid/messaging/SessionImpl.h
@@ -56,8 +56,8 @@ class SessionImpl : public virtual qpid::RefCounted
virtual Sender createSender(const Address& address, const VariantMap& options) = 0;
virtual Receiver createReceiver(const Address& address, const VariantMap& options) = 0;
virtual Receiver createReceiver(const Address& address, const Filter& filter, const VariantMap& options) = 0;
- virtual void* getLastConfirmedSent() = 0;
- virtual void* getLastConfirmedAcknowledged() = 0;
+ virtual uint32_t available() = 0;
+ virtual uint32_t pendingAck() = 0;
private:
};
}} // namespace qpid::messaging
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
index 52208d0519..9da6c835ce 100644
--- a/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -45,6 +45,9 @@ using qpid::sys::Duration;
using qpid::sys::TIME_SEC;
using qpid::sys::TIME_INFINITE;
+namespace qpid {
+namespace tests {
+
// count of messages
int64_t smsgs = 0;
int64_t sbytes = 0;
@@ -144,6 +147,10 @@ void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, con
p->shutdown();
}
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
int main(int argc, char* argv[]) {
vector<string> args(&argv[0], &argv[argc]);
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
index 1ab5268596..07d6379362 100644
--- a/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,6 +39,9 @@ using qpid::sys::Poller;
using qpid::sys::Dispatcher;
// All the accepted connections
+namespace qpid {
+namespace tests {
+
struct ConRec {
Rdma::Connection::intrusive_ptr connection;
Rdma::AsynchIO* data;
@@ -134,6 +137,10 @@ void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
cr->data->start(poller);
}
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
int main(int argc, char* argv[]) {
vector<string> args(&argv[0], &argv[argc]);
diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.cpp b/qpid/cpp/src/qpid/xml/XmlExchange.cpp
index a41c8840ff..8a1ef6149e 100644
--- a/qpid/cpp/src/qpid/xml/XmlExchange.cpp
+++ b/qpid/cpp/src/qpid/xml/XmlExchange.cpp
@@ -156,7 +156,7 @@ bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::F
// This will parse the document using either Xerces or FastXDM, depending
// on your XQilla configuration. FastXDM can be as much as 10x faster.
-
+
Sequence seq(context->parseDocument(xml));
if(!seq.isEmpty() && seq.first()->isNode()) {
@@ -206,11 +206,11 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT
PreRoute pr(msg, this);
try {
XmlBinding::vector::ConstPtr p;
- {
+ {
RWlock::ScopedRlock l(lock);
- p = bindingsMap[routingKey].snapshot();
- if (!p) return;
- }
+ p = bindingsMap[routingKey].snapshot();
+ if (!p) return;
+ }
int count(0);
for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) {
@@ -222,24 +222,24 @@ void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldT
if ((*i)->mgmtBinding != 0)
(*i)->mgmtBinding->inc_msgMatched ();
}
- }
- if (!count) {
- QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey);
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- } else {
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
- }
- }
-
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- }
+ }
+ if (!count) {
+ QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey);
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ } else {
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
+
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ }
} catch (...) {
QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey);
}