summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2011-09-13 19:34:38 +0000
committerTed Ross <tross@apache.org>2011-09-13 19:34:38 +0000
commit97cde6a461403e847799ccfe55e699bc311ae24b (patch)
tree429d1d2d750d023cddac6d2291e5d66f27ba401b
parenta1aec7ac41539186a093ac750ce925ca5d9aca81 (diff)
downloadqpid-python-97cde6a461403e847799ccfe55e699bc311ae24b.tar.gz
QPID-3484 - QMF Main-Loop Integration
Applied patch from Darryl Pierce. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1170314 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/bindings/qmf2/examples/cpp/Makefile.am5
-rw-r--r--cpp/bindings/qmf2/examples/cpp/event_driven_list_agents.cpp107
-rw-r--r--cpp/include/qmf/AgentSession.h1
-rw-r--r--cpp/include/qmf/ConsoleSession.h1
-rw-r--r--cpp/include/qmf/posix/EventNotifier.h62
-rw-r--r--cpp/src/CMakeLists.txt5
-rw-r--r--cpp/src/qmf.mk4
-rw-r--r--cpp/src/qmf/AgentSession.cpp167
-rw-r--r--cpp/src/qmf/AgentSessionImpl.h175
-rw-r--r--cpp/src/qmf/ConsoleSession.cpp38
-rw-r--r--cpp/src/qmf/ConsoleSessionImpl.h17
-rw-r--r--cpp/src/qmf/EventNotifierImpl.cpp56
-rw-r--r--cpp/src/qmf/EventNotifierImpl.h48
-rw-r--r--cpp/src/qmf/PosixEventNotifier.cpp63
-rw-r--r--cpp/src/qmf/PosixEventNotifierImpl.cpp108
-rw-r--r--cpp/src/qmf/PosixEventNotifierImpl.h61
-rw-r--r--cpp/src/tests/Qmf2.cpp104
17 files changed, 890 insertions, 132 deletions
diff --git a/cpp/bindings/qmf2/examples/cpp/Makefile.am b/cpp/bindings/qmf2/examples/cpp/Makefile.am
index 84207d43c4..062fbd0a85 100644
--- a/cpp/bindings/qmf2/examples/cpp/Makefile.am
+++ b/cpp/bindings/qmf2/examples/cpp/Makefile.am
@@ -21,7 +21,7 @@ INCLUDE = -I$(top_srcdir)/include
AM_CPPFLAGS = $(INCLUDE)
-noinst_PROGRAMS=agent list_agents print_events
+noinst_PROGRAMS=agent event_driven_list_agents list_agents print_events
agent_SOURCES=agent.cpp
agent_LDADD=$(top_builddir)/src/libqmf2.la
@@ -29,5 +29,8 @@ agent_LDADD=$(top_builddir)/src/libqmf2.la
list_agents_SOURCES=list_agents.cpp
list_agents_LDADD=$(top_builddir)/src/libqmf2.la
+event_driven_list_agents_SOURCES=event_driven_list_agents.cpp
+event_driven_list_agents_LDADD=$(top_builddir)/src/libqmf2.la
+
print_events_SOURCES=print_events.cpp
print_events_LDADD=$(top_builddir)/src/libqmf2.la
diff --git a/cpp/bindings/qmf2/examples/cpp/event_driven_list_agents.cpp b/cpp/bindings/qmf2/examples/cpp/event_driven_list_agents.cpp
new file mode 100644
index 0000000000..c288aa6bdd
--- /dev/null
+++ b/cpp/bindings/qmf2/examples/cpp/event_driven_list_agents.cpp
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <sys/select.h>
+#include <time.h>
+
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Duration.h>
+#include <qmf/Agent.h>
+#include <qmf/ConsoleEvent.h>
+#include <qmf/ConsoleSession.h>
+#include <qpid/types/Variant.h>
+#include "qmf/posix/EventNotifier.h"
+
+#include <string>
+#include <iostream>
+
+using namespace std;
+using namespace qmf;
+using qpid::types::Variant;
+using qpid::messaging::Duration;
+
+int main(int argc, char** argv)
+{
+ string url("localhost");
+ string connectionOptions;
+ string sessionOptions;
+
+ if (argc > 1)
+ url = argv[1];
+ if (argc > 2)
+ connectionOptions = argv[2];
+ if (argc > 3)
+ sessionOptions = argv[3];
+
+ qpid::messaging::Connection connection(url, connectionOptions);
+ connection.open();
+
+ ConsoleSession session(connection, sessionOptions);
+ session.open();
+ session.setAgentFilter("");
+
+ posix::EventNotifier notifier(session);
+
+ int fd(notifier.getHandle());
+ time_t lastUpdate;
+ bool ftl = false;
+
+ time(&lastUpdate);
+
+ while (true) {
+ fd_set rfds;
+ struct timeval tv;
+ int nfds, retval;
+
+ FD_ZERO(&rfds);
+ FD_SET(fd, &rfds);
+ nfds = fd + 1;
+ tv.tv_sec = 10;
+ tv.tv_usec = 0;
+
+ retval = select(nfds, &rfds, NULL, NULL, &tv);
+
+ if (retval > 0 && FD_ISSET(fd, &rfds)) {
+ ConsoleEvent event;
+ while (session.nextEvent(event, Duration::IMMEDIATE)) {
+ string eventType = "";
+ switch(event.getType()) {
+ case CONSOLE_AGENT_ADD: eventType = "Added"; break;
+ case CONSOLE_AGENT_DEL: eventType = "Deleted"; break;
+ case CONSOLE_AGENT_RESTART: eventType = "Restarted"; break;
+ case CONSOLE_AGENT_SCHEMA_UPDATE: eventType = "Schema Updated"; break;
+ case CONSOLE_AGENT_SCHEMA_RESPONSE: eventType = "Schema Response"; break;
+ case CONSOLE_EVENT: eventType = "Event"; break;
+ case CONSOLE_QUERY_RESPONSE: eventType = "Query Response"; break;
+ case CONSOLE_METHOD_RESPONSE: eventType = "Method Response"; break;
+ case CONSOLE_EXCEPTION: eventType = "Exception"; break;
+ case CONSOLE_SUBSCRIBE_ADD: eventType = "Subscription Added"; break;
+ case CONSOLE_SUBSCRIBE_UPDATE: eventType = "Subscription Updated"; break;
+ case CONSOLE_SUBSCRIBE_DEL: eventType = "Subscription Deleted" ; break;
+ case CONSOLE_THREAD_FAILED: eventType = "Thread Failure"; break;
+ default: eventType = "[UNDEFINED]";
+ }
+ cout << "Agent " << eventType << ": " << event.getAgent().getName() << endl;
+ }
+ } else {
+ cout << "No message received within waiting period." << endl;
+ }
+ }
+}
+
diff --git a/cpp/include/qmf/AgentSession.h b/cpp/include/qmf/AgentSession.h
index 5ecfb0412c..589d364bcc 100644
--- a/cpp/include/qmf/AgentSession.h
+++ b/cpp/include/qmf/AgentSession.h
@@ -188,6 +188,7 @@ namespace qmf {
#ifndef SWIG
private:
friend class qmf::PrivateImplRef<AgentSession>;
+ friend struct AgentSessionImplAccess;
#endif
};
diff --git a/cpp/include/qmf/ConsoleSession.h b/cpp/include/qmf/ConsoleSession.h
index 5e3a091e5d..022485cfa7 100644
--- a/cpp/include/qmf/ConsoleSession.h
+++ b/cpp/include/qmf/ConsoleSession.h
@@ -123,6 +123,7 @@ namespace qmf {
#ifndef SWIG
private:
friend class qmf::PrivateImplRef<ConsoleSession>;
+ friend struct ConsoleSessionImplAccess;
#endif
};
diff --git a/cpp/include/qmf/posix/EventNotifier.h b/cpp/include/qmf/posix/EventNotifier.h
new file mode 100644
index 0000000000..91817cc771
--- /dev/null
+++ b/cpp/include/qmf/posix/EventNotifier.h
@@ -0,0 +1,62 @@
+#ifndef __QMF_POSIX_EVENT_NOTIFIER_H
+#define __QMF_POSIX_EVENT_NOTIFIER_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/ImportExport.h>
+#include "qmf/Handle.h"
+#include "qmf/AgentSession.h"
+#include "qmf/ConsoleSession.h"
+
+namespace qmf {
+
+ class PosixEventNotifierImpl;
+ class PosixEventNotifierImplAccess;
+
+namespace posix {
+
+#ifndef SWIG
+ template <class> class PrivateImplRef;
+#endif
+
+ class QMF_CLASS_EXTERN EventNotifier : public qmf::Handle<qmf::PosixEventNotifierImpl> {
+ public:
+ QMF_EXTERN EventNotifier(::qmf::AgentSession& agentSession);
+ QMF_EXTERN EventNotifier(::qmf::ConsoleSession& consoleSession);
+ QMF_EXTERN EventNotifier(const EventNotifier& that);
+
+ QMF_EXTERN ~EventNotifier();
+
+ QMF_EXTERN EventNotifier& operator=(const EventNotifier& that);
+
+ QMF_EXTERN int getHandle() const;
+
+#ifndef SWIG
+ private:
+ friend class qmf::PrivateImplRef<EventNotifier>;
+ friend struct qmf::PosixEventNotifierImplAccess;
+#endif
+
+ };
+
+}}
+
+#endif
+
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index 49f94322d8..d8496c5122 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -1100,6 +1100,7 @@ if(NOT WIN32)
../include/qmf/exceptions.h
../include/qmf/Handle.h
../include/qmf/ImportExport.h
+ ../include/qmf/posix/EventNotifier.h
../include/qmf/Query.h
../include/qmf/Schema.h
../include/qmf/SchemaId.h
@@ -1129,6 +1130,10 @@ if(NOT WIN32)
qmf/DataAddrImpl.h
qmf/Data.cpp
qmf/DataImpl.h
+ qmf/EventNotifierImpl.h
+ qmf/EventNotifierImpl.cpp
+ qmf/PosixEventNotifier.cpp
+ qmf/PosixEventNotifierImpl.cpp
qmf/exceptions.cpp
qmf/Expression.cpp
qmf/Expression.h
diff --git a/cpp/src/qmf.mk b/cpp/src/qmf.mk
index f3462f1a93..4da8470f2f 100644
--- a/cpp/src/qmf.mk
+++ b/cpp/src/qmf.mk
@@ -43,6 +43,7 @@ QMF2_API = \
../include/qmf/ConsoleSession.h \
../include/qmf/DataAddr.h \
../include/qmf/Data.h \
+ ../include/qmf/posix/EventNotifier.h \
../include/qmf/exceptions.h \
../include/qmf/Handle.h \
../include/qmf/ImportExport.h \
@@ -104,6 +105,9 @@ libqmf2_la_SOURCES = \
qmf/DataAddrImpl.h \
qmf/Data.cpp \
qmf/DataImpl.h \
+ qmf/EventNotifierImpl.cpp \
+ qmf/PosixEventNotifier.cpp \
+ qmf/PosixEventNotifierImpl.cpp \
qmf/exceptions.cpp \
qmf/Expression.cpp \
qmf/Expression.h \
diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp
index a88782d107..d92b2a41d3 100644
--- a/cpp/src/qmf/AgentSession.cpp
+++ b/cpp/src/qmf/AgentSession.cpp
@@ -19,134 +19,7 @@
*
*/
-#include "qpid/RefCounted.h"
-#include "qmf/PrivateImplRef.h"
-#include "qmf/exceptions.h"
-#include "qmf/AgentSession.h"
-#include "qmf/AgentEventImpl.h"
-#include "qmf/SchemaIdImpl.h"
-#include "qmf/SchemaImpl.h"
-#include "qmf/DataAddrImpl.h"
-#include "qmf/DataImpl.h"
-#include "qmf/QueryImpl.h"
-#include "qmf/agentCapability.h"
-#include "qmf/constants.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Condition.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/log/Statement.h"
-#include "qpid/messaging/Connection.h"
-#include "qpid/messaging/Session.h"
-#include "qpid/messaging/Receiver.h"
-#include "qpid/messaging/Sender.h"
-#include "qpid/messaging/Message.h"
-#include "qpid/messaging/AddressParser.h"
-#include "qpid/management/Buffer.h"
-#include <queue>
-#include <map>
-#include <set>
-#include <iostream>
-#include <memory>
-
-using namespace std;
-using namespace qpid::messaging;
-using namespace qmf;
-using qpid::types::Variant;
-
-namespace qmf {
- class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
- public:
- ~AgentSessionImpl();
-
- //
- // Methods from API handle
- //
- AgentSessionImpl(Connection& c, const string& o);
- void setDomain(const string& d) { checkOpen(); domain = d; }
- void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; }
- void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; }
- void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; }
- void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; }
- const string& getName() const { return agentName; }
- void open();
- void close();
- bool nextEvent(AgentEvent& e, Duration t);
- int pendingEvents() const;
-
- void registerSchema(Schema& s);
- DataAddr addData(Data& d, const string& n, bool persist);
- void delData(const DataAddr&);
-
- void authAccept(AgentEvent& e);
- void authReject(AgentEvent& e, const string& m);
- void raiseException(AgentEvent& e, const string& s);
- void raiseException(AgentEvent& e, const Data& d);
- void response(AgentEvent& e, const Data& d);
- void complete(AgentEvent& e);
- void methodSuccess(AgentEvent& e);
- void raiseEvent(const Data& d);
- void raiseEvent(const Data& d, int s);
-
- private:
- typedef map<DataAddr, Data, DataAddrCompare> DataIndex;
- typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap;
-
- mutable qpid::sys::Mutex lock;
- qpid::sys::Condition cond;
- Connection connection;
- Session session;
- Sender directSender;
- Sender topicSender;
- string domain;
- Variant::Map attributes;
- Variant::Map options;
- string agentName;
- bool opened;
- queue<AgentEvent> eventQueue;
- qpid::sys::Thread* thread;
- bool threadCanceled;
- uint32_t bootSequence;
- uint32_t interval;
- uint64_t lastHeartbeat;
- uint64_t lastVisit;
- bool forceHeartbeat;
- bool externalStorage;
- bool autoAllowQueries;
- bool autoAllowMethods;
- uint32_t maxSubscriptions;
- uint32_t minSubInterval;
- uint32_t subLifetime;
- bool publicEvents;
- bool listenOnDirect;
- bool strictSecurity;
- uint32_t maxThreadWaitTime;
- uint64_t schemaUpdateTime;
- string directBase;
- string topicBase;
-
- SchemaMap schemata;
- DataIndex globalIndex;
- map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex;
-
- void checkOpen();
- void setAgentName();
- void enqueueEvent(const AgentEvent&);
- void handleLocateRequest(const Variant::List& content, const Message& msg);
- void handleMethodRequest(const Variant::Map& content, const Message& msg);
- void handleQueryRequest(const Variant::Map& content, const Message& msg);
- void handleSchemaRequest(AgentEvent&);
- void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&);
- void dispatch(Message);
- void sendHeartbeat();
- void send(Message, const Address&);
- void flushResponses(AgentEvent&, bool);
- void periodicProcessing(uint64_t);
- void run();
- };
-}
-
-typedef qmf::PrivateImplRef<AgentSession> PI;
+#include "qmf/AgentSessionImpl.h"
AgentSession::AgentSession(AgentSessionImpl* impl) { PI::ctor(*this, impl); }
AgentSession::AgentSession(const AgentSession& s) : qmf::Handle<AgentSessionImpl>() { PI::copy(*this, s); }
@@ -345,6 +218,8 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
if (!eventQueue.empty()) {
event = eventQueue.front();
eventQueue.pop();
+ if (eventQueue.empty())
+ alertEventNotifierLH(false);
return true;
}
@@ -359,6 +234,19 @@ int AgentSessionImpl::pendingEvents() const
}
+void AgentSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ eventNotifier = notifier;
+}
+
+EventNotifierImpl* AgentSessionImpl::getEventNotifier() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return eventNotifier;
+}
+
+
void AgentSessionImpl::registerSchema(Schema& schema)
{
if (!schema.isFinalized())
@@ -614,8 +502,10 @@ void AgentSessionImpl::enqueueEvent(const AgentEvent& event)
qpid::sys::Mutex::ScopedLock l(lock);
bool notify = eventQueue.empty();
eventQueue.push(event);
- if (notify)
+ if (notify) {
cond.notify();
+ alertEventNotifierLH(true);
+ }
}
@@ -1059,6 +949,13 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds)
}
+void AgentSessionImpl::alertEventNotifierLH(bool readable)
+{
+ if (eventNotifier)
+ eventNotifier->setReadable(readable);
+}
+
+
void AgentSessionImpl::run()
{
QPID_LOG(debug, "AgentSession thread started for agent " << agentName);
@@ -1089,3 +986,15 @@ void AgentSessionImpl::run()
QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName);
}
+
+AgentSessionImpl& AgentSessionImplAccess::get(AgentSession& session)
+{
+ return *session.impl;
+}
+
+
+const AgentSessionImpl& AgentSessionImplAccess::get(const AgentSession& session)
+{
+ return *session.impl;
+}
+
diff --git a/cpp/src/qmf/AgentSessionImpl.h b/cpp/src/qmf/AgentSessionImpl.h
new file mode 100644
index 0000000000..cf1b1d770f
--- /dev/null
+++ b/cpp/src/qmf/AgentSessionImpl.h
@@ -0,0 +1,175 @@
+#ifndef __QMF_AGENT_SESSION_IMPL_H
+#define __QMF_AGENT_SESSION_IMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qmf/PrivateImplRef.h"
+#include "qmf/exceptions.h"
+#include "qmf/AgentSession.h"
+#include "qmf/AgentEventImpl.h"
+#include "qmf/EventNotifierImpl.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Condition.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/AddressParser.h"
+#include "qpid/management/Buffer.h"
+#include "qpid/RefCounted.h"
+#include "qmf/PrivateImplRef.h"
+#include "qmf/AgentSession.h"
+#include "qmf/exceptions.h"
+#include "qmf/AgentSession.h"
+#include "qmf/SchemaIdImpl.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/DataAddrImpl.h"
+#include "qmf/DataImpl.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/agentCapability.h"
+#include "qmf/constants.h"
+
+#include <queue>
+#include <map>
+#include <iostream>
+#include <memory>
+
+using namespace std;
+using namespace qpid::messaging;
+using namespace qmf;
+using qpid::types::Variant;
+using namespace boost;
+
+typedef qmf::PrivateImplRef<AgentSession> PI;
+
+namespace qmf {
+ class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
+ public:
+ ~AgentSessionImpl();
+
+ //
+ // Methods from API handle
+ //
+ AgentSessionImpl(Connection& c, const string& o);
+ void setDomain(const string& d) { checkOpen(); domain = d; }
+ void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; }
+ void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; }
+ void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; }
+ void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; }
+ const string& getName() const { return agentName; }
+ void open();
+ void close();
+ bool nextEvent(AgentEvent& e, Duration t);
+ int pendingEvents() const;
+
+ void setEventNotifier(EventNotifierImpl* eventNotifier);
+ EventNotifierImpl* getEventNotifier() const;
+
+ void registerSchema(Schema& s);
+ DataAddr addData(Data& d, const string& n, bool persist);
+ void delData(const DataAddr&);
+
+ void authAccept(AgentEvent& e);
+ void authReject(AgentEvent& e, const string& m);
+ void raiseException(AgentEvent& e, const string& s);
+ void raiseException(AgentEvent& e, const Data& d);
+ void response(AgentEvent& e, const Data& d);
+ void complete(AgentEvent& e);
+ void methodSuccess(AgentEvent& e);
+ void raiseEvent(const Data& d);
+ void raiseEvent(const Data& d, int s);
+
+ private:
+ typedef map<DataAddr, Data, DataAddrCompare> DataIndex;
+ typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap;
+
+ mutable qpid::sys::Mutex lock;
+ qpid::sys::Condition cond;
+ Connection connection;
+ Session session;
+ Sender directSender;
+ Sender topicSender;
+ string domain;
+ Variant::Map attributes;
+ Variant::Map options;
+ string agentName;
+ bool opened;
+ queue<AgentEvent> eventQueue;
+ EventNotifierImpl* eventNotifier;
+ qpid::sys::Thread* thread;
+ bool threadCanceled;
+ uint32_t bootSequence;
+ uint32_t interval;
+ uint64_t lastHeartbeat;
+ uint64_t lastVisit;
+ bool forceHeartbeat;
+ bool externalStorage;
+ bool autoAllowQueries;
+ bool autoAllowMethods;
+ uint32_t maxSubscriptions;
+ uint32_t minSubInterval;
+ uint32_t subLifetime;
+ bool publicEvents;
+ bool listenOnDirect;
+ bool strictSecurity;
+ uint32_t maxThreadWaitTime;
+ uint64_t schemaUpdateTime;
+ string directBase;
+ string topicBase;
+
+ SchemaMap schemata;
+ DataIndex globalIndex;
+ map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex;
+
+ void checkOpen();
+ void setAgentName();
+ void enqueueEvent(const AgentEvent&);
+ void alertEventNotifierLH(bool readable);
+ void handleLocateRequest(const Variant::List& content, const Message& msg);
+ void handleMethodRequest(const Variant::Map& content, const Message& msg);
+ void handleQueryRequest(const Variant::Map& content, const Message& msg);
+ void handleSchemaRequest(AgentEvent&);
+ void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&);
+ void dispatch(Message);
+ void sendHeartbeat();
+ void send(Message, const Address&);
+ void flushResponses(AgentEvent&, bool);
+ void periodicProcessing(uint64_t);
+ void run();
+ };
+
+ struct AgentSessionImplAccess {
+ static AgentSessionImpl& get(AgentSession& session);
+ static const AgentSessionImpl& get(const AgentSession& session);
+ };
+}
+
+
+#endif
+
diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp
index af835959cf..d084b8a8eb 100644
--- a/cpp/src/qmf/ConsoleSession.cpp
+++ b/cpp/src/qmf/ConsoleSession.cpp
@@ -237,6 +237,8 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout)
if (!eventQueue.empty()) {
event = eventQueue.front();
eventQueue.pop();
+ if (eventQueue.empty())
+ alertEventNotifierLH(false);
return true;
}
@@ -251,6 +253,20 @@ int ConsoleSessionImpl::pendingEvents() const
}
+void ConsoleSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ this->eventNotifier = notifier;
+}
+
+
+EventNotifierImpl* ConsoleSessionImpl::getEventNotifier() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return this->eventNotifier;
+}
+
+
uint32_t ConsoleSessionImpl::getAgentCount() const
{
qpid::sys::Mutex::ScopedLock l(lock);
@@ -292,8 +308,10 @@ void ConsoleSessionImpl::enqueueEventLH(const ConsoleEvent& event)
{
bool notify = eventQueue.empty();
eventQueue.push(event);
- if (notify)
+ if (notify) {
cond.notify();
+ alertEventNotifierLH(true);
+ }
}
@@ -602,6 +620,13 @@ void ConsoleSessionImpl::periodicProcessing(uint64_t seconds)
}
+void ConsoleSessionImpl::alertEventNotifierLH(bool readable)
+{
+ if (eventNotifier)
+ eventNotifier->setReadable(readable);
+}
+
+
void ConsoleSessionImpl::run()
{
QPID_LOG(debug, "ConsoleSession thread started");
@@ -633,3 +658,14 @@ void ConsoleSessionImpl::run()
QPID_LOG(debug, "ConsoleSession thread exiting");
}
+
+ConsoleSessionImpl& ConsoleSessionImplAccess::get(ConsoleSession& session)
+{
+ return *session.impl;
+}
+
+
+const ConsoleSessionImpl& ConsoleSessionImplAccess::get(const ConsoleSession& session)
+{
+ return *session.impl;
+}
diff --git a/cpp/src/qmf/ConsoleSessionImpl.h b/cpp/src/qmf/ConsoleSessionImpl.h
index 478d24e56b..660fc9b83f 100644
--- a/cpp/src/qmf/ConsoleSessionImpl.h
+++ b/cpp/src/qmf/ConsoleSessionImpl.h
@@ -27,6 +27,7 @@
#include "qmf/SchemaId.h"
#include "qmf/Schema.h"
#include "qmf/ConsoleEventImpl.h"
+#include "qmf/EventNotifierImpl.h"
#include "qmf/SchemaCache.h"
#include "qmf/Query.h"
#include "qpid/sys/Mutex.h"
@@ -41,9 +42,14 @@
#include "qpid/messaging/Address.h"
#include "qpid/management/Buffer.h"
#include "qpid/types/Variant.h"
+
+#include <boost/shared_ptr.hpp>
#include <map>
#include <queue>
+using namespace boost;
+using namespace std;
+
namespace qmf {
class ConsoleSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
public:
@@ -59,6 +65,10 @@ namespace qmf {
void close();
bool nextEvent(ConsoleEvent& e, qpid::messaging::Duration t);
int pendingEvents() const;
+
+ void setEventNotifier(EventNotifierImpl* notifier);
+ EventNotifierImpl* getEventNotifier() const;
+
uint32_t getAgentCount() const;
Agent getAgent(uint32_t i) const;
Agent getConnectedBrokerAgent() const { return connectedBrokerAgent; }
@@ -80,6 +90,7 @@ namespace qmf {
Query agentQuery;
bool opened;
std::queue<ConsoleEvent> eventQueue;
+ EventNotifierImpl* eventNotifier;
qpid::sys::Thread* thread;
bool threadCanceled;
uint64_t lastVisit;
@@ -102,11 +113,17 @@ namespace qmf {
void handleAgentUpdate(const std::string&, const qpid::types::Variant::Map&, const qpid::messaging::Message&);
void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&);
void periodicProcessing(uint64_t);
+ void alertEventNotifierLH(bool readable);
void run();
uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; }
friend class AgentImpl;
};
+
+ struct ConsoleSessionImplAccess {
+ static ConsoleSessionImpl& get(ConsoleSession& session);
+ static const ConsoleSessionImpl& get(const ConsoleSession& session);
+ };
}
#endif
diff --git a/cpp/src/qmf/EventNotifierImpl.cpp b/cpp/src/qmf/EventNotifierImpl.cpp
new file mode 100644
index 0000000000..20114aaa5e
--- /dev/null
+++ b/cpp/src/qmf/EventNotifierImpl.cpp
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/EventNotifierImpl.h"
+#include "qmf/AgentSessionImpl.h"
+#include "qmf/ConsoleSessionImpl.h"
+
+EventNotifierImpl::EventNotifierImpl(AgentSession& agentSession)
+ : readable(false), agent(agentSession)
+{
+ AgentSessionImplAccess::get(agent).setEventNotifier(this);
+}
+
+
+EventNotifierImpl::EventNotifierImpl(ConsoleSession& consoleSession)
+ : readable(false), console(consoleSession)
+{
+ ConsoleSessionImplAccess::get(console).setEventNotifier(this);
+}
+
+
+EventNotifierImpl::~EventNotifierImpl()
+{
+ if (agent.isValid())
+ AgentSessionImplAccess::get(agent).setEventNotifier(NULL);
+ if (console.isValid())
+ ConsoleSessionImplAccess::get(console).setEventNotifier(NULL);
+}
+
+void EventNotifierImpl::setReadable(bool readable)
+{
+ update(readable);
+ this->readable = readable;
+}
+
+
+bool EventNotifierImpl::isReadable() const
+{
+ return this->readable;
+}
diff --git a/cpp/src/qmf/EventNotifierImpl.h b/cpp/src/qmf/EventNotifierImpl.h
new file mode 100644
index 0000000000..d85f9979d2
--- /dev/null
+++ b/cpp/src/qmf/EventNotifierImpl.h
@@ -0,0 +1,48 @@
+#ifndef __QMF_EVENT_NOTIFIER_IMPL_H
+#define __QMF_EVENT_NOTIFIER_IMPL_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/AgentSession.h"
+#include "qmf/ConsoleSession.h"
+
+namespace qmf
+{
+ class EventNotifierImpl {
+ private:
+ bool readable;
+ AgentSession agent;
+ ConsoleSession console;
+
+ public:
+ EventNotifierImpl(AgentSession& agentSession);
+ EventNotifierImpl(ConsoleSession& consoleSession);
+ virtual ~EventNotifierImpl();
+
+ void setReadable(bool readable);
+ bool isReadable() const;
+
+ protected:
+ virtual void update(bool readable) = 0;
+ };
+}
+
+#endif
+
diff --git a/cpp/src/qmf/PosixEventNotifier.cpp b/cpp/src/qmf/PosixEventNotifier.cpp
new file mode 100644
index 0000000000..b5c71210bb
--- /dev/null
+++ b/cpp/src/qmf/PosixEventNotifier.cpp
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/posix/EventNotifier.h"
+#include "qmf/PosixEventNotifierImpl.h"
+#include "qmf/PrivateImplRef.h"
+
+using namespace qmf;
+using namespace std;
+
+typedef qmf::PrivateImplRef<posix::EventNotifier> PI;
+
+posix::EventNotifier::EventNotifier(AgentSession& agentSession)
+{
+ PI::ctor(*this, new PosixEventNotifierImpl(agentSession));
+}
+
+
+posix::EventNotifier::EventNotifier(ConsoleSession& consoleSession)
+{
+ PI::ctor(*this, new PosixEventNotifierImpl(consoleSession));
+}
+
+
+posix::EventNotifier::EventNotifier(const posix::EventNotifier& that)
+ : Handle<PosixEventNotifierImpl>()
+{
+ PI::copy(*this, that);
+}
+
+
+posix::EventNotifier::~EventNotifier()
+{
+ PI::dtor(*this);
+}
+
+posix::EventNotifier& posix::EventNotifier::operator=(const posix::EventNotifier& that)
+{
+ return PI::assign(*this, that);
+}
+
+
+int posix::EventNotifier::getHandle() const
+{
+ return impl->getHandle();
+}
+
diff --git a/cpp/src/qmf/PosixEventNotifierImpl.cpp b/cpp/src/qmf/PosixEventNotifierImpl.cpp
new file mode 100644
index 0000000000..abc9cadcfa
--- /dev/null
+++ b/cpp/src/qmf/PosixEventNotifierImpl.cpp
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "PosixEventNotifierImpl.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+
+#define BUFFER_SIZE 10
+
+using namespace qmf;
+
+PosixEventNotifierImpl::PosixEventNotifierImpl(AgentSession& agentSession)
+ : EventNotifierImpl(agentSession)
+{
+ openHandle();
+}
+
+
+PosixEventNotifierImpl::PosixEventNotifierImpl(ConsoleSession& consoleSession)
+ : EventNotifierImpl(consoleSession)
+{
+ openHandle();
+}
+
+
+PosixEventNotifierImpl::~PosixEventNotifierImpl()
+{
+ closeHandle();
+}
+
+
+void PosixEventNotifierImpl::update(bool readable)
+{
+ char buffer[BUFFER_SIZE];
+
+ if(readable && !this->isReadable()) {
+ (void) ::write(myHandle, "1", 1);
+ }
+ else if(!readable && this->isReadable()) {
+ (void) ::read(yourHandle, buffer, BUFFER_SIZE);
+ }
+}
+
+
+void PosixEventNotifierImpl::openHandle()
+{
+ int pair[2];
+
+ if(::pipe(pair) == -1)
+ throw QmfException("Unable to open event notifier handle.");
+
+ yourHandle = pair[0];
+ myHandle = pair[1];
+
+ int flags;
+
+ flags = ::fcntl(yourHandle, F_GETFL);
+ if((::fcntl(yourHandle, F_SETFL, flags | O_NONBLOCK)) == -1)
+ throw QmfException("Unable to make remote handle non-blocking.");
+
+ flags = ::fcntl(myHandle, F_GETFL);
+ if((::fcntl(myHandle, F_SETFL, flags | O_NONBLOCK)) == -1)
+ throw QmfException("Unable to make local handle non-blocking.");
+}
+
+
+void PosixEventNotifierImpl::closeHandle()
+{
+ if(myHandle > 0) {
+ ::close(myHandle);
+ myHandle = -1;
+ }
+
+ if(yourHandle > 0) {
+ ::close(yourHandle);
+ yourHandle = -1;
+ }
+}
+
+
+PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(posix::EventNotifier& notifier)
+{
+ return *notifier.impl;
+}
+
+
+const PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(const posix::EventNotifier& notifier)
+{
+ return *notifier.impl;
+}
+
diff --git a/cpp/src/qmf/PosixEventNotifierImpl.h b/cpp/src/qmf/PosixEventNotifierImpl.h
new file mode 100644
index 0000000000..c8a7446bd5
--- /dev/null
+++ b/cpp/src/qmf/PosixEventNotifierImpl.h
@@ -0,0 +1,61 @@
+#ifndef __QMF_POSIX_EVENT_NOTIFIER_IMPL_H
+#define __QMF_POSIX_EVENT_NOTIFIER_IMPL_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/posix/EventNotifier.h"
+#include "qmf/EventNotifierImpl.h"
+#include "qpid/RefCounted.h"
+
+namespace qmf
+{
+ class AgentSession;
+ class ConsoleSession;
+
+ class PosixEventNotifierImpl : public EventNotifierImpl, public virtual qpid::RefCounted
+ {
+ public:
+ PosixEventNotifierImpl(AgentSession& agentSession);
+ PosixEventNotifierImpl(ConsoleSession& consoleSession);
+ virtual ~PosixEventNotifierImpl();
+
+ int getHandle() const { return yourHandle; }
+
+ private:
+ int myHandle;
+ int yourHandle;
+
+ void openHandle();
+ void closeHandle();
+
+ protected:
+ void update(bool readable);
+ };
+
+ struct PosixEventNotifierImplAccess
+ {
+ static PosixEventNotifierImpl& get(posix::EventNotifier& notifier);
+ static const PosixEventNotifierImpl& get(const posix::EventNotifier& notifier);
+ };
+
+}
+
+#endif
+
diff --git a/cpp/src/tests/Qmf2.cpp b/cpp/src/tests/Qmf2.cpp
index 66c774accd..bc263d5c6d 100644
--- a/cpp/src/tests/Qmf2.cpp
+++ b/cpp/src/tests/Qmf2.cpp
@@ -23,12 +23,36 @@
#include "qmf/QueryImpl.h"
#include "qmf/SchemaImpl.h"
#include "qmf/exceptions.h"
-
+#include "qpid/messaging/Connection.h"
+#include "qmf/PosixEventNotifierImpl.h"
+#include "qmf/AgentSession.h"
+#include "qmf/AgentSessionImpl.h"
+#include "qmf/ConsoleSession.h"
+#include "qmf/ConsoleSessionImpl.h"
#include "unit_test.h"
+using namespace std;
using namespace qpid::types;
+using namespace qpid::messaging;
using namespace qmf;
+bool isReadable(int fd)
+{
+ fd_set rfds;
+ struct timeval tv;
+ int nfds, result;
+
+ FD_ZERO(&rfds);
+ FD_SET(fd, &rfds);
+ nfds = fd + 1;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+
+ result = select(nfds, &rfds, NULL, NULL, &tv);
+
+ return result > 0;
+}
+
namespace qpid {
namespace tests {
@@ -315,6 +339,84 @@ QPID_AUTO_TEST_CASE(testSchema)
BOOST_CHECK_THROW(method.getArgument(3), QmfException);
}
+QPID_AUTO_TEST_CASE(testAgentSessionEventListener)
+{
+ Connection connection("localhost");
+ AgentSession session(connection, "");
+ posix::EventNotifier notifier(session);
+
+ AgentSessionImpl& sessionImpl = AgentSessionImplAccess::get(session);
+
+ BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
+}
+
+QPID_AUTO_TEST_CASE(testConsoleSessionEventListener)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ posix::EventNotifier notifier(session);
+
+ ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session);
+
+ BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
+}
+
+QPID_AUTO_TEST_CASE(testGetHandle)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ posix::EventNotifier notifier(session);
+
+ BOOST_CHECK(notifier.getHandle() > 0);
+}
+
+QPID_AUTO_TEST_CASE(testSetReadableToFalse)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ posix::EventNotifier notifier(session);
+ PosixEventNotifierImplAccess::get(notifier).setReadable(false);
+
+ bool readable(isReadable(notifier.getHandle()));
+ BOOST_CHECK(!readable);
+}
+
+QPID_AUTO_TEST_CASE(testSetReadable)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ posix::EventNotifier notifier(session);
+ PosixEventNotifierImplAccess::get(notifier).setReadable(true);
+
+ bool readable(isReadable(notifier.getHandle()));
+ BOOST_CHECK(readable);
+}
+
+QPID_AUTO_TEST_CASE(testSetReadableMultiple)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ posix::EventNotifier notifier(session);
+ for (int i = 0; i < 15; i++)
+ PosixEventNotifierImplAccess::get(notifier).setReadable(true);
+ PosixEventNotifierImplAccess::get(notifier).setReadable(false);
+
+ bool readable(isReadable(notifier.getHandle()));
+ BOOST_CHECK(!readable);
+}
+
+QPID_AUTO_TEST_CASE(testDeleteNotifier)
+{
+ Connection connection("localhost");
+ ConsoleSession session(connection, "");
+ ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session);
+ {
+ posix::EventNotifier notifier(session);
+ BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
+ }
+ BOOST_CHECK(sessionImpl.getEventNotifier() == 0);
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests