diff options
-rw-r--r-- | qpid/cpp/include/qpid/agent/ManagementAgent.h | 23 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 63 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 6 |
3 files changed, 74 insertions, 18 deletions
diff --git a/qpid/cpp/include/qpid/agent/ManagementAgent.h b/qpid/cpp/include/qpid/agent/ManagementAgent.h index 1a8d0c4025..b0f0f1cec4 100644 --- a/qpid/cpp/include/qpid/agent/ManagementAgent.h +++ b/qpid/cpp/include/qpid/agent/ManagementAgent.h @@ -30,6 +30,12 @@ namespace qpid { namespace management { +class Notifyable { +public: + virtual ~Notifyable() {} + virtual void notify() = 0; +}; + class ManagementAgent { public: @@ -150,11 +156,20 @@ class ManagementAgent virtual uint32_t pollCallbacks(uint32_t callLimit = 0) = 0; // If "useExternalThread" was set to true in the constructor, this method provides - // a standard file descriptor that can be used in a select statement to signal that - // there are method callbacks ready (i.e. that "pollCallbacks" will result in at - // least one method call). When this fd is ready-for-read, pollCallbacks may be - // invoked. Calling pollCallbacks shall reset the ready-to-read state of the fd. + // a callback that is invoked whenever there is work to be done by pollCallbacks. + // This function is invoked on the agent's thread and should not perform any work + // except to signal the application's thread. + // + // There are two flavors of callback: + // A C version that uses a pointer to a function with a void* context + // A C++ version that uses a class derived from Notifyable + // + // Either type of callback may be used. If they are both provided, the C++ callback + // will be the only one invoked. // + typedef void (*cb_t)(void*); + virtual void setSignalCallback(cb_t callback, void* context) = 0; + virtual void setSignalCallback(Notifyable& notifyable) = 0; virtual int getSignalFd() = 0; }; diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index f84e158154..467f70510b 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -21,7 +21,6 @@ #include "qpid/management/Manageable.h" #include "qpid/management/ManagementObject.h" #include "qpid/log/Statement.h" -#include "qpid/sys/PipeHandle.h" #include "qpid/agent/ManagementAgentImpl.h" #include <list> #include <string.h> @@ -78,7 +77,8 @@ ManagementAgent* ManagementAgent::Singleton::getInstance() const string ManagementAgentImpl::storeMagicNumber("MA02"); ManagementAgentImpl::ManagementAgentImpl() : - interval(10), extThread(false), pipeHandle(0), + interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0), + notifyable(0), inCallback(false), initialized(false), connected(false), lastFailure("never connected"), clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0), assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), @@ -149,12 +149,6 @@ void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings, " interval=" << intervalSeconds << " storeFile=" << _storeFile); connectionSettings = settings; - // TODO: Abstract the socket calls for portability - // qpid::sys::PipeHandle to create a pipe - if (extThread) { - pipeHandle = new PipeHandle(true); - } - retrieveData(); bootSequence++; if ((bootSequence & 0xF000) != 0) @@ -226,6 +220,11 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) { Mutex::ScopedLock lock(agentLock); + if (inCallback) { + QPID_LOG(critical, "pollCallbacks invoked from the agent's thread!"); + return 0; + } + for (uint32_t idx = 0; callLimit == 0 || idx < callLimit; idx++) { if (methodQueue.empty()) break; @@ -239,15 +238,35 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) delete item; } } - - char rbuf[100]; - while (pipeHandle->read(rbuf, 100) > 0) ; // Consume all signaling bytes + + if (pipeHandle != 0) { + char rbuf[100]; + while (pipeHandle->read(rbuf, 100) > 0) ; // Consume all signaling bytes + } return methodQueue.size(); } -int ManagementAgentImpl::getSignalFd(void) +int ManagementAgentImpl::getSignalFd() { - return pipeHandle->getReadHandle(); + if (extThread) { + pipeHandle = new PipeHandle(true); + return pipeHandle->getReadHandle(); + } + + return -1; +} + +void ManagementAgentImpl::setSignalCallback(cb_t callback, void* context) +{ + Mutex::ScopedLock lock(agentLock); + notifyCallback = callback; + notifyContext = context; +} + +void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable) +{ + Mutex::ScopedLock lock(agentLock); + notifyable = &_notifyable; } void ManagementAgentImpl::startProtocol() @@ -528,7 +547,23 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc inBuffer.getRawData(body, inBuffer.available()); methodQueue.push_back(new QueuedMethod(sequence, replyTo, body)); - pipeHandle->write("X", 1); + if (pipeHandle != 0) { + pipeHandle->write("X", 1); + } else if (notifyable != 0) { + inCallback = true; + { + Mutex::ScopedUnlock unlock(agentLock); + notifyable->notify(); + } + inCallback = false; + } else if (notifyCallback != 0) { + inCallback = true; + { + Mutex::ScopedUnlock unlock(agentLock); + notifyCallback(notifyContext); + } + inCallback = false; + } } else { invokeMethodRequest(inBuffer, sequence, replyTo); } diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h index b1efa1809b..affaa45d2d 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -78,6 +78,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void raiseEvent(const management::ManagementEvent& event, severity_t severity = SEV_DEFAULT); uint32_t pollCallbacks(uint32_t callLimit = 0); int getSignalFd(); + void setSignalCallback(cb_t callback, void* context); + void setSignalCallback(Notifyable& n); uint16_t getInterval() { return interval; } void periodicProcessing(); @@ -142,6 +144,10 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen bool extThread; sys::PipeHandle* pipeHandle; uint64_t nextObjectId; + cb_t notifyCallback; + void* notifyContext; + Notifyable* notifyable; + bool inCallback; std::string storeFile; sys::Mutex agentLock; sys::Mutex addLock; |