summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/broker/Broker.cpp19
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp11
-rw-r--r--cpp/src/qpid/broker/ManagementAgent.cpp164
-rw-r--r--cpp/src/qpid/broker/ManagementAgent.h12
-rw-r--r--cpp/src/qpid/broker/ManagementExchange.cpp77
-rw-r--r--cpp/src/qpid/broker/ManagementExchange.h61
-rw-r--r--cpp/src/qpid/broker/ManagementObject.cpp24
-rw-r--r--cpp/src/qpid/broker/ManagementObject.h65
-rw-r--r--cpp/src/qpid/broker/ManagementObjectQueue.cpp26
-rw-r--r--cpp/src/qpid/broker/ManagementObjectQueue.h99
-rw-r--r--cpp/src/qpid/broker/Queue.cpp7
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp48
13 files changed, 413 insertions, 202 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 02921fbd08..57b3d27b71 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -163,6 +163,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/HeadersExchange.cpp \
qpid/broker/IncomingExecutionContext.cpp \
qpid/broker/ManagementAgent.cpp \
+ qpid/broker/ManagementExchange.cpp \
qpid/broker/ManagementObject.cpp \
qpid/broker/ManagementObjectQueue.cpp \
qpid/broker/Message.cpp \
@@ -260,6 +261,7 @@ nobase_include_HEADERS = \
qpid/broker/HeadersExchange.h \
qpid/broker/IncomingExecutionContext.h \
qpid/broker/ManagementAgent.h \
+ qpid/broker/ManagementExchange.h \
qpid/broker/ManagementObject.h \
qpid/broker/ManagementObjectQueue.h \
qpid/broker/Message.h \
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 44aeb482de..e53774740a 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -28,6 +28,7 @@
#include "NullMessageStore.h"
#include "RecoveryManagerImpl.h"
#include "TopicExchange.h"
+#include "ManagementExchange.h"
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
@@ -104,8 +105,8 @@ Broker::Broker(const Broker::Options& conf) :
dtxManager(store.get())
{
if(conf.enableMgmt){
- managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval));
- queues.setManagementAgent(managementAgent);
+ managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval));
+ queues.setManagementAgent(managementAgent);
}
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
@@ -115,16 +116,18 @@ Broker::Broker(const Broker::Options& conf) :
exchanges.declare(amq_match, HeadersExchange::typeName);
if(conf.enableMgmt) {
- QPID_LOG(info, "Management enabled");
- exchanges.declare(qpid_management, TopicExchange::typeName);
- managementAgent->setExchange (exchanges.get (qpid_management));
+ QPID_LOG(info, "Management enabled");
+ exchanges.declare(qpid_management, ManagementExchange::typeName);
+ Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
+ managementAgent->setExchange (mExchange);
+ dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent);
}
else
- QPID_LOG(info, "Management not enabled");
+ QPID_LOG(info, "Management not enabled");
if(store.get()) {
- store->init(conf.storeDir, conf.storeAsync);
- RecoveryManagerImpl recoverer(queues, exchanges, dtxManager,
+ store->init(conf.storeDir, conf.storeAsync);
+ RecoveryManagerImpl recoverer(queues, exchanges, dtxManager,
conf.stagingThreshold);
store->recover(recoverer);
}
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index edc9a5b63b..ae1afe5abb 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -23,6 +23,7 @@
#include "FanOutExchange.h"
#include "HeadersExchange.h"
#include "TopicExchange.h"
+#include "ManagementExchange.h"
using namespace qpid::broker;
using namespace qpid::sys;
@@ -41,7 +42,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
RWlock::ScopedWlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
if (i == exchanges.end()) {
- Exchange::shared_ptr exchange;
+ Exchange::shared_ptr exchange;
if(type == TopicExchange::typeName){
exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args));
@@ -51,13 +52,15 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args));
}else if (type == HeadersExchange::typeName) {
exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args));
+ }else if (type == ManagementExchange::typeName) {
+ exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args));
}else{
throw UnknownExchangeTypeException();
}
- exchanges[name] = exchange;
- return std::pair<Exchange::shared_ptr, bool>(exchange, true);
+ exchanges[name] = exchange;
+ return std::pair<Exchange::shared_ptr, bool>(exchange, true);
} else {
- return std::pair<Exchange::shared_ptr, bool>(i->second, false);
+ return std::pair<Exchange::shared_ptr, bool>(i->second, false);
}
}
diff --git a/cpp/src/qpid/broker/ManagementAgent.cpp b/cpp/src/qpid/broker/ManagementAgent.cpp
index f2ebe991b5..71b027c7df 100644
--- a/cpp/src/qpid/broker/ManagementAgent.cpp
+++ b/cpp/src/qpid/broker/ManagementAgent.cpp
@@ -35,9 +35,9 @@ ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
timer.add (TimerTask::shared_ptr (new Periodic(*this, interval)));
}
-void ManagementAgent::setExchange (Exchange::shared_ptr exchangePtr)
+void ManagementAgent::setExchange (Exchange::shared_ptr _exchange)
{
- exchange = exchangePtr;
+ exchange = _exchange;
}
void ManagementAgent::addObject (ManagementObject::shared_ptr object)
@@ -46,12 +46,6 @@ void ManagementAgent::addObject (ManagementObject::shared_ptr object)
QPID_LOG(info, "Management Object Added");
}
-void ManagementAgent::deleteObject (ManagementObject::shared_ptr object)
-{
- managementObjects.remove (object);
- QPID_LOG (debug, "Management Object Removed");
-}
-
ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
: TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {}
@@ -61,6 +55,18 @@ void ManagementAgent::Periodic::fire ()
agent.PeriodicProcessing ();
}
+void ManagementAgent::clientAdded (void)
+{
+ for (ManagementObjectList::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject::shared_ptr object = *iter;
+ object->setAllChanged ();
+ object->setSchemaNeeded ();
+ }
+}
+
void ManagementAgent::PeriodicProcessing (void)
{
#define BUFSIZE 65536
@@ -69,10 +75,9 @@ void ManagementAgent::PeriodicProcessing (void)
Buffer msgBuffer (msgChars, BUFSIZE);
uint32_t contentSize;
- //QPID_LOG (debug, "Timer Fired");
if (managementObjects.empty ())
- return;
-
+ return;
+
Message::shared_ptr msg (new Message ());
// Build the magic number for the management message.
@@ -82,74 +87,75 @@ void ManagementAgent::PeriodicProcessing (void)
msgBuffer.putOctet ('1');
for (ManagementObjectList::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++)
+ iter != managementObjects.end ();
+ iter++)
{
- ManagementObject::shared_ptr objectPtr = *iter;
-
- //QPID_LOG (debug, " Object Found...");
-
- if (objectPtr->getSchemaNeeded ())
- {
- //QPID_LOG (debug, " Generating Schema");
- uint32_t startAvail = msgBuffer.available ();
- uint32_t recordLength;
-
- msgBuffer.putOctet ('S'); // opcode = Schema Record
- msgBuffer.putOctet (0); // content-class = N/A
- msgBuffer.putShort (objectPtr->getObjectType ());
- msgBuffer.record (); // Record the position of the length field
- msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
-
- objectPtr->writeSchema (msgBuffer);
- recordLength = startAvail - msgBuffer.available ();
- msgBuffer.restore (true); // Restore pointer to length field
- msgBuffer.putLong (recordLength);
- msgBuffer.restore (); // Re-restore to get to the end of the buffer
- }
-
- if (objectPtr->getConfigChanged ())
- {
- //QPID_LOG (debug, " Generating Config");
- uint32_t startAvail = msgBuffer.available ();
- uint32_t recordLength;
-
- msgBuffer.putOctet ('C'); // opcode = Content Record
- msgBuffer.putOctet ('C'); // content-class = Configuration
- msgBuffer.putShort (objectPtr->getObjectType ());
- msgBuffer.record (); // Record the position of the length field
- msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
-
- objectPtr->writeConfig (msgBuffer);
- recordLength = startAvail - msgBuffer.available ();
- msgBuffer.restore (true); // Restore pointer to length field
- msgBuffer.putLong (recordLength);
- msgBuffer.restore (); // Re-restore to get to the end of the buffer
- }
-
- if (objectPtr->getInstChanged ())
- {
- //QPID_LOG (debug, " Generating Instrumentation");
- uint32_t startAvail = msgBuffer.available ();
- uint32_t recordLength;
-
- msgBuffer.putOctet ('C'); // opcode = Content Record
- msgBuffer.putOctet ('I'); // content-class = Instrumentation
- msgBuffer.putShort (objectPtr->getObjectType ());
- msgBuffer.record (); // Record the position of the length field
- msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
-
- objectPtr->writeInstrumentation (msgBuffer);
- recordLength = startAvail - msgBuffer.available ();
- msgBuffer.restore (true); // Restore pointer to length field
- msgBuffer.putLong (recordLength);
- msgBuffer.restore (); // Re-restore to get to the end of the buffer
- }
-
- // Temporary protection against buffer overrun.
- // This needs to be replaced with frame fragmentation.
- if (msgBuffer.available () < THRESHOLD)
- break;
+ ManagementObject::shared_ptr object = *iter;
+
+ if (object->getSchemaNeeded ())
+ {
+ uint32_t startAvail = msgBuffer.available ();
+ uint32_t recordLength;
+
+ msgBuffer.putOctet ('S'); // opcode = Schema Record
+ msgBuffer.putOctet (0); // content-class = N/A
+ msgBuffer.putShort (object->getObjectType ());
+ msgBuffer.record (); // Record the position of the length field
+ msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
+
+ object->writeSchema (msgBuffer);
+ recordLength = startAvail - msgBuffer.available ();
+ msgBuffer.restore (true); // Restore pointer to length field
+ msgBuffer.putLong (recordLength);
+ msgBuffer.restore (); // Re-restore to get to the end of the buffer
+ }
+
+ if (object->getConfigChanged ())
+ {
+ uint32_t startAvail = msgBuffer.available ();
+ uint32_t recordLength;
+
+ msgBuffer.putOctet ('C'); // opcode = Content Record
+ msgBuffer.putOctet ('C'); // content-class = Configuration
+ msgBuffer.putShort (object->getObjectType ());
+ msgBuffer.record (); // Record the position of the length field
+ msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
+
+ object->writeConfig (msgBuffer);
+ recordLength = startAvail - msgBuffer.available ();
+ msgBuffer.restore (true); // Restore pointer to length field
+ msgBuffer.putLong (recordLength);
+ msgBuffer.restore (); // Re-restore to get to the end of the buffer
+ }
+
+ if (object->getInstChanged ())
+ {
+ uint32_t startAvail = msgBuffer.available ();
+ uint32_t recordLength;
+
+ msgBuffer.putOctet ('C'); // opcode = Content Record
+ msgBuffer.putOctet ('I'); // content-class = Instrumentation
+ msgBuffer.putShort (object->getObjectType ());
+ msgBuffer.record (); // Record the position of the length field
+ msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
+
+ object->writeInstrumentation (msgBuffer);
+ recordLength = startAvail - msgBuffer.available ();
+ msgBuffer.restore (true); // Restore pointer to length field
+ msgBuffer.putLong (recordLength);
+ msgBuffer.restore (); // Re-restore to get to the end of the buffer
+ }
+
+ if (object->isDeleted ())
+ {
+ managementObjects.remove (object);
+ QPID_LOG (debug, "Management Object Removed");
+ }
+
+ // Temporary protection against buffer overrun.
+ // This needs to be replaced with frame fragmentation.
+ if (msgBuffer.available () < THRESHOLD)
+ break;
}
msgBuffer.putOctet ('X'); // End-of-message
@@ -161,7 +167,7 @@ void ManagementAgent::PeriodicProcessing (void)
msgBuffer.reset ();
AMQFrame method (0, MessageTransferBody(ProtocolVersion(),
- 0, "qpid.management", 0, 0));
+ 0, "qpid.management", 0, 0));
AMQFrame header (0, AMQHeaderBody());
AMQFrame content;
diff --git a/cpp/src/qpid/broker/ManagementAgent.h b/cpp/src/qpid/broker/ManagementAgent.h
index 4f3b0a0f5f..1e332023a6 100644
--- a/cpp/src/qpid/broker/ManagementAgent.h
+++ b/cpp/src/qpid/broker/ManagementAgent.h
@@ -38,21 +38,21 @@ class ManagementAgent
typedef boost::shared_ptr<ManagementAgent> shared_ptr;
- ManagementAgent(uint16_t interval);
+ ManagementAgent (uint16_t interval);
- void setExchange (Exchange::shared_ptr exchangePtr);
+ void setExchange (Exchange::shared_ptr exchange);
void addObject (ManagementObject::shared_ptr object);
- void deleteObject (ManagementObject::shared_ptr object);
+ void clientAdded (void);
private:
struct Periodic : public TimerTask
{
ManagementAgent& agent;
-
+
Periodic (ManagementAgent& agent, uint32_t seconds);
- ~Periodic () {}
- void fire ();
+ ~Periodic () {}
+ void fire ();
};
ManagementObjectList managementObjects;
diff --git a/cpp/src/qpid/broker/ManagementExchange.cpp b/cpp/src/qpid/broker/ManagementExchange.cpp
new file mode 100644
index 0000000000..d3de8bc8e1
--- /dev/null
+++ b/cpp/src/qpid/broker/ManagementExchange.cpp
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementExchange.h"
+#include "qpid/log/Statement.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+ManagementExchange::ManagementExchange (const string& _name) :
+ Exchange (_name), TopicExchange(_name) {}
+ManagementExchange::ManagementExchange (const std::string& _name,
+ bool _durable,
+ const FieldTable& _args) :
+ Exchange (_name, _durable, _args),
+ TopicExchange(_name, _durable, _args) {}
+
+
+bool ManagementExchange::bind (Queue::shared_ptr queue,
+ const string& routingKey,
+ const FieldTable* args)
+{
+ bool result = TopicExchange::bind (queue, routingKey, args);
+
+ // Notify the management agent that a new management client has bound to the
+ // exchange.
+ if (result)
+ managementAgent->clientAdded ();
+
+ return result;
+}
+
+void ManagementExchange::route (Deliverable& msg,
+ const string& routingKey,
+ const FieldTable* args)
+{
+ // Intercept management commands
+ if (routingKey.length () > 7 &&
+ routingKey.substr (0, 7).compare ("method.") == 0)
+ {
+ QPID_LOG (debug, "ManagementExchange: Intercept command " << routingKey);
+ // TODO: Send intercepted commands to ManagementAgent for dispatch
+ return;
+ }
+
+ TopicExchange::route (msg, routingKey, args);
+}
+
+void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent)
+{
+ managementAgent = agent;
+}
+
+
+ManagementExchange::~ManagementExchange() {}
+
+const std::string ManagementExchange::typeName("management");
+
diff --git a/cpp/src/qpid/broker/ManagementExchange.h b/cpp/src/qpid/broker/ManagementExchange.h
new file mode 100644
index 0000000000..56c051a7f8
--- /dev/null
+++ b/cpp/src/qpid/broker/ManagementExchange.h
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ManagementExchange_
+#define _ManagementExchange_
+
+#include "TopicExchange.h"
+#include "ManagementAgent.h"
+
+namespace qpid {
+namespace broker {
+
+class ManagementExchange : public virtual TopicExchange
+{
+ private:
+ ManagementAgent::shared_ptr managementAgent;
+
+ public:
+ static const std::string typeName;
+
+ ManagementExchange (const string& name);
+ ManagementExchange (const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args);
+
+ virtual std::string getType() const { return typeName; }
+
+ virtual bool bind (Queue::shared_ptr queue,
+ const string& routingKey,
+ const qpid::framing::FieldTable* args);
+
+ virtual void route (Deliverable& msg,
+ const string& routingKey,
+ const qpid::framing::FieldTable* args);
+
+ void setManagmentAgent (ManagementAgent::shared_ptr agent);
+
+ virtual ~ManagementExchange();
+};
+
+
+}
+}
+
+#endif
diff --git a/cpp/src/qpid/broker/ManagementObject.cpp b/cpp/src/qpid/broker/ManagementObject.cpp
index 75913ba3ab..c536d96b1b 100644
--- a/cpp/src/qpid/broker/ManagementObject.cpp
+++ b/cpp/src/qpid/broker/ManagementObject.cpp
@@ -23,14 +23,19 @@
using namespace qpid::framing;
using namespace qpid::broker;
+using namespace qpid::sys;
void ManagementObject::schemaItem (Buffer& buf,
- uint8_t typeCode,
- std::string name,
- std::string description,
- bool isConfig)
+ uint8_t typeCode,
+ std::string name,
+ std::string description,
+ bool isConfig,
+ bool isIndex)
{
- buf.putOctet (isConfig ? 1 : 0);
+ uint8_t flags =
+ (isConfig ? FLAG_CONFIG : 0) | (isIndex ? FLAG_INDEX : 0);
+
+ buf.putOctet (flags);
buf.putOctet (typeCode);
buf.putShortString (name);
buf.putShortString (description);
@@ -38,5 +43,12 @@ void ManagementObject::schemaItem (Buffer& buf,
void ManagementObject::schemaListEnd (Buffer& buf)
{
- buf.putOctet (0xFF);
+ buf.putOctet (FLAG_END);
+}
+
+void ManagementObject::writeTimestamps (Buffer& buf)
+{
+ buf.putLongLong (uint64_t (Duration (now ())));
+ buf.putLongLong (createTime);
+ buf.putLongLong (destroyTime);
}
diff --git a/cpp/src/qpid/broker/ManagementObject.h b/cpp/src/qpid/broker/ManagementObject.h
index 1588aed641..243d853727 100644
--- a/cpp/src/qpid/broker/ManagementObject.h
+++ b/cpp/src/qpid/broker/ManagementObject.h
@@ -31,24 +31,30 @@ namespace qpid {
namespace broker {
using namespace qpid::framing;
+using namespace qpid::sys;
+
+const uint16_t OBJECT_SYSTEM = 1;
+const uint16_t OBJECT_BROKER = 2;
+const uint16_t OBJECT_VHOST = 3;
+const uint16_t OBJECT_QUEUE = 4;
+const uint16_t OBJECT_EXCHANGE = 5;
+const uint16_t OBJECT_BINDING = 6;
+const uint16_t OBJECT_CLIENT = 7;
+const uint16_t OBJECT_SESSION = 8;
+const uint16_t OBJECT_DESTINATION = 9;
+const uint16_t OBJECT_PRODUCER = 10;
+const uint16_t OBJECT_CONSUMER = 11;
-const uint16_t OBJECT_BROKER = 1;
-const uint16_t OBJECT_SERVER = 2;
-const uint16_t OBJECT_QUEUE = 3;
-const uint16_t OBJECT_EXCHANGE = 4;
-const uint16_t OBJECT_BINDING = 5;
class ManagementObject
{
- private:
-
- qpid::sys::AbsTime createTime;
- qpid::sys::AbsTime destroyTime;
-
protected:
- bool configChanged;
- bool instChanged;
+ uint64_t createTime;
+ uint64_t destroyTime;
+ bool configChanged;
+ bool instChanged;
+ bool deleted;
static const uint8_t TYPE_UINT8 = 1;
static const uint8_t TYPE_UINT16 = 2;
@@ -56,18 +62,26 @@ class ManagementObject
static const uint8_t TYPE_UINT64 = 4;
static const uint8_t TYPE_BOOL = 5;
static const uint8_t TYPE_STRING = 6;
+
+ static const uint8_t FLAG_CONFIG = 0x01;
+ static const uint8_t FLAG_INDEX = 0x02;
+ static const uint8_t FLAG_END = 0x80;
void schemaItem (Buffer& buf,
- uint8_t typeCode,
- std::string name,
- std::string description,
- bool isConfig = false);
- void schemaListEnd (Buffer & buf);
+ uint8_t typeCode,
+ std::string name,
+ std::string description,
+ bool isConfig = false,
+ bool isIndex = false);
+ void schemaListEnd (Buffer& buf);
+ void writeTimestamps (Buffer& buf);
public:
typedef boost::shared_ptr<ManagementObject> shared_ptr;
- ManagementObject () : configChanged(true), instChanged(true) { createTime = qpid::sys::now (); }
+ ManagementObject () : destroyTime(0), configChanged(true),
+ instChanged(true), deleted(false)
+ { createTime = uint64_t (Duration (now ())); }
virtual ~ManagementObject () {}
virtual uint16_t getObjectType (void) = 0;
@@ -76,10 +90,21 @@ class ManagementObject
virtual void writeConfig (Buffer& buf) = 0;
virtual void writeInstrumentation (Buffer& buf) = 0;
virtual bool getSchemaNeeded (void) = 0;
-
+ virtual void setSchemaNeeded (void) = 0;
+
inline bool getConfigChanged (void) { return configChanged; }
inline bool getInstChanged (void) { return instChanged; }
- inline void resourceDestroy (void) { destroyTime = qpid::sys::now (); }
+ inline void setAllChanged (void)
+ {
+ configChanged = true;
+ instChanged = true;
+ }
+
+ inline void resourceDestroy (void) {
+ destroyTime = uint64_t (Duration (now ()));
+ deleted = true;
+ }
+ bool isDeleted (void) { return deleted; }
};
diff --git a/cpp/src/qpid/broker/ManagementObjectQueue.cpp b/cpp/src/qpid/broker/ManagementObjectQueue.cpp
index b81cd7b60d..d30cda03a4 100644
--- a/cpp/src/qpid/broker/ManagementObjectQueue.cpp
+++ b/cpp/src/qpid/broker/ManagementObjectQueue.cpp
@@ -76,7 +76,9 @@ ManagementObjectQueue::~ManagementObjectQueue () {}
void ManagementObjectQueue::writeSchema (Buffer& buf)
{
- schemaItem (buf, TYPE_STRING, "name", "Queue Name", true);
+ schemaNeeded = false;
+
+ schemaItem (buf, TYPE_STRING, "name", "Queue Name", true, true);
schemaItem (buf, TYPE_BOOL, "durable", "Durable", true);
schemaItem (buf, TYPE_BOOL, "autoDelete", "AutoDelete", true);
@@ -115,21 +117,24 @@ void ManagementObjectQueue::writeSchema (Buffer& buf)
schemaItem (buf, TYPE_UINT32, "consumersHigh", "Consumer high water mark this interval");
schemaListEnd (buf);
-
- schemaNeeded = false;
}
void ManagementObjectQueue::writeConfig (Buffer& buf)
{
+ configChanged = false;
+
+ writeTimestamps (buf);
buf.putShortString (name);
buf.putOctet (durable ? 1 : 0);
buf.putOctet (autoDelete ? 1 : 0);
-
- configChanged = false;
}
void ManagementObjectQueue::writeInstrumentation (Buffer& buf)
{
+ instChanged = false;
+
+ writeTimestamps (buf);
+ buf.putShortString (name);
buf.putLongLong (msgTotalEnqueues);
buf.putLongLong (msgTotalDequeues);
buf.putLongLong (msgTxEnqueues);
@@ -164,5 +169,14 @@ void ManagementObjectQueue::writeInstrumentation (Buffer& buf)
buf.putLong (consumersLow);
buf.putLong (consumersHigh);
- instChanged = false;
+ msgDepthLow = msgDepth;
+ msgDepthHigh = msgDepth;
+ byteDepthLow = byteDepth;
+ byteDepthHigh = byteDepth;
+ enqueueTxCountLow = enqueueTxCount;
+ enqueueTxCountHigh = enqueueTxCount;
+ dequeueTxCountLow = dequeueTxCount;
+ dequeueTxCountHigh = dequeueTxCount;
+ consumersLow = consumers;
+ consumersHigh = consumers;
}
diff --git a/cpp/src/qpid/broker/ManagementObjectQueue.h b/cpp/src/qpid/broker/ManagementObjectQueue.h
index 989d10f8c0..cb2d399b76 100644
--- a/cpp/src/qpid/broker/ManagementObjectQueue.h
+++ b/cpp/src/qpid/broker/ManagementObjectQueue.h
@@ -89,28 +89,29 @@ class ManagementObjectQueue : public ManagementObject
void writeConfig (Buffer& buf);
void writeInstrumentation (Buffer& buf);
bool getSchemaNeeded (void) { return schemaNeeded; }
+ void setSchemaNeeded (void) { schemaNeeded = true; }
inline void adjustQueueHiLo (void){
- if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth;
- if (msgDepth < msgDepthLow) msgDepthLow = msgDepth;
+ if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth;
+ if (msgDepth < msgDepthLow) msgDepthLow = msgDepth;
- if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth;
- if (byteDepth < byteDepthLow) byteDepthLow = byteDepth;
- instChanged = true;
+ if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth;
+ if (byteDepth < byteDepthLow) byteDepthLow = byteDepth;
+ instChanged = true;
}
inline void adjustTxHiLo (void){
- if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount;
- if (enqueueTxCount < enqueueTxCountLow) enqueueTxCountLow = enqueueTxCount;
- if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount;
- if (dequeueTxCount < dequeueTxCountLow) dequeueTxCountLow = dequeueTxCount;
- instChanged = true;
+ if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount;
+ if (enqueueTxCount < enqueueTxCountLow) enqueueTxCountLow = enqueueTxCount;
+ if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount;
+ if (dequeueTxCount < dequeueTxCountLow) dequeueTxCountLow = dequeueTxCount;
+ instChanged = true;
}
inline void adjustConsumerHiLo (void){
- if (consumers > consumersHigh) consumersHigh = consumers;
- if (consumers < consumersLow) consumersLow = consumers;
- instChanged = true;
+ if (consumers > consumersHigh) consumersHigh = consumers;
+ if (consumers < consumersLow) consumersLow = consumers;
+ instChanged = true;
}
public:
@@ -124,51 +125,51 @@ class ManagementObjectQueue : public ManagementObject
// messages when counting statistics.
inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){
- msgTotalEnqueues++;
- byteTotalEnqueues += bytes;
-
- if (attrMask & MSG_MASK_TX){
- msgTxEnqueues++;
- byteTxEnqueues += bytes;
- }
-
- if (attrMask & MSG_MASK_PERSIST){
- msgPersistEnqueues++;
- bytePersistEnqueues += bytes;
- }
-
- msgDepth++;
- byteDepth += bytes;
- adjustQueueHiLo ();
+ msgTotalEnqueues++;
+ byteTotalEnqueues += bytes;
+
+ if (attrMask & MSG_MASK_TX){
+ msgTxEnqueues++;
+ byteTxEnqueues += bytes;
+ }
+
+ if (attrMask & MSG_MASK_PERSIST){
+ msgPersistEnqueues++;
+ bytePersistEnqueues += bytes;
+ }
+
+ msgDepth++;
+ byteDepth += bytes;
+ adjustQueueHiLo ();
}
inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){
- msgTotalDequeues++;
- byteTotalDequeues += bytes;
-
- if (attrMask & MSG_MASK_TX){
- msgTxDequeues++;
- byteTxDequeues += bytes;
- }
-
- if (attrMask & MSG_MASK_PERSIST){
- msgPersistDequeues++;
- bytePersistDequeues += bytes;
- }
-
- msgDepth--;
- byteDepth -= bytes;
- adjustQueueHiLo ();
+ msgTotalDequeues++;
+ byteTotalDequeues += bytes;
+
+ if (attrMask & MSG_MASK_TX){
+ msgTxDequeues++;
+ byteTxDequeues += bytes;
+ }
+
+ if (attrMask & MSG_MASK_PERSIST){
+ msgPersistDequeues++;
+ bytePersistDequeues += bytes;
+ }
+
+ msgDepth--;
+ byteDepth -= bytes;
+ adjustQueueHiLo ();
}
inline void incConsumers (void){
- consumers++;
- adjustConsumerHiLo ();
+ consumers++;
+ adjustConsumerHiLo ();
}
inline void decConsumers (void){
- consumers--;
- adjustConsumerHiLo ();
+ consumers--;
+ adjustConsumerHiLo ();
}
};
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index ef1358feb9..116e8d9431 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -293,6 +293,10 @@ void Queue::consume(Consumer::ptr c, bool requestExclusive){
}
browsers.push_back(c);
}
+
+ if (mgmtObjectPtr != 0){
+ mgmtObjectPtr->incConsumers ();
+ }
}
void Queue::cancel(Consumer::ptr c){
@@ -302,6 +306,9 @@ void Queue::cancel(Consumer::ptr c){
} else {
cancel(c, browsers);
}
+ if (mgmtObjectPtr != 0){
+ mgmtObjectPtr->decConsumers ();
+ }
if(exclusive == c) exclusive.reset();
}
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index 927de4c079..6c87e5ff98 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -21,6 +21,7 @@
#include "QueueRegistry.h"
#include "ManagementAgent.h"
#include "ManagementObjectQueue.h"
+#include "qpid/log/Statement.h"
#include <sstream>
#include <assert.h>
@@ -42,33 +43,32 @@ QueueRegistry::declare(const string& declareName, bool durable,
QueueMap::iterator i = queues.find(name);
if (i == queues.end()) {
- Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner));
- queues[name] = queue;
+ Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner));
+ queues[name] = queue;
- if (managementAgent){
- ManagementObjectQueue::shared_ptr mgmtObject(new ManagementObjectQueue (name, durable, autoDelete));
+ if (managementAgent){
+ ManagementObjectQueue::shared_ptr mgmtObject(new ManagementObjectQueue (name, durable, autoDelete));
- queue->setMgmt (mgmtObject);
- managementAgent->addObject(dynamic_pointer_cast<ManagementObject>(mgmtObject));
- }
-
- return std::pair<Queue::shared_ptr, bool>(queue, true);
+ queue->setMgmt (mgmtObject);
+ managementAgent->addObject(dynamic_pointer_cast<ManagementObject>(mgmtObject));
+ }
+
+ return std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
- return std::pair<Queue::shared_ptr, bool>(i->second, false);
+ return std::pair<Queue::shared_ptr, bool>(i->second, false);
}
}
void QueueRegistry::destroy(const string& name){
RWlock::ScopedWlock locker(lock);
-
if (managementAgent){
- ManagementObjectQueue::shared_ptr mgmtObject;
- QueueMap::iterator i = queues.find(name);
+ ManagementObjectQueue::shared_ptr mgmtObject;
+ QueueMap::iterator i = queues.find(name);
- if (i != queues.end()){
- mgmtObject = i->second->getMgmt ();
- managementAgent->deleteObject (dynamic_pointer_cast<ManagementObject>(mgmtObject));
- }
+ if (i != queues.end()){
+ mgmtObject = i->second->getMgmt ();
+ mgmtObject->resourceDestroy ();
+ }
}
queues.erase(name);
@@ -79,20 +79,20 @@ Queue::shared_ptr QueueRegistry::find(const string& name){
QueueMap::iterator i = queues.find(name);
if (i == queues.end()) {
- return Queue::shared_ptr();
+ return Queue::shared_ptr();
} else {
- return i->second;
+ return i->second;
}
}
string QueueRegistry::generateName(){
string name;
do {
- std::stringstream ss;
- ss << "tmp_" << counter++;
- name = ss.str();
- // Thread safety: Private function, only called with lock held
- // so this is OK.
+ std::stringstream ss;
+ ss << "tmp_" << counter++;
+ name = ss.str();
+ // Thread safety: Private function, only called with lock held
+ // so this is OK.
} while(queues.find(name) != queues.end());
return name;
}