summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-10-22 13:42:29 +0000
committerTed Ross <tross@apache.org>2009-10-22 13:42:29 +0000
commitf7de7d4cf111a1dbe2acc220d5b16ebb06f27cae (patch)
tree11fdb70bae670127a9a0429d0146a640c3dd3cea
parente95e725bb5c8040061b96613dea017c1335b31e0 (diff)
downloadqpid-python-f7de7d4cf111a1dbe2acc220d5b16ebb06f27cae.tar.gz
Added immediate-publish for new connections and agents. This solves a race condition where
a QMF console may learn about an object before it learns about the agent that controls that object. Changed log category for QMF messages from debug to trace. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@828685 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp2
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp83
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h3
3 files changed, 53 insertions, 35 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 3b8a6c71d4..824a2ee75d 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -97,7 +97,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
// TODO set last bool true if system connection
if (agent != 0) {
mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
- agent->addObject(mgmtObject, objectId);
+ agent->addObject(mgmtObject, objectId, true);
}
ConnectionState::setUrl(mgmtId);
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 0e462342d4..ae317512b8 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -46,7 +46,7 @@ ManagementAgent::RemoteAgent::~RemoteAgent ()
{
if (mgmtObject != 0)
mgmtObject->resourceDestroy();
- QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
+ QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
}
ManagementAgent::ManagementAgent () :
@@ -170,8 +170,9 @@ void ManagementAgent::registerEvent (const string& packageName,
addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
}
-ObjectId ManagementAgent::addObject (ManagementObject* object,
- uint64_t persistId)
+ObjectId ManagementAgent::addObject(ManagementObject* object,
+ uint64_t persistId,
+ bool publishNow)
{
Mutex::ScopedLock lock (addLock);
uint16_t sequence;
@@ -189,6 +190,22 @@ ObjectId ManagementAgent::addObject (ManagementObject* object,
object->setObjectId(objId);
newManagementObjects[objId] = object;
+
+ if (publishNow) {
+#define IMM_BUFSIZE 65536
+ char rawBuf[IMM_BUFSIZE];
+ Buffer msgBuffer(rawBuf, IMM_BUFSIZE);
+
+ encodeHeader(msgBuffer, 'c');
+ object->writeProperties(msgBuffer);
+ uint32_t contentSize = msgBuffer.getPosition();
+ stringstream key;
+ key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
+ msgBuffer.reset();
+ sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+ QPID_LOG(trace, "SEND Immediate ContentInd to=" << key.str());
+ }
+
return objId;
}
@@ -240,7 +257,7 @@ void ManagementAgent::clientAdded (const std::string& routingKey)
outLen = outBuffer.getPosition();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, aIter->second->routingKey);
- QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << aIter->second->routingKey);
+ QPID_LOG(trace, "SEND ConsoleAddedIndication to=" << aIter->second->routingKey);
}
}
@@ -396,7 +413,7 @@ void ManagementAgent::periodicProcessing (void)
stringstream key;
key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
sendBuffer(msgBuffer, contentSize, mExchange, key.str());
- QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+ QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
}
}
@@ -422,7 +439,7 @@ void ManagementAgent::periodicProcessing (void)
msgBuffer.reset ();
routingKey = "console.heartbeat.1.0";
sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
- QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey);
+ QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
}
}
@@ -438,7 +455,7 @@ void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence,
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
+ QPID_LOG(trace, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
replyToKey << " seq=" << sequence);
}
@@ -495,7 +512,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
inBuffer.getBin128(hash);
inBuffer.getShortString(methodName);
- QPID_LOG(debug, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
+ QPID_LOG(trace, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
methodName << " replyTo=" << replyToKey);
encodeHeader(outBuffer, 'm', sequence);
@@ -507,7 +524,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence)
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence)
return;
}
@@ -523,7 +540,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
return;
}
}
@@ -553,7 +570,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence);
}
void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
@@ -561,7 +578,7 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey);
+ QPID_LOG(trace, "RECV BrokerRequest replyTo=" << replyToKey);
encodeHeader (outBuffer, 'b', sequence);
uuid.encode (outBuffer);
@@ -569,12 +586,12 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey);
+ QPID_LOG(trace, "SEND BrokerResponse to=" << replyToKey);
}
void ManagementAgent::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence)
{
- QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey);
+ QPID_LOG(trace, "RECV PackageQuery replyTo=" << replyToKey);
for (PackageMap::iterator pIter = packages.begin ();
pIter != packages.end ();
@@ -588,7 +605,7 @@ void ManagementAgent::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND PackageInd package=" << (*pIter).first << " to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "SEND PackageInd package=" << (*pIter).first << " to=" << replyToKey << " seq=" << sequence);
}
sendCommandComplete (replyToKey, sequence);
@@ -600,7 +617,7 @@ void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, string replyToKey, u
inBuffer.getShortString(packageName);
- QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
findOrAddPackageLH(packageName);
}
@@ -611,7 +628,7 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, string replyToKey, ui
inBuffer.getShortString(packageName);
- QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end())
@@ -631,7 +648,7 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, string replyToKey, ui
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND ClassInd class=" << (*pIter).first << ":" << (*cIter).first.name <<
+ QPID_LOG(trace, "SEND ClassInd class=" << (*pIter).first << ":" << (*cIter).first.name <<
"(" << Uuid((*cIter).first.hash) << ") to=" << replyToKey << " seq=" << sequence);
}
}
@@ -649,7 +666,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uin
inBuffer.getShortString(key.name);
inBuffer.getBin128(key.hash);
- QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+ QPID_LOG(trace, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), replyTo=" << replyToKey);
PackageMap::iterator pIter = findOrAddPackageLH(packageName);
@@ -666,7 +683,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uin
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+ QPID_LOG(trace, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), to=" << replyToKey << " seq=" << sequence);
if (cIter != pIter->second.end())
@@ -697,7 +714,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey,
inBuffer.getShortString (key.name);
inBuffer.getBin128 (key.hash);
- QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
+ QPID_LOG(trace, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), replyTo=" << replyToKey << " seq=" << sequence);
PackageMap::iterator pIter = packages.find(packageName);
@@ -715,7 +732,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey,
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
}
else
sendCommandComplete(replyToKey, sequence, 1, "Schema not available");
@@ -739,7 +756,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToK
inBuffer.getBin128(key.hash);
inBuffer.restore();
- QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
+ QPID_LOG(trace, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end()) {
@@ -764,7 +781,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToK
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, mExchange, "schema.class");
- QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
+ QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
" to=schema.class");
}
}
@@ -849,7 +866,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey
requestedBrokerBank = inBuffer.getLong();
requestedAgentBank = inBuffer.getLong();
- QPID_LOG(debug, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank <<
+ QPID_LOG(trace, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank <<
" reqAgentBank=" << requestedAgentBank << " replyTo=" << replyToKey << " seq=" << sequence);
assignedBank = assignBankLH(requestedAgentBank);
@@ -866,11 +883,11 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey
agent->mgmtObject->set_systemId (systemId);
agent->mgmtObject->set_brokerBank (brokerBank);
agent->mgmtObject->set_agentBank (assignedBank);
- addObject (agent->mgmtObject);
+ addObject (agent->mgmtObject, 0, true);
remoteAgents[connectionRef] = agent;
- QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
+ QPID_LOG(trace, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
// Send an Attach Response
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -882,7 +899,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
+ QPID_LOG(trace, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
" to=" << replyToKey << " seq=" << sequence);
}
@@ -895,7 +912,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
ft.decode(inBuffer);
- QPID_LOG(debug, "RECV GetQuery query=" << ft << " seq=" << sequence);
+ QPID_LOG(trace, "RECV GetQuery query=" << ft << " seq=" << sequence);
value = ft.get("_class");
if (value.get() == 0 || !value->convertsTo<string>()) {
@@ -920,7 +937,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
}
}
sendCommandComplete(replyToKey, sequence);
@@ -947,7 +964,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
}
}
}
@@ -1011,7 +1028,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
}
return false;
@@ -1083,7 +1100,7 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer (outBuffer, outLen, mExchange, "schema.package");
- QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package")
+ QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package")
return result.first;
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index 84e84e3daa..d00dbf2b24 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -80,7 +80,8 @@ public:
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
- uint64_t persistId = 0);
+ uint64_t persistId = 0,
+ bool publishNow = false);
QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
severity_t severity = SEV_DEFAULT);
QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);