summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-01-13 11:57:19 +0000
committerTed Ross <tross@apache.org>2010-01-13 11:57:19 +0000
commitfd64f22be60f12e03df8974b547cd9b3af331601 (patch)
treee2d9cafd894623102b5b1384d00af75db4dc88ff /qpid/cpp/src
parent26280399738211de3f472b625091ff22c762538b (diff)
downloadqpid-python-fd64f22be60f12e03df8974b547cd9b3af331601.tar.gz
Added raise_event support to the Ruby and Python wrapped agent APIs.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@898727 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qmf.mk2
-rw-r--r--qpid/cpp/src/qmf/engine/Agent.cpp13
-rw-r--r--qpid/cpp/src/qmf/engine/EventImpl.cpp106
-rw-r--r--qpid/cpp/src/qmf/engine/EventImpl.h53
-rw-r--r--qpid/cpp/src/qmf/engine/ResilientConnection.cpp19
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.cpp4
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.h6
7 files changed, 193 insertions, 10 deletions
diff --git a/qpid/cpp/src/qmf.mk b/qpid/cpp/src/qmf.mk
index 96a977f3cd..1e4c59b19e 100644
--- a/qpid/cpp/src/qmf.mk
+++ b/qpid/cpp/src/qmf.mk
@@ -73,6 +73,8 @@ libqmfengine_la_SOURCES = \
qmf/engine/ConnectionSettingsImpl.h \
qmf/engine/ConsoleImpl.cpp \
qmf/engine/ConsoleImpl.h \
+ qmf/engine/EventImpl.cpp \
+ qmf/engine/EventImpl.h \
qmf/engine/MessageImpl.cpp \
qmf/engine/MessageImpl.h \
qmf/engine/ObjectIdImpl.cpp \
diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp
index c5d1bff2e0..fe9b84c565 100644
--- a/qpid/cpp/src/qmf/engine/Agent.cpp
+++ b/qpid/cpp/src/qmf/engine/Agent.cpp
@@ -21,6 +21,7 @@
#include "qmf/engine/MessageImpl.h"
#include "qmf/engine/SchemaImpl.h"
#include "qmf/engine/Typecode.h"
+#include "qmf/engine/EventImpl.h"
#include "qmf/engine/ObjectImpl.h"
#include "qmf/engine/ObjectIdImpl.h"
#include "qmf/engine/QueryImpl.h"
@@ -476,9 +477,19 @@ const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistI
return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo);
}
-void AgentImpl::raiseEvent(Event&)
+void AgentImpl::raiseEvent(Event& event)
{
Mutex::ScopedLock _lock(lock);
+ Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
+ Protocol::encodeHeader(buffer, Protocol::OP_EVENT_INDICATION);
+
+ event.impl->encodeSchemaKey(buffer);
+ buffer.putLongLong(uint64_t(Duration(now())));
+ event.impl->encode(buffer);
+ string key(event.impl->getRoutingKey(assignedBrokerBank, assignedAgentBank));
+
+ sendBufferLH(buffer, QMF_EXCHANGE, key);
+ QPID_LOG(trace, "SENT EventIndication");
}
AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name)
diff --git a/qpid/cpp/src/qmf/engine/EventImpl.cpp b/qpid/cpp/src/qmf/engine/EventImpl.cpp
new file mode 100644
index 0000000000..6bdda9321e
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/EventImpl.cpp
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/engine/EventImpl.h>
+#include <qmf/engine/ValueImpl.h>
+
+using namespace std;
+using namespace qmf::engine;
+using qpid::framing::Buffer;
+
+EventImpl::EventImpl(const SchemaEventClass* type) : eventClass(type)
+{
+ int argCount = eventClass->getArgumentCount();
+ int idx;
+
+ for (idx = 0; idx < argCount; idx++) {
+ const SchemaArgument* arg = eventClass->getArgument(idx);
+ arguments[arg->getName()] = ValuePtr(new Value(arg->getType()));
+ }
+}
+
+
+EventImpl::EventImpl(const SchemaEventClass* type, Buffer&) :
+ eventClass(type)
+{
+}
+
+
+Event* EventImpl::factory(const SchemaEventClass* type, Buffer& buffer)
+{
+ EventImpl* impl(new EventImpl(type, buffer));
+ return new Event(impl);
+}
+
+
+Value* EventImpl::getValue(const char* key) const
+{
+ map<string, ValuePtr>::const_iterator iter;
+
+ iter = arguments.find(key);
+ if (iter != arguments.end())
+ return iter->second.get();
+
+ return 0;
+}
+
+
+void EventImpl::encodeSchemaKey(Buffer& buffer) const
+{
+ buffer.putShortString(eventClass->getClassKey()->getPackageName());
+ buffer.putShortString(eventClass->getClassKey()->getClassName());
+ buffer.putBin128(const_cast<uint8_t*>(eventClass->getClassKey()->getHash()));
+}
+
+
+void EventImpl::encode(Buffer& buffer) const
+{
+ buffer.putOctet((uint8_t) eventClass->getSeverity());
+
+ int argCount = eventClass->getArgumentCount();
+ for (int idx = 0; idx < argCount; idx++) {
+ const SchemaArgument* arg = eventClass->getArgument(idx);
+ ValuePtr value = arguments[arg->getName()];
+ value->impl->encode(buffer);
+ }
+}
+
+
+string EventImpl::getRoutingKey(uint32_t brokerBank, uint32_t agentBank) const
+{
+ stringstream key;
+
+ key << "console.event." << brokerBank << "." << agentBank << "." <<
+ eventClass->getClassKey()->getPackageName() << "." <<
+ eventClass->getClassKey()->getClassName();
+ return key.str();
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+
+Event::Event(const SchemaEventClass* type) : impl(new EventImpl(type)) {}
+Event::Event(EventImpl* i) : impl(i) {}
+Event::Event(const Event& from) : impl(new EventImpl(*(from.impl))) {}
+Event::~Event() { delete impl; }
+const SchemaEventClass* Event::getClass() const { return impl->getClass(); }
+Value* Event::getValue(const char* key) const { return impl->getValue(key); }
+
diff --git a/qpid/cpp/src/qmf/engine/EventImpl.h b/qpid/cpp/src/qmf/engine/EventImpl.h
new file mode 100644
index 0000000000..dfdf64e848
--- /dev/null
+++ b/qpid/cpp/src/qmf/engine/EventImpl.h
@@ -0,0 +1,53 @@
+#ifndef _QmfEngineEventImpl_
+#define _QmfEngineEventImpl_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qmf/engine/Event.h>
+#include <qmf/engine/Schema.h>
+#include <qpid/framing/Buffer.h>
+#include <boost/shared_ptr.hpp>
+#include <map>
+
+namespace qmf {
+namespace engine {
+
+ struct EventImpl {
+ typedef boost::shared_ptr<Value> ValuePtr;
+ const SchemaEventClass* eventClass;
+ mutable std::map<std::string, ValuePtr> arguments;
+
+ EventImpl(const SchemaEventClass* type);
+ EventImpl(const SchemaEventClass* type, qpid::framing::Buffer& buffer);
+ static Event* factory(const SchemaEventClass* type, qpid::framing::Buffer& buffer);
+
+ const SchemaEventClass* getClass() const { return eventClass; }
+ Value* getValue(const char* key) const;
+
+ void encodeSchemaKey(qpid::framing::Buffer& buffer) const;
+ void encode(qpid::framing::Buffer& buffer) const;
+ std::string getRoutingKey(uint32_t brokerBank, uint32_t agentBank) const;
+ };
+
+}
+}
+
+#endif
+
diff --git a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
index 53524fdbd8..9c19e4d460 100644
--- a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
+++ b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
@@ -39,6 +39,8 @@
#include <set>
#include <boost/intrusive_ptr.hpp>
#include <boost/noncopyable.hpp>
+#include <unistd.h>
+#include <fcntl.h>
using namespace std;
using namespace qmf::engine;
@@ -330,6 +332,10 @@ void ResilientConnectionImpl::unbind(SessionHandle handle,
void ResilientConnectionImpl::setNotifyFd(int fd)
{
notifyFd = fd;
+ if (notifyFd > 0) {
+ int original = fcntl(notifyFd, F_GETFL);
+ fcntl(notifyFd, F_SETFL, O_NONBLOCK | original);
+ }
}
void ResilientConnectionImpl::run()
@@ -403,13 +409,16 @@ void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind k
const MessageImpl& message,
const string& errorText)
{
- Mutex::ScopedLock _lock(lock);
- ResilientConnectionEventImpl event(kind, message);
+ {
+ Mutex::ScopedLock _lock(lock);
+ ResilientConnectionEventImpl event(kind, message);
- event.sessionContext = sessionContext;
- event.errorText = errorText;
+ event.sessionContext = sessionContext;
+ event.errorText = errorText;
+
+ eventQueue.push_back(event);
+ }
- eventQueue.push_back(event);
if (notifyFd != -1)
{
int unused_ret; //Suppress warnings about ignoring return value.
diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
index c37ec34890..249a08ba7f 100644
--- a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
@@ -459,7 +459,6 @@ SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : hasHash(true), clas
buffer.getShortString(package);
buffer.getShortString(name);
hash.decode(buffer);
- buffer.putOctet(0); // No parent class
uint16_t argCount = buffer.getShort();
@@ -598,13 +597,14 @@ const SchemaProperty* SchemaObjectClass::getProperty(int idx) const { return imp
const SchemaStatistic* SchemaObjectClass::getStatistic(int idx) const { return impl->getStatistic(idx); }
const SchemaMethod* SchemaObjectClass::getMethod(int idx) const { return impl->getMethod(idx); }
-SchemaEventClass::SchemaEventClass(const char* package, const char* name) : impl(new SchemaEventClassImpl(package, name)) {}
+SchemaEventClass::SchemaEventClass(const char* package, const char* name, Severity s) : impl(new SchemaEventClassImpl(package, name, s)) {}
SchemaEventClass::SchemaEventClass(SchemaEventClassImpl* i) : impl(i) {}
SchemaEventClass::SchemaEventClass(const SchemaEventClass& from) : impl(new SchemaEventClassImpl(*(from.impl))) {}
SchemaEventClass::~SchemaEventClass() { delete impl; }
void SchemaEventClass::addArgument(const SchemaArgument* argument) { impl->addArgument(argument); }
void SchemaEventClass::setDesc(const char* desc) { impl->setDesc(desc); }
const SchemaClassKey* SchemaEventClass::getClassKey() const { return impl->getClassKey(); }
+Severity SchemaEventClass::getSeverity() const { return impl->getSeverity(); }
int SchemaEventClass::getArgumentCount() const { return impl->getArgumentCount(); }
const SchemaArgument* SchemaEventClass::getArgument(int idx) const { return impl->getArgument(idx); }
diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.h b/qpid/cpp/src/qmf/engine/SchemaImpl.h
index af3a1d98e4..7be757ee8d 100644
--- a/qpid/cpp/src/qmf/engine/SchemaImpl.h
+++ b/qpid/cpp/src/qmf/engine/SchemaImpl.h
@@ -201,10 +201,11 @@ namespace engine {
mutable bool hasHash;
std::auto_ptr<SchemaClassKey> classKey;
std::string description;
+ Severity severity;
std::vector<const SchemaArgument*> arguments;
- SchemaEventClassImpl(const char* p, const char* n) :
- package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)) {}
+ SchemaEventClassImpl(const char* p, const char* n, Severity sev) :
+ package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)), severity(sev) {}
SchemaEventClassImpl(qpid::framing::Buffer& buffer);
static SchemaEventClass* factory(qpid::framing::Buffer& buffer);
@@ -213,6 +214,7 @@ namespace engine {
void setDesc(const char* desc) { description = desc; }
const SchemaClassKey* getClassKey() const;
+ Severity getSeverity() const { return severity; }
int getArgumentCount() const { return arguments.size(); }
const SchemaArgument* getArgument(int idx) const;
};