diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-01-27 16:36:37 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-01-27 16:36:37 +0000 |
commit | 1e24432f3f0333890334648d808410fb3e9535cd (patch) | |
tree | cf6ee8c6cc686acc692f2efced6f36d2d0518997 /python/qmf2/console.py | |
parent | a7ff22a37baac189c1f433fe7785bd3a637953b1 (diff) | |
download | qpid-python-1e24432f3f0333890334648d808410fb3e9535cd.tar.gz |
QPID-2261: make code compliant with Python PEP-8 style
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@903717 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qmf2/console.py')
-rw-r--r-- | python/qmf2/console.py | 92 |
1 files changed, 46 insertions, 46 deletions
diff --git a/python/qmf2/console.py b/python/qmf2/console.py index 6e469a24ac..94d0fd7583 100644 --- a/python/qmf2/console.py +++ b/python/qmf2/console.py @@ -30,7 +30,7 @@ from threading import Condition from qpid.messaging import Connection, Message, Empty, SendError -from common import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier, +from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier, QmfQueryPredicate, MsgKey, QmfData, QmfAddress, SchemaClass, SchemaClassId, SchemaEventClass, SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent) @@ -171,7 +171,7 @@ class SequencedWaiter(object): self.lock.release() - def isValid(self, seq): + def is_valid(self, seq): """ True if seq is in use, else False (seq is unknown) """ @@ -254,7 +254,7 @@ class QmfConsoleData(QmfData): oid = self.get_object_id() query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, self.get_object_id()) - obj_list = self._agent._console.doQuery(self._agent, query, + obj_list = self._agent._console.do_query(self._agent, query, timeout=_timeout) if obj_list is None or len(obj_list) != 1: return None @@ -314,7 +314,7 @@ class QmfConsoleData(QmfData): logging.debug("Sending method req to Agent (%s)" % time.time()) try: - self._agent._sendMethodReq(_map, handle) + self._agent._send_method_req(_map, handle) except SendError, e: logging.error(str(e)) self._agent._console._req_correlation.release(handle) @@ -392,10 +392,10 @@ class Agent(object): def get_name(self): return self._name - def isActive(self): + def is_active(self): return self._announce_timestamp != None - def _sendMsg(self, msg, correlation_id=None): + def _send_msg(self, msg, correlation_id=None): """ Low-level routine to asynchronously send a message to this agent. """ @@ -486,7 +486,7 @@ class Agent(object): logging.debug("Sending method req to Agent (%s)" % time.time()) try: - self._sendMethodReq(_map, handle) + self._send_method_req(_map, handle) except SendError, e: logging.error(str(e)) self._console._req_correlation.release(handle) @@ -526,22 +526,22 @@ class Agent(object): def __str__(self): return self.__repr__() - def _sendQuery(self, query, correlation_id=None): + def _send_query(self, query, correlation_id=None): """ """ msg = Message(properties={"method":"request", - "qmf.subject":makeSubject(OpCode.get_query)}, + "qmf.subject":make_subject(OpCode.get_query)}, content={MsgKey.query: query.map_encode()}) - self._sendMsg( msg, correlation_id ) + self._send_msg( msg, correlation_id ) - def _sendMethodReq(self, mr_map, correlation_id=None): + def _send_method_req(self, mr_map, correlation_id=None): """ """ msg = Message(properties={"method":"request", - "qmf.subject":makeSubject(OpCode.method_req)}, + "qmf.subject":make_subject(OpCode.method_req)}, content=mr_map) - self._sendMsg( msg, correlation_id ) + self._send_msg( msg, correlation_id ) ##============================================================================== @@ -645,12 +645,12 @@ class Console(Thread): """ logging.debug("Destroying Console...") if self._conn: - self.removeConnection(self._conn, timeout) + self.remove_connection(self._conn, timeout) logging.debug("Console Destroyed") - def addConnection(self, conn): + def add_connection(self, conn): """ Add a AMQP connection to the console. The console will setup a session over the connection. The console will then broadcast an Agent Locate Indication over @@ -707,7 +707,7 @@ class Console(Thread): - def removeConnection(self, conn, timeout=None): + def remove_connection(self, conn, timeout=None): """ Remove an AMQP connection from the console. Un-does the add_connection() operation, and releases any agents and sessions associated with the connection. @@ -725,7 +725,7 @@ class Console(Thread): logging.debug("Sending noop to wake up [%s]" % self._address) try: msg = Message(properties={"method":"request", - "qmf.subject":makeSubject(OpCode.noop)}, + "qmf.subject":make_subject(OpCode.noop)}, subject=self._name, content={"noop":"noop"}) self._direct_sender.send( msg, sync=True ) @@ -752,7 +752,7 @@ class Console(Thread): return self._address - def destroyAgent( self, agent ): + def destroy_agent( self, agent ): """ Undoes create. """ @@ -789,7 +789,7 @@ class Console(Thread): query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) msg = Message(subject="console.ind.locate." + name, properties={"method":"request", - "qmf.subject":makeSubject(OpCode.agent_locate)}, + "qmf.subject":make_subject(OpCode.agent_locate)}, content={MsgKey.query: query.map_encode()}) msg.reply_to = str(self._address) msg.correlation_id = str(handle) @@ -844,7 +844,7 @@ class Console(Thread): return agent - def doQuery(self, agent, query, timeout=None ): + def do_query(self, agent, query, timeout=None ): """ """ @@ -854,7 +854,7 @@ class Console(Thread): raise Exception("Can not allocate a correlation id!") try: logging.debug("Sending Query to Agent (%s)" % time.time()) - agent._sendQuery(query, handle) + agent._send_query(query, handle) except SendError, e: logging.error(str(e)) self._req_correlation.release(handle) @@ -953,7 +953,7 @@ class Console(Thread): # (self._name, self._direct_recvr.source, msg)) self._dispatch(msg, _direct=True) - self._expireAgents() # check for expired agents + self._expire_agents() # check for expired agents #if qLen == 0 and self._work_q.qsize() and self._notifier: if self._work_q_put and self._notifier: @@ -1028,7 +1028,7 @@ class Console(Thread): agent_list = _agents # @todo validate this list! - # @todo: fix when async doQuery done - query all agents at once, then + # @todo: fix when async do_query done - query all agents at once, then # wait for replies, instead of per-agent querying.... if _timeout is None: @@ -1037,13 +1037,13 @@ class Console(Thread): obj_list = [] expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout) for agent in agent_list: - if not agent.isActive(): + if not agent.is_active(): continue now = datetime.datetime.utcnow() if now >= expired: break timeout = ((expired - now) + datetime.timedelta(microseconds=999999)).seconds - reply = self.doQuery(agent, query, timeout) + reply = self.do_query(agent, query, timeout) if reply: obj_list = obj_list + reply @@ -1061,7 +1061,7 @@ class Console(Thread): """ logging.debug( "Message received from Agent! [%s]" % msg ) try: - version,opcode = parseSubject(msg.properties.get("qmf.subject")) + version,opcode = parse_subject(msg.properties.get("qmf.subject")) # @todo: deal with version mismatch!!! except: logging.error("Ignoring unrecognized message '%s'" % msg) @@ -1074,17 +1074,17 @@ class Console(Thread): props = msg.properties if opcode == OpCode.agent_ind: - self._handleAgentIndMsg( msg, cmap, version, _direct ) + self._handle_agent_ind_msg( msg, cmap, version, _direct ) elif opcode == OpCode.data_ind: - self._handleDataIndMsg(msg, cmap, version, _direct) + self._handle_data_ind_msg(msg, cmap, version, _direct) elif opcode == OpCode.event_ind: - self._handleEventIndMsg(msg, cmap, version, _direct) + self._handle_event_ind_msg(msg, cmap, version, _direct) elif opcode == OpCode.managed_object: logging.warning("!!! managed_object TBD !!!") elif opcode == OpCode.object_ind: logging.warning("!!! object_ind TBD !!!") elif opcode == OpCode.response: - self._handleResponseMsg(msg, cmap, version, _direct) + self._handle_response_msg(msg, cmap, version, _direct) elif opcode == OpCode.schema_ind: logging.warning("!!! schema_ind TBD !!!") elif opcode == OpCode.noop: @@ -1093,12 +1093,12 @@ class Console(Thread): logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode) - def _handleAgentIndMsg(self, msg, cmap, version, direct): + def _handle_agent_ind_msg(self, msg, cmap, version, direct): """ Process a received agent-ind message. This message may be a response to a agent-locate, or it can be an unsolicited agent announce. """ - logging.debug("_handleAgentIndMsg '%s' (%s)" % (msg, time.time())) + logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time())) ai_map = cmap.get(MsgKey.agent_info) if not ai_map or not isinstance(ai_map, type({})): @@ -1116,7 +1116,7 @@ class Console(Thread): agent_query = self._agent_discovery_filter if msg.correlation_id: - correlated = self._req_correlation.isValid(msg.correlation_id) + correlated = self._req_correlation.is_valid(msg.correlation_id) if direct and correlated: ignore = False @@ -1134,7 +1134,7 @@ class Console(Thread): if not agent: # need to create and add a new agent - agent = self._createAgent(name) + agent = self._create_agent(name) if not agent: return # failed to add agent @@ -1160,13 +1160,13 @@ class Console(Thread): - def _handleDataIndMsg(self, msg, cmap, version, direct): + def _handle_data_ind_msg(self, msg, cmap, version, direct): """ Process a received data-ind message. """ - logging.debug("_handleDataIndMsg '%s' (%s)" % (msg, time.time())) + logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time())) - if not self._req_correlation.isValid(msg.correlation_id): + if not self._req_correlation.is_valid(msg.correlation_id): logging.debug("Data indicate received with unknown correlation_id" " msg='%s'" % str(msg)) return @@ -1176,14 +1176,14 @@ class Console(Thread): self._req_correlation.put_data(msg.correlation_id, msg) - def _handleResponseMsg(self, msg, cmap, version, direct): + def _handle_response_msg(self, msg, cmap, version, direct): """ Process a received data-ind message. """ # @todo code replication - clean me. - logging.debug("_handleResponseMsg '%s' (%s)" % (msg, time.time())) + logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time())) - if not self._req_correlation.isValid(msg.correlation_id): + if not self._req_correlation.is_valid(msg.correlation_id): logging.debug("Response msg received with unknown correlation_id" " msg='%s'" % str(msg)) return @@ -1192,7 +1192,7 @@ class Console(Thread): logging.debug("waking waiters for correlation id %s" % msg.correlation_id) self._req_correlation.put_data(msg.correlation_id, msg) - def _handleEventIndMsg(self, msg, cmap, version, _direct): + def _handle_event_ind_msg(self, msg, cmap, version, _direct): ei_map = cmap.get(MsgKey.event) if not ei_map or not isinstance(ei_map, type({})): logging.warning("Bad event indication message received: '%s'" % msg) @@ -1233,7 +1233,7 @@ class Console(Thread): self._work_q_put = True - def _expireAgents(self): + def _expire_agents(self): """ Check for expired agents and issue notifications when they expire. """ @@ -1267,7 +1267,7 @@ class Console(Thread): - def _createAgent( self, name ): + def _create_agent( self, name ): """ Factory to create/retrieve an agent for this console """ @@ -1392,8 +1392,8 @@ class Console(Thread): if _agent is None: return None - # note: doQuery will add the new schema to the cache automatically. - slist = self.doQuery(_agent, + # note: do_query will add the new schema to the cache automatically. + slist = self.do_query(_agent, QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id), _timeout) if slist: @@ -1781,7 +1781,7 @@ class Console(Thread): if __name__ == '__main__': # temp test code - from common import (qmfTypes, QmfEvent, SchemaProperty) + from common import (qmfTypes, SchemaProperty) logging.getLogger().setLevel(logging.INFO) |