summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-03-23 21:21:17 +0000
committerTed Ross <tross@apache.org>2010-03-23 21:21:17 +0000
commit412859fac1743869f40cf56598ca09dbcfb06379 (patch)
treee55747a5c49c27fc4a3bc6a4fb45fd2d05f90903
parentd1178e0c2abe09f41eba0c1f794edd7caf8b8605 (diff)
downloadqpid-python-412859fac1743869f40cf56598ca09dbcfb06379.tar.gz
Checkpointing updates for the Python console.
- Added 'list' type for QMF. - Updated qmf-agent example to use the new string formats for agent name and object-id. - Major updates in Python qmf.console to handle dual-mode operation. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@926788 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/examples/qmf-agent/example.cpp9
-rw-r--r--qpid/cpp/examples/qmf-agent/schema.xml1
-rw-r--r--qpid/cpp/include/qpid/management/ManagementObject.h1
-rw-r--r--qpid/cpp/managementgen/qmfgen/management-types.xml1
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp24
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h2
-rw-r--r--qpid/extras/qmf/src/py/qmf/console.py509
7 files changed, 407 insertions, 140 deletions
diff --git a/qpid/cpp/examples/qmf-agent/example.cpp b/qpid/cpp/examples/qmf-agent/example.cpp
index 7d9043e097..96edc2f4d4 100644
--- a/qpid/cpp/examples/qmf-agent/example.cpp
+++ b/qpid/cpp/examples/qmf-agent/example.cpp
@@ -98,7 +98,7 @@ CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent
static uint64_t persistId = 0x111222333444555LL;
mgmtObject = new _qmf::Parent(agent, this, name);
- agent->addObject(mgmtObject, persistId++);
+ agent->addObject(mgmtObject);
mgmtObject->set_state("IDLE");
Variant::Map args;
@@ -109,6 +109,11 @@ CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent
subMap["numeric-data"] = 10000;
args["map-data"] = subMap;
mgmtObject->set_args(args);
+
+ Variant::List list;
+ list.push_back(20000);
+ list.push_back("string-item");
+ mgmtObject->set_list(list);
}
void CoreClass::doLoop()
@@ -190,7 +195,7 @@ int main_int(int argc, char** argv)
_qmf::Package packageInit(agent);
// Name the agent.
- agent->setName("apache.org", "qmf-example");
+ agent->setName("apache.org", "qmf-example", "A");
// Start the agent. It will attempt to make a connection to the
// management broker
diff --git a/qpid/cpp/examples/qmf-agent/schema.xml b/qpid/cpp/examples/qmf-agent/schema.xml
index 5662683eab..3c7755fe83 100644
--- a/qpid/cpp/examples/qmf-agent/schema.xml
+++ b/qpid/cpp/examples/qmf-agent/schema.xml
@@ -30,6 +30,7 @@
<property name="name" type="sstr" access="RC" index="y"/>
<property name="args" type="map" access="RO"/>
+ <property name="list" type="list" access="RO"/>
<statistic name="state" type="sstr" desc="Operational state of the link"/>
<statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/>
diff --git a/qpid/cpp/include/qpid/management/ManagementObject.h b/qpid/cpp/include/qpid/management/ManagementObject.h
index 50c396d2a9..2a5e5b6e52 100644
--- a/qpid/cpp/include/qpid/management/ManagementObject.h
+++ b/qpid/cpp/include/qpid/management/ManagementObject.h
@@ -97,6 +97,7 @@ public:
static const uint8_t TYPE_S16 = 17;
static const uint8_t TYPE_S32 = 18;
static const uint8_t TYPE_S64 = 19;
+ static const uint8_t TYPE_LIST = 21;
static const uint8_t ACCESS_RC = 1;
static const uint8_t ACCESS_RW = 2;
diff --git a/qpid/cpp/managementgen/qmfgen/management-types.xml b/qpid/cpp/managementgen/qmfgen/management-types.xml
index 139be6c5df..95434a278b 100644
--- a/qpid/cpp/managementgen/qmfgen/management-types.xml
+++ b/qpid/cpp/managementgen/qmfgen/management-types.xml
@@ -41,6 +41,7 @@
<type name="double" base="DOUBLE" cpp="double" encode="@.putDouble(#)" decode="# = @.getDouble()" stream="#" size="8" accessor="direct" init="0."/>
<type name="uuid" base="UUID" cpp="::qpid::messaging::Uuid" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="16" accessor="direct" init="::qpid::messaging::Uuid()" byRef="y" unmap="(#).asUuid().data()" map="::qpid::messaging::Uuid((#).data())" />
<type name="map" base="FTABLE" cpp="::qpid::messaging::VariantMap" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="#.encodedSize()" accessor="direct" init="::qpid::messaging::VariantMap()" byRef="y" unmap="::qpid::messaging::VariantMap(); assert(false); /*TBD*/"/>
+<type name="list" base="LIST" cpp="::qpid::messaging::Variant::List" encode="#.encode(@)" decode="#.decode(@)" stream="#" size="#.encodedSize()" accessor="direct" init="::qpid::messaging::Variant::List()" byRef="y" unmap="::qpid::messaging::Variant::List(); assert(false); /*TBD*/"/>
<type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet(#)" decode="# = @.getOctet()" style="wm" stream="#" size="1" accessor="counter" init="0"/>
<type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort(#)" decode="# = @.getShort()" style="wm" stream="#" size="2" accessor="counter" init="0"/>
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 845d74d1ea..1633e77a4f 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -156,7 +156,7 @@ void ManagementAgentImpl::init(const string& brokerHost,
void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings,
uint16_t intervalSeconds,
bool useExternalThread,
- const std::string& _storeFile)
+ const string& _storeFile)
{
interval = intervalSeconds;
extThread = useExternalThread;
@@ -448,7 +448,7 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
}
}
-void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence)
+void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo)
{
Mutex::ScopedLock lock(agentLock);
string packageName;
@@ -468,14 +468,14 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc
SchemaClass& schema = cIter->second;
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- std::string body;
+ string body;
encodeHeader(outBuffer, 's', sequence);
schema.writeSchemaCall(body);
outBuffer.putRawData(body);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
}
@@ -507,7 +507,7 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string&
(outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
failed = true;
} else {
- std::string methodName;
+ string methodName;
ObjectId objId;
qpid::messaging::Variant::Map inArgs;
@@ -738,7 +738,7 @@ void ManagementAgentImpl::received(Message& msg)
if (checkHeader(inBuffer, &opcode, &sequence))
{
if (opcode == 'a') handleAttachResponse(inBuffer);
- else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
+ else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey);
else if (opcode == 'x') handleConsoleAddedIndication();
QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode));
}
@@ -754,15 +754,15 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq
buf.putLong (seq);
}
-qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const std::string& pname,
- const std::string& cname,
+qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname,
+ const string& cname,
const uint8_t *md5Sum)
{
qpid::messaging::Variant::Map map_;
map_["_package_name"] = pname;
map_["_class_name"] = cname;
- map_["_hash_str"] = messaging::Uuid((const char*) md5Sum);
+ map_["_hash_str"] = messaging::Uuid(md5Sum);
return map_;
}
@@ -922,7 +922,10 @@ void ManagementAgentImpl::periodicProcessing()
if (send_stats || send_props) {
::qpid::messaging::Variant::Map map_;
::qpid::messaging::Variant::Map values;
+ ::qpid::messaging::Variant::Map oid;
+ object->getObjectId().mapEncode(oid);
+ map_["_object_id"] = oid;
map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
object->getClassName(),
object->getMd5Sum());
@@ -938,7 +941,7 @@ void ManagementAgentImpl::periodicProcessing()
}
content.encode();
- const std::string &str = m.getContent();
+ const string &str = m.getContent();
if (str.length()) {
::qpid::messaging::Variant::Map headers;
headers["method"] = "indication";
@@ -1105,6 +1108,7 @@ void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg,
msg.getDeliveryProperties().setRoutingKey(routingKey);
msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+ msg.getMessageProperties().getApplicationHeaders().setString("qmf.agent", agent.name_address);
try {
session.messageTransfer(arg::content=msg, arg::destination=exchange);
} catch(exception& e) {
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
index 399c9d7944..69a891a1b2 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -264,7 +264,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void handleAttachResponse (qpid::framing::Buffer& inBuffer);
void handlePackageRequest (qpid::framing::Buffer& inBuffer);
void handleClassQuery (qpid::framing::Buffer& inBuffer);
- void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence);
+ void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo);
void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& replyTo);
void handleGetQuery (const std::string& body, const std::string& content_type,
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py
index 226d586f8b..0b0ec417d0 100644
--- a/qpid/extras/qmf/src/py/qmf/console.py
+++ b/qpid/extras/qmf/src/py/qmf/console.py
@@ -119,10 +119,16 @@ class Object(object):
""" This class defines a 'proxy' object representing a real managed object on an agent.
Actions taken on this proxy are remotely affected on the real managed object.
"""
- def __init__(self, session, broker, schema, codec, prop, stat, managed=True, kwargs={}):
+ def __init__(self, session, broker, schema, codec=None, prop=None, stat=None, managed=True, v2Map=None, agentName=None, kwargs={}):
self._session = session
self._broker = broker
self._schema = schema
+ self._properties = []
+ self._statistics = []
+ if v2Map:
+ self.v2Init(v2Map, agentName)
+ return
+
self._managed = managed
if self._managed:
self._currentTime = codec.read_uint64()
@@ -134,8 +140,6 @@ class Object(object):
self._createTime = None
self._deleteTime = None
self._objectId = None
- self._properties = []
- self._statistics = []
if codec:
if prop:
notPresent = self._parsePresenceMasks(codec, schema)
@@ -156,6 +160,27 @@ class Object(object):
for statistic in schema.getStatistics():
self._statistics.append((statistic, self._session._defaultValue(statistic, broker, kwargs)))
+ def v2Init(self, omap, agentName):
+ if omap.__class__ != dict:
+ raise Exception("QMFv2 object data must be a map/dict")
+ if '_values' not in omap:
+ raise Exception("QMFv2 object must have '_values' element")
+
+ values = omap['_values']
+ for prop in self._schema.getProperties():
+ if prop.name in values:
+ self._properties.append((prop, values[prop.name]))
+ for stat in self._schema.getStatistics():
+ if stat.name in values:
+ self._statistics.append((stat, values[stat.name]))
+ if '_subtypes' in omap:
+ self._subtypes = omap['_subtypes']
+ if '_object_id' in omap:
+ self._managed = True
+ self._objectId = ObjectId(omap['_object_id'], agentName=agentName)
+ else:
+ self._managed = None
+
def getBroker(self):
""" Return the broker from which this object was sent """
return self._broker
@@ -244,17 +269,17 @@ class Object(object):
for method in self._schema.getMethods():
if name == method.name:
return lambda *args, **kwargs : self._invoke(name, args, kwargs)
- for property, value in self._properties:
- if name == property.name:
+ for prop, value in self._properties:
+ if name == prop.name:
return value
- if name == "_" + property.name + "_" and property.type == 10: # Dereference references
+ if name == "_" + prop.name + "_" and prop.type == 10: # Dereference references
deref = self._session.getObjects(_objectId=value, _broker=self._broker)
if len(deref) != 1:
return None
else:
return deref[0]
- for statistic, value in self._statistics:
- if name == statistic.name:
+ for stat, value in self._statistics:
+ if name == stat.name:
return value
raise Exception("Type Object has no attribute '%s'" % name)
@@ -449,7 +474,7 @@ class Session:
"""
self.console = console
self.brokers = []
- self.packages = {}
+ self.schemaCache = SchemaCache()
self.seqMgr = SequenceManager()
self.cv = Condition()
self.syncSequenceList = []
@@ -470,6 +495,134 @@ class Session:
if self.userBindings and not self.rcvObjects:
raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
+ """
+ ##
+ ## v2_data_queues is used to store object data received from QMFv2 agents.
+ ## It is stored here in case we need to go and query schema data from the
+ ## agent before reporting to the user.
+ ##
+ ## v2_data_queues is a map, keyed by agent address of queues of entries
+ ## The format of entries in the queue is a data map
+ ## This list must be protected by self.cv
+ ##
+ """
+ self.v2_data_queues = {}
+ self.v2_pending_queues = {}
+
+ def _getBrokerForAgentAddr(self, agent_addr):
+ broker = None
+ try:
+ self.cv.acquire()
+ key = (1, agent_addr)
+ for b in self.brokers:
+ if key in b.agents:
+ broker = b
+ finally:
+ self.cv.release()
+ return broker
+
+ def _processV2Data(self):
+ """
+ Attempt to make progress on the entries in the v2_data_queue. If an entry has a schema
+ that is in our schema cache, process it. Otherwise, send a request for the schema information
+ to the agent that manages the object.
+ """
+ try:
+ self.cv.acquire()
+ pop_list = []
+ for agent_addr in self.v2_data_queues:
+ entries = self.v2_data_queues[agent_addr]
+ keep_going = True
+ while keep_going and len(entries) > 0:
+ schemaId = self._getSchemaIdforV2ObjectLH(entries[0])
+ schema = self.schemaCache.getSchema(schemaId)
+ if schema:
+ broker = self._getBrokerForAgentAddr(agent_addr)
+ obj = Object(self, broker, schema, v2Map=entries[0], agentName=agent_addr)
+ entries.pop(0)
+
+ """
+ TODO: This following code assumes that the data indication came unsolicited.
+ This needs to be enhanced to handle the case of a query response.
+ """
+ if self.console:
+ self.console.objectProps(broker, obj)
+
+ else:
+ """
+ We have no schema for this data object, move the queue to the pending map and request
+ schema data from the agent
+ """
+ self.v2_pending_queues[agent_addr] = self.v2_data_queues[agent_addr]
+ pop_list.append(agent_addr)
+ self._v2SendSchemaRequest(agent_addr, schemaId)
+ keep_going = None
+ for agent_addr in pop_list:
+ self.v2_data_queues.pop(agent_addr)
+ finally:
+ self.cv.release()
+
+ def _addV2Data(self, agent_addr, data_map):
+ """
+ Add data-for-processing to the work queue
+ """
+ process = None
+ try:
+ self.cv.acquire()
+ if agent_addr in self.v2_pending_queues:
+ self.v2_pending_queues[agent_addr].append(data_map)
+ else:
+ if agent_addr not in self.v2_data_queues:
+ self.v2_data_queues[agent_addr] = []
+ self.v2_data_queues[agent_addr].append(data_map)
+ process = True
+ finally:
+ self.cv.release()
+
+ if process:
+ self._processV2Data()
+
+ def _removeV2Agent(self, agent):
+ """
+ Remove entries in the data queues related to a lost agent.
+ """
+ agent_name = agent.getAgentBank()
+ try:
+ self.cv.acquire()
+ if agent_name in self.v2_data_queues:
+ self.v2_data_queues.pop(agent_name)
+ if agent_name in self.v2_pending_queues:
+ self.v2_pending_queues.pop(agent_name)
+ finally:
+ self.cv.release()
+
+ def _schemaInfoFromV2Agent(self, agent_addr):
+ """
+ We have just received new schema information from an agent. Check to see if there's
+ more work that can now be done.
+ """
+ re_process = None
+ try:
+ self.cv.acquire()
+ if agent_addr in self.v2_pending_queues:
+ self.v2_data_queues[agent_addr] = self.v2_pending_queues.pop(agent_addr)
+ re_process = True
+ finally:
+ self.cv.release()
+
+ if re_process:
+ self._processV2Data()
+
+ def _getSchemaIdforV2ObjectLH(self, data):
+ """
+ Given a data map, extract the schema-identifier.
+ """
+ if data.__class__ != dict:
+ return None
+ if '_schema_id' in data:
+ return ClassKey(data['_schema_id'])
+ return None
+
def __repr__(self):
return "QMF Console Session Manager (brokers: %d)" % len(self.brokers)
@@ -489,6 +642,7 @@ class Session:
returned from the addBroker call """
if self.console:
for agent in broker.getAgents():
+ self.console.removev2Agent(agent)
self.console.delAgent(agent)
broker._shutdown()
self.brokers.remove(broker)
@@ -498,30 +652,19 @@ class Session:
""" Get the list of known QMF packages """
for broker in self.brokers:
broker._waitForStable()
- list = []
- for package in self.packages:
- list.append(package)
- return list
+ return self.schemaCache.getPackages()
def getClasses(self, packageName):
""" Get the list of known classes within a QMF package """
for broker in self.brokers:
broker._waitForStable()
- list = []
- if packageName in self.packages:
- for pkey in self.packages[packageName]:
- list.append(self.packages[packageName][pkey].getKey())
- return list
+ return self.schemaCache.getClasses(packageName)
def getSchema(self, classKey):
""" Get the schema for a QMF class """
for broker in self.brokers:
broker._waitForStable()
- pname = classKey.getPackageName()
- pkey = classKey.getPackageKey()
- if pname in self.packages:
- if pkey in self.packages[pname]:
- return self.packages[pname][pkey]
+ return self.schemaCache.getSchema(classKey)
def bindPackage(self, packageName):
""" Request object updates for all table classes within a package. """
@@ -743,6 +886,7 @@ class Session:
def _handleBrokerDisconnect(self, broker):
if self.console:
for agent in broker.getAgents():
+ self.session._removeV2Agent(agent)
self.console.delAgent(agent)
self.console.brokerDisconnected(broker)
@@ -761,14 +905,7 @@ class Session:
def _handlePackageInd(self, broker, codec, seq):
pname = str(codec.read_str8())
- notify = False
- try:
- self.cv.acquire()
- if pname not in self.packages:
- self.packages[pname] = {}
- notify = True
- finally:
- self.cv.release()
+ notify = self.schemaCache.declarePackage(pname)
if notify and self.console != None:
self.console.newPackage(pname)
@@ -806,17 +943,9 @@ class Session:
def _handleClassInd(self, broker, codec, seq):
kind = codec.read_uint8()
classKey = ClassKey(codec)
- unknown = False
+ schema = self.schemaCache.getSchema(classKey)
- try:
- self.cv.acquire()
- if classKey.getPackageName() in self.packages:
- if classKey.getPackageKey() not in self.packages[classKey.getPackageName()]:
- unknown = True
- finally:
- self.cv.release()
-
- if unknown:
+ if not schema:
# Send a schema request for the unknown class
broker._incOutstanding()
sendCodec = Codec()
@@ -879,37 +1008,27 @@ class Session:
event = Event(self, broker, codec)
self.console.event(broker, event)
- def _handleSchemaResp(self, broker, codec, seq):
+ def _handleSchemaResp(self, broker, codec, seq, agent_addr):
kind = codec.read_uint8()
classKey = ClassKey(codec)
_class = SchemaClass(kind, classKey, codec, self)
- try:
- self.cv.acquire()
- self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class
- finally:
- self.cv.release()
-
+ self.schemaCache.declareClass(classKey, _class)
self.seqMgr._release(seq)
broker._decOutstanding()
if self.console != None:
self.console.newClass(kind, classKey)
+ if agent_addr:
+ self._schemaInfoFromV2Agent(agent_addr)
+
def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
classKey = ClassKey(codec)
- try:
- self.cv.acquire()
- pname = classKey.getPackageName()
- if pname not in self.packages:
- return
- pkey = classKey.getPackageKey()
- if pkey not in self.packages[pname]:
- return
- schema = self.packages[pname][pkey]
- finally:
- self.cv.release()
+ schema = self.schemaCache.getSchema(classKey)
+ if not schema:
+ return
object = Object(self, broker, schema, codec, prop, stat)
- if pname == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
+ if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
broker._updateAgent(object)
try:
@@ -949,7 +1068,15 @@ class Session:
self._v2HandleHeartbeatInd(broker, mp, ah, content)
def _v2HandleDataInd(self, broker, mp, ah, content):
- pass
+ kind = "_data"
+ if "qmf.content" in ah:
+ kind = ah["qmf.content"]
+ agent_addr = ah["qmf.agent"]
+ if content.__class__ != list:
+ return
+ if kind == "_data":
+ for omap in content:
+ self._addV2Data(agent_addr, omap)
def _v2HandleQueryRsp(self, broker, mp, ah, content):
pass
@@ -960,6 +1087,24 @@ class Session:
def _v2HandleException(self, broker, mp, ah, content):
pass
+ def _v2SendSchemaRequest(self, agent_addr, schemaId):
+ """
+ Send a query to an agent to request details on a particular schema class.
+ IMPORTANT: This function currently sends a QMFv1 schema-request to the address of
+ the agent. The agent will send its response to amq.direct/<our-key>.
+ Eventually, this will be converted to a proper QMFv2 schema query.
+ """
+ broker = self._getBrokerForAgentAddr(agent_addr)
+ if not broker:
+ return
+
+ sendCodec = Codec()
+ seq = self.seqMgr._reserve(None)
+ broker._setHeader(sendCodec, 'S', seq)
+ schemaId.encode(sendCodec)
+ smsg = broker._message(sendCodec.encoded, agent_addr)
+ broker._send(smsg, "qmf.default.direct")
+
def _handleError(self, error):
try:
self.cv.acquire()
@@ -1004,17 +1149,9 @@ class Session:
inner_type_code = codec.read_uint8()
if inner_type_code == 20:
classKey = ClassKey(codec)
- try:
- self.cv.acquire()
- pname = classKey.getPackageName()
- if pname not in self.packages:
- return None
- pkey = classKey.getPackageKey()
- if pkey not in self.packages[pname]:
- return None
- schema = self.packages[pname][pkey]
- finally:
- self.cv.release()
+ schema = self.schemaCache.getSchema(classKey)
+ if not schema:
+ return None
data = Object(self, broker, schema, codec, True, True, False)
else:
data = self._decodeValue(codec, inner_type_code, broker)
@@ -1206,15 +1343,86 @@ class Session:
return seq
return None
-class Package:
- """ """
- def __init__(self, name):
- self.name = name
+class SchemaCache(object):
+ """
+ The SchemaCache is a data structure that stores learned schema information.
+ """
+ def __init__(self):
+ """
+ Create a map of schema packages and a lock to protect this data structure.
+ Note that this lock is at the bottom of any lock hierarchy. If it is held, no other
+ lock in the system should attempt to be acquired.
+ """
+ self.packages = {}
+ self.lock = Lock()
+
+ def getPackages(self):
+ """ Get the list of known QMF packages """
+ list = []
+ try:
+ self.lock.acquire()
+ for package in self.packages:
+ list.append(package)
+ finally:
+ self.lock.release()
+ return list
+
+ def getClasses(self, packageName):
+ """ Get the list of known classes within a QMF package """
+ list = []
+ try:
+ self.lock.acquire()
+ if packageName in self.packages:
+ for pkey in self.packages[packageName]:
+ list.append(self.packages[packageName][pkey].getKey())
+ finally:
+ self.lock.release()
+ return list
+
+ def getSchema(self, classKey):
+ """ Get the schema for a QMF class """
+ pname = classKey.getPackageName()
+ pkey = classKey.getPackageKey()
+ try:
+ self.lock.acquire()
+ if pname in self.packages:
+ if pkey in self.packages[pname]:
+ return self.packages[pname][pkey]
+ finally:
+ self.lock.release()
+ return None
+
+ def declarePackage(self, pname):
+ """ Maybe add a package to the cache. Return True if package was added, None if it pre-existed. """
+ try:
+ self.lock.acquire()
+ if pname in self.packages:
+ return None
+ self.packages[pname] = {}
+ finally:
+ self.lock.release()
+ return True
+
+ def declareClass(self, classKey, classDef):
+ """ Maybe add a class definition to the cache. Return True if added, None if pre-existed. """
+ pname = classKey.getPackageName()
+ pkey = classKey.getPackageKey()
+ try:
+ self.lock.acquire()
+ if pname not in self.packages:
+ self.packages[pname] = {}
+ packageMap = self.packages[pname]
+ if pkey in packageMap:
+ return None
+ packageMap[pkey] = classDef
+ finally:
+ self.lock.release()
+ return True
class ClassKey:
""" A ClassKey uniquely identifies a class from the schema. """
def __init__(self, constructor):
- if type(constructor) == str:
+ if constructor.__class__ == str:
# construct from __repr__ string
try:
self.pname, cls = constructor.split(":")
@@ -1225,20 +1433,30 @@ class ClassKey:
h1 = int(hexValues[1], 16)
h2 = int(hexValues[2], 16)
h3 = int(hexValues[3], 16)
- self.hash = struct.pack("!LLLL", h0, h1, h2, h3)
+ h4 = int(hexValues[4][0:4], 16)
+ h5 = int(hexValues[4][4:12], 16)
+ self.hash = UUID(struct.pack("!LHHHHL", h0, h1, h2, h3, h4, h5))
except:
raise Exception("Invalid ClassKey format")
+ elif constructor.__class__ == dict:
+ # construct from QMFv2 map
+ try:
+ self.pname = constructor['_package_name']
+ self.cname = constructor['_class_name']
+ self.hash = constructor['_hash_str']
+ except:
+ raise Exception("Invalid ClassKey map format")
else:
# construct from codec
codec = constructor
self.pname = str(codec.read_str8())
self.cname = str(codec.read_str8())
- self.hash = codec.read_bin128()
+ self.hash = UUID(codec.read_bin128())
def encode(self, codec):
codec.write_str8(self.pname)
codec.write_str8(self.cname)
- codec.write_bin128(self.hash)
+ codec.write_bin128(self.hash.bytes)
def getPackageName(self):
return self.pname
@@ -1250,7 +1468,7 @@ class ClassKey:
return self.hash
def getHashString(self):
- return "%08x-%08x-%08x-%08x" % struct.unpack ("!LLLL", self.hash)
+ return str(self.hash)
def getPackageKey(self):
return (self.cname, self.hash)
@@ -1442,61 +1660,93 @@ class SchemaArgument:
class ObjectId:
""" Object that represents QMF object identifiers """
- def __init__(self, codec, first=0, second=0):
- if codec:
- self.first = codec.read_uint64()
- self.second = codec.read_uint64()
+ def __init__(self, constructor, first=0, second=0, agentName=None):
+ if constructor.__class__ == dict:
+ self.agentName = agentName
+ self.agentEpoch = 0
+ if '_agent_name' in constructor: self.agentName = constructor['_agent_name']
+ if '_agent_epoch' in constructor: self.agentEpoch = constructor['_agent_epoch']
+ if '_object_name' not in constructor:
+ raise Exception("QMFv2 OBJECT_ID must have the '_object_name' field.")
+ self.objectName = constructor['_object_name']
else:
- self.first = first
- self.second = second
+ if not constructor:
+ first = first
+ second = second
+ else:
+ first = constructor.read_uint64()
+ second = constructor.read_uint64()
+ self.agentName = str((first & 0x0000FFFFF0000000) >> 28)
+ self.agentEpoch = (first & 0x0FFF000000000000) >> 48
+ self.objectName = str(second)
def __cmp__(self, other):
if other == None or not isinstance(other, ObjectId) :
return 1
- if self.first < other.first:
+
+ if self.objectName < other.objectName:
+ return -1
+ if self.objectName > other.objectName:
+ return 1
+
+ if self.agentName < other.agentName:
return -1
- if self.first > other.first:
+ if self.agentName > other.agentName:
return 1
- if self.second < other.second:
+
+ if self.agentEpoch < other.agentEpoch:
return -1
- if self.second > other.second:
+ if self.agentEpoch > other.agentEpoch:
return 1
return 0
def __repr__(self):
- return "%d-%d-%d-%d-%d" % (self.getFlags(), self.getSequence(),
+ return "%d-%d-%d-%s-%s" % (self.getFlags(), self.getSequence(),
self.getBrokerBank(), self.getAgentBank(), self.getObject())
def index(self):
- return (self.first, self.second)
+ return self.__repr__()
def getFlags(self):
- return (self.first & 0xF000000000000000) >> 60
+ return 0
def getSequence(self):
- return (self.first & 0x0FFF000000000000) >> 48
+ return self.agentEpoch
def getBrokerBank(self):
- return (self.first & 0x0000FFFFF0000000) >> 28
+ return 1
def getAgentBank(self):
- return self.first & 0x000000000FFFFFFF
+ return self.agentName
def getObject(self):
- return self.second
+ return self.objectName
def isDurable(self):
return self.getSequence() == 0
def encode(self, codec):
- codec.write_uint64(self.first)
- codec.write_uint64(self.second)
+ first = self.agentEpoch << 48
+ second = 0
+
+ try:
+ first += int(self.agentName) << 28
+ except:
+ pass
+
+ try:
+ second = int(self.objectName)
+ except:
+ pass
+
+ codec.write_uint64(first)
+ codec.write_uint64(second)
def __hash__(self):
- return (self.first, self.second).__hash__()
+ return self.__repr__().__hash__()
def __eq__(self, other):
- return (self.first, self.second).__eq__(other)
+ return self.__repr__().__eq__(other)
class MethodResult(object):
""" """
@@ -1590,7 +1840,6 @@ class Broker:
self.authUser = authUser
self.authPass = authPass
self.cv = Condition()
- self.agentLock = Lock()
self.error = None
self.brokerId = None
self.connected = False
@@ -1625,11 +1874,11 @@ class Broker:
""" Return the agent object associated with a particular broker and agent bank value."""
bankKey = (brokerBank, agentBank)
try:
- self.agentLock.acquire()
+ self.cv.acquire()
if bankKey in self.agents:
return self.agents[bankKey]
finally:
- self.agentLock.release()
+ self.cv.release()
return None
def getSessionId(self):
@@ -1639,10 +1888,10 @@ class Broker:
def getAgents(self):
""" Get the list of agents reachable via this broker """
try:
- self.agentLock.acquire()
+ self.cv.acquire()
return self.agents.values()
finally:
- self.agentLock.release()
+ self.cv.release()
def getAmqpSession(self):
""" Get the AMQP session object for this connected broker. """
@@ -1672,11 +1921,11 @@ class Broker:
def _tryToConnect(self):
try:
try:
- self.agentLock.acquire()
+ self.cv.acquire()
self.agents = {}
self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
finally:
- self.agentLock.release()
+ self.cv.release()
self.topicBound = False
self.syncInFlight = False
@@ -1768,35 +2017,35 @@ class Broker:
agent = None
if obj._deleteTime == 0:
try:
- self.agentLock.acquire()
+ self.cv.acquire()
if bankKey not in self.agents:
agent = Agent(self, obj.agentBank, obj.label)
self.agents[bankKey] = agent
finally:
- self.agentLock.release()
+ self.cv.release()
if agent and self.session.console:
self.session.console.newAgent(agent)
else:
try:
- self.agentLock.acquire()
+ self.cv.acquire()
agent = self.agents.pop(bankKey, None)
finally:
- self.agentLock.release()
+ self.cv.release()
if agent and self.session.console:
self.session.console.delAgent(agent)
def _addAgent(self, name, agent):
try:
- self.agentLock.acquire()
+ self.cv.acquire()
self.agents[(1, name)] = agent
finally:
- self.agentLock.release()
+ self.cv.release()
if self.session.console:
self.session.console.newAgent(agent)
def _ageAgents(self):
try:
- self.agentLock.acquire()
+ self.cv.acquire()
to_delete = []
to_notify = []
for key in self.agents:
@@ -1805,12 +2054,16 @@ class Broker:
for key in to_delete:
to_notify.append(self.agents.pop(key, None))
finally:
- self.agentLock.release()
+ self.cv.release()
if self.session.console:
for agent in to_notify:
+ self.session._removeV2Agent(agent)
self.session.console.delAgent(agent)
def _v2SendAgentLocate(self, predicate={}):
+ """
+ Broadcast an agent-locate request to cause all agents in the domain to tell us who they are.
+ """
dp = self.amqpSession.delivery_properties()
dp.routing_key = "console.request.agent_locate"
mp = self.amqpSession.message_properties()
@@ -1915,6 +2168,11 @@ class Broker:
self.cv.release()
def _replyCb(self, msg):
+ agent_addr = None
+ mp = msg.get("message_properties")
+ ah = mp.application_headers
+ if ah and 'qmf.agent' in ah:
+ agent_addr = ah['qmf.agent']
codec = Codec(msg.body)
while True:
opcode, seq = self._checkHeader(codec)
@@ -1926,7 +2184,7 @@ class Broker:
elif opcode == 'm': self.session._handleMethodResp (self, codec, seq)
elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg)
elif opcode == 'e': self.session._handleEventInd (self, codec, seq)
- elif opcode == 's': self.session._handleSchemaResp (self, codec, seq)
+ elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr)
elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True)
elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True)
elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True)
@@ -2014,15 +2272,12 @@ class Event:
self.classKey = ClassKey(codec)
self.timestamp = codec.read_int64()
self.severity = codec.read_uint8()
- self.schema = None
- pname = self.classKey.getPackageName()
- pkey = self.classKey.getPackageKey()
- if pname in session.packages:
- if pkey in session.packages[pname]:
- self.schema = session.packages[pname][pkey]
- self.arguments = {}
- for arg in self.schema.arguments:
- self.arguments[arg.name] = session._decodeValue(codec, arg.type, broker)
+ self.schema = session.schemaCache.getSchema(self.classKey)
+ if not self.schema:
+ return
+ self.arguments = {}
+ for arg in self.schema.arguments:
+ self.arguments[arg.name] = session._decodeValue(codec, arg.type, broker)
def __repr__(self):
if self.schema == None: