summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-01-29 22:07:03 +0000
committerTed Ross <tross@apache.org>2010-01-29 22:07:03 +0000
commit73ad8a2de26f0c7830aacb608b4b6ea44914f683 (patch)
tree60403e69f3483b0d92c567c77cefd1525525b04d /cpp/src
parent5cd0ee04a9c1095b96f312d5694f607b07d59b63 (diff)
downloadqpid-python-73ad8a2de26f0c7830aacb608b4b6ea44914f683.tar.gz
QPID-2251, QPID-1982 - Added alternative to non-portable FD notifier in the c++ QMF agent.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@904645 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp63
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.h6
2 files changed, 55 insertions, 14 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index f84e158154..467f70510b 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -21,7 +21,6 @@
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/PipeHandle.h"
#include "qpid/agent/ManagementAgentImpl.h"
#include <list>
#include <string.h>
@@ -78,7 +77,8 @@ ManagementAgent* ManagementAgent::Singleton::getInstance()
const string ManagementAgentImpl::storeMagicNumber("MA02");
ManagementAgentImpl::ManagementAgentImpl() :
- interval(10), extThread(false), pipeHandle(0),
+ interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0),
+ notifyable(0), inCallback(false),
initialized(false), connected(false), lastFailure("never connected"),
clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
@@ -149,12 +149,6 @@ void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings,
" interval=" << intervalSeconds << " storeFile=" << _storeFile);
connectionSettings = settings;
- // TODO: Abstract the socket calls for portability
- // qpid::sys::PipeHandle to create a pipe
- if (extThread) {
- pipeHandle = new PipeHandle(true);
- }
-
retrieveData();
bootSequence++;
if ((bootSequence & 0xF000) != 0)
@@ -226,6 +220,11 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
{
Mutex::ScopedLock lock(agentLock);
+ if (inCallback) {
+ QPID_LOG(critical, "pollCallbacks invoked from the agent's thread!");
+ return 0;
+ }
+
for (uint32_t idx = 0; callLimit == 0 || idx < callLimit; idx++) {
if (methodQueue.empty())
break;
@@ -239,15 +238,35 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
delete item;
}
}
-
- char rbuf[100];
- while (pipeHandle->read(rbuf, 100) > 0) ; // Consume all signaling bytes
+
+ if (pipeHandle != 0) {
+ char rbuf[100];
+ while (pipeHandle->read(rbuf, 100) > 0) ; // Consume all signaling bytes
+ }
return methodQueue.size();
}
-int ManagementAgentImpl::getSignalFd(void)
+int ManagementAgentImpl::getSignalFd()
{
- return pipeHandle->getReadHandle();
+ if (extThread) {
+ pipeHandle = new PipeHandle(true);
+ return pipeHandle->getReadHandle();
+ }
+
+ return -1;
+}
+
+void ManagementAgentImpl::setSignalCallback(cb_t callback, void* context)
+{
+ Mutex::ScopedLock lock(agentLock);
+ notifyCallback = callback;
+ notifyContext = context;
+}
+
+void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable)
+{
+ Mutex::ScopedLock lock(agentLock);
+ notifyable = &_notifyable;
}
void ManagementAgentImpl::startProtocol()
@@ -528,7 +547,23 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc
inBuffer.getRawData(body, inBuffer.available());
methodQueue.push_back(new QueuedMethod(sequence, replyTo, body));
- pipeHandle->write("X", 1);
+ if (pipeHandle != 0) {
+ pipeHandle->write("X", 1);
+ } else if (notifyable != 0) {
+ inCallback = true;
+ {
+ Mutex::ScopedUnlock unlock(agentLock);
+ notifyable->notify();
+ }
+ inCallback = false;
+ } else if (notifyCallback != 0) {
+ inCallback = true;
+ {
+ Mutex::ScopedUnlock unlock(agentLock);
+ notifyCallback(notifyContext);
+ }
+ inCallback = false;
+ }
} else {
invokeMethodRequest(inBuffer, sequence, replyTo);
}
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h
index b1efa1809b..affaa45d2d 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -78,6 +78,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void raiseEvent(const management::ManagementEvent& event, severity_t severity = SEV_DEFAULT);
uint32_t pollCallbacks(uint32_t callLimit = 0);
int getSignalFd();
+ void setSignalCallback(cb_t callback, void* context);
+ void setSignalCallback(Notifyable& n);
uint16_t getInterval() { return interval; }
void periodicProcessing();
@@ -142,6 +144,10 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
bool extThread;
sys::PipeHandle* pipeHandle;
uint64_t nextObjectId;
+ cb_t notifyCallback;
+ void* notifyContext;
+ Notifyable* notifyable;
+ bool inCallback;
std::string storeFile;
sys::Mutex agentLock;
sys::Mutex addLock;