diff options
author | Nuno Santos <nsantos@apache.org> | 2008-01-23 19:43:03 +0000 |
---|---|---|
committer | Nuno Santos <nsantos@apache.org> | 2008-01-23 19:43:03 +0000 |
commit | bdecc9d9952d735b0fe6e00c90b11f5c8f542c32 (patch) | |
tree | 2fc2af9454809d6d17f32b4c4c78cce1308f6cda | |
parent | 86795907f60601c0127952af46886c758b541eae (diff) | |
download | qpid-python-bdecc9d9952d735b0fe6e00c90b11f5c8f542c32.tar.gz |
removed management.py.rej, which had been checked in by mistake
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@614638 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qpid/management.py.rej | 457 |
1 files changed, 0 insertions, 457 deletions
diff --git a/qpid/python/qpid/management.py.rej b/qpid/python/qpid/management.py.rej deleted file mode 100644 index 28d172abe5..0000000000 --- a/qpid/python/qpid/management.py.rej +++ /dev/null @@ -1,457 +0,0 @@ -*************** -*** 18,24 **** - # - - """ -- Management classes for AMQP - """ - - import qpid ---- 18,24 ---- - # - - """ -+ Management API for Qpid - """ - - import qpid -*************** -*** 42,91 **** - #=================================================================== - class ManagementMetadata: - -- def parseSchema (self, cls, oid, len, codec): -- #print "Schema Record: objId=", oid - -- config = [] -- inst = [] -- while 1: -- flags = codec.decode_octet () -- if flags == 0x80: -- break - -- tc = codec.decode_octet () -- name = codec.decode_shortstr () -- desc = codec.decode_shortstr () - -- if flags & 1: # TODO: Define constants for these -- config.append ((tc, name, desc)) -- if (flags & 1) == 0 or (flags & 2) == 2: -- inst.append ((tc, name, desc)) - - # TODO: Handle notification of schema change outbound -- self.schema[(oid,'C')] = config -- self.schema[(oid,'I')] = inst - -- def parseContent (self, cls, oid, len, codec): -- #print "Content Record: Class=", cls, ", objId=", oid -- - if cls == 'C' and self.broker.config_cb == None: - return - if cls == 'I' and self.broker.inst_cb == None: - return - -- if (oid,cls) not in self.schema: - return - - row = [] - timestamps = [] - -- timestamps.append (codec.decode_longlong ()); # Current Time -- timestamps.append (codec.decode_longlong ()); # Create Time -- timestamps.append (codec.decode_longlong ()); # Delete Time - -- for element in self.schema[(oid,cls)][:]: -- tc = element[0] -- name = element[1] - if tc == 1: # TODO: Define constants for these - data = codec.decode_octet () - elif tc == 2: ---- 42,132 ---- - #=================================================================== - class ManagementMetadata: - -+ def parseSchema (self, cls, codec): -+ className = codec.decode_shortstr () -+ configCount = codec.decode_short () -+ instCount = codec.decode_short () -+ methodCount = codec.decode_short () -+ eventCount = codec.decode_short () - -+ configs = [] -+ insts = [] -+ methods = [] -+ events = [] - -+ configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None)) -+ insts.append (("id", 4, None, None)) - -+ for idx in range (configCount): -+ ft = codec.decode_table () -+ name = ft["name"] -+ type = ft["type"] -+ access = ft["access"] -+ index = ft["index"] -+ unit = None -+ min = None -+ max = None -+ maxlen = None -+ desc = None - -+ for key, value in ft.items (): -+ if key == "unit": -+ unit = value -+ elif key == "min": -+ min = value -+ elif key == "max": -+ max = value -+ elif key == "maxlen": -+ maxlen = value -+ elif key == "desc": -+ desc = value -+ -+ config = (name, type, unit, desc, access, index, min, max, maxlen) -+ configs.append (config) -+ -+ for idx in range (instCount): -+ ft = codec.decode_table () -+ name = ft["name"] -+ type = ft["type"] -+ unit = None -+ desc = None -+ -+ for key, value in ft.items (): -+ if key == "unit": -+ unit = value -+ elif key == "desc": -+ desc = value -+ -+ inst = (name, type, unit, desc) -+ insts.append (inst) -+ - # TODO: Handle notification of schema change outbound -+ self.schema[(className,'C')] = configs -+ self.schema[(className,'I')] = insts -+ self.schema[(className,'M')] = methods -+ self.schema[(className,'E')] = events - -+ def parseContent (self, cls, codec): - if cls == 'C' and self.broker.config_cb == None: - return - if cls == 'I' and self.broker.inst_cb == None: - return - -+ className = codec.decode_shortstr () -+ -+ if (className,cls) not in self.schema: - return - - row = [] - timestamps = [] - -+ timestamps.append (codec.decode_longlong ()) # Current Time -+ timestamps.append (codec.decode_longlong ()) # Create Time -+ timestamps.append (codec.decode_longlong ()) # Delete Time - -+ for element in self.schema[(className,cls)][:]: -+ tc = element[1] -+ name = element[0] - if tc == 1: # TODO: Define constants for these - data = codec.decode_octet () - elif tc == 2: -*************** -*** 98,130 **** - data = codec.decode_octet () - elif tc == 6: - data = codec.decode_shortstr () - row.append ((name, data)) - - if cls == 'C': -- self.broker.config_cb[1] (self.broker.config_cb[0], oid, row, timestamps) -- if cls == 'I': -- self.broker.inst_cb[1] (self.broker.inst_cb[0], oid, row, timestamps) - - def parse (self, codec): -- try: -- opcode = chr (codec.decode_octet ()) -- except EOF: -- return 0 - -- cls = chr (codec.decode_octet ()) -- oid = codec.decode_short () -- len = codec.decode_long () -- -- if len < 8: -- raise ValueError ("parse error: value of length field too small") -- - if opcode == 'S': -- self.parseSchema (cls, oid, len, codec) - -- if opcode == 'C': -- self.parseContent (cls, oid, len, codec) - -- return 1 - - def __init__ (self, broker): - self.broker = broker ---- 139,167 ---- - data = codec.decode_octet () - elif tc == 6: - data = codec.decode_shortstr () -+ elif tc == 7: -+ data = codec.decode_longstr () -+ else: -+ raise ValueError ("Invalid type code: %d" % tc) - row.append ((name, data)) - - if cls == 'C': -+ self.broker.config_cb[1] (self.broker.config_cb[0], className, row, timestamps) -+ elif cls == 'I': -+ self.broker.inst_cb[1] (self.broker.inst_cb[0], className, row, timestamps) - - def parse (self, codec): -+ opcode = chr (codec.decode_octet ()) -+ cls = chr (codec.decode_octet ()) - - if opcode == 'S': -+ self.parseSchema (cls, codec) - -+ elif opcode == 'C': -+ self.parseContent (cls, codec) - -+ else: -+ raise ValueError ("Unknown opcode: %c" % opcode); - - def __init__ (self, broker): - self.broker = broker -*************** -*** 140,146 **** - #=================================================================== - class ManagedBroker: - -- exchange = "qpid.management" - - def checkHeader (self, codec): - octet = chr (codec.decode_octet ()) ---- 177,184 ---- - #=================================================================== - class ManagedBroker: - -+ mExchange = "qpid.management" -+ dExchange = "amq.direct" - - def checkHeader (self, codec): - octet = chr (codec.decode_octet ()) -*************** -*** 157,225 **** - return 0 - return 1 - -- def receive_cb (self, msg): - codec = Codec (StringIO (msg.content.body), self.spec) - - if self.checkHeader (codec) == 0: - raise ValueError ("outer header invalid"); - -- while self.metadata.parse (codec): -- pass - - msg.complete () - -- def __init__ (self, host = "localhost", port = 5672, -- username = "guest", password = "guest"): - -- self.spec = qpid.spec.load ("../specs/amqp.0-10-preview.xml") -- self.client = None -- self.channel = None -- self.queue = None -- self.qname = None -- self.metadata = ManagementMetadata (self) - - # Initialize the callback records - self.schema_cb = None - self.config_cb = None - self.inst_cb = None - - self.host = host - self.port = port - self.username = username - self.password = password - - def schemaListener (self, context, callback): - self.schema_cb = (context, callback) - - def configListener (self, context, callback): - self.config_cb = (context, callback) - - def instrumentationListener (self, context, callback): - self.inst_cb = (context, callback) - - def start (self): -- print "Connecting to broker", self.host - - try: - self.client = Client (self.host, self.port, self.spec) - self.client.start ({"LOGIN": self.username, "PASSWORD": self.password}) - self.channel = self.client.channel (1) -- response = self.channel.session_open (detached_lifetime=300) -- self.qname = "mgmt-" + base64.urlsafe_b64encode(response.session_id) - -- self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1) -- self.channel.queue_bind (exchange=ManagedBroker.exchange, queue=self.qname, -- routing_key="mgmt") -- self.channel.message_subscribe (queue=self.qname, destination="dest") -- self.queue = self.client.queue ("dest") -- self.queue.listen (self.receive_cb) - -- self.channel.message_flow_mode (destination="dest", mode=1) -- self.channel.message_flow (destination="dest", unit=0, value=0xFFFFFFFF) -- self.channel.message_flow (destination="dest", unit=1, value=0xFFFFFFFF) - - except socket.error, e: - print "Socket Error Detected:", e[1] - raise - except: - raise ---- 195,335 ---- - return 0 - return 1 - -+ def publish_cb (self, msg): - codec = Codec (StringIO (msg.content.body), self.spec) - - if self.checkHeader (codec) == 0: - raise ValueError ("outer header invalid"); - -+ self.metadata.parse (codec) -+ msg.complete () - -+ def reply_cb (self, msg): -+ codec = Codec (StringIO (msg.content.body), self.spec) -+ methodId = codec.decode_long () -+ status = codec.decode_long () -+ sText = codec.decode_shortstr () -+ -+ args = {} -+ if status == 0: -+ args["sequence"] = codec.decode_long () -+ args["body"] = codec.decode_longstr () -+ -+ if self.method_cb != None: -+ self.method_cb[1] (self.method_cb[0], methodId, status, sText, args) -+ - msg.complete () - -+ def __init__ (self, -+ host = "localhost", -+ port = 5672, -+ username = "guest", -+ password = "guest", -+ specfile = "../specs/amqp.0-10-preview.xml"): - -+ self.spec = qpid.spec.load (specfile) -+ self.client = None -+ self.channel = None -+ self.queue = None -+ self.rqueue = None -+ self.qname = None -+ self.rqname = None -+ self.metadata = ManagementMetadata (self) -+ self.connected = 0 -+ self.lastConnectError = None - - # Initialize the callback records -+ self.status_cb = None - self.schema_cb = None - self.config_cb = None - self.inst_cb = None -+ self.method_cb = None - - self.host = host - self.port = port - self.username = username - self.password = password - -+ def statusListener (self, context, callback): -+ self.status_cb = (context, callback) -+ - def schemaListener (self, context, callback): - self.schema_cb = (context, callback) - - def configListener (self, context, callback): - self.config_cb = (context, callback) - -+ def methodListener (self, context, callback): -+ self.method_cb = (context, callback) -+ - def instrumentationListener (self, context, callback): - self.inst_cb = (context, callback) - -+ def method (self, methodId, objId, className, -+ methodName, args=None, packageName="qpid"): -+ codec = Codec (StringIO (), self.spec); -+ codec.encode_long (methodId) -+ codec.encode_longlong (objId) -+ codec.encode_shortstr (self.rqname) -+ -+ # TODO: Encode args according to schema -+ if methodName == "echo": -+ codec.encode_long (args["sequence"]) -+ codec.encode_longstr (args["body"]) -+ -+ msg = Content (codec.stream.getvalue ()) -+ msg["content_type"] = "application/octet-stream" -+ msg["routing_key"] = "method." + packageName + "." + className + "." + methodName -+ msg["reply_to"] = self.spec.struct ("reply_to") -+ self.channel.message_transfer (destination="qpid.management", content=msg) -+ -+ def isConnected (self): -+ return connected -+ - def start (self): -+ print "Connecting to broker %s:%d" % (self.host, self.port) - - try: - self.client = Client (self.host, self.port, self.spec) - self.client.start ({"LOGIN": self.username, "PASSWORD": self.password}) - self.channel = self.client.channel (1) -+ response = self.channel.session_open (detached_lifetime=10) -+ self.qname = "mgmt-" + base64.urlsafe_b64encode (response.session_id) -+ self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id) - -+ self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1) -+ self.channel.queue_declare (queue=self.rqname, exclusive=1, auto_delete=1) -+ -+ self.channel.queue_bind (exchange=ManagedBroker.mExchange, queue=self.qname, -+ routing_key="mgmt.#") -+ self.channel.queue_bind (exchange=ManagedBroker.dExchange, queue=self.rqname, -+ routing_key=self.rqname) - -+ self.channel.message_subscribe (queue=self.qname, destination="mdest") -+ self.channel.message_subscribe (queue=self.rqname, destination="rdest") - -+ self.queue = self.client.queue ("mdest") -+ self.queue.listen (self.publish_cb) -+ -+ self.channel.message_flow_mode (destination="mdest", mode=1) -+ self.channel.message_flow (destination="mdest", unit=0, value=0xFFFFFFFF) -+ self.channel.message_flow (destination="mdest", unit=1, value=0xFFFFFFFF) -+ -+ self.rqueue = self.client.queue ("rdest") -+ self.rqueue.listen (self.reply_cb) -+ -+ self.channel.message_flow_mode (destination="rdest", mode=1) -+ self.channel.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF) -+ self.channel.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF) -+ -+ self.connected = 1 -+ - except socket.error, e: - print "Socket Error Detected:", e[1] -+ self.lastConnectError = e - raise - except: - raise -+ -+ def stop (self): -+ pass |