diff options
Diffstat (limited to 'qpid/cpp/src/qpid/management')
18 files changed, 4802 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/management/Args.h b/qpid/cpp/src/qpid/management/Args.h new file mode 100644 index 0000000000..5d1cb7e01d --- /dev/null +++ b/qpid/cpp/src/qpid/management/Args.h @@ -0,0 +1,44 @@ +#ifndef _Args_ +#define _Args_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + + +namespace qpid { +namespace management { + +class Args +{ + public: + + virtual ~Args (void) = 0; + +}; + +inline Args::~Args (void) {} + +class ArgsNone : public Args +{ +}; + +}} + + +#endif /*!_Args_*/ diff --git a/qpid/cpp/src/qpid/management/Buffer.cpp b/qpid/cpp/src/qpid/management/Buffer.cpp new file mode 100644 index 0000000000..0ad6e9a851 --- /dev/null +++ b/qpid/cpp/src/qpid/management/Buffer.cpp @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/management/Buffer.h" +#include "qpid/framing/Buffer.h" +#include "qpid/amqp_0_10/Codecs.h" + +using namespace std; + +namespace qpid { +namespace management { + +Buffer::Buffer(char* data, uint32_t size) : impl(new framing::Buffer(data, size)) {} +Buffer::~Buffer() { delete impl; } +void Buffer::reset() { impl->reset(); } +uint32_t Buffer::available() { return impl->available(); } +uint32_t Buffer::getSize() { return impl->getSize(); } +uint32_t Buffer::getPosition() { return impl->getPosition(); } +void Buffer::setPosition(uint32_t p) { impl->setPosition(p); } +char* Buffer::getPointer() { return impl->getPointer(); } +void Buffer::putOctet(uint8_t i) { impl->putOctet(i); } +void Buffer::putShort(uint16_t i) { impl->putShort(i); } +void Buffer::putLong(uint32_t i) { impl->putLong(i); } +void Buffer::putLongLong(uint64_t i) { impl->putLongLong(i); } +void Buffer::putInt8(int8_t i) { impl->putInt8(i); } +void Buffer::putInt16(int16_t i) { impl->putInt16(i); } +void Buffer::putInt32(int32_t i) { impl->putInt32(i); } +void Buffer::putInt64(int64_t i) { impl->putInt64(i); } +void Buffer::putFloat(float i) { impl->putFloat(i); } +void Buffer::putDouble(double i) { impl->putDouble(i); } +void Buffer::putBin128(const uint8_t* i) { impl->putBin128(i); } +uint8_t Buffer::getOctet() { return impl->getOctet(); } +uint16_t Buffer::getShort() { return impl->getShort(); } +uint32_t Buffer::getLong() { return impl->getLong(); } +uint64_t Buffer::getLongLong() { return impl->getLongLong(); } +int8_t Buffer:: getInt8() { return impl-> getInt8(); } +int16_t Buffer::getInt16() { return impl->getInt16(); } +int32_t Buffer::getInt32() { return impl->getInt32(); } +int64_t Buffer::getInt64() { return impl->getInt64(); } +float Buffer::getFloat() { return impl->getFloat(); } +double Buffer::getDouble() { return impl->getDouble(); } +void Buffer::putShortString(const string& i) { impl->putShortString(i); } +void Buffer::putMediumString(const string& i) { impl->putMediumString(i); } +void Buffer::putLongString(const string& i) { impl->putLongString(i); } +void Buffer::getShortString(string& i) { impl->getShortString(i); } +void Buffer::getMediumString(string& i) { impl->getMediumString(i); } +void Buffer::getLongString(string& i) { impl->getLongString(i); } +void Buffer::getBin128(uint8_t* i) { impl->getBin128(i); } +void Buffer::putRawData(const string& i) { impl->putRawData(i); } +void Buffer::getRawData(string& s, uint32_t size) { impl->getRawData(s, size); } +void Buffer::putRawData(const uint8_t* data, size_t size) { impl->putRawData(data, size); } +void Buffer::getRawData(uint8_t* data, size_t size) { impl->getRawData(data, size); } + +void Buffer::putMap(const types::Variant::Map& i) +{ + string encoded; + amqp_0_10::MapCodec::encode(i, encoded); + impl->putRawData(encoded); +} + +void Buffer::putList(const types::Variant::List& i) +{ + string encoded; + amqp_0_10::ListCodec::encode(i, encoded); + impl->putRawData(encoded); +} + +void Buffer::getMap(types::Variant::Map& map) +{ + string encoded; + uint32_t saved = impl->getPosition(); + uint32_t length = impl->getLong(); + impl->setPosition(saved); + impl->getRawData(encoded, length + sizeof(uint32_t)); + amqp_0_10::MapCodec::decode(encoded, map); +} + +void Buffer::getList(types::Variant::List& list) +{ + string encoded; + uint32_t saved = impl->getPosition(); + uint32_t length = impl->getLong(); + impl->setPosition(saved); + impl->getRawData(encoded, length + sizeof(uint32_t)); + amqp_0_10::ListCodec::decode(encoded, list); +} + +}} diff --git a/qpid/cpp/src/qpid/management/Buffer.h b/qpid/cpp/src/qpid/management/Buffer.h new file mode 100644 index 0000000000..1ac52bf276 --- /dev/null +++ b/qpid/cpp/src/qpid/management/Buffer.h @@ -0,0 +1,105 @@ +#ifndef _Management_Buffer_ +#define _Management_Buffer_ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/CommonImportExport.h" +#include "qpid/types/Exception.h" +#include "qpid/types/Variant.h" +#include <string> + +namespace qpid { +namespace framing { + class Buffer; +} + +namespace management { + +struct OutOfBounds : qpid::types::Exception { + OutOfBounds() : qpid::types::Exception(std::string("Out of Bounds")) {} +}; + + +/** + * This class is a wrapper around qpid::framing::Buffer that does not include any dependencies + * from boost or from qpid::framing. + */ +class Buffer +{ +public: + QPID_COMMON_EXTERN Buffer(char* data=0, uint32_t size=0); + QPID_COMMON_EXTERN ~Buffer(); + + QPID_COMMON_EXTERN void reset(); + + QPID_COMMON_EXTERN uint32_t available(); + QPID_COMMON_EXTERN uint32_t getSize(); + QPID_COMMON_EXTERN uint32_t getPosition(); + QPID_COMMON_EXTERN void setPosition(uint32_t); + QPID_COMMON_EXTERN char* getPointer(); + + QPID_COMMON_EXTERN void putOctet(uint8_t i); + QPID_COMMON_EXTERN void putShort(uint16_t i); + QPID_COMMON_EXTERN void putLong(uint32_t i); + QPID_COMMON_EXTERN void putLongLong(uint64_t i); + QPID_COMMON_EXTERN void putInt8(int8_t i); + QPID_COMMON_EXTERN void putInt16(int16_t i); + QPID_COMMON_EXTERN void putInt32(int32_t i); + QPID_COMMON_EXTERN void putInt64(int64_t i); + QPID_COMMON_EXTERN void putFloat(float f); + QPID_COMMON_EXTERN void putDouble(double f); + QPID_COMMON_EXTERN void putBin128(const uint8_t* b); + + QPID_COMMON_EXTERN uint8_t getOctet(); + QPID_COMMON_EXTERN uint16_t getShort(); + QPID_COMMON_EXTERN uint32_t getLong(); + QPID_COMMON_EXTERN uint64_t getLongLong(); + QPID_COMMON_EXTERN int8_t getInt8(); + QPID_COMMON_EXTERN int16_t getInt16(); + QPID_COMMON_EXTERN int32_t getInt32(); + QPID_COMMON_EXTERN int64_t getInt64(); + QPID_COMMON_EXTERN float getFloat(); + QPID_COMMON_EXTERN double getDouble(); + + QPID_COMMON_EXTERN void putShortString(const std::string& s); + QPID_COMMON_EXTERN void putMediumString(const std::string& s); + QPID_COMMON_EXTERN void putLongString(const std::string& s); + QPID_COMMON_EXTERN void getShortString(std::string& s); + QPID_COMMON_EXTERN void getMediumString(std::string& s); + QPID_COMMON_EXTERN void getLongString(std::string& s); + QPID_COMMON_EXTERN void getBin128(uint8_t* b); + + QPID_COMMON_EXTERN void putMap(const types::Variant::Map& map); + QPID_COMMON_EXTERN void putList(const types::Variant::List& list); + QPID_COMMON_EXTERN void getMap(types::Variant::Map& map); + QPID_COMMON_EXTERN void getList(types::Variant::List& list); + + QPID_COMMON_EXTERN void putRawData(const std::string& s); + QPID_COMMON_EXTERN void getRawData(std::string& s, uint32_t size); + + QPID_COMMON_EXTERN void putRawData(const uint8_t* data, size_t size); + QPID_COMMON_EXTERN void getRawData(uint8_t* data, size_t size); + +private: + framing::Buffer* impl; +}; + +}} // namespace qpid::management + +#endif diff --git a/qpid/cpp/src/qpid/management/ConnectionSettings.cpp b/qpid/cpp/src/qpid/management/ConnectionSettings.cpp new file mode 100644 index 0000000000..1421a26867 --- /dev/null +++ b/qpid/cpp/src/qpid/management/ConnectionSettings.cpp @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/management/ConnectionSettings.h" +#include "qpid/Version.h" + +qpid::management::ConnectionSettings::ConnectionSettings() : + protocol("tcp"), + host("localhost"), + port(5672), + locale("en_US"), + heartbeat(0), + maxChannels(32767), + maxFrameSize(65535), + bounds(2), + tcpNoDelay(false), + service(qpid::saslName), + minSsf(0), + maxSsf(256) +{} + +qpid::management::ConnectionSettings::~ConnectionSettings() {} + diff --git a/qpid/cpp/src/qpid/management/ConnectionSettings.h b/qpid/cpp/src/qpid/management/ConnectionSettings.h new file mode 100644 index 0000000000..b631ffa658 --- /dev/null +++ b/qpid/cpp/src/qpid/management/ConnectionSettings.h @@ -0,0 +1,118 @@ +#ifndef _management_ConnectionSettings_h +#define _management_ConnectionSettings_h +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/CommonImportExport.h" +#include "qpid/types/Variant.h" +#include <string> + +namespace qpid { +namespace management { + +/** + * Settings for a Connection. + */ +struct ConnectionSettings { + + QPID_COMMON_EXTERN ConnectionSettings(); + QPID_COMMON_EXTERN virtual ~ConnectionSettings(); + + /** + * The protocol used for the connection (defaults to 'tcp') + */ + std::string protocol; + + /** + * The host (or ip address) to connect to (defaults to 'localhost'). + */ + std::string host; + /** + * The port to connect to (defaults to 5672). + */ + uint16_t port; + /** + * Allows an AMQP 'virtual host' to be specified for the + * connection. + */ + std::string virtualhost; + + /** + * The username to use when authenticating the connection. If not + * specified the current users login is used if available. + */ + std::string username; + /** + * The password to use when authenticating the connection. + */ + std::string password; + /** + * The SASL mechanism to use when authenticating the connection; + * the options are currently PLAIN or ANONYMOUS. + */ + std::string mechanism; + /** + * Allows a locale to be specified for the connection. + */ + std::string locale; + /** + * Allows a heartbeat frequency to be specified + */ + uint16_t heartbeat; + /** + * The maximum number of channels that the client will request for + * use on this connection. + */ + uint16_t maxChannels; + /** + * The maximum frame size that the client will request for this + * connection. + */ + uint16_t maxFrameSize; + /** + * Limit the size of the connections send buffer . The buffer + * is limited to bounds * maxFrameSize. + */ + unsigned int bounds; + /** + * If true, TCP_NODELAY will be set for the connection. + */ + bool tcpNoDelay; + /** + * SASL service name + */ + std::string service; + /** + * Minimum acceptable strength of any SASL negotiated security + * layer. 0 means no security layer required. + */ + unsigned int minSsf; + /** + * Maximum acceptable strength of any SASL negotiated security + * layer. 0 means no security layer allowed. + */ + unsigned int maxSsf; +}; + +}} + +#endif + diff --git a/qpid/cpp/src/qpid/management/Manageable.cpp b/qpid/cpp/src/qpid/management/Manageable.cpp new file mode 100644 index 0000000000..651215ffb5 --- /dev/null +++ b/qpid/cpp/src/qpid/management/Manageable.cpp @@ -0,0 +1,53 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/management/Manageable.h" + +using namespace qpid::management; +using std::string; + +string Manageable::StatusText (status_t status, string text) +{ + if ((status & STATUS_USER) == STATUS_USER) + return text; + + switch (status) + { + case STATUS_OK : return "OK"; + case STATUS_UNKNOWN_OBJECT : return "UnknownObject"; + case STATUS_UNKNOWN_METHOD : return "UnknownMethod"; + case STATUS_NOT_IMPLEMENTED : return "NotImplemented"; + case STATUS_PARAMETER_INVALID : return "InvalidParameter"; + case STATUS_FEATURE_NOT_IMPLEMENTED : return "FeatureNotImplemented"; + case STATUS_FORBIDDEN : return "Forbidden"; + } + + return "??"; +} + +Manageable::status_t Manageable::ManagementMethod (uint32_t, Args&, std::string&) +{ + return STATUS_UNKNOWN_METHOD; +} + +bool Manageable::AuthorizeMethod(uint32_t, Args&, const std::string&) +{ + return true; +} + diff --git a/qpid/cpp/src/qpid/management/Manageable.h b/qpid/cpp/src/qpid/management/Manageable.h new file mode 100644 index 0000000000..ede5c29e43 --- /dev/null +++ b/qpid/cpp/src/qpid/management/Manageable.h @@ -0,0 +1,81 @@ +#ifndef _Manageable_ +#define _Manageable_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/management/ManagementObject.h" +#include "qpid/management/Args.h" +#include <string> +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace management { + +class QPID_COMMON_EXTERN Manageable +{ + public: + + virtual ~Manageable(void) = 0; + + // status_t is a type used to pass completion status from the method handler. + // + typedef uint32_t status_t; + static std::string StatusText(status_t status, std::string text = std::string()); + + static const status_t STATUS_OK = 0; + static const status_t STATUS_UNKNOWN_OBJECT = 1; + static const status_t STATUS_UNKNOWN_METHOD = 2; + static const status_t STATUS_NOT_IMPLEMENTED = 3; + static const status_t STATUS_PARAMETER_INVALID = 4; + static const status_t STATUS_FEATURE_NOT_IMPLEMENTED = 5; + static const status_t STATUS_FORBIDDEN = 6; + static const status_t STATUS_EXCEPTION = 7; + static const status_t STATUS_USER = 0x00010000; + + // Every "Manageable" object must hold a reference to exactly one + // management object. This object is always of a class derived from + // the pure-virtual "ManagementObject". + // + // This accessor function returns a pointer to the management object. + // +#ifdef _IN_QPID_BROKER + virtual ManagementObject::shared_ptr GetManagementObject() const = 0; +#else + virtual ManagementObject* GetManagementObject() const = 0; +#endif + + // Every "Manageable" object must implement ManagementMethod. This + // function is called when a remote management client invokes a method + // on this object. The input and output arguments are specific to the + // method being called and must be down-cast to the appropriate sub class + // before use. + virtual status_t ManagementMethod(uint32_t methodId, Args& args, std::string& text); + + // This optional method can be overridden to allow the agent application to + // authorize method invocations. Return true iff the authenticated user identified + // in userId us authorized to execute the method. + virtual bool AuthorizeMethod(uint32_t methodId, Args& args, const std::string& userId); +}; + +inline Manageable::~Manageable(void) {} + +}} + +#endif /*!_Manageable_*/ diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp new file mode 100644 index 0000000000..516babce61 --- /dev/null +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -0,0 +1,2832 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +// NOTE on use of log levels: The criteria for using trace vs. debug +// is to use trace for log messages that are generated for each +// unbatched stats/props notification and debug for everything else. + +#include "qpid/management/ManagementAgent.h" +#include "qpid/management/ManagementObject.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/log/Statement.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Broker.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/Timer.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/PollableQueue.h" +#include "qpid/broker/Connection.h" +#include "qpid/broker/AclModule.h" +#include "qpid/broker/Protocol.h" +#include "qpid/types/Variant.h" +#include "qpid/types/Uuid.h" +#include "qpid/framing/List.h" +#include "qpid/amqp_0_10/Codecs.h" +#include <list> +#include <iostream> +#include <fstream> +#include <sstream> +#include <typeinfo> + +#include <boost/bind.hpp> +#include <boost/function.hpp> + +namespace qpid { +namespace management { + +using boost::intrusive_ptr; +using qpid::framing::Uuid; +using qpid::types::Variant; +using qpid::amqp_0_10::MapCodec; +using qpid::amqp_0_10::ListCodec; +using namespace qpid::framing; +using namespace qpid::broker; +using namespace qpid; +using namespace std; +namespace _qmf = qmf::org::apache::qpid::broker; + + +namespace { +const size_t qmfV1BufferSize(65536); +const string defaultVendorName("vendor"); +const string defaultProductName("product"); + +// Create a valid binding key substring by +// replacing all '.' chars with '_' +const string keyifyNameStr(const string& name) +{ + string n2 = name; + + size_t pos = n2.find('.'); + while (pos != n2.npos) { + n2.replace(pos, 1, "_"); + pos = n2.find('.', pos); + } + return n2; +} + +struct ScopedManagementContext +{ + const Connection* context; + + ScopedManagementContext(const Connection* p) : context(p) + { + if (p) setManagementExecutionContext(*p); + } + + management::ObjectId getObjectId() const + { + return context ? context->getObjectId() : management::ObjectId(); + } + std::string getUserId() const + { + return context ? context->getUserId() : std::string(); + } + std::string getMgmtId() const + { + return context ? context->getMgmtId() : std::string(); + } + + + ~ScopedManagementContext() + { + resetManagementExecutionContext(); + } +}; + +typedef boost::function0<void> FireFunction; +struct Periodic : public qpid::sys::TimerTask +{ + FireFunction fireFunction; + qpid::sys::Timer* timer; + + Periodic (FireFunction f, qpid::sys::Timer* t, uint32_t seconds); + virtual ~Periodic (); + void fire (); +}; + +Periodic::Periodic (FireFunction f, qpid::sys::Timer* t, uint32_t _seconds) + : TimerTask(sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC), + "ManagementAgent::periodicProcessing"), + fireFunction(f), timer(t) {} + +Periodic::~Periodic() {} + +void Periodic::fire() +{ + setupNextFire(); + timer->add(this); + fireFunction(); +} + +} + + +static Variant::Map mapEncodeSchemaId(const string& pname, + const string& cname, + const string& type, + const uint8_t *md5Sum) +{ + Variant::Map map_; + + map_["_package_name"] = pname; + map_["_class_name"] = cname; + map_["_type"] = type; + map_["_hash"] = qpid::types::Uuid(md5Sum); + return map_; +} + + +ManagementAgent::RemoteAgent::~RemoteAgent () +{ + QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); + if (mgmtObject != 0) { + mgmtObject->resourceDestroy(); + agent.deleteObjectNow(mgmtObject->getObjectId()); + mgmtObject.reset(); + } +} + +ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : + threadPoolSize(1), publish(true), interval(10), broker(0), timer(0), protocols(0), + startTime(sys::now()), + suppressed(false), disallowAllV1Methods(false), + vendorNameKey(defaultVendorName), productNameKey(defaultProductName), + qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100) +{ + nextObjectId = 1; + brokerBank = 1; + bootSequence = 1; + nextRemoteBank = 10; + nextRequestSequence = 1; + clientWasAdded = false; + attrMap["_vendor"] = defaultVendorName; + attrMap["_product"] = defaultProductName; + + memstat = _qmf::Memory::shared_ptr(new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker")); + addObject(memstat, "amqp-broker"); +} + +ManagementAgent::~ManagementAgent () +{ + { + sys::Mutex::ScopedLock lock (userLock); + + // Reset the shared pointers to exchanges. If this is not done now, the exchanges + // will stick around until dExchange and mExchange are implicitly destroyed (long + // after this destructor completes). Those exchanges hold references to management + // objects that will be invalid. + dExchange.reset(); + mExchange.reset(); + v2Topic.reset(); + v2Direct.reset(); + + remoteAgents.clear(); + } +} + +void ManagementAgent::configure(const string& _dataDir, bool _publish, uint16_t _interval, + qpid::broker::Broker* _broker, int _threads) +{ + dataDir = _dataDir; + publish = _publish; + interval = _interval; + broker = _broker; + threadPoolSize = _threads; + ManagementObject::maxThreads = threadPoolSize; + sendQueue.reset( + new EventQueue(boost::bind(&ManagementAgent::sendEvents, this, _1), broker->getPoller())); + sendQueue->start(); + timer = &broker->getTimer(); + timer->add(new Periodic(boost::bind(&ManagementAgent::periodicProcessing, this), timer, interval)); + + protocols = &broker->getProtocolRegistry(); + // Get from file or generate and save to file. + if (dataDir.empty()) + { + uuid.generate(); + QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: " + << uuid); + } + else + { + string filename(dataDir + "/.mbrokerdata"); + ifstream inFile(filename.c_str ()); + + if (inFile.good()) + { + inFile >> uuid; + inFile >> bootSequence; + inFile >> nextRemoteBank; + inFile.close(); + if (uuid.isNull()) { + uuid.generate(); + QPID_LOG (info, "No stored broker ID found - ManagementAgent generated broker ID: " << uuid); + } else + QPID_LOG (info, "ManagementAgent restored broker ID: " << uuid); + + // if sequence goes beyond a 12-bit field, skip zero and wrap to 1. + bootSequence++; + if (bootSequence & 0xF000) + bootSequence = 1; + writeData(); + } + else + { + uuid.generate(); + QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid); + writeData(); + } + + QPID_LOG (debug, "ManagementAgent boot sequence: " << bootSequence); + } +} + +void ManagementAgent::setName(const string& vendor, const string& product, const string& instance) +{ + if (vendor.find(':') != vendor.npos) { + throw Exception("vendor string cannot contain a ':' character."); + } + if (product.find(':') != product.npos) { + throw Exception("product string cannot contain a ':' character."); + } + attrMap["_vendor"] = vendor; + attrMap["_product"] = product; + string inst; + if (instance.empty()) { + if (uuid.isNull()) + { + throw Exception("ManagementAgent::configure() must be called if default name is used."); + } + inst = uuid.str(); + } else + inst = instance; + + name_address = vendor + ":" + product + ":" + inst; + attrMap["_instance"] = inst; + attrMap["_name"] = name_address; + + vendorNameKey = keyifyNameStr(vendor); + productNameKey = keyifyNameStr(product); + instanceNameKey = keyifyNameStr(inst); +} + + +void ManagementAgent::getName(string& vendor, string& product, string& instance) +{ + vendor = std::string(attrMap["_vendor"]); + product = std::string(attrMap["_product"]); + instance = std::string(attrMap["_instance"]); +} + + +const std::string& ManagementAgent::getAddress() +{ + return name_address; +} + + +void ManagementAgent::writeData () +{ + string filename (dataDir + "/.mbrokerdata"); + ofstream outFile (filename.c_str ()); + + if (outFile.good()) + { + outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl; + outFile.close(); + } +} + +void ManagementAgent::setExchange(qpid::broker::Exchange::shared_ptr _mexchange, + qpid::broker::Exchange::shared_ptr _dexchange) +{ + mExchange = _mexchange; + dExchange = _dexchange; +} + +void ManagementAgent::setExchangeV2(qpid::broker::Exchange::shared_ptr _texchange, + qpid::broker::Exchange::shared_ptr _dexchange) +{ + v2Topic = _texchange; + v2Direct = _dexchange; +} + +void ManagementAgent::registerClass (const string& packageName, + const string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + sys::Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = findOrAddPackageLH(packageName); + addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); +} + +void ManagementAgent::registerEvent (const string& packageName, + const string& eventName, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + sys::Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = findOrAddPackageLH(packageName); + addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); +} + +// Deprecated: V1 objects +ObjectId ManagementAgent::addObject(ManagementObject::shared_ptr object, uint64_t persistId, bool persistent) +{ + uint16_t sequence; + uint64_t objectNum; + + sys::Mutex::ScopedLock lock(addLock); + sequence = persistent ? 0 : bootSequence; + objectNum = persistId ? persistId : nextObjectId++; + + ObjectId objId(0 /*flags*/, sequence, brokerBank, objectNum); + objId.setV2Key(*object); // let object generate the v2 key + + object->setObjectId(objId); + + newManagementObjects.push_back(object); + QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key()); + return objId; +} + + + +ObjectId ManagementAgent::addObject(ManagementObject::shared_ptr object, + const string& key, + bool persistent) +{ + uint16_t sequence; + + sequence = persistent ? 0 : bootSequence; + + ObjectId objId(0 /*flags*/, sequence, brokerBank); + if (key.empty()) { + objId.setV2Key(*object); // let object generate the key + } else { + objId.setV2Key(key); + } + + object->setObjectId(objId); + { + sys::Mutex::ScopedLock lock(addLock); + newManagementObjects.push_back(object); + } + QPID_LOG(debug, "Management object added: " << objId.getV2Key()); + return objId; +} + +void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity) +{ + static const std::string severityStr[] = { + "emerg", "alert", "crit", "error", "warn", + "note", "info", "debug" + }; + uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; + + if (qmf1Support) { + char buffer[qmfV1BufferSize]; + Buffer outBuffer(buffer, qmfV1BufferSize); + + encodeHeader(outBuffer, 'e'); + outBuffer.putShortString(event.getPackageName()); + outBuffer.putShortString(event.getEventName()); + outBuffer.putBin128(event.getMd5Sum()); + outBuffer.putLongLong(sys::Duration::FromEpoch()); + outBuffer.putOctet(sev); + string sBuf; + event.encode(sBuf); + outBuffer.putRawData(sBuf); + sendBuffer(outBuffer, mExchange, + "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); + QPID_LOG(debug, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName()); + } + + if (qmf2Support) { + Variant::Map map_; + Variant::Map schemaId; + Variant::Map values; + Variant::Map headers; + + map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), + event.getEventName(), + "_event", + event.getMd5Sum()); + event.mapEncode(values); + map_["_values"] = values; + map_["_timestamp"] = uint64_t(sys::Duration::FromEpoch()); + map_["_severity"] = sev; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_event"; + headers["qmf.agent"] = name_address; + + stringstream key; + key << "agent.ind.event." << keyifyNameStr(event.getPackageName()) + << "." << keyifyNameStr(event.getEventName()) + << "." << severityStr[sev] + << "." << vendorNameKey + << "." << productNameKey; + if (!instanceNameKey.empty()) + key << "." << instanceNameKey; + + + string content; + Variant::List list_; + list_.push_back(map_); + ListCodec::encode(list_, content); + sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str()); + QPID_LOG(debug, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); + } +} + +void ManagementAgent::clientAdded (const string& routingKey) +{ + sys::Mutex::ScopedLock lock(userLock); + + // + // If this routing key is not relevant to object updates, exit. + // + if ((routingKey.compare(0, 1, "#") != 0) && + (routingKey.compare(0, 9, "console.#") != 0) && + (routingKey.compare(0, 12, "console.obj.") != 0)) + return; + + // + // Mark local objects for full-update. + // + clientWasAdded = true; + + // + // If the routing key is relevant for local objects only, don't involve + // any of the remote agents. + // + if (routingKey.compare(0, 39, "console.obj.*.*.org.apache.qpid.broker.") == 0) + return; + + std::list<std::string> rkeys; + + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); + aIter != remoteAgents.end(); + aIter++) { + rkeys.push_back(aIter->second->routingKey); + } + + while (rkeys.size()) { + char localBuffer[16]; + Buffer outBuffer(localBuffer, 16); + + encodeHeader(outBuffer, 'x'); + sendBuffer(outBuffer, dExchange, rkeys.front()); + QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << rkeys.front()); + rkeys.pop_front(); + } +} + +void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +{ + buf.putOctet ('A'); + buf.putOctet ('M'); + buf.putOctet ('2'); + buf.putOctet (opcode); + buf.putLong (seq); +} + +bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +{ + uint8_t h1 = buf.getOctet(); + uint8_t h2 = buf.getOctet(); + uint8_t h3 = buf.getOctet(); + + *opcode = buf.getOctet(); + *seq = buf.getLong(); + + return h1 == 'A' && h2 == 'M' && h3 == '2'; +} + +void ManagementAgent::sendBuffer(Buffer& buf, + qpid::broker::Exchange::shared_ptr exchange, + const string& routingKey) +{ + if (suppressed) { + QPID_LOG(debug, "Suppressing management message to " << routingKey); + return; + } + if (exchange.get() == 0) return; + + intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); + AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); + AMQFrame header((AMQHeaderBody())); + AMQFrame content((AMQContentBody())); + + size_t length = buf.getPosition(); + buf.reset(); + content.castBody<AMQContentBody>()->decode(buf, length); + + method.setEof(false); + header.setBof(false); + header.setEof(false); + content.setBof(false); + + transfer->getFrames().append(method); + transfer->getFrames().append(header); + + MessageProperties* props = + transfer->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(length); + + DeliveryProperties* dp = + transfer->getFrames().getHeaders()->get<DeliveryProperties>(true); + dp->setRoutingKey(routingKey); + + transfer->getFrames().append(content); + transfer->setIsManagementMessage(true); + Message msg(transfer, transfer); + sendQueue->push(make_pair(exchange, msg)); + buf.reset(); +} + + +void ManagementAgent::sendBuffer(Buffer& buf, + const string& exchange, + const string& routingKey) +{ + qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange)); + if (ex.get() != 0) + sendBuffer(buf, ex, routingKey); +} + + +void ManagementAgent::sendBuffer(const string& data, + const string& cid, + const Variant::Map& headers, + const string& content_type, + qpid::broker::Exchange::shared_ptr exchange, + const string& routingKey, + uint64_t ttl_msec) +{ + Variant::Map::const_iterator i; + + if (suppressed) { + QPID_LOG(debug, "Suppressing management message to " << routingKey); + return; + } + if (exchange.get() == 0) return; + + intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer); + AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); + AMQFrame header((AMQHeaderBody())); + AMQFrame content((AMQContentBody(data))); + + method.setEof(false); + header.setBof(false); + header.setEof(false); + content.setBof(false); + + transfer->getFrames().append(method); + transfer->getFrames().append(header); + + MessageProperties* props = + transfer->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(data.length()); + if (!cid.empty()) { + props->setCorrelationId(cid); + } + props->setContentType(content_type); + props->setAppId("qmf2"); + + for (i = headers.begin(); i != headers.end(); ++i) { + props->getApplicationHeaders().setString(i->first, i->second.asString()); + } + + DeliveryProperties* dp = + transfer->getFrames().getHeaders()->get<DeliveryProperties>(true); + dp->setRoutingKey(routingKey); + if (ttl_msec) { + dp->setTtl(ttl_msec); + } + transfer->getFrames().append(content); + transfer->computeRequiredCredit(); + transfer->setIsManagementMessage(true); + transfer->computeExpiration(); + Message msg(transfer, transfer); + + sendQueue->push(make_pair(exchange, msg)); +} + + +void ManagementAgent::sendBuffer(const string& data, + const string& cid, + const Variant::Map& headers, + const string& content_type, + const string& exchange, + const string& routingKey, + uint64_t ttl_msec) +{ + qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange)); + if (ex.get() != 0) + sendBuffer(data, cid, headers, content_type, ex, routingKey, ttl_msec); +} + + +/** Objects that have been added since the last periodic poll are temporarily + * saved in the newManagementObjects list. This allows objects to be + * added without needing to block on the userLock (objectLock is used instead). + * These new objects need to be integrated into the object database + * (managementObjects) *before* they can be properly managed. This routine + * performs the integration. + * + * Note well: objects on the newManagementObjects list may have been + * marked as "deleted", and, possibly re-added. This would result in + * duplicate object ids. To avoid clashes, don't put deleted objects + * into the active object database. + */ +void ManagementAgent::moveNewObjects() +{ + sys::Mutex::ScopedLock lock(addLock); + sys::Mutex::ScopedLock objLock (objectLock); + while (!newManagementObjects.empty()) { + ManagementObject::shared_ptr object = newManagementObjects.back(); + newManagementObjects.pop_back(); + + if (object->isDeleted()) { + DeletedObject::shared_ptr dptr(new DeletedObject(object, qmf1Support, qmf2Support)); + pendingDeletedObjs[dptr->getKey()].push_back(dptr); + } else { // add to active object list, check for duplicates. + ObjectId oid = object->getObjectId(); + ManagementObjectMap::iterator destIter = managementObjects.find(oid); + if (destIter != managementObjects.end()) { + // duplicate found. It is OK if the old object has been marked + // deleted, just replace the old with the new. + ManagementObject::shared_ptr oldObj = destIter->second; + if (!oldObj->isDeleted()) { + // Duplicate non-deleted objects? This is a user error - oids must be unique. + // for now, leak the old object (safer than deleting - may still be referenced) + // and complain loudly... + QPID_LOG(error, "Detected two management objects with the same identifier: " << oid); + oldObj->resourceDestroy(); + } + DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support)); + pendingDeletedObjs[dptr->getKey()].push_back(dptr); + // QPID-3666: be sure to replace the -index- also, as non-key members of + // the index object may be different for the new object! So erase the + // entry, rather than []= assign here: + managementObjects.erase(destIter); + } + managementObjects[oid] = object; + } + } +} + +void ManagementAgent::periodicProcessing (void) +{ +#define HEADROOM 4096 + sys::Mutex::ScopedLock lock (userLock); + debugSnapshot("Management agent periodic processing"); + string routingKey; + string sBuf; + + moveNewObjects(); + + // + // If we're publishing updates, get the latest memory statistics and uptime now + // + if (publish) { + uint64_t uptime = sys::Duration(startTime, sys::now()); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); + qpid::sys::MemStat::loadMemInfo(memstat.get()); + } + + // + // Use a copy of the management object map to avoid holding the objectLock + // + ManagementObjectVector localManagementObjects; + { + sys::Mutex::ScopedLock objLock(objectLock); + std::transform(managementObjects.begin(), managementObjects.end(), + std::back_inserter(localManagementObjects), + boost::bind(&ManagementObjectMap::value_type::second, _1)); + } + + // + // Clear the been-here flag on all objects in the map. + // + for (ManagementObjectVector::iterator iter = localManagementObjects.begin(); + iter != localManagementObjects.end(); + iter++) { + ManagementObject::shared_ptr object = *iter; + object->setFlags(0); + if (clientWasAdded) { + object->setForcePublish(true); + } + } + + clientWasAdded = false; + + // first send the pending deletes before sending updates. This prevents a + // "false delete" scenario: if an object was deleted then re-added during + // the last poll cycle, it will have a delete entry and an active entry. + // if we sent the active update first, _then_ the delete update, clients + // would incorrectly think the object was deleted. See QPID-2997 + // + bool objectsDeleted = moveDeletedObjects(); + PendingDeletedObjsMap localPendingDeletedObjs; + { + sys::Mutex::ScopedLock objLock(objectLock); + localPendingDeletedObjs.swap(pendingDeletedObjs); + } + + // + // If we are not publishing updates, just clear the pending deletes. There's no + // need to tell anybody. + // + if (!publish) + localPendingDeletedObjs.clear(); + + ResizableBuffer msgBuffer(qmfV1BufferSize); + if (!localPendingDeletedObjs.empty()) { + for (PendingDeletedObjsMap::iterator mIter = localPendingDeletedObjs.begin(); + mIter != localPendingDeletedObjs.end(); + mIter++) { + std::string packageName; + std::string className; + msgBuffer.reset(); + uint32_t v1Objs = 0; + uint32_t v2Objs = 0; + Variant::List list_; + + size_t pos = mIter->first.find(":"); + packageName = mIter->first.substr(0, pos); + className = mIter->first.substr(pos+1); + + for (DeletedObjectList::iterator lIter = mIter->second.begin(); + lIter != mIter->second.end(); lIter++) { + msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space. + std::string oid = (*lIter)->objectId; + if (!(*lIter)->encodedV1Config.empty()) { + encodeHeader(msgBuffer, 'c'); + msgBuffer.putRawData((*lIter)->encodedV1Config); + QPID_LOG(trace, "Deleting V1 properties " << oid + << " len=" << (*lIter)->encodedV1Config.size()); + v1Objs++; + } + if (!(*lIter)->encodedV1Inst.empty()) { + encodeHeader(msgBuffer, 'i'); + msgBuffer.putRawData((*lIter)->encodedV1Inst); + QPID_LOG(trace, "Deleting V1 statistics " << oid + << " len=" << (*lIter)->encodedV1Inst.size()); + v1Objs++; + } + if (v1Objs >= maxReplyObjs) { + v1Objs = 0; + stringstream key; + key << "console.obj.1.0." << packageName << "." << className; + size_t contentSize = msgBuffer.getPosition(); + sendBuffer(msgBuffer, mExchange, key.str()); + QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" + << key.str() << " len=" << contentSize); + } + + if (!(*lIter)->encodedV2.empty()) { + QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2); + list_.push_back((*lIter)->encodedV2); + if (++v2Objs >= maxReplyObjs) { + v2Objs = 0; + + string content; + ListCodec::encode(list_, content); + list_.clear(); + if (content.length()) { + stringstream key; + Variant::Map headers; + key << "agent.ind.data." << keyifyNameStr(packageName) + << "." << keyifyNameStr(className) + << "." << vendorNameKey + << "." << productNameKey; + if (!instanceNameKey.empty()) + key << "." << instanceNameKey; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0); + QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); + } + } + } + } // end current list + + // send any remaining objects... + + if (v1Objs) { + stringstream key; + key << "console.obj.1.0." << packageName << "." << className; + size_t contentSize = msgBuffer.getPosition(); + sendBuffer(msgBuffer, mExchange, key.str()); + QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); + } + + if (!list_.empty()) { + string content; + ListCodec::encode(list_, content); + list_.clear(); + if (content.length()) { + stringstream key; + Variant::Map headers; + key << "agent.ind.data." << keyifyNameStr(packageName) + << "." << keyifyNameStr(className) + << "." << vendorNameKey + << "." << productNameKey; + if (!instanceNameKey.empty()) + key << "." << instanceNameKey; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0); + QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); + } + } + } // end map + } + + // + // Process the entire object map. + // + // If publish is disabled, don't send any updates. + // + while (publish) { + msgBuffer.reset(); + Variant::List list_; + uint32_t pcount; + uint32_t scount; + uint32_t v1Objs, v2Objs; + ManagementObjectVector::iterator baseIter; + std::string packageName; + std::string className; + + for (baseIter = localManagementObjects.begin(); + baseIter != localManagementObjects.end(); + baseIter++) { + ManagementObject::shared_ptr baseObject = *baseIter; + // + // Skip until we find a base object requiring processing... + // + if (baseObject->getFlags() == 0) { + packageName = baseObject->getPackageName(); + className = baseObject->getClassName(); + break; + } + } + + if (baseIter == localManagementObjects.end()) + break; // done - all objects processed + + pcount = scount = 0; + v1Objs = 0; + v2Objs = 0; + list_.clear(); + msgBuffer.reset(); + + for (ManagementObjectVector::iterator iter = baseIter; + iter != localManagementObjects.end(); + iter++) { + msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space + ManagementObject::shared_ptr baseObject = *baseIter; + ManagementObject::shared_ptr object = *iter; + bool send_stats, send_props; + if (baseObject->isSameClass(*object) && object->getFlags() == 0) { + object->setFlags(1); + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + // skip any objects marked deleted since our first pass. Deal with them + // on the next periodic cycle... + if (object->isDeleted()) { + continue; + } + + send_props = (object->getConfigChanged() || object->getForcePublish()); + send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); + + if (send_props && qmf1Support) { + size_t pos = msgBuffer.getPosition(); + encodeHeader(msgBuffer, 'c'); + sBuf.clear(); + object->writeProperties(sBuf); + msgBuffer.putRawData(sBuf); + QPID_LOG(trace, "Changed V1 properties " + << object->getObjectId().getV2Key() + << " len=" << msgBuffer.getPosition()-pos); + ++v1Objs; + } + + if (send_stats && qmf1Support) { + size_t pos = msgBuffer.getPosition(); + encodeHeader(msgBuffer, 'i'); + sBuf.clear(); + object->writeStatistics(sBuf); + msgBuffer.putRawData(sBuf); + QPID_LOG(trace, "Changed V1 statistics " + << object->getObjectId().getV2Key() + << " len=" << msgBuffer.getPosition()-pos); + ++v1Objs; + } + + if ((send_stats || send_props) && qmf2Support) { + Variant::Map map_; + Variant::Map values; + Variant::Map oid; + + object->getObjectId().mapEncode(oid); + map_["_object_id"] = oid; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + "_data", + object->getMd5Sum()); + object->writeTimestamps(map_); + object->mapEncodeValues(values, send_props, send_stats); + map_["_values"] = values; + list_.push_back(map_); + v2Objs++; + QPID_LOG(trace, "Changed V2" + << (send_stats? " statistics":"") + << (send_props? " properties":"") + << " map=" << map_); + } + + if (send_props) pcount++; + if (send_stats) scount++; + + object->setForcePublish(false); + + if ((qmf1Support && (v1Objs >= maxReplyObjs)) || + (qmf2Support && (v2Objs >= maxReplyObjs))) + break; // have enough objects, send an indication... + } + } + + if (pcount || scount) { + if (qmf1Support) { + if (msgBuffer.getPosition() > 0) { + stringstream key; + key << "console.obj.1.0." << packageName << "." << className; + size_t contentSize = msgBuffer.getPosition(); + sendBuffer(msgBuffer, mExchange, key.str()); + QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str() + << " props=" << pcount + << " stats=" << scount + << " len=" << contentSize); + } + } + + if (qmf2Support) { + string content; + ListCodec::encode(list_, content); + if (content.length()) { + stringstream key; + Variant::Map headers; + key << "agent.ind.data." << keyifyNameStr(packageName) + << "." << keyifyNameStr(className) + << "." << vendorNameKey + << "." << productNameKey; + if (!instanceNameKey.empty()) + key << "." << instanceNameKey; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0); + QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str() + << " props=" << pcount + << " stats=" << scount + << " len=" << content.length()); + } + } + } + } // end processing updates for all objects + + if (objectsDeleted) { + sys::Mutex::ScopedLock lock (userLock); + deleteOrphanedAgentsLH(); + } + + // heartbeat generation. Note that heartbeats need to be sent even if publish is disabled. + + if (qmf1Support) { + char msgChars[qmfV1BufferSize]; + Buffer msgBuffer(msgChars, qmfV1BufferSize); + encodeHeader(msgBuffer, 'h'); + msgBuffer.putLongLong(sys::Duration::FromEpoch()); + + routingKey = "console.heartbeat.1.0"; + sendBuffer(msgBuffer, mExchange, routingKey); + QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey); + } + + if (qmf2Support) { + std::stringstream addr_key; + + addr_key << "agent.ind.heartbeat." << vendorNameKey << "." << productNameKey; + if (!instanceNameKey.empty()) + addr_key << "." << instanceNameKey; + + Variant::Map map; + Variant::Map headers; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_heartbeat_indication"; + headers["qmf.agent"] = name_address; + + map["_values"] = attrMap; + map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration::FromEpoch()); + map["_values"].asMap()["_heartbeat_interval"] = interval; + map["_values"].asMap()["_epoch"] = bootSequence; + + string content; + MapCodec::encode(map, content); + + // Set TTL (in msecs) on outgoing heartbeat indications based on the interval + // time to prevent stale heartbeats from getting to the consoles. + sendBuffer(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000); + + QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address); + } +} + +void ManagementAgent::deleteObjectNow(const ObjectId& oid) +{ + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = managementObjects.find(oid); + if (iter == managementObjects.end()) + return; + object = iter->second; + if (!object->isDeleted()) + return; + managementObjects.erase(oid); + } + +#define DNOW_BUFSIZE 2048 + char msgChars[DNOW_BUFSIZE]; + Buffer msgBuffer(msgChars, DNOW_BUFSIZE); + Variant::List list_; + stringstream v1key, v2key; + + if (publish && qmf1Support) { + string sBuf; + + v1key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); + encodeHeader(msgBuffer, 'c'); + object->writeProperties(sBuf); + msgBuffer.putRawData(sBuf); + } + + if (publish && qmf2Support) { + Variant::Map map_; + Variant::Map values; + + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + "_data", + object->getMd5Sum()); + object->writeTimestamps(map_); + object->mapEncodeValues(values, true, false); + map_["_values"] = values; + list_.push_back(map_); + v2key << "agent.ind.data." << keyifyNameStr(object->getPackageName()) + << "." << keyifyNameStr(object->getClassName()) + << "." << vendorNameKey + << "." << productNameKey; + if (!instanceNameKey.empty()) + v2key << "." << instanceNameKey; + } + + object.reset(); + + // object deleted, ok to drop lock now. + + if (publish && qmf1Support) { + sendBuffer(msgBuffer, mExchange, v1key.str()); + QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str()); + } + + if (publish && qmf2Support) { + Variant::Map headers; + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + string content; + ListCodec::encode(list_, content); + sendBuffer(content, "", headers, "amqp/list", v2Topic, v2key.str(), 0); + QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v2key.str()); + } +} + +void ManagementAgent::sendCommandComplete(const string& replyToKey, uint32_t sequence, + uint32_t code, const string& text) +{ + ResizableBuffer outBuffer (qmfV1BufferSize); + + encodeHeader (outBuffer, 'z', sequence); + outBuffer.putLong (code); + outBuffer.putShortString (text); + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" << + replyToKey << " seq=" << sequence); +} + +void ManagementAgent::sendException(const string& rte, const string& rtk, const string& cid, + const string& text, uint32_t code, bool viaLocal) +{ + static const string addr_exchange("qmf.default.direct"); + + Variant::Map map; + Variant::Map headers; + Variant::Map values; + string content; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_exception"; + headers["qmf.agent"] = viaLocal ? "broker" : name_address; + + values["error_code"] = code; + values["error_text"] = text; + map["_values"] = values; + + MapCodec::encode(map, content); + sendBuffer(content, cid, headers, "amqp/map", rte, rtk); + + QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text); +} + +bool ManagementAgent::dispatchCommand (Deliverable& deliverable, + const string& routingKey, + const FieldTable* /*args*/, + const bool topic, + int qmfVersion) +{ + Message& msg = ((DeliverableMessage&) deliverable).getMessage (); + + if (topic && qmfVersion == 1) { + + // qmf1 is bound only to the topic management exchange. + // Parse the routing key. This management broker should act as though it + // is bound to the exchange to match the following keys: + // + // agent.1.0.# + // broker + // schema.# + + if (routingKey == "broker") { + dispatchAgentCommand(msg); + return false; + } + + if (routingKey.length() > 6) { + + if (routingKey.compare(0, 9, "agent.1.0") == 0) { + dispatchAgentCommand(msg); + return false; + } + + if (routingKey.compare(0, 8, "agent.1.") == 0) { + return authorizeAgentMessage(msg); + } + + if (routingKey.compare(0, 7, "schema.") == 0) { + dispatchAgentCommand(msg); + return true; + } + } + } + + if (qmfVersion == 2) { + + if (topic) { + // Intercept messages bound to: + // "console.ind.locate.# - process these messages, and also allow them to be forwarded. + if (routingKey == "console.request.agent_locate") { + dispatchAgentCommand(msg); + return true; + } + + } else { // direct exchange + + // Intercept messages bound to: + // "broker" - generic alias for the local broker + // "<name_address>" - the broker agent's proper name + // and do not forward them futher + if (routingKey == "broker" || routingKey == name_address) { + dispatchAgentCommand(msg, routingKey == "broker"); + return false; + } + } + } + + return true; +} + +void ManagementAgent::handleMethodRequest(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const string& userId) +{ + moveNewObjects(); + + string methodName; + string packageName; + string className; + uint8_t hash[16]; + ResizableBuffer outBuffer (qmfV1BufferSize); + AclModule* acl = broker->getAcl(); + string inArgs; + + string sBuf; + inBuffer.getRawData(sBuf, 16); + ObjectId objId; + objId.decode(sBuf); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); + inBuffer.getShortString(methodName); + inBuffer.getRawData(inArgs, inBuffer.available()); + + QPID_LOG(debug, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << + methodName << " replyTo=" << replyToKey); + + encodeHeader(outBuffer, 'm', sequence); + + if (disallowAllV1Methods) { + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putMediumString("QMFv1 methods forbidden on this broker, use QMFv2"); + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence); + return; + } + + DisallowedMethods::const_iterator i = disallowed.find(make_pair(className, methodName)); + if (i != disallowed.end()) { + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putMediumString(i->second); + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); + return; + } + + if (acl != 0) { + map<acl::Property, string> params; + params[acl::PROP_SCHEMAPACKAGE] = packageName; + params[acl::PROP_SCHEMACLASS] = className; + + if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); + return; + } + } + + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = numericFind(objId); + if (iter != managementObjects.end()) + object = iter->second; + } + + if (!object || object->isDeleted()) { + outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); + outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); + } else { + if ((object->getPackageName() != packageName) || + (object->getClassName() != className)) { + outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID); + outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID)); + } + else { + uint32_t pos = outBuffer.getPosition(); + try { + string outBuf; + object->doMethod(methodName, inArgs, outBuf, userId); + outBuffer.putRawData(outBuf); + } catch(exception& e) { + outBuffer.setPosition(pos);; + outBuffer.putLong(Manageable::STATUS_EXCEPTION); + outBuffer.putMediumString(e.what()); + } + } + } + + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence); +} + + +void ManagementAgent::handleMethodRequest (const string& body, const string& rte, const string& rtk, + const string& cid, const string& userId, bool viaLocal) +{ + moveNewObjects(); + + string methodName; + Variant::Map inMap; + MapCodec::decode(body, inMap); + Variant::Map::const_iterator oid, mid; + string content; + string error; + uint32_t errorCode(0); + + Variant::Map outMap; + Variant::Map headers; + + headers["method"] = "response"; + headers["qmf.opcode"] = "_method_response"; + headers["qmf.agent"] = viaLocal ? "broker" : name_address; + + if ((oid = inMap.find("_object_id")) == inMap.end() || + (mid = inMap.find("_method_name")) == inMap.end()) { + sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID), + Manageable::STATUS_PARAMETER_INVALID, viaLocal); + return; + } + + ObjectId objId; + Variant::Map inArgs; + Variant::Map callMap; + + try { + // coversions will throw if input is invalid. + objId = ObjectId(oid->second.asMap()); + methodName = mid->second.getString(); + + mid = inMap.find("_arguments"); + if (mid != inMap.end()) { + inArgs = (mid->second).asMap(); + } + } catch(exception& e) { + sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); + return; + } + + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) + object = iter->second; + } + + if (!object || object->isDeleted()) { + stringstream estr; + estr << "No object found with ID=" << objId; + sendException(rte, rtk, cid, estr.str(), 1, viaLocal); + return; + } + + // validate + AclModule* acl = broker->getAcl(); + DisallowedMethods::const_iterator i; + + i = disallowed.find(make_pair(object->getClassName(), methodName)); + if (i != disallowed.end()) { + sendException(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal); + return; + } + + if (acl != 0) { + map<acl::Property, string> params; + params[acl::PROP_SCHEMAPACKAGE] = object->getPackageName(); + params[acl::PROP_SCHEMACLASS] = object->getClassName(); + + if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { + sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), + Manageable::STATUS_FORBIDDEN, viaLocal); + return; + } + } + + // invoke the method + + QPID_LOG(debug, "RECV MethodRequest (v2) class=" << object->getPackageName() + << ":" << object->getClassName() << " method=" << + methodName << " replyTo=" << rte << "/" << rtk << " objId=" << objId << " inArgs=" << inArgs); + + try { + object->doMethod(methodName, inArgs, callMap, userId); + errorCode = callMap["_status_code"].asUint32(); + if (errorCode == 0) { + outMap["_arguments"] = Variant::Map(); + for (Variant::Map::const_iterator iter = callMap.begin(); + iter != callMap.end(); iter++) + if (iter->first != "_status_code" && iter->first != "_status_text") + outMap["_arguments"].asMap()[iter->first] = iter->second; + } else + error = callMap["_status_text"].asString(); + } catch(exception& e) { + sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); + return; + } + + if (errorCode != 0) { + sendException(rte, rtk, cid, error, errorCode, viaLocal); + return; + } + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, "amqp/map", rte, rtk); + QPID_LOG(debug, "SEND MethodResponse (v2) to=" << rte << "/" << rtk << " seq=" << cid << " map=" << outMap); +} + + +void ManagementAgent::handleBrokerRequest (Buffer&, const string& replyToKey, uint32_t sequence) +{ + ResizableBuffer outBuffer (qmfV1BufferSize); + + QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey); + + encodeHeader (outBuffer, 'b', sequence); + uuid.encode (outBuffer); + + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey); +} + +void ManagementAgent::handlePackageQuery (Buffer&, const string& replyToKey, uint32_t sequence) +{ + QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey); + ResizableBuffer outBuffer (qmfV1BufferSize); + + { + sys::Mutex::ScopedLock lock(userLock); + for (PackageMap::iterator pIter = packages.begin (); + pIter != packages.end (); + pIter++) + { + encodeHeader (outBuffer, 'p', sequence); + encodePackageIndication (outBuffer, pIter); + } + } + + if (outBuffer.getPosition() > 0) { + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND PackageInd to=" << replyToKey << " seq=" << sequence); + } + + sendCommandComplete(replyToKey, sequence); +} + +void ManagementAgent::handlePackageInd (Buffer& inBuffer, const string& replyToKey, uint32_t sequence) +{ + string packageName; + + inBuffer.getShortString(packageName); + + QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); + + sys::Mutex::ScopedLock lock(userLock); + findOrAddPackageLH(packageName); +} + +void ManagementAgent::handleClassQuery(Buffer& inBuffer, const string& replyToKey, uint32_t sequence) +{ + string packageName; + + inBuffer.getShortString(packageName); + + QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); + + typedef std::pair<SchemaClassKey, uint8_t> _ckeyType; + std::list<_ckeyType> classes; + { + sys::Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) + { + ClassMap &cMap = pIter->second; + for (ClassMap::iterator cIter = cMap.begin(); + cIter != cMap.end(); + cIter++) { + if (cIter->second.hasSchema()) { + classes.push_back(make_pair(cIter->first, cIter->second.kind)); + } + } + } + } + + while (classes.size()) { + ResizableBuffer outBuffer(qmfV1BufferSize); + + encodeHeader(outBuffer, 'q', sequence); + encodeClassIndication(outBuffer, packageName, classes.front().first, classes.front().second); + + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name << + "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence); + classes.pop_front(); + } + sendCommandComplete(replyToKey, sequence); +} + +void ManagementAgent::handleClassInd (Buffer& inBuffer, const string& replyToKey, uint32_t) +{ + string packageName; + SchemaClassKey key; + + uint8_t kind = inBuffer.getOctet(); + inBuffer.getShortString(packageName); + inBuffer.getShortString(key.name); + inBuffer.getBin128(key.hash); + + QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << + "), replyTo=" << replyToKey); + + sys::Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = findOrAddPackageLH(packageName); + ClassMap::iterator cIter = pIter->second.find(key); + if (cIter == pIter->second.end() || !cIter->second.hasSchema()) { + ResizableBuffer outBuffer (qmfV1BufferSize); + uint32_t sequence = nextRequestSequence++; + + // Schema Request + encodeHeader (outBuffer, 'S', sequence); + outBuffer.putShortString(packageName); + key.encode(outBuffer); + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << + "), to=" << replyToKey << " seq=" << sequence); + + if (cIter != pIter->second.end()) + pIter->second.erase(key); + + pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, sequence))); + } +} + +void ManagementAgent::SchemaClass::appendSchema(Buffer& buf) +{ + // If the management package is attached locally (embedded in the broker or + // linked in via plug-in), call the schema handler directly. If the package + // is from a remote management agent, send the stored schema information. + + if (writeSchemaCall != 0) { + string schema; + writeSchemaCall(schema); + buf.putRawData(schema); + } else + buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size()); +} + +void ManagementAgent::handleSchemaRequest(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence) +{ + string packageName; + SchemaClassKey key; + + inBuffer.getShortString (packageName); + key.decode(inBuffer); + + QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << + "), replyTo=" << rte << "/" << rtk << " seq=" << sequence); + + sys::Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { + ClassMap& cMap = pIter->second; + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) { + ResizableBuffer outBuffer(qmfV1BufferSize); + SchemaClass& classInfo = cIter->second; + + if (classInfo.hasSchema()) { + encodeHeader(outBuffer, 's', sequence); + classInfo.appendSchema(outBuffer); + sendBuffer(outBuffer, rte, rtk); + QPID_LOG(debug, "SEND SchemaResponse to=" << rte << "/" << rtk << " seq=" << sequence); + } + else + sendCommandComplete(rtk, sequence, 1, "Schema not available"); + } + else + sendCommandComplete(rtk, sequence, 1, "Class key not found"); + } + else + sendCommandComplete(rtk, sequence, 1, "Package not found"); +} + +void ManagementAgent::handleSchemaResponse(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence) +{ + string packageName; + SchemaClassKey key; + + uint32_t pos = inBuffer.getPosition(); + inBuffer.getOctet(); + inBuffer.getShortString(packageName); + key.decode(inBuffer); + inBuffer.setPosition(pos);; + + QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence); + + sys::Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { + ClassMap& cMap = pIter->second; + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) { + size_t length = validateSchema(inBuffer, cIter->second.kind); + if (length == 0) { + QPID_LOG(warning, "Management Agent received invalid schema response: " << packageName << "." << key.name); + cMap.erase(key); + } else { + cIter->second.data.resize(length); + inBuffer.getRawData(reinterpret_cast<uint8_t*>(&cIter->second.data[0]), length); + + // Publish a class-indication message + ResizableBuffer outBuffer(qmfV1BufferSize); + + encodeHeader(outBuffer, 'q'); + encodeClassIndication(outBuffer, pIter->first, cIter->first, cIter->second.kind); + sendBuffer(outBuffer, mExchange, "schema.class"); + QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << + " to=schema.class"); + } + } + } +} + +bool ManagementAgent::bankInUse (uint32_t bank) +{ + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); + aIter != remoteAgents.end(); + aIter++) + if (aIter->second->agentBank == bank) + return true; + return false; +} + +uint32_t ManagementAgent::allocateNewBank () +{ + while (bankInUse (nextRemoteBank)) + nextRemoteBank++; + + uint32_t allocated = nextRemoteBank++; + writeData (); + return allocated; +} + +uint32_t ManagementAgent::assignBankLH (uint32_t requestedBank) +{ + if (requestedBank == 0 || bankInUse (requestedBank)) + return allocateNewBank (); + return requestedBank; +} + +void ManagementAgent::deleteOrphanedAgentsLH() +{ + list<ObjectId> deleteList; + + for (RemoteAgentMap::const_iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) { + bool found = false; + + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + if (iter->first == aIter->first && !iter->second->isDeleted()) { + found = true; + break; + } + } + + if (!found) + deleteList.push_back(aIter->first); + } + + for (list<ObjectId>::const_iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) + remoteAgents.erase(*dIter); +} + +void ManagementAgent::handleAttachRequest (Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ObjectId& connectionRef) +{ + string label; + uint32_t requestedBrokerBank, requestedAgentBank; + uint32_t assignedBank; + Uuid systemId; + + moveNewObjects(); + + sys::Mutex::ScopedLock lock(userLock); + deleteOrphanedAgentsLH(); + RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); + if (aIter != remoteAgents.end()) { + // There already exists an agent on this session. Reject the request. + sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent"); + return; + } + + inBuffer.getShortString(label); + systemId.decode(inBuffer); + requestedBrokerBank = inBuffer.getLong(); + requestedAgentBank = inBuffer.getLong(); + + QPID_LOG(debug, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank << + " reqAgentBank=" << requestedAgentBank << " replyTo=" << replyToKey << " seq=" << sequence); + + assignedBank = assignBankLH(requestedAgentBank); + + boost::shared_ptr<RemoteAgent> agent(new RemoteAgent(*this)); + agent->brokerBank = brokerBank; + agent->agentBank = assignedBank; + agent->routingKey = replyToKey; + agent->connectionRef = connectionRef; + agent->mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent (this, agent.get())); + agent->mgmtObject->set_connectionRef(agent->connectionRef); + agent->mgmtObject->set_label (label); + agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); + agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data()); + agent->mgmtObject->set_brokerBank (brokerBank); + agent->mgmtObject->set_agentBank (assignedBank); + addObject (agent->mgmtObject, 0); + remoteAgents[connectionRef] = agent; + + QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey); + + // Send an Attach Response + ResizableBuffer outBuffer (qmfV1BufferSize); + + encodeHeader (outBuffer, 'a', sequence); + outBuffer.putLong (brokerBank); + outBuffer.putLong (assignedBank); + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank << + " to=" << replyToKey << " seq=" << sequence); +} + +void ManagementAgent::handleGetQuery(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const string& userId) +{ + FieldTable ft; + FieldTable::ValuePtr value; + AclModule* acl = broker->getAcl(); + + moveNewObjects(); + + ft.decode(inBuffer); + + QPID_LOG(debug, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence); + + value = ft.get("_class"); + if (value.get() == 0 || !value->convertsTo<string>()) { + value = ft.get("_objectid"); + if (value.get() == 0 || !value->convertsTo<string>()) + return; + + ObjectId selector(value->get<string>()); + + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = numericFind(selector); + if (iter != managementObjects.end()) + object = iter->second; + } + + if (object) { + ResizableBuffer outBuffer (qmfV1BufferSize); + if (acl != 0) { + map<acl::Property, string> params; + params[acl::PROP_SCHEMACLASS] = object->getClassName(); + + if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUERY, object->getObjectId().getV2Key(), ¶ms)) { + throw framing::UnauthorizedAccessException(QPID_MSG("unauthorized-access: ACL denied QMF query of object " << object->getObjectId().getV2Key() << " from " << userId)); + } + } + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (!object->isDeleted()) { + string sBuf; + encodeHeader(outBuffer, 'g', sequence); + object->writeProperties(sBuf); + outBuffer.putRawData(sBuf); + sBuf.clear(); + object->writeStatistics(sBuf, true); + outBuffer.putRawData(sBuf); + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); + } + } + sendCommandComplete(replyToKey, sequence); + return; + } + + string className (value->get<string>()); + std::list<ManagementObject::shared_ptr> matches; + + if (acl != 0) { + map<acl::Property, string> params; + params[acl::PROP_SCHEMACLASS] = className; + + if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUERY, className /* class-wide query */, ¶ms)) { + throw framing::UnauthorizedAccessException(QPID_MSG("unauthorized-access: ACL denied QMF query of object class " << className << " from " << userId)); + } + } + + if (className == "memory") + qpid::sys::MemStat::loadMemInfo(memstat.get()); + + if (className == "broker") { + uint64_t uptime = sys::Duration(startTime, sys::now()); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); + } + + + // build up a set of all objects to be dumped + { + sys::Mutex::ScopedLock lock(objectLock); + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject::shared_ptr object = iter->second; + if (object->getClassName () == className) { + matches.push_back(object); + } + } + } + + // send them + ResizableBuffer outBuffer (qmfV1BufferSize); + while (matches.size()) { + ManagementObject::shared_ptr object = matches.front(); + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (!object->isDeleted()) { + string sProps, sStats; + object->writeProperties(sProps); + object->writeStatistics(sStats, true); + + size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes. + if (len > qmfV1BufferSize) { + QPID_LOG(error, "Object " << object->getObjectId() << " too large for output buffer - discarded!"); + } else { + if (outBuffer.available() < len) { // not enough room in current buffer, send it. + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); + continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted. + } + encodeHeader(outBuffer, 'g', sequence); + outBuffer.putRawData(sProps); + outBuffer.putRawData(sStats); + } + } + matches.pop_front(); + } + + if (outBuffer.getPosition() > 0) { + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); + } + + sendCommandComplete(replyToKey, sequence); +} + + +void ManagementAgent::handleGetQuery(const string& body, const string& rte, const string& rtk, const string& cid, const std::string& userId, bool viaLocal) +{ + moveNewObjects(); + + Variant::Map inMap; + Variant::Map::const_iterator i; + Variant::Map headers; + AclModule* acl = broker->getAcl(); + + MapCodec::decode(body, inMap); + QPID_LOG(debug, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = viaLocal ? "broker" : name_address; + + /* + * Unpack the _what element of the query. Currently we only support OBJECT queries. + */ + i = inMap.find("_what"); + if (i == inMap.end()) { + sendException(rte, rtk, cid, "_what element missing in Query"); + return; + } + + if (i->second.getType() != qpid::types::VAR_STRING) { + sendException(rte, rtk, cid, "_what element is not a string"); + return; + } + + if (i->second.asString() != "OBJECT") { + sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported"); + return; + } + + string className; + string packageName; + + /* + * Handle the _schema_id element, if supplied. + */ + i = inMap.find("_schema_id"); + if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { + const Variant::Map& schemaIdMap(i->second.asMap()); + + Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) + className = s_iter->second.asString(); + + s_iter = schemaIdMap.find("_package_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) + packageName = s_iter->second.asString(); + } + + if (className == "memory") + qpid::sys::MemStat::loadMemInfo(memstat.get()); + + if (className == "broker") { + uint64_t uptime = sys::Duration(startTime, sys::now()); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); + } + + /* + * Unpack the _object_id element of the query if it is present. If it is present, find that one + * object and return it. If it is not present, send a class-based result. + */ + i = inMap.find("_object_id"); + if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { + Variant::List list_; + ObjectId objId(i->second.asMap()); + + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock (objectLock); + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) + object = iter->second; + } + if (object) { + if (acl != 0) { + map<acl::Property, string> params; + params[acl::PROP_SCHEMACLASS] = object->getClassName(); + + if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUERY, object->getObjectId().getV2Key(), ¶ms)) { + throw framing::UnauthorizedAccessException(QPID_MSG("unauthorized-access: ACL denied QMF query of object " << object->getObjectId().getV2Key() << " from " << userId)); + } + } + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (!object->isDeleted()) { + Variant::Map map_; + Variant::Map values; + Variant::Map oidMap; + + object->writeTimestamps(map_); + object->mapEncodeValues(values, true, true); // write both stats and properties + objId.mapEncode(oidMap); + map_["_values"] = values; + map_["_object_id"] = oidMap; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + "_data", + object->getMd5Sum()); + list_.push_back(map_); + } + + string content; + + ListCodec::encode(list_, content); + sendBuffer(content, cid, headers, "amqp/list", rte, rtk); + QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk); + return; + } + } else { + // send class-based result. + if (acl != 0) { + map<acl::Property, string> params; + params[acl::PROP_SCHEMACLASS] = className; + + if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUERY, className /* class-wide query */, ¶ms)) { + throw framing::UnauthorizedAccessException(QPID_MSG("unauthorized-access: ACL denied QMF query of object class " << className << " from " << userId)); + } + } + Variant::List _list; + Variant::List _subList; + unsigned int objCount = 0; + + ManagementObjectVector localManagementObjects; + { + sys::Mutex::ScopedLock objLock(objectLock); + std::transform(managementObjects.begin(), managementObjects.end(), + std::back_inserter(localManagementObjects), + boost::bind(&ManagementObjectMap::value_type::second, _1)); + } + + for (ManagementObjectVector::iterator iter = localManagementObjects.begin(); + iter != localManagementObjects.end(); + iter++) { + ManagementObject::shared_ptr object = *iter; + if (object->getClassName() == className && + (packageName.empty() || object->getPackageName() == packageName)) { + + + if (!object->isDeleted()) { + Variant::Map map_; + Variant::Map values; + Variant::Map oidMap; + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + object->writeTimestamps(map_); + object->mapEncodeValues(values, true, true); // write both stats and properties + object->getObjectId().mapEncode(oidMap); + + map_["_values"] = values; + map_["_object_id"] = oidMap; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + "_data", + object->getMd5Sum()); + _subList.push_back(map_); + if (++objCount >= maxReplyObjs) { + objCount = 0; + _list.push_back(_subList); + _subList.clear(); + } + } + } + } + + if (_subList.size()) + _list.push_back(_subList); + + headers["partial"] = Variant(); + string content; + while (_list.size() > 1) { + ListCodec::encode(_list.front().asList(), content); + sendBuffer(content, cid, headers, "amqp/list", rte, rtk); + _list.pop_front(); + QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length()); + } + headers.erase("partial"); + ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content); + sendBuffer(content, cid, headers, "amqp/list", rte, rtk); + QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length()); + return; + } + + // Unrecognized query - Send empty message to indicate CommandComplete + string content; + ListCodec::encode(Variant::List(), content); + sendBuffer(content, cid, headers, "amqp/list", rte, rtk); + QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk); +} + + +void ManagementAgent::handleLocateRequest(const string&, const string& rte, const string& rtk, const string& cid) +{ + QPID_LOG(debug, "RCVD AgentLocateRequest"); + + Variant::Map map; + Variant::Map headers; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_locate_response"; + headers["qmf.agent"] = name_address; + + map["_values"] = attrMap; + map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration::FromEpoch()); + map["_values"].asMap()["_heartbeat_interval"] = interval; + map["_values"].asMap()["_epoch"] = bootSequence; + + string content; + MapCodec::encode(map, content); + sendBuffer(content, cid, headers, "amqp/map", rte, rtk); + clientWasAdded = true; + + QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk); +} + + +bool ManagementAgent::authorizeAgentMessage(Message& msg) +{ + sys::Mutex::ScopedLock lock(userLock); + ResizableBuffer inBuffer (qmfV1BufferSize); + uint32_t sequence = 0; + bool methodReq = false; + bool mapMsg = false; + string packageName; + string className; + string methodName; + string cid; + + boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg); + // + // If the message is larger than our working buffer size (or if it + // could not be converted to an 0-10 messgae-transfer), we can't + // determine if it's authorized or not. In this case, return true + // (authorized) if there is no ACL in place, otherwise return + // false; + // + if (!transfer || transfer->getContentSize() > qmfV1BufferSize) + return broker->getAcl() == 0; + + inBuffer.putRawData(transfer->getContent()); + uint32_t bufferLen = inBuffer.getPosition(); + inBuffer.reset(); + + const framing::MessageProperties* p = + transfer ? transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0; + + const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0; + + if (headers && p->getAppId() == "qmf2") + { + mapMsg = true; + + if (p && p->hasCorrelationId()) { + cid = p->getCorrelationId(); + } + + if (headers->getAsString("qmf.opcode") == "_method_request") + { + methodReq = true; + + // extract object id and method name + + string body; + inBuffer.getRawData(body, bufferLen); + Variant::Map inMap; + MapCodec::decode(body, inMap); + Variant::Map::const_iterator oid, mid; + + ObjectId objId; + + if ((oid = inMap.find("_object_id")) == inMap.end() || + (mid = inMap.find("_method_name")) == inMap.end()) { + QPID_LOG(warning, + "Missing fields in QMF authorize req received."); + return false; + } + + try { + // coversions will throw if input is invalid. + objId = ObjectId(oid->second.asMap()); + methodName = mid->second.getString(); + } catch(exception& /*e*/) { + QPID_LOG(warning, + "Badly formatted QMF authorize req received."); + return false; + } + + // look up schema for object to get package and class name + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = managementObjects.find(objId); + + if (iter == managementObjects.end() || iter->second->isDeleted()) { + QPID_LOG(debug, "ManagementAgent::authorizeAgentMessage: stale object id " << + objId); + return false; + } + + packageName = iter->second->getPackageName(); + className = iter->second->getClassName(); + } + } else { // old style binary message format + + uint8_t opcode; + + if (!checkHeader(inBuffer, &opcode, &sequence)) + return false; + + if (opcode == 'M') { + methodReq = true; + + // extract method name & schema package and class name + + uint8_t hash[16]; + inBuffer.getLongLong(); // skip over object id + inBuffer.getLongLong(); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); + inBuffer.getShortString(methodName); + + } + } + + if (methodReq) { + map<acl::Property, string> params; + AclModule* acl = broker->getAcl(); + if (acl == 0) + return true; + + string userId = msg.getUserId(); + params[acl::PROP_SCHEMAPACKAGE] = packageName; + params[acl::PROP_SCHEMACLASS] = className; + + if (acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) + return true; + + // authorization failed, send reply if replyTo present + + boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg); + const framing::MessageProperties* p = + transfer ? transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0; + if (p && p->hasReplyTo()) { + const framing::ReplyTo& rt = p->getReplyTo(); + string rte = rt.getExchange(); + string rtk = rt.getRoutingKey(); + string cid; + if (p && p->hasCorrelationId()) + cid = p->getCorrelationId(); + + if (mapMsg) { + sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), + Manageable::STATUS_FORBIDDEN, false); + } else { + + ResizableBuffer outBuffer(qmfV1BufferSize); + + encodeHeader(outBuffer, 'm', sequence); + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + sendBuffer(outBuffer, rte, rtk); + } + + QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); + } + + return false; + } + + return true; +} + +void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal) +{ + string rte; + string rtk; + + boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg); + const framing::MessageProperties* p = transfer ? + transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0; + if (p && p->hasReplyTo()) { + const framing::ReplyTo& rt = p->getReplyTo(); + rte = rt.getExchange(); + rtk = rt.getRoutingKey(); + } + else + return; + + ResizableBuffer inBuffer(qmfV1BufferSize); + uint8_t opcode; + + if (transfer->getContentSize() > qmfV1BufferSize) { + QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " << + transfer->getContentSize()); + return; + } + + inBuffer.putRawData(transfer->getContent()); + uint32_t bufferLen = inBuffer.getPosition(); + inBuffer.reset(); + + ScopedManagementContext context(msg.getPublisher()); + const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0; + if (headers && p->getAppId() == "qmf2") + { + string opcode = headers->getAsString("qmf.opcode"); + string contentType = headers->getAsString("qmf.content"); + string body; + string cid; + inBuffer.getRawData(body, bufferLen); + { + if (p && p->hasCorrelationId()) { + cid = p->getCorrelationId(); + } + + if (opcode == "_method_request") + return handleMethodRequest(body, rte, rtk, cid, context.getUserId(), viaLocal); + else if (opcode == "_query_request") + return handleGetQuery(body, rte, rtk, cid, context.getUserId(), viaLocal); + else if (opcode == "_agent_locate_request") + return handleLocateRequest(body, rte, rtk, cid); + } + QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!"); + return; + } + + // old preV2 binary messages + while (inBuffer.getPosition() < bufferLen) { + uint32_t sequence; + if (!checkHeader(inBuffer, &opcode, &sequence)) + return; + + if (opcode == 'B') handleBrokerRequest (inBuffer, rtk, sequence); + else if (opcode == 'P') handlePackageQuery (inBuffer, rtk, sequence); + else if (opcode == 'p') handlePackageInd (inBuffer, rtk, sequence); + else if (opcode == 'Q') handleClassQuery (inBuffer, rtk, sequence); + else if (opcode == 'q') handleClassInd (inBuffer, rtk, sequence); + else if (opcode == 'S') handleSchemaRequest (inBuffer, rte, rtk, sequence); + else if (opcode == 's') handleSchemaResponse (inBuffer, rtk, sequence); + else if (opcode == 'A') handleAttachRequest (inBuffer, rtk, sequence, context.getObjectId()); + else if (opcode == 'G') handleGetQuery (inBuffer, rtk, sequence, context.getMgmtId()); + else if (opcode == 'M') handleMethodRequest (inBuffer, rtk, sequence, context.getMgmtId()); + } +} + +ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string name) +{ + PackageMap::iterator pIter = packages.find (name); + if (pIter != packages.end ()) + return pIter; + + // No such package found, create a new map entry. + pair<PackageMap::iterator, bool> result = + packages.insert(pair<string, ClassMap>(name, ClassMap())); + QPID_LOG (debug, "ManagementAgent added package " << name); + + // Publish a package-indication message + ResizableBuffer outBuffer (qmfV1BufferSize); + + encodeHeader (outBuffer, 'p'); + encodePackageIndication (outBuffer, result.first); + sendBuffer(outBuffer, mExchange, "schema.package"); + QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package"); + + return result.first; +} + +void ManagementAgent::addClassLH(uint8_t kind, + PackageMap::iterator pIter, + const string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + SchemaClassKey key; + ClassMap& cMap = pIter->second; + + key.name = className; + memcpy(&key.hash, md5Sum, 16); + + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) + return; + + // No such class found, create a new class with local information. + QPID_LOG (debug, "ManagementAgent added class " << pIter->first << ":" << + key.name); + + cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall))); + cIter = cMap.find(key); +} + +void ManagementAgent::encodePackageIndication(Buffer& buf, + PackageMap::iterator pIter) +{ + buf.putShortString((*pIter).first); +} + +void ManagementAgent::encodeClassIndication(Buffer& buf, + const std::string packageName, + const SchemaClassKey key, + uint8_t kind) +{ + buf.putOctet(kind); + buf.putShortString(packageName); + key.encode(buf); +} + +size_t ManagementAgent::validateSchema(Buffer& inBuffer, uint8_t kind) +{ + if (kind == ManagementItem::CLASS_KIND_TABLE) + return validateTableSchema(inBuffer); + else if (kind == ManagementItem::CLASS_KIND_EVENT) + return validateEventSchema(inBuffer); + return 0; +} + +size_t ManagementAgent::validateTableSchema(Buffer& inBuffer) +{ + uint32_t start = inBuffer.getPosition(); + uint32_t end; + string text; + uint8_t hash[16]; + + try { + uint8_t kind = inBuffer.getOctet(); + if (kind != ManagementItem::CLASS_KIND_TABLE) + return 0; + + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + + uint8_t superType = 0; //inBuffer.getOctet(); + + uint16_t propCount = inBuffer.getShort(); + uint16_t statCount = inBuffer.getShort(); + uint16_t methCount = inBuffer.getShort(); + + if (superType == 1) { + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + } + + for (uint16_t idx = 0; idx < propCount + statCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + } + + for (uint16_t idx = 0; idx < methCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + if (!ft.isSet("argCount")) + return 0; + int argCount = ft.getAsInt("argCount"); + for (int mIdx = 0; mIdx < argCount; mIdx++) { + FieldTable aft; + aft.decode(inBuffer); + } + } + } catch (exception& /*e*/) { + return 0; + } + + end = inBuffer.getPosition(); + inBuffer.setPosition(start); // restore original position + return end - start; +} + +size_t ManagementAgent::validateEventSchema(Buffer& inBuffer) +{ + uint32_t start = inBuffer.getPosition(); + uint32_t end; + string text; + uint8_t hash[16]; + + try { + uint8_t kind = inBuffer.getOctet(); + if (kind != ManagementItem::CLASS_KIND_EVENT) + return 0; + + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + + uint8_t superType = 0; //inBuffer.getOctet(); + + uint16_t argCount = inBuffer.getShort(); + + if (superType == 1) { + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + } + for (uint16_t idx = 0; idx < argCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + } + } catch (exception& /*e*/) { + return 0; + } + + end = inBuffer.getPosition(); + inBuffer.setPosition(start); // restore original position + return end - start; +} + +ManagementObjectMap::iterator ManagementAgent::numericFind(const ObjectId& oid) +{ + ManagementObjectMap::iterator iter = managementObjects.begin(); + for (; iter != managementObjects.end(); iter++) { + if (oid.equalV1(iter->first)) + break; + } + + return iter; +} + +void ManagementAgent::disallow(const string& className, const string& methodName, const string& message) { + disallowed[make_pair(className, methodName)] = message; +} + +void ManagementAgent::SchemaClassKey::mapEncode(Variant::Map& _map) const { + _map["_cname"] = name; + _map["_hash"] = qpid::types::Uuid(hash); +} + +void ManagementAgent::SchemaClassKey::mapDecode(const Variant::Map& _map) { + Variant::Map::const_iterator i; + + if ((i = _map.find("_cname")) != _map.end()) { + name = i->second.asString(); + } + + if ((i = _map.find("_hash")) != _map.end()) { + const qpid::types::Uuid& uuid = i->second.asUuid(); + memcpy(hash, uuid.data(), uuid.size()); + } +} + +void ManagementAgent::SchemaClassKey::encode(qpid::framing::Buffer& buffer) const { + buffer.checkAvailable(encodedBufSize()); + buffer.putShortString(name); + buffer.putBin128(hash); +} + +void ManagementAgent::SchemaClassKey::decode(qpid::framing::Buffer& buffer) { + buffer.checkAvailable(encodedBufSize()); + buffer.getShortString(name); + buffer.getBin128(hash); +} + +uint32_t ManagementAgent::SchemaClassKey::encodedBufSize() const { + return 1 + name.size() + 16 /* bin128 */; +} + +void ManagementAgent::SchemaClass::mapEncode(Variant::Map& _map) const { + _map["_type"] = kind; + _map["_pending_sequence"] = pendingSequence; + _map["_data"] = data; +} + +void ManagementAgent::SchemaClass::mapDecode(const Variant::Map& _map) { + Variant::Map::const_iterator i; + + if ((i = _map.find("_type")) != _map.end()) { + kind = i->second; + } + if ((i = _map.find("_pending_sequence")) != _map.end()) { + pendingSequence = i->second; + } + if ((i = _map.find("_data")) != _map.end()) { + data = i->second.asString(); + } +} + +void ManagementAgent::RemoteAgent::mapEncode(Variant::Map& map_) const { + Variant::Map _objId, _values; + + map_["_brokerBank"] = brokerBank; + map_["_agentBank"] = agentBank; + map_["_routingKey"] = routingKey; + + connectionRef.mapEncode(_objId); + map_["_object_id"] = _objId; + + mgmtObject->mapEncodeValues(_values, true, false); + map_["_values"] = _values; +} + +void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) { + Variant::Map::const_iterator i; + + if ((i = map_.find("_brokerBank")) != map_.end()) { + brokerBank = i->second; + } + + if ((i = map_.find("_agentBank")) != map_.end()) { + agentBank = i->second; + } + + if ((i = map_.find("_routingKey")) != map_.end()) { + routingKey = i->second.getString(); + } + + if ((i = map_.find("_object_id")) != map_.end()) { + connectionRef.mapDecode(i->second.asMap()); + } + + mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent(&agent, this)); + + if ((i = map_.find("_values")) != map_.end()) { + mgmtObject->mapDecodeValues(i->second.asMap()); + } + + // TODO aconway 2010-03-04: see comment in encode(), readProperties doesn't set v2key. + mgmtObject->set_connectionRef(connectionRef); +} + +namespace { +bool isDeletedMap(const ManagementObjectMap::value_type& value) { + return value.second->isDeleted(); +} + +bool isDeletedVector(const ManagementObjectVector::value_type& value) { + return value->isDeleted(); +} + +string summarizeMap(const char* name, const ManagementObjectMap& map) { + ostringstream o; + size_t deleted = std::count_if(map.begin(), map.end(), isDeletedMap); + o << map.size() << " " << name << " (" << deleted << " deleted), "; + return o.str(); +} + +string summarizeVector(const char* name, const ManagementObjectVector& map) { + ostringstream o; + size_t deleted = std::count_if(map.begin(), map.end(), isDeletedVector); + o << map.size() << " " << name << " (" << deleted << " deleted), "; + return o.str(); +} + +string dumpMap(const ManagementObjectMap& map) { + ostringstream o; + for (ManagementObjectMap::const_iterator i = map.begin(); i != map.end(); ++i) { + o << endl << " " << i->second->getObjectId().getV2Key() + << (i->second->isDeleted() ? " (deleted)" : ""); + } + return o.str(); +} + +string dumpVector(const ManagementObjectVector& map) { + ostringstream o; + for (ManagementObjectVector::const_iterator i = map.begin(); i != map.end(); ++i) { + o << endl << " " << (*i)->getObjectId().getV2Key() + << ((*i)->isDeleted() ? " (deleted)" : ""); + } + return o.str(); +} + +} // namespace + +string ManagementAgent::summarizeAgents() { + ostringstream msg; + if (!remoteAgents.empty()) { + msg << remoteAgents.size() << " agents("; + for (RemoteAgentMap::const_iterator i=remoteAgents.begin(); + i != remoteAgents.end(); ++i) + msg << " " << i->second->routingKey; + msg << "), "; + } + return msg.str(); +} + + +void ManagementAgent::debugSnapshot(const char* title) { + sys::Mutex::ScopedLock lock(addLock); + sys::Mutex::ScopedLock objLock (objectLock); + QPID_LOG(debug, title << ": management snapshot: " + << packages.size() << " packages, " + << summarizeMap("objects", managementObjects) + << summarizeVector("new objects ", newManagementObjects) + << pendingDeletedObjs.size() << " pending deletes" + << summarizeAgents()); + + QPID_LOG_IF(trace, managementObjects.size(), + title << ": objects" << dumpMap(managementObjects)); + QPID_LOG_IF(trace, newManagementObjects.size(), + title << ": new objects" << dumpVector(newManagementObjects)); +} + + +Variant::Map ManagementAgent::toMap(const FieldTable& from) +{ + Variant::Map map; + qpid::amqp_0_10::translate(from, map); + return map; +} + +// construct a DeletedObject from a management object. +ManagementAgent::DeletedObject::DeletedObject(ManagementObject::shared_ptr src, bool v1, bool v2) + : packageName(src->getPackageName()), + className(src->getClassName()) +{ + bool send_stats = (src->hasInst() && (src->getInstChanged() || src->getForcePublish())); + + stringstream oid; + oid << src->getObjectId(); + objectId = oid.str(); + + if (v1) { + src->writeProperties(encodedV1Config); + if (send_stats) { + src->writeStatistics(encodedV1Inst); + } + } + + if (v2) { + Variant::Map map_; + Variant::Map values; + Variant::Map oid; + + src->getObjectId().mapEncode(oid); + map_["_object_id"] = oid; + map_["_schema_id"] = mapEncodeSchemaId(src->getPackageName(), + src->getClassName(), + "_data", + src->getMd5Sum()); + src->writeTimestamps(map_); + src->mapEncodeValues(values, true, send_stats); + map_["_values"] = values; + + encodedV2 = map_; + } +} + +// Remove Deleted objects, and save for later publishing... +bool ManagementAgent::moveDeletedObjects() { + typedef vector<pair<ObjectId, ManagementObject::shared_ptr> > DeleteList; + + sys::Mutex::ScopedLock lock (objectLock); + + DeleteList deleteList; + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + ++iter) + { + ManagementObject::shared_ptr object = iter->second; + if (object->isDeleted()) deleteList.push_back(*iter); + } + + // Iterate in reverse over deleted object list + for (DeleteList::reverse_iterator iter = deleteList.rbegin(); + iter != deleteList.rend(); + iter++) + { + ManagementObject::shared_ptr delObj = iter->second; + assert(delObj->isDeleted()); + DeletedObject::shared_ptr dptr(new DeletedObject(delObj, qmf1Support, qmf2Support)); + + pendingDeletedObjs[dptr->getKey()].push_back(dptr); + managementObjects.erase(iter->first); + } + return !deleteList.empty(); +} + +ManagementAgent::EventQueue::Batch::const_iterator ManagementAgent::sendEvents( + const EventQueue::Batch& batch) +{ + EventQueue::Batch::const_iterator i; + for (i = batch.begin(); i != batch.end(); ++i) { + DeliverableMessage deliverable (i->second, 0); + try { + i->first->route(deliverable); + } catch(exception& e) { + QPID_LOG(warning, "ManagementAgent failed to route event: " << e.what()); + } + } + return i; +} + +namespace { +QPID_TSS const Connection* currentPublisher = 0; +} + +void setManagementExecutionContext(const Connection& p) +{ + currentPublisher = &p; +} + +void resetManagementExecutionContext() +{ + currentPublisher = 0; +} + +const Connection* getCurrentPublisher() +{ + return currentPublisher; +} + +}} + diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h new file mode 100644 index 0000000000..81bf542766 --- /dev/null +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -0,0 +1,388 @@ +#ifndef _ManagementAgent_ +#define _ManagementAgent_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/BrokerImportExport.h" +#include "qpid/Options.h" +#include "qpid/broker/Exchange.h" +#include "qpid/framing/Uuid.h" +#include "qpid/sys/Mutex.h" +#include "qpid/management/ManagementObject.h" +#include "qpid/management/ManagementEvent.h" +#include "qpid/management/Manageable.h" +#include "qmf/org/apache/qpid/broker/Agent.h" +#include "qmf/org/apache/qpid/broker/Memory.h" +#include "qpid/sys/MemStat.h" +#include "qpid/sys/PollableQueue.h" +#include "qpid/types/Variant.h" +#include <qpid/framing/AMQFrame.h> +#include <qpid/framing/ResizableBuffer.h> +#include <boost/shared_ptr.hpp> +#include <memory> +#include <string> +#include <map> + +namespace qpid { +namespace broker { +class Connection; +class ProtocolRegistry; +} +namespace sys { +class Timer; +} +namespace management { + +class ManagementAgent +{ +private: + + int threadPoolSize; + +public: + typedef enum { + SEV_EMERG = 0, + SEV_ALERT = 1, + SEV_CRIT = 2, + SEV_ERROR = 3, + SEV_WARN = 4, + SEV_NOTE = 5, + SEV_INFO = 6, + SEV_DEBUG = 7, + SEV_DEFAULT = 8 + } severity_t; + + + ManagementAgent (const bool qmfV1, const bool qmfV2); + virtual ~ManagementAgent (); + + /** Called before plugins are initialized */ + void configure (const std::string& dataDir, bool publish, uint16_t interval, + qpid::broker::Broker* broker, int threadPoolSize); + + void setName(const std::string& vendor, + const std::string& product, + const std::string& instance=""); + void getName(std::string& vendor, std::string& product, std::string& instance); + const std::string& getAddress(); + + void setInterval(uint16_t _interval) { interval = _interval; } + void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange, + qpid::broker::Exchange::shared_ptr directExchange); + void setExchangeV2(qpid::broker::Exchange::shared_ptr topicExchange, + qpid::broker::Exchange::shared_ptr directExchange); + + int getMaxThreads () { return threadPoolSize; } + QPID_BROKER_EXTERN void registerClass (const std::string& packageName, + const std::string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + QPID_BROKER_EXTERN void registerEvent (const std::string& packageName, + const std::string& eventName, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object, + uint64_t persistId = 0, + bool persistent = false); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object, + const std::string& key, + bool persistent = false); + QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event, + severity_t severity = SEV_DEFAULT); + QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey); + + bool dispatchCommand (qpid::broker::Deliverable& msg, + const std::string& routingKey, + const framing::FieldTable* args, + const bool topic, + int qmfVersion); + + /** Disallow a method. Attempts to call it will receive an exception with message. */ + void disallow(const std::string& className, const std::string& methodName, const std::string& message); + + uint16_t getBootSequence(void) { return bootSequence; } + void setBootSequence(uint16_t b) { bootSequence = b; writeData(); } + + const framing::Uuid& getUuid() const { return uuid; } + void setUuid(const framing::Uuid& id) { uuid = id; writeData(); } + + static types::Variant::Map toMap(const framing::FieldTable& from); + + class DeletedObject { + public: + typedef boost::shared_ptr<DeletedObject> shared_ptr; + DeletedObject(ManagementObject::shared_ptr, bool v1, bool v2); + ~DeletedObject() {}; + const std::string getKey() const { + // used to batch up objects of the same class type + return std::string(packageName + std::string(":") + className); + } + + private: + friend class ManagementAgent; + + std::string packageName; + std::string className; + std::string objectId; + + std::string encodedV1Config; // qmfv1 properties + std::string encodedV1Inst; // qmfv1 statistics + qpid::types::Variant::Map encodedV2; + }; + + typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList; + +private: + // Storage for tracking remote management agents, attached via the client + // management agent API. + // + struct RemoteAgent : public Manageable + { + ManagementAgent& agent; + uint32_t brokerBank; + uint32_t agentBank; + std::string routingKey; + ObjectId connectionRef; + qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject; + RemoteAgent(ManagementAgent& _agent) : agent(_agent) {} + ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } + + virtual ~RemoteAgent (); + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); + }; + + typedef std::map<ObjectId, boost::shared_ptr<RemoteAgent> > RemoteAgentMap; + + // Storage for known schema classes: + // + // SchemaClassKey -- Key elements for map lookups + // SchemaClassKeyComp -- Comparison class for SchemaClassKey + // SchemaClass -- Non-key elements for classes + // + struct SchemaClassKey + { + std::string name; + uint8_t hash[16]; + + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); + void encode(framing::Buffer& buffer) const; + void decode(framing::Buffer& buffer); + uint32_t encodedBufSize() const; + }; + + struct SchemaClassKeyComp + { + bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const + { + if (lhs.name != rhs.name) + return lhs.name < rhs.name; + else + for (int i = 0; i < 16; i++) + if (lhs.hash[i] != rhs.hash[i]) + return lhs.hash[i] < rhs.hash[i]; + return false; + } + }; + + + struct SchemaClass + { + uint8_t kind; + ManagementObject::writeSchemaCall_t writeSchemaCall; + std::string data; + uint32_t pendingSequence; + + SchemaClass(uint8_t _kind=0, uint32_t seq=0) : + kind(_kind), writeSchemaCall(0), pendingSequence(seq) {} + SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) : + kind(_kind), writeSchemaCall(call), pendingSequence(0) {} + bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); } + void appendSchema (framing::Buffer& buf); + + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); + }; + + typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; + typedef std::map<std::string, ClassMap> PackageMap; + + RemoteAgentMap remoteAgents; + PackageMap packages; + + // + // Protected by objectLock + // + ManagementObjectMap managementObjects; + + // + // Protected by addLock + // + ManagementObjectVector newManagementObjects; + + framing::Uuid uuid; + + // + // Lock ordering: userLock -> addLock -> objectLock + // + sys::Mutex userLock; + sys::Mutex addLock; + sys::Mutex objectLock; + + qpid::broker::Exchange::shared_ptr mExchange; + qpid::broker::Exchange::shared_ptr dExchange; + qpid::broker::Exchange::shared_ptr v2Topic; + qpid::broker::Exchange::shared_ptr v2Direct; + std::string dataDir; + bool publish; + uint16_t interval; + qpid::broker::Broker* broker; + qpid::sys::Timer* timer; + qpid::broker::ProtocolRegistry* protocols; + uint16_t bootSequence; + uint32_t nextObjectId; + uint32_t brokerBank; + uint32_t nextRemoteBank; + uint32_t nextRequestSequence; + bool clientWasAdded; + const qpid::sys::AbsTime startTime; + bool suppressed; + + typedef std::pair<std::string,std::string> MethodName; + typedef std::map<MethodName, std::string> DisallowedMethods; + DisallowedMethods disallowed; + bool disallowAllV1Methods; + + // Agent name and address + qpid::types::Variant::Map attrMap; + std::string name_address; + std::string vendorNameKey; // "." --> "_" + std::string productNameKey; // "." --> "_" + std::string instanceNameKey; // "." --> "_" + + // supported management protocol + bool qmf1Support; + bool qmf2Support; + + // Maximum # of objects allowed in a single V2 response + // message. + uint32_t maxReplyObjs; + + // list of objects that have been deleted, but have yet to be published + // one final time. + // Indexed by a string composed of the object's package and class name. + // Protected by objectLock. + typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap; + PendingDeletedObjsMap pendingDeletedObjs; + + // Pollable queue to serialize event messages + typedef std::pair<boost::shared_ptr<broker::Exchange>, + broker::Message> ExchangeAndMessage; + typedef sys::PollableQueue<ExchangeAndMessage> EventQueue; + + // + // Memory statistics object + // + qmf::org::apache::qpid::broker::Memory::shared_ptr memstat; + + void writeData (); + void periodicProcessing (void); + void deleteObjectNow(const ObjectId& oid); + void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + EventQueue::Batch::const_iterator sendEvents(const EventQueue::Batch& batch); + void sendBuffer(framing::Buffer& buf, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey); + void sendBuffer(framing::Buffer& buf, + const std::string& exchange, + const std::string& routingKey); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + const std::string& content_type, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey, + uint64_t ttl_msec = 0); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + const std::string& content_type, + const std::string& exchange, + const std::string& routingKey, + uint64_t ttl_msec = 0); + void moveNewObjects(); + bool moveDeletedObjects(); + + bool authorizeAgentMessage(qpid::broker::Message& msg); + void dispatchAgentCommand(qpid::broker::Message& msg, bool viaLocal=false); + + PackageMap::iterator findOrAddPackageLH(std::string name); + void addClassLH(uint8_t kind, + PackageMap::iterator pIter, + const std::string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + void encodePackageIndication (framing::Buffer& buf, + PackageMap::iterator pIter); + void encodeClassIndication (framing::Buffer& buf, + const std::string packageName, + const struct SchemaClassKey key, + uint8_t kind); + bool bankInUse (uint32_t bank); + uint32_t allocateNewBank (); + uint32_t assignBankLH (uint32_t requestedPrefix); + void deleteOrphanedAgentsLH(); + void sendCommandComplete(const std::string& replyToKey, uint32_t sequence, + uint32_t code = 0, const std::string& text = "OK"); + void sendException(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false); + void handleBrokerRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handlePackageQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handlePackageInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleClassQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleClassInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleSchemaRequest (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence); + void handleSchemaResponse (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleAttachRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const ObjectId& objectId); + void handleGetQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const std::string& userId); + void handleMethodRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const std::string& userId); + void handleGetQuery (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const std::string& userId, bool viaLocal); + void handleMethodRequest (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const std::string& userId, bool viaLocal); + void handleLocateRequest (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid); + + + size_t validateSchema(framing::Buffer&, uint8_t kind); + size_t validateTableSchema(framing::Buffer&); + size_t validateEventSchema(framing::Buffer&); + ManagementObjectMap::iterator numericFind(const ObjectId& oid); + + std::string summarizeAgents(); + void debugSnapshot(const char* title); + std::auto_ptr<EventQueue> sendQueue; +}; + +void setManagementExecutionContext(const broker::Connection&); +void resetManagementExecutionContext(); +const broker::Connection* getCurrentPublisher(); +}} + +#endif /*!_ManagementAgent_*/ diff --git a/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp b/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp new file mode 100644 index 0000000000..8ede6940b0 --- /dev/null +++ b/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/management/ManagementDirectExchange.h" +#include "qpid/log/Statement.h" +#include <assert.h> + +using namespace qpid::management; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +ManagementDirectExchange::ManagementDirectExchange(const std::string& _name, Manageable* _parent, Broker* b) : + Exchange (_name, _parent, b), + DirectExchange(_name, _parent, b), + managementAgent(0) {} +ManagementDirectExchange::ManagementDirectExchange(const std::string& _name, + bool _durable, + const FieldTable& _args, + Manageable* _parent, Broker* b) : + Exchange (_name, _durable, false, _args, _parent, b), + DirectExchange(_name, _durable, false, _args, _parent, b), + managementAgent(0) {} + +void ManagementDirectExchange::route(Deliverable& msg) +{ + bool routeIt = true; + + if (managementAgent) + routeIt = managementAgent->dispatchCommand(msg, msg.getMessage().getRoutingKey(), 0/*args - TODO*/, false, qmfVersion); + + if (routeIt) + DirectExchange::route(msg); +} + +void ManagementDirectExchange::setManagmentAgent(ManagementAgent* agent, int qv) +{ + managementAgent = agent; + qmfVersion = qv; + assert(qmfVersion == 2); // QMFv1 doesn't use a specialized direct exchange +} + + +ManagementDirectExchange::~ManagementDirectExchange() {} + +const std::string ManagementDirectExchange::typeName("management-direct"); + diff --git a/qpid/cpp/src/qpid/management/ManagementDirectExchange.h b/qpid/cpp/src/qpid/management/ManagementDirectExchange.h new file mode 100644 index 0000000000..582354d723 --- /dev/null +++ b/qpid/cpp/src/qpid/management/ManagementDirectExchange.h @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ManagementDirectExchange_ +#define _ManagementDirectExchange_ + +#include "qpid/broker/DirectExchange.h" +#include "qpid/management/ManagementAgent.h" + +namespace qpid { +namespace broker { + +class ManagementDirectExchange : public virtual DirectExchange +{ + private: + management::ManagementAgent* managementAgent; + int qmfVersion; + + public: + static const std::string typeName; + + ManagementDirectExchange(const std::string& name, Manageable* _parent = 0, Broker* broker = 0); + ManagementDirectExchange(const std::string& _name, bool _durable, + const qpid::framing::FieldTable& _args, + Manageable* _parent = 0, Broker* broker = 0); + + virtual std::string getType() const { return typeName; } + + virtual void route(Deliverable& msg); + + void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); + + virtual ~ManagementDirectExchange(); +}; + + +} +} + +#endif diff --git a/qpid/cpp/src/qpid/management/ManagementEvent.h b/qpid/cpp/src/qpid/management/ManagementEvent.h new file mode 100644 index 0000000000..e80175096f --- /dev/null +++ b/qpid/cpp/src/qpid/management/ManagementEvent.h @@ -0,0 +1,53 @@ +#ifndef _ManagementEvent_ +#define _ManagementEvent_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/management/ManagementObject.h" +#include "qpid/types/Variant.h" +#include <string> + +namespace qpid { +namespace management { + +class ManagementAgent; + +class ManagementEvent : public ManagementItem { + public: + static const uint8_t MD5_LEN = 16; + //typedef void (*writeSchemaCall_t)(qpid::framing::Buffer&); + typedef void (*writeSchemaCall_t)(std::string&); + virtual ~ManagementEvent() {} + + virtual writeSchemaCall_t getWriteSchemaCall(void) = 0; + //virtual mapEncodeSchemaCall_t getMapEncodeSchemaCall(void) = 0; + virtual std::string& getEventName() const = 0; + virtual std::string& getPackageName() const = 0; + virtual uint8_t* getMd5Sum() const = 0; + virtual uint8_t getSeverity() const = 0; + virtual void encode(std::string&) const = 0; + virtual void mapEncode(qpid::types::Variant::Map&) const = 0; +}; + +}} + +#endif /*!_ManagementEvent_*/ diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp new file mode 100644 index 0000000000..019963e832 --- /dev/null +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -0,0 +1,385 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/management/Manageable.h" +#include "qpid/management/ManagementObject.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/Buffer.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/Thread.h" +#include "qpid/log/Statement.h" +#include <boost/lexical_cast.hpp> + +#include <stdlib.h> + +using namespace std; +using namespace qpid; +using namespace qpid::management; + +void AgentAttachment::setBanks(uint32_t broker, uint32_t bank) +{ + first = + ((uint64_t) (broker & 0x000fffff)) << 28 | + ((uint64_t) (bank & 0x0fffffff)); +} + +// Deprecated +ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint64_t object) + : agent(0), agentEpoch(seq) +{ + first = + ((uint64_t) (flags & 0x0f)) << 60 | + ((uint64_t) (seq & 0x0fff)) << 48 | + ((uint64_t) (broker & 0x000fffff)) << 28; + second = object; +} + + +ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker) + : agent(0), second(0), agentEpoch(seq) +{ + first = + ((uint64_t) (flags & 0x0f)) << 60 | + ((uint64_t) (seq & 0x0fff)) << 48 | + ((uint64_t) (broker & 0x000fffff)) << 28; +} + +ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq) + : agent(_agent), second(0), agentEpoch(seq) +{ + + first = + ((uint64_t) (flags & 0x0f)) << 60 | + ((uint64_t) (seq & 0x0fff)) << 48; +} + + +ObjectId::ObjectId(istream& in) : agent(0) +{ + string text; + in >> text; + fromString(text); +} + +ObjectId::ObjectId(const string& text) : agent(0) +{ + fromString(text); +} + +void ObjectId::fromString(const string& text) +{ +#define FIELDS 5 +#if defined (_WIN32) && !defined (atoll) +# define atoll(X) _atoi64(X) +#endif + + // format: + // V1: <flags>-<sequence>-<broker-bank>-<agent-bank>-<uint64-app-id> + // V2: Not used + + string copy(text.c_str()); + char* cText; + char* field[FIELDS]; + bool atFieldStart = true; + int idx = 0; + + cText = const_cast<char*>(copy.c_str()); + for (char* cursor = cText; *cursor; cursor++) { + if (atFieldStart) { + if (idx >= FIELDS) + throw Exception("Invalid ObjectId format"); + field[idx++] = cursor; + atFieldStart = false; + } else { + if (*cursor == '-') { + *cursor = '\0'; + atFieldStart = true; + } + } + } + + if (idx != FIELDS) + throw Exception("Invalid ObjectId format"); + + agentEpoch = atoll(field[1]); + + first = (atoll(field[0]) << 60) + + (atoll(field[1]) << 48) + + (atoll(field[2]) << 28); + + agentName = string(field[3]); + second = atoll(field[4]); +} + + +bool ObjectId::operator==(const ObjectId &other) const +{ + return v2Key == other.v2Key; +} + +bool ObjectId::operator<(const ObjectId &other) const +{ + return v2Key < other.v2Key; +} + +bool ObjectId::equalV1(const ObjectId &other) const +{ + uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; + return first == otherFirst && second == other.second; +} + +// encode as V1-format binary +void ObjectId::encode(string& buffer) const +{ + const uint32_t len = 16; + char _data[len]; + qpid::framing::Buffer body(_data, len); + + if (agent == 0) + body.putLongLong(first); + else + body.putLongLong(first | agent->first); + body.putLongLong(second); + + body.reset(); + body.getRawData(buffer, len); +} + +// decode as V1-format binary +void ObjectId::decode(const string& buffer) +{ + const uint32_t len = 16; + char _data[len]; + qpid::framing::Buffer body(_data, len); + + body.checkAvailable(buffer.length()); + body.putRawData(buffer); + body.reset(); + first = body.getLongLong(); + second = body.getLongLong(); + v2Key = boost::lexical_cast<string>(second); +} + +// generate the V2 key from the index fields defined +// in the schema. +void ObjectId::setV2Key(const ManagementObject& object) +{ + stringstream oname; + oname << object.getPackageName() << ":" << object.getClassName() << ":" << object.getKey(); + v2Key = oname.str(); +} + + +// encode as V2-format map +void ObjectId::mapEncode(types::Variant::Map& map) const +{ + map["_object_name"] = v2Key; + if (!agentName.empty()) + map["_agent_name"] = agentName; + if (agentEpoch) + map["_agent_epoch"] = agentEpoch; +} + +// decode as v2-format map +void ObjectId::mapDecode(const types::Variant::Map& map) +{ + types::Variant::Map::const_iterator i; + + if ((i = map.find("_object_name")) != map.end()) + v2Key = i->second.asString(); + else + throw Exception("Required _object_name field missing."); + + if ((i = map.find("_agent_name")) != map.end()) + agentName = i->second.asString(); + + if ((i = map.find("_agent_epoch")) != map.end()) + agentEpoch = i->second.asInt64(); +} + + +ObjectId::operator types::Variant::Map() const +{ + types::Variant::Map m; + mapEncode(m); + return m; +} + + + +namespace qpid { +namespace management { + +ostream& operator<<(ostream& out, const ObjectId& i) +{ + uint64_t virtFirst = i.first; + if (i.agent) + virtFirst |= i.agent->getFirst(); + + out << ((virtFirst & 0xF000000000000000LL) >> 60) << + "-" << ((virtFirst & 0x0FFF000000000000LL) >> 48) << + "-" << ((virtFirst & 0x0000FFFFF0000000LL) >> 28) << + "-" << i.agentName << + "-" << i.second << + "(" << i.v2Key << ")"; + return out; +} + +}} + +ManagementObject::ManagementObject(Manageable* _core) : + createTime(qpid::sys::Duration::FromEpoch()), + destroyTime(0), updateTime(createTime), configChanged(true), + instChanged(true), deleted(false), + coreObject(_core), flags(0), forcePublish(false) {} + +void ManagementObject::setUpdateTime() +{ + updateTime = sys::Duration::FromEpoch(); +} + +void ManagementObject::resourceDestroy() +{ + QPID_LOG(trace, "Management object marked deleted: " << getObjectId().getV2Key()); + destroyTime = sys::Duration::FromEpoch(); + deleted = true; +} + +int ManagementObject::maxThreads = 1; +int ManagementObject::nextThreadIndex = 0; + +void ManagementObject::writeTimestamps (string& buf) const +{ + char _data[4000]; + qpid::framing::Buffer body(_data, 4000); + + body.putShortString (getPackageName ()); + body.putShortString (getClassName ()); + body.putBin128 (getMd5Sum ()); + body.putLongLong (updateTime); + body.putLongLong (createTime); + body.putLongLong (destroyTime); + + uint32_t len = body.getPosition(); + body.reset(); + body.getRawData(buf, len); + + string oid; + objectId.encode(oid); + buf += oid; +} + +void ManagementObject::readTimestamps (const string& buf) +{ + char _data[4000]; + qpid::framing::Buffer body(_data, 4000); + string unused; + uint8_t unusedUuid[16]; + + body.checkAvailable(buf.length()); + body.putRawData(buf); + body.reset(); + + body.getShortString(unused); + body.getShortString(unused); + body.getBin128(unusedUuid); + updateTime = body.getLongLong(); + createTime = body.getLongLong(); + destroyTime = body.getLongLong(); +} + +uint32_t ManagementObject::writeTimestampsSize() const +{ + return 1 + getPackageName().length() + // str8 + 1 + getClassName().length() + // str8 + 16 + // bin128 + 8 + // uint64 + 8 + // uint64 + 8 + // uint64 + objectId.encodedSize(); // objectId +} + + +void ManagementObject::writeTimestamps (types::Variant::Map& map) const +{ + // types::Variant::Map oid, sid; + + // sid["_package_name"] = getPackageName(); + // sid["_class_name"] = getClassName(); + // sid["_hash"] = qpid::types::Uuid(getMd5Sum()); + // map["_schema_id"] = sid; + + // objectId.mapEncode(oid); + // map["_object_id"] = oid; + + map["_update_ts"] = updateTime; + map["_create_ts"] = createTime; + map["_delete_ts"] = destroyTime; +} + +void ManagementObject::readTimestamps (const types::Variant::Map& map) +{ + types::Variant::Map::const_iterator i; + + if ((i = map.find("_update_ts")) != map.end()) + updateTime = i->second.asUint64(); + if ((i = map.find("_create_ts")) != map.end()) + createTime = i->second.asUint64(); + if ((i = map.find("_delete_ts")) != map.end()) + destroyTime = i->second.asUint64(); +} + + +void ManagementObject::setReference(ObjectId) {} + +int ManagementObject::getThreadIndex() { + static QPID_TSS int thisIndex = -1; + if (thisIndex == -1) { + Mutex::ScopedLock mutex(accessLock); + thisIndex = nextThreadIndex; + if (nextThreadIndex < maxThreads - 1) + nextThreadIndex++; + } + return thisIndex; +} + + +// void ManagementObject::mapEncode(types::Variant::Map& map, +// bool includeProperties, +// bool includeStatistics) +// { +// types::Variant::Map values; + +// writeTimestamps(map); + +// mapEncodeValues(values, includeProperties, includeStatistics); +// map["_values"] = values; +// } + +// void ManagementObject::mapDecode(const types::Variant::Map& map) +// { +// types::Variant::Map::const_iterator i; + +// readTimestamps(map); + +// if ((i = map.find("_values")) != map.end()) +// mapDecodeValues(i->second.asMap()); +// } diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h new file mode 100644 index 0000000000..93fbec7bc7 --- /dev/null +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -0,0 +1,246 @@ +#ifndef _ManagementObject_ +#define _ManagementObject_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/CommonImportExport.h" + +#include "qpid/management/Mutex.h" +#include "qpid/types/Variant.h" +#include <map> +#include <vector> + +#ifdef _IN_QPID_BROKER +#include <boost/shared_ptr.hpp> +#endif + +namespace qpid { +namespace management { + +class Manageable; +class ObjectId; +class ManagementObject; + + +class AgentAttachment { + friend class ObjectId; +private: + uint64_t first; +public: + AgentAttachment() : first(0) {} + QPID_COMMON_EXTERN void setBanks(uint32_t broker, uint32_t bank); + uint64_t getFirst() const { return first; } +}; + + +class ObjectId { +protected: + const AgentAttachment* agent; + uint64_t first; + uint64_t second; + uint64_t agentEpoch; + std::string v2Key; + std::string agentName; + void fromString(const std::string&); +public: + QPID_COMMON_INLINE_EXTERN ObjectId() : agent(0), first(0), second(0), agentEpoch(0) {} + QPID_COMMON_INLINE_EXTERN ObjectId(const types::Variant& map) : + agent(0), first(0), second(0), agentEpoch(0) { mapDecode(map.asMap()); } + QPID_COMMON_EXTERN ObjectId(uint8_t flags, uint16_t seq, uint32_t broker); + QPID_COMMON_EXTERN ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq); + QPID_COMMON_EXTERN ObjectId(std::istream&); + QPID_COMMON_EXTERN ObjectId(const std::string&); + QPID_COMMON_INLINE_EXTERN ObjectId(const std::string& agentAddress, const std::string& key, + uint64_t epoch=0) : agent(0), first(0), second(0), + agentEpoch(epoch), v2Key(key), agentName(agentAddress) {} + + // Deprecated: + QPID_COMMON_EXTERN ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint64_t object); + QPID_COMMON_EXTERN bool operator==(const ObjectId &other) const; + QPID_COMMON_EXTERN bool operator<(const ObjectId &other) const; + QPID_COMMON_EXTERN void mapEncode(types::Variant::Map& map) const; + QPID_COMMON_EXTERN void mapDecode(const types::Variant::Map& map); + QPID_COMMON_EXTERN operator types::Variant::Map() const; + QPID_COMMON_INLINE_EXTERN uint32_t encodedSize() const { return 16; }; + QPID_COMMON_EXTERN void encode(std::string& buffer) const; + QPID_COMMON_EXTERN void decode(const std::string& buffer); + QPID_COMMON_EXTERN bool equalV1(const ObjectId &other) const; + QPID_COMMON_INLINE_EXTERN void setV2Key(const std::string& _key) { v2Key = _key; } + QPID_COMMON_EXTERN void setV2Key(const ManagementObject& object); + QPID_COMMON_INLINE_EXTERN void setAgentName(const std::string& _name) { agentName = _name; } + QPID_COMMON_INLINE_EXTERN const std::string& getAgentName() const { return agentName; } + QPID_COMMON_INLINE_EXTERN const std::string& getV2Key() const { return v2Key; } + friend QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const ObjectId&); +}; + +class ManagementItem { +public: + static const uint8_t TYPE_U8 = 1; + static const uint8_t TYPE_U16 = 2; + static const uint8_t TYPE_U32 = 3; + static const uint8_t TYPE_U64 = 4; + static const uint8_t TYPE_SSTR = 6; + static const uint8_t TYPE_LSTR = 7; + static const uint8_t TYPE_ABSTIME = 8; + static const uint8_t TYPE_DELTATIME = 9; + static const uint8_t TYPE_REF = 10; + static const uint8_t TYPE_BOOL = 11; + static const uint8_t TYPE_FLOAT = 12; + static const uint8_t TYPE_DOUBLE = 13; + static const uint8_t TYPE_UUID = 14; + static const uint8_t TYPE_FTABLE = 15; + static const uint8_t TYPE_S8 = 16; + static const uint8_t TYPE_S16 = 17; + static const uint8_t TYPE_S32 = 18; + static const uint8_t TYPE_S64 = 19; + static const uint8_t TYPE_LIST = 21; + + static const uint8_t ACCESS_RC = 1; + static const uint8_t ACCESS_RW = 2; + static const uint8_t ACCESS_RO = 3; + + static const uint8_t DIR_I = 1; + static const uint8_t DIR_O = 2; + static const uint8_t DIR_IO = 3; + + static const uint8_t FLAG_CONFIG = 0x01; + static const uint8_t FLAG_INDEX = 0x02; + static const uint8_t FLAG_END = 0x80; + + const static uint8_t CLASS_KIND_TABLE = 1; + const static uint8_t CLASS_KIND_EVENT = 2; + + + +public: + virtual ~ManagementItem() {} +}; + +class QPID_COMMON_CLASS_EXTERN ManagementObject : public ManagementItem +{ +protected: + + uint64_t createTime; + uint64_t destroyTime; + uint64_t updateTime; + ObjectId objectId; + mutable bool configChanged; + mutable bool instChanged; + bool deleted; + Manageable* coreObject; + mutable Mutex accessLock; + uint32_t flags; + + static int nextThreadIndex; + bool forcePublish; + + QPID_COMMON_EXTERN int getThreadIndex(); + QPID_COMMON_EXTERN void writeTimestamps(std::string& buf) const; + QPID_COMMON_EXTERN void readTimestamps(const std::string& buf); + QPID_COMMON_EXTERN uint32_t writeTimestampsSize() const; + + public: +#ifdef _IN_QPID_BROKER + typedef boost::shared_ptr<ManagementObject> shared_ptr; +#endif + + QPID_COMMON_EXTERN static const uint8_t MD5_LEN = 16; + QPID_COMMON_EXTERN static int maxThreads; + //typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); + typedef void (*writeSchemaCall_t) (std::string&); + + QPID_COMMON_EXTERN ManagementObject(Manageable* _core); + virtual ~ManagementObject() {} + + virtual writeSchemaCall_t getWriteSchemaCall() = 0; + virtual std::string getKey() const = 0; + + // Encode & Decode the property and statistics values + // for this object. + virtual void mapEncodeValues(types::Variant::Map& map, + bool includeProperties, + bool includeStatistics) = 0; + virtual void mapDecodeValues(const types::Variant::Map& map) = 0; + virtual void doMethod(std::string& methodName, + const types::Variant::Map& inMap, + types::Variant::Map& outMap, + const std::string& userId) = 0; + QPID_COMMON_EXTERN void writeTimestamps(types::Variant::Map& map) const; + QPID_COMMON_EXTERN void readTimestamps(const types::Variant::Map& buf); + + /** + * The following five methods are not pure-virtual because they will only + * be overridden in cases where QMFv1 is to be supported. + */ + virtual uint32_t writePropertiesSize() const { return 0; } + virtual void readProperties(const std::string&) {} + virtual void writeProperties(std::string&) const {} + virtual void writeStatistics(std::string&, bool = false) {} + virtual void doMethod(std::string&, const std::string&, std::string&, const std::string&) {} + + QPID_COMMON_EXTERN virtual void setReference(ObjectId objectId); + + virtual std::string& getClassName() const = 0; + virtual std::string& getPackageName() const = 0; + virtual uint8_t* getMd5Sum() const = 0; + + void setObjectId(ObjectId oid) { objectId = oid; } + ObjectId getObjectId() { return objectId; } + inline bool getConfigChanged() { return configChanged; } + virtual bool getInstChanged() { return instChanged; } + virtual bool hasInst() { return true; } + inline void setForcePublish(bool f) { forcePublish = f; } + inline bool getForcePublish() { return forcePublish; } + QPID_COMMON_EXTERN void setUpdateTime(); + QPID_COMMON_EXTERN void resourceDestroy(); + inline bool isDeleted() { return deleted; } + inline void setFlags(uint32_t f) { flags = f; } + inline uint32_t getFlags() { return flags; } + bool isSameClass(ManagementObject& other) { + for (int idx = 0; idx < MD5_LEN; idx++) + if (other.getMd5Sum()[idx] != getMd5Sum()[idx]) + return false; + return other.getClassName() == getClassName() && + other.getPackageName() == getPackageName(); + } + + // QPID_COMMON_EXTERN void encode(qpid::framing::Buffer& buf) const { writeProperties(buf); } + // QPID_COMMON_EXTERN void decode(qpid::framing::Buffer& buf) { readProperties(buf); } + //QPID_COMMON_EXTERN uint32_t encodedSize() const { return writePropertiesSize(); } + + // Encode/Decode the entire object as a map + //QPID_COMMON_EXTERN void mapEncode(types::Variant::Map& map, + //bool includeProperties=true, + //bool includeStatistics=true); + + //QPID_COMMON_EXTERN void mapDecode(const types::Variant::Map& map); +}; + +#ifdef _IN_QPID_BROKER +typedef std::map<ObjectId, ManagementObject::shared_ptr> ManagementObjectMap; +typedef std::vector<ManagementObject::shared_ptr> ManagementObjectVector; +#endif + +}} + + + +#endif /*!_ManagementObject_*/ diff --git a/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp b/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp new file mode 100644 index 0000000000..0241d5a404 --- /dev/null +++ b/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/management/ManagementTopicExchange.h" +#include "qpid/log/Statement.h" + +using namespace qpid::management; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +ManagementTopicExchange::ManagementTopicExchange(const std::string& _name, Manageable* _parent, Broker* b) : + Exchange (_name, _parent, b), + TopicExchange(_name, _parent, b), + managementAgent(0) {} +ManagementTopicExchange::ManagementTopicExchange(const std::string& _name, + bool _durable, + const FieldTable& _args, + Manageable* _parent, Broker* b) : + Exchange (_name, _durable, false, _args, _parent, b), + TopicExchange(_name, _durable, false, _args, _parent, b), + managementAgent(0) {} + +void ManagementTopicExchange::route(Deliverable& msg) +{ + bool routeIt = true; + + // Intercept management agent commands + if (managementAgent) + routeIt = managementAgent->dispatchCommand(msg, msg.getMessage().getRoutingKey(), 0/*args - TODO*/, true, qmfVersion); + + if (routeIt) + TopicExchange::route(msg); +} + +bool ManagementTopicExchange::bind(Queue::shared_ptr queue, + const std::string& routingKey, + const qpid::framing::FieldTable* args) +{ + if (qmfVersion == 1) + managementAgent->clientAdded(routingKey); + return TopicExchange::bind(queue, routingKey, args); +} + +void ManagementTopicExchange::setManagmentAgent(ManagementAgent* agent, int qv) +{ + managementAgent = agent; + qmfVersion = qv; +} + + +ManagementTopicExchange::~ManagementTopicExchange() {} + +const std::string ManagementTopicExchange::typeName("management-topic"); + diff --git a/qpid/cpp/src/qpid/management/ManagementTopicExchange.h b/qpid/cpp/src/qpid/management/ManagementTopicExchange.h new file mode 100644 index 0000000000..f5192a0936 --- /dev/null +++ b/qpid/cpp/src/qpid/management/ManagementTopicExchange.h @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ManagementTopicExchange_ +#define _ManagementTopicExchange_ + +#include "qpid/broker/TopicExchange.h" +#include "qpid/management/ManagementAgent.h" + +namespace qpid { +namespace broker { + +class ManagementTopicExchange : public virtual TopicExchange +{ + private: + management::ManagementAgent* managementAgent; + int qmfVersion; + + public: + static const std::string typeName; + + ManagementTopicExchange(const std::string& name, Manageable* _parent = 0, Broker* broker = 0); + ManagementTopicExchange(const std::string& _name, bool _durable, + const qpid::framing::FieldTable& _args, + Manageable* _parent = 0, Broker* broker = 0); + + virtual std::string getType() const { return typeName; } + + virtual void route(Deliverable& msg); + + virtual bool bind(Queue::shared_ptr queue, + const std::string& routingKey, + const qpid::framing::FieldTable* args); + + void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); + + virtual ~ManagementTopicExchange(); +}; + + +} +} + +#endif diff --git a/qpid/cpp/src/qpid/management/Mutex.cpp b/qpid/cpp/src/qpid/management/Mutex.cpp new file mode 100644 index 0000000000..f05abb01dc --- /dev/null +++ b/qpid/cpp/src/qpid/management/Mutex.cpp @@ -0,0 +1,29 @@ +/* + * + * Copyright (c) 2008 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/management/Mutex.h" +#include "qpid/sys/Mutex.h" + +using namespace std; +using namespace qpid::management; + +Mutex::Mutex() : impl(new sys::Mutex()) {} +Mutex::~Mutex() { delete impl; } +void Mutex::lock() { impl->lock(); } +void Mutex::unlock() { impl->unlock(); } + diff --git a/qpid/cpp/src/qpid/management/Mutex.h b/qpid/cpp/src/qpid/management/Mutex.h new file mode 100644 index 0000000000..67ae04bae9 --- /dev/null +++ b/qpid/cpp/src/qpid/management/Mutex.h @@ -0,0 +1,67 @@ +#ifndef _management_Mutex_h +#define _management_Mutex_h + +/* + * + * Copyright (c) 2008 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/CommonImportExport.h" + +namespace qpid { + namespace sys { + class Mutex; + } + + namespace management { + + /** + * Scoped lock template: calls lock() in ctor, unlock() in dtor. + * L can be any class with lock() and unlock() functions. + */ + template <class L> class ScopedLockTemplate { + public: + ScopedLockTemplate(L& l) : mutex(l) { mutex.lock(); } + ~ScopedLockTemplate() { mutex.unlock(); } + private: + L& mutex; + }; + + template <class L> class ScopedUnlockTemplate { + public: + ScopedUnlockTemplate(L& l) : mutex(l) { mutex.unlock(); } + ~ScopedUnlockTemplate() { mutex.lock(); } + private: + L& mutex; + }; + + class Mutex { + public: + typedef ScopedLockTemplate<Mutex> ScopedLock; + typedef ScopedUnlockTemplate<Mutex> ScopedUnlock; + + QPID_COMMON_EXTERN Mutex(); + QPID_COMMON_EXTERN ~Mutex(); + QPID_COMMON_EXTERN void lock(); + QPID_COMMON_EXTERN void unlock(); + private: + sys::Mutex* impl; + }; + } +} + +#endif + |