summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2007-10-31 00:43:11 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2007-10-31 00:43:11 +0000
commit7dad525a5c71fe346a275646945793d36adf4c9a (patch)
tree80380e4d0286d93dca96c4b01c7704f091e07aaa /qpid/cpp
parent9fd2a66990309c75ffcc461fc010f0cf32505b71 (diff)
downloadqpid-python-7dad525a5c71fe346a275646945793d36adf4c9a.tar.gz
- QPID-667
- for Ted Ross - built and tested. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@590523 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/ManagementAgent.cpp30
-rw-r--r--qpid/cpp/src/qpid/broker/ManagementAgent.h17
-rw-r--r--qpid/cpp/src/qpid/broker/ManagementExchange.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/ManagementObject.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp62
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h20
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.h5
8 files changed, 84 insertions, 65 deletions
diff --git a/qpid/cpp/src/qpid/broker/ManagementAgent.cpp b/qpid/cpp/src/qpid/broker/ManagementAgent.cpp
index 71b027c7df..2f6a0597f0 100644
--- a/qpid/cpp/src/qpid/broker/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/broker/ManagementAgent.cpp
@@ -25,6 +25,7 @@
#include <qpid/broker/Message.h>
#include <qpid/broker/MessageDelivery.h>
#include <qpid/framing/AMQFrame.h>
+#include <list>
using namespace qpid::framing;
using namespace qpid::broker;
@@ -57,7 +58,7 @@ void ManagementAgent::Periodic::fire ()
void ManagementAgent::clientAdded (void)
{
- for (ManagementObjectList::iterator iter = managementObjects.begin ();
+ for (ManagementObjectVector::iterator iter = managementObjects.begin ();
iter != managementObjects.end ();
iter++)
{
@@ -74,6 +75,7 @@ void ManagementAgent::PeriodicProcessing (void)
char msgChars[BUFSIZE];
Buffer msgBuffer (msgChars, BUFSIZE);
uint32_t contentSize;
+ std::list<uint32_t> deleteList;
if (managementObjects.empty ())
return;
@@ -86,11 +88,9 @@ void ManagementAgent::PeriodicProcessing (void)
msgBuffer.putOctet ('0');
msgBuffer.putOctet ('1');
- for (ManagementObjectList::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++)
+ for (uint32_t idx = 0; idx < managementObjects.size (); idx++)
{
- ManagementObject::shared_ptr object = *iter;
+ ManagementObject::shared_ptr object = managementObjects[idx];
if (object->getSchemaNeeded ())
{
@@ -147,10 +147,7 @@ void ManagementAgent::PeriodicProcessing (void)
}
if (object->isDeleted ())
- {
- managementObjects.remove (object);
- QPID_LOG (debug, "Management Object Removed");
- }
+ deleteList.push_back (idx);
// Temporary protection against buffer overrun.
// This needs to be replaced with frame fragmentation.
@@ -189,5 +186,20 @@ void ManagementAgent::PeriodicProcessing (void)
DeliverableMessage deliverable (msg);
exchange->route (deliverable, "mgmt", 0);
+
+ // Delete flagged objects
+ for (std::list<uint32_t>::reverse_iterator iter = deleteList.rbegin ();
+ iter != deleteList.rend ();
+ iter++)
+ {
+ managementObjects.erase (managementObjects.begin () + *iter);
+ }
+ deleteList.clear ();
+}
+
+void ManagementAgent::dispatchCommand (Deliverable& /*msg*/,
+ const string& /*routingKey*/,
+ const FieldTable* /*args*/)
+{
}
diff --git a/qpid/cpp/src/qpid/broker/ManagementAgent.h b/qpid/cpp/src/qpid/broker/ManagementAgent.h
index 1e332023a6..e1d62270db 100644
--- a/qpid/cpp/src/qpid/broker/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/broker/ManagementAgent.h
@@ -40,9 +40,12 @@ class ManagementAgent
ManagementAgent (uint16_t interval);
- void setExchange (Exchange::shared_ptr exchange);
- void addObject (ManagementObject::shared_ptr object);
- void clientAdded (void);
+ void setExchange (Exchange::shared_ptr exchange);
+ void addObject (ManagementObject::shared_ptr object);
+ void clientAdded (void);
+ void dispatchCommand (Deliverable& msg,
+ const string& routingKey,
+ const FieldTable* args);
private:
@@ -55,10 +58,10 @@ class ManagementAgent
void fire ();
};
- ManagementObjectList managementObjects;
- Timer timer;
- Exchange::shared_ptr exchange;
- uint16_t interval;
+ ManagementObjectVector managementObjects;
+ Timer timer;
+ Exchange::shared_ptr exchange;
+ uint16_t interval;
void PeriodicProcessing (void);
};
diff --git a/qpid/cpp/src/qpid/broker/ManagementExchange.cpp b/qpid/cpp/src/qpid/broker/ManagementExchange.cpp
index d3de8bc8e1..5d829477ba 100644
--- a/qpid/cpp/src/qpid/broker/ManagementExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/ManagementExchange.cpp
@@ -57,8 +57,7 @@ void ManagementExchange::route (Deliverable& msg,
if (routingKey.length () > 7 &&
routingKey.substr (0, 7).compare ("method.") == 0)
{
- QPID_LOG (debug, "ManagementExchange: Intercept command " << routingKey);
- // TODO: Send intercepted commands to ManagementAgent for dispatch
+ managementAgent->dispatchCommand (msg, routingKey, args);
return;
}
diff --git a/qpid/cpp/src/qpid/broker/ManagementObject.h b/qpid/cpp/src/qpid/broker/ManagementObject.h
index 243d853727..237f2f3d79 100644
--- a/qpid/cpp/src/qpid/broker/ManagementObject.h
+++ b/qpid/cpp/src/qpid/broker/ManagementObject.h
@@ -25,7 +25,7 @@
#include "qpid/sys/Time.h"
#include <qpid/framing/Buffer.h>
#include <boost/shared_ptr.hpp>
-#include <list>
+#include <vector>
namespace qpid {
namespace broker {
@@ -108,7 +108,7 @@ class ManagementObject
};
- typedef std::list<ManagementObject::shared_ptr> ManagementObjectList;
+ typedef std::vector<ManagementObject::shared_ptr> ManagementObjectVector;
}}
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 18c1ab1056..d668ee0505 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -71,18 +71,18 @@ void Queue::deliver(Message::shared_ptr& msg){
} else {
- // if no store then mark as enqueued
+ // if no store then mark as enqueued
if (!enqueue(0, msg)){
push(msg);
- msg->enqueueComplete();
- if (mgmtObjectPtr != 0)
- mgmtObjectPtr->enqueue (msg->contentSize ());
- }else {
- if (mgmtObjectPtr != 0)
- mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
+ msg->enqueueComplete();
+ if (mgmtObject != 0)
+ mgmtObject->enqueue (msg->contentSize ());
+ }else {
+ if (mgmtObject != 0)
+ mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
push(msg);
- }
- QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
+ }
+ QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
serializer.execute(dispatchCallback);
}
}
@@ -91,8 +91,8 @@ void Queue::deliver(Message::shared_ptr& msg){
void Queue::recover(Message::shared_ptr& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
- if (mgmtObjectPtr != 0)
- mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
+ if (mgmtObject != 0)
+ mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
if (store && !msg->isContentLoaded()) {
//content has not been loaded, need to ensure that lazy loading mode is set:
//TODO: find a nicer way to do this
@@ -108,20 +108,19 @@ void Queue::process(Message::shared_ptr& msg){
mask |= MSG_MASK_PERSIST;
push(msg);
- if (mgmtObjectPtr != 0)
- mgmtObjectPtr->enqueue (msg->contentSize (), mask);
+ if (mgmtObject != 0)
+ mgmtObject->enqueue (msg->contentSize (), mask);
serializer.execute(dispatchCallback);
}
void Queue::requeue(const QueuedMessage& msg){
{
- Mutex::ScopedLock locker(messageLock);
- msg.payload->enqueueComplete(); // mark the message as enqueued
- messages.push_front(msg);
+ Mutex::ScopedLock locker(messageLock);
+ msg.payload->enqueueComplete(); // mark the message as enqueued
+ messages.push_front(msg);
}
serializer.execute(dispatchCallback);
-
}
bool Queue::acquire(const QueuedMessage& msg) {
@@ -221,7 +220,7 @@ void Queue::dispatch()
QPID_LOG(debug, "Message " << msg.payload << " filtered out of " << name << "[" << this << "]");
} else {
break;
- }
+ }
}
serviceAllBrowsers();
}
@@ -290,8 +289,8 @@ void Queue::consume(Consumer::ptr c, bool requestExclusive){
browsers.push_back(c);
}
- if (mgmtObjectPtr != 0){
- mgmtObjectPtr->incConsumers ();
+ if (mgmtObject != 0){
+ mgmtObject->incConsumers ();
}
}
@@ -302,8 +301,8 @@ void Queue::cancel(Consumer::ptr c){
} else {
cancel(c, browsers);
}
- if (mgmtObjectPtr != 0){
- mgmtObjectPtr->decConsumers ();
+ if (mgmtObject != 0){
+ mgmtObject->decConsumers ();
}
if(exclusive == c) exclusive.reset();
}
@@ -318,17 +317,18 @@ void Queue::cancel(Consumer::ptr c, Consumers& consumers)
QueuedMessage Queue::dequeue(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg;
+
if(!messages.empty()){
msg = messages.front();
pop();
- if (mgmtObjectPtr != 0){
- uint32_t mask = 0;
+ if (mgmtObject != 0){
+ uint32_t mask = 0;
- if (msg.payload->isPersistent ())
- mask |= MSG_MASK_PERSIST;
+ if (msg.payload->isPersistent ())
+ mask |= MSG_MASK_PERSIST;
- mgmtObjectPtr->dequeue (msg.payload->contentSize (), mask);
- }
+ mgmtObject->dequeue (msg.payload->contentSize (), mask);
+ }
}
return msg;
}
@@ -343,7 +343,7 @@ uint32_t Queue::purge(){
void Queue::pop(){
Mutex::ScopedLock locker(messageLock);
if (policy.get()) policy->dequeued(messages.front().payload->contentSize());
- messages.pop_front();
+ messages.pop_front();
}
void Queue::push(Message::shared_ptr& msg){
@@ -390,7 +390,7 @@ bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr msg)
if (msg->isPersistent() && store) {
msg->enqueueAsync(this, store); //increment to async counter -- for message sent to more than one queue
store->enqueue(ctxt, *msg.get(), *this);
- return true;
+ return true;
}
return false;
}
@@ -401,7 +401,7 @@ bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr msg)
if (msg->isPersistent() && store) {
msg->dequeueAsync(this, store); //increment to async counter -- for message sent to more than one queue
store->dequeue(ctxt, *msg.get(), *this);
- return true;
+ return true;
}
return false;
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 5146024b6b..17f2d8ba91 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -95,7 +95,7 @@ namespace qpid {
qpid::sys::Serializer<DispatchFunctor> serializer;
DispatchFunctor dispatchCallback;
framing::SequenceNumber sequence;
- ManagementObjectQueue::shared_ptr mgmtObjectPtr;
+ ManagementObjectQueue::shared_ptr mgmtObject;
void pop();
void push(Message::shared_ptr& msg);
@@ -103,7 +103,7 @@ namespace qpid {
void setPolicy(std::auto_ptr<QueuePolicy> policy);
/**
* only called by serilizer
- */
+ */
void dispatch();
void cancel(Consumer::ptr c, Consumers& set);
void serviceAllBrowsers();
@@ -115,16 +115,16 @@ namespace qpid {
bool exclude(Message::shared_ptr msg);
protected:
- /**
- * Call back from store
- */
- virtual void notifyDurableIOComplete();
+ /**
+ * Call back from store
+ */
+ virtual void notifyDurableIOComplete();
public:
typedef boost::shared_ptr<Queue> shared_ptr;
typedef std::vector<shared_ptr> vector;
-
+
Queue(const string& name, bool autodelete = false,
MessageStore* const store = 0,
const ConnectionToken* const owner = 0);
@@ -135,8 +135,8 @@ namespace qpid {
void destroy();
void bound(const string& exchange, const string& key, const qpid::framing::FieldTable& args);
void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref);
- void setMgmt (ManagementObjectQueue::shared_ptr mgmt) { mgmtObjectPtr = mgmt; }
- ManagementObjectQueue::shared_ptr getMgmt (void) { return mgmtObjectPtr; }
+ void setMgmt (ManagementObjectQueue::shared_ptr mgmt) { mgmtObject = mgmt; }
+ ManagementObjectQueue::shared_ptr getMgmt (void) { return mgmtObject; }
bool acquire(const QueuedMessage& msg);
@@ -165,7 +165,7 @@ namespace qpid {
* Request dispatch any queued messages providing there are
* consumers for them. Only one thread can be dispatching
* at any time, so this call schedules the despatch based on
- * the serilizer policy.
+ * the serilizer policy.
*/
void requestDispatch(Consumer::ptr c = Consumer::ptr());
void flush(DispatchCompletion& callback);
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
index 6c87e5ff98..bc572e4238 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -59,8 +59,7 @@ QueueRegistry::declare(const string& declareName, bool durable,
}
}
-void QueueRegistry::destroy(const string& name){
- RWlock::ScopedWlock locker(lock);
+void QueueRegistry::destroyLH (const string& name){
if (managementAgent){
ManagementObjectQueue::shared_ptr mgmtObject;
QueueMap::iterator i = queues.find(name);
@@ -74,6 +73,11 @@ void QueueRegistry::destroy(const string& name){
queues.erase(name);
}
+void QueueRegistry::destroy (const string& name){
+ RWlock::ScopedWlock locker(lock);
+ destroyLH (name);
+}
+
Queue::shared_ptr QueueRegistry::find(const string& name){
RWlock::ScopedRlock locker(lock);
QueueMap::iterator i = queues.find(name);
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h
index 07669bb3a1..03b4778f7a 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.h
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h
@@ -62,12 +62,13 @@ class QueueRegistry{
* subsequent calls to find or declare with the same name.
*
*/
- void destroy(const string& name);
+ void destroyLH (const string& name);
+ void destroy (const string& name);
template <class Test> bool destroyIf(const string& name, Test test)
{
qpid::sys::RWlock::ScopedWlock locker(lock);
if (test()) {
- queues.erase(name);
+ destroyLH (name);
return true;
} else {
return false;