summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-01-29 22:07:03 +0000
committerTed Ross <tross@apache.org>2010-01-29 22:07:03 +0000
commit6f16d9d4fa398056a817726ca8512f356422353d (patch)
tree3336090b503ae69264abd3d43e84f3be02af41c6
parentcbb2022f29b9905e8c80f5622441a46778cb6a4b (diff)
downloadqpid-python-6f16d9d4fa398056a817726ca8512f356422353d.tar.gz
QPID-2251, QPID-1982 - Added alternative to non-portable FD notifier in the c++ QMF agent.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@904645 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qpid/agent/ManagementAgent.h23
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp63
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h6
3 files changed, 74 insertions, 18 deletions
diff --git a/qpid/cpp/include/qpid/agent/ManagementAgent.h b/qpid/cpp/include/qpid/agent/ManagementAgent.h
index 1a8d0c4025..b0f0f1cec4 100644
--- a/qpid/cpp/include/qpid/agent/ManagementAgent.h
+++ b/qpid/cpp/include/qpid/agent/ManagementAgent.h
@@ -30,6 +30,12 @@
namespace qpid {
namespace management {
+class Notifyable {
+public:
+ virtual ~Notifyable() {}
+ virtual void notify() = 0;
+};
+
class ManagementAgent
{
public:
@@ -150,11 +156,20 @@ class ManagementAgent
virtual uint32_t pollCallbacks(uint32_t callLimit = 0) = 0;
// If "useExternalThread" was set to true in the constructor, this method provides
- // a standard file descriptor that can be used in a select statement to signal that
- // there are method callbacks ready (i.e. that "pollCallbacks" will result in at
- // least one method call). When this fd is ready-for-read, pollCallbacks may be
- // invoked. Calling pollCallbacks shall reset the ready-to-read state of the fd.
+ // a callback that is invoked whenever there is work to be done by pollCallbacks.
+ // This function is invoked on the agent's thread and should not perform any work
+ // except to signal the application's thread.
+ //
+ // There are two flavors of callback:
+ // A C version that uses a pointer to a function with a void* context
+ // A C++ version that uses a class derived from Notifyable
+ //
+ // Either type of callback may be used. If they are both provided, the C++ callback
+ // will be the only one invoked.
//
+ typedef void (*cb_t)(void*);
+ virtual void setSignalCallback(cb_t callback, void* context) = 0;
+ virtual void setSignalCallback(Notifyable& notifyable) = 0;
virtual int getSignalFd() = 0;
};
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index f84e158154..467f70510b 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -21,7 +21,6 @@
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/PipeHandle.h"
#include "qpid/agent/ManagementAgentImpl.h"
#include <list>
#include <string.h>
@@ -78,7 +77,8 @@ ManagementAgent* ManagementAgent::Singleton::getInstance()
const string ManagementAgentImpl::storeMagicNumber("MA02");
ManagementAgentImpl::ManagementAgentImpl() :
- interval(10), extThread(false), pipeHandle(0),
+ interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0),
+ notifyable(0), inCallback(false),
initialized(false), connected(false), lastFailure("never connected"),
clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
@@ -149,12 +149,6 @@ void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings,
" interval=" << intervalSeconds << " storeFile=" << _storeFile);
connectionSettings = settings;
- // TODO: Abstract the socket calls for portability
- // qpid::sys::PipeHandle to create a pipe
- if (extThread) {
- pipeHandle = new PipeHandle(true);
- }
-
retrieveData();
bootSequence++;
if ((bootSequence & 0xF000) != 0)
@@ -226,6 +220,11 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
{
Mutex::ScopedLock lock(agentLock);
+ if (inCallback) {
+ QPID_LOG(critical, "pollCallbacks invoked from the agent's thread!");
+ return 0;
+ }
+
for (uint32_t idx = 0; callLimit == 0 || idx < callLimit; idx++) {
if (methodQueue.empty())
break;
@@ -239,15 +238,35 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
delete item;
}
}
-
- char rbuf[100];
- while (pipeHandle->read(rbuf, 100) > 0) ; // Consume all signaling bytes
+
+ if (pipeHandle != 0) {
+ char rbuf[100];
+ while (pipeHandle->read(rbuf, 100) > 0) ; // Consume all signaling bytes
+ }
return methodQueue.size();
}
-int ManagementAgentImpl::getSignalFd(void)
+int ManagementAgentImpl::getSignalFd()
{
- return pipeHandle->getReadHandle();
+ if (extThread) {
+ pipeHandle = new PipeHandle(true);
+ return pipeHandle->getReadHandle();
+ }
+
+ return -1;
+}
+
+void ManagementAgentImpl::setSignalCallback(cb_t callback, void* context)
+{
+ Mutex::ScopedLock lock(agentLock);
+ notifyCallback = callback;
+ notifyContext = context;
+}
+
+void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable)
+{
+ Mutex::ScopedLock lock(agentLock);
+ notifyable = &_notifyable;
}
void ManagementAgentImpl::startProtocol()
@@ -528,7 +547,23 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc
inBuffer.getRawData(body, inBuffer.available());
methodQueue.push_back(new QueuedMethod(sequence, replyTo, body));
- pipeHandle->write("X", 1);
+ if (pipeHandle != 0) {
+ pipeHandle->write("X", 1);
+ } else if (notifyable != 0) {
+ inCallback = true;
+ {
+ Mutex::ScopedUnlock unlock(agentLock);
+ notifyable->notify();
+ }
+ inCallback = false;
+ } else if (notifyCallback != 0) {
+ inCallback = true;
+ {
+ Mutex::ScopedUnlock unlock(agentLock);
+ notifyCallback(notifyContext);
+ }
+ inCallback = false;
+ }
} else {
invokeMethodRequest(inBuffer, sequence, replyTo);
}
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
index b1efa1809b..affaa45d2d 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -78,6 +78,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void raiseEvent(const management::ManagementEvent& event, severity_t severity = SEV_DEFAULT);
uint32_t pollCallbacks(uint32_t callLimit = 0);
int getSignalFd();
+ void setSignalCallback(cb_t callback, void* context);
+ void setSignalCallback(Notifyable& n);
uint16_t getInterval() { return interval; }
void periodicProcessing();
@@ -142,6 +144,10 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
bool extThread;
sys::PipeHandle* pipeHandle;
uint64_t nextObjectId;
+ cb_t notifyCallback;
+ void* notifyContext;
+ Notifyable* notifyable;
+ bool inCallback;
std::string storeFile;
sys::Mutex agentLock;
sys::Mutex addLock;