diff options
author | Ted Ross <tross@apache.org> | 2011-09-13 19:34:38 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2011-09-13 19:34:38 +0000 |
commit | 97cde6a461403e847799ccfe55e699bc311ae24b (patch) | |
tree | 429d1d2d750d023cddac6d2291e5d66f27ba401b | |
parent | a1aec7ac41539186a093ac750ce925ca5d9aca81 (diff) | |
download | qpid-python-97cde6a461403e847799ccfe55e699bc311ae24b.tar.gz |
QPID-3484 - QMF Main-Loop Integration
Applied patch from Darryl Pierce.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1170314 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/bindings/qmf2/examples/cpp/Makefile.am | 5 | ||||
-rw-r--r-- | cpp/bindings/qmf2/examples/cpp/event_driven_list_agents.cpp | 107 | ||||
-rw-r--r-- | cpp/include/qmf/AgentSession.h | 1 | ||||
-rw-r--r-- | cpp/include/qmf/ConsoleSession.h | 1 | ||||
-rw-r--r-- | cpp/include/qmf/posix/EventNotifier.h | 62 | ||||
-rw-r--r-- | cpp/src/CMakeLists.txt | 5 | ||||
-rw-r--r-- | cpp/src/qmf.mk | 4 | ||||
-rw-r--r-- | cpp/src/qmf/AgentSession.cpp | 167 | ||||
-rw-r--r-- | cpp/src/qmf/AgentSessionImpl.h | 175 | ||||
-rw-r--r-- | cpp/src/qmf/ConsoleSession.cpp | 38 | ||||
-rw-r--r-- | cpp/src/qmf/ConsoleSessionImpl.h | 17 | ||||
-rw-r--r-- | cpp/src/qmf/EventNotifierImpl.cpp | 56 | ||||
-rw-r--r-- | cpp/src/qmf/EventNotifierImpl.h | 48 | ||||
-rw-r--r-- | cpp/src/qmf/PosixEventNotifier.cpp | 63 | ||||
-rw-r--r-- | cpp/src/qmf/PosixEventNotifierImpl.cpp | 108 | ||||
-rw-r--r-- | cpp/src/qmf/PosixEventNotifierImpl.h | 61 | ||||
-rw-r--r-- | cpp/src/tests/Qmf2.cpp | 104 |
17 files changed, 890 insertions, 132 deletions
diff --git a/cpp/bindings/qmf2/examples/cpp/Makefile.am b/cpp/bindings/qmf2/examples/cpp/Makefile.am index 84207d43c4..062fbd0a85 100644 --- a/cpp/bindings/qmf2/examples/cpp/Makefile.am +++ b/cpp/bindings/qmf2/examples/cpp/Makefile.am @@ -21,7 +21,7 @@ INCLUDE = -I$(top_srcdir)/include AM_CPPFLAGS = $(INCLUDE) -noinst_PROGRAMS=agent list_agents print_events +noinst_PROGRAMS=agent event_driven_list_agents list_agents print_events agent_SOURCES=agent.cpp agent_LDADD=$(top_builddir)/src/libqmf2.la @@ -29,5 +29,8 @@ agent_LDADD=$(top_builddir)/src/libqmf2.la list_agents_SOURCES=list_agents.cpp list_agents_LDADD=$(top_builddir)/src/libqmf2.la +event_driven_list_agents_SOURCES=event_driven_list_agents.cpp +event_driven_list_agents_LDADD=$(top_builddir)/src/libqmf2.la + print_events_SOURCES=print_events.cpp print_events_LDADD=$(top_builddir)/src/libqmf2.la diff --git a/cpp/bindings/qmf2/examples/cpp/event_driven_list_agents.cpp b/cpp/bindings/qmf2/examples/cpp/event_driven_list_agents.cpp new file mode 100644 index 0000000000..c288aa6bdd --- /dev/null +++ b/cpp/bindings/qmf2/examples/cpp/event_driven_list_agents.cpp @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <sys/select.h> +#include <time.h> + +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Duration.h> +#include <qmf/Agent.h> +#include <qmf/ConsoleEvent.h> +#include <qmf/ConsoleSession.h> +#include <qpid/types/Variant.h> +#include "qmf/posix/EventNotifier.h" + +#include <string> +#include <iostream> + +using namespace std; +using namespace qmf; +using qpid::types::Variant; +using qpid::messaging::Duration; + +int main(int argc, char** argv) +{ + string url("localhost"); + string connectionOptions; + string sessionOptions; + + if (argc > 1) + url = argv[1]; + if (argc > 2) + connectionOptions = argv[2]; + if (argc > 3) + sessionOptions = argv[3]; + + qpid::messaging::Connection connection(url, connectionOptions); + connection.open(); + + ConsoleSession session(connection, sessionOptions); + session.open(); + session.setAgentFilter(""); + + posix::EventNotifier notifier(session); + + int fd(notifier.getHandle()); + time_t lastUpdate; + bool ftl = false; + + time(&lastUpdate); + + while (true) { + fd_set rfds; + struct timeval tv; + int nfds, retval; + + FD_ZERO(&rfds); + FD_SET(fd, &rfds); + nfds = fd + 1; + tv.tv_sec = 10; + tv.tv_usec = 0; + + retval = select(nfds, &rfds, NULL, NULL, &tv); + + if (retval > 0 && FD_ISSET(fd, &rfds)) { + ConsoleEvent event; + while (session.nextEvent(event, Duration::IMMEDIATE)) { + string eventType = ""; + switch(event.getType()) { + case CONSOLE_AGENT_ADD: eventType = "Added"; break; + case CONSOLE_AGENT_DEL: eventType = "Deleted"; break; + case CONSOLE_AGENT_RESTART: eventType = "Restarted"; break; + case CONSOLE_AGENT_SCHEMA_UPDATE: eventType = "Schema Updated"; break; + case CONSOLE_AGENT_SCHEMA_RESPONSE: eventType = "Schema Response"; break; + case CONSOLE_EVENT: eventType = "Event"; break; + case CONSOLE_QUERY_RESPONSE: eventType = "Query Response"; break; + case CONSOLE_METHOD_RESPONSE: eventType = "Method Response"; break; + case CONSOLE_EXCEPTION: eventType = "Exception"; break; + case CONSOLE_SUBSCRIBE_ADD: eventType = "Subscription Added"; break; + case CONSOLE_SUBSCRIBE_UPDATE: eventType = "Subscription Updated"; break; + case CONSOLE_SUBSCRIBE_DEL: eventType = "Subscription Deleted" ; break; + case CONSOLE_THREAD_FAILED: eventType = "Thread Failure"; break; + default: eventType = "[UNDEFINED]"; + } + cout << "Agent " << eventType << ": " << event.getAgent().getName() << endl; + } + } else { + cout << "No message received within waiting period." << endl; + } + } +} + diff --git a/cpp/include/qmf/AgentSession.h b/cpp/include/qmf/AgentSession.h index 5ecfb0412c..589d364bcc 100644 --- a/cpp/include/qmf/AgentSession.h +++ b/cpp/include/qmf/AgentSession.h @@ -188,6 +188,7 @@ namespace qmf { #ifndef SWIG private: friend class qmf::PrivateImplRef<AgentSession>; + friend struct AgentSessionImplAccess; #endif }; diff --git a/cpp/include/qmf/ConsoleSession.h b/cpp/include/qmf/ConsoleSession.h index 5e3a091e5d..022485cfa7 100644 --- a/cpp/include/qmf/ConsoleSession.h +++ b/cpp/include/qmf/ConsoleSession.h @@ -123,6 +123,7 @@ namespace qmf { #ifndef SWIG private: friend class qmf::PrivateImplRef<ConsoleSession>; + friend struct ConsoleSessionImplAccess; #endif }; diff --git a/cpp/include/qmf/posix/EventNotifier.h b/cpp/include/qmf/posix/EventNotifier.h new file mode 100644 index 0000000000..91817cc771 --- /dev/null +++ b/cpp/include/qmf/posix/EventNotifier.h @@ -0,0 +1,62 @@ +#ifndef __QMF_POSIX_EVENT_NOTIFIER_H +#define __QMF_POSIX_EVENT_NOTIFIER_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qmf/ImportExport.h> +#include "qmf/Handle.h" +#include "qmf/AgentSession.h" +#include "qmf/ConsoleSession.h" + +namespace qmf { + + class PosixEventNotifierImpl; + class PosixEventNotifierImplAccess; + +namespace posix { + +#ifndef SWIG + template <class> class PrivateImplRef; +#endif + + class QMF_CLASS_EXTERN EventNotifier : public qmf::Handle<qmf::PosixEventNotifierImpl> { + public: + QMF_EXTERN EventNotifier(::qmf::AgentSession& agentSession); + QMF_EXTERN EventNotifier(::qmf::ConsoleSession& consoleSession); + QMF_EXTERN EventNotifier(const EventNotifier& that); + + QMF_EXTERN ~EventNotifier(); + + QMF_EXTERN EventNotifier& operator=(const EventNotifier& that); + + QMF_EXTERN int getHandle() const; + +#ifndef SWIG + private: + friend class qmf::PrivateImplRef<EventNotifier>; + friend struct qmf::PosixEventNotifierImplAccess; +#endif + + }; + +}} + +#endif + diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 49f94322d8..d8496c5122 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -1100,6 +1100,7 @@ if(NOT WIN32) ../include/qmf/exceptions.h ../include/qmf/Handle.h ../include/qmf/ImportExport.h + ../include/qmf/posix/EventNotifier.h ../include/qmf/Query.h ../include/qmf/Schema.h ../include/qmf/SchemaId.h @@ -1129,6 +1130,10 @@ if(NOT WIN32) qmf/DataAddrImpl.h qmf/Data.cpp qmf/DataImpl.h + qmf/EventNotifierImpl.h + qmf/EventNotifierImpl.cpp + qmf/PosixEventNotifier.cpp + qmf/PosixEventNotifierImpl.cpp qmf/exceptions.cpp qmf/Expression.cpp qmf/Expression.h diff --git a/cpp/src/qmf.mk b/cpp/src/qmf.mk index f3462f1a93..4da8470f2f 100644 --- a/cpp/src/qmf.mk +++ b/cpp/src/qmf.mk @@ -43,6 +43,7 @@ QMF2_API = \ ../include/qmf/ConsoleSession.h \ ../include/qmf/DataAddr.h \ ../include/qmf/Data.h \ + ../include/qmf/posix/EventNotifier.h \ ../include/qmf/exceptions.h \ ../include/qmf/Handle.h \ ../include/qmf/ImportExport.h \ @@ -104,6 +105,9 @@ libqmf2_la_SOURCES = \ qmf/DataAddrImpl.h \ qmf/Data.cpp \ qmf/DataImpl.h \ + qmf/EventNotifierImpl.cpp \ + qmf/PosixEventNotifier.cpp \ + qmf/PosixEventNotifierImpl.cpp \ qmf/exceptions.cpp \ qmf/Expression.cpp \ qmf/Expression.h \ diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp index a88782d107..d92b2a41d3 100644 --- a/cpp/src/qmf/AgentSession.cpp +++ b/cpp/src/qmf/AgentSession.cpp @@ -19,134 +19,7 @@ * */ -#include "qpid/RefCounted.h" -#include "qmf/PrivateImplRef.h" -#include "qmf/exceptions.h" -#include "qmf/AgentSession.h" -#include "qmf/AgentEventImpl.h" -#include "qmf/SchemaIdImpl.h" -#include "qmf/SchemaImpl.h" -#include "qmf/DataAddrImpl.h" -#include "qmf/DataImpl.h" -#include "qmf/QueryImpl.h" -#include "qmf/agentCapability.h" -#include "qmf/constants.h" -#include "qpid/sys/Mutex.h" -#include "qpid/sys/Condition.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Runnable.h" -#include "qpid/log/Statement.h" -#include "qpid/messaging/Connection.h" -#include "qpid/messaging/Session.h" -#include "qpid/messaging/Receiver.h" -#include "qpid/messaging/Sender.h" -#include "qpid/messaging/Message.h" -#include "qpid/messaging/AddressParser.h" -#include "qpid/management/Buffer.h" -#include <queue> -#include <map> -#include <set> -#include <iostream> -#include <memory> - -using namespace std; -using namespace qpid::messaging; -using namespace qmf; -using qpid::types::Variant; - -namespace qmf { - class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable { - public: - ~AgentSessionImpl(); - - // - // Methods from API handle - // - AgentSessionImpl(Connection& c, const string& o); - void setDomain(const string& d) { checkOpen(); domain = d; } - void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; } - void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; } - void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; } - void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; } - const string& getName() const { return agentName; } - void open(); - void close(); - bool nextEvent(AgentEvent& e, Duration t); - int pendingEvents() const; - - void registerSchema(Schema& s); - DataAddr addData(Data& d, const string& n, bool persist); - void delData(const DataAddr&); - - void authAccept(AgentEvent& e); - void authReject(AgentEvent& e, const string& m); - void raiseException(AgentEvent& e, const string& s); - void raiseException(AgentEvent& e, const Data& d); - void response(AgentEvent& e, const Data& d); - void complete(AgentEvent& e); - void methodSuccess(AgentEvent& e); - void raiseEvent(const Data& d); - void raiseEvent(const Data& d, int s); - - private: - typedef map<DataAddr, Data, DataAddrCompare> DataIndex; - typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap; - - mutable qpid::sys::Mutex lock; - qpid::sys::Condition cond; - Connection connection; - Session session; - Sender directSender; - Sender topicSender; - string domain; - Variant::Map attributes; - Variant::Map options; - string agentName; - bool opened; - queue<AgentEvent> eventQueue; - qpid::sys::Thread* thread; - bool threadCanceled; - uint32_t bootSequence; - uint32_t interval; - uint64_t lastHeartbeat; - uint64_t lastVisit; - bool forceHeartbeat; - bool externalStorage; - bool autoAllowQueries; - bool autoAllowMethods; - uint32_t maxSubscriptions; - uint32_t minSubInterval; - uint32_t subLifetime; - bool publicEvents; - bool listenOnDirect; - bool strictSecurity; - uint32_t maxThreadWaitTime; - uint64_t schemaUpdateTime; - string directBase; - string topicBase; - - SchemaMap schemata; - DataIndex globalIndex; - map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex; - - void checkOpen(); - void setAgentName(); - void enqueueEvent(const AgentEvent&); - void handleLocateRequest(const Variant::List& content, const Message& msg); - void handleMethodRequest(const Variant::Map& content, const Message& msg); - void handleQueryRequest(const Variant::Map& content, const Message& msg); - void handleSchemaRequest(AgentEvent&); - void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&); - void dispatch(Message); - void sendHeartbeat(); - void send(Message, const Address&); - void flushResponses(AgentEvent&, bool); - void periodicProcessing(uint64_t); - void run(); - }; -} - -typedef qmf::PrivateImplRef<AgentSession> PI; +#include "qmf/AgentSessionImpl.h" AgentSession::AgentSession(AgentSessionImpl* impl) { PI::ctor(*this, impl); } AgentSession::AgentSession(const AgentSession& s) : qmf::Handle<AgentSessionImpl>() { PI::copy(*this, s); } @@ -345,6 +218,8 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout) if (!eventQueue.empty()) { event = eventQueue.front(); eventQueue.pop(); + if (eventQueue.empty()) + alertEventNotifierLH(false); return true; } @@ -359,6 +234,19 @@ int AgentSessionImpl::pendingEvents() const } +void AgentSessionImpl::setEventNotifier(EventNotifierImpl* notifier) +{ + qpid::sys::Mutex::ScopedLock l(lock); + eventNotifier = notifier; +} + +EventNotifierImpl* AgentSessionImpl::getEventNotifier() const +{ + qpid::sys::Mutex::ScopedLock l(lock); + return eventNotifier; +} + + void AgentSessionImpl::registerSchema(Schema& schema) { if (!schema.isFinalized()) @@ -614,8 +502,10 @@ void AgentSessionImpl::enqueueEvent(const AgentEvent& event) qpid::sys::Mutex::ScopedLock l(lock); bool notify = eventQueue.empty(); eventQueue.push(event); - if (notify) + if (notify) { cond.notify(); + alertEventNotifierLH(true); + } } @@ -1059,6 +949,13 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds) } +void AgentSessionImpl::alertEventNotifierLH(bool readable) +{ + if (eventNotifier) + eventNotifier->setReadable(readable); +} + + void AgentSessionImpl::run() { QPID_LOG(debug, "AgentSession thread started for agent " << agentName); @@ -1089,3 +986,15 @@ void AgentSessionImpl::run() QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName); } + +AgentSessionImpl& AgentSessionImplAccess::get(AgentSession& session) +{ + return *session.impl; +} + + +const AgentSessionImpl& AgentSessionImplAccess::get(const AgentSession& session) +{ + return *session.impl; +} + diff --git a/cpp/src/qmf/AgentSessionImpl.h b/cpp/src/qmf/AgentSessionImpl.h new file mode 100644 index 0000000000..cf1b1d770f --- /dev/null +++ b/cpp/src/qmf/AgentSessionImpl.h @@ -0,0 +1,175 @@ +#ifndef __QMF_AGENT_SESSION_IMPL_H +#define __QMF_AGENT_SESSION_IMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qmf/PrivateImplRef.h" +#include "qmf/exceptions.h" +#include "qmf/AgentSession.h" +#include "qmf/AgentEventImpl.h" +#include "qmf/EventNotifierImpl.h" +#include "qpid/messaging/Connection.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Condition.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/AddressParser.h" +#include "qpid/management/Buffer.h" +#include "qpid/RefCounted.h" +#include "qmf/PrivateImplRef.h" +#include "qmf/AgentSession.h" +#include "qmf/exceptions.h" +#include "qmf/AgentSession.h" +#include "qmf/SchemaIdImpl.h" +#include "qmf/SchemaImpl.h" +#include "qmf/DataAddrImpl.h" +#include "qmf/DataImpl.h" +#include "qmf/QueryImpl.h" +#include "qmf/agentCapability.h" +#include "qmf/constants.h" + +#include <queue> +#include <map> +#include <iostream> +#include <memory> + +using namespace std; +using namespace qpid::messaging; +using namespace qmf; +using qpid::types::Variant; +using namespace boost; + +typedef qmf::PrivateImplRef<AgentSession> PI; + +namespace qmf { + class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable { + public: + ~AgentSessionImpl(); + + // + // Methods from API handle + // + AgentSessionImpl(Connection& c, const string& o); + void setDomain(const string& d) { checkOpen(); domain = d; } + void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; } + void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; } + void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; } + void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; } + const string& getName() const { return agentName; } + void open(); + void close(); + bool nextEvent(AgentEvent& e, Duration t); + int pendingEvents() const; + + void setEventNotifier(EventNotifierImpl* eventNotifier); + EventNotifierImpl* getEventNotifier() const; + + void registerSchema(Schema& s); + DataAddr addData(Data& d, const string& n, bool persist); + void delData(const DataAddr&); + + void authAccept(AgentEvent& e); + void authReject(AgentEvent& e, const string& m); + void raiseException(AgentEvent& e, const string& s); + void raiseException(AgentEvent& e, const Data& d); + void response(AgentEvent& e, const Data& d); + void complete(AgentEvent& e); + void methodSuccess(AgentEvent& e); + void raiseEvent(const Data& d); + void raiseEvent(const Data& d, int s); + + private: + typedef map<DataAddr, Data, DataAddrCompare> DataIndex; + typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap; + + mutable qpid::sys::Mutex lock; + qpid::sys::Condition cond; + Connection connection; + Session session; + Sender directSender; + Sender topicSender; + string domain; + Variant::Map attributes; + Variant::Map options; + string agentName; + bool opened; + queue<AgentEvent> eventQueue; + EventNotifierImpl* eventNotifier; + qpid::sys::Thread* thread; + bool threadCanceled; + uint32_t bootSequence; + uint32_t interval; + uint64_t lastHeartbeat; + uint64_t lastVisit; + bool forceHeartbeat; + bool externalStorage; + bool autoAllowQueries; + bool autoAllowMethods; + uint32_t maxSubscriptions; + uint32_t minSubInterval; + uint32_t subLifetime; + bool publicEvents; + bool listenOnDirect; + bool strictSecurity; + uint32_t maxThreadWaitTime; + uint64_t schemaUpdateTime; + string directBase; + string topicBase; + + SchemaMap schemata; + DataIndex globalIndex; + map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex; + + void checkOpen(); + void setAgentName(); + void enqueueEvent(const AgentEvent&); + void alertEventNotifierLH(bool readable); + void handleLocateRequest(const Variant::List& content, const Message& msg); + void handleMethodRequest(const Variant::Map& content, const Message& msg); + void handleQueryRequest(const Variant::Map& content, const Message& msg); + void handleSchemaRequest(AgentEvent&); + void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&); + void dispatch(Message); + void sendHeartbeat(); + void send(Message, const Address&); + void flushResponses(AgentEvent&, bool); + void periodicProcessing(uint64_t); + void run(); + }; + + struct AgentSessionImplAccess { + static AgentSessionImpl& get(AgentSession& session); + static const AgentSessionImpl& get(const AgentSession& session); + }; +} + + +#endif + diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp index af835959cf..d084b8a8eb 100644 --- a/cpp/src/qmf/ConsoleSession.cpp +++ b/cpp/src/qmf/ConsoleSession.cpp @@ -237,6 +237,8 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout) if (!eventQueue.empty()) { event = eventQueue.front(); eventQueue.pop(); + if (eventQueue.empty()) + alertEventNotifierLH(false); return true; } @@ -251,6 +253,20 @@ int ConsoleSessionImpl::pendingEvents() const } +void ConsoleSessionImpl::setEventNotifier(EventNotifierImpl* notifier) +{ + qpid::sys::Mutex::ScopedLock l(lock); + this->eventNotifier = notifier; +} + + +EventNotifierImpl* ConsoleSessionImpl::getEventNotifier() const +{ + qpid::sys::Mutex::ScopedLock l(lock); + return this->eventNotifier; +} + + uint32_t ConsoleSessionImpl::getAgentCount() const { qpid::sys::Mutex::ScopedLock l(lock); @@ -292,8 +308,10 @@ void ConsoleSessionImpl::enqueueEventLH(const ConsoleEvent& event) { bool notify = eventQueue.empty(); eventQueue.push(event); - if (notify) + if (notify) { cond.notify(); + alertEventNotifierLH(true); + } } @@ -602,6 +620,13 @@ void ConsoleSessionImpl::periodicProcessing(uint64_t seconds) } +void ConsoleSessionImpl::alertEventNotifierLH(bool readable) +{ + if (eventNotifier) + eventNotifier->setReadable(readable); +} + + void ConsoleSessionImpl::run() { QPID_LOG(debug, "ConsoleSession thread started"); @@ -633,3 +658,14 @@ void ConsoleSessionImpl::run() QPID_LOG(debug, "ConsoleSession thread exiting"); } + +ConsoleSessionImpl& ConsoleSessionImplAccess::get(ConsoleSession& session) +{ + return *session.impl; +} + + +const ConsoleSessionImpl& ConsoleSessionImplAccess::get(const ConsoleSession& session) +{ + return *session.impl; +} diff --git a/cpp/src/qmf/ConsoleSessionImpl.h b/cpp/src/qmf/ConsoleSessionImpl.h index 478d24e56b..660fc9b83f 100644 --- a/cpp/src/qmf/ConsoleSessionImpl.h +++ b/cpp/src/qmf/ConsoleSessionImpl.h @@ -27,6 +27,7 @@ #include "qmf/SchemaId.h" #include "qmf/Schema.h" #include "qmf/ConsoleEventImpl.h" +#include "qmf/EventNotifierImpl.h" #include "qmf/SchemaCache.h" #include "qmf/Query.h" #include "qpid/sys/Mutex.h" @@ -41,9 +42,14 @@ #include "qpid/messaging/Address.h" #include "qpid/management/Buffer.h" #include "qpid/types/Variant.h" + +#include <boost/shared_ptr.hpp> #include <map> #include <queue> +using namespace boost; +using namespace std; + namespace qmf { class ConsoleSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable { public: @@ -59,6 +65,10 @@ namespace qmf { void close(); bool nextEvent(ConsoleEvent& e, qpid::messaging::Duration t); int pendingEvents() const; + + void setEventNotifier(EventNotifierImpl* notifier); + EventNotifierImpl* getEventNotifier() const; + uint32_t getAgentCount() const; Agent getAgent(uint32_t i) const; Agent getConnectedBrokerAgent() const { return connectedBrokerAgent; } @@ -80,6 +90,7 @@ namespace qmf { Query agentQuery; bool opened; std::queue<ConsoleEvent> eventQueue; + EventNotifierImpl* eventNotifier; qpid::sys::Thread* thread; bool threadCanceled; uint64_t lastVisit; @@ -102,11 +113,17 @@ namespace qmf { void handleAgentUpdate(const std::string&, const qpid::types::Variant::Map&, const qpid::messaging::Message&); void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&); void periodicProcessing(uint64_t); + void alertEventNotifierLH(bool readable); void run(); uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; } friend class AgentImpl; }; + + struct ConsoleSessionImplAccess { + static ConsoleSessionImpl& get(ConsoleSession& session); + static const ConsoleSessionImpl& get(const ConsoleSession& session); + }; } #endif diff --git a/cpp/src/qmf/EventNotifierImpl.cpp b/cpp/src/qmf/EventNotifierImpl.cpp new file mode 100644 index 0000000000..20114aaa5e --- /dev/null +++ b/cpp/src/qmf/EventNotifierImpl.cpp @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/EventNotifierImpl.h" +#include "qmf/AgentSessionImpl.h" +#include "qmf/ConsoleSessionImpl.h" + +EventNotifierImpl::EventNotifierImpl(AgentSession& agentSession) + : readable(false), agent(agentSession) +{ + AgentSessionImplAccess::get(agent).setEventNotifier(this); +} + + +EventNotifierImpl::EventNotifierImpl(ConsoleSession& consoleSession) + : readable(false), console(consoleSession) +{ + ConsoleSessionImplAccess::get(console).setEventNotifier(this); +} + + +EventNotifierImpl::~EventNotifierImpl() +{ + if (agent.isValid()) + AgentSessionImplAccess::get(agent).setEventNotifier(NULL); + if (console.isValid()) + ConsoleSessionImplAccess::get(console).setEventNotifier(NULL); +} + +void EventNotifierImpl::setReadable(bool readable) +{ + update(readable); + this->readable = readable; +} + + +bool EventNotifierImpl::isReadable() const +{ + return this->readable; +} diff --git a/cpp/src/qmf/EventNotifierImpl.h b/cpp/src/qmf/EventNotifierImpl.h new file mode 100644 index 0000000000..d85f9979d2 --- /dev/null +++ b/cpp/src/qmf/EventNotifierImpl.h @@ -0,0 +1,48 @@ +#ifndef __QMF_EVENT_NOTIFIER_IMPL_H +#define __QMF_EVENT_NOTIFIER_IMPL_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/AgentSession.h" +#include "qmf/ConsoleSession.h" + +namespace qmf +{ + class EventNotifierImpl { + private: + bool readable; + AgentSession agent; + ConsoleSession console; + + public: + EventNotifierImpl(AgentSession& agentSession); + EventNotifierImpl(ConsoleSession& consoleSession); + virtual ~EventNotifierImpl(); + + void setReadable(bool readable); + bool isReadable() const; + + protected: + virtual void update(bool readable) = 0; + }; +} + +#endif + diff --git a/cpp/src/qmf/PosixEventNotifier.cpp b/cpp/src/qmf/PosixEventNotifier.cpp new file mode 100644 index 0000000000..b5c71210bb --- /dev/null +++ b/cpp/src/qmf/PosixEventNotifier.cpp @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/posix/EventNotifier.h" +#include "qmf/PosixEventNotifierImpl.h" +#include "qmf/PrivateImplRef.h" + +using namespace qmf; +using namespace std; + +typedef qmf::PrivateImplRef<posix::EventNotifier> PI; + +posix::EventNotifier::EventNotifier(AgentSession& agentSession) +{ + PI::ctor(*this, new PosixEventNotifierImpl(agentSession)); +} + + +posix::EventNotifier::EventNotifier(ConsoleSession& consoleSession) +{ + PI::ctor(*this, new PosixEventNotifierImpl(consoleSession)); +} + + +posix::EventNotifier::EventNotifier(const posix::EventNotifier& that) + : Handle<PosixEventNotifierImpl>() +{ + PI::copy(*this, that); +} + + +posix::EventNotifier::~EventNotifier() +{ + PI::dtor(*this); +} + +posix::EventNotifier& posix::EventNotifier::operator=(const posix::EventNotifier& that) +{ + return PI::assign(*this, that); +} + + +int posix::EventNotifier::getHandle() const +{ + return impl->getHandle(); +} + diff --git a/cpp/src/qmf/PosixEventNotifierImpl.cpp b/cpp/src/qmf/PosixEventNotifierImpl.cpp new file mode 100644 index 0000000000..abc9cadcfa --- /dev/null +++ b/cpp/src/qmf/PosixEventNotifierImpl.cpp @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "PosixEventNotifierImpl.h" + +#include <fcntl.h> +#include <unistd.h> + +#define BUFFER_SIZE 10 + +using namespace qmf; + +PosixEventNotifierImpl::PosixEventNotifierImpl(AgentSession& agentSession) + : EventNotifierImpl(agentSession) +{ + openHandle(); +} + + +PosixEventNotifierImpl::PosixEventNotifierImpl(ConsoleSession& consoleSession) + : EventNotifierImpl(consoleSession) +{ + openHandle(); +} + + +PosixEventNotifierImpl::~PosixEventNotifierImpl() +{ + closeHandle(); +} + + +void PosixEventNotifierImpl::update(bool readable) +{ + char buffer[BUFFER_SIZE]; + + if(readable && !this->isReadable()) { + (void) ::write(myHandle, "1", 1); + } + else if(!readable && this->isReadable()) { + (void) ::read(yourHandle, buffer, BUFFER_SIZE); + } +} + + +void PosixEventNotifierImpl::openHandle() +{ + int pair[2]; + + if(::pipe(pair) == -1) + throw QmfException("Unable to open event notifier handle."); + + yourHandle = pair[0]; + myHandle = pair[1]; + + int flags; + + flags = ::fcntl(yourHandle, F_GETFL); + if((::fcntl(yourHandle, F_SETFL, flags | O_NONBLOCK)) == -1) + throw QmfException("Unable to make remote handle non-blocking."); + + flags = ::fcntl(myHandle, F_GETFL); + if((::fcntl(myHandle, F_SETFL, flags | O_NONBLOCK)) == -1) + throw QmfException("Unable to make local handle non-blocking."); +} + + +void PosixEventNotifierImpl::closeHandle() +{ + if(myHandle > 0) { + ::close(myHandle); + myHandle = -1; + } + + if(yourHandle > 0) { + ::close(yourHandle); + yourHandle = -1; + } +} + + +PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(posix::EventNotifier& notifier) +{ + return *notifier.impl; +} + + +const PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(const posix::EventNotifier& notifier) +{ + return *notifier.impl; +} + diff --git a/cpp/src/qmf/PosixEventNotifierImpl.h b/cpp/src/qmf/PosixEventNotifierImpl.h new file mode 100644 index 0000000000..c8a7446bd5 --- /dev/null +++ b/cpp/src/qmf/PosixEventNotifierImpl.h @@ -0,0 +1,61 @@ +#ifndef __QMF_POSIX_EVENT_NOTIFIER_IMPL_H +#define __QMF_POSIX_EVENT_NOTIFIER_IMPL_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/posix/EventNotifier.h" +#include "qmf/EventNotifierImpl.h" +#include "qpid/RefCounted.h" + +namespace qmf +{ + class AgentSession; + class ConsoleSession; + + class PosixEventNotifierImpl : public EventNotifierImpl, public virtual qpid::RefCounted + { + public: + PosixEventNotifierImpl(AgentSession& agentSession); + PosixEventNotifierImpl(ConsoleSession& consoleSession); + virtual ~PosixEventNotifierImpl(); + + int getHandle() const { return yourHandle; } + + private: + int myHandle; + int yourHandle; + + void openHandle(); + void closeHandle(); + + protected: + void update(bool readable); + }; + + struct PosixEventNotifierImplAccess + { + static PosixEventNotifierImpl& get(posix::EventNotifier& notifier); + static const PosixEventNotifierImpl& get(const posix::EventNotifier& notifier); + }; + +} + +#endif + diff --git a/cpp/src/tests/Qmf2.cpp b/cpp/src/tests/Qmf2.cpp index 66c774accd..bc263d5c6d 100644 --- a/cpp/src/tests/Qmf2.cpp +++ b/cpp/src/tests/Qmf2.cpp @@ -23,12 +23,36 @@ #include "qmf/QueryImpl.h" #include "qmf/SchemaImpl.h" #include "qmf/exceptions.h" - +#include "qpid/messaging/Connection.h" +#include "qmf/PosixEventNotifierImpl.h" +#include "qmf/AgentSession.h" +#include "qmf/AgentSessionImpl.h" +#include "qmf/ConsoleSession.h" +#include "qmf/ConsoleSessionImpl.h" #include "unit_test.h" +using namespace std; using namespace qpid::types; +using namespace qpid::messaging; using namespace qmf; +bool isReadable(int fd) +{ + fd_set rfds; + struct timeval tv; + int nfds, result; + + FD_ZERO(&rfds); + FD_SET(fd, &rfds); + nfds = fd + 1; + tv.tv_sec = 0; + tv.tv_usec = 0; + + result = select(nfds, &rfds, NULL, NULL, &tv); + + return result > 0; +} + namespace qpid { namespace tests { @@ -315,6 +339,84 @@ QPID_AUTO_TEST_CASE(testSchema) BOOST_CHECK_THROW(method.getArgument(3), QmfException); } +QPID_AUTO_TEST_CASE(testAgentSessionEventListener) +{ + Connection connection("localhost"); + AgentSession session(connection, ""); + posix::EventNotifier notifier(session); + + AgentSessionImpl& sessionImpl = AgentSessionImplAccess::get(session); + + BOOST_CHECK(sessionImpl.getEventNotifier() != 0); +} + +QPID_AUTO_TEST_CASE(testConsoleSessionEventListener) +{ + Connection connection("localhost"); + ConsoleSession session(connection, ""); + posix::EventNotifier notifier(session); + + ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session); + + BOOST_CHECK(sessionImpl.getEventNotifier() != 0); +} + +QPID_AUTO_TEST_CASE(testGetHandle) +{ + Connection connection("localhost"); + ConsoleSession session(connection, ""); + posix::EventNotifier notifier(session); + + BOOST_CHECK(notifier.getHandle() > 0); +} + +QPID_AUTO_TEST_CASE(testSetReadableToFalse) +{ + Connection connection("localhost"); + ConsoleSession session(connection, ""); + posix::EventNotifier notifier(session); + PosixEventNotifierImplAccess::get(notifier).setReadable(false); + + bool readable(isReadable(notifier.getHandle())); + BOOST_CHECK(!readable); +} + +QPID_AUTO_TEST_CASE(testSetReadable) +{ + Connection connection("localhost"); + ConsoleSession session(connection, ""); + posix::EventNotifier notifier(session); + PosixEventNotifierImplAccess::get(notifier).setReadable(true); + + bool readable(isReadable(notifier.getHandle())); + BOOST_CHECK(readable); +} + +QPID_AUTO_TEST_CASE(testSetReadableMultiple) +{ + Connection connection("localhost"); + ConsoleSession session(connection, ""); + posix::EventNotifier notifier(session); + for (int i = 0; i < 15; i++) + PosixEventNotifierImplAccess::get(notifier).setReadable(true); + PosixEventNotifierImplAccess::get(notifier).setReadable(false); + + bool readable(isReadable(notifier.getHandle())); + BOOST_CHECK(!readable); +} + +QPID_AUTO_TEST_CASE(testDeleteNotifier) +{ + Connection connection("localhost"); + ConsoleSession session(connection, ""); + ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session); + { + posix::EventNotifier notifier(session); + BOOST_CHECK(sessionImpl.getEventNotifier() != 0); + } + BOOST_CHECK(sessionImpl.getEventNotifier() == 0); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |