summaryrefslogtreecommitdiff
path: root/python/qmf2/console.py
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-01-27 16:36:37 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-01-27 16:36:37 +0000
commit1e24432f3f0333890334648d808410fb3e9535cd (patch)
treecf6ee8c6cc686acc692f2efced6f36d2d0518997 /python/qmf2/console.py
parenta7ff22a37baac189c1f433fe7785bd3a637953b1 (diff)
downloadqpid-python-1e24432f3f0333890334648d808410fb3e9535cd.tar.gz
QPID-2261: make code compliant with Python PEP-8 style
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@903717 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qmf2/console.py')
-rw-r--r--python/qmf2/console.py92
1 files changed, 46 insertions, 46 deletions
diff --git a/python/qmf2/console.py b/python/qmf2/console.py
index 6e469a24ac..94d0fd7583 100644
--- a/python/qmf2/console.py
+++ b/python/qmf2/console.py
@@ -30,7 +30,7 @@ from threading import Condition
from qpid.messaging import Connection, Message, Empty, SendError
-from common import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier,
+from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier,
QmfQueryPredicate, MsgKey, QmfData, QmfAddress,
SchemaClass, SchemaClassId, SchemaEventClass,
SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent)
@@ -171,7 +171,7 @@ class SequencedWaiter(object):
self.lock.release()
- def isValid(self, seq):
+ def is_valid(self, seq):
"""
True if seq is in use, else False (seq is unknown)
"""
@@ -254,7 +254,7 @@ class QmfConsoleData(QmfData):
oid = self.get_object_id()
query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT,
self.get_object_id())
- obj_list = self._agent._console.doQuery(self._agent, query,
+ obj_list = self._agent._console.do_query(self._agent, query,
timeout=_timeout)
if obj_list is None or len(obj_list) != 1:
return None
@@ -314,7 +314,7 @@ class QmfConsoleData(QmfData):
logging.debug("Sending method req to Agent (%s)" % time.time())
try:
- self._agent._sendMethodReq(_map, handle)
+ self._agent._send_method_req(_map, handle)
except SendError, e:
logging.error(str(e))
self._agent._console._req_correlation.release(handle)
@@ -392,10 +392,10 @@ class Agent(object):
def get_name(self):
return self._name
- def isActive(self):
+ def is_active(self):
return self._announce_timestamp != None
- def _sendMsg(self, msg, correlation_id=None):
+ def _send_msg(self, msg, correlation_id=None):
"""
Low-level routine to asynchronously send a message to this agent.
"""
@@ -486,7 +486,7 @@ class Agent(object):
logging.debug("Sending method req to Agent (%s)" % time.time())
try:
- self._sendMethodReq(_map, handle)
+ self._send_method_req(_map, handle)
except SendError, e:
logging.error(str(e))
self._console._req_correlation.release(handle)
@@ -526,22 +526,22 @@ class Agent(object):
def __str__(self):
return self.__repr__()
- def _sendQuery(self, query, correlation_id=None):
+ def _send_query(self, query, correlation_id=None):
"""
"""
msg = Message(properties={"method":"request",
- "qmf.subject":makeSubject(OpCode.get_query)},
+ "qmf.subject":make_subject(OpCode.get_query)},
content={MsgKey.query: query.map_encode()})
- self._sendMsg( msg, correlation_id )
+ self._send_msg( msg, correlation_id )
- def _sendMethodReq(self, mr_map, correlation_id=None):
+ def _send_method_req(self, mr_map, correlation_id=None):
"""
"""
msg = Message(properties={"method":"request",
- "qmf.subject":makeSubject(OpCode.method_req)},
+ "qmf.subject":make_subject(OpCode.method_req)},
content=mr_map)
- self._sendMsg( msg, correlation_id )
+ self._send_msg( msg, correlation_id )
##==============================================================================
@@ -645,12 +645,12 @@ class Console(Thread):
"""
logging.debug("Destroying Console...")
if self._conn:
- self.removeConnection(self._conn, timeout)
+ self.remove_connection(self._conn, timeout)
logging.debug("Console Destroyed")
- def addConnection(self, conn):
+ def add_connection(self, conn):
"""
Add a AMQP connection to the console. The console will setup a session over the
connection. The console will then broadcast an Agent Locate Indication over
@@ -707,7 +707,7 @@ class Console(Thread):
- def removeConnection(self, conn, timeout=None):
+ def remove_connection(self, conn, timeout=None):
"""
Remove an AMQP connection from the console. Un-does the add_connection() operation,
and releases any agents and sessions associated with the connection.
@@ -725,7 +725,7 @@ class Console(Thread):
logging.debug("Sending noop to wake up [%s]" % self._address)
try:
msg = Message(properties={"method":"request",
- "qmf.subject":makeSubject(OpCode.noop)},
+ "qmf.subject":make_subject(OpCode.noop)},
subject=self._name,
content={"noop":"noop"})
self._direct_sender.send( msg, sync=True )
@@ -752,7 +752,7 @@ class Console(Thread):
return self._address
- def destroyAgent( self, agent ):
+ def destroy_agent( self, agent ):
"""
Undoes create.
"""
@@ -789,7 +789,7 @@ class Console(Thread):
query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
msg = Message(subject="console.ind.locate." + name,
properties={"method":"request",
- "qmf.subject":makeSubject(OpCode.agent_locate)},
+ "qmf.subject":make_subject(OpCode.agent_locate)},
content={MsgKey.query: query.map_encode()})
msg.reply_to = str(self._address)
msg.correlation_id = str(handle)
@@ -844,7 +844,7 @@ class Console(Thread):
return agent
- def doQuery(self, agent, query, timeout=None ):
+ def do_query(self, agent, query, timeout=None ):
"""
"""
@@ -854,7 +854,7 @@ class Console(Thread):
raise Exception("Can not allocate a correlation id!")
try:
logging.debug("Sending Query to Agent (%s)" % time.time())
- agent._sendQuery(query, handle)
+ agent._send_query(query, handle)
except SendError, e:
logging.error(str(e))
self._req_correlation.release(handle)
@@ -953,7 +953,7 @@ class Console(Thread):
# (self._name, self._direct_recvr.source, msg))
self._dispatch(msg, _direct=True)
- self._expireAgents() # check for expired agents
+ self._expire_agents() # check for expired agents
#if qLen == 0 and self._work_q.qsize() and self._notifier:
if self._work_q_put and self._notifier:
@@ -1028,7 +1028,7 @@ class Console(Thread):
agent_list = _agents
# @todo validate this list!
- # @todo: fix when async doQuery done - query all agents at once, then
+ # @todo: fix when async do_query done - query all agents at once, then
# wait for replies, instead of per-agent querying....
if _timeout is None:
@@ -1037,13 +1037,13 @@ class Console(Thread):
obj_list = []
expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout)
for agent in agent_list:
- if not agent.isActive():
+ if not agent.is_active():
continue
now = datetime.datetime.utcnow()
if now >= expired:
break
timeout = ((expired - now) + datetime.timedelta(microseconds=999999)).seconds
- reply = self.doQuery(agent, query, timeout)
+ reply = self.do_query(agent, query, timeout)
if reply:
obj_list = obj_list + reply
@@ -1061,7 +1061,7 @@ class Console(Thread):
"""
logging.debug( "Message received from Agent! [%s]" % msg )
try:
- version,opcode = parseSubject(msg.properties.get("qmf.subject"))
+ version,opcode = parse_subject(msg.properties.get("qmf.subject"))
# @todo: deal with version mismatch!!!
except:
logging.error("Ignoring unrecognized message '%s'" % msg)
@@ -1074,17 +1074,17 @@ class Console(Thread):
props = msg.properties
if opcode == OpCode.agent_ind:
- self._handleAgentIndMsg( msg, cmap, version, _direct )
+ self._handle_agent_ind_msg( msg, cmap, version, _direct )
elif opcode == OpCode.data_ind:
- self._handleDataIndMsg(msg, cmap, version, _direct)
+ self._handle_data_ind_msg(msg, cmap, version, _direct)
elif opcode == OpCode.event_ind:
- self._handleEventIndMsg(msg, cmap, version, _direct)
+ self._handle_event_ind_msg(msg, cmap, version, _direct)
elif opcode == OpCode.managed_object:
logging.warning("!!! managed_object TBD !!!")
elif opcode == OpCode.object_ind:
logging.warning("!!! object_ind TBD !!!")
elif opcode == OpCode.response:
- self._handleResponseMsg(msg, cmap, version, _direct)
+ self._handle_response_msg(msg, cmap, version, _direct)
elif opcode == OpCode.schema_ind:
logging.warning("!!! schema_ind TBD !!!")
elif opcode == OpCode.noop:
@@ -1093,12 +1093,12 @@ class Console(Thread):
logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode)
- def _handleAgentIndMsg(self, msg, cmap, version, direct):
+ def _handle_agent_ind_msg(self, msg, cmap, version, direct):
"""
Process a received agent-ind message. This message may be a response to a
agent-locate, or it can be an unsolicited agent announce.
"""
- logging.debug("_handleAgentIndMsg '%s' (%s)" % (msg, time.time()))
+ logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time()))
ai_map = cmap.get(MsgKey.agent_info)
if not ai_map or not isinstance(ai_map, type({})):
@@ -1116,7 +1116,7 @@ class Console(Thread):
agent_query = self._agent_discovery_filter
if msg.correlation_id:
- correlated = self._req_correlation.isValid(msg.correlation_id)
+ correlated = self._req_correlation.is_valid(msg.correlation_id)
if direct and correlated:
ignore = False
@@ -1134,7 +1134,7 @@ class Console(Thread):
if not agent:
# need to create and add a new agent
- agent = self._createAgent(name)
+ agent = self._create_agent(name)
if not agent:
return # failed to add agent
@@ -1160,13 +1160,13 @@ class Console(Thread):
- def _handleDataIndMsg(self, msg, cmap, version, direct):
+ def _handle_data_ind_msg(self, msg, cmap, version, direct):
"""
Process a received data-ind message.
"""
- logging.debug("_handleDataIndMsg '%s' (%s)" % (msg, time.time()))
+ logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time()))
- if not self._req_correlation.isValid(msg.correlation_id):
+ if not self._req_correlation.is_valid(msg.correlation_id):
logging.debug("Data indicate received with unknown correlation_id"
" msg='%s'" % str(msg))
return
@@ -1176,14 +1176,14 @@ class Console(Thread):
self._req_correlation.put_data(msg.correlation_id, msg)
- def _handleResponseMsg(self, msg, cmap, version, direct):
+ def _handle_response_msg(self, msg, cmap, version, direct):
"""
Process a received data-ind message.
"""
# @todo code replication - clean me.
- logging.debug("_handleResponseMsg '%s' (%s)" % (msg, time.time()))
+ logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
- if not self._req_correlation.isValid(msg.correlation_id):
+ if not self._req_correlation.is_valid(msg.correlation_id):
logging.debug("Response msg received with unknown correlation_id"
" msg='%s'" % str(msg))
return
@@ -1192,7 +1192,7 @@ class Console(Thread):
logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
self._req_correlation.put_data(msg.correlation_id, msg)
- def _handleEventIndMsg(self, msg, cmap, version, _direct):
+ def _handle_event_ind_msg(self, msg, cmap, version, _direct):
ei_map = cmap.get(MsgKey.event)
if not ei_map or not isinstance(ei_map, type({})):
logging.warning("Bad event indication message received: '%s'" % msg)
@@ -1233,7 +1233,7 @@ class Console(Thread):
self._work_q_put = True
- def _expireAgents(self):
+ def _expire_agents(self):
"""
Check for expired agents and issue notifications when they expire.
"""
@@ -1267,7 +1267,7 @@ class Console(Thread):
- def _createAgent( self, name ):
+ def _create_agent( self, name ):
"""
Factory to create/retrieve an agent for this console
"""
@@ -1392,8 +1392,8 @@ class Console(Thread):
if _agent is None:
return None
- # note: doQuery will add the new schema to the cache automatically.
- slist = self.doQuery(_agent,
+ # note: do_query will add the new schema to the cache automatically.
+ slist = self.do_query(_agent,
QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id),
_timeout)
if slist:
@@ -1781,7 +1781,7 @@ class Console(Thread):
if __name__ == '__main__':
# temp test code
- from common import (qmfTypes, QmfEvent, SchemaProperty)
+ from common import (qmfTypes, SchemaProperty)
logging.getLogger().setLevel(logging.INFO)