diff options
Diffstat (limited to 'cpp')
23 files changed, 244 insertions, 138 deletions
diff --git a/cpp/managementgen/schema.py b/cpp/managementgen/schema.py index 7a9f23ea76..6e48003ab2 100755 --- a/cpp/managementgen/schema.py +++ b/cpp/managementgen/schema.py @@ -94,9 +94,23 @@ class SchemaType: if changeFlag != None: stream.write (" " + changeFlag + " = true;\n") stream.write (" }\n"); + elif self.accessor == "counterByOne": + stream.write (" inline void inc_" + varName + " (){\n"); + stream.write (" ++" + varName + ";\n") + if changeFlag != None: + stream.write (" " + changeFlag + " = true;\n") + stream.write (" }\n"); + stream.write (" inline void dec_" + varName + " (){\n"); + stream.write (" --" + varName + ";\n") + if changeFlag != None: + stream.write (" " + changeFlag + " = true;\n") + stream.write (" }\n"); elif self.accessor == "counter": stream.write (" inline void inc_" + varName + " (" + self.cpp + " by = 1){\n"); - stream.write (" " + varName + " += by;\n") + stream.write (" if (by == 1)\n") + stream.write (" ++" + varName + ";\n") + stream.write (" else\n") + stream.write (" " + varName + " += by;\n") if self.style == "wm": stream.write (" if (" + varName + "High < " + varName + ")\n") stream.write (" " + varName + "High = " + varName + ";\n") @@ -104,7 +118,10 @@ class SchemaType: stream.write (" " + changeFlag + " = true;\n") stream.write (" }\n"); stream.write (" inline void dec_" + varName + " (" + self.cpp + " by = 1){\n"); - stream.write (" " + varName + " -= by;\n") + stream.write (" if (by == 1)\n") + stream.write (" " + varName + "--;\n") + stream.write (" else\n") + stream.write (" " + varName + " -= by;\n") if self.style == "wm": stream.write (" if (" + varName + "Low > " + varName + ")\n") stream.write (" " + varName + "Low = " + varName + ";\n") @@ -196,16 +213,18 @@ class Type: #===================================================================================== class SchemaConfig: def __init__ (self, node, typespec): - self.name = None - self.type = None - self.access = "RO" - self.isIndex = 0 - self.isParentRef = 0 - self.unit = None - self.min = None - self.max = None - self.maxLen = None - self.desc = None + self.name = None + self.type = None + self.ref = None + self.access = "RO" + self.isIndex = 0 + self.isParentRef = 0 + self.isGeneralRef = 0 + self.unit = None + self.min = None + self.max = None + self.maxLen = None + self.desc = None attrs = node.attributes for idx in range (attrs.length): @@ -216,6 +235,9 @@ class SchemaConfig: elif key == 'type': self.type = Type (val, typespec) + + elif key == 'references': + self.ref = val elif key == 'access': self.access = val @@ -230,6 +252,11 @@ class SchemaConfig: raise ValueError ("Expected 'y' in parentRef attribute") self.isParentRef = 1 + elif key == 'isGeneralReference': + if val != 'y': + raise ValueError ("Expected 'y' in isGeneralReference attribute") + self.isGeneralRef = 1 + elif key == 'unit': self.unit = val @@ -246,12 +273,12 @@ class SchemaConfig: self.desc = val else: - raise ValueError ("Unknown attribute in configElement '%s'" % key) + raise ValueError ("Unknown attribute in property '%s'" % key) if self.name == None: - raise ValueError ("Missing 'name' attribute in configElement") + raise ValueError ("Missing 'name' attribute in property") if self.type == None: - raise ValueError ("Missing 'type' attribute in configElement") + raise ValueError ("Missing 'type' attribute in property") def getName (self): return self.name @@ -319,12 +346,12 @@ class SchemaInst: self.desc = val else: - raise ValueError ("Unknown attribute in instElement '%s'" % key) + raise ValueError ("Unknown attribute in statistic '%s'" % key) if self.name == None: - raise ValueError ("Missing 'name' attribute in instElement") + raise ValueError ("Missing 'name' attribute in statistic") if self.type == None: - raise ValueError ("Missing 'type' attribute in instElement") + raise ValueError ("Missing 'type' attribute in statistic") def getName (self): return self.name @@ -388,6 +415,8 @@ class SchemaInst: def genInitialize (self, stream): val = self.type.type.init + if self.type.type.accessor == "counterByOne": + return if self.type.type.style != "mma": stream.write (" " + self.name + " = " + val + ";\n") if self.type.type.style == "wm": @@ -589,13 +618,13 @@ class SchemaEvent: class SchemaClass: def __init__ (self, package, node, typespec, fragments, options): - self.packageName = package - self.configElements = [] - self.instElements = [] - self.methods = [] - self.events = [] - self.options = options - self.md5Sum = md5.new () + self.packageName = package + self.properties = [] + self.statistics = [] + self.methods = [] + self.events = [] + self.options = options + self.md5Sum = md5.new () self.hash (node) @@ -605,13 +634,13 @@ class SchemaClass: children = node.childNodes for child in children: if child.nodeType == Node.ELEMENT_NODE: - if child.nodeName == 'configElement': + if child.nodeName == 'property': sub = SchemaConfig (child, typespec) - self.configElements.append (sub) + self.properties.append (sub) - elif child.nodeName == 'instElement': + elif child.nodeName == 'statistic': sub = SchemaInst (child, typespec) - self.instElements.append (sub) + self.statistics.append (sub) elif child.nodeName == 'method': sub = SchemaMethod (self, child, typespec) @@ -644,10 +673,10 @@ class SchemaClass: name = attrs['name'].nodeValue for fragment in fragments: if fragment.name == name: - for config in fragment.configElements: - self.configElements.append (config) - for inst in fragment.instElements: - self.instElements.append (inst) + for config in fragment.properties: + self.properties.append (config) + for inst in fragment.statistics: + self.statistics.append (inst) for method in fragment.methods: self.methods.append (method) for event in fragment.events: @@ -675,33 +704,33 @@ class SchemaClass: # match the substitution keywords in the template files. #=================================================================================== def genAccessorMethods (self, stream, variables): - for config in self.configElements: + for config in self.properties: if config.access != "RC": config.genAccessor (stream) - for inst in self.instElements: + for inst in self.statistics: inst.genAccessor (stream) def genConfigCount (self, stream, variables): - stream.write ("%d" % len (self.configElements)) + stream.write ("%d" % len (self.properties)) def genConfigDeclarations (self, stream, variables): - for element in self.configElements: + for element in self.properties: element.genDeclaration (stream) def genConfigElementSchema (self, stream, variables): - for config in self.configElements: + for config in self.properties: config.genSchema (stream) def genConstructorArgs (self, stream, variables): # Constructor args are config elements with read-create access result = "" - for element in self.configElements: + for element in self.properties: if element.isConstructorArg (): stream.write (", ") element.genFormalParam (stream) def genConstructorInits (self, stream, variables): - for element in self.configElements: + for element in self.properties: if element.isConstructorArg (): stream.write ("," + element.getName () + "(_" + element.getName () + ")") @@ -729,21 +758,21 @@ class SchemaClass: pass ########################################################################### def genHiLoStatResets (self, stream, variables): - for inst in self.instElements: + for inst in self.statistics: inst.genHiLoStatResets (stream) def genInitializeElements (self, stream, variables): - for inst in self.instElements: + for inst in self.statistics: inst.genInitialize (stream) def genInstChangedStub (self, stream, variables): - if len (self.instElements) == 0: + if len (self.statistics) == 0: stream.write (" // Stub for getInstChanged. There are no inst elements\n") stream.write (" bool getInstChanged (void) { return false; }\n") def genInstCount (self, stream, variables): count = 0 - for inst in self.instElements: + for inst in self.statistics: count = count + 1 if inst.type.type.style == "wm": count = count + 2 @@ -752,11 +781,11 @@ class SchemaClass: stream.write ("%d" % count) def genInstDeclarations (self, stream, variables): - for element in self.instElements: + for element in self.statistics: element.genDeclaration (stream) def genInstElementSchema (self, stream, variables): - for inst in self.instElements: + for inst in self.statistics: inst.genSchema (stream) def genMethodArgIncludes (self, stream, variables): @@ -794,6 +823,10 @@ class SchemaClass: arg.name, "outBuf") + ";\n") stream.write (" return;\n }\n") + def genSetGeneralReferenceDeclaration (self, stream, variables): + for prop in self.properties: + if prop.isGeneralRef: + stream.write ("void setReference(uint64_t objectId) { " + prop.name + " = objectId; }\n") def genMethodIdDeclarations (self, stream, variables): number = 1 @@ -822,13 +855,13 @@ class SchemaClass: stream.write (self.name.upper ()) def genParentArg (self, stream, variables): - for config in self.configElements: + for config in self.properties: if config.isParentRef == 1: stream.write (", Manageable* _parent") return def genParentRefAssignment (self, stream, variables): - for config in self.configElements: + for config in self.properties: if config.isParentRef == 1: stream.write (config.getName () + \ " = _parent->GetManagementObject ()->getObjectId ();") @@ -842,11 +875,11 @@ class SchemaClass: stream.write (hex (ord (sum[idx]))) def genWriteConfig (self, stream, variables): - for config in self.configElements: + for config in self.properties: config.genWrite (stream); def genWriteInst (self, stream, variables): - for inst in self.instElements: + for inst in self.statistics: inst.genWrite (stream); diff --git a/cpp/managementgen/templates/Class.cpp b/cpp/managementgen/templates/Class.cpp index 733e29188e..699d8217b6 100644 --- a/cpp/managementgen/templates/Class.cpp +++ b/cpp/managementgen/templates/Class.cpp @@ -75,9 +75,9 @@ void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf) buf.putShort (/*MGEN:Class.MethodCount*/); // Method Count buf.putShort (/*MGEN:Class.EventCount*/); // Event Count - // Config Elements + // Properties /*MGEN:Class.ConfigElementSchema*/ - // Inst Elements + // Statistics /*MGEN:Class.InstElementSchema*/ // Methods /*MGEN:Class.MethodSchema*/ diff --git a/cpp/managementgen/templates/Class.h b/cpp/managementgen/templates/Class.h index 628a70d2d9..441571a174 100644 --- a/cpp/managementgen/templates/Class.h +++ b/cpp/managementgen/templates/Class.h @@ -26,6 +26,7 @@ #include "qpid/management/ManagementObject.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/Uuid.h" +#include "qpid/sys/AtomicCount.h" namespace qpid { namespace management { @@ -38,9 +39,9 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject static std::string className; static uint8_t md5Sum[16]; - // Configuration Elements + // Properties /*MGEN:Class.ConfigDeclarations*/ - // Instrumentation Elements + // Statistics /*MGEN:Class.InstDeclarations*/ // Private Methods static void writeSchema (qpid::framing::Buffer& buf); @@ -61,6 +62,8 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject /*MGEN:Class.NameCap*/ (Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/); ~/*MGEN:Class.NameCap*/ (void); + /*MGEN:Class.SetGeneralReferenceDeclaration*/ + std::string getPackageName (void) { return packageName; } std::string getClassName (void) { return className; } uint8_t* getMd5Sum (void) { return md5Sum; } @@ -72,6 +75,5 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject }; }} - #endif /*!_MANAGEMENT_/*MGEN:Class.NameUpper*/_*/ diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index b0e006aebc..53b828a709 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -528,6 +528,7 @@ nobase_include_HEADERS = \ qpid/sys/AggregateOutput.h \ qpid/sys/AsynchIO.h \ qpid/sys/AsynchIOHandler.h \ + qpid/sys/AtomicCount.h \ qpid/sys/AtomicValue.h \ qpid/sys/AtomicValue_gcc.h \ qpid/sys/AtomicValue_mutex.h \ diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index f3e103dfaf..18b2c52dad 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -39,7 +39,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, const management::ArgsLinkBridge& _args) : link(_link), id(_id), args(_args), mgmtObject(new management::Bridge(this, link, id, args.i_durable, args.i_src, args.i_dest, - args.i_key, args.i_src_is_queue, args.i_src_is_local, + args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, args.i_tag, args.i_excludes)), listener(l), name(Uuid(true).str()), persistenceId(0) { @@ -61,10 +61,10 @@ void Bridge::create(ConnectionState& c) session->attach(name, false); session->commandPoint(0,0); - if (args.i_src_is_local) { + if (args.i_srcIsLocal) { //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received() } else { - if (args.i_src_is_queue) { + if (args.i_srcIsQueue) { peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable()); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); @@ -79,7 +79,7 @@ void Bridge::create(ConnectionState& c) queueSettings.setString("qpid.trace.exclude", args.i_excludes); } - bool durable = false;//should this be an arg, or would be use src_is_queue for durable queues? + bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues? bool autoDelete = !durable;//auto delete transient queues? peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings); peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable()); @@ -150,8 +150,8 @@ void Bridge::encode(Buffer& buffer) const buffer.putShortString(args.i_src); buffer.putShortString(args.i_dest); buffer.putShortString(args.i_key); - buffer.putOctet(args.i_src_is_queue ? 1 : 0); - buffer.putOctet(args.i_src_is_local ? 1 : 0); + buffer.putOctet(args.i_srcIsQueue ? 1 : 0); + buffer.putOctet(args.i_srcIsLocal ? 1 : 0); buffer.putShortString(args.i_tag); buffer.putShortString(args.i_excludes); } @@ -165,8 +165,8 @@ uint32_t Bridge::encodedSize() const + args.i_src.size() + 1 + args.i_dest.size() + 1 + args.i_key.size() + 1 - + 1 // src_is_queue - + 1 // src_is_local + + 1 // srcIsQueue + + 1 // srcIsLocal + args.i_tag.size() + 1 + args.i_excludes.size() + 1; } diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index ea3d3547f5..61319f3c09 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -55,34 +55,34 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std mgmtId(mgmtId_), links(broker_.getLinks()) { - Manageable* parent = broker.GetVhostObject (); + Manageable* parent = broker.GetVhostObject(); if (isLink) - links.notifyConnection (mgmtId, this); + links.notifyConnection(mgmtId, this); if (parent != 0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent(); - if (agent.get () != 0) - mgmtObject = management::Client::shared_ptr (new management::Client(this, parent, mgmtId, !isLink)); - agent->addObject (mgmtObject); + if (agent.get() != 0) + mgmtObject = management::Connection::shared_ptr(new management::Connection(this, parent, mgmtId, !isLink)); + agent->addObject(mgmtObject); } } -void Connection::requestIOProcessing (boost::function0<void> callback) +void Connection::requestIOProcessing(boost::function0<void> callback) { ioCallback = callback; out->activateOutput(); } -Connection::~Connection () +Connection::~Connection() { if (mgmtObject.get() != 0) mgmtObject->resourceDestroy(); if (isLink) - links.notifyClosed (mgmtId); + links.notifyClosed(mgmtId); } void Connection::received(framing::AMQFrame& frame){ @@ -98,21 +98,21 @@ void Connection::received(framing::AMQFrame& frame){ recordFromClient(frame); } -void Connection::recordFromServer (framing::AMQFrame& frame) +void Connection::recordFromServer(framing::AMQFrame& frame) { - if (mgmtObject.get () != 0) + if (mgmtObject.get() != 0) { - mgmtObject->inc_framesToClient (); - mgmtObject->inc_bytesToClient (frame.size ()); + mgmtObject->inc_framesToClient(); + mgmtObject->inc_bytesToClient(frame.size()); } } -void Connection::recordFromClient (framing::AMQFrame& frame) +void Connection::recordFromClient(framing::AMQFrame& frame) { - if (mgmtObject.get () != 0) + if (mgmtObject.get() != 0) { - mgmtObject->inc_framesFromClient (); - mgmtObject->inc_bytesFromClient (frame.size ()); + mgmtObject->inc_framesFromClient(); + mgmtObject->inc_bytesFromClient(frame.size()); } } @@ -129,6 +129,14 @@ string Connection::getAuthCredentials() if (!isLink) return string(); + if (mgmtObject.get() != 0) + { + if (links.getAuthMechanism(mgmtId) == "ANONYMOUS") + mgmtObject->set_authIdentity("anonymous"); + else + mgmtObject->set_authIdentity(links.getAuthIdentity(mgmtId)); + } + return links.getAuthCredentials(mgmtId); } @@ -138,6 +146,12 @@ void Connection::notifyConnectionForced(const string& text) links.notifyConnectionForced(mgmtId, text); } +void Connection::setUserId(const string& userId) +{ + ConnectionState::setUserId(userId); + mgmtObject->set_authIdentity(userId); +} + void Connection::close( ReplyCode code, const string& text, ClassId classId, MethodId methodId) { @@ -177,7 +191,7 @@ bool Connection::doOutput() ioCallback = 0; if (mgmtClosing) - close (403, "Closed by Management Request", 0, 0); + close(403, "Closed by Management Request", 0, 0); else //then do other output as needed: return outputTasks.doOutput(); @@ -202,20 +216,20 @@ SessionHandler& Connection::getChannel(ChannelId id) { return *ptr_map_ptr(i); } -ManagementObject::shared_ptr Connection::GetManagementObject (void) const +ManagementObject::shared_ptr Connection::GetManagementObject(void) const { return dynamic_pointer_cast<ManagementObject>(mgmtObject); } -Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&) +Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; - QPID_LOG (debug, "Connection::ManagementMethod [id=" << methodId << "]"); + QPID_LOG(debug, "Connection::ManagementMethod [id=" << methodId << "]"); switch (methodId) { - case management::Client::METHOD_CLOSE : + case management::Connection::METHOD_CLOSE : mgmtClosing = true; if (mgmtObject.get()) mgmtObject->set_closing(1); out->activateOutput(); diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index e6e3d4d15e..9e713140dd 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -42,7 +42,7 @@ #include "ConnectionState.h" #include "SessionHandler.h" #include "qpid/management/Manageable.h" -#include "qpid/management/Client.h" +#include "qpid/management/Connection.h" #include <boost/ptr_container/ptr_map.hpp> @@ -88,6 +88,7 @@ class Connection : public sys::ConnectionInputHandler, std::string getAuthMechanism(); std::string getAuthCredentials(); void notifyConnectionForced(const std::string& text); + void setUserId(const string& uid); private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; @@ -100,7 +101,7 @@ class Connection : public sys::ConnectionInputHandler, bool mgmtClosing; const std::string mgmtId; boost::function0<void> ioCallback; - management::Client::shared_ptr mgmtObject; + management::Connection::shared_ptr mgmtObject; LinkRegistry& links; }; diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h index 691d47d866..698f8123e8 100644 --- a/cpp/src/qpid/broker/ConnectionState.h +++ b/cpp/src/qpid/broker/ConnectionState.h @@ -56,7 +56,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable void setHeartbeat(uint16_t hb) { heartbeat = hb; } void setStagingThreshold(uint64_t st) { stagingThreshold = st; } - void setUserId(const string& uid) { userId = uid; } + virtual void setUserId(const string& uid) { userId = uid; } const string& getUserId() const { return userId; } Broker& getBroker() { return broker; } diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 72021b8d98..84a5362766 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -54,8 +54,8 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con Binding::shared_ptr binding (new Binding (routingKey, queue, this)); bindings[routingKey].push_back(binding); if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings(); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings(); + mgmtExchange->inc_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount(); } return true; } else{ @@ -78,8 +78,8 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c bindings.erase(routingKey); } if (mgmtExchange.get() != 0) { - mgmtExchange->dec_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings(); + mgmtExchange->dec_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount(); } return true; } else { diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index df723d2c8f..3483562292 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -53,8 +53,8 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, Binding::shared_ptr binding (new Binding ("", queue, this)); bindings.push_back(binding); if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings(); + mgmtExchange->inc_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount(); } return true; } else { @@ -73,8 +73,8 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey* if (i != bindings.end()) { bindings.erase(i); if (mgmtExchange.get() != 0) { - mgmtExchange->dec_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings(); + mgmtExchange->dec_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount(); } return true; } else { diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 5196099ed5..20d9617c8f 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -90,8 +90,8 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co bindings.push_back(headerMap); if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings(); + mgmtExchange->inc_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount(); } return true; } else { @@ -115,8 +115,8 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, if (i != bindings.end()) { bindings.erase(i); if (mgmtExchange.get() != 0) { - mgmtExchange->dec_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings(); + mgmtExchange->dec_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount(); } return true; } else { diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 6bcfcf77a3..08b9d8fe3e 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -341,8 +341,8 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args std::pair<Bridge::shared_ptr, bool> result = links->declare (host, port, iargs.i_durable, iargs.i_src, - iargs.i_dest, iargs.i_key, iargs.i_src_is_queue, - iargs.i_src_is_local, iargs.i_tag, iargs.i_excludes); + iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, + iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes); if (result.second && iargs.i_durable) store->create(*result.first); diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index 455cc8452e..0703c276cf 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -87,8 +87,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, std::string& src, std::string& dest, std::string& key, - bool is_queue, - bool is_local, + bool isQueue, + bool isLocal, std::string& tag, std::string& excludes) { @@ -110,14 +110,14 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, management::ArgsLinkBridge args; Bridge::shared_ptr bridge; - args.i_durable = durable; - args.i_src = src; - args.i_dest = dest; - args.i_key = key; - args.i_src_is_queue = is_queue; - args.i_src_is_local = is_local; - args.i_tag = tag; - args.i_excludes = excludes; + args.i_durable = durable; + args.i_src = src; + args.i_dest = dest; + args.i_key = key; + args.i_srcIsQueue = isQueue; + args.i_srcIsLocal = isLocal; + args.i_tag = tag; + args.i_excludes = excludes; bridge = Bridge::shared_ptr (new Bridge (l->second.get(), l->second->nextChannel(), @@ -237,4 +237,14 @@ std::string LinkRegistry::getAuthCredentials(const std::string& key) return result; } +std::string LinkRegistry::getAuthIdentity(const std::string& key) +{ + Mutex::ScopedLock locker(lock); + LinkMap::iterator l = links.find(key); + if (l == links.end()) + return string(); + + return l->second->getUsername(); +} + diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index f902490ed3..242c0d58ba 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -81,8 +81,8 @@ namespace broker { std::string& src, std::string& dest, std::string& key, - bool is_queue, - bool is_local, + bool isQueue, + bool isLocal, std::string& id, std::string& excludes); @@ -113,6 +113,7 @@ namespace broker { void notifyConnectionForced (const std::string& key, const std::string& text); std::string getAuthMechanism (const std::string& key); std::string getAuthCredentials (const std::string& key); + std::string getAuthIdentity (const std::string& key); }; } } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 2c9717caa0..0b26762697 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -130,19 +130,15 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); if (mgmtObject.get() != 0) { - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); - mgmtObject->inc_byteDepth (msg->contentSize ()); } }else { if (mgmtObject.get() != 0) { - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); - mgmtObject->inc_byteDepth (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); } @@ -157,13 +153,11 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject.get() != 0) { - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); - mgmtObject->inc_byteDepth (msg->contentSize ()); } if (store && !msg->isContentLoaded()) { @@ -176,13 +170,11 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject.get() != 0) { - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); - mgmtObject->inc_byteDepth (msg->contentSize ()); if (msg->isPersistent ()) { mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); @@ -367,8 +359,7 @@ void Queue::consume(Consumer& c, bool requestExclusive){ consumerCount++; if (mgmtObject.get() != 0){ - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); - mgmtObject->inc_consumers (); + mgmtObject->inc_consumerCount (); } } @@ -378,8 +369,7 @@ void Queue::cancel(Consumer& c){ consumerCount--; if(exclusive) exclusive = 0; if (mgmtObject.get() != 0){ - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); - mgmtObject->dec_consumers (); + mgmtObject->dec_consumerCount (); } } @@ -409,11 +399,9 @@ void Queue::pop(){ if (policy.get()) policy->dequeued(msg.payload->contentSize()); if (mgmtObject.get() != 0){ - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalDequeues (); mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); mgmtObject->dec_msgDepth (); - mgmtObject->dec_byteDepth (msg.payload->contentSize()); if (msg.payload->isPersistent ()){ mgmtObject->inc_msgPersistDequeues (); mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize()); @@ -682,7 +670,7 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { if (inst) { ManagementObject::shared_ptr childObj = inst->GetManagementObject(); if (childObj.get() != 0) - mgmtObject->set_storeRef(childObj->getObjectId()); + childObj->setReference(mgmtObject->getObjectId()); } } diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index dd8267a7d8..1ddb3f3026 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -111,7 +111,7 @@ void SessionState::attach(SessionHandler& h) { if (mgmtObject.get() != 0) { mgmtObject->set_attached (1); - mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); mgmtObject->set_channelId (h.getChannel()); } } diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 1c4fa2ea7a..a16421b090 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -138,8 +138,8 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons Binding::shared_ptr binding (new Binding (routingKey, queue, this)); bindings[routingPattern].push_back(binding); if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings(); + mgmtExchange->inc_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount(); } return true; } @@ -159,8 +159,8 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, co qv.erase(q); if(qv.empty()) bindings.erase(bi); if (mgmtExchange.get() != 0) { - mgmtExchange->dec_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings(); + mgmtExchange->dec_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount(); } return true; } diff --git a/cpp/src/qpid/broker/XmlExchange.cpp b/cpp/src/qpid/broker/XmlExchange.cpp index 1d8f2ae8d8..8c4d4f79a4 100644 --- a/cpp/src/qpid/broker/XmlExchange.cpp +++ b/cpp/src/qpid/broker/XmlExchange.cpp @@ -97,7 +97,7 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const QPID_LOG(trace, "Bound successfully with query: " << queryText ); if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings (); + mgmtExchange->inc_bindingCount(); } return true; } else{ @@ -128,7 +128,7 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons bindingsMap.erase(routingKey); } if (mgmtExchange.get() != 0) { - mgmtExchange->dec_bindings (); + mgmtExchange->dec_bindingCount(); } return true; } else { diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 0ddbd62350..a2802cf932 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -630,7 +630,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe agent->mgmtObject->set_sessionId (sessionId); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); - agent->mgmtObject->set_sysId (systemId); + agent->mgmtObject->set_systemId (systemId); addObject (agent->mgmtObject); remoteAgents[sessionId] = agent; diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index 6af5412b99..68d7e5c886 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -37,3 +37,6 @@ void ManagementObject::writeTimestamps (Buffer& buf) buf.putLongLong (destroyTime); buf.putLongLong (objectId); } + +void ManagementObject::setReference(uint64_t) {} + diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index 2661cf2d96..1d54d606a4 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -92,6 +92,7 @@ class ManagementObject virtual void doMethod (std::string methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf) = 0; + virtual void setReference(uint64_t objectId); virtual std::string getClassName (void) = 0; virtual std::string getPackageName (void) = 0; diff --git a/cpp/src/qpid/sys/AtomicCount.h b/cpp/src/qpid/sys/AtomicCount.h new file mode 100644 index 0000000000..d598b49427 --- /dev/null +++ b/cpp/src/qpid/sys/AtomicCount.h @@ -0,0 +1,52 @@ +#ifndef _posix_AtomicCount_h +#define _posix_AtomicCount_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <boost/detail/atomic_count.hpp> +#include "ScopedIncrement.h" + +namespace qpid { +namespace sys { + +/** + * Atomic counter. + */ +class AtomicCount { + public: + typedef ::qpid::sys::ScopedDecrement<AtomicCount> ScopedDecrement; + typedef ::qpid::sys::ScopedIncrement<AtomicCount> ScopedIncrement; + + AtomicCount(long value = 0) : count(value) {} + + void operator++() { ++count ; } + + long operator--() { return --count; } + + operator long() const { return count; } + + private: + boost::detail::atomic_count count; +}; + + +}} + + +#endif // _posix_AtomicCount_h diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index 0f52165587..b92df89839 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -172,7 +172,7 @@ class FederationTests(TestBase010): link = mgmt.get_object("link") mgmt.call_method(link, "bridge", {"durable":0, "src":"my-bridge-queue", "dest":"amq.fanout", - "key":"", "tag":"", "excludes":"", "src_is_queue":1}) + "key":"", "tag":"", "excludes":"", "srcIsQueue":1}) sleep(6) bridge = mgmt.get_object("bridge") |