diff options
Diffstat (limited to 'qpid/cpp/bindings/qmf/python')
-rw-r--r-- | qpid/cpp/bindings/qmf/python/Makefile.am | 51 | ||||
-rw-r--r-- | qpid/cpp/bindings/qmf/python/python.i | 143 | ||||
-rw-r--r-- | qpid/cpp/bindings/qmf/python/qmf.py | 1680 |
3 files changed, 1874 insertions, 0 deletions
diff --git a/qpid/cpp/bindings/qmf/python/Makefile.am b/qpid/cpp/bindings/qmf/python/Makefile.am new file mode 100644 index 0000000000..8abad32959 --- /dev/null +++ b/qpid/cpp/bindings/qmf/python/Makefile.am @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +if HAVE_PYTHON_DEVEL + +INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_srcdir)/src/qmf -I$(top_srcdir)/src -I$(top_builddir)/src + +generated_file_list = \ + qmfengine.cpp \ + qmfengine.py + +EXTRA_DIST = python.i +BUILT_SOURCES = $(generated_file_list) +SWIG_FLAGS = -w362,401 + +$(generated_file_list): $(srcdir)/python.i $(srcdir)/../qmfengine.i + $(SWIG) -c++ -python $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I$(top_srcdir)/src/qmf -I/usr/include -o qmfengine.cpp $(srcdir)/python.i + +pylibdir = $(PYTHON_LIB) + +lib_LTLIBRARIES = _qmfengine.la +qenginedir = $(pyexecdir) +qengine_PYTHON = qmfengine.py qmf.py + +#_qmfengine_la_LDFLAGS = -avoid-version -module -shrext "$(PYTHON_SO)" +#_qmfengine_la_LDFLAGS = -avoid-version -module -shrext ".so" +_qmfengine_la_LDFLAGS = -avoid-version -module -shared +_qmfengine_la_LIBADD = $(PYTHON_LIBS) -L$(top_builddir)/src/.libs -lqpidclient $(top_builddir)/src/libqmf.la +_qmfengine_la_CXXFLAGS = $(INCLUDES) -I$(srcdir)/qmf -I$(PYTHON_INC) -fno-strict-aliasing +nodist__qmfengine_la_SOURCES = qmfengine.cpp + +CLEANFILES = $(generated_file_list) + +endif # HAVE_PYTHON_DEVEL + diff --git a/qpid/cpp/bindings/qmf/python/python.i b/qpid/cpp/bindings/qmf/python/python.i new file mode 100644 index 0000000000..5e25d155f9 --- /dev/null +++ b/qpid/cpp/bindings/qmf/python/python.i @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +%module qmfengine + + +/* unsigned32 Convert from Python --> C */ +%typemap(in) uint32_t { + if (PyInt_Check($input)) { + $1 = (uint32_t) PyInt_AsUnsignedLongMask($input); + } else if (PyLong_Check($input)) { + $1 = (uint32_t) PyLong_AsUnsignedLong($input); + } else { + SWIG_exception_fail(SWIG_ValueError, "unknown integer type"); + } +} + +/* unsinged32 Convert from C --> Python */ +%typemap(out) uint32_t { + $result = PyInt_FromLong((long)$1); +} + + +/* unsigned16 Convert from Python --> C */ +%typemap(in) uint16_t { + if (PyInt_Check($input)) { + $1 = (uint16_t) PyInt_AsUnsignedLongMask($input); + } else if (PyLong_Check($input)) { + $1 = (uint16_t) PyLong_AsUnsignedLong($input); + } else { + SWIG_exception_fail(SWIG_ValueError, "unknown integer type"); + } +} + +/* unsigned16 Convert from C --> Python */ +%typemap(out) uint16_t { + $result = PyInt_FromLong((long)$1); +} + + +/* signed32 Convert from Python --> C */ +%typemap(in) int32_t { + if (PyInt_Check($input)) { + $1 = (int32_t) PyInt_AsLong($input); + } else if (PyLong_Check($input)) { + $1 = (int32_t) PyLong_AsLong($input); + } else { + SWIG_exception_fail(SWIG_ValueError, "unknown integer type"); + } +} + +/* signed32 Convert from C --> Python */ +%typemap(out) int32_t { + $result = PyInt_FromLong((long)$1); +} + + +/* unsigned64 Convert from Python --> C */ +%typemap(in) uint64_t { +%#ifdef HAVE_LONG_LONG + if (PyLong_Check($input)) { + $1 = (uint64_t)PyLong_AsUnsignedLongLong($input); + } else if (PyInt_Check($input)) { + $1 = (uint64_t)PyInt_AsUnsignedLongLongMask($input); + } else +%#endif + { + SWIG_exception_fail(SWIG_ValueError, "unsupported integer size - uint64_t input too large"); + } +} + +/* unsigned64 Convert from C --> Python */ +%typemap(out) uint64_t { +%#ifdef HAVE_LONG_LONG + $result = PyLong_FromUnsignedLongLong((unsigned PY_LONG_LONG)$1); +%#else + SWIG_exception_fail(SWIG_ValueError, "unsupported integer size - uint64_t output too large"); +%#endif +} + +/* signed64 Convert from Python --> C */ +%typemap(in) int64_t { +%#ifdef HAVE_LONG_LONG + if (PyLong_Check($input)) { + $1 = (int64_t)PyLong_AsLongLong($input); + } else if (PyInt_Check($input)) { + $1 = (int64_t)PyInt_AsLong($input); + } else +%#endif + { + SWIG_exception_fail(SWIG_ValueError, "unsupported integer size - int64_t input too large"); + } +} + +/* signed64 Convert from C --> Python */ +%typemap(out) int64_t { +%#ifdef HAVE_LONG_LONG + $result = PyLong_FromLongLong((PY_LONG_LONG)$1); +%#else + SWIG_exception_fail(SWIG_ValueError, "unsupported integer size - int64_t output too large"); +%#endif +} + + +/* Convert from Python --> C */ +%typemap(in) void * { + $1 = (void *)$input; +} + +/* Convert from C --> Python */ +%typemap(out) void * { + $result = (PyObject *) $1; + Py_INCREF($result); +} + +%typemap (typecheck, precedence=SWIG_TYPECHECK_UINT64) uint64_t { + $1 = PyLong_Check($input) ? 1 : 0; +} + +%typemap (typecheck, precedence=SWIG_TYPECHECK_UINT32) uint32_t { + $1 = PyInt_Check($input) ? 1 : 0; +} + + + +%include "../qmfengine.i" + diff --git a/qpid/cpp/bindings/qmf/python/qmf.py b/qpid/cpp/bindings/qmf/python/qmf.py new file mode 100644 index 0000000000..06d3070841 --- /dev/null +++ b/qpid/cpp/bindings/qmf/python/qmf.py @@ -0,0 +1,1680 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import sys +import socket +import os +import logging +from threading import Thread +from threading import RLock +from threading import Condition +import qmfengine +from qmfengine import (ACCESS_READ_CREATE, ACCESS_READ_ONLY, ACCESS_READ_WRITE) +from qmfengine import (CLASS_EVENT, CLASS_OBJECT) +from qmfengine import (DIR_IN, DIR_IN_OUT, DIR_OUT) +from qmfengine import (TYPE_ABSTIME, TYPE_ARRAY, TYPE_BOOL, TYPE_DELTATIME, + TYPE_DOUBLE, TYPE_FLOAT, TYPE_INT16, TYPE_INT32, TYPE_INT64, + TYPE_INT8, TYPE_LIST, TYPE_LSTR, TYPE_MAP, TYPE_OBJECT, + TYPE_REF, TYPE_SSTR, TYPE_UINT16, TYPE_UINT32, TYPE_UINT64, + TYPE_UINT8, TYPE_UUID) +from qmfengine import (O_EQ, O_NE, O_LT, O_LE, O_GT, O_GE, O_RE_MATCH, O_RE_NOMATCH, + E_NOT, E_AND, E_OR, E_XOR) +from qmfengine import (SEV_EMERG, SEV_ALERT, SEV_CRIT, SEV_ERROR, SEV_WARN, SEV_NOTICE, + SEV_INFORM, SEV_DEBUG) + + +def qmf_to_native(val): + typecode = val.getType() + if typecode == TYPE_UINT8: return val.asUint() + elif typecode == TYPE_UINT16: return val.asUint() + elif typecode == TYPE_UINT32: return val.asUint() + elif typecode == TYPE_UINT64: return val.asUint64() + elif typecode == TYPE_SSTR: return val.asString() + elif typecode == TYPE_LSTR: return val.asString() + elif typecode == TYPE_ABSTIME: return val.asInt64() + elif typecode == TYPE_DELTATIME: return val.asUint64() + elif typecode == TYPE_REF: return ObjectId(val.asObjectId()) + elif typecode == TYPE_BOOL: return val.asBool() + elif typecode == TYPE_FLOAT: return val.asFloat() + elif typecode == TYPE_DOUBLE: return val.asDouble() + elif typecode == TYPE_UUID: return val.asUuid() + elif typecode == TYPE_INT8: return val.asInt() + elif typecode == TYPE_INT16: return val.asInt() + elif typecode == TYPE_INT32: return val.asInt() + elif typecode == TYPE_INT64: return val.asInt64() + elif typecode == TYPE_MAP: return value_to_dict(val) + elif typecode == TYPE_LIST: return value_to_list(val) + else: + # when TYPE_OBJECT + logging.error( "Unsupported type for get_attr? '%s'" % str(val.getType()) ) + return None + + +def native_to_qmf(target, value): + val = None + typecode = None + if target.__class__ == qmfengine.Value: + val = target + typecode = val.getType() + else: + typecode = target + val = qmfengine.Value(typecode) + + if typecode == TYPE_UINT8: val.setUint(value) + elif typecode == TYPE_UINT16: val.setUint(value) + elif typecode == TYPE_UINT32: val.setUint(value) + elif typecode == TYPE_UINT64: val.setUint64(value) + elif typecode == TYPE_SSTR: + if value: val.setString(value) + else: val.setString('') + elif typecode == TYPE_LSTR: + if value: val.setString(value) + else: val.setString('') + elif typecode == TYPE_ABSTIME: val.setInt64(value) + elif typecode == TYPE_DELTATIME: val.setUint64(value) + elif typecode == TYPE_REF: val.setObjectId(value.impl) + elif typecode == TYPE_BOOL: val.setBool(value) + elif typecode == TYPE_FLOAT: val.setFloat(value) + elif typecode == TYPE_DOUBLE: val.setDouble(value) + elif typecode == TYPE_UUID: val.setUuid(value) + elif typecode == TYPE_INT8: val.setInt(value) + elif typecode == TYPE_INT16: val.setInt(value) + elif typecode == TYPE_INT32: val.setInt(value) + elif typecode == TYPE_INT64: val.setInt64(value) + elif typecode == TYPE_MAP: dict_to_value(val, value) + elif typecode == TYPE_LIST: list_to_value(val, value) + else: + # when TYPE_OBJECT + logging.error("Unsupported type for get_attr? '%s'" % str(val.getType())) + return None + return val + + +def pick_qmf_type(value): + if value.__class__ == int: + if value >= 0: + if value < 0x100000000: return TYPE_UINT32 + return TYPE_UINT64 + else: + if value > -0xffffffff: return TYPE_INT32 + return TYPE_INT64 + + if value.__class__ == long: + if value >= 0: return TYPE_UINT64 + return TYPE_INT64 + + if value.__class__ == str: + if len(value) < 256: return TYPE_SSTR + return TYPE_LSTR + + if value.__class__ == float: return TYPE_DOUBLE + if value.__class__ == bool: return TYPE_BOOL + if value == None: return TYPE_BOOL + if value.__class__ == dict: return TYPE_MAP + if value.__class__ == list: return TYPE_LIST + + raise "QMF type not known for native type %s" % value.__class__ + + +def value_to_dict(val): + if not val.isMap(): raise "value_to_dict must be given a map value" + mymap = {} + for i in range(val.keyCount()): + key = val.key(i) + mymap[key] = qmf_to_native(val.byKey(key)) + return mymap + + +def dict_to_value(val, mymap): + for key, value in mymap.items(): + if key.__class__ != str: raise "QMF map key must be a string" + typecode = pick_qmf_type(value) + val.insert(key, native_to_qmf(typecode, value)) + + +def value_to_list(val): + mylist = [] + if val.isList(): + for i in range(val.listItemCount()): + mylist.append(qmf_to_native(val.listItem(i))) + return mylist + #if val.isArray(): + # for i in range(val.arrayItemCount()): + # mylist.append(qmf_to_native(val.arrayItem(i))) + # return mylist + + raise "value_to_list must be given a list value" + + +def list_to_value(val, mylist): + for item in mylist: + typecode = pick_qmf_type(item) + val.appendToList(native_to_qmf(typecode, item)) + + + ##============================================================================== + ## CONNECTION + ##============================================================================== + +class ConnectionSettings(object): + #attr_reader :impl + def __init__(self, url=None): + if url: + self.impl = qmfengine.ConnectionSettings(url) + else: + self.impl = qmfengine.ConnectionSettings() + + + def set_attr(self, key, val): + if type(val) == str: + _v = qmfengine.Value(TYPE_LSTR) + _v.setString(val) + elif type(val) == int: + _v = qmfengine.Value(TYPE_UINT32) + _v.setUint(val) + elif type(val) == bool: + _v = qmfengine.Value(TYPE_BOOL) + _v.setBool(val) + else: + raise Exception("Argument error: value for attribute '%s' has unsupported type: %s" % ( key, type(val))) + + good = self.impl.setAttr(key, _v) + if not good: + raise Exception("Argument error: unsupported attribute '%s'" % key ) + + + def get_attr(self, key): + _v = self.impl.getAttr(key) + if _v.isString(): + return _v.asString() + elif _v.isUint(): + return _v.asUint() + elif _v.isBool(): + return _v.asBool() + else: + raise Exception("Argument error: value for attribute '%s' has unsupported type: %s" % ( key, str(_v.getType()))) + + + def __getattr__(self, name): + return self.get_attr(name) + + + def __setattr__(self, name, value): + if name == "impl": + return super.__setattr__(self, name, value) + return self.set_attr(name, value) + + + +class ConnectionHandler: + def conn_event_connected(self): None + def conn_event_disconnected(self, error): None + def conn_event_visit(self): None + def sess_event_session_closed(self, context, error): None + def sess_event_recv(self, context, message): None + + + +class Connection(Thread): + def __init__(self, settings): + Thread.__init__(self) + self._lock = RLock() + self.impl = qmfengine.ResilientConnection(settings.impl) + self._sockEngine, self._sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) + self.impl.setNotifyFd(self._sockEngine.fileno()) + self._new_conn_handlers = [] + self._conn_handlers_to_delete = [] + self._conn_handlers = [] + self._connected = False + self._operational = True + self.start() + + + def destroy(self, timeout=None): + logging.debug("Destroying Connection...") + self._operational = False + self.kick() + self.join(timeout) + logging.debug("... Conn Destroyed!" ) + if self.isAlive(): + logging.error("Error: Connection thread '%s' is hung..." % self.getName()) + + + def connected(self): + return self._connected + + + def kick(self): + self.impl.notify() + + + def add_conn_handler(self, handler): + self._lock.acquire() + try: + self._new_conn_handlers.append(handler) + finally: + self._lock.release() + self.kick() + + + def del_conn_handler(self, handler): + self._lock.acquire() + try: + self._conn_handlers_to_delete.append(handler) + finally: + self._lock.release() + self.kick() + + + def run(self): + eventImpl = qmfengine.ResilientConnectionEvent() + new_handlers = [] + del_handlers = [] + bt_count = 0 + + while self._operational: + logging.debug("Connection thread waiting for socket data...") + self._sock.recv(1) + + self._lock.acquire() + try: + new_handlers = self._new_conn_handlers + del_handlers = self._conn_handlers_to_delete + self._new_conn_handlers = [] + self._conn_handlers_to_delete = [] + finally: + self._lock.release() + + for nh in new_handlers: + self._conn_handlers.append(nh) + if self._connected: + nh.conn_event_connected() + new_handlers = [] + + for dh in del_handlers: + if dh in self._conn_handlers: + self._conn_handlers.remove(dh) + del_handlers = [] + + valid = self.impl.getEvent(eventImpl) + while valid: + try: + if eventImpl.kind == qmfengine.ResilientConnectionEvent.CONNECTED: + logging.debug("Connection thread: CONNECTED event received.") + self._connected = True + for h in self._conn_handlers: + h.conn_event_connected() + + elif eventImpl.kind == qmfengine.ResilientConnectionEvent.DISCONNECTED: + logging.debug("Connection thread: DISCONNECTED event received.") + self._connected = False + for h in self._conn_handlers: + h.conn_event_disconnected(eventImpl.errorText) + + elif eventImpl.kind == qmfengine.ResilientConnectionEvent.SESSION_CLOSED: + logging.debug("Connection thread: SESSION_CLOSED event received.") + eventImpl.sessionContext.handler.sess_event_session_closed(eventImpl.sessionContext, eventImpl.errorText) + + elif eventImpl.kind == qmfengine.ResilientConnectionEvent.RECV: + logging.debug("Connection thread: RECV event received.") + eventImpl.sessionContext.handler.sess_event_recv(eventImpl.sessionContext, eventImpl.message) + else: + logging.debug("Connection thread received unknown event: '%s'" % str(eventImpl.kind)) + + except: + import traceback + logging.error( "Exception occurred during Connection event processing:" ) + logging.error( str(sys.exc_info()) ) + if bt_count < 2: + traceback.print_exc() + traceback.print_stack() + bt_count += 1 + + self.impl.popEvent() + valid = self.impl.getEvent(eventImpl) + + for h in self._conn_handlers: + h.conn_event_visit() + + logging.debug("Shutting down Connection thread") + + + +class Session: + def __init__(self, conn, label, handler): + self._conn = conn + self._label = label + self.handler = handler + self.handle = qmfengine.SessionHandle() + result = self._conn.impl.createSession(label, self, self.handle) + + + def destroy(self): + self._conn.impl.destroySession(self.handle) + + + + ##============================================================================== + ## OBJECTS and EVENTS + ##============================================================================== + +class QmfEvent(object): + # attr_reader :impl, :event_class + def __init__(self, cls, kwargs={}): + self._allow_sets = True + if kwargs.has_key("broker"): + self._broker = kwargs["broker"] + else: + self._broker = None + if cls: + self.event_class = cls + self.impl = qmfengine.Event(self.event_class.impl) + elif kwargs.has_key("impl"): + self.impl = qmfengine.Event(kwargs["impl"]) + self.event_class = SchemaEventClass(None, None, 0, + {"impl":self.impl.getClass()}) + else: + raise Exception("Argument error: required parameter ('impl') not supplied") + + + def arguments(self): + list = [] + for arg in self.event_class.arguments: + list.append([arg, self.get_attr(arg.name())]) + return list + + + def get_attr(self, name): + val = self._value(name) + return qmf_to_native(val) + + + def set_attr(self, name, v): + val = self._value(name) + native_to_qmf(val, v) + + + def __getitem__(self, name): + return self.get_attr(name) + + + def __setitem__(self, name, value): + self.set_attr(name, value) + + + def __setattr__(self, name, value): + # + # Ignore the internal attributes, set them normally... + # + if (name[0] == '_' or + name == 'impl' or + name == 'event_class'): + return super.__setattr__(self, name, value) + + if not self._allow_sets: + raise Exception("'Set' operations not permitted on this object") + + # + # If the name matches an argument name, set the value of the argument. + # + # print "set name=%s" % str(name) + for arg in self.event_class.arguments: + if arg.name() == name: + return self.set_attr(name, value) + + # unrecognized name? should I raise an exception? + super.__setattr__(self, name, value) + + + def __getattr__(self, name, *args): + # + # If the name matches an argument name, return the value of the argument. + # + for arg in self.event_class.arguments: + if arg.name() == name: + return self.get_attr(name) + + # + # This name means nothing to us, pass it up the line to the parent + # class's handler. + # + # print "__getattr__=%s" % str(name) + super.__getattr__(self, name) + + + def _value(self, name): + val = self.impl.getValue(name) + if not val: + raise Exception("Argument error: attribute named '%s' not defined for package %s, class %s" % + (name, + self.event_class.impl.getClassKey().getPackageName(), + self.event_class.impl.getClassKey().getClassName())) + return val + + +class QmfObject(object): + # attr_reader :impl, :object_class + def __init__(self, cls, kwargs={}): + self._cv = Condition() + self._sync_count = 0 + self._sync_result = None + self._allow_sets = False + if kwargs.has_key("broker"): + self._broker = kwargs["broker"] + else: + self._broker = None + if cls: + self.object_class = cls + self.impl = qmfengine.Object(self.object_class.impl) + elif kwargs.has_key("impl"): + self.impl = qmfengine.Object(kwargs["impl"]) + self.object_class = SchemaObjectClass(None, + None, + {"impl":self.impl.getClass()}) + else: + raise Exception("Argument error: required parameter ('impl') not supplied") + + + def destroy(self): + self.impl.destroy() + + + def object_id(self): + return ObjectId(self.impl.getObjectId()) + + + def set_object_id(self, oid): + self.impl.setObjectId(oid.impl) + + + def properties(self): + list = [] + for prop in self.object_class.properties: + list.append([prop, self.get_attr(prop.name())]) + return list + + + def statistics(self): + list = [] + for stat in self.object_class.statistics: + list.append([stat, self.get_attr(stat.name())]) + return list + + + def get_attr(self, name): + val = self._value(name) + return qmf_to_native(val) + + + def set_attr(self, name, v): + val = self._value(name) + native_to_qmf(val, v) + + + def __getitem__(self, name): + return self.get_attr(name) + + + def __setitem__(self, name, value): + self.set_attr(name, value) + + + def inc_attr(self, name, by=1): + self.set_attr(name, self.get_attr(name) + by) + + + def dec_attr(self, name, by=1): + self.set_attr(name, self.get_attr(name) - by) + + + def __setattr__(self, name, value): + # + # Ignore the internal attributes, set them normally... + # + if (name[0] == '_' or + name == 'impl' or + name == 'object_class'): + return super.__setattr__(self, name, value) + + if not self._allow_sets: + raise Exception("'Set' operations not permitted on this object") + # + # If the name matches a property name, set the value of the property. + # + # print "set name=%s" % str(name) + for prop in self.object_class.properties: + if prop.name() == name: + return self.set_attr(name, value) + # + # otherwise, check for a statistic set... + # + for stat in self.object_class.statistics: + if stat.name() == name: + return self.set_attr(name, value) + + # unrecognized name? should I raise an exception? + super.__setattr__(self, name, value) + + + def __getattr__(self, name, *args): + # + # If the name matches a property name, return the value of the property. + # + for prop in self.object_class.properties: + if prop.name() == name: + return self.get_attr(name) + # + # Do the same for statistics + # + for stat in self.object_class.statistics: + if stat.name() == name: + return self.get_attr(name) + # + # If we still haven't found a match for the name, check to see if + # it matches a method name. If so, marshall up the arguments into + # a map, and invoke the method. + # + for method in self.object_class.methods: + if method.name() == name: + argMap = self._marshall(method, args) + return lambda name, argMap : self._invokeMethod(name, argMap) + + # + # This name means nothing to us, pass it up the line to the parent + # class's handler. + # + # print "__getattr__=%s" % str(name) + super.__getattr__(self, name) + + + def _invokeMethod(self, name, argMap): + """ + Private: Helper function that invokes an object's method, and waits for the result. + """ + self._cv.acquire() + try: + timeout = 30 + self._sync_count = 1 + self.impl.invokeMethod(name, argMap, self) + if self._broker: + self._broker.conn.kick() + self._cv.wait(timeout) + if self._sync_count == 1: + raise Exception("Timed out: waiting for response to method call.") + finally: + self._cv.release() + + return self._sync_result + + + def _method_result(self, result): + """ + Called to return the result of a method call on an object + """ + self._cv.acquire(); + try: + self._sync_result = result + self._sync_count -= 1 + self._cv.notify() + finally: + self._cv.release() + + + def _marshall(schema, args): + ''' + Private: Convert a list of arguments (positional) into a Value object of type "map". + Used to create the argument parameter for an object's method invokation. + ''' + # Build a map of the method's arguments + map = qmfengine.Value(TYPE_MAP) + for arg in schema.arguments: + if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT: + map.insert(arg.name, qmfengine.Value(arg.typecode)) + + # install each argument's value into the map + marshalled = Arguments(map) + idx = 0 + for arg in schema.arguments: + if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT: + if args[idx]: + marshalled[arg.name] = args[idx] + idx += 1 + + return marshalled.map + + + def _value(self, name): + val = self.impl.getValue(name) + if not val: + raise Exception("Argument error: attribute named '%s' not defined for package %s, class %s" % + (name, + self.object_class.impl.getClassKey().getPackageName(), + self.object_class.impl.getClassKey().getClassName())) + return val + + + +class AgentObject(QmfObject): + def __init__(self, cls, kwargs={}): + QmfObject.__init__(self, cls, kwargs) + self._allow_sets = True + + + def destroy(self): + self.impl.destroy() + + + def set_object_id(self, oid): + self.impl.setObjectId(oid.impl) + + + +class ConsoleObject(QmfObject): + # attr_reader :current_time, :create_time, :delete_time + def __init__(self, cls, kwargs={}): + QmfObject.__init__(self, cls, kwargs) + + + def update(self): + if not self._broker: + raise Exception("No linkage to broker") + newer = self._broker.console.objects(Query({"object_id":object_id})) + if newer.size != 1: + raise Exception("Expected exactly one update for this object, %d present" % int(newer.size)) + self.merge_update(newer[0]) + + + def merge_update(self, newObject): + self.impl.merge(new_object.impl) + + + def is_deleted(self): + return self.impl.isDeleted() + + + def key(self): pass + + + +class ObjectId: + def __init__(self, impl=None): + if impl: + self.impl = impl + else: + self.impl = qmfengine.ObjectId() + self.agent_key = "%d.%d" % (self.impl.getBrokerBank(), self.impl.getAgentBank()) + + + def object_num_high(self): + return self.impl.getObjectNumHi() + + + def object_num_low(self): + return self.impl.getObjectNumLo() + + + def agent_key(self): + self.agent_key + + def __eq__(self, other): + if not isinstance(other, self.__class__): return False + return self.impl == other.impl + + def __ne__(self, other): + return not self.__eq__(other) + + def __repr__(self): + return self.impl.str() + + + +class Arguments(object): + def __init__(self, map): + self.map = map + self._by_hash = {} + key_count = self.map.keyCount() + a = 0 + while a < key_count: + key = self.map.key(a) + self._by_hash[key] = qmf_to_native(self.map.byKey(key)) + a += 1 + + + def __getitem__(self, key): + return self._by_hash[key] + + + def __setitem__(self, key, value): + self._by_hash[key] = value + self.set(key, value) + + + def __iter__(self): + return self._by_hash.__iter__ + + + def __getattr__(self, name): + if name in self._by_hash: + return self._by_hash[name] + return super.__getattr__(self, name) + + + def __setattr__(self, name, value): + # + # ignore local data members + # + if (name[0] == '_' or + name == 'map'): + return super.__setattr__(self, name, value) + + if name in self._by_hash: + self._by_hash[name] = value + return self.set(name, value) + + return super.__setattr__(self, name, value) + + + def set(self, key, value): + val = self.map.byKey(key) + native_to_qmf(val, value) + + + +class MethodResponse(object): + def __init__(self, impl): + self.impl = qmfengine.MethodResponse(impl) + + + def status(self): + return self.impl.getStatus() + + + def exception(self): + return self.impl.getException() + + + def text(self): + return exception().asString() + + + def args(self): + return Arguments(self.impl.getArgs()) + + + def __getattr__(self, name): + myArgs = self.args() + return myArgs.__getattr__(name) + + + def __setattr__(self, name, value): + if name == 'impl': + return super.__setattr__(self, name, value) + + myArgs = self.args() + return myArgs.__setattr__(name, value) + + + + ##============================================================================== + ## QUERY + ##============================================================================== + + +class Query: + def __init__(self, kwargs={}): + if "impl" in kwargs: + self.impl = kwargs["impl"] + else: + package = '' + if "key" in kwargs: + # construct using SchemaClassKey: + self.impl = qmfengine.Query(kwargs["key"]) + elif "object_id" in kwargs: + self.impl = qmfengine.Query(kwargs["object_id"].impl) + else: + if "package" in kwargs: + package = kwargs["package"] + if "class" in kwargs: + self.impl = qmfengine.Query(kwargs["class"], package) + else: + raise Exception("Argument error: invalid arguments, use 'key', 'object_id' or 'class'[,'package']") + + + def package_name(self): return self.impl.getPackage() + def class_name(self): return self.impl.getClass() + def object_id(self): + _objid = self.impl.getObjectId() + if _objid: + return ObjectId(_objid) + else: + return None + + + ##============================================================================== + ## SCHEMA + ##============================================================================== + + + +class SchemaArgument: + #attr_reader :impl + def __init__(self, name, typecode, kwargs={}): + if "impl" in kwargs: + self.impl = kwargs["impl"] + else: + self.impl = qmfengine.SchemaArgument(name, typecode) + if kwargs.has_key("dir"): self.impl.setDirection(kwargs["dir"]) + if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"]) + if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) + + + def name(self): + return self.impl.getName() + + + def direction(self): + return self.impl.getDirection() + + + def typecode(self): + return self.impl.getType() + + + def __repr__(self): + return self.name() + + + +class SchemaMethod: + # attr_reader :impl, arguments + def __init__(self, name, kwargs={}): + self.arguments = [] + if "impl" in kwargs: + self.impl = kwargs["impl"] + for i in range(self.impl.getArgumentCount()): + self.arguments.append(SchemaArgument(None,None,{"impl":self.impl.getArgument(i)})) + else: + self.impl = qmfengine.SchemaMethod(name) + if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) + + + def add_argument(self, arg): + self.arguments.append(arg) + self.impl.addArgument(arg.impl) + + def name(self): + return self.impl.getName() + + def __repr__(self): + return self.name() + + + +class SchemaProperty: + #attr_reader :impl + def __init__(self, name, typecode, kwargs={}): + if "impl" in kwargs: + self.impl = kwargs["impl"] + else: + self.impl = qmfengine.SchemaProperty(name, typecode) + if kwargs.has_key("access"): self.impl.setAccess(kwargs["access"]) + if kwargs.has_key("index"): self.impl.setIndex(kwargs["index"]) + if kwargs.has_key("optional"): self.impl.setOptional(kwargs["optional"]) + if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"]) + if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) + + + def name(self): + return self.impl.getName() + + def __repr__(self): + return self.name() + + + +class SchemaStatistic: + # attr_reader :impl + def __init__(self, name, typecode, kwargs={}): + if "impl" in kwargs: + self.impl = kwargs["impl"] + else: + self.impl = qmfengine.SchemaStatistic(name, typecode) + if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"]) + if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) + + + def name(self): + return self.impl.getName() + + def __repr__(self): + return self.name() + + + +class SchemaClassKey: + #attr_reader :impl + def __init__(self, i): + self.impl = i + + + def package_name(self): + return self.impl.getPackageName() + + + def class_name(self): + return self.impl.getClassName() + + def __repr__(self): + return self.impl.asString() + + + +class SchemaObjectClass: + # attr_reader :impl, :properties, :statistics, :methods + def __init__(self, package, name, kwargs={}): + self.properties = [] + self.statistics = [] + self.methods = [] + if "impl" in kwargs: + self.impl = kwargs["impl"] + + for i in range(self.impl.getPropertyCount()): + self.properties.append(SchemaProperty(None, None, {"impl":self.impl.getProperty(i)})) + + for i in range(self.impl.getStatisticCount()): + self.statistics.append(SchemaStatistic(None, None, {"impl":self.impl.getStatistic(i)})) + + for i in range(self.impl.getMethodCount()): + self.methods.append(SchemaMethod(None, {"impl":self.impl.getMethod(i)})) + else: + self.impl = qmfengine.SchemaObjectClass(package, name) + + + def add_property(self, prop): + self.properties.append(prop) + self.impl.addProperty(prop.impl) + + + def add_statistic(self, stat): + self.statistics.append(stat) + self.impl.addStatistic(stat.impl) + + + def add_method(self, meth): + self.methods.append(meth) + self.impl.addMethod(meth.impl) + + + def class_key(self): + return SchemaClassKey(self.impl.getClassKey()) + + + def package_name(self): + return self.impl.getClassKey().getPackageName() + + + def class_name(self): + return self.impl.getClassKey().getClassName() + + + +class SchemaEventClass: + # attr_reader :impl :arguments + def __init__(self, package, name, sev, kwargs={}): + self.arguments = [] + if "impl" in kwargs: + self.impl = kwargs["impl"] + for i in range(self.impl.getArgumentCount()): + self.arguments.append(SchemaArgument(nil, nil, {"impl":self.impl.getArgument(i)})) + else: + self.impl = qmfengine.SchemaEventClass(package, name, sev) + if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) + + + def add_argument(self, arg): + self.arguments.append(arg) + self.impl.addArgument(arg.impl) + + + def name(self): + return self.impl.getClassKey().getClassName() + + def class_key(self): + return SchemaClassKey(self.impl.getClassKey()) + + + def package_name(self): + return self.impl.getClassKey().getPackageName() + + + def class_name(self): + return self.impl.getClassKey().getClassName() + + + ##============================================================================== + ## CONSOLE + ##============================================================================== + + + +class ConsoleHandler: + def agent_added(self, agent): pass + def agent_deleted(self, agent): pass + def new_package(self, package): pass + def new_class(self, class_key): pass + def object_update(self, obj, hasProps, hasStats): pass + def event_received(self, event): pass + def agent_heartbeat(self, agent, timestamp): pass + def method_response(self, resp): pass + def broker_info(self, broker): pass + + + +class Console(Thread): + # attr_reader :impl + def __init__(self, handler=None, kwargs={}): + Thread.__init__(self) + self._handler = handler + self.impl = qmfengine.Console() + self._event = qmfengine.ConsoleEvent() + self._broker_list = [] + self._cv = Condition() + self._sync_count = 0 + self._sync_result = None + self._select = {} + self._cb_cond = Condition() + self._operational = True + self.start() + + + def destroy(self, timeout=None): + logging.debug("Destroying Console...") + self._operational = False + self.start_console_events() # wake thread up + self.join(timeout) + logging.debug("... Console Destroyed!") + if self.isAlive(): + logging.error( "Console thread '%s' is hung..." % self.getName() ) + + + def add_connection(self, conn): + broker = Broker(self, conn) + self._cv.acquire() + try: + self._broker_list.append(broker) + finally: + self._cv.release() + return broker + + + def del_connection(self, broker): + logging.debug("shutting down broker...") + broker.shutdown() + logging.debug("...broker down.") + self._cv.acquire() + try: + self._broker_list.remove(broker) + finally: + self._cv.release() + logging.debug("del_connection() finished") + + + def packages(self): + plist = [] + for i in range(self.impl.packageCount()): + plist.append(self.impl.getPackageName(i)) + return plist + + + def classes(self, package, kind=CLASS_OBJECT): + clist = [] + for i in range(self.impl.classCount(package)): + key = self.impl.getClass(package, i) + class_kind = self.impl.getClassKind(key) + if class_kind == kind: + if kind == CLASS_OBJECT: + clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)})) + elif kind == CLASS_EVENT: + clist.append(SchemaEventClass(None, None, 0, {"impl":self.impl.getEventClass(key)})) + return clist + + + def bind_package(self, package): + return self.impl.bindPackage(package) + + + def bind_class(self, kwargs = {}): + if "key" in kwargs: + self.impl.bindClass(kwargs["key"]) + elif "package" in kwargs: + package = kwargs["package"] + if "class" in kwargs: + self.impl.bindClass(package, kwargs["class"]) + else: + self.impl.bindClass(package, "*") + else: + raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']") + + + def bind_event(self, kwargs = {}): + if "key" in kwargs: + self.impl.bindEvent(kwargs["key"]) + elif "package" in kwargs: + package = kwargs["package"] + if "event" in kwargs: + self.impl.bindEvent(package, kwargs["event"]) + else: + self.impl.bindEvent(package, "*") + else: + raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'event']") + + + def agents(self, broker=None): + blist = [] + if broker: + blist.append(broker) + else: + self._cv.acquire() + try: + # copy while holding lock + blist = self._broker_list[:] + finally: + self._cv.release() + + agents = [] + for b in blist: + for idx in range(b.impl.agentCount()): + agents.append(AgentProxy(b.impl.getAgent(idx), b)) + + return agents + + + def objects(self, query, kwargs = {}): + timeout = 30 + agent = None + temp_args = kwargs.copy() + if type(query) == type({}): + temp_args.update(query) + + if "_timeout" in temp_args: + timeout = temp_args["_timeout"] + temp_args.pop("_timeout") + + if "_agent" in temp_args: + agent = temp_args["_agent"] + temp_args.pop("_agent") + + if type(query) == type({}): + query = Query(temp_args) + + self._select = {} + for k in temp_args.iterkeys(): + if type(k) == str: + self._select[k] = temp_args[k] + + self._cv.acquire() + try: + self._sync_count = 1 + self._sync_result = [] + broker = self._broker_list[0] + broker.send_query(query.impl, None, agent) + self._cv.wait(timeout) + if self._sync_count == 1: + raise Exception("Timed out: waiting for query response") + finally: + self._cv.release() + + return self._sync_result + + + def object(self, query, kwargs = {}): + ''' + Return one and only one object or None. + ''' + objs = objects(query, kwargs) + if len(objs) == 1: + return objs[0] + else: + return None + + + def first_object(self, query, kwargs = {}): + ''' + Return the first of potentially many objects. + ''' + objs = objects(query, kwargs) + if objs: + return objs[0] + else: + return None + + + # Check the object against select to check for a match + def _select_match(self, object): + schema_props = object.properties() + for key in self._select.iterkeys(): + for prop in schema_props: + if key == p[0].name() and self._select[key] != p[1]: + return False + return True + + + def _get_result(self, list, context): + ''' + Called by Broker proxy to return the result of a query. + ''' + self._cv.acquire() + try: + for item in list: + if self._select_match(item): + self._sync_result.append(item) + self._sync_count -= 1 + self._cv.notify() + finally: + self._cv.release() + + + def start_sync(self, query): pass + + + def touch_sync(self, sync): pass + + + def end_sync(self, sync): pass + + + def run(self): + while self._operational: + self._cb_cond.acquire() + try: + self._cb_cond.wait(1) + while self._do_console_events(): + pass + finally: + self._cb_cond.release() + logging.debug("Shutting down Console thread") + + + def start_console_events(self): + self._cb_cond.acquire() + try: + self._cb_cond.notify() + finally: + self._cb_cond.release() + + + def _do_console_events(self): + ''' + Called by the Console thread to poll for events. Passes the events + onto the ConsoleHandler associated with this Console. Is called + periodically, but can also be kicked by Console.start_console_events(). + ''' + count = 0 + valid = self.impl.getEvent(self._event) + while valid: + count += 1 + try: + if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: + logging.debug("Console Event AGENT_ADDED received") + if self._handler: + self._handler.agent_added(AgentProxy(self._event.agent, None)) + elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: + logging.debug("Console Event AGENT_DELETED received") + if self._handler: + self._handler.agent_deleted(AgentProxy(self._event.agent, None)) + elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: + logging.debug("Console Event NEW_PACKAGE received") + if self._handler: + self._handler.new_package(self._event.name) + elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: + logging.debug("Console Event NEW_CLASS received") + if self._handler: + self._handler.new_class(SchemaClassKey(self._event.classKey)) + elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: + logging.debug("Console Event OBJECT_UPDATE received") + if self._handler: + self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), + self._event.hasProps, self._event.hasStats) + elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: + logging.debug("Console Event EVENT_RECEIVED received") + elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: + logging.debug("Console Event AGENT_HEARTBEAT received") + if self._handler: + self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) + elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: + logging.debug("Console Event METHOD_RESPONSE received") + else: + logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) + except e: + print "Exception caught in callback thread:", e + self.impl.popEvent() + valid = self.impl.getEvent(self._event) + return count + + + +class AgentProxy: + # attr_reader :broker + def __init__(self, impl, broker): + self.impl = impl + self.broker = broker + self.key = "%d.%d" % (self.impl.getBrokerBank(), self.impl.getAgentBank()) + + + def label(self): + return self.impl.getLabel() + + + def key(self): + return self.key + + +class Broker(ConnectionHandler): + # attr_reader :impl :conn, :console, :broker_bank + def __init__(self, console, conn): + self.broker_bank = 1 + self.console = console + self.conn = conn + self._session = None + self._cv = Condition() + self._stable = None + self._event = qmfengine.BrokerEvent() + self._xmtMessage = qmfengine.Message() + self.impl = qmfengine.BrokerProxy(self.console.impl) + self.console.impl.addConnection(self.impl, self) + self.conn.add_conn_handler(self) + self._operational = True + + + def shutdown(self): + logging.debug("broker.shutdown() called.") + self.console.impl.delConnection(self.impl) + self.conn.del_conn_handler(self) + if self._session: + self.impl.sessionClosed() + logging.debug("broker.shutdown() sessionClosed done.") + self._session.destroy() + logging.debug("broker.shutdown() session destroy done.") + self._session = None + self._operational = False + logging.debug("broker.shutdown() done.") + + + def wait_for_stable(self, timeout = None): + self._cv.acquire() + try: + if self._stable: + return + if timeout: + self._cv.wait(timeout) + if not self._stable: + raise Exception("Timed out: waiting for broker connection to become stable") + else: + while not self._stable: + self._cv.wait() + finally: + self._cv.release() + + + def send_query(self, query, ctx, agent): + agent_impl = None + if agent: + agent_impl = agent.impl + self.impl.sendQuery(query, ctx, agent_impl) + self.conn.kick() + + + def _do_broker_events(self): + count = 0 + valid = self.impl.getEvent(self._event) + while valid: + count += 1 + if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO: + logging.debug("Broker Event BROKER_INFO received"); + elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE: + logging.debug("Broker Event DECLARE_QUEUE received"); + self.conn.impl.declareQueue(self._session.handle, self._event.name) + elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE: + logging.debug("Broker Event DELETE_QUEUE received"); + self.conn.impl.deleteQueue(self._session.handle, self._event.name) + elif self._event.kind == qmfengine.BrokerEvent.BIND: + logging.debug("Broker Event BIND received"); + self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) + elif self._event.kind == qmfengine.BrokerEvent.UNBIND: + logging.debug("Broker Event UNBIND received"); + self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) + elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE: + logging.debug("Broker Event SETUP_COMPLETE received"); + self.impl.startProtocol() + elif self._event.kind == qmfengine.BrokerEvent.STABLE: + logging.debug("Broker Event STABLE received"); + self._cv.acquire() + try: + self._stable = True + self._cv.notify() + finally: + self._cv.release() + elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE: + result = [] + for idx in range(self._event.queryResponse.getObjectCount()): + result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self})) + self.console._get_result(result, self._event.context) + elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE: + obj = self._event.context + obj._method_result(MethodResponse(self._event.methodResponse())) + + self.impl.popEvent() + valid = self.impl.getEvent(self._event) + + return count + + + def _do_broker_messages(self): + count = 0 + valid = self.impl.getXmtMessage(self._xmtMessage) + while valid: + count += 1 + logging.debug("Broker: sending msg on connection") + self.conn.impl.sendMessage(self._session.handle, self._xmtMessage) + self.impl.popXmt() + valid = self.impl.getXmtMessage(self._xmtMessage) + + return count + + + def _do_events(self): + while True: + self.console.start_console_events() + bcnt = self._do_broker_events() + mcnt = self._do_broker_messages() + if bcnt == 0 and mcnt == 0: + break; + + + def conn_event_connected(self): + logging.debug("Broker: Connection event CONNECTED") + self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self) + self.impl.sessionOpened(self._session.handle) + self._do_events() + + + def conn_event_disconnected(self, error): + logging.debug("Broker: Connection event DISCONNECTED") + pass + + + def conn_event_visit(self): + self._do_events() + + + def sess_event_session_closed(self, context, error): + logging.debug("Broker: Session event CLOSED") + self.impl.sessionClosed() + + + def sess_event_recv(self, context, message): + logging.debug("Broker: Session event MSG_RECV") + if not self._operational: + logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context)) + self.impl.handleRcvMessage(message) + self._do_events() + + + + ##============================================================================== + ## AGENT + ##============================================================================== + + + +class AgentHandler: + def get_query(self, context, query, userId): None + def method_call(self, context, name, object_id, args, userId): None + + + +class Agent(ConnectionHandler): + def __init__(self, handler, label=""): + if label == "": + self._agentLabel = "rb-%s.%d" % (socket.gethostname(), os.getpid()) + else: + self._agentLabel = label + self._conn = None + self._handler = handler + self.impl = qmfengine.Agent(self._agentLabel) + self._event = qmfengine.AgentEvent() + self._xmtMessage = qmfengine.Message() + + + def set_connection(self, conn): + self._conn = conn + self._conn.add_conn_handler(self) + + + def register_class(self, cls): + self.impl.registerClass(cls.impl) + + + def alloc_object_id(self, low = 0, high = 0): + return ObjectId(self.impl.allocObjectId(low, high)) + + + def raise_event(self, event): + self.impl.raiseEvent(event.impl) + + def query_response(self, context, obj): + self.impl.queryResponse(context, obj.impl) + + + def query_complete(self, context): + self.impl.queryComplete(context) + + + def method_response(self, context, status, text, arguments): + self.impl.methodResponse(context, status, text, arguments.map) + + + def do_agent_events(self): + count = 0 + valid = self.impl.getEvent(self._event) + while valid: + count += 1 + if self._event.kind == qmfengine.AgentEvent.GET_QUERY: + self._handler.get_query(self._event.sequence, + Query({"impl":self._event.query}), + self._event.authUserId) + + elif self._event.kind == qmfengine.AgentEvent.START_SYNC: + pass + elif self._event.kind == qmfengine.AgentEvent.END_SYNC: + pass + elif self._event.kind == qmfengine.AgentEvent.METHOD_CALL: + args = Arguments(self._event.arguments) + self._handler.method_call(self._event.sequence, self._event.name, + ObjectId(self._event.objectId), + args, self._event.authUserId) + + elif self._event.kind == qmfengine.AgentEvent.DECLARE_QUEUE: + self._conn.impl.declareQueue(self._session.handle, self._event.name) + + elif self._event.kind == qmfengine.AgentEvent.DELETE_QUEUE: + self._conn.impl.deleteQueue(self._session.handle, self._event.name) + + elif self._event.kind == qmfengine.AgentEvent.BIND: + self._conn.impl.bind(self._session.handle, self._event.exchange, + self._event.name, self._event.bindingKey) + + elif self._event.kind == qmfengine.AgentEvent.UNBIND: + self._conn.impl.unbind(self._session.handle, self._event.exchange, + self._event.name, self._event.bindingKey) + + elif self._event.kind == qmfengine.AgentEvent.SETUP_COMPLETE: + self.impl.startProtocol() + + self.impl.popEvent() + valid = self.impl.getEvent(self._event) + return count + + + def do_agent_messages(self): + count = 0 + valid = self.impl.getXmtMessage(self._xmtMessage) + while valid: + count += 1 + self._conn.impl.sendMessage(self._session.handle, self._xmtMessage) + self.impl.popXmt() + valid = self.impl.getXmtMessage(self._xmtMessage) + return count + + + def do_events(self): + while True: + ecnt = self.do_agent_events() + mcnt = self.do_agent_messages() + if ecnt == 0 and mcnt == 0: + break + + + def conn_event_connected(self): + logging.debug("Agent Connection Established...") + self._session = Session(self._conn, + "qmfa-%s.%d" % (socket.gethostname(), os.getpid()), + self) + self.impl.newSession() + self.do_events() + + + def conn_event_disconnected(self, error): + logging.debug("Agent Connection Lost") + pass + + + def conn_event_visit(self): + self.do_events() + + + def sess_event_session_closed(self, context, error): + logging.debug("Agent Session Lost") + pass + + + def sess_event_recv(self, context, message): + self.impl.handleRcvMessage(message) + self.do_events() + + |