summaryrefslogtreecommitdiff
path: root/qpid/extras/qmf/src/py/qmf2/console.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/extras/qmf/src/py/qmf2/console.py')
-rw-r--r--qpid/extras/qmf/src/py/qmf2/console.py284
1 files changed, 144 insertions, 140 deletions
diff --git a/qpid/extras/qmf/src/py/qmf2/console.py b/qpid/extras/qmf/src/py/qmf2/console.py
index afd20c3655..b62aa7342b 100644
--- a/qpid/extras/qmf/src/py/qmf2/console.py
+++ b/qpid/extras/qmf/src/py/qmf2/console.py
@@ -18,11 +18,11 @@
#
import sys
import os
-import logging
import platform
import time
import datetime
import Queue
+from logging import getLogger
from threading import Thread, Event
from threading import RLock
from threading import currentThread
@@ -41,6 +41,8 @@ from common import (QMF_APP_ID, OpCode, QmfQuery, Notifier, ContentType,
_callback_thread=None
+log = getLogger("qmf")
+trace = getLogger("qmf.console")
##==============================================================================
@@ -213,6 +215,7 @@ class _QueryMailbox(_AsyncMailbox):
Process query response messages delivered to this mailbox.
Invoked by Console Management thread only.
"""
+ trace.debug("Delivering to query mailbox (agent=%s)." % self.agent_name)
objects = reply.content
if isinstance(objects, type([])):
# convert from map to native types if needed
@@ -253,7 +256,7 @@ class _QueryMailbox(_AsyncMailbox):
self.result += objects
if not "partial" in reply.properties:
- # logging.error("QUERY COMPLETE for %s" % str(self.context))
+ # log.error("QUERY COMPLETE for %s" % str(self.context))
wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
self.console._work_q.put(wi)
self.console._work_q_put = True
@@ -262,8 +265,7 @@ class _QueryMailbox(_AsyncMailbox):
def expire(self):
- logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" %
- datetime.datetime.utcnow())
+ trace.debug("Expiring query mailbox (agent=%s)." % self.agent_name)
# send along whatever (possibly none) has been received so far
wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
self.console._work_q.put(wi)
@@ -291,6 +293,7 @@ class _SchemaPrefetchMailbox(_AsyncMailbox):
"""
Process schema response messages.
"""
+ trace.debug("Delivering schema mailbox (id=%s)." % self.schema_id)
done = False
schemas = reply.content
if schemas and isinstance(schemas, type([])):
@@ -309,6 +312,7 @@ class _SchemaPrefetchMailbox(_AsyncMailbox):
def expire(self):
+ trace.debug("Expiring schema mailbox (id=%s)." % self.schema_id)
self.destroy()
@@ -332,10 +336,10 @@ class _MethodMailbox(_AsyncMailbox):
Process method response messages delivered to this mailbox.
Invoked by Console Management thread only.
"""
-
+ trace.debug("Delivering to method mailbox.")
_map = reply.content
if not _map or not isinstance(_map, type({})):
- logging.error("Invalid method call reply message")
+ log.error("Invalid method call reply message")
result = None
else:
error=_map.get(SchemaMethod.KEY_ERROR)
@@ -358,8 +362,7 @@ class _MethodMailbox(_AsyncMailbox):
The mailbox expired without receiving a reply.
Invoked by the Console Management thread only.
"""
- logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" %
- datetime.datetime.utcnow())
+ trace.debug("Expiring method mailbox.")
# send along an empty response
wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, None)
self.console._work_q.put(wi)
@@ -391,30 +394,30 @@ class _SubscriptionMailbox(_AsyncMailbox):
def subscribe(self, query):
agent = self.console.get_agent(self.agent_name)
if not agent:
- logging.warning("subscribed failed - unknown agent '%s'" %
+ log.warning("subscribed failed - unknown agent '%s'" %
self.agent_name)
return False
try:
- logging.debug("Sending Subscribe to Agent (%s)" % self.agent_name)
+ trace.debug("Sending Subscribe to Agent (%s)" % self.agent_name)
agent._send_subscribe_req(query, self.get_address(), self.interval,
self.duration)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
return False
return True
def resubscribe(self, duration):
agent = self.console.get_agent(self.agent_name)
if not agent:
- logging.warning("resubscribed failed - unknown agent '%s'" %
+ log.warning("resubscribed failed - unknown agent '%s'" %
self.agent_name)
return False
try:
- logging.debug("Sending resubscribe to Agent (%s)" % self.agent_name)
+ trace.debug("Sending resubscribe to Agent (%s)" % self.agent_name)
agent._send_resubscribe_req(self.get_address(),
self.agent_subscription_id, duration)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
return False
return True
@@ -430,7 +433,7 @@ class _SubscriptionMailbox(_AsyncMailbox):
try:
e_map = QmfData.from_map(error)
except TypeError:
- logging.warning("Invalid QmfData map received: '%s'"
+ log.warning("Invalid QmfData map received: '%s'"
% str(error))
e_map = QmfData.create({"error":"Unknown error"})
sp = SubscribeParams(None, None, None, e_map)
@@ -456,12 +459,12 @@ class _SubscriptionMailbox(_AsyncMailbox):
# else: data indication
agent_name = msg.properties.get("qmf.agent")
if not agent_name:
- logging.warning("Ignoring data_ind - no agent name given: %s" %
+ log.warning("Ignoring data_ind - no agent name given: %s" %
msg)
return
agent = self.console.get_agent(agent_name)
if not agent:
- logging.warning("Ignoring data_ind - unknown agent '%s'" %
+ log.warning("Ignoring data_ind - unknown agent '%s'" %
agent_name)
return
@@ -625,7 +628,7 @@ class QmfConsoleData(QmfData):
contents.
"""
if _reply_handle is not None:
- logging.error(" ASYNC REFRESH TBD!!!")
+ log.error(" ASYNC REFRESH TBD!!!")
return None
assert self._agent
@@ -677,28 +680,28 @@ class QmfConsoleData(QmfData):
if _in_args:
_map[SchemaMethod.KEY_ARGUMENTS] = _in_args
- logging.debug("Sending method req to Agent (%s)" % time.time())
+ trace.debug("Sending method req to Agent (%s)" % time.time())
try:
self._agent._send_method_req(_map, cid)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
return None
if _reply_handle is not None:
return True
- logging.debug("Waiting for response to method req (%s)" % _timeout)
+ trace.debug("Waiting for response to method req (%s)" % _timeout)
replyMsg = mbox.fetch(_timeout)
mbox.destroy()
if not replyMsg:
- logging.debug("Agent method req wait timed-out.")
+ trace.debug("Agent method req wait timed-out.")
return None
_map = replyMsg.content
if not _map or not isinstance(_map, type({})):
- logging.error("Invalid method call reply message")
+ log.error("Invalid method call reply message")
return None
error=_map.get(SchemaMethod.KEY_ERROR)
@@ -751,7 +754,7 @@ class Agent(object):
self._packages = {} # map of {package-name:[list of class-names], } for this agent
self._subscriptions = [] # list of active standing subscriptions for this agent
self._announce_timestamp = None # datetime when last announce received
- logging.debug( "Created Agent with address: [%s]" % self._address )
+ trace.debug( "Created Agent with address: [%s]" % self._address )
def get_name(self):
@@ -768,7 +771,7 @@ class Agent(object):
if correlation_id:
msg.correlation_id = str(correlation_id)
# TRACE
- #logging.error("!!! Console %s sending to agent %s (%s)" %
+ #log.error("!!! Console %s sending to agent %s (%s)" %
# (self._console._name, self._name, str(msg)))
self._sender.send(msg)
# return handle
@@ -846,28 +849,28 @@ class Agent(object):
if _in_args:
_map[SchemaMethod.KEY_ARGUMENTS] = _in_args.copy()
- logging.debug("Sending method req to Agent (%s)" % time.time())
+ trace.debug("Sending method req to Agent (%s)" % time.time())
try:
self._send_method_req(_map, cid)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
return None
if _reply_handle is not None:
return True
- logging.debug("Waiting for response to method req (%s)" % _timeout)
+ trace.debug("Waiting for response to method req (%s)" % _timeout)
replyMsg = mbox.fetch(_timeout)
mbox.destroy()
if not replyMsg:
- logging.debug("Agent method req wait timed-out.")
+ trace.debug("Agent method req wait timed-out.")
return None
_map = replyMsg.content
if not _map or not isinstance(_map, type({})):
- logging.error("Invalid method call reply message")
+ log.error("Invalid method call reply message")
return None
return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS),
@@ -1076,10 +1079,10 @@ class Console(Thread):
@type timeout: float
@param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever.
"""
- logging.debug("Destroying Console...")
+ trace.debug("Destroying Console...")
if self._conn:
self.remove_connection(self._conn, timeout)
- logging.debug("Console Destroyed")
+ trace.debug("Console Destroyed")
def add_connection(self, conn):
"""
@@ -1103,7 +1106,7 @@ class Console(Thread):
" x-properties:"
" {type:direct}}}",
capacity=1)
- logging.debug("my direct addr=%s" % self._direct_recvr.source)
+ trace.debug("my direct addr=%s" % self._direct_recvr.source)
self._direct_sender = self._session.sender(str(self._address.get_node()) +
";{create:always,"
@@ -1111,7 +1114,7 @@ class Console(Thread):
" {type:topic,"
" x-properties:"
" {type:direct}}}")
- logging.debug("my direct sender=%s" % self._direct_sender.target)
+ trace.debug("my direct sender=%s" % self._direct_sender.target)
# for receiving "broadcast" messages from agents
default_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND + ".#",
@@ -1120,7 +1123,7 @@ class Console(Thread):
";{create:always,"
" node-properties:{type:topic}}",
capacity=1)
- logging.debug("default topic recv addr=%s" % self._topic_recvr.source)
+ trace.debug("default topic recv addr=%s" % self._topic_recvr.source)
# for sending to topic subscribers
@@ -1128,7 +1131,7 @@ class Console(Thread):
self._topic_sender = self._session.sender(str(topic_addr) +
";{create:always,"
" node-properties:{type:topic}}")
- logging.debug("default topic send addr=%s" % self._topic_sender.target)
+ trace.debug("default topic send addr=%s" % self._topic_sender.target)
#
# Now that receivers are created, fire off the receive thread...
@@ -1150,17 +1153,17 @@ class Console(Thread):
@param conn: connection previously added by add_connection()
"""
if self._conn and conn and conn != self._conn:
- logging.error( "Attempt to delete unknown connection: %s" % str(conn))
+ log.error( "Attempt to delete unknown connection: %s" % str(conn))
# tell connection thread to shutdown
self._operational = False
if self.isAlive():
# kick my thread to wake it up
self._wake_thread()
- logging.debug("waiting for console receiver thread to exit")
+ trace.debug("waiting for console receiver thread to exit")
self.join(timeout)
if self.isAlive():
- logging.error( "Console thread '%s' is hung..." % self.getName() )
+ log.error( "Console thread '%s' is hung..." % self.getName() )
self._direct_recvr.close()
self._direct_sender.close()
self._topic_recvr.close()
@@ -1168,7 +1171,7 @@ class Console(Thread):
self._session.close()
self._session = None
self._conn = None
- logging.debug("console connection removal complete")
+ trace.debug("console connection removal complete")
def get_address(self):
@@ -1219,14 +1222,14 @@ class Console(Thread):
content=query._predicate)
msg.reply_to = str(self._address)
msg.correlation_id = str(cid)
- logging.debug("Sending Agent Locate (%s)" % time.time())
+ trace.debug("Sending Agent Locate (%s)" % time.time())
# TRACE
- #logging.error("!!! Console %s sending agent locate (%s)" %
+ #log.error("!!! Console %s sending agent locate (%s)" %
# (self._name, str(msg)))
try:
self._topic_sender.send(msg)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
return None
@@ -1234,10 +1237,10 @@ class Console(Thread):
timeout = self._reply_timeout
new_agent = None
- logging.debug("Waiting for response to Agent Locate (%s)" % timeout)
+ trace.debug("Waiting for response to Agent Locate (%s)" % timeout)
mbox.fetch(timeout)
mbox.destroy()
- logging.debug("Agent Locate wait ended (%s)" % time.time())
+ trace.debug("Agent Locate wait ended (%s)" % time.time())
self._lock.acquire()
try:
new_agent = self._agent_map.get(name)
@@ -1288,10 +1291,10 @@ class Console(Thread):
cid = mbox.get_address()
try:
- logging.debug("Sending Query to Agent (%s)" % time.time())
+ trace.debug("Sending Query to Agent (%s)" % time.time())
agent._send_query(query, cid)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
return None
@@ -1302,7 +1305,7 @@ class Console(Thread):
if not _timeout:
_timeout = self._reply_timeout
- logging.debug("Waiting for response to Query (%s)" % _timeout)
+ trace.debug("Waiting for response to Query (%s)" % _timeout)
now = datetime.datetime.utcnow()
expire = now + datetime.timedelta(seconds=_timeout)
@@ -1311,7 +1314,7 @@ class Console(Thread):
_timeout = timedelta_to_secs(expire - now)
reply = mbox.fetch(_timeout)
if not reply:
- logging.debug("Query wait timed-out.")
+ trace.debug("Query wait timed-out.")
break
objects = reply.content
@@ -1383,12 +1386,12 @@ class Console(Thread):
mbox.destroy()
return None
- logging.debug("Waiting for response to subscription (%s)" % _timeout)
+ trace.debug("Waiting for response to subscription (%s)" % _timeout)
# @todo: what if mbox expires here?
sp = mbox.fetch(_timeout)
if not sp:
- logging.debug("Subscription request wait timed-out.")
+ trace.debug("Subscription request wait timed-out.")
mbox.destroy()
return None
@@ -1405,7 +1408,7 @@ class Console(Thread):
mbox = self._get_mailbox(subscription_id)
if not mbox:
- logging.warning("Subscription %s not found." % subscription_id)
+ log.warning("Subscription %s not found." % subscription_id)
return None
if isinstance(mbox, _AsyncSubscriptionMailbox):
@@ -1418,11 +1421,11 @@ class Console(Thread):
# wait for reply
- logging.debug("Waiting for response to subscription (%s)" % _timeout)
+ trace.debug("Waiting for response to subscription (%s)" % _timeout)
sp = mbox.fetch(_timeout)
if not sp:
- logging.debug("re-subscribe request wait timed-out.")
+ trace.debug("re-subscribe request wait timed-out.")
# @todo???? mbox.destroy()
return None
@@ -1439,11 +1442,11 @@ class Console(Thread):
agent = self.get_agent(mbox.agent_name)
if agent:
try:
- logging.debug("Sending UnSubscribe to Agent (%s)" % time.time())
+ trace.debug("Sending UnSubscribe to Agent (%s)" % time.time())
agent._send_unsubscribe_ind(subscription_id,
mbox.agent_subscription_id)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
@@ -1453,16 +1456,16 @@ class Console(Thread):
Make the console management thread loop wakeup from its next_receiver
sleep.
"""
- logging.debug("Sending noop to wake up [%s]" % self._address)
+ trace.debug("Sending noop to wake up [%s]" % self._address)
msg = Message(id=QMF_APP_ID,
subject=self._name,
- properties={"method":"request",
+ properties={"method":"indication",
"qmf.opcode":OpCode.noop},
content={})
try:
self._direct_sender.send( msg, sync=True )
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
def run(self):
@@ -1484,7 +1487,7 @@ class Console(Thread):
except Empty:
break
# TRACE:
- # logging.error("!!! Console %s: msg on %s [%s]" %
+ # log.error("!!! Console %s: msg on %s [%s]" %
# (self._name, self._topic_recvr.source, msg))
self._dispatch(msg, _direct=False)
@@ -1494,7 +1497,7 @@ class Console(Thread):
except Empty:
break
# TRACE
- #logging.error("!!! Console %s: msg on %s [%s]" %
+ #log.error("!!! Console %s: msg on %s [%s]" %
# (self._name, self._direct_recvr.source, msg))
self._dispatch(msg, _direct=True)
@@ -1506,36 +1509,37 @@ class Console(Thread):
# new stuff on work queue, kick the the application...
self._work_q_put = False
_callback_thread = currentThread()
- logging.info("Calling console notifier.indication")
+ trace.debug("Calling console notifier.indication")
self._notifier.indication()
_callback_thread = None
- if self._operational:
- # wait for a message to arrive, or an agent
- # to expire, or a mailbox requrest to time out
- now = datetime.datetime.utcnow()
- next_expire = self._next_agent_expire
- # the mailbox expire flag may be cleared by the
- # app thread(s)
- self._lock.acquire()
- try:
- if (self._next_mbox_expire and
- self._next_mbox_expire < next_expire):
- next_expire = self._next_mbox_expire
- finally:
- self._lock.release()
+ # wait for a message to arrive, or an agent
+ # to expire, or a mailbox requrest to time out
+ now = datetime.datetime.utcnow()
+ next_expire = self._next_agent_expire
- if next_expire > now:
- timeout = timedelta_to_secs(next_expire - now)
- try:
- logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
- xxx = self._session.next_receiver(timeout = timeout)
- except Empty:
- pass
+ self._lock.acquire()
+ try:
+ # the mailbox expire flag may be cleared by the
+ # app thread(s) to force an immedate mailbox scan
+ if self._next_mbox_expire is None:
+ next_expire = now
+ elif self._next_mbox_expire < next_expire:
+ next_expire = self._next_mbox_expire
+ finally:
+ self._lock.release()
+ timeout = timedelta_to_secs(next_expire - now)
+
+ if self._operational and timeout > 0.0:
+ try:
+ trace.debug("waiting for next rcvr (timeout=%s)..." % timeout)
+ self._session.next_receiver(timeout = timeout)
+ except Empty:
+ pass
- logging.debug("Shutting down Console thread")
+ trace.debug("Shutting down Console thread")
def get_objects(self,
_object_id=None,
@@ -1639,12 +1643,11 @@ class Console(Thread):
"""
PRIVATE: Process a message received from an Agent
"""
- #logging.debug( "Message received from Agent! [%s]" % msg )
- #logging.error( "Message received from Agent! [%s]" % msg )
+ trace.debug( "Message received from Agent! [%s]" % msg )
opcode = msg.properties.get("qmf.opcode")
if not opcode:
- logging.error("Ignoring unrecognized message '%s'" % msg)
+ log.error("Ignoring unrecognized message '%s'" % msg)
return
version = 2 # @todo: fix me
@@ -1672,9 +1675,9 @@ class Console(Thread):
else:
self._handle_indication_msg(msg, cmap, version, _direct)
elif opcode == OpCode.noop:
- logging.debug("No-op msg received.")
+ trace.debug("No-op msg received.")
else:
- logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode)
+ log.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode)
def _handle_agent_ind_msg(self, msg, cmap, version, direct):
@@ -1682,15 +1685,15 @@ class Console(Thread):
Process a received agent-ind message. This message may be a response to a
agent-locate, or it can be an unsolicited agent announce.
"""
- logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time()))
+ trace.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time()))
ai_map = msg.content
if not ai_map or not isinstance(ai_map, type({})):
- logging.warning("Bad agent-ind message received: '%s'" % msg)
+ log.warning("Bad agent-ind message received: '%s'" % msg)
return
name = ai_map.get("_name")
if not name:
- logging.warning("Bad agent-ind message received: agent name missing"
+ log.warning("Bad agent-ind message received: agent name missing"
" '%s'" % msg)
return
@@ -1724,48 +1727,48 @@ class Console(Thread):
if matched:
# unsolicited, but newly discovered
- logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time()))
+ trace.debug("AGENT_ADDED for %s (%s)" % (agent, time.time()))
wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent})
self._work_q.put(wi)
self._work_q_put = True
if correlated:
# wake up all waiters
- logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+ trace.debug("waking waiters for correlation id %s" % msg.correlation_id)
mbox.deliver(msg)
def _handle_response_msg(self, msg, cmap, version, direct):
"""
Process a received data-ind message.
"""
- logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
+ trace.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
mbox = self._get_mailbox(msg.correlation_id)
if not mbox:
- logging.debug("Response msg received with unknown correlation_id"
- " msg='%s'" % str(msg))
+ log.warning("Response msg received with unknown correlation_id"
+ " msg='%s'" % str(msg))
return
# wake up all waiters
- logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+ trace.debug("waking waiters for correlation id %s" % msg.correlation_id)
mbox.deliver(msg)
def _handle_indication_msg(self, msg, cmap, version, _direct):
aname = msg.properties.get("qmf.agent")
if not aname:
- logging.debug("No agent name field in indication message.")
+ trace.debug("No agent name field in indication message.")
return
content_type = msg.properties.get("qmf.content")
if (content_type != ContentType.event or
not isinstance(msg.content, type([]))):
- logging.warning("Bad event indication message received: '%s'" % msg)
+ log.warning("Bad event indication message received: '%s'" % msg)
return
emap = msg.content[0]
if not isinstance(emap, type({})):
- logging.debug("Invalid event body in indication message: '%s'" % msg)
+ trace.debug("Invalid event body in indication message: '%s'" % msg)
return
agent = None
@@ -1775,18 +1778,18 @@ class Console(Thread):
finally:
self._lock.release()
if not agent:
- logging.debug("Agent '%s' not known." % aname)
+ trace.debug("Agent '%s' not known." % aname)
return
try:
# @todo: schema???
event = QmfEvent.from_map(emap)
except TypeError:
- logging.debug("Invalid QmfEvent map received: %s" % str(emap))
+ trace.debug("Invalid QmfEvent map received: %s" % str(emap))
return
# @todo: schema? Need to fetch it, but not from this thread!
# This thread can not pend on a request.
- logging.debug("Publishing event received from agent %s" % aname)
+ trace.debug("Publishing event received from agent %s" % aname)
wi = WorkItem(WorkItem.EVENT_RECEIVED, None,
{"agent":agent,
"event":event})
@@ -1835,12 +1838,12 @@ class Console(Thread):
next_expire_delta = lifetime_delta
self._lock.acquire()
try:
- logging.debug("!!! expiring agents '%s'" % now)
+ trace.debug("!!! expiring agents '%s'" % now)
for agent in self._agent_map.itervalues():
if agent._announce_timestamp:
agent_deathtime = agent._announce_timestamp + lifetime_delta
if agent_deathtime <= now:
- logging.debug("AGENT_DELETED for %s" % agent)
+ trace.debug("AGENT_DELETED for %s" % agent)
agent._announce_timestamp = None
wi = WorkItem(WorkItem.AGENT_DELETED, None,
{"agent":agent})
@@ -1852,7 +1855,7 @@ class Console(Thread):
next_expire_delta = agent_deathtime - now
self._next_agent_expire = now + next_expire_delta
- logging.debug("!!! next expire cycle = '%s'" % self._next_agent_expire)
+ trace.debug("!!! next expire cycle = '%s'" % self._next_agent_expire)
finally:
self._lock.release()
@@ -1862,7 +1865,7 @@ class Console(Thread):
"""
Factory to create/retrieve an agent for this console
"""
- logging.debug("creating agent %s" % name)
+ trace.debug("creating agent %s" % name)
self._lock.acquire()
try:
agent = self._agent_map.get(name)
@@ -1878,9 +1881,9 @@ class Console(Thread):
" x-properties:"
" {type:direct}}}")
except:
- logging.warning("Unable to create sender for %s" % name)
+ log.warning("Unable to create sender for %s" % name)
return None
- logging.debug("created agent sender %s" % agent._sender.target)
+ trace.debug("created agent sender %s" % agent._sender.target)
self._agent_map[name] = agent
finally:
@@ -1984,11 +1987,11 @@ class Console(Thread):
if need_fetch:
mbox = _SchemaPrefetchMailbox(self, schema_id)
query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id)
- logging.debug("Sending Schema Query to Agent (%s)" % time.time())
+ trace.debug("Sending Schema Query to Agent (%s)" % time.time())
try:
agent._send_query(query, mbox.get_address())
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
self._lock.acquire()
try:
@@ -2041,7 +2044,7 @@ class Console(Thread):
try:
mid = long(mid)
except TypeError:
- logging.error("Invalid mailbox id: %s" % str(mid))
+ log.error("Invalid mailbox id: %s" % str(mid))
return None
self._lock.acquire()
@@ -2056,7 +2059,7 @@ class Console(Thread):
try:
mid = long(mid)
except TypeError:
- logging.error("Invalid mailbox id: %s" % str(mid))
+ log.error("Invalid mailbox id: %s" % str(mid))
return None
self._lock.acquire()
@@ -2242,36 +2245,36 @@ class Console(Thread):
# count += 1
# try:
# if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED:
-# logging.debug("Console Event AGENT_ADDED received")
+# trace.debug("Console Event AGENT_ADDED received")
# if self._handler:
# self._handler.agent_added(AgentProxy(self._event.agent, None))
# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED:
-# logging.debug("Console Event AGENT_DELETED received")
+# trace.debug("Console Event AGENT_DELETED received")
# if self._handler:
# self._handler.agent_deleted(AgentProxy(self._event.agent, None))
# elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE:
-# logging.debug("Console Event NEW_PACKAGE received")
+# trace.debug("Console Event NEW_PACKAGE received")
# if self._handler:
# self._handler.new_package(self._event.name)
# elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS:
-# logging.debug("Console Event NEW_CLASS received")
+# trace.debug("Console Event NEW_CLASS received")
# if self._handler:
# self._handler.new_class(SchemaClassKey(self._event.classKey))
# elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE:
-# logging.debug("Console Event OBJECT_UPDATE received")
+# trace.debug("Console Event OBJECT_UPDATE received")
# if self._handler:
# self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}),
# self._event.hasProps, self._event.hasStats)
# elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED:
-# logging.debug("Console Event EVENT_RECEIVED received")
+# trace.debug("Console Event EVENT_RECEIVED received")
# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT:
-# logging.debug("Console Event AGENT_HEARTBEAT received")
+# trace.debug("Console Event AGENT_HEARTBEAT received")
# if self._handler:
# self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp)
# elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE:
-# logging.debug("Console Event METHOD_RESPONSE received")
+# trace.debug("Console Event METHOD_RESPONSE received")
# else:
-# logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
+# trace.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
# except e:
# print "Exception caught in callback thread:", e
# self.impl.popEvent()
@@ -2300,17 +2303,17 @@ class Console(Thread):
# def shutdown(self):
-# logging.debug("broker.shutdown() called.")
+# trace.debug("broker.shutdown() called.")
# self.console.impl.delConnection(self.impl)
# self.conn.del_conn_handler(self)
# if self._session:
# self.impl.sessionClosed()
-# logging.debug("broker.shutdown() sessionClosed done.")
+# trace.debug("broker.shutdown() sessionClosed done.")
# self._session.destroy()
-# logging.debug("broker.shutdown() session destroy done.")
+# trace.debug("broker.shutdown() session destroy done.")
# self._session = None
# self._operational = False
-# logging.debug("broker.shutdown() done.")
+# trace.debug("broker.shutdown() done.")
# def wait_for_stable(self, timeout = None):
@@ -2343,24 +2346,24 @@ class Console(Thread):
# while valid:
# count += 1
# if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO:
-# logging.debug("Broker Event BROKER_INFO received");
+# trace.debug("Broker Event BROKER_INFO received");
# elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE:
-# logging.debug("Broker Event DECLARE_QUEUE received");
+# trace.debug("Broker Event DECLARE_QUEUE received");
# self.conn.impl.declareQueue(self._session.handle, self._event.name)
# elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE:
-# logging.debug("Broker Event DELETE_QUEUE received");
+# trace.debug("Broker Event DELETE_QUEUE received");
# self.conn.impl.deleteQueue(self._session.handle, self._event.name)
# elif self._event.kind == qmfengine.BrokerEvent.BIND:
-# logging.debug("Broker Event BIND received");
+# trace.debug("Broker Event BIND received");
# self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
# elif self._event.kind == qmfengine.BrokerEvent.UNBIND:
-# logging.debug("Broker Event UNBIND received");
+# trace.debug("Broker Event UNBIND received");
# self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
# elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE:
-# logging.debug("Broker Event SETUP_COMPLETE received");
+# trace.debug("Broker Event SETUP_COMPLETE received");
# self.impl.startProtocol()
# elif self._event.kind == qmfengine.BrokerEvent.STABLE:
-# logging.debug("Broker Event STABLE received");
+# trace.debug("Broker Event STABLE received");
# self._cv.acquire()
# try:
# self._stable = True
@@ -2387,7 +2390,7 @@ class Console(Thread):
# valid = self.impl.getXmtMessage(self._xmtMessage)
# while valid:
# count += 1
-# logging.debug("Broker: sending msg on connection")
+# trace.debug("Broker: sending msg on connection")
# self.conn.impl.sendMessage(self._session.handle, self._xmtMessage)
# self.impl.popXmt()
# valid = self.impl.getXmtMessage(self._xmtMessage)
@@ -2405,14 +2408,14 @@ class Console(Thread):
# def conn_event_connected(self):
-# logging.debug("Broker: Connection event CONNECTED")
+# trace.debug("Broker: Connection event CONNECTED")
# self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self)
# self.impl.sessionOpened(self._session.handle)
# self._do_events()
# def conn_event_disconnected(self, error):
-# logging.debug("Broker: Connection event DISCONNECTED")
+# trace.debug("Broker: Connection event DISCONNECTED")
# pass
@@ -2421,14 +2424,14 @@ class Console(Thread):
# def sess_event_session_closed(self, context, error):
-# logging.debug("Broker: Session event CLOSED")
+# trace.debug("Broker: Session event CLOSED")
# self.impl.sessionClosed()
# def sess_event_recv(self, context, message):
-# logging.debug("Broker: Session event MSG_RECV")
+# trace.debug("Broker: Session event MSG_RECV")
# if not self._operational:
-# logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context))
+# log.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context))
# self.impl.handleRcvMessage(message)
# self._do_events()
@@ -2446,6 +2449,7 @@ class Console(Thread):
if __name__ == '__main__':
# temp test code
+ import logging
from common import (qmfTypes, SchemaProperty)
logging.getLogger().setLevel(logging.INFO)