#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import cqpid from qmf2 import * class ExampleAgent(AgentHandler): """ This example agent is implemented as a single class that inherits AgentHandler. It does not use a separate thread since once set up, it is driven strictly by incoming method calls. """ def __init__(self, url): ## ## Create and open a messaging connection to a broker. ## self.connection = cqpid.Connection(url, "{reconnect:True}") self.session = None self.connection.open() ## ## Create, configure, and open a QMFv2 agent session using the connection. ## self.session = AgentSession(self.connection, "{interval:30}") self.session.setVendor('profitron.com') self.session.setProduct('blastinator') self.session.setAttribute('attr1', 1000) self.session.open() ## ## Initialize the parent class. ## AgentHandler.__init__(self, self.session) def shutdown(self): """ Clean up the session and connection. """ if self.session: self.session.close() self.connection.close() def method(self, handle, methodName, args, subtypes, addr, userId): """ Handle incoming method calls. """ if addr == self.controlAddr: self.control.methodCount += 1 try: if methodName == "stop": self.session.methodSuccess(handle) self.cancel() elif methodName == "echo": handle.addReturnArgument("sequence", args["sequence"]) handle.addReturnArgument("map", args["map"]) self.session.methodSuccess(handle) elif methodName == "event": ev = Data(self.sch_event) ev.text = args['text'] self.session.raiseEvent(ev, args['severity']) self.session.methodSuccess(handle) elif methodName == "fail": if args['useString']: self.session.raiseException(handle, args['stringVal']) else: ex = Data(self.sch_exception) ex.whatHappened = "It Failed" ex.howBad = 75 ex.details = args['details'] self.session.raiseException(handle, ex) elif methodName == "create_child": name = args['name'] child = Data(self.sch_child) child.name = name addr = self.session.addData(child, name) handle.addReturnArgument("childAddr", addr.asMap()) self.session.methodSuccess(handle) except BaseException, e: self.session.raiseException(handle, "%r" % e) def setupSchema(self): """ Create and register the schema for this agent. """ package = "com.profitron.bntor" ## ## Declare a schema for a structured exception that can be used in failed ## method invocations. ## self.sch_exception = Schema(SCHEMA_TYPE_DATA, package, "exception") self.sch_exception.addProperty(SchemaProperty("whatHappened", SCHEMA_DATA_STRING)) self.sch_exception.addProperty(SchemaProperty("howBad", SCHEMA_DATA_INT)) self.sch_exception.addProperty(SchemaProperty("details", SCHEMA_DATA_MAP)) ## ## Declare a control object to test methods against. ## self.sch_control = Schema(SCHEMA_TYPE_DATA, package, "control") self.sch_control.addProperty(SchemaProperty("state", SCHEMA_DATA_STRING)) self.sch_control.addProperty(SchemaProperty("methodCount", SCHEMA_DATA_INT)) stopMethod = SchemaMethod("stop", desc="Stop Agent") stopMethod.addArgument(SchemaProperty("message", SCHEMA_DATA_STRING, direction=DIR_IN)) self.sch_control.addMethod(stopMethod) echoMethod = SchemaMethod("echo", desc="Echo Arguments") echoMethod.addArgument(SchemaProperty("sequence", SCHEMA_DATA_INT, direction=DIR_IN_OUT)) echoMethod.addArgument(SchemaProperty("map", SCHEMA_DATA_MAP, direction=DIR_IN_OUT)) self.sch_control.addMethod(echoMethod) eventMethod = SchemaMethod("event", desc="Raise an Event") eventMethod.addArgument(SchemaProperty("text", SCHEMA_DATA_STRING, direction=DIR_IN)) eventMethod.addArgument(SchemaProperty("severity", SCHEMA_DATA_INT, direction=DIR_IN)) self.sch_control.addMethod(eventMethod) failMethod = SchemaMethod("fail", desc="Expected to Fail") failMethod.addArgument(SchemaProperty("useString", SCHEMA_DATA_BOOL, direction=DIR_IN)) failMethod.addArgument(SchemaProperty("stringVal", SCHEMA_DATA_STRING, direction=DIR_IN)) failMethod.addArgument(SchemaProperty("details", SCHEMA_DATA_MAP, direction=DIR_IN)) self.sch_control.addMethod(failMethod) createMethod = SchemaMethod("create_child", desc="Create Child Object") createMethod.addArgument(SchemaProperty("name", SCHEMA_DATA_STRING, direction=DIR_IN)) createMethod.addArgument(SchemaProperty("childAddr", SCHEMA_DATA_MAP, direction=DIR_OUT)) self.sch_control.addMethod(createMethod) ## ## Declare a child object ## self.sch_child = Schema(SCHEMA_TYPE_DATA, package, "child") self.sch_child.addProperty(SchemaProperty("name", SCHEMA_DATA_STRING)) ## ## Declare the event class ## self.sch_event = Schema(SCHEMA_TYPE_EVENT, package, "event") self.sch_event.addProperty(SchemaProperty("text", SCHEMA_DATA_STRING)) ## ## Register our schemata with the agent session. ## self.session.registerSchema(self.sch_exception) self.session.registerSchema(self.sch_control) self.session.registerSchema(self.sch_child) self.session.registerSchema(self.sch_event) def populateData(self): """ Create a control object and give it to the agent session to manage. """ self.control = Data(self.sch_control) self.control.state = "OPERATIONAL" self.control.methodCount = 0 self.controlAddr = self.session.addData(self.control, "singleton") try: agent = ExampleAgent("localhost") agent.setupSchema() agent.populateData() agent.run() # Use agent.start() to launch the agent in a separate thread agent.shutdown() except Exception, e: print "Exception Caught:", e