summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rwxr-xr-xcpp/managementgen/schema.py133
-rw-r--r--cpp/managementgen/templates/Class.cpp4
-rw-r--r--cpp/managementgen/templates/Class.h8
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp16
-rw-r--r--cpp/src/qpid/broker/Connection.cpp58
-rw-r--r--cpp/src/qpid/broker/Connection.h5
-rw-r--r--cpp/src/qpid/broker/ConnectionState.h2
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp8
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp8
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp8
-rw-r--r--cpp/src/qpid/broker/Link.cpp4
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp30
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h5
-rw-r--r--cpp/src/qpid/broker/Queue.cpp18
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp2
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp8
-rw-r--r--cpp/src/qpid/broker/XmlExchange.cpp4
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp2
-rw-r--r--cpp/src/qpid/management/ManagementObject.cpp3
-rw-r--r--cpp/src/qpid/management/ManagementObject.h1
-rw-r--r--cpp/src/qpid/sys/AtomicCount.h52
-rwxr-xr-xcpp/src/tests/federation.py2
23 files changed, 244 insertions, 138 deletions
diff --git a/cpp/managementgen/schema.py b/cpp/managementgen/schema.py
index 7a9f23ea76..6e48003ab2 100755
--- a/cpp/managementgen/schema.py
+++ b/cpp/managementgen/schema.py
@@ -94,9 +94,23 @@ class SchemaType:
if changeFlag != None:
stream.write (" " + changeFlag + " = true;\n")
stream.write (" }\n");
+ elif self.accessor == "counterByOne":
+ stream.write (" inline void inc_" + varName + " (){\n");
+ stream.write (" ++" + varName + ";\n")
+ if changeFlag != None:
+ stream.write (" " + changeFlag + " = true;\n")
+ stream.write (" }\n");
+ stream.write (" inline void dec_" + varName + " (){\n");
+ stream.write (" --" + varName + ";\n")
+ if changeFlag != None:
+ stream.write (" " + changeFlag + " = true;\n")
+ stream.write (" }\n");
elif self.accessor == "counter":
stream.write (" inline void inc_" + varName + " (" + self.cpp + " by = 1){\n");
- stream.write (" " + varName + " += by;\n")
+ stream.write (" if (by == 1)\n")
+ stream.write (" ++" + varName + ";\n")
+ stream.write (" else\n")
+ stream.write (" " + varName + " += by;\n")
if self.style == "wm":
stream.write (" if (" + varName + "High < " + varName + ")\n")
stream.write (" " + varName + "High = " + varName + ";\n")
@@ -104,7 +118,10 @@ class SchemaType:
stream.write (" " + changeFlag + " = true;\n")
stream.write (" }\n");
stream.write (" inline void dec_" + varName + " (" + self.cpp + " by = 1){\n");
- stream.write (" " + varName + " -= by;\n")
+ stream.write (" if (by == 1)\n")
+ stream.write (" " + varName + "--;\n")
+ stream.write (" else\n")
+ stream.write (" " + varName + " -= by;\n")
if self.style == "wm":
stream.write (" if (" + varName + "Low > " + varName + ")\n")
stream.write (" " + varName + "Low = " + varName + ";\n")
@@ -196,16 +213,18 @@ class Type:
#=====================================================================================
class SchemaConfig:
def __init__ (self, node, typespec):
- self.name = None
- self.type = None
- self.access = "RO"
- self.isIndex = 0
- self.isParentRef = 0
- self.unit = None
- self.min = None
- self.max = None
- self.maxLen = None
- self.desc = None
+ self.name = None
+ self.type = None
+ self.ref = None
+ self.access = "RO"
+ self.isIndex = 0
+ self.isParentRef = 0
+ self.isGeneralRef = 0
+ self.unit = None
+ self.min = None
+ self.max = None
+ self.maxLen = None
+ self.desc = None
attrs = node.attributes
for idx in range (attrs.length):
@@ -216,6 +235,9 @@ class SchemaConfig:
elif key == 'type':
self.type = Type (val, typespec)
+
+ elif key == 'references':
+ self.ref = val
elif key == 'access':
self.access = val
@@ -230,6 +252,11 @@ class SchemaConfig:
raise ValueError ("Expected 'y' in parentRef attribute")
self.isParentRef = 1
+ elif key == 'isGeneralReference':
+ if val != 'y':
+ raise ValueError ("Expected 'y' in isGeneralReference attribute")
+ self.isGeneralRef = 1
+
elif key == 'unit':
self.unit = val
@@ -246,12 +273,12 @@ class SchemaConfig:
self.desc = val
else:
- raise ValueError ("Unknown attribute in configElement '%s'" % key)
+ raise ValueError ("Unknown attribute in property '%s'" % key)
if self.name == None:
- raise ValueError ("Missing 'name' attribute in configElement")
+ raise ValueError ("Missing 'name' attribute in property")
if self.type == None:
- raise ValueError ("Missing 'type' attribute in configElement")
+ raise ValueError ("Missing 'type' attribute in property")
def getName (self):
return self.name
@@ -319,12 +346,12 @@ class SchemaInst:
self.desc = val
else:
- raise ValueError ("Unknown attribute in instElement '%s'" % key)
+ raise ValueError ("Unknown attribute in statistic '%s'" % key)
if self.name == None:
- raise ValueError ("Missing 'name' attribute in instElement")
+ raise ValueError ("Missing 'name' attribute in statistic")
if self.type == None:
- raise ValueError ("Missing 'type' attribute in instElement")
+ raise ValueError ("Missing 'type' attribute in statistic")
def getName (self):
return self.name
@@ -388,6 +415,8 @@ class SchemaInst:
def genInitialize (self, stream):
val = self.type.type.init
+ if self.type.type.accessor == "counterByOne":
+ return
if self.type.type.style != "mma":
stream.write (" " + self.name + " = " + val + ";\n")
if self.type.type.style == "wm":
@@ -589,13 +618,13 @@ class SchemaEvent:
class SchemaClass:
def __init__ (self, package, node, typespec, fragments, options):
- self.packageName = package
- self.configElements = []
- self.instElements = []
- self.methods = []
- self.events = []
- self.options = options
- self.md5Sum = md5.new ()
+ self.packageName = package
+ self.properties = []
+ self.statistics = []
+ self.methods = []
+ self.events = []
+ self.options = options
+ self.md5Sum = md5.new ()
self.hash (node)
@@ -605,13 +634,13 @@ class SchemaClass:
children = node.childNodes
for child in children:
if child.nodeType == Node.ELEMENT_NODE:
- if child.nodeName == 'configElement':
+ if child.nodeName == 'property':
sub = SchemaConfig (child, typespec)
- self.configElements.append (sub)
+ self.properties.append (sub)
- elif child.nodeName == 'instElement':
+ elif child.nodeName == 'statistic':
sub = SchemaInst (child, typespec)
- self.instElements.append (sub)
+ self.statistics.append (sub)
elif child.nodeName == 'method':
sub = SchemaMethod (self, child, typespec)
@@ -644,10 +673,10 @@ class SchemaClass:
name = attrs['name'].nodeValue
for fragment in fragments:
if fragment.name == name:
- for config in fragment.configElements:
- self.configElements.append (config)
- for inst in fragment.instElements:
- self.instElements.append (inst)
+ for config in fragment.properties:
+ self.properties.append (config)
+ for inst in fragment.statistics:
+ self.statistics.append (inst)
for method in fragment.methods:
self.methods.append (method)
for event in fragment.events:
@@ -675,33 +704,33 @@ class SchemaClass:
# match the substitution keywords in the template files.
#===================================================================================
def genAccessorMethods (self, stream, variables):
- for config in self.configElements:
+ for config in self.properties:
if config.access != "RC":
config.genAccessor (stream)
- for inst in self.instElements:
+ for inst in self.statistics:
inst.genAccessor (stream)
def genConfigCount (self, stream, variables):
- stream.write ("%d" % len (self.configElements))
+ stream.write ("%d" % len (self.properties))
def genConfigDeclarations (self, stream, variables):
- for element in self.configElements:
+ for element in self.properties:
element.genDeclaration (stream)
def genConfigElementSchema (self, stream, variables):
- for config in self.configElements:
+ for config in self.properties:
config.genSchema (stream)
def genConstructorArgs (self, stream, variables):
# Constructor args are config elements with read-create access
result = ""
- for element in self.configElements:
+ for element in self.properties:
if element.isConstructorArg ():
stream.write (", ")
element.genFormalParam (stream)
def genConstructorInits (self, stream, variables):
- for element in self.configElements:
+ for element in self.properties:
if element.isConstructorArg ():
stream.write ("," + element.getName () + "(_" + element.getName () + ")")
@@ -729,21 +758,21 @@ class SchemaClass:
pass ###########################################################################
def genHiLoStatResets (self, stream, variables):
- for inst in self.instElements:
+ for inst in self.statistics:
inst.genHiLoStatResets (stream)
def genInitializeElements (self, stream, variables):
- for inst in self.instElements:
+ for inst in self.statistics:
inst.genInitialize (stream)
def genInstChangedStub (self, stream, variables):
- if len (self.instElements) == 0:
+ if len (self.statistics) == 0:
stream.write (" // Stub for getInstChanged. There are no inst elements\n")
stream.write (" bool getInstChanged (void) { return false; }\n")
def genInstCount (self, stream, variables):
count = 0
- for inst in self.instElements:
+ for inst in self.statistics:
count = count + 1
if inst.type.type.style == "wm":
count = count + 2
@@ -752,11 +781,11 @@ class SchemaClass:
stream.write ("%d" % count)
def genInstDeclarations (self, stream, variables):
- for element in self.instElements:
+ for element in self.statistics:
element.genDeclaration (stream)
def genInstElementSchema (self, stream, variables):
- for inst in self.instElements:
+ for inst in self.statistics:
inst.genSchema (stream)
def genMethodArgIncludes (self, stream, variables):
@@ -794,6 +823,10 @@ class SchemaClass:
arg.name, "outBuf") + ";\n")
stream.write (" return;\n }\n")
+ def genSetGeneralReferenceDeclaration (self, stream, variables):
+ for prop in self.properties:
+ if prop.isGeneralRef:
+ stream.write ("void setReference(uint64_t objectId) { " + prop.name + " = objectId; }\n")
def genMethodIdDeclarations (self, stream, variables):
number = 1
@@ -822,13 +855,13 @@ class SchemaClass:
stream.write (self.name.upper ())
def genParentArg (self, stream, variables):
- for config in self.configElements:
+ for config in self.properties:
if config.isParentRef == 1:
stream.write (", Manageable* _parent")
return
def genParentRefAssignment (self, stream, variables):
- for config in self.configElements:
+ for config in self.properties:
if config.isParentRef == 1:
stream.write (config.getName () + \
" = _parent->GetManagementObject ()->getObjectId ();")
@@ -842,11 +875,11 @@ class SchemaClass:
stream.write (hex (ord (sum[idx])))
def genWriteConfig (self, stream, variables):
- for config in self.configElements:
+ for config in self.properties:
config.genWrite (stream);
def genWriteInst (self, stream, variables):
- for inst in self.instElements:
+ for inst in self.statistics:
inst.genWrite (stream);
diff --git a/cpp/managementgen/templates/Class.cpp b/cpp/managementgen/templates/Class.cpp
index 733e29188e..699d8217b6 100644
--- a/cpp/managementgen/templates/Class.cpp
+++ b/cpp/managementgen/templates/Class.cpp
@@ -75,9 +75,9 @@ void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf)
buf.putShort (/*MGEN:Class.MethodCount*/); // Method Count
buf.putShort (/*MGEN:Class.EventCount*/); // Event Count
- // Config Elements
+ // Properties
/*MGEN:Class.ConfigElementSchema*/
- // Inst Elements
+ // Statistics
/*MGEN:Class.InstElementSchema*/
// Methods
/*MGEN:Class.MethodSchema*/
diff --git a/cpp/managementgen/templates/Class.h b/cpp/managementgen/templates/Class.h
index 628a70d2d9..441571a174 100644
--- a/cpp/managementgen/templates/Class.h
+++ b/cpp/managementgen/templates/Class.h
@@ -26,6 +26,7 @@
#include "qpid/management/ManagementObject.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/sys/AtomicCount.h"
namespace qpid {
namespace management {
@@ -38,9 +39,9 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject
static std::string className;
static uint8_t md5Sum[16];
- // Configuration Elements
+ // Properties
/*MGEN:Class.ConfigDeclarations*/
- // Instrumentation Elements
+ // Statistics
/*MGEN:Class.InstDeclarations*/
// Private Methods
static void writeSchema (qpid::framing::Buffer& buf);
@@ -61,6 +62,8 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject
/*MGEN:Class.NameCap*/ (Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/);
~/*MGEN:Class.NameCap*/ (void);
+ /*MGEN:Class.SetGeneralReferenceDeclaration*/
+
std::string getPackageName (void) { return packageName; }
std::string getClassName (void) { return className; }
uint8_t* getMd5Sum (void) { return md5Sum; }
@@ -72,6 +75,5 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject
};
}}
-
#endif /*!_MANAGEMENT_/*MGEN:Class.NameUpper*/_*/
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index b0e006aebc..53b828a709 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -528,6 +528,7 @@ nobase_include_HEADERS = \
qpid/sys/AggregateOutput.h \
qpid/sys/AsynchIO.h \
qpid/sys/AsynchIOHandler.h \
+ qpid/sys/AtomicCount.h \
qpid/sys/AtomicValue.h \
qpid/sys/AtomicValue_gcc.h \
qpid/sys/AtomicValue_mutex.h \
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index f3e103dfaf..18b2c52dad 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -39,7 +39,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
const management::ArgsLinkBridge& _args) :
link(_link), id(_id), args(_args),
mgmtObject(new management::Bridge(this, link, id, args.i_durable, args.i_src, args.i_dest,
- args.i_key, args.i_src_is_queue, args.i_src_is_local,
+ args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
args.i_tag, args.i_excludes)),
listener(l), name(Uuid(true).str()), persistenceId(0)
{
@@ -61,10 +61,10 @@ void Bridge::create(ConnectionState& c)
session->attach(name, false);
session->commandPoint(0,0);
- if (args.i_src_is_local) {
+ if (args.i_srcIsLocal) {
//TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
} else {
- if (args.i_src_is_queue) {
+ if (args.i_srcIsQueue) {
peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
@@ -79,7 +79,7 @@ void Bridge::create(ConnectionState& c)
queueSettings.setString("qpid.trace.exclude", args.i_excludes);
}
- bool durable = false;//should this be an arg, or would be use src_is_queue for durable queues?
+ bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues?
bool autoDelete = !durable;//auto delete transient queues?
peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
@@ -150,8 +150,8 @@ void Bridge::encode(Buffer& buffer) const
buffer.putShortString(args.i_src);
buffer.putShortString(args.i_dest);
buffer.putShortString(args.i_key);
- buffer.putOctet(args.i_src_is_queue ? 1 : 0);
- buffer.putOctet(args.i_src_is_local ? 1 : 0);
+ buffer.putOctet(args.i_srcIsQueue ? 1 : 0);
+ buffer.putOctet(args.i_srcIsLocal ? 1 : 0);
buffer.putShortString(args.i_tag);
buffer.putShortString(args.i_excludes);
}
@@ -165,8 +165,8 @@ uint32_t Bridge::encodedSize() const
+ args.i_src.size() + 1
+ args.i_dest.size() + 1
+ args.i_key.size() + 1
- + 1 // src_is_queue
- + 1 // src_is_local
+ + 1 // srcIsQueue
+ + 1 // srcIsLocal
+ args.i_tag.size() + 1
+ args.i_excludes.size() + 1;
}
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index ea3d3547f5..61319f3c09 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -55,34 +55,34 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
mgmtId(mgmtId_),
links(broker_.getLinks())
{
- Manageable* parent = broker.GetVhostObject ();
+ Manageable* parent = broker.GetVhostObject();
if (isLink)
- links.notifyConnection (mgmtId, this);
+ links.notifyConnection(mgmtId, this);
if (parent != 0)
{
- ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent();
- if (agent.get () != 0)
- mgmtObject = management::Client::shared_ptr (new management::Client(this, parent, mgmtId, !isLink));
- agent->addObject (mgmtObject);
+ if (agent.get() != 0)
+ mgmtObject = management::Connection::shared_ptr(new management::Connection(this, parent, mgmtId, !isLink));
+ agent->addObject(mgmtObject);
}
}
-void Connection::requestIOProcessing (boost::function0<void> callback)
+void Connection::requestIOProcessing(boost::function0<void> callback)
{
ioCallback = callback;
out->activateOutput();
}
-Connection::~Connection ()
+Connection::~Connection()
{
if (mgmtObject.get() != 0)
mgmtObject->resourceDestroy();
if (isLink)
- links.notifyClosed (mgmtId);
+ links.notifyClosed(mgmtId);
}
void Connection::received(framing::AMQFrame& frame){
@@ -98,21 +98,21 @@ void Connection::received(framing::AMQFrame& frame){
recordFromClient(frame);
}
-void Connection::recordFromServer (framing::AMQFrame& frame)
+void Connection::recordFromServer(framing::AMQFrame& frame)
{
- if (mgmtObject.get () != 0)
+ if (mgmtObject.get() != 0)
{
- mgmtObject->inc_framesToClient ();
- mgmtObject->inc_bytesToClient (frame.size ());
+ mgmtObject->inc_framesToClient();
+ mgmtObject->inc_bytesToClient(frame.size());
}
}
-void Connection::recordFromClient (framing::AMQFrame& frame)
+void Connection::recordFromClient(framing::AMQFrame& frame)
{
- if (mgmtObject.get () != 0)
+ if (mgmtObject.get() != 0)
{
- mgmtObject->inc_framesFromClient ();
- mgmtObject->inc_bytesFromClient (frame.size ());
+ mgmtObject->inc_framesFromClient();
+ mgmtObject->inc_bytesFromClient(frame.size());
}
}
@@ -129,6 +129,14 @@ string Connection::getAuthCredentials()
if (!isLink)
return string();
+ if (mgmtObject.get() != 0)
+ {
+ if (links.getAuthMechanism(mgmtId) == "ANONYMOUS")
+ mgmtObject->set_authIdentity("anonymous");
+ else
+ mgmtObject->set_authIdentity(links.getAuthIdentity(mgmtId));
+ }
+
return links.getAuthCredentials(mgmtId);
}
@@ -138,6 +146,12 @@ void Connection::notifyConnectionForced(const string& text)
links.notifyConnectionForced(mgmtId, text);
}
+void Connection::setUserId(const string& userId)
+{
+ ConnectionState::setUserId(userId);
+ mgmtObject->set_authIdentity(userId);
+}
+
void Connection::close(
ReplyCode code, const string& text, ClassId classId, MethodId methodId)
{
@@ -177,7 +191,7 @@ bool Connection::doOutput()
ioCallback = 0;
if (mgmtClosing)
- close (403, "Closed by Management Request", 0, 0);
+ close(403, "Closed by Management Request", 0, 0);
else
//then do other output as needed:
return outputTasks.doOutput();
@@ -202,20 +216,20 @@ SessionHandler& Connection::getChannel(ChannelId id) {
return *ptr_map_ptr(i);
}
-ManagementObject::shared_ptr Connection::GetManagementObject (void) const
+ManagementObject::shared_ptr Connection::GetManagementObject(void) const
{
return dynamic_pointer_cast<ManagementObject>(mgmtObject);
}
-Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
+Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
- QPID_LOG (debug, "Connection::ManagementMethod [id=" << methodId << "]");
+ QPID_LOG(debug, "Connection::ManagementMethod [id=" << methodId << "]");
switch (methodId)
{
- case management::Client::METHOD_CLOSE :
+ case management::Connection::METHOD_CLOSE :
mgmtClosing = true;
if (mgmtObject.get()) mgmtObject->set_closing(1);
out->activateOutput();
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index e6e3d4d15e..9e713140dd 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -42,7 +42,7 @@
#include "ConnectionState.h"
#include "SessionHandler.h"
#include "qpid/management/Manageable.h"
-#include "qpid/management/Client.h"
+#include "qpid/management/Connection.h"
#include <boost/ptr_container/ptr_map.hpp>
@@ -88,6 +88,7 @@ class Connection : public sys::ConnectionInputHandler,
std::string getAuthMechanism();
std::string getAuthCredentials();
void notifyConnectionForced(const std::string& text);
+ void setUserId(const string& uid);
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
@@ -100,7 +101,7 @@ class Connection : public sys::ConnectionInputHandler,
bool mgmtClosing;
const std::string mgmtId;
boost::function0<void> ioCallback;
- management::Client::shared_ptr mgmtObject;
+ management::Connection::shared_ptr mgmtObject;
LinkRegistry& links;
};
diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h
index 691d47d866..698f8123e8 100644
--- a/cpp/src/qpid/broker/ConnectionState.h
+++ b/cpp/src/qpid/broker/ConnectionState.h
@@ -56,7 +56,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
void setHeartbeat(uint16_t hb) { heartbeat = hb; }
void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
- void setUserId(const string& uid) { userId = uid; }
+ virtual void setUserId(const string& uid) { userId = uid; }
const string& getUserId() const { return userId; }
Broker& getBroker() { return broker; }
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 72021b8d98..84a5362766 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -54,8 +54,8 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
Binding::shared_ptr binding (new Binding (routingKey, queue, this));
bindings[routingKey].push_back(binding);
if (mgmtExchange.get() != 0) {
- mgmtExchange->inc_bindings();
- dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings();
+ mgmtExchange->inc_bindingCount();
+ dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount();
}
return true;
} else{
@@ -78,8 +78,8 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
bindings.erase(routingKey);
}
if (mgmtExchange.get() != 0) {
- mgmtExchange->dec_bindings ();
- dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings();
+ mgmtExchange->dec_bindingCount();
+ dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount();
}
return true;
} else {
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index df723d2c8f..3483562292 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -53,8 +53,8 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/,
Binding::shared_ptr binding (new Binding ("", queue, this));
bindings.push_back(binding);
if (mgmtExchange.get() != 0) {
- mgmtExchange->inc_bindings ();
- dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings();
+ mgmtExchange->inc_bindingCount();
+ dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount();
}
return true;
} else {
@@ -73,8 +73,8 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*
if (i != bindings.end()) {
bindings.erase(i);
if (mgmtExchange.get() != 0) {
- mgmtExchange->dec_bindings ();
- dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings();
+ mgmtExchange->dec_bindingCount();
+ dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount();
}
return true;
} else {
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 5196099ed5..20d9617c8f 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -90,8 +90,8 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co
bindings.push_back(headerMap);
if (mgmtExchange.get() != 0) {
- mgmtExchange->inc_bindings ();
- dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings();
+ mgmtExchange->inc_bindingCount();
+ dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount();
}
return true;
} else {
@@ -115,8 +115,8 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey,
if (i != bindings.end()) {
bindings.erase(i);
if (mgmtExchange.get() != 0) {
- mgmtExchange->dec_bindings ();
- dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings();
+ mgmtExchange->dec_bindingCount();
+ dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount();
}
return true;
} else {
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 6bcfcf77a3..08b9d8fe3e 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -341,8 +341,8 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args
std::pair<Bridge::shared_ptr, bool> result =
links->declare (host, port, iargs.i_durable, iargs.i_src,
- iargs.i_dest, iargs.i_key, iargs.i_src_is_queue,
- iargs.i_src_is_local, iargs.i_tag, iargs.i_excludes);
+ iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
+ iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes);
if (result.second && iargs.i_durable)
store->create(*result.first);
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index 455cc8452e..0703c276cf 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -87,8 +87,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
std::string& src,
std::string& dest,
std::string& key,
- bool is_queue,
- bool is_local,
+ bool isQueue,
+ bool isLocal,
std::string& tag,
std::string& excludes)
{
@@ -110,14 +110,14 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
management::ArgsLinkBridge args;
Bridge::shared_ptr bridge;
- args.i_durable = durable;
- args.i_src = src;
- args.i_dest = dest;
- args.i_key = key;
- args.i_src_is_queue = is_queue;
- args.i_src_is_local = is_local;
- args.i_tag = tag;
- args.i_excludes = excludes;
+ args.i_durable = durable;
+ args.i_src = src;
+ args.i_dest = dest;
+ args.i_key = key;
+ args.i_srcIsQueue = isQueue;
+ args.i_srcIsLocal = isLocal;
+ args.i_tag = tag;
+ args.i_excludes = excludes;
bridge = Bridge::shared_ptr
(new Bridge (l->second.get(), l->second->nextChannel(),
@@ -237,4 +237,14 @@ std::string LinkRegistry::getAuthCredentials(const std::string& key)
return result;
}
+std::string LinkRegistry::getAuthIdentity(const std::string& key)
+{
+ Mutex::ScopedLock locker(lock);
+ LinkMap::iterator l = links.find(key);
+ if (l == links.end())
+ return string();
+
+ return l->second->getUsername();
+}
+
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index f902490ed3..242c0d58ba 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -81,8 +81,8 @@ namespace broker {
std::string& src,
std::string& dest,
std::string& key,
- bool is_queue,
- bool is_local,
+ bool isQueue,
+ bool isLocal,
std::string& id,
std::string& excludes);
@@ -113,6 +113,7 @@ namespace broker {
void notifyConnectionForced (const std::string& key, const std::string& text);
std::string getAuthMechanism (const std::string& key);
std::string getAuthCredentials (const std::string& key);
+ std::string getAuthIdentity (const std::string& key);
};
}
}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 2c9717caa0..0b26762697 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -130,19 +130,15 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete();
if (mgmtObject.get() != 0) {
- sys::Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
- mgmtObject->inc_byteDepth (msg->contentSize ());
}
}else {
if (mgmtObject.get() != 0) {
- sys::Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
- mgmtObject->inc_byteDepth (msg->contentSize ());
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
}
@@ -157,13 +153,11 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
if (mgmtObject.get() != 0) {
- sys::Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
- mgmtObject->inc_byteDepth (msg->contentSize ());
}
if (store && !msg->isContentLoaded()) {
@@ -176,13 +170,11 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
if (mgmtObject.get() != 0) {
- sys::Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
mgmtObject->inc_msgDepth ();
- mgmtObject->inc_byteDepth (msg->contentSize ());
if (msg->isPersistent ()) {
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
@@ -367,8 +359,7 @@ void Queue::consume(Consumer& c, bool requestExclusive){
consumerCount++;
if (mgmtObject.get() != 0){
- sys::Mutex::ScopedLock mutex(mgmtObject->getLock());
- mgmtObject->inc_consumers ();
+ mgmtObject->inc_consumerCount ();
}
}
@@ -378,8 +369,7 @@ void Queue::cancel(Consumer& c){
consumerCount--;
if(exclusive) exclusive = 0;
if (mgmtObject.get() != 0){
- sys::Mutex::ScopedLock mutex(mgmtObject->getLock());
- mgmtObject->dec_consumers ();
+ mgmtObject->dec_consumerCount ();
}
}
@@ -409,11 +399,9 @@ void Queue::pop(){
if (policy.get()) policy->dequeued(msg.payload->contentSize());
if (mgmtObject.get() != 0){
- sys::Mutex::ScopedLock mutex(mgmtObject->getLock());
mgmtObject->inc_msgTotalDequeues ();
mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
mgmtObject->dec_msgDepth ();
- mgmtObject->dec_byteDepth (msg.payload->contentSize());
if (msg.payload->isPersistent ()){
mgmtObject->inc_msgPersistDequeues ();
mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
@@ -682,7 +670,7 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
if (inst) {
ManagementObject::shared_ptr childObj = inst->GetManagementObject();
if (childObj.get() != 0)
- mgmtObject->set_storeRef(childObj->getObjectId());
+ childObj->setReference(mgmtObject->getObjectId());
}
}
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index dd8267a7d8..1ddb3f3026 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -111,7 +111,7 @@ void SessionState::attach(SessionHandler& h) {
if (mgmtObject.get() != 0)
{
mgmtObject->set_attached (1);
- mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
mgmtObject->set_channelId (h.getChannel());
}
}
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index 1c4fa2ea7a..a16421b090 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -138,8 +138,8 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
Binding::shared_ptr binding (new Binding (routingKey, queue, this));
bindings[routingPattern].push_back(binding);
if (mgmtExchange.get() != 0) {
- mgmtExchange->inc_bindings ();
- dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings();
+ mgmtExchange->inc_bindingCount();
+ dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount();
}
return true;
}
@@ -159,8 +159,8 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, co
qv.erase(q);
if(qv.empty()) bindings.erase(bi);
if (mgmtExchange.get() != 0) {
- mgmtExchange->dec_bindings ();
- dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings();
+ mgmtExchange->dec_bindingCount();
+ dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount();
}
return true;
}
diff --git a/cpp/src/qpid/broker/XmlExchange.cpp b/cpp/src/qpid/broker/XmlExchange.cpp
index 1d8f2ae8d8..8c4d4f79a4 100644
--- a/cpp/src/qpid/broker/XmlExchange.cpp
+++ b/cpp/src/qpid/broker/XmlExchange.cpp
@@ -97,7 +97,7 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const
QPID_LOG(trace, "Bound successfully with query: " << queryText );
if (mgmtExchange.get() != 0) {
- mgmtExchange->inc_bindings ();
+ mgmtExchange->inc_bindingCount();
}
return true;
} else{
@@ -128,7 +128,7 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons
bindingsMap.erase(routingKey);
}
if (mgmtExchange.get() != 0) {
- mgmtExchange->dec_bindings ();
+ mgmtExchange->dec_bindingCount();
}
return true;
} else {
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
index 0ddbd62350..a2802cf932 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -630,7 +630,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
agent->mgmtObject->set_sessionId (sessionId);
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
- agent->mgmtObject->set_sysId (systemId);
+ agent->mgmtObject->set_systemId (systemId);
addObject (agent->mgmtObject);
remoteAgents[sessionId] = agent;
diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp
index 6af5412b99..68d7e5c886 100644
--- a/cpp/src/qpid/management/ManagementObject.cpp
+++ b/cpp/src/qpid/management/ManagementObject.cpp
@@ -37,3 +37,6 @@ void ManagementObject::writeTimestamps (Buffer& buf)
buf.putLongLong (destroyTime);
buf.putLongLong (objectId);
}
+
+void ManagementObject::setReference(uint64_t) {}
+
diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h
index 2661cf2d96..1d54d606a4 100644
--- a/cpp/src/qpid/management/ManagementObject.h
+++ b/cpp/src/qpid/management/ManagementObject.h
@@ -92,6 +92,7 @@ class ManagementObject
virtual void doMethod (std::string methodName,
qpid::framing::Buffer& inBuf,
qpid::framing::Buffer& outBuf) = 0;
+ virtual void setReference(uint64_t objectId);
virtual std::string getClassName (void) = 0;
virtual std::string getPackageName (void) = 0;
diff --git a/cpp/src/qpid/sys/AtomicCount.h b/cpp/src/qpid/sys/AtomicCount.h
new file mode 100644
index 0000000000..d598b49427
--- /dev/null
+++ b/cpp/src/qpid/sys/AtomicCount.h
@@ -0,0 +1,52 @@
+#ifndef _posix_AtomicCount_h
+#define _posix_AtomicCount_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/detail/atomic_count.hpp>
+#include "ScopedIncrement.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Atomic counter.
+ */
+class AtomicCount {
+ public:
+ typedef ::qpid::sys::ScopedDecrement<AtomicCount> ScopedDecrement;
+ typedef ::qpid::sys::ScopedIncrement<AtomicCount> ScopedIncrement;
+
+ AtomicCount(long value = 0) : count(value) {}
+
+ void operator++() { ++count ; }
+
+ long operator--() { return --count; }
+
+ operator long() const { return count; }
+
+ private:
+ boost::detail::atomic_count count;
+};
+
+
+}}
+
+
+#endif // _posix_AtomicCount_h
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py
index 0f52165587..b92df89839 100755
--- a/cpp/src/tests/federation.py
+++ b/cpp/src/tests/federation.py
@@ -172,7 +172,7 @@ class FederationTests(TestBase010):
link = mgmt.get_object("link")
mgmt.call_method(link, "bridge", {"durable":0, "src":"my-bridge-queue", "dest":"amq.fanout",
- "key":"", "tag":"", "excludes":"", "src_is_queue":1})
+ "key":"", "tag":"", "excludes":"", "srcIsQueue":1})
sleep(6)
bridge = mgmt.get_object("bridge")