diff options
author | Ted Ross <tross@apache.org> | 2009-10-22 13:42:29 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-10-22 13:42:29 +0000 |
commit | f7de7d4cf111a1dbe2acc220d5b16ebb06f27cae (patch) | |
tree | 11fdb70bae670127a9a0429d0146a640c3dd3cea | |
parent | e95e725bb5c8040061b96613dea017c1335b31e0 (diff) | |
download | qpid-python-f7de7d4cf111a1dbe2acc220d5b16ebb06f27cae.tar.gz |
Added immediate-publish for new connections and agents. This solves a race condition where
a QMF console may learn about an object before it learns about the agent that controls that
object.
Changed log category for QMF messages from debug to trace.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@828685 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 83 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 3 |
3 files changed, 53 insertions, 35 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 3b8a6c71d4..824a2ee75d 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -97,7 +97,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std // TODO set last bool true if system connection if (agent != 0) { mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false); - agent->addObject(mgmtObject, objectId); + agent->addObject(mgmtObject, objectId, true); } ConnectionState::setUrl(mgmtId); } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 0e462342d4..ae317512b8 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -46,7 +46,7 @@ ManagementAgent::RemoteAgent::~RemoteAgent () { if (mgmtObject != 0) mgmtObject->resourceDestroy(); - QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); + QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); } ManagementAgent::ManagementAgent () : @@ -170,8 +170,9 @@ void ManagementAgent::registerEvent (const string& packageName, addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); } -ObjectId ManagementAgent::addObject (ManagementObject* object, - uint64_t persistId) +ObjectId ManagementAgent::addObject(ManagementObject* object, + uint64_t persistId, + bool publishNow) { Mutex::ScopedLock lock (addLock); uint16_t sequence; @@ -189,6 +190,22 @@ ObjectId ManagementAgent::addObject (ManagementObject* object, object->setObjectId(objId); newManagementObjects[objId] = object; + + if (publishNow) { +#define IMM_BUFSIZE 65536 + char rawBuf[IMM_BUFSIZE]; + Buffer msgBuffer(rawBuf, IMM_BUFSIZE); + + encodeHeader(msgBuffer, 'c'); + object->writeProperties(msgBuffer); + uint32_t contentSize = msgBuffer.getPosition(); + stringstream key; + key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); + msgBuffer.reset(); + sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND Immediate ContentInd to=" << key.str()); + } + return objId; } @@ -240,7 +257,7 @@ void ManagementAgent::clientAdded (const std::string& routingKey) outLen = outBuffer.getPosition(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, aIter->second->routingKey); - QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << aIter->second->routingKey); + QPID_LOG(trace, "SEND ConsoleAddedIndication to=" << aIter->second->routingKey); } } @@ -396,7 +413,7 @@ void ManagementAgent::periodicProcessing (void) stringstream key; key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); sendBuffer(msgBuffer, contentSize, mExchange, key.str()); - QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); + QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); } } @@ -422,7 +439,7 @@ void ManagementAgent::periodicProcessing (void) msgBuffer.reset (); routingKey = "console.heartbeat.1.0"; sendBuffer (msgBuffer, contentSize, mExchange, routingKey); - QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey); + QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey); } } @@ -438,7 +455,7 @@ void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence, outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBuffer (outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" << + QPID_LOG(trace, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" << replyToKey << " seq=" << sequence); } @@ -495,7 +512,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey inBuffer.getBin128(hash); inBuffer.getShortString(methodName); - QPID_LOG(debug, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << + QPID_LOG(trace, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << methodName << " replyTo=" << replyToKey); encodeHeader(outBuffer, 'm', sequence); @@ -507,7 +524,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence) + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence) return; } @@ -523,7 +540,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence) + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence) return; } } @@ -553,7 +570,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence); } void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) @@ -561,7 +578,7 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey); + QPID_LOG(trace, "RECV BrokerRequest replyTo=" << replyToKey); encodeHeader (outBuffer, 'b', sequence); uuid.encode (outBuffer); @@ -569,12 +586,12 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_ outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBuffer (outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey); + QPID_LOG(trace, "SEND BrokerResponse to=" << replyToKey); } void ManagementAgent::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence) { - QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey); + QPID_LOG(trace, "RECV PackageQuery replyTo=" << replyToKey); for (PackageMap::iterator pIter = packages.begin (); pIter != packages.end (); @@ -588,7 +605,7 @@ void ManagementAgent::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBuffer (outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND PackageInd package=" << (*pIter).first << " to=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "SEND PackageInd package=" << (*pIter).first << " to=" << replyToKey << " seq=" << sequence); } sendCommandComplete (replyToKey, sequence); @@ -600,7 +617,7 @@ void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, string replyToKey, u inBuffer.getShortString(packageName); - QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); findOrAddPackageLH(packageName); } @@ -611,7 +628,7 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, string replyToKey, ui inBuffer.getShortString(packageName); - QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) @@ -631,7 +648,7 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, string replyToKey, ui outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND ClassInd class=" << (*pIter).first << ":" << (*cIter).first.name << + QPID_LOG(trace, "SEND ClassInd class=" << (*pIter).first << ":" << (*cIter).first.name << "(" << Uuid((*cIter).first.hash) << ") to=" << replyToKey << " seq=" << sequence); } } @@ -649,7 +666,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uin inBuffer.getShortString(key.name); inBuffer.getBin128(key.hash); - QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << + QPID_LOG(trace, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), replyTo=" << replyToKey); PackageMap::iterator pIter = findOrAddPackageLH(packageName); @@ -666,7 +683,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uin outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBuffer (outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << + QPID_LOG(trace, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), to=" << replyToKey << " seq=" << sequence); if (cIter != pIter->second.end()) @@ -697,7 +714,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, inBuffer.getShortString (key.name); inBuffer.getBin128 (key.hash); - QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << + QPID_LOG(trace, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), replyTo=" << replyToKey << " seq=" << sequence); PackageMap::iterator pIter = packages.find(packageName); @@ -715,7 +732,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence); } else sendCommandComplete(replyToKey, sequence, 1, "Schema not available"); @@ -739,7 +756,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToK inBuffer.getBin128(key.hash); inBuffer.restore(); - QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence); + QPID_LOG(trace, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence); PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { @@ -764,7 +781,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToK outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, mExchange, "schema.class"); - QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << + QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " to=schema.class"); } } @@ -849,7 +866,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey requestedBrokerBank = inBuffer.getLong(); requestedAgentBank = inBuffer.getLong(); - QPID_LOG(debug, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank << + QPID_LOG(trace, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank << " reqAgentBank=" << requestedAgentBank << " replyTo=" << replyToKey << " seq=" << sequence); assignedBank = assignBankLH(requestedAgentBank); @@ -866,11 +883,11 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey agent->mgmtObject->set_systemId (systemId); agent->mgmtObject->set_brokerBank (brokerBank); agent->mgmtObject->set_agentBank (assignedBank); - addObject (agent->mgmtObject); + addObject (agent->mgmtObject, 0, true); remoteAgents[connectionRef] = agent; - QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey); + QPID_LOG(trace, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey); // Send an Attach Response Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); @@ -882,7 +899,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBuffer (outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank << + QPID_LOG(trace, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank << " to=" << replyToKey << " seq=" << sequence); } @@ -895,7 +912,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin ft.decode(inBuffer); - QPID_LOG(debug, "RECV GetQuery query=" << ft << " seq=" << sequence); + QPID_LOG(trace, "RECV GetQuery query=" << ft << " seq=" << sequence); value = ft.get("_class"); if (value.get() == 0 || !value->convertsTo<string>()) { @@ -920,7 +937,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); } } sendCommandComplete(replyToKey, sequence); @@ -947,7 +964,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); } } } @@ -1011,7 +1028,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence) + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence) } return false; @@ -1083,7 +1100,7 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBuffer (outBuffer, outLen, mExchange, "schema.package"); - QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package") + QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package") return result.first; } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index 84e84e3daa..d00dbf2b24 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -80,7 +80,8 @@ public: uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall); QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, - uint64_t persistId = 0); + uint64_t persistId = 0, + bool publishNow = false); QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT); QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey); |