summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNuno Santos <nsantos@apache.org>2008-01-23 19:43:03 +0000
committerNuno Santos <nsantos@apache.org>2008-01-23 19:43:03 +0000
commitbdecc9d9952d735b0fe6e00c90b11f5c8f542c32 (patch)
tree2fc2af9454809d6d17f32b4c4c78cce1308f6cda
parent86795907f60601c0127952af46886c758b541eae (diff)
downloadqpid-python-bdecc9d9952d735b0fe6e00c90b11f5c8f542c32.tar.gz
removed management.py.rej, which had been checked in by mistake
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@614638 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/management.py.rej457
1 files changed, 0 insertions, 457 deletions
diff --git a/qpid/python/qpid/management.py.rej b/qpid/python/qpid/management.py.rej
deleted file mode 100644
index 28d172abe5..0000000000
--- a/qpid/python/qpid/management.py.rej
+++ /dev/null
@@ -1,457 +0,0 @@
-***************
-*** 18,24 ****
- #
-
- """
-- Management classes for AMQP
- """
-
- import qpid
---- 18,24 ----
- #
-
- """
-+ Management API for Qpid
- """
-
- import qpid
-***************
-*** 42,91 ****
- #===================================================================
- class ManagementMetadata:
-
-- def parseSchema (self, cls, oid, len, codec):
-- #print "Schema Record: objId=", oid
-
-- config = []
-- inst = []
-- while 1:
-- flags = codec.decode_octet ()
-- if flags == 0x80:
-- break
-
-- tc = codec.decode_octet ()
-- name = codec.decode_shortstr ()
-- desc = codec.decode_shortstr ()
-
-- if flags & 1: # TODO: Define constants for these
-- config.append ((tc, name, desc))
-- if (flags & 1) == 0 or (flags & 2) == 2:
-- inst.append ((tc, name, desc))
-
- # TODO: Handle notification of schema change outbound
-- self.schema[(oid,'C')] = config
-- self.schema[(oid,'I')] = inst
-
-- def parseContent (self, cls, oid, len, codec):
-- #print "Content Record: Class=", cls, ", objId=", oid
--
- if cls == 'C' and self.broker.config_cb == None:
- return
- if cls == 'I' and self.broker.inst_cb == None:
- return
-
-- if (oid,cls) not in self.schema:
- return
-
- row = []
- timestamps = []
-
-- timestamps.append (codec.decode_longlong ()); # Current Time
-- timestamps.append (codec.decode_longlong ()); # Create Time
-- timestamps.append (codec.decode_longlong ()); # Delete Time
-
-- for element in self.schema[(oid,cls)][:]:
-- tc = element[0]
-- name = element[1]
- if tc == 1: # TODO: Define constants for these
- data = codec.decode_octet ()
- elif tc == 2:
---- 42,132 ----
- #===================================================================
- class ManagementMetadata:
-
-+ def parseSchema (self, cls, codec):
-+ className = codec.decode_shortstr ()
-+ configCount = codec.decode_short ()
-+ instCount = codec.decode_short ()
-+ methodCount = codec.decode_short ()
-+ eventCount = codec.decode_short ()
-
-+ configs = []
-+ insts = []
-+ methods = []
-+ events = []
-
-+ configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None))
-+ insts.append (("id", 4, None, None))
-
-+ for idx in range (configCount):
-+ ft = codec.decode_table ()
-+ name = ft["name"]
-+ type = ft["type"]
-+ access = ft["access"]
-+ index = ft["index"]
-+ unit = None
-+ min = None
-+ max = None
-+ maxlen = None
-+ desc = None
-
-+ for key, value in ft.items ():
-+ if key == "unit":
-+ unit = value
-+ elif key == "min":
-+ min = value
-+ elif key == "max":
-+ max = value
-+ elif key == "maxlen":
-+ maxlen = value
-+ elif key == "desc":
-+ desc = value
-+
-+ config = (name, type, unit, desc, access, index, min, max, maxlen)
-+ configs.append (config)
-+
-+ for idx in range (instCount):
-+ ft = codec.decode_table ()
-+ name = ft["name"]
-+ type = ft["type"]
-+ unit = None
-+ desc = None
-+
-+ for key, value in ft.items ():
-+ if key == "unit":
-+ unit = value
-+ elif key == "desc":
-+ desc = value
-+
-+ inst = (name, type, unit, desc)
-+ insts.append (inst)
-+
- # TODO: Handle notification of schema change outbound
-+ self.schema[(className,'C')] = configs
-+ self.schema[(className,'I')] = insts
-+ self.schema[(className,'M')] = methods
-+ self.schema[(className,'E')] = events
-
-+ def parseContent (self, cls, codec):
- if cls == 'C' and self.broker.config_cb == None:
- return
- if cls == 'I' and self.broker.inst_cb == None:
- return
-
-+ className = codec.decode_shortstr ()
-+
-+ if (className,cls) not in self.schema:
- return
-
- row = []
- timestamps = []
-
-+ timestamps.append (codec.decode_longlong ()) # Current Time
-+ timestamps.append (codec.decode_longlong ()) # Create Time
-+ timestamps.append (codec.decode_longlong ()) # Delete Time
-
-+ for element in self.schema[(className,cls)][:]:
-+ tc = element[1]
-+ name = element[0]
- if tc == 1: # TODO: Define constants for these
- data = codec.decode_octet ()
- elif tc == 2:
-***************
-*** 98,130 ****
- data = codec.decode_octet ()
- elif tc == 6:
- data = codec.decode_shortstr ()
- row.append ((name, data))
-
- if cls == 'C':
-- self.broker.config_cb[1] (self.broker.config_cb[0], oid, row, timestamps)
-- if cls == 'I':
-- self.broker.inst_cb[1] (self.broker.inst_cb[0], oid, row, timestamps)
-
- def parse (self, codec):
-- try:
-- opcode = chr (codec.decode_octet ())
-- except EOF:
-- return 0
-
-- cls = chr (codec.decode_octet ())
-- oid = codec.decode_short ()
-- len = codec.decode_long ()
--
-- if len < 8:
-- raise ValueError ("parse error: value of length field too small")
--
- if opcode == 'S':
-- self.parseSchema (cls, oid, len, codec)
-
-- if opcode == 'C':
-- self.parseContent (cls, oid, len, codec)
-
-- return 1
-
- def __init__ (self, broker):
- self.broker = broker
---- 139,167 ----
- data = codec.decode_octet ()
- elif tc == 6:
- data = codec.decode_shortstr ()
-+ elif tc == 7:
-+ data = codec.decode_longstr ()
-+ else:
-+ raise ValueError ("Invalid type code: %d" % tc)
- row.append ((name, data))
-
- if cls == 'C':
-+ self.broker.config_cb[1] (self.broker.config_cb[0], className, row, timestamps)
-+ elif cls == 'I':
-+ self.broker.inst_cb[1] (self.broker.inst_cb[0], className, row, timestamps)
-
- def parse (self, codec):
-+ opcode = chr (codec.decode_octet ())
-+ cls = chr (codec.decode_octet ())
-
- if opcode == 'S':
-+ self.parseSchema (cls, codec)
-
-+ elif opcode == 'C':
-+ self.parseContent (cls, codec)
-
-+ else:
-+ raise ValueError ("Unknown opcode: %c" % opcode);
-
- def __init__ (self, broker):
- self.broker = broker
-***************
-*** 140,146 ****
- #===================================================================
- class ManagedBroker:
-
-- exchange = "qpid.management"
-
- def checkHeader (self, codec):
- octet = chr (codec.decode_octet ())
---- 177,184 ----
- #===================================================================
- class ManagedBroker:
-
-+ mExchange = "qpid.management"
-+ dExchange = "amq.direct"
-
- def checkHeader (self, codec):
- octet = chr (codec.decode_octet ())
-***************
-*** 157,225 ****
- return 0
- return 1
-
-- def receive_cb (self, msg):
- codec = Codec (StringIO (msg.content.body), self.spec)
-
- if self.checkHeader (codec) == 0:
- raise ValueError ("outer header invalid");
-
-- while self.metadata.parse (codec):
-- pass
-
- msg.complete ()
-
-- def __init__ (self, host = "localhost", port = 5672,
-- username = "guest", password = "guest"):
-
-- self.spec = qpid.spec.load ("../specs/amqp.0-10-preview.xml")
-- self.client = None
-- self.channel = None
-- self.queue = None
-- self.qname = None
-- self.metadata = ManagementMetadata (self)
-
- # Initialize the callback records
- self.schema_cb = None
- self.config_cb = None
- self.inst_cb = None
-
- self.host = host
- self.port = port
- self.username = username
- self.password = password
-
- def schemaListener (self, context, callback):
- self.schema_cb = (context, callback)
-
- def configListener (self, context, callback):
- self.config_cb = (context, callback)
-
- def instrumentationListener (self, context, callback):
- self.inst_cb = (context, callback)
-
- def start (self):
-- print "Connecting to broker", self.host
-
- try:
- self.client = Client (self.host, self.port, self.spec)
- self.client.start ({"LOGIN": self.username, "PASSWORD": self.password})
- self.channel = self.client.channel (1)
-- response = self.channel.session_open (detached_lifetime=300)
-- self.qname = "mgmt-" + base64.urlsafe_b64encode(response.session_id)
-
-- self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1)
-- self.channel.queue_bind (exchange=ManagedBroker.exchange, queue=self.qname,
-- routing_key="mgmt")
-- self.channel.message_subscribe (queue=self.qname, destination="dest")
-- self.queue = self.client.queue ("dest")
-- self.queue.listen (self.receive_cb)
-
-- self.channel.message_flow_mode (destination="dest", mode=1)
-- self.channel.message_flow (destination="dest", unit=0, value=0xFFFFFFFF)
-- self.channel.message_flow (destination="dest", unit=1, value=0xFFFFFFFF)
-
- except socket.error, e:
- print "Socket Error Detected:", e[1]
- raise
- except:
- raise
---- 195,335 ----
- return 0
- return 1
-
-+ def publish_cb (self, msg):
- codec = Codec (StringIO (msg.content.body), self.spec)
-
- if self.checkHeader (codec) == 0:
- raise ValueError ("outer header invalid");
-
-+ self.metadata.parse (codec)
-+ msg.complete ()
-
-+ def reply_cb (self, msg):
-+ codec = Codec (StringIO (msg.content.body), self.spec)
-+ methodId = codec.decode_long ()
-+ status = codec.decode_long ()
-+ sText = codec.decode_shortstr ()
-+
-+ args = {}
-+ if status == 0:
-+ args["sequence"] = codec.decode_long ()
-+ args["body"] = codec.decode_longstr ()
-+
-+ if self.method_cb != None:
-+ self.method_cb[1] (self.method_cb[0], methodId, status, sText, args)
-+
- msg.complete ()
-
-+ def __init__ (self,
-+ host = "localhost",
-+ port = 5672,
-+ username = "guest",
-+ password = "guest",
-+ specfile = "../specs/amqp.0-10-preview.xml"):
-
-+ self.spec = qpid.spec.load (specfile)
-+ self.client = None
-+ self.channel = None
-+ self.queue = None
-+ self.rqueue = None
-+ self.qname = None
-+ self.rqname = None
-+ self.metadata = ManagementMetadata (self)
-+ self.connected = 0
-+ self.lastConnectError = None
-
- # Initialize the callback records
-+ self.status_cb = None
- self.schema_cb = None
- self.config_cb = None
- self.inst_cb = None
-+ self.method_cb = None
-
- self.host = host
- self.port = port
- self.username = username
- self.password = password
-
-+ def statusListener (self, context, callback):
-+ self.status_cb = (context, callback)
-+
- def schemaListener (self, context, callback):
- self.schema_cb = (context, callback)
-
- def configListener (self, context, callback):
- self.config_cb = (context, callback)
-
-+ def methodListener (self, context, callback):
-+ self.method_cb = (context, callback)
-+
- def instrumentationListener (self, context, callback):
- self.inst_cb = (context, callback)
-
-+ def method (self, methodId, objId, className,
-+ methodName, args=None, packageName="qpid"):
-+ codec = Codec (StringIO (), self.spec);
-+ codec.encode_long (methodId)
-+ codec.encode_longlong (objId)
-+ codec.encode_shortstr (self.rqname)
-+
-+ # TODO: Encode args according to schema
-+ if methodName == "echo":
-+ codec.encode_long (args["sequence"])
-+ codec.encode_longstr (args["body"])
-+
-+ msg = Content (codec.stream.getvalue ())
-+ msg["content_type"] = "application/octet-stream"
-+ msg["routing_key"] = "method." + packageName + "." + className + "." + methodName
-+ msg["reply_to"] = self.spec.struct ("reply_to")
-+ self.channel.message_transfer (destination="qpid.management", content=msg)
-+
-+ def isConnected (self):
-+ return connected
-+
- def start (self):
-+ print "Connecting to broker %s:%d" % (self.host, self.port)
-
- try:
- self.client = Client (self.host, self.port, self.spec)
- self.client.start ({"LOGIN": self.username, "PASSWORD": self.password})
- self.channel = self.client.channel (1)
-+ response = self.channel.session_open (detached_lifetime=10)
-+ self.qname = "mgmt-" + base64.urlsafe_b64encode (response.session_id)
-+ self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id)
-
-+ self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1)
-+ self.channel.queue_declare (queue=self.rqname, exclusive=1, auto_delete=1)
-+
-+ self.channel.queue_bind (exchange=ManagedBroker.mExchange, queue=self.qname,
-+ routing_key="mgmt.#")
-+ self.channel.queue_bind (exchange=ManagedBroker.dExchange, queue=self.rqname,
-+ routing_key=self.rqname)
-
-+ self.channel.message_subscribe (queue=self.qname, destination="mdest")
-+ self.channel.message_subscribe (queue=self.rqname, destination="rdest")
-
-+ self.queue = self.client.queue ("mdest")
-+ self.queue.listen (self.publish_cb)
-+
-+ self.channel.message_flow_mode (destination="mdest", mode=1)
-+ self.channel.message_flow (destination="mdest", unit=0, value=0xFFFFFFFF)
-+ self.channel.message_flow (destination="mdest", unit=1, value=0xFFFFFFFF)
-+
-+ self.rqueue = self.client.queue ("rdest")
-+ self.rqueue.listen (self.reply_cb)
-+
-+ self.channel.message_flow_mode (destination="rdest", mode=1)
-+ self.channel.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF)
-+ self.channel.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF)
-+
-+ self.connected = 1
-+
- except socket.error, e:
- print "Socket Error Detected:", e[1]
-+ self.lastConnectError = e
- raise
- except:
- raise
-+
-+ def stop (self):
-+ pass