summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/management
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/management')
-rw-r--r--qpid/cpp/src/qpid/management/Args.h44
-rw-r--r--qpid/cpp/src/qpid/management/Buffer.cpp105
-rw-r--r--qpid/cpp/src/qpid/management/Buffer.h105
-rw-r--r--qpid/cpp/src/qpid/management/ConnectionSettings.cpp40
-rw-r--r--qpid/cpp/src/qpid/management/ConnectionSettings.h118
-rw-r--r--qpid/cpp/src/qpid/management/Manageable.cpp53
-rw-r--r--qpid/cpp/src/qpid/management/Manageable.h81
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp2832
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h388
-rw-r--r--qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp65
-rw-r--r--qpid/cpp/src/qpid/management/ManagementDirectExchange.h57
-rw-r--r--qpid/cpp/src/qpid/management/ManagementEvent.h53
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.cpp385
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.h246
-rw-r--r--qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp73
-rw-r--r--qpid/cpp/src/qpid/management/ManagementTopicExchange.h61
-rw-r--r--qpid/cpp/src/qpid/management/Mutex.cpp29
-rw-r--r--qpid/cpp/src/qpid/management/Mutex.h67
18 files changed, 4802 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/management/Args.h b/qpid/cpp/src/qpid/management/Args.h
new file mode 100644
index 0000000000..5d1cb7e01d
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/Args.h
@@ -0,0 +1,44 @@
+#ifndef _Args_
+#define _Args_
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+
+namespace qpid {
+namespace management {
+
+class Args
+{
+ public:
+
+ virtual ~Args (void) = 0;
+
+};
+
+inline Args::~Args (void) {}
+
+class ArgsNone : public Args
+{
+};
+
+}}
+
+
+#endif /*!_Args_*/
diff --git a/qpid/cpp/src/qpid/management/Buffer.cpp b/qpid/cpp/src/qpid/management/Buffer.cpp
new file mode 100644
index 0000000000..0ad6e9a851
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/Buffer.cpp
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/management/Buffer.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/amqp_0_10/Codecs.h"
+
+using namespace std;
+
+namespace qpid {
+namespace management {
+
+Buffer::Buffer(char* data, uint32_t size) : impl(new framing::Buffer(data, size)) {}
+Buffer::~Buffer() { delete impl; }
+void Buffer::reset() { impl->reset(); }
+uint32_t Buffer::available() { return impl->available(); }
+uint32_t Buffer::getSize() { return impl->getSize(); }
+uint32_t Buffer::getPosition() { return impl->getPosition(); }
+void Buffer::setPosition(uint32_t p) { impl->setPosition(p); }
+char* Buffer::getPointer() { return impl->getPointer(); }
+void Buffer::putOctet(uint8_t i) { impl->putOctet(i); }
+void Buffer::putShort(uint16_t i) { impl->putShort(i); }
+void Buffer::putLong(uint32_t i) { impl->putLong(i); }
+void Buffer::putLongLong(uint64_t i) { impl->putLongLong(i); }
+void Buffer::putInt8(int8_t i) { impl->putInt8(i); }
+void Buffer::putInt16(int16_t i) { impl->putInt16(i); }
+void Buffer::putInt32(int32_t i) { impl->putInt32(i); }
+void Buffer::putInt64(int64_t i) { impl->putInt64(i); }
+void Buffer::putFloat(float i) { impl->putFloat(i); }
+void Buffer::putDouble(double i) { impl->putDouble(i); }
+void Buffer::putBin128(const uint8_t* i) { impl->putBin128(i); }
+uint8_t Buffer::getOctet() { return impl->getOctet(); }
+uint16_t Buffer::getShort() { return impl->getShort(); }
+uint32_t Buffer::getLong() { return impl->getLong(); }
+uint64_t Buffer::getLongLong() { return impl->getLongLong(); }
+int8_t Buffer:: getInt8() { return impl-> getInt8(); }
+int16_t Buffer::getInt16() { return impl->getInt16(); }
+int32_t Buffer::getInt32() { return impl->getInt32(); }
+int64_t Buffer::getInt64() { return impl->getInt64(); }
+float Buffer::getFloat() { return impl->getFloat(); }
+double Buffer::getDouble() { return impl->getDouble(); }
+void Buffer::putShortString(const string& i) { impl->putShortString(i); }
+void Buffer::putMediumString(const string& i) { impl->putMediumString(i); }
+void Buffer::putLongString(const string& i) { impl->putLongString(i); }
+void Buffer::getShortString(string& i) { impl->getShortString(i); }
+void Buffer::getMediumString(string& i) { impl->getMediumString(i); }
+void Buffer::getLongString(string& i) { impl->getLongString(i); }
+void Buffer::getBin128(uint8_t* i) { impl->getBin128(i); }
+void Buffer::putRawData(const string& i) { impl->putRawData(i); }
+void Buffer::getRawData(string& s, uint32_t size) { impl->getRawData(s, size); }
+void Buffer::putRawData(const uint8_t* data, size_t size) { impl->putRawData(data, size); }
+void Buffer::getRawData(uint8_t* data, size_t size) { impl->getRawData(data, size); }
+
+void Buffer::putMap(const types::Variant::Map& i)
+{
+ string encoded;
+ amqp_0_10::MapCodec::encode(i, encoded);
+ impl->putRawData(encoded);
+}
+
+void Buffer::putList(const types::Variant::List& i)
+{
+ string encoded;
+ amqp_0_10::ListCodec::encode(i, encoded);
+ impl->putRawData(encoded);
+}
+
+void Buffer::getMap(types::Variant::Map& map)
+{
+ string encoded;
+ uint32_t saved = impl->getPosition();
+ uint32_t length = impl->getLong();
+ impl->setPosition(saved);
+ impl->getRawData(encoded, length + sizeof(uint32_t));
+ amqp_0_10::MapCodec::decode(encoded, map);
+}
+
+void Buffer::getList(types::Variant::List& list)
+{
+ string encoded;
+ uint32_t saved = impl->getPosition();
+ uint32_t length = impl->getLong();
+ impl->setPosition(saved);
+ impl->getRawData(encoded, length + sizeof(uint32_t));
+ amqp_0_10::ListCodec::decode(encoded, list);
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/management/Buffer.h b/qpid/cpp/src/qpid/management/Buffer.h
new file mode 100644
index 0000000000..1ac52bf276
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/Buffer.h
@@ -0,0 +1,105 @@
+#ifndef _Management_Buffer_
+#define _Management_Buffer_
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/CommonImportExport.h"
+#include "qpid/types/Exception.h"
+#include "qpid/types/Variant.h"
+#include <string>
+
+namespace qpid {
+namespace framing {
+ class Buffer;
+}
+
+namespace management {
+
+struct OutOfBounds : qpid::types::Exception {
+ OutOfBounds() : qpid::types::Exception(std::string("Out of Bounds")) {}
+};
+
+
+/**
+ * This class is a wrapper around qpid::framing::Buffer that does not include any dependencies
+ * from boost or from qpid::framing.
+ */
+class Buffer
+{
+public:
+ QPID_COMMON_EXTERN Buffer(char* data=0, uint32_t size=0);
+ QPID_COMMON_EXTERN ~Buffer();
+
+ QPID_COMMON_EXTERN void reset();
+
+ QPID_COMMON_EXTERN uint32_t available();
+ QPID_COMMON_EXTERN uint32_t getSize();
+ QPID_COMMON_EXTERN uint32_t getPosition();
+ QPID_COMMON_EXTERN void setPosition(uint32_t);
+ QPID_COMMON_EXTERN char* getPointer();
+
+ QPID_COMMON_EXTERN void putOctet(uint8_t i);
+ QPID_COMMON_EXTERN void putShort(uint16_t i);
+ QPID_COMMON_EXTERN void putLong(uint32_t i);
+ QPID_COMMON_EXTERN void putLongLong(uint64_t i);
+ QPID_COMMON_EXTERN void putInt8(int8_t i);
+ QPID_COMMON_EXTERN void putInt16(int16_t i);
+ QPID_COMMON_EXTERN void putInt32(int32_t i);
+ QPID_COMMON_EXTERN void putInt64(int64_t i);
+ QPID_COMMON_EXTERN void putFloat(float f);
+ QPID_COMMON_EXTERN void putDouble(double f);
+ QPID_COMMON_EXTERN void putBin128(const uint8_t* b);
+
+ QPID_COMMON_EXTERN uint8_t getOctet();
+ QPID_COMMON_EXTERN uint16_t getShort();
+ QPID_COMMON_EXTERN uint32_t getLong();
+ QPID_COMMON_EXTERN uint64_t getLongLong();
+ QPID_COMMON_EXTERN int8_t getInt8();
+ QPID_COMMON_EXTERN int16_t getInt16();
+ QPID_COMMON_EXTERN int32_t getInt32();
+ QPID_COMMON_EXTERN int64_t getInt64();
+ QPID_COMMON_EXTERN float getFloat();
+ QPID_COMMON_EXTERN double getDouble();
+
+ QPID_COMMON_EXTERN void putShortString(const std::string& s);
+ QPID_COMMON_EXTERN void putMediumString(const std::string& s);
+ QPID_COMMON_EXTERN void putLongString(const std::string& s);
+ QPID_COMMON_EXTERN void getShortString(std::string& s);
+ QPID_COMMON_EXTERN void getMediumString(std::string& s);
+ QPID_COMMON_EXTERN void getLongString(std::string& s);
+ QPID_COMMON_EXTERN void getBin128(uint8_t* b);
+
+ QPID_COMMON_EXTERN void putMap(const types::Variant::Map& map);
+ QPID_COMMON_EXTERN void putList(const types::Variant::List& list);
+ QPID_COMMON_EXTERN void getMap(types::Variant::Map& map);
+ QPID_COMMON_EXTERN void getList(types::Variant::List& list);
+
+ QPID_COMMON_EXTERN void putRawData(const std::string& s);
+ QPID_COMMON_EXTERN void getRawData(std::string& s, uint32_t size);
+
+ QPID_COMMON_EXTERN void putRawData(const uint8_t* data, size_t size);
+ QPID_COMMON_EXTERN void getRawData(uint8_t* data, size_t size);
+
+private:
+ framing::Buffer* impl;
+};
+
+}} // namespace qpid::management
+
+#endif
diff --git a/qpid/cpp/src/qpid/management/ConnectionSettings.cpp b/qpid/cpp/src/qpid/management/ConnectionSettings.cpp
new file mode 100644
index 0000000000..1421a26867
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/ConnectionSettings.cpp
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/management/ConnectionSettings.h"
+#include "qpid/Version.h"
+
+qpid::management::ConnectionSettings::ConnectionSettings() :
+ protocol("tcp"),
+ host("localhost"),
+ port(5672),
+ locale("en_US"),
+ heartbeat(0),
+ maxChannels(32767),
+ maxFrameSize(65535),
+ bounds(2),
+ tcpNoDelay(false),
+ service(qpid::saslName),
+ minSsf(0),
+ maxSsf(256)
+{}
+
+qpid::management::ConnectionSettings::~ConnectionSettings() {}
+
diff --git a/qpid/cpp/src/qpid/management/ConnectionSettings.h b/qpid/cpp/src/qpid/management/ConnectionSettings.h
new file mode 100644
index 0000000000..b631ffa658
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/ConnectionSettings.h
@@ -0,0 +1,118 @@
+#ifndef _management_ConnectionSettings_h
+#define _management_ConnectionSettings_h
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/CommonImportExport.h"
+#include "qpid/types/Variant.h"
+#include <string>
+
+namespace qpid {
+namespace management {
+
+/**
+ * Settings for a Connection.
+ */
+struct ConnectionSettings {
+
+ QPID_COMMON_EXTERN ConnectionSettings();
+ QPID_COMMON_EXTERN virtual ~ConnectionSettings();
+
+ /**
+ * The protocol used for the connection (defaults to 'tcp')
+ */
+ std::string protocol;
+
+ /**
+ * The host (or ip address) to connect to (defaults to 'localhost').
+ */
+ std::string host;
+ /**
+ * The port to connect to (defaults to 5672).
+ */
+ uint16_t port;
+ /**
+ * Allows an AMQP 'virtual host' to be specified for the
+ * connection.
+ */
+ std::string virtualhost;
+
+ /**
+ * The username to use when authenticating the connection. If not
+ * specified the current users login is used if available.
+ */
+ std::string username;
+ /**
+ * The password to use when authenticating the connection.
+ */
+ std::string password;
+ /**
+ * The SASL mechanism to use when authenticating the connection;
+ * the options are currently PLAIN or ANONYMOUS.
+ */
+ std::string mechanism;
+ /**
+ * Allows a locale to be specified for the connection.
+ */
+ std::string locale;
+ /**
+ * Allows a heartbeat frequency to be specified
+ */
+ uint16_t heartbeat;
+ /**
+ * The maximum number of channels that the client will request for
+ * use on this connection.
+ */
+ uint16_t maxChannels;
+ /**
+ * The maximum frame size that the client will request for this
+ * connection.
+ */
+ uint16_t maxFrameSize;
+ /**
+ * Limit the size of the connections send buffer . The buffer
+ * is limited to bounds * maxFrameSize.
+ */
+ unsigned int bounds;
+ /**
+ * If true, TCP_NODELAY will be set for the connection.
+ */
+ bool tcpNoDelay;
+ /**
+ * SASL service name
+ */
+ std::string service;
+ /**
+ * Minimum acceptable strength of any SASL negotiated security
+ * layer. 0 means no security layer required.
+ */
+ unsigned int minSsf;
+ /**
+ * Maximum acceptable strength of any SASL negotiated security
+ * layer. 0 means no security layer allowed.
+ */
+ unsigned int maxSsf;
+};
+
+}}
+
+#endif
+
diff --git a/qpid/cpp/src/qpid/management/Manageable.cpp b/qpid/cpp/src/qpid/management/Manageable.cpp
new file mode 100644
index 0000000000..651215ffb5
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/Manageable.cpp
@@ -0,0 +1,53 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "qpid/management/Manageable.h"
+
+using namespace qpid::management;
+using std::string;
+
+string Manageable::StatusText (status_t status, string text)
+{
+ if ((status & STATUS_USER) == STATUS_USER)
+ return text;
+
+ switch (status)
+ {
+ case STATUS_OK : return "OK";
+ case STATUS_UNKNOWN_OBJECT : return "UnknownObject";
+ case STATUS_UNKNOWN_METHOD : return "UnknownMethod";
+ case STATUS_NOT_IMPLEMENTED : return "NotImplemented";
+ case STATUS_PARAMETER_INVALID : return "InvalidParameter";
+ case STATUS_FEATURE_NOT_IMPLEMENTED : return "FeatureNotImplemented";
+ case STATUS_FORBIDDEN : return "Forbidden";
+ }
+
+ return "??";
+}
+
+Manageable::status_t Manageable::ManagementMethod (uint32_t, Args&, std::string&)
+{
+ return STATUS_UNKNOWN_METHOD;
+}
+
+bool Manageable::AuthorizeMethod(uint32_t, Args&, const std::string&)
+{
+ return true;
+}
+
diff --git a/qpid/cpp/src/qpid/management/Manageable.h b/qpid/cpp/src/qpid/management/Manageable.h
new file mode 100644
index 0000000000..ede5c29e43
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/Manageable.h
@@ -0,0 +1,81 @@
+#ifndef _Manageable_
+#define _Manageable_
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "qpid/management/ManagementObject.h"
+#include "qpid/management/Args.h"
+#include <string>
+#include "qpid/CommonImportExport.h"
+
+namespace qpid {
+namespace management {
+
+class QPID_COMMON_EXTERN Manageable
+{
+ public:
+
+ virtual ~Manageable(void) = 0;
+
+ // status_t is a type used to pass completion status from the method handler.
+ //
+ typedef uint32_t status_t;
+ static std::string StatusText(status_t status, std::string text = std::string());
+
+ static const status_t STATUS_OK = 0;
+ static const status_t STATUS_UNKNOWN_OBJECT = 1;
+ static const status_t STATUS_UNKNOWN_METHOD = 2;
+ static const status_t STATUS_NOT_IMPLEMENTED = 3;
+ static const status_t STATUS_PARAMETER_INVALID = 4;
+ static const status_t STATUS_FEATURE_NOT_IMPLEMENTED = 5;
+ static const status_t STATUS_FORBIDDEN = 6;
+ static const status_t STATUS_EXCEPTION = 7;
+ static const status_t STATUS_USER = 0x00010000;
+
+ // Every "Manageable" object must hold a reference to exactly one
+ // management object. This object is always of a class derived from
+ // the pure-virtual "ManagementObject".
+ //
+ // This accessor function returns a pointer to the management object.
+ //
+#ifdef _IN_QPID_BROKER
+ virtual ManagementObject::shared_ptr GetManagementObject() const = 0;
+#else
+ virtual ManagementObject* GetManagementObject() const = 0;
+#endif
+
+ // Every "Manageable" object must implement ManagementMethod. This
+ // function is called when a remote management client invokes a method
+ // on this object. The input and output arguments are specific to the
+ // method being called and must be down-cast to the appropriate sub class
+ // before use.
+ virtual status_t ManagementMethod(uint32_t methodId, Args& args, std::string& text);
+
+ // This optional method can be overridden to allow the agent application to
+ // authorize method invocations. Return true iff the authenticated user identified
+ // in userId us authorized to execute the method.
+ virtual bool AuthorizeMethod(uint32_t methodId, Args& args, const std::string& userId);
+};
+
+inline Manageable::~Manageable(void) {}
+
+}}
+
+#endif /*!_Manageable_*/
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
new file mode 100644
index 0000000000..516babce61
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -0,0 +1,2832 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+// NOTE on use of log levels: The criteria for using trace vs. debug
+// is to use trace for log messages that are generated for each
+// unbatched stats/props notification and debug for everything else.
+
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/management/ManagementObject.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/log/Statement.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/PollableQueue.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/broker/AclModule.h"
+#include "qpid/broker/Protocol.h"
+#include "qpid/types/Variant.h"
+#include "qpid/types/Uuid.h"
+#include "qpid/framing/List.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include <list>
+#include <iostream>
+#include <fstream>
+#include <sstream>
+#include <typeinfo>
+
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace management {
+
+using boost::intrusive_ptr;
+using qpid::framing::Uuid;
+using qpid::types::Variant;
+using qpid::amqp_0_10::MapCodec;
+using qpid::amqp_0_10::ListCodec;
+using namespace qpid::framing;
+using namespace qpid::broker;
+using namespace qpid;
+using namespace std;
+namespace _qmf = qmf::org::apache::qpid::broker;
+
+
+namespace {
+const size_t qmfV1BufferSize(65536);
+const string defaultVendorName("vendor");
+const string defaultProductName("product");
+
+// Create a valid binding key substring by
+// replacing all '.' chars with '_'
+const string keyifyNameStr(const string& name)
+{
+ string n2 = name;
+
+ size_t pos = n2.find('.');
+ while (pos != n2.npos) {
+ n2.replace(pos, 1, "_");
+ pos = n2.find('.', pos);
+ }
+ return n2;
+}
+
+struct ScopedManagementContext
+{
+ const Connection* context;
+
+ ScopedManagementContext(const Connection* p) : context(p)
+ {
+ if (p) setManagementExecutionContext(*p);
+ }
+
+ management::ObjectId getObjectId() const
+ {
+ return context ? context->getObjectId() : management::ObjectId();
+ }
+ std::string getUserId() const
+ {
+ return context ? context->getUserId() : std::string();
+ }
+ std::string getMgmtId() const
+ {
+ return context ? context->getMgmtId() : std::string();
+ }
+
+
+ ~ScopedManagementContext()
+ {
+ resetManagementExecutionContext();
+ }
+};
+
+typedef boost::function0<void> FireFunction;
+struct Periodic : public qpid::sys::TimerTask
+{
+ FireFunction fireFunction;
+ qpid::sys::Timer* timer;
+
+ Periodic (FireFunction f, qpid::sys::Timer* t, uint32_t seconds);
+ virtual ~Periodic ();
+ void fire ();
+};
+
+Periodic::Periodic (FireFunction f, qpid::sys::Timer* t, uint32_t _seconds)
+ : TimerTask(sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC),
+ "ManagementAgent::periodicProcessing"),
+ fireFunction(f), timer(t) {}
+
+Periodic::~Periodic() {}
+
+void Periodic::fire()
+{
+ setupNextFire();
+ timer->add(this);
+ fireFunction();
+}
+
+}
+
+
+static Variant::Map mapEncodeSchemaId(const string& pname,
+ const string& cname,
+ const string& type,
+ const uint8_t *md5Sum)
+{
+ Variant::Map map_;
+
+ map_["_package_name"] = pname;
+ map_["_class_name"] = cname;
+ map_["_type"] = type;
+ map_["_hash"] = qpid::types::Uuid(md5Sum);
+ return map_;
+}
+
+
+ManagementAgent::RemoteAgent::~RemoteAgent ()
+{
+ QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
+ if (mgmtObject != 0) {
+ mgmtObject->resourceDestroy();
+ agent.deleteObjectNow(mgmtObject->getObjectId());
+ mgmtObject.reset();
+ }
+}
+
+ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
+ threadPoolSize(1), publish(true), interval(10), broker(0), timer(0), protocols(0),
+ startTime(sys::now()),
+ suppressed(false), disallowAllV1Methods(false),
+ vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
+ qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100)
+{
+ nextObjectId = 1;
+ brokerBank = 1;
+ bootSequence = 1;
+ nextRemoteBank = 10;
+ nextRequestSequence = 1;
+ clientWasAdded = false;
+ attrMap["_vendor"] = defaultVendorName;
+ attrMap["_product"] = defaultProductName;
+
+ memstat = _qmf::Memory::shared_ptr(new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker"));
+ addObject(memstat, "amqp-broker");
+}
+
+ManagementAgent::~ManagementAgent ()
+{
+ {
+ sys::Mutex::ScopedLock lock (userLock);
+
+ // Reset the shared pointers to exchanges. If this is not done now, the exchanges
+ // will stick around until dExchange and mExchange are implicitly destroyed (long
+ // after this destructor completes). Those exchanges hold references to management
+ // objects that will be invalid.
+ dExchange.reset();
+ mExchange.reset();
+ v2Topic.reset();
+ v2Direct.reset();
+
+ remoteAgents.clear();
+ }
+}
+
+void ManagementAgent::configure(const string& _dataDir, bool _publish, uint16_t _interval,
+ qpid::broker::Broker* _broker, int _threads)
+{
+ dataDir = _dataDir;
+ publish = _publish;
+ interval = _interval;
+ broker = _broker;
+ threadPoolSize = _threads;
+ ManagementObject::maxThreads = threadPoolSize;
+ sendQueue.reset(
+ new EventQueue(boost::bind(&ManagementAgent::sendEvents, this, _1), broker->getPoller()));
+ sendQueue->start();
+ timer = &broker->getTimer();
+ timer->add(new Periodic(boost::bind(&ManagementAgent::periodicProcessing, this), timer, interval));
+
+ protocols = &broker->getProtocolRegistry();
+ // Get from file or generate and save to file.
+ if (dataDir.empty())
+ {
+ uuid.generate();
+ QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: "
+ << uuid);
+ }
+ else
+ {
+ string filename(dataDir + "/.mbrokerdata");
+ ifstream inFile(filename.c_str ());
+
+ if (inFile.good())
+ {
+ inFile >> uuid;
+ inFile >> bootSequence;
+ inFile >> nextRemoteBank;
+ inFile.close();
+ if (uuid.isNull()) {
+ uuid.generate();
+ QPID_LOG (info, "No stored broker ID found - ManagementAgent generated broker ID: " << uuid);
+ } else
+ QPID_LOG (info, "ManagementAgent restored broker ID: " << uuid);
+
+ // if sequence goes beyond a 12-bit field, skip zero and wrap to 1.
+ bootSequence++;
+ if (bootSequence & 0xF000)
+ bootSequence = 1;
+ writeData();
+ }
+ else
+ {
+ uuid.generate();
+ QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid);
+ writeData();
+ }
+
+ QPID_LOG (debug, "ManagementAgent boot sequence: " << bootSequence);
+ }
+}
+
+void ManagementAgent::setName(const string& vendor, const string& product, const string& instance)
+{
+ if (vendor.find(':') != vendor.npos) {
+ throw Exception("vendor string cannot contain a ':' character.");
+ }
+ if (product.find(':') != product.npos) {
+ throw Exception("product string cannot contain a ':' character.");
+ }
+ attrMap["_vendor"] = vendor;
+ attrMap["_product"] = product;
+ string inst;
+ if (instance.empty()) {
+ if (uuid.isNull())
+ {
+ throw Exception("ManagementAgent::configure() must be called if default name is used.");
+ }
+ inst = uuid.str();
+ } else
+ inst = instance;
+
+ name_address = vendor + ":" + product + ":" + inst;
+ attrMap["_instance"] = inst;
+ attrMap["_name"] = name_address;
+
+ vendorNameKey = keyifyNameStr(vendor);
+ productNameKey = keyifyNameStr(product);
+ instanceNameKey = keyifyNameStr(inst);
+}
+
+
+void ManagementAgent::getName(string& vendor, string& product, string& instance)
+{
+ vendor = std::string(attrMap["_vendor"]);
+ product = std::string(attrMap["_product"]);
+ instance = std::string(attrMap["_instance"]);
+}
+
+
+const std::string& ManagementAgent::getAddress()
+{
+ return name_address;
+}
+
+
+void ManagementAgent::writeData ()
+{
+ string filename (dataDir + "/.mbrokerdata");
+ ofstream outFile (filename.c_str ());
+
+ if (outFile.good())
+ {
+ outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl;
+ outFile.close();
+ }
+}
+
+void ManagementAgent::setExchange(qpid::broker::Exchange::shared_ptr _mexchange,
+ qpid::broker::Exchange::shared_ptr _dexchange)
+{
+ mExchange = _mexchange;
+ dExchange = _dexchange;
+}
+
+void ManagementAgent::setExchangeV2(qpid::broker::Exchange::shared_ptr _texchange,
+ qpid::broker::Exchange::shared_ptr _dexchange)
+{
+ v2Topic = _texchange;
+ v2Direct = _dexchange;
+}
+
+void ManagementAgent::registerClass (const string& packageName,
+ const string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
+{
+ sys::Mutex::ScopedLock lock(userLock);
+ PackageMap::iterator pIter = findOrAddPackageLH(packageName);
+ addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
+}
+
+void ManagementAgent::registerEvent (const string& packageName,
+ const string& eventName,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
+{
+ sys::Mutex::ScopedLock lock(userLock);
+ PackageMap::iterator pIter = findOrAddPackageLH(packageName);
+ addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
+}
+
+// Deprecated: V1 objects
+ObjectId ManagementAgent::addObject(ManagementObject::shared_ptr object, uint64_t persistId, bool persistent)
+{
+ uint16_t sequence;
+ uint64_t objectNum;
+
+ sys::Mutex::ScopedLock lock(addLock);
+ sequence = persistent ? 0 : bootSequence;
+ objectNum = persistId ? persistId : nextObjectId++;
+
+ ObjectId objId(0 /*flags*/, sequence, brokerBank, objectNum);
+ objId.setV2Key(*object); // let object generate the v2 key
+
+ object->setObjectId(objId);
+
+ newManagementObjects.push_back(object);
+ QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key());
+ return objId;
+}
+
+
+
+ObjectId ManagementAgent::addObject(ManagementObject::shared_ptr object,
+ const string& key,
+ bool persistent)
+{
+ uint16_t sequence;
+
+ sequence = persistent ? 0 : bootSequence;
+
+ ObjectId objId(0 /*flags*/, sequence, brokerBank);
+ if (key.empty()) {
+ objId.setV2Key(*object); // let object generate the key
+ } else {
+ objId.setV2Key(key);
+ }
+
+ object->setObjectId(objId);
+ {
+ sys::Mutex::ScopedLock lock(addLock);
+ newManagementObjects.push_back(object);
+ }
+ QPID_LOG(debug, "Management object added: " << objId.getV2Key());
+ return objId;
+}
+
+void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity)
+{
+ static const std::string severityStr[] = {
+ "emerg", "alert", "crit", "error", "warn",
+ "note", "info", "debug"
+ };
+ uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
+
+ if (qmf1Support) {
+ char buffer[qmfV1BufferSize];
+ Buffer outBuffer(buffer, qmfV1BufferSize);
+
+ encodeHeader(outBuffer, 'e');
+ outBuffer.putShortString(event.getPackageName());
+ outBuffer.putShortString(event.getEventName());
+ outBuffer.putBin128(event.getMd5Sum());
+ outBuffer.putLongLong(sys::Duration::FromEpoch());
+ outBuffer.putOctet(sev);
+ string sBuf;
+ event.encode(sBuf);
+ outBuffer.putRawData(sBuf);
+ sendBuffer(outBuffer, mExchange,
+ "console.event.1.0." + event.getPackageName() + "." + event.getEventName());
+ QPID_LOG(debug, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());
+ }
+
+ if (qmf2Support) {
+ Variant::Map map_;
+ Variant::Map schemaId;
+ Variant::Map values;
+ Variant::Map headers;
+
+ map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
+ event.getEventName(),
+ "_event",
+ event.getMd5Sum());
+ event.mapEncode(values);
+ map_["_values"] = values;
+ map_["_timestamp"] = uint64_t(sys::Duration::FromEpoch());
+ map_["_severity"] = sev;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_event";
+ headers["qmf.agent"] = name_address;
+
+ stringstream key;
+ key << "agent.ind.event." << keyifyNameStr(event.getPackageName())
+ << "." << keyifyNameStr(event.getEventName())
+ << "." << severityStr[sev]
+ << "." << vendorNameKey
+ << "." << productNameKey;
+ if (!instanceNameKey.empty())
+ key << "." << instanceNameKey;
+
+
+ string content;
+ Variant::List list_;
+ list_.push_back(map_);
+ ListCodec::encode(list_, content);
+ sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str());
+ QPID_LOG(debug, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
+ }
+}
+
+void ManagementAgent::clientAdded (const string& routingKey)
+{
+ sys::Mutex::ScopedLock lock(userLock);
+
+ //
+ // If this routing key is not relevant to object updates, exit.
+ //
+ if ((routingKey.compare(0, 1, "#") != 0) &&
+ (routingKey.compare(0, 9, "console.#") != 0) &&
+ (routingKey.compare(0, 12, "console.obj.") != 0))
+ return;
+
+ //
+ // Mark local objects for full-update.
+ //
+ clientWasAdded = true;
+
+ //
+ // If the routing key is relevant for local objects only, don't involve
+ // any of the remote agents.
+ //
+ if (routingKey.compare(0, 39, "console.obj.*.*.org.apache.qpid.broker.") == 0)
+ return;
+
+ std::list<std::string> rkeys;
+
+ for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
+ aIter != remoteAgents.end();
+ aIter++) {
+ rkeys.push_back(aIter->second->routingKey);
+ }
+
+ while (rkeys.size()) {
+ char localBuffer[16];
+ Buffer outBuffer(localBuffer, 16);
+
+ encodeHeader(outBuffer, 'x');
+ sendBuffer(outBuffer, dExchange, rkeys.front());
+ QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << rkeys.front());
+ rkeys.pop_front();
+ }
+}
+
+void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
+{
+ buf.putOctet ('A');
+ buf.putOctet ('M');
+ buf.putOctet ('2');
+ buf.putOctet (opcode);
+ buf.putLong (seq);
+}
+
+bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
+{
+ uint8_t h1 = buf.getOctet();
+ uint8_t h2 = buf.getOctet();
+ uint8_t h3 = buf.getOctet();
+
+ *opcode = buf.getOctet();
+ *seq = buf.getLong();
+
+ return h1 == 'A' && h2 == 'M' && h3 == '2';
+}
+
+void ManagementAgent::sendBuffer(Buffer& buf,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const string& routingKey)
+{
+ if (suppressed) {
+ QPID_LOG(debug, "Suppressing management message to " << routingKey);
+ return;
+ }
+ if (exchange.get() == 0) return;
+
+ intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ AMQFrame content((AMQContentBody()));
+
+ size_t length = buf.getPosition();
+ buf.reset();
+ content.castBody<AMQContentBody>()->decode(buf, length);
+
+ method.setEof(false);
+ header.setBof(false);
+ header.setEof(false);
+ content.setBof(false);
+
+ transfer->getFrames().append(method);
+ transfer->getFrames().append(header);
+
+ MessageProperties* props =
+ transfer->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(length);
+
+ DeliveryProperties* dp =
+ transfer->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ dp->setRoutingKey(routingKey);
+
+ transfer->getFrames().append(content);
+ transfer->setIsManagementMessage(true);
+ Message msg(transfer, transfer);
+ sendQueue->push(make_pair(exchange, msg));
+ buf.reset();
+}
+
+
+void ManagementAgent::sendBuffer(Buffer& buf,
+ const string& exchange,
+ const string& routingKey)
+{
+ qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange));
+ if (ex.get() != 0)
+ sendBuffer(buf, ex, routingKey);
+}
+
+
+void ManagementAgent::sendBuffer(const string& data,
+ const string& cid,
+ const Variant::Map& headers,
+ const string& content_type,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const string& routingKey,
+ uint64_t ttl_msec)
+{
+ Variant::Map::const_iterator i;
+
+ if (suppressed) {
+ QPID_LOG(debug, "Suppressing management message to " << routingKey);
+ return;
+ }
+ if (exchange.get() == 0) return;
+
+ intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer);
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ AMQFrame content((AMQContentBody(data)));
+
+ method.setEof(false);
+ header.setBof(false);
+ header.setEof(false);
+ content.setBof(false);
+
+ transfer->getFrames().append(method);
+ transfer->getFrames().append(header);
+
+ MessageProperties* props =
+ transfer->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(data.length());
+ if (!cid.empty()) {
+ props->setCorrelationId(cid);
+ }
+ props->setContentType(content_type);
+ props->setAppId("qmf2");
+
+ for (i = headers.begin(); i != headers.end(); ++i) {
+ props->getApplicationHeaders().setString(i->first, i->second.asString());
+ }
+
+ DeliveryProperties* dp =
+ transfer->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ dp->setRoutingKey(routingKey);
+ if (ttl_msec) {
+ dp->setTtl(ttl_msec);
+ }
+ transfer->getFrames().append(content);
+ transfer->computeRequiredCredit();
+ transfer->setIsManagementMessage(true);
+ transfer->computeExpiration();
+ Message msg(transfer, transfer);
+
+ sendQueue->push(make_pair(exchange, msg));
+}
+
+
+void ManagementAgent::sendBuffer(const string& data,
+ const string& cid,
+ const Variant::Map& headers,
+ const string& content_type,
+ const string& exchange,
+ const string& routingKey,
+ uint64_t ttl_msec)
+{
+ qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange));
+ if (ex.get() != 0)
+ sendBuffer(data, cid, headers, content_type, ex, routingKey, ttl_msec);
+}
+
+
+/** Objects that have been added since the last periodic poll are temporarily
+ * saved in the newManagementObjects list. This allows objects to be
+ * added without needing to block on the userLock (objectLock is used instead).
+ * These new objects need to be integrated into the object database
+ * (managementObjects) *before* they can be properly managed. This routine
+ * performs the integration.
+ *
+ * Note well: objects on the newManagementObjects list may have been
+ * marked as "deleted", and, possibly re-added. This would result in
+ * duplicate object ids. To avoid clashes, don't put deleted objects
+ * into the active object database.
+ */
+void ManagementAgent::moveNewObjects()
+{
+ sys::Mutex::ScopedLock lock(addLock);
+ sys::Mutex::ScopedLock objLock (objectLock);
+ while (!newManagementObjects.empty()) {
+ ManagementObject::shared_ptr object = newManagementObjects.back();
+ newManagementObjects.pop_back();
+
+ if (object->isDeleted()) {
+ DeletedObject::shared_ptr dptr(new DeletedObject(object, qmf1Support, qmf2Support));
+ pendingDeletedObjs[dptr->getKey()].push_back(dptr);
+ } else { // add to active object list, check for duplicates.
+ ObjectId oid = object->getObjectId();
+ ManagementObjectMap::iterator destIter = managementObjects.find(oid);
+ if (destIter != managementObjects.end()) {
+ // duplicate found. It is OK if the old object has been marked
+ // deleted, just replace the old with the new.
+ ManagementObject::shared_ptr oldObj = destIter->second;
+ if (!oldObj->isDeleted()) {
+ // Duplicate non-deleted objects? This is a user error - oids must be unique.
+ // for now, leak the old object (safer than deleting - may still be referenced)
+ // and complain loudly...
+ QPID_LOG(error, "Detected two management objects with the same identifier: " << oid);
+ oldObj->resourceDestroy();
+ }
+ DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support));
+ pendingDeletedObjs[dptr->getKey()].push_back(dptr);
+ // QPID-3666: be sure to replace the -index- also, as non-key members of
+ // the index object may be different for the new object! So erase the
+ // entry, rather than []= assign here:
+ managementObjects.erase(destIter);
+ }
+ managementObjects[oid] = object;
+ }
+ }
+}
+
+void ManagementAgent::periodicProcessing (void)
+{
+#define HEADROOM 4096
+ sys::Mutex::ScopedLock lock (userLock);
+ debugSnapshot("Management agent periodic processing");
+ string routingKey;
+ string sBuf;
+
+ moveNewObjects();
+
+ //
+ // If we're publishing updates, get the latest memory statistics and uptime now
+ //
+ if (publish) {
+ uint64_t uptime = sys::Duration(startTime, sys::now());
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
+ qpid::sys::MemStat::loadMemInfo(memstat.get());
+ }
+
+ //
+ // Use a copy of the management object map to avoid holding the objectLock
+ //
+ ManagementObjectVector localManagementObjects;
+ {
+ sys::Mutex::ScopedLock objLock(objectLock);
+ std::transform(managementObjects.begin(), managementObjects.end(),
+ std::back_inserter(localManagementObjects),
+ boost::bind(&ManagementObjectMap::value_type::second, _1));
+ }
+
+ //
+ // Clear the been-here flag on all objects in the map.
+ //
+ for (ManagementObjectVector::iterator iter = localManagementObjects.begin();
+ iter != localManagementObjects.end();
+ iter++) {
+ ManagementObject::shared_ptr object = *iter;
+ object->setFlags(0);
+ if (clientWasAdded) {
+ object->setForcePublish(true);
+ }
+ }
+
+ clientWasAdded = false;
+
+ // first send the pending deletes before sending updates. This prevents a
+ // "false delete" scenario: if an object was deleted then re-added during
+ // the last poll cycle, it will have a delete entry and an active entry.
+ // if we sent the active update first, _then_ the delete update, clients
+ // would incorrectly think the object was deleted. See QPID-2997
+ //
+ bool objectsDeleted = moveDeletedObjects();
+ PendingDeletedObjsMap localPendingDeletedObjs;
+ {
+ sys::Mutex::ScopedLock objLock(objectLock);
+ localPendingDeletedObjs.swap(pendingDeletedObjs);
+ }
+
+ //
+ // If we are not publishing updates, just clear the pending deletes. There's no
+ // need to tell anybody.
+ //
+ if (!publish)
+ localPendingDeletedObjs.clear();
+
+ ResizableBuffer msgBuffer(qmfV1BufferSize);
+ if (!localPendingDeletedObjs.empty()) {
+ for (PendingDeletedObjsMap::iterator mIter = localPendingDeletedObjs.begin();
+ mIter != localPendingDeletedObjs.end();
+ mIter++) {
+ std::string packageName;
+ std::string className;
+ msgBuffer.reset();
+ uint32_t v1Objs = 0;
+ uint32_t v2Objs = 0;
+ Variant::List list_;
+
+ size_t pos = mIter->first.find(":");
+ packageName = mIter->first.substr(0, pos);
+ className = mIter->first.substr(pos+1);
+
+ for (DeletedObjectList::iterator lIter = mIter->second.begin();
+ lIter != mIter->second.end(); lIter++) {
+ msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space.
+ std::string oid = (*lIter)->objectId;
+ if (!(*lIter)->encodedV1Config.empty()) {
+ encodeHeader(msgBuffer, 'c');
+ msgBuffer.putRawData((*lIter)->encodedV1Config);
+ QPID_LOG(trace, "Deleting V1 properties " << oid
+ << " len=" << (*lIter)->encodedV1Config.size());
+ v1Objs++;
+ }
+ if (!(*lIter)->encodedV1Inst.empty()) {
+ encodeHeader(msgBuffer, 'i');
+ msgBuffer.putRawData((*lIter)->encodedV1Inst);
+ QPID_LOG(trace, "Deleting V1 statistics " << oid
+ << " len=" << (*lIter)->encodedV1Inst.size());
+ v1Objs++;
+ }
+ if (v1Objs >= maxReplyObjs) {
+ v1Objs = 0;
+ stringstream key;
+ key << "console.obj.1.0." << packageName << "." << className;
+ size_t contentSize = msgBuffer.getPosition();
+ sendBuffer(msgBuffer, mExchange, key.str());
+ QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to="
+ << key.str() << " len=" << contentSize);
+ }
+
+ if (!(*lIter)->encodedV2.empty()) {
+ QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2);
+ list_.push_back((*lIter)->encodedV2);
+ if (++v2Objs >= maxReplyObjs) {
+ v2Objs = 0;
+
+ string content;
+ ListCodec::encode(list_, content);
+ list_.clear();
+ if (content.length()) {
+ stringstream key;
+ Variant::Map headers;
+ key << "agent.ind.data." << keyifyNameStr(packageName)
+ << "." << keyifyNameStr(className)
+ << "." << vendorNameKey
+ << "." << productNameKey;
+ if (!instanceNameKey.empty())
+ key << "." << instanceNameKey;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0);
+ QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
+ }
+ }
+ }
+ } // end current list
+
+ // send any remaining objects...
+
+ if (v1Objs) {
+ stringstream key;
+ key << "console.obj.1.0." << packageName << "." << className;
+ size_t contentSize = msgBuffer.getPosition();
+ sendBuffer(msgBuffer, mExchange, key.str());
+ QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
+ }
+
+ if (!list_.empty()) {
+ string content;
+ ListCodec::encode(list_, content);
+ list_.clear();
+ if (content.length()) {
+ stringstream key;
+ Variant::Map headers;
+ key << "agent.ind.data." << keyifyNameStr(packageName)
+ << "." << keyifyNameStr(className)
+ << "." << vendorNameKey
+ << "." << productNameKey;
+ if (!instanceNameKey.empty())
+ key << "." << instanceNameKey;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0);
+ QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
+ }
+ }
+ } // end map
+ }
+
+ //
+ // Process the entire object map.
+ //
+ // If publish is disabled, don't send any updates.
+ //
+ while (publish) {
+ msgBuffer.reset();
+ Variant::List list_;
+ uint32_t pcount;
+ uint32_t scount;
+ uint32_t v1Objs, v2Objs;
+ ManagementObjectVector::iterator baseIter;
+ std::string packageName;
+ std::string className;
+
+ for (baseIter = localManagementObjects.begin();
+ baseIter != localManagementObjects.end();
+ baseIter++) {
+ ManagementObject::shared_ptr baseObject = *baseIter;
+ //
+ // Skip until we find a base object requiring processing...
+ //
+ if (baseObject->getFlags() == 0) {
+ packageName = baseObject->getPackageName();
+ className = baseObject->getClassName();
+ break;
+ }
+ }
+
+ if (baseIter == localManagementObjects.end())
+ break; // done - all objects processed
+
+ pcount = scount = 0;
+ v1Objs = 0;
+ v2Objs = 0;
+ list_.clear();
+ msgBuffer.reset();
+
+ for (ManagementObjectVector::iterator iter = baseIter;
+ iter != localManagementObjects.end();
+ iter++) {
+ msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space
+ ManagementObject::shared_ptr baseObject = *baseIter;
+ ManagementObject::shared_ptr object = *iter;
+ bool send_stats, send_props;
+ if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
+ object->setFlags(1);
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ // skip any objects marked deleted since our first pass. Deal with them
+ // on the next periodic cycle...
+ if (object->isDeleted()) {
+ continue;
+ }
+
+ send_props = (object->getConfigChanged() || object->getForcePublish());
+ send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
+
+ if (send_props && qmf1Support) {
+ size_t pos = msgBuffer.getPosition();
+ encodeHeader(msgBuffer, 'c');
+ sBuf.clear();
+ object->writeProperties(sBuf);
+ msgBuffer.putRawData(sBuf);
+ QPID_LOG(trace, "Changed V1 properties "
+ << object->getObjectId().getV2Key()
+ << " len=" << msgBuffer.getPosition()-pos);
+ ++v1Objs;
+ }
+
+ if (send_stats && qmf1Support) {
+ size_t pos = msgBuffer.getPosition();
+ encodeHeader(msgBuffer, 'i');
+ sBuf.clear();
+ object->writeStatistics(sBuf);
+ msgBuffer.putRawData(sBuf);
+ QPID_LOG(trace, "Changed V1 statistics "
+ << object->getObjectId().getV2Key()
+ << " len=" << msgBuffer.getPosition()-pos);
+ ++v1Objs;
+ }
+
+ if ((send_stats || send_props) && qmf2Support) {
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oid;
+
+ object->getObjectId().mapEncode(oid);
+ map_["_object_id"] = oid;
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ "_data",
+ object->getMd5Sum());
+ object->writeTimestamps(map_);
+ object->mapEncodeValues(values, send_props, send_stats);
+ map_["_values"] = values;
+ list_.push_back(map_);
+ v2Objs++;
+ QPID_LOG(trace, "Changed V2"
+ << (send_stats? " statistics":"")
+ << (send_props? " properties":"")
+ << " map=" << map_);
+ }
+
+ if (send_props) pcount++;
+ if (send_stats) scount++;
+
+ object->setForcePublish(false);
+
+ if ((qmf1Support && (v1Objs >= maxReplyObjs)) ||
+ (qmf2Support && (v2Objs >= maxReplyObjs)))
+ break; // have enough objects, send an indication...
+ }
+ }
+
+ if (pcount || scount) {
+ if (qmf1Support) {
+ if (msgBuffer.getPosition() > 0) {
+ stringstream key;
+ key << "console.obj.1.0." << packageName << "." << className;
+ size_t contentSize = msgBuffer.getPosition();
+ sendBuffer(msgBuffer, mExchange, key.str());
+ QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str()
+ << " props=" << pcount
+ << " stats=" << scount
+ << " len=" << contentSize);
+ }
+ }
+
+ if (qmf2Support) {
+ string content;
+ ListCodec::encode(list_, content);
+ if (content.length()) {
+ stringstream key;
+ Variant::Map headers;
+ key << "agent.ind.data." << keyifyNameStr(packageName)
+ << "." << keyifyNameStr(className)
+ << "." << vendorNameKey
+ << "." << productNameKey;
+ if (!instanceNameKey.empty())
+ key << "." << instanceNameKey;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0);
+ QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str()
+ << " props=" << pcount
+ << " stats=" << scount
+ << " len=" << content.length());
+ }
+ }
+ }
+ } // end processing updates for all objects
+
+ if (objectsDeleted) {
+ sys::Mutex::ScopedLock lock (userLock);
+ deleteOrphanedAgentsLH();
+ }
+
+ // heartbeat generation. Note that heartbeats need to be sent even if publish is disabled.
+
+ if (qmf1Support) {
+ char msgChars[qmfV1BufferSize];
+ Buffer msgBuffer(msgChars, qmfV1BufferSize);
+ encodeHeader(msgBuffer, 'h');
+ msgBuffer.putLongLong(sys::Duration::FromEpoch());
+
+ routingKey = "console.heartbeat.1.0";
+ sendBuffer(msgBuffer, mExchange, routingKey);
+ QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey);
+ }
+
+ if (qmf2Support) {
+ std::stringstream addr_key;
+
+ addr_key << "agent.ind.heartbeat." << vendorNameKey << "." << productNameKey;
+ if (!instanceNameKey.empty())
+ addr_key << "." << instanceNameKey;
+
+ Variant::Map map;
+ Variant::Map headers;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_heartbeat_indication";
+ headers["qmf.agent"] = name_address;
+
+ map["_values"] = attrMap;
+ map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration::FromEpoch());
+ map["_values"].asMap()["_heartbeat_interval"] = interval;
+ map["_values"].asMap()["_epoch"] = bootSequence;
+
+ string content;
+ MapCodec::encode(map, content);
+
+ // Set TTL (in msecs) on outgoing heartbeat indications based on the interval
+ // time to prevent stale heartbeats from getting to the consoles.
+ sendBuffer(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000);
+
+ QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address);
+ }
+}
+
+void ManagementAgent::deleteObjectNow(const ObjectId& oid)
+{
+ ManagementObject::shared_ptr object;
+ {
+ sys::Mutex::ScopedLock lock(objectLock);
+ ManagementObjectMap::iterator iter = managementObjects.find(oid);
+ if (iter == managementObjects.end())
+ return;
+ object = iter->second;
+ if (!object->isDeleted())
+ return;
+ managementObjects.erase(oid);
+ }
+
+#define DNOW_BUFSIZE 2048
+ char msgChars[DNOW_BUFSIZE];
+ Buffer msgBuffer(msgChars, DNOW_BUFSIZE);
+ Variant::List list_;
+ stringstream v1key, v2key;
+
+ if (publish && qmf1Support) {
+ string sBuf;
+
+ v1key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
+ encodeHeader(msgBuffer, 'c');
+ object->writeProperties(sBuf);
+ msgBuffer.putRawData(sBuf);
+ }
+
+ if (publish && qmf2Support) {
+ Variant::Map map_;
+ Variant::Map values;
+
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ "_data",
+ object->getMd5Sum());
+ object->writeTimestamps(map_);
+ object->mapEncodeValues(values, true, false);
+ map_["_values"] = values;
+ list_.push_back(map_);
+ v2key << "agent.ind.data." << keyifyNameStr(object->getPackageName())
+ << "." << keyifyNameStr(object->getClassName())
+ << "." << vendorNameKey
+ << "." << productNameKey;
+ if (!instanceNameKey.empty())
+ v2key << "." << instanceNameKey;
+ }
+
+ object.reset();
+
+ // object deleted, ok to drop lock now.
+
+ if (publish && qmf1Support) {
+ sendBuffer(msgBuffer, mExchange, v1key.str());
+ QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str());
+ }
+
+ if (publish && qmf2Support) {
+ Variant::Map headers;
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ string content;
+ ListCodec::encode(list_, content);
+ sendBuffer(content, "", headers, "amqp/list", v2Topic, v2key.str(), 0);
+ QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v2key.str());
+ }
+}
+
+void ManagementAgent::sendCommandComplete(const string& replyToKey, uint32_t sequence,
+ uint32_t code, const string& text)
+{
+ ResizableBuffer outBuffer (qmfV1BufferSize);
+
+ encodeHeader (outBuffer, 'z', sequence);
+ outBuffer.putLong (code);
+ outBuffer.putShortString (text);
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
+ replyToKey << " seq=" << sequence);
+}
+
+void ManagementAgent::sendException(const string& rte, const string& rtk, const string& cid,
+ const string& text, uint32_t code, bool viaLocal)
+{
+ static const string addr_exchange("qmf.default.direct");
+
+ Variant::Map map;
+ Variant::Map headers;
+ Variant::Map values;
+ string content;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_exception";
+ headers["qmf.agent"] = viaLocal ? "broker" : name_address;
+
+ values["error_code"] = code;
+ values["error_text"] = text;
+ map["_values"] = values;
+
+ MapCodec::encode(map, content);
+ sendBuffer(content, cid, headers, "amqp/map", rte, rtk);
+
+ QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text);
+}
+
+bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
+ const string& routingKey,
+ const FieldTable* /*args*/,
+ const bool topic,
+ int qmfVersion)
+{
+ Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
+
+ if (topic && qmfVersion == 1) {
+
+ // qmf1 is bound only to the topic management exchange.
+ // Parse the routing key. This management broker should act as though it
+ // is bound to the exchange to match the following keys:
+ //
+ // agent.1.0.#
+ // broker
+ // schema.#
+
+ if (routingKey == "broker") {
+ dispatchAgentCommand(msg);
+ return false;
+ }
+
+ if (routingKey.length() > 6) {
+
+ if (routingKey.compare(0, 9, "agent.1.0") == 0) {
+ dispatchAgentCommand(msg);
+ return false;
+ }
+
+ if (routingKey.compare(0, 8, "agent.1.") == 0) {
+ return authorizeAgentMessage(msg);
+ }
+
+ if (routingKey.compare(0, 7, "schema.") == 0) {
+ dispatchAgentCommand(msg);
+ return true;
+ }
+ }
+ }
+
+ if (qmfVersion == 2) {
+
+ if (topic) {
+ // Intercept messages bound to:
+ // "console.ind.locate.# - process these messages, and also allow them to be forwarded.
+ if (routingKey == "console.request.agent_locate") {
+ dispatchAgentCommand(msg);
+ return true;
+ }
+
+ } else { // direct exchange
+
+ // Intercept messages bound to:
+ // "broker" - generic alias for the local broker
+ // "<name_address>" - the broker agent's proper name
+ // and do not forward them futher
+ if (routingKey == "broker" || routingKey == name_address) {
+ dispatchAgentCommand(msg, routingKey == "broker");
+ return false;
+ }
+ }
+ }
+
+ return true;
+}
+
+void ManagementAgent::handleMethodRequest(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const string& userId)
+{
+ moveNewObjects();
+
+ string methodName;
+ string packageName;
+ string className;
+ uint8_t hash[16];
+ ResizableBuffer outBuffer (qmfV1BufferSize);
+ AclModule* acl = broker->getAcl();
+ string inArgs;
+
+ string sBuf;
+ inBuffer.getRawData(sBuf, 16);
+ ObjectId objId;
+ objId.decode(sBuf);
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
+ inBuffer.getShortString(methodName);
+ inBuffer.getRawData(inArgs, inBuffer.available());
+
+ QPID_LOG(debug, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
+ methodName << " replyTo=" << replyToKey);
+
+ encodeHeader(outBuffer, 'm', sequence);
+
+ if (disallowAllV1Methods) {
+ outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
+ outBuffer.putMediumString("QMFv1 methods forbidden on this broker, use QMFv2");
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence);
+ return;
+ }
+
+ DisallowedMethods::const_iterator i = disallowed.find(make_pair(className, methodName));
+ if (i != disallowed.end()) {
+ outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
+ outBuffer.putMediumString(i->second);
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
+ return;
+ }
+
+ if (acl != 0) {
+ map<acl::Property, string> params;
+ params[acl::PROP_SCHEMAPACKAGE] = packageName;
+ params[acl::PROP_SCHEMACLASS] = className;
+
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
+ outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
+ outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+ return;
+ }
+ }
+
+ ManagementObject::shared_ptr object;
+ {
+ sys::Mutex::ScopedLock lock(objectLock);
+ ManagementObjectMap::iterator iter = numericFind(objId);
+ if (iter != managementObjects.end())
+ object = iter->second;
+ }
+
+ if (!object || object->isDeleted()) {
+ outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
+ outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
+ } else {
+ if ((object->getPackageName() != packageName) ||
+ (object->getClassName() != className)) {
+ outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID);
+ outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID));
+ }
+ else {
+ uint32_t pos = outBuffer.getPosition();
+ try {
+ string outBuf;
+ object->doMethod(methodName, inArgs, outBuf, userId);
+ outBuffer.putRawData(outBuf);
+ } catch(exception& e) {
+ outBuffer.setPosition(pos);;
+ outBuffer.putLong(Manageable::STATUS_EXCEPTION);
+ outBuffer.putMediumString(e.what());
+ }
+ }
+ }
+
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);
+}
+
+
+void ManagementAgent::handleMethodRequest (const string& body, const string& rte, const string& rtk,
+ const string& cid, const string& userId, bool viaLocal)
+{
+ moveNewObjects();
+
+ string methodName;
+ Variant::Map inMap;
+ MapCodec::decode(body, inMap);
+ Variant::Map::const_iterator oid, mid;
+ string content;
+ string error;
+ uint32_t errorCode(0);
+
+ Variant::Map outMap;
+ Variant::Map headers;
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_method_response";
+ headers["qmf.agent"] = viaLocal ? "broker" : name_address;
+
+ if ((oid = inMap.find("_object_id")) == inMap.end() ||
+ (mid = inMap.find("_method_name")) == inMap.end()) {
+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
+ Manageable::STATUS_PARAMETER_INVALID, viaLocal);
+ return;
+ }
+
+ ObjectId objId;
+ Variant::Map inArgs;
+ Variant::Map callMap;
+
+ try {
+ // coversions will throw if input is invalid.
+ objId = ObjectId(oid->second.asMap());
+ methodName = mid->second.getString();
+
+ mid = inMap.find("_arguments");
+ if (mid != inMap.end()) {
+ inArgs = (mid->second).asMap();
+ }
+ } catch(exception& e) {
+ sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
+ return;
+ }
+
+ ManagementObject::shared_ptr object;
+ {
+ sys::Mutex::ScopedLock lock(objectLock);
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter != managementObjects.end())
+ object = iter->second;
+ }
+
+ if (!object || object->isDeleted()) {
+ stringstream estr;
+ estr << "No object found with ID=" << objId;
+ sendException(rte, rtk, cid, estr.str(), 1, viaLocal);
+ return;
+ }
+
+ // validate
+ AclModule* acl = broker->getAcl();
+ DisallowedMethods::const_iterator i;
+
+ i = disallowed.find(make_pair(object->getClassName(), methodName));
+ if (i != disallowed.end()) {
+ sendException(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal);
+ return;
+ }
+
+ if (acl != 0) {
+ map<acl::Property, string> params;
+ params[acl::PROP_SCHEMAPACKAGE] = object->getPackageName();
+ params[acl::PROP_SCHEMACLASS] = object->getClassName();
+
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
+ Manageable::STATUS_FORBIDDEN, viaLocal);
+ return;
+ }
+ }
+
+ // invoke the method
+
+ QPID_LOG(debug, "RECV MethodRequest (v2) class=" << object->getPackageName()
+ << ":" << object->getClassName() << " method=" <<
+ methodName << " replyTo=" << rte << "/" << rtk << " objId=" << objId << " inArgs=" << inArgs);
+
+ try {
+ object->doMethod(methodName, inArgs, callMap, userId);
+ errorCode = callMap["_status_code"].asUint32();
+ if (errorCode == 0) {
+ outMap["_arguments"] = Variant::Map();
+ for (Variant::Map::const_iterator iter = callMap.begin();
+ iter != callMap.end(); iter++)
+ if (iter->first != "_status_code" && iter->first != "_status_text")
+ outMap["_arguments"].asMap()[iter->first] = iter->second;
+ } else
+ error = callMap["_status_text"].asString();
+ } catch(exception& e) {
+ sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
+ return;
+ }
+
+ if (errorCode != 0) {
+ sendException(rte, rtk, cid, error, errorCode, viaLocal);
+ return;
+ }
+
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, "amqp/map", rte, rtk);
+ QPID_LOG(debug, "SEND MethodResponse (v2) to=" << rte << "/" << rtk << " seq=" << cid << " map=" << outMap);
+}
+
+
+void ManagementAgent::handleBrokerRequest (Buffer&, const string& replyToKey, uint32_t sequence)
+{
+ ResizableBuffer outBuffer (qmfV1BufferSize);
+
+ QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey);
+
+ encodeHeader (outBuffer, 'b', sequence);
+ uuid.encode (outBuffer);
+
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey);
+}
+
+void ManagementAgent::handlePackageQuery (Buffer&, const string& replyToKey, uint32_t sequence)
+{
+ QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey);
+ ResizableBuffer outBuffer (qmfV1BufferSize);
+
+ {
+ sys::Mutex::ScopedLock lock(userLock);
+ for (PackageMap::iterator pIter = packages.begin ();
+ pIter != packages.end ();
+ pIter++)
+ {
+ encodeHeader (outBuffer, 'p', sequence);
+ encodePackageIndication (outBuffer, pIter);
+ }
+ }
+
+ if (outBuffer.getPosition() > 0) {
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND PackageInd to=" << replyToKey << " seq=" << sequence);
+ }
+
+ sendCommandComplete(replyToKey, sequence);
+}
+
+void ManagementAgent::handlePackageInd (Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
+{
+ string packageName;
+
+ inBuffer.getShortString(packageName);
+
+ QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
+
+ sys::Mutex::ScopedLock lock(userLock);
+ findOrAddPackageLH(packageName);
+}
+
+void ManagementAgent::handleClassQuery(Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
+{
+ string packageName;
+
+ inBuffer.getShortString(packageName);
+
+ QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
+
+ typedef std::pair<SchemaClassKey, uint8_t> _ckeyType;
+ std::list<_ckeyType> classes;
+ {
+ sys::Mutex::ScopedLock lock(userLock);
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end())
+ {
+ ClassMap &cMap = pIter->second;
+ for (ClassMap::iterator cIter = cMap.begin();
+ cIter != cMap.end();
+ cIter++) {
+ if (cIter->second.hasSchema()) {
+ classes.push_back(make_pair(cIter->first, cIter->second.kind));
+ }
+ }
+ }
+ }
+
+ while (classes.size()) {
+ ResizableBuffer outBuffer(qmfV1BufferSize);
+
+ encodeHeader(outBuffer, 'q', sequence);
+ encodeClassIndication(outBuffer, packageName, classes.front().first, classes.front().second);
+
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name <<
+ "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence);
+ classes.pop_front();
+ }
+ sendCommandComplete(replyToKey, sequence);
+}
+
+void ManagementAgent::handleClassInd (Buffer& inBuffer, const string& replyToKey, uint32_t)
+{
+ string packageName;
+ SchemaClassKey key;
+
+ uint8_t kind = inBuffer.getOctet();
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(key.name);
+ inBuffer.getBin128(key.hash);
+
+ QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+ "), replyTo=" << replyToKey);
+
+ sys::Mutex::ScopedLock lock(userLock);
+ PackageMap::iterator pIter = findOrAddPackageLH(packageName);
+ ClassMap::iterator cIter = pIter->second.find(key);
+ if (cIter == pIter->second.end() || !cIter->second.hasSchema()) {
+ ResizableBuffer outBuffer (qmfV1BufferSize);
+ uint32_t sequence = nextRequestSequence++;
+
+ // Schema Request
+ encodeHeader (outBuffer, 'S', sequence);
+ outBuffer.putShortString(packageName);
+ key.encode(outBuffer);
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+ "), to=" << replyToKey << " seq=" << sequence);
+
+ if (cIter != pIter->second.end())
+ pIter->second.erase(key);
+
+ pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, sequence)));
+ }
+}
+
+void ManagementAgent::SchemaClass::appendSchema(Buffer& buf)
+{
+ // If the management package is attached locally (embedded in the broker or
+ // linked in via plug-in), call the schema handler directly. If the package
+ // is from a remote management agent, send the stored schema information.
+
+ if (writeSchemaCall != 0) {
+ string schema;
+ writeSchemaCall(schema);
+ buf.putRawData(schema);
+ } else
+ buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size());
+}
+
+void ManagementAgent::handleSchemaRequest(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence)
+{
+ string packageName;
+ SchemaClassKey key;
+
+ inBuffer.getShortString (packageName);
+ key.decode(inBuffer);
+
+ QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+ "), replyTo=" << rte << "/" << rtk << " seq=" << sequence);
+
+ sys::Mutex::ScopedLock lock(userLock);
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end()) {
+ ClassMap& cMap = pIter->second;
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end()) {
+ ResizableBuffer outBuffer(qmfV1BufferSize);
+ SchemaClass& classInfo = cIter->second;
+
+ if (classInfo.hasSchema()) {
+ encodeHeader(outBuffer, 's', sequence);
+ classInfo.appendSchema(outBuffer);
+ sendBuffer(outBuffer, rte, rtk);
+ QPID_LOG(debug, "SEND SchemaResponse to=" << rte << "/" << rtk << " seq=" << sequence);
+ }
+ else
+ sendCommandComplete(rtk, sequence, 1, "Schema not available");
+ }
+ else
+ sendCommandComplete(rtk, sequence, 1, "Class key not found");
+ }
+ else
+ sendCommandComplete(rtk, sequence, 1, "Package not found");
+}
+
+void ManagementAgent::handleSchemaResponse(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence)
+{
+ string packageName;
+ SchemaClassKey key;
+
+ uint32_t pos = inBuffer.getPosition();
+ inBuffer.getOctet();
+ inBuffer.getShortString(packageName);
+ key.decode(inBuffer);
+ inBuffer.setPosition(pos);;
+
+ QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
+
+ sys::Mutex::ScopedLock lock(userLock);
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end()) {
+ ClassMap& cMap = pIter->second;
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) {
+ size_t length = validateSchema(inBuffer, cIter->second.kind);
+ if (length == 0) {
+ QPID_LOG(warning, "Management Agent received invalid schema response: " << packageName << "." << key.name);
+ cMap.erase(key);
+ } else {
+ cIter->second.data.resize(length);
+ inBuffer.getRawData(reinterpret_cast<uint8_t*>(&cIter->second.data[0]), length);
+
+ // Publish a class-indication message
+ ResizableBuffer outBuffer(qmfV1BufferSize);
+
+ encodeHeader(outBuffer, 'q');
+ encodeClassIndication(outBuffer, pIter->first, cIter->first, cIter->second.kind);
+ sendBuffer(outBuffer, mExchange, "schema.class");
+ QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
+ " to=schema.class");
+ }
+ }
+ }
+}
+
+bool ManagementAgent::bankInUse (uint32_t bank)
+{
+ for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
+ aIter != remoteAgents.end();
+ aIter++)
+ if (aIter->second->agentBank == bank)
+ return true;
+ return false;
+}
+
+uint32_t ManagementAgent::allocateNewBank ()
+{
+ while (bankInUse (nextRemoteBank))
+ nextRemoteBank++;
+
+ uint32_t allocated = nextRemoteBank++;
+ writeData ();
+ return allocated;
+}
+
+uint32_t ManagementAgent::assignBankLH (uint32_t requestedBank)
+{
+ if (requestedBank == 0 || bankInUse (requestedBank))
+ return allocateNewBank ();
+ return requestedBank;
+}
+
+void ManagementAgent::deleteOrphanedAgentsLH()
+{
+ list<ObjectId> deleteList;
+
+ for (RemoteAgentMap::const_iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) {
+ bool found = false;
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ if (iter->first == aIter->first && !iter->second->isDeleted()) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found)
+ deleteList.push_back(aIter->first);
+ }
+
+ for (list<ObjectId>::const_iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++)
+ remoteAgents.erase(*dIter);
+}
+
+void ManagementAgent::handleAttachRequest (Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ObjectId& connectionRef)
+{
+ string label;
+ uint32_t requestedBrokerBank, requestedAgentBank;
+ uint32_t assignedBank;
+ Uuid systemId;
+
+ moveNewObjects();
+
+ sys::Mutex::ScopedLock lock(userLock);
+ deleteOrphanedAgentsLH();
+ RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef);
+ if (aIter != remoteAgents.end()) {
+ // There already exists an agent on this session. Reject the request.
+ sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent");
+ return;
+ }
+
+ inBuffer.getShortString(label);
+ systemId.decode(inBuffer);
+ requestedBrokerBank = inBuffer.getLong();
+ requestedAgentBank = inBuffer.getLong();
+
+ QPID_LOG(debug, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank <<
+ " reqAgentBank=" << requestedAgentBank << " replyTo=" << replyToKey << " seq=" << sequence);
+
+ assignedBank = assignBankLH(requestedAgentBank);
+
+ boost::shared_ptr<RemoteAgent> agent(new RemoteAgent(*this));
+ agent->brokerBank = brokerBank;
+ agent->agentBank = assignedBank;
+ agent->routingKey = replyToKey;
+ agent->connectionRef = connectionRef;
+ agent->mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent (this, agent.get()));
+ agent->mgmtObject->set_connectionRef(agent->connectionRef);
+ agent->mgmtObject->set_label (label);
+ agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
+ agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data());
+ agent->mgmtObject->set_brokerBank (brokerBank);
+ agent->mgmtObject->set_agentBank (assignedBank);
+ addObject (agent->mgmtObject, 0);
+ remoteAgents[connectionRef] = agent;
+
+ QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
+
+ // Send an Attach Response
+ ResizableBuffer outBuffer (qmfV1BufferSize);
+
+ encodeHeader (outBuffer, 'a', sequence);
+ outBuffer.putLong (brokerBank);
+ outBuffer.putLong (assignedBank);
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
+ " to=" << replyToKey << " seq=" << sequence);
+}
+
+void ManagementAgent::handleGetQuery(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const string& userId)
+{
+ FieldTable ft;
+ FieldTable::ValuePtr value;
+ AclModule* acl = broker->getAcl();
+
+ moveNewObjects();
+
+ ft.decode(inBuffer);
+
+ QPID_LOG(debug, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence);
+
+ value = ft.get("_class");
+ if (value.get() == 0 || !value->convertsTo<string>()) {
+ value = ft.get("_objectid");
+ if (value.get() == 0 || !value->convertsTo<string>())
+ return;
+
+ ObjectId selector(value->get<string>());
+
+ ManagementObject::shared_ptr object;
+ {
+ sys::Mutex::ScopedLock lock(objectLock);
+ ManagementObjectMap::iterator iter = numericFind(selector);
+ if (iter != managementObjects.end())
+ object = iter->second;
+ }
+
+ if (object) {
+ ResizableBuffer outBuffer (qmfV1BufferSize);
+ if (acl != 0) {
+ map<acl::Property, string> params;
+ params[acl::PROP_SCHEMACLASS] = object->getClassName();
+
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUERY, object->getObjectId().getV2Key(), &params)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("unauthorized-access: ACL denied QMF query of object " << object->getObjectId().getV2Key() << " from " << userId));
+ }
+ }
+
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ if (!object->isDeleted()) {
+ string sBuf;
+ encodeHeader(outBuffer, 'g', sequence);
+ object->writeProperties(sBuf);
+ outBuffer.putRawData(sBuf);
+ sBuf.clear();
+ object->writeStatistics(sBuf, true);
+ outBuffer.putRawData(sBuf);
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
+ }
+ }
+ sendCommandComplete(replyToKey, sequence);
+ return;
+ }
+
+ string className (value->get<string>());
+ std::list<ManagementObject::shared_ptr> matches;
+
+ if (acl != 0) {
+ map<acl::Property, string> params;
+ params[acl::PROP_SCHEMACLASS] = className;
+
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUERY, className /* class-wide query */, &params)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("unauthorized-access: ACL denied QMF query of object class " << className << " from " << userId));
+ }
+ }
+
+ if (className == "memory")
+ qpid::sys::MemStat::loadMemInfo(memstat.get());
+
+ if (className == "broker") {
+ uint64_t uptime = sys::Duration(startTime, sys::now());
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
+ }
+
+
+ // build up a set of all objects to be dumped
+ {
+ sys::Mutex::ScopedLock lock(objectLock);
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject::shared_ptr object = iter->second;
+ if (object->getClassName () == className) {
+ matches.push_back(object);
+ }
+ }
+ }
+
+ // send them
+ ResizableBuffer outBuffer (qmfV1BufferSize);
+ while (matches.size()) {
+ ManagementObject::shared_ptr object = matches.front();
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ if (!object->isDeleted()) {
+ string sProps, sStats;
+ object->writeProperties(sProps);
+ object->writeStatistics(sStats, true);
+
+ size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes.
+ if (len > qmfV1BufferSize) {
+ QPID_LOG(error, "Object " << object->getObjectId() << " too large for output buffer - discarded!");
+ } else {
+ if (outBuffer.available() < len) { // not enough room in current buffer, send it.
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
+ continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted.
+ }
+ encodeHeader(outBuffer, 'g', sequence);
+ outBuffer.putRawData(sProps);
+ outBuffer.putRawData(sStats);
+ }
+ }
+ matches.pop_front();
+ }
+
+ if (outBuffer.getPosition() > 0) {
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
+ }
+
+ sendCommandComplete(replyToKey, sequence);
+}
+
+
+void ManagementAgent::handleGetQuery(const string& body, const string& rte, const string& rtk, const string& cid, const std::string& userId, bool viaLocal)
+{
+ moveNewObjects();
+
+ Variant::Map inMap;
+ Variant::Map::const_iterator i;
+ Variant::Map headers;
+ AclModule* acl = broker->getAcl();
+
+ MapCodec::decode(body, inMap);
+ QPID_LOG(debug, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid);
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = viaLocal ? "broker" : name_address;
+
+ /*
+ * Unpack the _what element of the query. Currently we only support OBJECT queries.
+ */
+ i = inMap.find("_what");
+ if (i == inMap.end()) {
+ sendException(rte, rtk, cid, "_what element missing in Query");
+ return;
+ }
+
+ if (i->second.getType() != qpid::types::VAR_STRING) {
+ sendException(rte, rtk, cid, "_what element is not a string");
+ return;
+ }
+
+ if (i->second.asString() != "OBJECT") {
+ sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+ return;
+ }
+
+ string className;
+ string packageName;
+
+ /*
+ * Handle the _schema_id element, if supplied.
+ */
+ i = inMap.find("_schema_id");
+ if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
+ const Variant::Map& schemaIdMap(i->second.asMap());
+
+ Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name");
+ if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
+ className = s_iter->second.asString();
+
+ s_iter = schemaIdMap.find("_package_name");
+ if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
+ packageName = s_iter->second.asString();
+ }
+
+ if (className == "memory")
+ qpid::sys::MemStat::loadMemInfo(memstat.get());
+
+ if (className == "broker") {
+ uint64_t uptime = sys::Duration(startTime, sys::now());
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
+ }
+
+ /*
+ * Unpack the _object_id element of the query if it is present. If it is present, find that one
+ * object and return it. If it is not present, send a class-based result.
+ */
+ i = inMap.find("_object_id");
+ if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
+ Variant::List list_;
+ ObjectId objId(i->second.asMap());
+
+ ManagementObject::shared_ptr object;
+ {
+ sys::Mutex::ScopedLock lock (objectLock);
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter != managementObjects.end())
+ object = iter->second;
+ }
+ if (object) {
+ if (acl != 0) {
+ map<acl::Property, string> params;
+ params[acl::PROP_SCHEMACLASS] = object->getClassName();
+
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUERY, object->getObjectId().getV2Key(), &params)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("unauthorized-access: ACL denied QMF query of object " << object->getObjectId().getV2Key() << " from " << userId));
+ }
+ }
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ if (!object->isDeleted()) {
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oidMap;
+
+ object->writeTimestamps(map_);
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ objId.mapEncode(oidMap);
+ map_["_values"] = values;
+ map_["_object_id"] = oidMap;
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ "_data",
+ object->getMd5Sum());
+ list_.push_back(map_);
+ }
+
+ string content;
+
+ ListCodec::encode(list_, content);
+ sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
+ QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk);
+ return;
+ }
+ } else {
+ // send class-based result.
+ if (acl != 0) {
+ map<acl::Property, string> params;
+ params[acl::PROP_SCHEMACLASS] = className;
+
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUERY, className /* class-wide query */, &params)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("unauthorized-access: ACL denied QMF query of object class " << className << " from " << userId));
+ }
+ }
+ Variant::List _list;
+ Variant::List _subList;
+ unsigned int objCount = 0;
+
+ ManagementObjectVector localManagementObjects;
+ {
+ sys::Mutex::ScopedLock objLock(objectLock);
+ std::transform(managementObjects.begin(), managementObjects.end(),
+ std::back_inserter(localManagementObjects),
+ boost::bind(&ManagementObjectMap::value_type::second, _1));
+ }
+
+ for (ManagementObjectVector::iterator iter = localManagementObjects.begin();
+ iter != localManagementObjects.end();
+ iter++) {
+ ManagementObject::shared_ptr object = *iter;
+ if (object->getClassName() == className &&
+ (packageName.empty() || object->getPackageName() == packageName)) {
+
+
+ if (!object->isDeleted()) {
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oidMap;
+
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ object->writeTimestamps(map_);
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ object->getObjectId().mapEncode(oidMap);
+
+ map_["_values"] = values;
+ map_["_object_id"] = oidMap;
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ "_data",
+ object->getMd5Sum());
+ _subList.push_back(map_);
+ if (++objCount >= maxReplyObjs) {
+ objCount = 0;
+ _list.push_back(_subList);
+ _subList.clear();
+ }
+ }
+ }
+ }
+
+ if (_subList.size())
+ _list.push_back(_subList);
+
+ headers["partial"] = Variant();
+ string content;
+ while (_list.size() > 1) {
+ ListCodec::encode(_list.front().asList(), content);
+ sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
+ _list.pop_front();
+ QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
+ }
+ headers.erase("partial");
+ ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content);
+ sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
+ QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
+ return;
+ }
+
+ // Unrecognized query - Send empty message to indicate CommandComplete
+ string content;
+ ListCodec::encode(Variant::List(), content);
+ sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
+ QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk);
+}
+
+
+void ManagementAgent::handleLocateRequest(const string&, const string& rte, const string& rtk, const string& cid)
+{
+ QPID_LOG(debug, "RCVD AgentLocateRequest");
+
+ Variant::Map map;
+ Variant::Map headers;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_locate_response";
+ headers["qmf.agent"] = name_address;
+
+ map["_values"] = attrMap;
+ map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration::FromEpoch());
+ map["_values"].asMap()["_heartbeat_interval"] = interval;
+ map["_values"].asMap()["_epoch"] = bootSequence;
+
+ string content;
+ MapCodec::encode(map, content);
+ sendBuffer(content, cid, headers, "amqp/map", rte, rtk);
+ clientWasAdded = true;
+
+ QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk);
+}
+
+
+bool ManagementAgent::authorizeAgentMessage(Message& msg)
+{
+ sys::Mutex::ScopedLock lock(userLock);
+ ResizableBuffer inBuffer (qmfV1BufferSize);
+ uint32_t sequence = 0;
+ bool methodReq = false;
+ bool mapMsg = false;
+ string packageName;
+ string className;
+ string methodName;
+ string cid;
+
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg);
+ //
+ // If the message is larger than our working buffer size (or if it
+ // could not be converted to an 0-10 messgae-transfer), we can't
+ // determine if it's authorized or not. In this case, return true
+ // (authorized) if there is no ACL in place, otherwise return
+ // false;
+ //
+ if (!transfer || transfer->getContentSize() > qmfV1BufferSize)
+ return broker->getAcl() == 0;
+
+ inBuffer.putRawData(transfer->getContent());
+ uint32_t bufferLen = inBuffer.getPosition();
+ inBuffer.reset();
+
+ const framing::MessageProperties* p =
+ transfer ? transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
+
+ const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
+
+ if (headers && p->getAppId() == "qmf2")
+ {
+ mapMsg = true;
+
+ if (p && p->hasCorrelationId()) {
+ cid = p->getCorrelationId();
+ }
+
+ if (headers->getAsString("qmf.opcode") == "_method_request")
+ {
+ methodReq = true;
+
+ // extract object id and method name
+
+ string body;
+ inBuffer.getRawData(body, bufferLen);
+ Variant::Map inMap;
+ MapCodec::decode(body, inMap);
+ Variant::Map::const_iterator oid, mid;
+
+ ObjectId objId;
+
+ if ((oid = inMap.find("_object_id")) == inMap.end() ||
+ (mid = inMap.find("_method_name")) == inMap.end()) {
+ QPID_LOG(warning,
+ "Missing fields in QMF authorize req received.");
+ return false;
+ }
+
+ try {
+ // coversions will throw if input is invalid.
+ objId = ObjectId(oid->second.asMap());
+ methodName = mid->second.getString();
+ } catch(exception& /*e*/) {
+ QPID_LOG(warning,
+ "Badly formatted QMF authorize req received.");
+ return false;
+ }
+
+ // look up schema for object to get package and class name
+ sys::Mutex::ScopedLock lock(objectLock);
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+
+ if (iter == managementObjects.end() || iter->second->isDeleted()) {
+ QPID_LOG(debug, "ManagementAgent::authorizeAgentMessage: stale object id " <<
+ objId);
+ return false;
+ }
+
+ packageName = iter->second->getPackageName();
+ className = iter->second->getClassName();
+ }
+ } else { // old style binary message format
+
+ uint8_t opcode;
+
+ if (!checkHeader(inBuffer, &opcode, &sequence))
+ return false;
+
+ if (opcode == 'M') {
+ methodReq = true;
+
+ // extract method name & schema package and class name
+
+ uint8_t hash[16];
+ inBuffer.getLongLong(); // skip over object id
+ inBuffer.getLongLong();
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
+ inBuffer.getShortString(methodName);
+
+ }
+ }
+
+ if (methodReq) {
+ map<acl::Property, string> params;
+ AclModule* acl = broker->getAcl();
+ if (acl == 0)
+ return true;
+
+ string userId = msg.getUserId();
+ params[acl::PROP_SCHEMAPACKAGE] = packageName;
+ params[acl::PROP_SCHEMACLASS] = className;
+
+ if (acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params))
+ return true;
+
+ // authorization failed, send reply if replyTo present
+
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg);
+ const framing::MessageProperties* p =
+ transfer ? transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
+ if (p && p->hasReplyTo()) {
+ const framing::ReplyTo& rt = p->getReplyTo();
+ string rte = rt.getExchange();
+ string rtk = rt.getRoutingKey();
+ string cid;
+ if (p && p->hasCorrelationId())
+ cid = p->getCorrelationId();
+
+ if (mapMsg) {
+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
+ Manageable::STATUS_FORBIDDEN, false);
+ } else {
+
+ ResizableBuffer outBuffer(qmfV1BufferSize);
+
+ encodeHeader(outBuffer, 'm', sequence);
+ outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
+ outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
+ sendBuffer(outBuffer, rte, rtk);
+ }
+
+ QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+ }
+
+ return false;
+ }
+
+ return true;
+}
+
+void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal)
+{
+ string rte;
+ string rtk;
+
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg);
+ const framing::MessageProperties* p = transfer ?
+ transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
+ if (p && p->hasReplyTo()) {
+ const framing::ReplyTo& rt = p->getReplyTo();
+ rte = rt.getExchange();
+ rtk = rt.getRoutingKey();
+ }
+ else
+ return;
+
+ ResizableBuffer inBuffer(qmfV1BufferSize);
+ uint8_t opcode;
+
+ if (transfer->getContentSize() > qmfV1BufferSize) {
+ QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
+ transfer->getContentSize());
+ return;
+ }
+
+ inBuffer.putRawData(transfer->getContent());
+ uint32_t bufferLen = inBuffer.getPosition();
+ inBuffer.reset();
+
+ ScopedManagementContext context(msg.getPublisher());
+ const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
+ if (headers && p->getAppId() == "qmf2")
+ {
+ string opcode = headers->getAsString("qmf.opcode");
+ string contentType = headers->getAsString("qmf.content");
+ string body;
+ string cid;
+ inBuffer.getRawData(body, bufferLen);
+ {
+ if (p && p->hasCorrelationId()) {
+ cid = p->getCorrelationId();
+ }
+
+ if (opcode == "_method_request")
+ return handleMethodRequest(body, rte, rtk, cid, context.getUserId(), viaLocal);
+ else if (opcode == "_query_request")
+ return handleGetQuery(body, rte, rtk, cid, context.getUserId(), viaLocal);
+ else if (opcode == "_agent_locate_request")
+ return handleLocateRequest(body, rte, rtk, cid);
+ }
+ QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
+ return;
+ }
+
+ // old preV2 binary messages
+ while (inBuffer.getPosition() < bufferLen) {
+ uint32_t sequence;
+ if (!checkHeader(inBuffer, &opcode, &sequence))
+ return;
+
+ if (opcode == 'B') handleBrokerRequest (inBuffer, rtk, sequence);
+ else if (opcode == 'P') handlePackageQuery (inBuffer, rtk, sequence);
+ else if (opcode == 'p') handlePackageInd (inBuffer, rtk, sequence);
+ else if (opcode == 'Q') handleClassQuery (inBuffer, rtk, sequence);
+ else if (opcode == 'q') handleClassInd (inBuffer, rtk, sequence);
+ else if (opcode == 'S') handleSchemaRequest (inBuffer, rte, rtk, sequence);
+ else if (opcode == 's') handleSchemaResponse (inBuffer, rtk, sequence);
+ else if (opcode == 'A') handleAttachRequest (inBuffer, rtk, sequence, context.getObjectId());
+ else if (opcode == 'G') handleGetQuery (inBuffer, rtk, sequence, context.getMgmtId());
+ else if (opcode == 'M') handleMethodRequest (inBuffer, rtk, sequence, context.getMgmtId());
+ }
+}
+
+ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string name)
+{
+ PackageMap::iterator pIter = packages.find (name);
+ if (pIter != packages.end ())
+ return pIter;
+
+ // No such package found, create a new map entry.
+ pair<PackageMap::iterator, bool> result =
+ packages.insert(pair<string, ClassMap>(name, ClassMap()));
+ QPID_LOG (debug, "ManagementAgent added package " << name);
+
+ // Publish a package-indication message
+ ResizableBuffer outBuffer (qmfV1BufferSize);
+
+ encodeHeader (outBuffer, 'p');
+ encodePackageIndication (outBuffer, result.first);
+ sendBuffer(outBuffer, mExchange, "schema.package");
+ QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package");
+
+ return result.first;
+}
+
+void ManagementAgent::addClassLH(uint8_t kind,
+ PackageMap::iterator pIter,
+ const string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
+{
+ SchemaClassKey key;
+ ClassMap& cMap = pIter->second;
+
+ key.name = className;
+ memcpy(&key.hash, md5Sum, 16);
+
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end())
+ return;
+
+ // No such class found, create a new class with local information.
+ QPID_LOG (debug, "ManagementAgent added class " << pIter->first << ":" <<
+ key.name);
+
+ cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall)));
+ cIter = cMap.find(key);
+}
+
+void ManagementAgent::encodePackageIndication(Buffer& buf,
+ PackageMap::iterator pIter)
+{
+ buf.putShortString((*pIter).first);
+}
+
+void ManagementAgent::encodeClassIndication(Buffer& buf,
+ const std::string packageName,
+ const SchemaClassKey key,
+ uint8_t kind)
+{
+ buf.putOctet(kind);
+ buf.putShortString(packageName);
+ key.encode(buf);
+}
+
+size_t ManagementAgent::validateSchema(Buffer& inBuffer, uint8_t kind)
+{
+ if (kind == ManagementItem::CLASS_KIND_TABLE)
+ return validateTableSchema(inBuffer);
+ else if (kind == ManagementItem::CLASS_KIND_EVENT)
+ return validateEventSchema(inBuffer);
+ return 0;
+}
+
+size_t ManagementAgent::validateTableSchema(Buffer& inBuffer)
+{
+ uint32_t start = inBuffer.getPosition();
+ uint32_t end;
+ string text;
+ uint8_t hash[16];
+
+ try {
+ uint8_t kind = inBuffer.getOctet();
+ if (kind != ManagementItem::CLASS_KIND_TABLE)
+ return 0;
+
+ inBuffer.getShortString(text);
+ inBuffer.getShortString(text);
+ inBuffer.getBin128(hash);
+
+ uint8_t superType = 0; //inBuffer.getOctet();
+
+ uint16_t propCount = inBuffer.getShort();
+ uint16_t statCount = inBuffer.getShort();
+ uint16_t methCount = inBuffer.getShort();
+
+ if (superType == 1) {
+ inBuffer.getShortString(text);
+ inBuffer.getShortString(text);
+ inBuffer.getBin128(hash);
+ }
+
+ for (uint16_t idx = 0; idx < propCount + statCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ }
+
+ for (uint16_t idx = 0; idx < methCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ if (!ft.isSet("argCount"))
+ return 0;
+ int argCount = ft.getAsInt("argCount");
+ for (int mIdx = 0; mIdx < argCount; mIdx++) {
+ FieldTable aft;
+ aft.decode(inBuffer);
+ }
+ }
+ } catch (exception& /*e*/) {
+ return 0;
+ }
+
+ end = inBuffer.getPosition();
+ inBuffer.setPosition(start); // restore original position
+ return end - start;
+}
+
+size_t ManagementAgent::validateEventSchema(Buffer& inBuffer)
+{
+ uint32_t start = inBuffer.getPosition();
+ uint32_t end;
+ string text;
+ uint8_t hash[16];
+
+ try {
+ uint8_t kind = inBuffer.getOctet();
+ if (kind != ManagementItem::CLASS_KIND_EVENT)
+ return 0;
+
+ inBuffer.getShortString(text);
+ inBuffer.getShortString(text);
+ inBuffer.getBin128(hash);
+
+ uint8_t superType = 0; //inBuffer.getOctet();
+
+ uint16_t argCount = inBuffer.getShort();
+
+ if (superType == 1) {
+ inBuffer.getShortString(text);
+ inBuffer.getShortString(text);
+ inBuffer.getBin128(hash);
+ }
+ for (uint16_t idx = 0; idx < argCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ }
+ } catch (exception& /*e*/) {
+ return 0;
+ }
+
+ end = inBuffer.getPosition();
+ inBuffer.setPosition(start); // restore original position
+ return end - start;
+}
+
+ManagementObjectMap::iterator ManagementAgent::numericFind(const ObjectId& oid)
+{
+ ManagementObjectMap::iterator iter = managementObjects.begin();
+ for (; iter != managementObjects.end(); iter++) {
+ if (oid.equalV1(iter->first))
+ break;
+ }
+
+ return iter;
+}
+
+void ManagementAgent::disallow(const string& className, const string& methodName, const string& message) {
+ disallowed[make_pair(className, methodName)] = message;
+}
+
+void ManagementAgent::SchemaClassKey::mapEncode(Variant::Map& _map) const {
+ _map["_cname"] = name;
+ _map["_hash"] = qpid::types::Uuid(hash);
+}
+
+void ManagementAgent::SchemaClassKey::mapDecode(const Variant::Map& _map) {
+ Variant::Map::const_iterator i;
+
+ if ((i = _map.find("_cname")) != _map.end()) {
+ name = i->second.asString();
+ }
+
+ if ((i = _map.find("_hash")) != _map.end()) {
+ const qpid::types::Uuid& uuid = i->second.asUuid();
+ memcpy(hash, uuid.data(), uuid.size());
+ }
+}
+
+void ManagementAgent::SchemaClassKey::encode(qpid::framing::Buffer& buffer) const {
+ buffer.checkAvailable(encodedBufSize());
+ buffer.putShortString(name);
+ buffer.putBin128(hash);
+}
+
+void ManagementAgent::SchemaClassKey::decode(qpid::framing::Buffer& buffer) {
+ buffer.checkAvailable(encodedBufSize());
+ buffer.getShortString(name);
+ buffer.getBin128(hash);
+}
+
+uint32_t ManagementAgent::SchemaClassKey::encodedBufSize() const {
+ return 1 + name.size() + 16 /* bin128 */;
+}
+
+void ManagementAgent::SchemaClass::mapEncode(Variant::Map& _map) const {
+ _map["_type"] = kind;
+ _map["_pending_sequence"] = pendingSequence;
+ _map["_data"] = data;
+}
+
+void ManagementAgent::SchemaClass::mapDecode(const Variant::Map& _map) {
+ Variant::Map::const_iterator i;
+
+ if ((i = _map.find("_type")) != _map.end()) {
+ kind = i->second;
+ }
+ if ((i = _map.find("_pending_sequence")) != _map.end()) {
+ pendingSequence = i->second;
+ }
+ if ((i = _map.find("_data")) != _map.end()) {
+ data = i->second.asString();
+ }
+}
+
+void ManagementAgent::RemoteAgent::mapEncode(Variant::Map& map_) const {
+ Variant::Map _objId, _values;
+
+ map_["_brokerBank"] = brokerBank;
+ map_["_agentBank"] = agentBank;
+ map_["_routingKey"] = routingKey;
+
+ connectionRef.mapEncode(_objId);
+ map_["_object_id"] = _objId;
+
+ mgmtObject->mapEncodeValues(_values, true, false);
+ map_["_values"] = _values;
+}
+
+void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) {
+ Variant::Map::const_iterator i;
+
+ if ((i = map_.find("_brokerBank")) != map_.end()) {
+ brokerBank = i->second;
+ }
+
+ if ((i = map_.find("_agentBank")) != map_.end()) {
+ agentBank = i->second;
+ }
+
+ if ((i = map_.find("_routingKey")) != map_.end()) {
+ routingKey = i->second.getString();
+ }
+
+ if ((i = map_.find("_object_id")) != map_.end()) {
+ connectionRef.mapDecode(i->second.asMap());
+ }
+
+ mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent(&agent, this));
+
+ if ((i = map_.find("_values")) != map_.end()) {
+ mgmtObject->mapDecodeValues(i->second.asMap());
+ }
+
+ // TODO aconway 2010-03-04: see comment in encode(), readProperties doesn't set v2key.
+ mgmtObject->set_connectionRef(connectionRef);
+}
+
+namespace {
+bool isDeletedMap(const ManagementObjectMap::value_type& value) {
+ return value.second->isDeleted();
+}
+
+bool isDeletedVector(const ManagementObjectVector::value_type& value) {
+ return value->isDeleted();
+}
+
+string summarizeMap(const char* name, const ManagementObjectMap& map) {
+ ostringstream o;
+ size_t deleted = std::count_if(map.begin(), map.end(), isDeletedMap);
+ o << map.size() << " " << name << " (" << deleted << " deleted), ";
+ return o.str();
+}
+
+string summarizeVector(const char* name, const ManagementObjectVector& map) {
+ ostringstream o;
+ size_t deleted = std::count_if(map.begin(), map.end(), isDeletedVector);
+ o << map.size() << " " << name << " (" << deleted << " deleted), ";
+ return o.str();
+}
+
+string dumpMap(const ManagementObjectMap& map) {
+ ostringstream o;
+ for (ManagementObjectMap::const_iterator i = map.begin(); i != map.end(); ++i) {
+ o << endl << " " << i->second->getObjectId().getV2Key()
+ << (i->second->isDeleted() ? " (deleted)" : "");
+ }
+ return o.str();
+}
+
+string dumpVector(const ManagementObjectVector& map) {
+ ostringstream o;
+ for (ManagementObjectVector::const_iterator i = map.begin(); i != map.end(); ++i) {
+ o << endl << " " << (*i)->getObjectId().getV2Key()
+ << ((*i)->isDeleted() ? " (deleted)" : "");
+ }
+ return o.str();
+}
+
+} // namespace
+
+string ManagementAgent::summarizeAgents() {
+ ostringstream msg;
+ if (!remoteAgents.empty()) {
+ msg << remoteAgents.size() << " agents(";
+ for (RemoteAgentMap::const_iterator i=remoteAgents.begin();
+ i != remoteAgents.end(); ++i)
+ msg << " " << i->second->routingKey;
+ msg << "), ";
+ }
+ return msg.str();
+}
+
+
+void ManagementAgent::debugSnapshot(const char* title) {
+ sys::Mutex::ScopedLock lock(addLock);
+ sys::Mutex::ScopedLock objLock (objectLock);
+ QPID_LOG(debug, title << ": management snapshot: "
+ << packages.size() << " packages, "
+ << summarizeMap("objects", managementObjects)
+ << summarizeVector("new objects ", newManagementObjects)
+ << pendingDeletedObjs.size() << " pending deletes"
+ << summarizeAgents());
+
+ QPID_LOG_IF(trace, managementObjects.size(),
+ title << ": objects" << dumpMap(managementObjects));
+ QPID_LOG_IF(trace, newManagementObjects.size(),
+ title << ": new objects" << dumpVector(newManagementObjects));
+}
+
+
+Variant::Map ManagementAgent::toMap(const FieldTable& from)
+{
+ Variant::Map map;
+ qpid::amqp_0_10::translate(from, map);
+ return map;
+}
+
+// construct a DeletedObject from a management object.
+ManagementAgent::DeletedObject::DeletedObject(ManagementObject::shared_ptr src, bool v1, bool v2)
+ : packageName(src->getPackageName()),
+ className(src->getClassName())
+{
+ bool send_stats = (src->hasInst() && (src->getInstChanged() || src->getForcePublish()));
+
+ stringstream oid;
+ oid << src->getObjectId();
+ objectId = oid.str();
+
+ if (v1) {
+ src->writeProperties(encodedV1Config);
+ if (send_stats) {
+ src->writeStatistics(encodedV1Inst);
+ }
+ }
+
+ if (v2) {
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oid;
+
+ src->getObjectId().mapEncode(oid);
+ map_["_object_id"] = oid;
+ map_["_schema_id"] = mapEncodeSchemaId(src->getPackageName(),
+ src->getClassName(),
+ "_data",
+ src->getMd5Sum());
+ src->writeTimestamps(map_);
+ src->mapEncodeValues(values, true, send_stats);
+ map_["_values"] = values;
+
+ encodedV2 = map_;
+ }
+}
+
+// Remove Deleted objects, and save for later publishing...
+bool ManagementAgent::moveDeletedObjects() {
+ typedef vector<pair<ObjectId, ManagementObject::shared_ptr> > DeleteList;
+
+ sys::Mutex::ScopedLock lock (objectLock);
+
+ DeleteList deleteList;
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ ++iter)
+ {
+ ManagementObject::shared_ptr object = iter->second;
+ if (object->isDeleted()) deleteList.push_back(*iter);
+ }
+
+ // Iterate in reverse over deleted object list
+ for (DeleteList::reverse_iterator iter = deleteList.rbegin();
+ iter != deleteList.rend();
+ iter++)
+ {
+ ManagementObject::shared_ptr delObj = iter->second;
+ assert(delObj->isDeleted());
+ DeletedObject::shared_ptr dptr(new DeletedObject(delObj, qmf1Support, qmf2Support));
+
+ pendingDeletedObjs[dptr->getKey()].push_back(dptr);
+ managementObjects.erase(iter->first);
+ }
+ return !deleteList.empty();
+}
+
+ManagementAgent::EventQueue::Batch::const_iterator ManagementAgent::sendEvents(
+ const EventQueue::Batch& batch)
+{
+ EventQueue::Batch::const_iterator i;
+ for (i = batch.begin(); i != batch.end(); ++i) {
+ DeliverableMessage deliverable (i->second, 0);
+ try {
+ i->first->route(deliverable);
+ } catch(exception& e) {
+ QPID_LOG(warning, "ManagementAgent failed to route event: " << e.what());
+ }
+ }
+ return i;
+}
+
+namespace {
+QPID_TSS const Connection* currentPublisher = 0;
+}
+
+void setManagementExecutionContext(const Connection& p)
+{
+ currentPublisher = &p;
+}
+
+void resetManagementExecutionContext()
+{
+ currentPublisher = 0;
+}
+
+const Connection* getCurrentPublisher()
+{
+ return currentPublisher;
+}
+
+}}
+
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
new file mode 100644
index 0000000000..81bf542766
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -0,0 +1,388 @@
+#ifndef _ManagementAgent_
+#define _ManagementAgent_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/Options.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/management/ManagementObject.h"
+#include "qpid/management/ManagementEvent.h"
+#include "qpid/management/Manageable.h"
+#include "qmf/org/apache/qpid/broker/Agent.h"
+#include "qmf/org/apache/qpid/broker/Memory.h"
+#include "qpid/sys/MemStat.h"
+#include "qpid/sys/PollableQueue.h"
+#include "qpid/types/Variant.h"
+#include <qpid/framing/AMQFrame.h>
+#include <qpid/framing/ResizableBuffer.h>
+#include <boost/shared_ptr.hpp>
+#include <memory>
+#include <string>
+#include <map>
+
+namespace qpid {
+namespace broker {
+class Connection;
+class ProtocolRegistry;
+}
+namespace sys {
+class Timer;
+}
+namespace management {
+
+class ManagementAgent
+{
+private:
+
+ int threadPoolSize;
+
+public:
+ typedef enum {
+ SEV_EMERG = 0,
+ SEV_ALERT = 1,
+ SEV_CRIT = 2,
+ SEV_ERROR = 3,
+ SEV_WARN = 4,
+ SEV_NOTE = 5,
+ SEV_INFO = 6,
+ SEV_DEBUG = 7,
+ SEV_DEFAULT = 8
+ } severity_t;
+
+
+ ManagementAgent (const bool qmfV1, const bool qmfV2);
+ virtual ~ManagementAgent ();
+
+ /** Called before plugins are initialized */
+ void configure (const std::string& dataDir, bool publish, uint16_t interval,
+ qpid::broker::Broker* broker, int threadPoolSize);
+
+ void setName(const std::string& vendor,
+ const std::string& product,
+ const std::string& instance="");
+ void getName(std::string& vendor, std::string& product, std::string& instance);
+ const std::string& getAddress();
+
+ void setInterval(uint16_t _interval) { interval = _interval; }
+ void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange,
+ qpid::broker::Exchange::shared_ptr directExchange);
+ void setExchangeV2(qpid::broker::Exchange::shared_ptr topicExchange,
+ qpid::broker::Exchange::shared_ptr directExchange);
+
+ int getMaxThreads () { return threadPoolSize; }
+ QPID_BROKER_EXTERN void registerClass (const std::string& packageName,
+ const std::string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ QPID_BROKER_EXTERN void registerEvent (const std::string& packageName,
+ const std::string& eventName,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object,
+ uint64_t persistId = 0,
+ bool persistent = false);
+ QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object,
+ const std::string& key,
+ bool persistent = false);
+ QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
+ severity_t severity = SEV_DEFAULT);
+ QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);
+
+ bool dispatchCommand (qpid::broker::Deliverable& msg,
+ const std::string& routingKey,
+ const framing::FieldTable* args,
+ const bool topic,
+ int qmfVersion);
+
+ /** Disallow a method. Attempts to call it will receive an exception with message. */
+ void disallow(const std::string& className, const std::string& methodName, const std::string& message);
+
+ uint16_t getBootSequence(void) { return bootSequence; }
+ void setBootSequence(uint16_t b) { bootSequence = b; writeData(); }
+
+ const framing::Uuid& getUuid() const { return uuid; }
+ void setUuid(const framing::Uuid& id) { uuid = id; writeData(); }
+
+ static types::Variant::Map toMap(const framing::FieldTable& from);
+
+ class DeletedObject {
+ public:
+ typedef boost::shared_ptr<DeletedObject> shared_ptr;
+ DeletedObject(ManagementObject::shared_ptr, bool v1, bool v2);
+ ~DeletedObject() {};
+ const std::string getKey() const {
+ // used to batch up objects of the same class type
+ return std::string(packageName + std::string(":") + className);
+ }
+
+ private:
+ friend class ManagementAgent;
+
+ std::string packageName;
+ std::string className;
+ std::string objectId;
+
+ std::string encodedV1Config; // qmfv1 properties
+ std::string encodedV1Inst; // qmfv1 statistics
+ qpid::types::Variant::Map encodedV2;
+ };
+
+ typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList;
+
+private:
+ // Storage for tracking remote management agents, attached via the client
+ // management agent API.
+ //
+ struct RemoteAgent : public Manageable
+ {
+ ManagementAgent& agent;
+ uint32_t brokerBank;
+ uint32_t agentBank;
+ std::string routingKey;
+ ObjectId connectionRef;
+ qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject;
+ RemoteAgent(ManagementAgent& _agent) : agent(_agent) {}
+ ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; }
+
+ virtual ~RemoteAgent ();
+ void mapEncode(qpid::types::Variant::Map& _map) const;
+ void mapDecode(const qpid::types::Variant::Map& _map);
+ };
+
+ typedef std::map<ObjectId, boost::shared_ptr<RemoteAgent> > RemoteAgentMap;
+
+ // Storage for known schema classes:
+ //
+ // SchemaClassKey -- Key elements for map lookups
+ // SchemaClassKeyComp -- Comparison class for SchemaClassKey
+ // SchemaClass -- Non-key elements for classes
+ //
+ struct SchemaClassKey
+ {
+ std::string name;
+ uint8_t hash[16];
+
+ void mapEncode(qpid::types::Variant::Map& _map) const;
+ void mapDecode(const qpid::types::Variant::Map& _map);
+ void encode(framing::Buffer& buffer) const;
+ void decode(framing::Buffer& buffer);
+ uint32_t encodedBufSize() const;
+ };
+
+ struct SchemaClassKeyComp
+ {
+ bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
+ {
+ if (lhs.name != rhs.name)
+ return lhs.name < rhs.name;
+ else
+ for (int i = 0; i < 16; i++)
+ if (lhs.hash[i] != rhs.hash[i])
+ return lhs.hash[i] < rhs.hash[i];
+ return false;
+ }
+ };
+
+
+ struct SchemaClass
+ {
+ uint8_t kind;
+ ManagementObject::writeSchemaCall_t writeSchemaCall;
+ std::string data;
+ uint32_t pendingSequence;
+
+ SchemaClass(uint8_t _kind=0, uint32_t seq=0) :
+ kind(_kind), writeSchemaCall(0), pendingSequence(seq) {}
+ SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) :
+ kind(_kind), writeSchemaCall(call), pendingSequence(0) {}
+ bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); }
+ void appendSchema (framing::Buffer& buf);
+
+ void mapEncode(qpid::types::Variant::Map& _map) const;
+ void mapDecode(const qpid::types::Variant::Map& _map);
+ };
+
+ typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
+ typedef std::map<std::string, ClassMap> PackageMap;
+
+ RemoteAgentMap remoteAgents;
+ PackageMap packages;
+
+ //
+ // Protected by objectLock
+ //
+ ManagementObjectMap managementObjects;
+
+ //
+ // Protected by addLock
+ //
+ ManagementObjectVector newManagementObjects;
+
+ framing::Uuid uuid;
+
+ //
+ // Lock ordering: userLock -> addLock -> objectLock
+ //
+ sys::Mutex userLock;
+ sys::Mutex addLock;
+ sys::Mutex objectLock;
+
+ qpid::broker::Exchange::shared_ptr mExchange;
+ qpid::broker::Exchange::shared_ptr dExchange;
+ qpid::broker::Exchange::shared_ptr v2Topic;
+ qpid::broker::Exchange::shared_ptr v2Direct;
+ std::string dataDir;
+ bool publish;
+ uint16_t interval;
+ qpid::broker::Broker* broker;
+ qpid::sys::Timer* timer;
+ qpid::broker::ProtocolRegistry* protocols;
+ uint16_t bootSequence;
+ uint32_t nextObjectId;
+ uint32_t brokerBank;
+ uint32_t nextRemoteBank;
+ uint32_t nextRequestSequence;
+ bool clientWasAdded;
+ const qpid::sys::AbsTime startTime;
+ bool suppressed;
+
+ typedef std::pair<std::string,std::string> MethodName;
+ typedef std::map<MethodName, std::string> DisallowedMethods;
+ DisallowedMethods disallowed;
+ bool disallowAllV1Methods;
+
+ // Agent name and address
+ qpid::types::Variant::Map attrMap;
+ std::string name_address;
+ std::string vendorNameKey; // "." --> "_"
+ std::string productNameKey; // "." --> "_"
+ std::string instanceNameKey; // "." --> "_"
+
+ // supported management protocol
+ bool qmf1Support;
+ bool qmf2Support;
+
+ // Maximum # of objects allowed in a single V2 response
+ // message.
+ uint32_t maxReplyObjs;
+
+ // list of objects that have been deleted, but have yet to be published
+ // one final time.
+ // Indexed by a string composed of the object's package and class name.
+ // Protected by objectLock.
+ typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap;
+ PendingDeletedObjsMap pendingDeletedObjs;
+
+ // Pollable queue to serialize event messages
+ typedef std::pair<boost::shared_ptr<broker::Exchange>,
+ broker::Message> ExchangeAndMessage;
+ typedef sys::PollableQueue<ExchangeAndMessage> EventQueue;
+
+ //
+ // Memory statistics object
+ //
+ qmf::org::apache::qpid::broker::Memory::shared_ptr memstat;
+
+ void writeData ();
+ void periodicProcessing (void);
+ void deleteObjectNow(const ObjectId& oid);
+ void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ EventQueue::Batch::const_iterator sendEvents(const EventQueue::Batch& batch);
+ void sendBuffer(framing::Buffer& buf,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const std::string& routingKey);
+ void sendBuffer(framing::Buffer& buf,
+ const std::string& exchange,
+ const std::string& routingKey);
+ void sendBuffer(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map& headers,
+ const std::string& content_type,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const std::string& routingKey,
+ uint64_t ttl_msec = 0);
+ void sendBuffer(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map& headers,
+ const std::string& content_type,
+ const std::string& exchange,
+ const std::string& routingKey,
+ uint64_t ttl_msec = 0);
+ void moveNewObjects();
+ bool moveDeletedObjects();
+
+ bool authorizeAgentMessage(qpid::broker::Message& msg);
+ void dispatchAgentCommand(qpid::broker::Message& msg, bool viaLocal=false);
+
+ PackageMap::iterator findOrAddPackageLH(std::string name);
+ void addClassLH(uint8_t kind,
+ PackageMap::iterator pIter,
+ const std::string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ void encodePackageIndication (framing::Buffer& buf,
+ PackageMap::iterator pIter);
+ void encodeClassIndication (framing::Buffer& buf,
+ const std::string packageName,
+ const struct SchemaClassKey key,
+ uint8_t kind);
+ bool bankInUse (uint32_t bank);
+ uint32_t allocateNewBank ();
+ uint32_t assignBankLH (uint32_t requestedPrefix);
+ void deleteOrphanedAgentsLH();
+ void sendCommandComplete(const std::string& replyToKey, uint32_t sequence,
+ uint32_t code = 0, const std::string& text = "OK");
+ void sendException(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
+ void handleBrokerRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handlePackageQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handlePackageInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleClassQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleClassInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleSchemaRequest (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
+ void handleSchemaResponse (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleAttachRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const ObjectId& objectId);
+ void handleGetQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const std::string& userId);
+ void handleMethodRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const std::string& userId);
+ void handleGetQuery (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const std::string& userId, bool viaLocal);
+ void handleMethodRequest (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const std::string& userId, bool viaLocal);
+ void handleLocateRequest (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
+
+
+ size_t validateSchema(framing::Buffer&, uint8_t kind);
+ size_t validateTableSchema(framing::Buffer&);
+ size_t validateEventSchema(framing::Buffer&);
+ ManagementObjectMap::iterator numericFind(const ObjectId& oid);
+
+ std::string summarizeAgents();
+ void debugSnapshot(const char* title);
+ std::auto_ptr<EventQueue> sendQueue;
+};
+
+void setManagementExecutionContext(const broker::Connection&);
+void resetManagementExecutionContext();
+const broker::Connection* getCurrentPublisher();
+}}
+
+#endif /*!_ManagementAgent_*/
diff --git a/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp b/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
new file mode 100644
index 0000000000..8ede6940b0
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/management/ManagementDirectExchange.h"
+#include "qpid/log/Statement.h"
+#include <assert.h>
+
+using namespace qpid::management;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+ManagementDirectExchange::ManagementDirectExchange(const std::string& _name, Manageable* _parent, Broker* b) :
+ Exchange (_name, _parent, b),
+ DirectExchange(_name, _parent, b),
+ managementAgent(0) {}
+ManagementDirectExchange::ManagementDirectExchange(const std::string& _name,
+ bool _durable,
+ const FieldTable& _args,
+ Manageable* _parent, Broker* b) :
+ Exchange (_name, _durable, false, _args, _parent, b),
+ DirectExchange(_name, _durable, false, _args, _parent, b),
+ managementAgent(0) {}
+
+void ManagementDirectExchange::route(Deliverable& msg)
+{
+ bool routeIt = true;
+
+ if (managementAgent)
+ routeIt = managementAgent->dispatchCommand(msg, msg.getMessage().getRoutingKey(), 0/*args - TODO*/, false, qmfVersion);
+
+ if (routeIt)
+ DirectExchange::route(msg);
+}
+
+void ManagementDirectExchange::setManagmentAgent(ManagementAgent* agent, int qv)
+{
+ managementAgent = agent;
+ qmfVersion = qv;
+ assert(qmfVersion == 2); // QMFv1 doesn't use a specialized direct exchange
+}
+
+
+ManagementDirectExchange::~ManagementDirectExchange() {}
+
+const std::string ManagementDirectExchange::typeName("management-direct");
+
diff --git a/qpid/cpp/src/qpid/management/ManagementDirectExchange.h b/qpid/cpp/src/qpid/management/ManagementDirectExchange.h
new file mode 100644
index 0000000000..582354d723
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/ManagementDirectExchange.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ManagementDirectExchange_
+#define _ManagementDirectExchange_
+
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/management/ManagementAgent.h"
+
+namespace qpid {
+namespace broker {
+
+class ManagementDirectExchange : public virtual DirectExchange
+{
+ private:
+ management::ManagementAgent* managementAgent;
+ int qmfVersion;
+
+ public:
+ static const std::string typeName;
+
+ ManagementDirectExchange(const std::string& name, Manageable* _parent = 0, Broker* broker = 0);
+ ManagementDirectExchange(const std::string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args,
+ Manageable* _parent = 0, Broker* broker = 0);
+
+ virtual std::string getType() const { return typeName; }
+
+ virtual void route(Deliverable& msg);
+
+ void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
+
+ virtual ~ManagementDirectExchange();
+};
+
+
+}
+}
+
+#endif
diff --git a/qpid/cpp/src/qpid/management/ManagementEvent.h b/qpid/cpp/src/qpid/management/ManagementEvent.h
new file mode 100644
index 0000000000..e80175096f
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/ManagementEvent.h
@@ -0,0 +1,53 @@
+#ifndef _ManagementEvent_
+#define _ManagementEvent_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/management/ManagementObject.h"
+#include "qpid/types/Variant.h"
+#include <string>
+
+namespace qpid {
+namespace management {
+
+class ManagementAgent;
+
+class ManagementEvent : public ManagementItem {
+ public:
+ static const uint8_t MD5_LEN = 16;
+ //typedef void (*writeSchemaCall_t)(qpid::framing::Buffer&);
+ typedef void (*writeSchemaCall_t)(std::string&);
+ virtual ~ManagementEvent() {}
+
+ virtual writeSchemaCall_t getWriteSchemaCall(void) = 0;
+ //virtual mapEncodeSchemaCall_t getMapEncodeSchemaCall(void) = 0;
+ virtual std::string& getEventName() const = 0;
+ virtual std::string& getPackageName() const = 0;
+ virtual uint8_t* getMd5Sum() const = 0;
+ virtual uint8_t getSeverity() const = 0;
+ virtual void encode(std::string&) const = 0;
+ virtual void mapEncode(qpid::types::Variant::Map&) const = 0;
+};
+
+}}
+
+#endif /*!_ManagementEvent_*/
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp
new file mode 100644
index 0000000000..019963e832
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp
@@ -0,0 +1,385 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/management/Manageable.h"
+#include "qpid/management/ManagementObject.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/log/Statement.h"
+#include <boost/lexical_cast.hpp>
+
+#include <stdlib.h>
+
+using namespace std;
+using namespace qpid;
+using namespace qpid::management;
+
+void AgentAttachment::setBanks(uint32_t broker, uint32_t bank)
+{
+ first =
+ ((uint64_t) (broker & 0x000fffff)) << 28 |
+ ((uint64_t) (bank & 0x0fffffff));
+}
+
+// Deprecated
+ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint64_t object)
+ : agent(0), agentEpoch(seq)
+{
+ first =
+ ((uint64_t) (flags & 0x0f)) << 60 |
+ ((uint64_t) (seq & 0x0fff)) << 48 |
+ ((uint64_t) (broker & 0x000fffff)) << 28;
+ second = object;
+}
+
+
+ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker)
+ : agent(0), second(0), agentEpoch(seq)
+{
+ first =
+ ((uint64_t) (flags & 0x0f)) << 60 |
+ ((uint64_t) (seq & 0x0fff)) << 48 |
+ ((uint64_t) (broker & 0x000fffff)) << 28;
+}
+
+ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq)
+ : agent(_agent), second(0), agentEpoch(seq)
+{
+
+ first =
+ ((uint64_t) (flags & 0x0f)) << 60 |
+ ((uint64_t) (seq & 0x0fff)) << 48;
+}
+
+
+ObjectId::ObjectId(istream& in) : agent(0)
+{
+ string text;
+ in >> text;
+ fromString(text);
+}
+
+ObjectId::ObjectId(const string& text) : agent(0)
+{
+ fromString(text);
+}
+
+void ObjectId::fromString(const string& text)
+{
+#define FIELDS 5
+#if defined (_WIN32) && !defined (atoll)
+# define atoll(X) _atoi64(X)
+#endif
+
+ // format:
+ // V1: <flags>-<sequence>-<broker-bank>-<agent-bank>-<uint64-app-id>
+ // V2: Not used
+
+ string copy(text.c_str());
+ char* cText;
+ char* field[FIELDS];
+ bool atFieldStart = true;
+ int idx = 0;
+
+ cText = const_cast<char*>(copy.c_str());
+ for (char* cursor = cText; *cursor; cursor++) {
+ if (atFieldStart) {
+ if (idx >= FIELDS)
+ throw Exception("Invalid ObjectId format");
+ field[idx++] = cursor;
+ atFieldStart = false;
+ } else {
+ if (*cursor == '-') {
+ *cursor = '\0';
+ atFieldStart = true;
+ }
+ }
+ }
+
+ if (idx != FIELDS)
+ throw Exception("Invalid ObjectId format");
+
+ agentEpoch = atoll(field[1]);
+
+ first = (atoll(field[0]) << 60) +
+ (atoll(field[1]) << 48) +
+ (atoll(field[2]) << 28);
+
+ agentName = string(field[3]);
+ second = atoll(field[4]);
+}
+
+
+bool ObjectId::operator==(const ObjectId &other) const
+{
+ return v2Key == other.v2Key;
+}
+
+bool ObjectId::operator<(const ObjectId &other) const
+{
+ return v2Key < other.v2Key;
+}
+
+bool ObjectId::equalV1(const ObjectId &other) const
+{
+ uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
+ return first == otherFirst && second == other.second;
+}
+
+// encode as V1-format binary
+void ObjectId::encode(string& buffer) const
+{
+ const uint32_t len = 16;
+ char _data[len];
+ qpid::framing::Buffer body(_data, len);
+
+ if (agent == 0)
+ body.putLongLong(first);
+ else
+ body.putLongLong(first | agent->first);
+ body.putLongLong(second);
+
+ body.reset();
+ body.getRawData(buffer, len);
+}
+
+// decode as V1-format binary
+void ObjectId::decode(const string& buffer)
+{
+ const uint32_t len = 16;
+ char _data[len];
+ qpid::framing::Buffer body(_data, len);
+
+ body.checkAvailable(buffer.length());
+ body.putRawData(buffer);
+ body.reset();
+ first = body.getLongLong();
+ second = body.getLongLong();
+ v2Key = boost::lexical_cast<string>(second);
+}
+
+// generate the V2 key from the index fields defined
+// in the schema.
+void ObjectId::setV2Key(const ManagementObject& object)
+{
+ stringstream oname;
+ oname << object.getPackageName() << ":" << object.getClassName() << ":" << object.getKey();
+ v2Key = oname.str();
+}
+
+
+// encode as V2-format map
+void ObjectId::mapEncode(types::Variant::Map& map) const
+{
+ map["_object_name"] = v2Key;
+ if (!agentName.empty())
+ map["_agent_name"] = agentName;
+ if (agentEpoch)
+ map["_agent_epoch"] = agentEpoch;
+}
+
+// decode as v2-format map
+void ObjectId::mapDecode(const types::Variant::Map& map)
+{
+ types::Variant::Map::const_iterator i;
+
+ if ((i = map.find("_object_name")) != map.end())
+ v2Key = i->second.asString();
+ else
+ throw Exception("Required _object_name field missing.");
+
+ if ((i = map.find("_agent_name")) != map.end())
+ agentName = i->second.asString();
+
+ if ((i = map.find("_agent_epoch")) != map.end())
+ agentEpoch = i->second.asInt64();
+}
+
+
+ObjectId::operator types::Variant::Map() const
+{
+ types::Variant::Map m;
+ mapEncode(m);
+ return m;
+}
+
+
+
+namespace qpid {
+namespace management {
+
+ostream& operator<<(ostream& out, const ObjectId& i)
+{
+ uint64_t virtFirst = i.first;
+ if (i.agent)
+ virtFirst |= i.agent->getFirst();
+
+ out << ((virtFirst & 0xF000000000000000LL) >> 60) <<
+ "-" << ((virtFirst & 0x0FFF000000000000LL) >> 48) <<
+ "-" << ((virtFirst & 0x0000FFFFF0000000LL) >> 28) <<
+ "-" << i.agentName <<
+ "-" << i.second <<
+ "(" << i.v2Key << ")";
+ return out;
+}
+
+}}
+
+ManagementObject::ManagementObject(Manageable* _core) :
+ createTime(qpid::sys::Duration::FromEpoch()),
+ destroyTime(0), updateTime(createTime), configChanged(true),
+ instChanged(true), deleted(false),
+ coreObject(_core), flags(0), forcePublish(false) {}
+
+void ManagementObject::setUpdateTime()
+{
+ updateTime = sys::Duration::FromEpoch();
+}
+
+void ManagementObject::resourceDestroy()
+{
+ QPID_LOG(trace, "Management object marked deleted: " << getObjectId().getV2Key());
+ destroyTime = sys::Duration::FromEpoch();
+ deleted = true;
+}
+
+int ManagementObject::maxThreads = 1;
+int ManagementObject::nextThreadIndex = 0;
+
+void ManagementObject::writeTimestamps (string& buf) const
+{
+ char _data[4000];
+ qpid::framing::Buffer body(_data, 4000);
+
+ body.putShortString (getPackageName ());
+ body.putShortString (getClassName ());
+ body.putBin128 (getMd5Sum ());
+ body.putLongLong (updateTime);
+ body.putLongLong (createTime);
+ body.putLongLong (destroyTime);
+
+ uint32_t len = body.getPosition();
+ body.reset();
+ body.getRawData(buf, len);
+
+ string oid;
+ objectId.encode(oid);
+ buf += oid;
+}
+
+void ManagementObject::readTimestamps (const string& buf)
+{
+ char _data[4000];
+ qpid::framing::Buffer body(_data, 4000);
+ string unused;
+ uint8_t unusedUuid[16];
+
+ body.checkAvailable(buf.length());
+ body.putRawData(buf);
+ body.reset();
+
+ body.getShortString(unused);
+ body.getShortString(unused);
+ body.getBin128(unusedUuid);
+ updateTime = body.getLongLong();
+ createTime = body.getLongLong();
+ destroyTime = body.getLongLong();
+}
+
+uint32_t ManagementObject::writeTimestampsSize() const
+{
+ return 1 + getPackageName().length() + // str8
+ 1 + getClassName().length() + // str8
+ 16 + // bin128
+ 8 + // uint64
+ 8 + // uint64
+ 8 + // uint64
+ objectId.encodedSize(); // objectId
+}
+
+
+void ManagementObject::writeTimestamps (types::Variant::Map& map) const
+{
+ // types::Variant::Map oid, sid;
+
+ // sid["_package_name"] = getPackageName();
+ // sid["_class_name"] = getClassName();
+ // sid["_hash"] = qpid::types::Uuid(getMd5Sum());
+ // map["_schema_id"] = sid;
+
+ // objectId.mapEncode(oid);
+ // map["_object_id"] = oid;
+
+ map["_update_ts"] = updateTime;
+ map["_create_ts"] = createTime;
+ map["_delete_ts"] = destroyTime;
+}
+
+void ManagementObject::readTimestamps (const types::Variant::Map& map)
+{
+ types::Variant::Map::const_iterator i;
+
+ if ((i = map.find("_update_ts")) != map.end())
+ updateTime = i->second.asUint64();
+ if ((i = map.find("_create_ts")) != map.end())
+ createTime = i->second.asUint64();
+ if ((i = map.find("_delete_ts")) != map.end())
+ destroyTime = i->second.asUint64();
+}
+
+
+void ManagementObject::setReference(ObjectId) {}
+
+int ManagementObject::getThreadIndex() {
+ static QPID_TSS int thisIndex = -1;
+ if (thisIndex == -1) {
+ Mutex::ScopedLock mutex(accessLock);
+ thisIndex = nextThreadIndex;
+ if (nextThreadIndex < maxThreads - 1)
+ nextThreadIndex++;
+ }
+ return thisIndex;
+}
+
+
+// void ManagementObject::mapEncode(types::Variant::Map& map,
+// bool includeProperties,
+// bool includeStatistics)
+// {
+// types::Variant::Map values;
+
+// writeTimestamps(map);
+
+// mapEncodeValues(values, includeProperties, includeStatistics);
+// map["_values"] = values;
+// }
+
+// void ManagementObject::mapDecode(const types::Variant::Map& map)
+// {
+// types::Variant::Map::const_iterator i;
+
+// readTimestamps(map);
+
+// if ((i = map.find("_values")) != map.end())
+// mapDecodeValues(i->second.asMap());
+// }
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h
new file mode 100644
index 0000000000..93fbec7bc7
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/ManagementObject.h
@@ -0,0 +1,246 @@
+#ifndef _ManagementObject_
+#define _ManagementObject_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/CommonImportExport.h"
+
+#include "qpid/management/Mutex.h"
+#include "qpid/types/Variant.h"
+#include <map>
+#include <vector>
+
+#ifdef _IN_QPID_BROKER
+#include <boost/shared_ptr.hpp>
+#endif
+
+namespace qpid {
+namespace management {
+
+class Manageable;
+class ObjectId;
+class ManagementObject;
+
+
+class AgentAttachment {
+ friend class ObjectId;
+private:
+ uint64_t first;
+public:
+ AgentAttachment() : first(0) {}
+ QPID_COMMON_EXTERN void setBanks(uint32_t broker, uint32_t bank);
+ uint64_t getFirst() const { return first; }
+};
+
+
+class ObjectId {
+protected:
+ const AgentAttachment* agent;
+ uint64_t first;
+ uint64_t second;
+ uint64_t agentEpoch;
+ std::string v2Key;
+ std::string agentName;
+ void fromString(const std::string&);
+public:
+ QPID_COMMON_INLINE_EXTERN ObjectId() : agent(0), first(0), second(0), agentEpoch(0) {}
+ QPID_COMMON_INLINE_EXTERN ObjectId(const types::Variant& map) :
+ agent(0), first(0), second(0), agentEpoch(0) { mapDecode(map.asMap()); }
+ QPID_COMMON_EXTERN ObjectId(uint8_t flags, uint16_t seq, uint32_t broker);
+ QPID_COMMON_EXTERN ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq);
+ QPID_COMMON_EXTERN ObjectId(std::istream&);
+ QPID_COMMON_EXTERN ObjectId(const std::string&);
+ QPID_COMMON_INLINE_EXTERN ObjectId(const std::string& agentAddress, const std::string& key,
+ uint64_t epoch=0) : agent(0), first(0), second(0),
+ agentEpoch(epoch), v2Key(key), agentName(agentAddress) {}
+
+ // Deprecated:
+ QPID_COMMON_EXTERN ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint64_t object);
+ QPID_COMMON_EXTERN bool operator==(const ObjectId &other) const;
+ QPID_COMMON_EXTERN bool operator<(const ObjectId &other) const;
+ QPID_COMMON_EXTERN void mapEncode(types::Variant::Map& map) const;
+ QPID_COMMON_EXTERN void mapDecode(const types::Variant::Map& map);
+ QPID_COMMON_EXTERN operator types::Variant::Map() const;
+ QPID_COMMON_INLINE_EXTERN uint32_t encodedSize() const { return 16; };
+ QPID_COMMON_EXTERN void encode(std::string& buffer) const;
+ QPID_COMMON_EXTERN void decode(const std::string& buffer);
+ QPID_COMMON_EXTERN bool equalV1(const ObjectId &other) const;
+ QPID_COMMON_INLINE_EXTERN void setV2Key(const std::string& _key) { v2Key = _key; }
+ QPID_COMMON_EXTERN void setV2Key(const ManagementObject& object);
+ QPID_COMMON_INLINE_EXTERN void setAgentName(const std::string& _name) { agentName = _name; }
+ QPID_COMMON_INLINE_EXTERN const std::string& getAgentName() const { return agentName; }
+ QPID_COMMON_INLINE_EXTERN const std::string& getV2Key() const { return v2Key; }
+ friend QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const ObjectId&);
+};
+
+class ManagementItem {
+public:
+ static const uint8_t TYPE_U8 = 1;
+ static const uint8_t TYPE_U16 = 2;
+ static const uint8_t TYPE_U32 = 3;
+ static const uint8_t TYPE_U64 = 4;
+ static const uint8_t TYPE_SSTR = 6;
+ static const uint8_t TYPE_LSTR = 7;
+ static const uint8_t TYPE_ABSTIME = 8;
+ static const uint8_t TYPE_DELTATIME = 9;
+ static const uint8_t TYPE_REF = 10;
+ static const uint8_t TYPE_BOOL = 11;
+ static const uint8_t TYPE_FLOAT = 12;
+ static const uint8_t TYPE_DOUBLE = 13;
+ static const uint8_t TYPE_UUID = 14;
+ static const uint8_t TYPE_FTABLE = 15;
+ static const uint8_t TYPE_S8 = 16;
+ static const uint8_t TYPE_S16 = 17;
+ static const uint8_t TYPE_S32 = 18;
+ static const uint8_t TYPE_S64 = 19;
+ static const uint8_t TYPE_LIST = 21;
+
+ static const uint8_t ACCESS_RC = 1;
+ static const uint8_t ACCESS_RW = 2;
+ static const uint8_t ACCESS_RO = 3;
+
+ static const uint8_t DIR_I = 1;
+ static const uint8_t DIR_O = 2;
+ static const uint8_t DIR_IO = 3;
+
+ static const uint8_t FLAG_CONFIG = 0x01;
+ static const uint8_t FLAG_INDEX = 0x02;
+ static const uint8_t FLAG_END = 0x80;
+
+ const static uint8_t CLASS_KIND_TABLE = 1;
+ const static uint8_t CLASS_KIND_EVENT = 2;
+
+
+
+public:
+ virtual ~ManagementItem() {}
+};
+
+class QPID_COMMON_CLASS_EXTERN ManagementObject : public ManagementItem
+{
+protected:
+
+ uint64_t createTime;
+ uint64_t destroyTime;
+ uint64_t updateTime;
+ ObjectId objectId;
+ mutable bool configChanged;
+ mutable bool instChanged;
+ bool deleted;
+ Manageable* coreObject;
+ mutable Mutex accessLock;
+ uint32_t flags;
+
+ static int nextThreadIndex;
+ bool forcePublish;
+
+ QPID_COMMON_EXTERN int getThreadIndex();
+ QPID_COMMON_EXTERN void writeTimestamps(std::string& buf) const;
+ QPID_COMMON_EXTERN void readTimestamps(const std::string& buf);
+ QPID_COMMON_EXTERN uint32_t writeTimestampsSize() const;
+
+ public:
+#ifdef _IN_QPID_BROKER
+ typedef boost::shared_ptr<ManagementObject> shared_ptr;
+#endif
+
+ QPID_COMMON_EXTERN static const uint8_t MD5_LEN = 16;
+ QPID_COMMON_EXTERN static int maxThreads;
+ //typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
+ typedef void (*writeSchemaCall_t) (std::string&);
+
+ QPID_COMMON_EXTERN ManagementObject(Manageable* _core);
+ virtual ~ManagementObject() {}
+
+ virtual writeSchemaCall_t getWriteSchemaCall() = 0;
+ virtual std::string getKey() const = 0;
+
+ // Encode & Decode the property and statistics values
+ // for this object.
+ virtual void mapEncodeValues(types::Variant::Map& map,
+ bool includeProperties,
+ bool includeStatistics) = 0;
+ virtual void mapDecodeValues(const types::Variant::Map& map) = 0;
+ virtual void doMethod(std::string& methodName,
+ const types::Variant::Map& inMap,
+ types::Variant::Map& outMap,
+ const std::string& userId) = 0;
+ QPID_COMMON_EXTERN void writeTimestamps(types::Variant::Map& map) const;
+ QPID_COMMON_EXTERN void readTimestamps(const types::Variant::Map& buf);
+
+ /**
+ * The following five methods are not pure-virtual because they will only
+ * be overridden in cases where QMFv1 is to be supported.
+ */
+ virtual uint32_t writePropertiesSize() const { return 0; }
+ virtual void readProperties(const std::string&) {}
+ virtual void writeProperties(std::string&) const {}
+ virtual void writeStatistics(std::string&, bool = false) {}
+ virtual void doMethod(std::string&, const std::string&, std::string&, const std::string&) {}
+
+ QPID_COMMON_EXTERN virtual void setReference(ObjectId objectId);
+
+ virtual std::string& getClassName() const = 0;
+ virtual std::string& getPackageName() const = 0;
+ virtual uint8_t* getMd5Sum() const = 0;
+
+ void setObjectId(ObjectId oid) { objectId = oid; }
+ ObjectId getObjectId() { return objectId; }
+ inline bool getConfigChanged() { return configChanged; }
+ virtual bool getInstChanged() { return instChanged; }
+ virtual bool hasInst() { return true; }
+ inline void setForcePublish(bool f) { forcePublish = f; }
+ inline bool getForcePublish() { return forcePublish; }
+ QPID_COMMON_EXTERN void setUpdateTime();
+ QPID_COMMON_EXTERN void resourceDestroy();
+ inline bool isDeleted() { return deleted; }
+ inline void setFlags(uint32_t f) { flags = f; }
+ inline uint32_t getFlags() { return flags; }
+ bool isSameClass(ManagementObject& other) {
+ for (int idx = 0; idx < MD5_LEN; idx++)
+ if (other.getMd5Sum()[idx] != getMd5Sum()[idx])
+ return false;
+ return other.getClassName() == getClassName() &&
+ other.getPackageName() == getPackageName();
+ }
+
+ // QPID_COMMON_EXTERN void encode(qpid::framing::Buffer& buf) const { writeProperties(buf); }
+ // QPID_COMMON_EXTERN void decode(qpid::framing::Buffer& buf) { readProperties(buf); }
+ //QPID_COMMON_EXTERN uint32_t encodedSize() const { return writePropertiesSize(); }
+
+ // Encode/Decode the entire object as a map
+ //QPID_COMMON_EXTERN void mapEncode(types::Variant::Map& map,
+ //bool includeProperties=true,
+ //bool includeStatistics=true);
+
+ //QPID_COMMON_EXTERN void mapDecode(const types::Variant::Map& map);
+};
+
+#ifdef _IN_QPID_BROKER
+typedef std::map<ObjectId, ManagementObject::shared_ptr> ManagementObjectMap;
+typedef std::vector<ManagementObject::shared_ptr> ManagementObjectVector;
+#endif
+
+}}
+
+
+
+#endif /*!_ManagementObject_*/
diff --git a/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp b/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp
new file mode 100644
index 0000000000..0241d5a404
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/management/ManagementTopicExchange.h"
+#include "qpid/log/Statement.h"
+
+using namespace qpid::management;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+ManagementTopicExchange::ManagementTopicExchange(const std::string& _name, Manageable* _parent, Broker* b) :
+ Exchange (_name, _parent, b),
+ TopicExchange(_name, _parent, b),
+ managementAgent(0) {}
+ManagementTopicExchange::ManagementTopicExchange(const std::string& _name,
+ bool _durable,
+ const FieldTable& _args,
+ Manageable* _parent, Broker* b) :
+ Exchange (_name, _durable, false, _args, _parent, b),
+ TopicExchange(_name, _durable, false, _args, _parent, b),
+ managementAgent(0) {}
+
+void ManagementTopicExchange::route(Deliverable& msg)
+{
+ bool routeIt = true;
+
+ // Intercept management agent commands
+ if (managementAgent)
+ routeIt = managementAgent->dispatchCommand(msg, msg.getMessage().getRoutingKey(), 0/*args - TODO*/, true, qmfVersion);
+
+ if (routeIt)
+ TopicExchange::route(msg);
+}
+
+bool ManagementTopicExchange::bind(Queue::shared_ptr queue,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable* args)
+{
+ if (qmfVersion == 1)
+ managementAgent->clientAdded(routingKey);
+ return TopicExchange::bind(queue, routingKey, args);
+}
+
+void ManagementTopicExchange::setManagmentAgent(ManagementAgent* agent, int qv)
+{
+ managementAgent = agent;
+ qmfVersion = qv;
+}
+
+
+ManagementTopicExchange::~ManagementTopicExchange() {}
+
+const std::string ManagementTopicExchange::typeName("management-topic");
+
diff --git a/qpid/cpp/src/qpid/management/ManagementTopicExchange.h b/qpid/cpp/src/qpid/management/ManagementTopicExchange.h
new file mode 100644
index 0000000000..f5192a0936
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/ManagementTopicExchange.h
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ManagementTopicExchange_
+#define _ManagementTopicExchange_
+
+#include "qpid/broker/TopicExchange.h"
+#include "qpid/management/ManagementAgent.h"
+
+namespace qpid {
+namespace broker {
+
+class ManagementTopicExchange : public virtual TopicExchange
+{
+ private:
+ management::ManagementAgent* managementAgent;
+ int qmfVersion;
+
+ public:
+ static const std::string typeName;
+
+ ManagementTopicExchange(const std::string& name, Manageable* _parent = 0, Broker* broker = 0);
+ ManagementTopicExchange(const std::string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args,
+ Manageable* _parent = 0, Broker* broker = 0);
+
+ virtual std::string getType() const { return typeName; }
+
+ virtual void route(Deliverable& msg);
+
+ virtual bool bind(Queue::shared_ptr queue,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable* args);
+
+ void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
+
+ virtual ~ManagementTopicExchange();
+};
+
+
+}
+}
+
+#endif
diff --git a/qpid/cpp/src/qpid/management/Mutex.cpp b/qpid/cpp/src/qpid/management/Mutex.cpp
new file mode 100644
index 0000000000..f05abb01dc
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/Mutex.cpp
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2008 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/management/Mutex.h"
+#include "qpid/sys/Mutex.h"
+
+using namespace std;
+using namespace qpid::management;
+
+Mutex::Mutex() : impl(new sys::Mutex()) {}
+Mutex::~Mutex() { delete impl; }
+void Mutex::lock() { impl->lock(); }
+void Mutex::unlock() { impl->unlock(); }
+
diff --git a/qpid/cpp/src/qpid/management/Mutex.h b/qpid/cpp/src/qpid/management/Mutex.h
new file mode 100644
index 0000000000..67ae04bae9
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/Mutex.h
@@ -0,0 +1,67 @@
+#ifndef _management_Mutex_h
+#define _management_Mutex_h
+
+/*
+ *
+ * Copyright (c) 2008 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/CommonImportExport.h"
+
+namespace qpid {
+ namespace sys {
+ class Mutex;
+ }
+
+ namespace management {
+
+ /**
+ * Scoped lock template: calls lock() in ctor, unlock() in dtor.
+ * L can be any class with lock() and unlock() functions.
+ */
+ template <class L> class ScopedLockTemplate {
+ public:
+ ScopedLockTemplate(L& l) : mutex(l) { mutex.lock(); }
+ ~ScopedLockTemplate() { mutex.unlock(); }
+ private:
+ L& mutex;
+ };
+
+ template <class L> class ScopedUnlockTemplate {
+ public:
+ ScopedUnlockTemplate(L& l) : mutex(l) { mutex.unlock(); }
+ ~ScopedUnlockTemplate() { mutex.lock(); }
+ private:
+ L& mutex;
+ };
+
+ class Mutex {
+ public:
+ typedef ScopedLockTemplate<Mutex> ScopedLock;
+ typedef ScopedUnlockTemplate<Mutex> ScopedUnlock;
+
+ QPID_COMMON_EXTERN Mutex();
+ QPID_COMMON_EXTERN ~Mutex();
+ QPID_COMMON_EXTERN void lock();
+ QPID_COMMON_EXTERN void unlock();
+ private:
+ sys::Mutex* impl;
+ };
+ }
+}
+
+#endif
+