diff options
author | Ted Ross <tross@apache.org> | 2010-01-13 11:57:19 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-01-13 11:57:19 +0000 |
commit | fd64f22be60f12e03df8974b547cd9b3af331601 (patch) | |
tree | e2d9cafd894623102b5b1384d00af75db4dc88ff /qpid/cpp/src | |
parent | 26280399738211de3f472b625091ff22c762538b (diff) | |
download | qpid-python-fd64f22be60f12e03df8974b547cd9b3af331601.tar.gz |
Added raise_event support to the Ruby and Python wrapped agent APIs.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@898727 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qmf.mk | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/Agent.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/EventImpl.cpp | 106 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/EventImpl.h | 53 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ResilientConnection.cpp | 19 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/SchemaImpl.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/SchemaImpl.h | 6 |
7 files changed, 193 insertions, 10 deletions
diff --git a/qpid/cpp/src/qmf.mk b/qpid/cpp/src/qmf.mk index 96a977f3cd..1e4c59b19e 100644 --- a/qpid/cpp/src/qmf.mk +++ b/qpid/cpp/src/qmf.mk @@ -73,6 +73,8 @@ libqmfengine_la_SOURCES = \ qmf/engine/ConnectionSettingsImpl.h \ qmf/engine/ConsoleImpl.cpp \ qmf/engine/ConsoleImpl.h \ + qmf/engine/EventImpl.cpp \ + qmf/engine/EventImpl.h \ qmf/engine/MessageImpl.cpp \ qmf/engine/MessageImpl.h \ qmf/engine/ObjectIdImpl.cpp \ diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp index c5d1bff2e0..fe9b84c565 100644 --- a/qpid/cpp/src/qmf/engine/Agent.cpp +++ b/qpid/cpp/src/qmf/engine/Agent.cpp @@ -21,6 +21,7 @@ #include "qmf/engine/MessageImpl.h" #include "qmf/engine/SchemaImpl.h" #include "qmf/engine/Typecode.h" +#include "qmf/engine/EventImpl.h" #include "qmf/engine/ObjectImpl.h" #include "qmf/engine/ObjectIdImpl.h" #include "qmf/engine/QueryImpl.h" @@ -476,9 +477,19 @@ const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistI return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo); } -void AgentImpl::raiseEvent(Event&) +void AgentImpl::raiseEvent(Event& event) { Mutex::ScopedLock _lock(lock); + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + Protocol::encodeHeader(buffer, Protocol::OP_EVENT_INDICATION); + + event.impl->encodeSchemaKey(buffer); + buffer.putLongLong(uint64_t(Duration(now()))); + event.impl->encode(buffer); + string key(event.impl->getRoutingKey(assignedBrokerBank, assignedAgentBank)); + + sendBufferLH(buffer, QMF_EXCHANGE, key); + QPID_LOG(trace, "SENT EventIndication"); } AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name) diff --git a/qpid/cpp/src/qmf/engine/EventImpl.cpp b/qpid/cpp/src/qmf/engine/EventImpl.cpp new file mode 100644 index 0000000000..6bdda9321e --- /dev/null +++ b/qpid/cpp/src/qmf/engine/EventImpl.cpp @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qmf/engine/EventImpl.h> +#include <qmf/engine/ValueImpl.h> + +using namespace std; +using namespace qmf::engine; +using qpid::framing::Buffer; + +EventImpl::EventImpl(const SchemaEventClass* type) : eventClass(type) +{ + int argCount = eventClass->getArgumentCount(); + int idx; + + for (idx = 0; idx < argCount; idx++) { + const SchemaArgument* arg = eventClass->getArgument(idx); + arguments[arg->getName()] = ValuePtr(new Value(arg->getType())); + } +} + + +EventImpl::EventImpl(const SchemaEventClass* type, Buffer&) : + eventClass(type) +{ +} + + +Event* EventImpl::factory(const SchemaEventClass* type, Buffer& buffer) +{ + EventImpl* impl(new EventImpl(type, buffer)); + return new Event(impl); +} + + +Value* EventImpl::getValue(const char* key) const +{ + map<string, ValuePtr>::const_iterator iter; + + iter = arguments.find(key); + if (iter != arguments.end()) + return iter->second.get(); + + return 0; +} + + +void EventImpl::encodeSchemaKey(Buffer& buffer) const +{ + buffer.putShortString(eventClass->getClassKey()->getPackageName()); + buffer.putShortString(eventClass->getClassKey()->getClassName()); + buffer.putBin128(const_cast<uint8_t*>(eventClass->getClassKey()->getHash())); +} + + +void EventImpl::encode(Buffer& buffer) const +{ + buffer.putOctet((uint8_t) eventClass->getSeverity()); + + int argCount = eventClass->getArgumentCount(); + for (int idx = 0; idx < argCount; idx++) { + const SchemaArgument* arg = eventClass->getArgument(idx); + ValuePtr value = arguments[arg->getName()]; + value->impl->encode(buffer); + } +} + + +string EventImpl::getRoutingKey(uint32_t brokerBank, uint32_t agentBank) const +{ + stringstream key; + + key << "console.event." << brokerBank << "." << agentBank << "." << + eventClass->getClassKey()->getPackageName() << "." << + eventClass->getClassKey()->getClassName(); + return key.str(); +} + + +//================================================================== +// Wrappers +//================================================================== + +Event::Event(const SchemaEventClass* type) : impl(new EventImpl(type)) {} +Event::Event(EventImpl* i) : impl(i) {} +Event::Event(const Event& from) : impl(new EventImpl(*(from.impl))) {} +Event::~Event() { delete impl; } +const SchemaEventClass* Event::getClass() const { return impl->getClass(); } +Value* Event::getValue(const char* key) const { return impl->getValue(key); } + diff --git a/qpid/cpp/src/qmf/engine/EventImpl.h b/qpid/cpp/src/qmf/engine/EventImpl.h new file mode 100644 index 0000000000..dfdf64e848 --- /dev/null +++ b/qpid/cpp/src/qmf/engine/EventImpl.h @@ -0,0 +1,53 @@ +#ifndef _QmfEngineEventImpl_ +#define _QmfEngineEventImpl_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qmf/engine/Event.h> +#include <qmf/engine/Schema.h> +#include <qpid/framing/Buffer.h> +#include <boost/shared_ptr.hpp> +#include <map> + +namespace qmf { +namespace engine { + + struct EventImpl { + typedef boost::shared_ptr<Value> ValuePtr; + const SchemaEventClass* eventClass; + mutable std::map<std::string, ValuePtr> arguments; + + EventImpl(const SchemaEventClass* type); + EventImpl(const SchemaEventClass* type, qpid::framing::Buffer& buffer); + static Event* factory(const SchemaEventClass* type, qpid::framing::Buffer& buffer); + + const SchemaEventClass* getClass() const { return eventClass; } + Value* getValue(const char* key) const; + + void encodeSchemaKey(qpid::framing::Buffer& buffer) const; + void encode(qpid::framing::Buffer& buffer) const; + std::string getRoutingKey(uint32_t brokerBank, uint32_t agentBank) const; + }; + +} +} + +#endif + diff --git a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp index 53524fdbd8..9c19e4d460 100644 --- a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp +++ b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp @@ -39,6 +39,8 @@ #include <set> #include <boost/intrusive_ptr.hpp> #include <boost/noncopyable.hpp> +#include <unistd.h> +#include <fcntl.h> using namespace std; using namespace qmf::engine; @@ -330,6 +332,10 @@ void ResilientConnectionImpl::unbind(SessionHandle handle, void ResilientConnectionImpl::setNotifyFd(int fd) { notifyFd = fd; + if (notifyFd > 0) { + int original = fcntl(notifyFd, F_GETFL); + fcntl(notifyFd, F_SETFL, O_NONBLOCK | original); + } } void ResilientConnectionImpl::run() @@ -403,13 +409,16 @@ void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind k const MessageImpl& message, const string& errorText) { - Mutex::ScopedLock _lock(lock); - ResilientConnectionEventImpl event(kind, message); + { + Mutex::ScopedLock _lock(lock); + ResilientConnectionEventImpl event(kind, message); - event.sessionContext = sessionContext; - event.errorText = errorText; + event.sessionContext = sessionContext; + event.errorText = errorText; + + eventQueue.push_back(event); + } - eventQueue.push_back(event); if (notifyFd != -1) { int unused_ret; //Suppress warnings about ignoring return value. diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp index c37ec34890..249a08ba7f 100644 --- a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp +++ b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp @@ -459,7 +459,6 @@ SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : hasHash(true), clas buffer.getShortString(package); buffer.getShortString(name); hash.decode(buffer); - buffer.putOctet(0); // No parent class uint16_t argCount = buffer.getShort(); @@ -598,13 +597,14 @@ const SchemaProperty* SchemaObjectClass::getProperty(int idx) const { return imp const SchemaStatistic* SchemaObjectClass::getStatistic(int idx) const { return impl->getStatistic(idx); } const SchemaMethod* SchemaObjectClass::getMethod(int idx) const { return impl->getMethod(idx); } -SchemaEventClass::SchemaEventClass(const char* package, const char* name) : impl(new SchemaEventClassImpl(package, name)) {} +SchemaEventClass::SchemaEventClass(const char* package, const char* name, Severity s) : impl(new SchemaEventClassImpl(package, name, s)) {} SchemaEventClass::SchemaEventClass(SchemaEventClassImpl* i) : impl(i) {} SchemaEventClass::SchemaEventClass(const SchemaEventClass& from) : impl(new SchemaEventClassImpl(*(from.impl))) {} SchemaEventClass::~SchemaEventClass() { delete impl; } void SchemaEventClass::addArgument(const SchemaArgument* argument) { impl->addArgument(argument); } void SchemaEventClass::setDesc(const char* desc) { impl->setDesc(desc); } const SchemaClassKey* SchemaEventClass::getClassKey() const { return impl->getClassKey(); } +Severity SchemaEventClass::getSeverity() const { return impl->getSeverity(); } int SchemaEventClass::getArgumentCount() const { return impl->getArgumentCount(); } const SchemaArgument* SchemaEventClass::getArgument(int idx) const { return impl->getArgument(idx); } diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.h b/qpid/cpp/src/qmf/engine/SchemaImpl.h index af3a1d98e4..7be757ee8d 100644 --- a/qpid/cpp/src/qmf/engine/SchemaImpl.h +++ b/qpid/cpp/src/qmf/engine/SchemaImpl.h @@ -201,10 +201,11 @@ namespace engine { mutable bool hasHash; std::auto_ptr<SchemaClassKey> classKey; std::string description; + Severity severity; std::vector<const SchemaArgument*> arguments; - SchemaEventClassImpl(const char* p, const char* n) : - package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)) {} + SchemaEventClassImpl(const char* p, const char* n, Severity sev) : + package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)), severity(sev) {} SchemaEventClassImpl(qpid::framing::Buffer& buffer); static SchemaEventClass* factory(qpid::framing::Buffer& buffer); @@ -213,6 +214,7 @@ namespace engine { void setDesc(const char* desc) { description = desc; } const SchemaClassKey* getClassKey() const; + Severity getSeverity() const { return severity; } int getArgumentCount() const { return arguments.size(); } const SchemaArgument* getArgument(int idx) const; }; |