diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-03-22 20:40:49 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-03-22 20:40:49 +0000 |
commit | d3c6291c047d0c954bce5d652ce3e6abec9466e5 (patch) | |
tree | 655f733e832250b53ff7ee0cf6308663f76fcfe4 | |
parent | 77431e5a5d165975bf3e39c6b9ae9e0ed3228464 (diff) | |
download | qpid-python-d3c6291c047d0c954bce5d652ce3e6abec9466e5.tar.gz |
get query req handling
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@926323 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 155 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 179 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 7 |
4 files changed, 231 insertions, 113 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 3d05e02bcc..f160bf004b 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -544,89 +544,102 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo); } -void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, const string& cid, const string& replyTo) +void ManagementAgentImpl::handleGetQuery(const string& body, const string& contentType, + const string& cid, const string& replyTo) { - FieldTable ft; - FieldTable::ValuePtr value; - moveNewObjectsLH(); - ft.decode(inBuffer); + if (contentType != "_query_v1") { + QPID_LOG(warning, "Support for QMF V2 Query format TBD!!!"); + return; + } - QPID_LOG(trace, "RCVD GetQuery: map=" << ft); + qpid::messaging::Message inMsg(body); + qpid::messaging::MapView inMap(inMsg); + qpid::messaging::MapView::const_iterator i; + ::qpid::messaging::Variant::Map headers; - value = ft.get("_class"); - if (value.get() == 0 || !value->convertsTo<string>()) { - value = ft.get("_objectid"); - if (value.get() == 0 || !value->convertsTo<string>()) - return; + QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid); - ObjectId selector(value->get<string>()); - ManagementObjectMap::iterator iter = managementObjects.find(selector); - if (iter != managementObjects.end()) { - ManagementObject* object = iter->second; - ::qpid::messaging::Message m; - ::qpid::messaging::ListContent content(m); - ::qpid::messaging::Variant::List &list_ = content.asList(); - ::qpid::messaging::Variant::Map map_; - ::qpid::messaging::Variant::Map values; - ::qpid::messaging::Variant::Map headers; - - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); - - object->mapEncodeValues(values, true, true); // write both stats and properties - map_["_values"] = values; - list_.push_back(map_); - - headers["method"] = "response"; - headers["qmf.opcode"] = "_query_response"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = name_address; + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + headers["partial"]; - content.encode(); - connThreadBody.sendBuffer(m.getContent(), cid, headers, "qmf.default.direct", replyTo); + ::qpid::messaging::Message outMsg; + ::qpid::messaging::ListContent content(outMsg); + ::qpid::messaging::Variant::List &list_ = content.asList(); + ::qpid::messaging::Variant::Map map_; + ::qpid::messaging::Variant::Map values; + string className; - QPID_LOG(trace, "SENT ObjectInd"); + i = inMap.find("_class"); + if (i != inMap.end()) + try { + className = i->second.asString(); + } catch(exception& e) { + className.clear(); + QPID_LOG(trace, "RCVD GetQuery: invalid format - class target ignored."); } - //sendCommandComplete(replyTo, sequence); - return; - } - string className(value->get<string>()); + if (className.empty()) { + ObjectId objId; + i = inMap.find("_object_id"); + if (i != inMap.end()) { + + try { + objId = ObjectId(i->second.asMap()); + } catch (exception &e) { + objId = ObjectId(); // empty object id - won't find a match (I hope). + QPID_LOG(trace, "RCVD GetQuery (invalid Object Id format) to=" << replyTo << " seq=" << cid); + } - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - iter++) { - ManagementObject* object = iter->second; - if (object->getClassName() == className) { - ::qpid::messaging::Message m; - ::qpid::messaging::ListContent content(m); - ::qpid::messaging::Variant::List &list_ = content.asList(); - ::qpid::messaging::Variant::Map map_; - ::qpid::messaging::Variant::Map values; - ::qpid::messaging::Variant::Map headers; - - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); - - object->mapEncodeValues(values, true, true); // write both stats and properties - map_["_values"] = values; - list_.push_back(map_); - - headers["method"] = "response"; - headers["qmf.opcode"] = "_query_response"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = name_address; + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) { + ManagementObject* object = iter->second; + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); - content.encode(); - connThreadBody.sendBuffer(m.getContent(), cid, headers, "qmf.default.direct", replyTo); + object->mapEncodeValues(values, true, true); // write both stats and properties + map_["_values"] = values; + list_.push_back(map_); - QPID_LOG(trace, "SENT ObjectInd"); + content.encode(); + connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo); + } + } + } else { + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (object->getClassName() == className) { + + // @todo support multiple object reply per message + values.clear(); + list_.clear(); + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + object->mapEncodeValues(values, true, true); // write both stats and properties + map_["_values"] = values; + list_.push_back(map_); + + content.encode(); + connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo); + } } } - //sendCommandComplete(replyTo, sequence); + // end empty "non-partial" message to indicate CommandComplete + list_.clear(); + headers.erase("partial"); + content.encode(); + connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo); + QPID_LOG(trace, "SENT ObjectInd"); } void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo) @@ -698,8 +711,11 @@ void ManagementAgentImpl::received(Message& msg) if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey); else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToKey); + else if (opcode == "_query_request") handleGetQuery(msg.getData(), + mp.getApplicationHeaders().getAsString("qmf.content"), + cid, replyToKey); else { - QPID_LOG(trace, "Support for QMF Opcode [" << opcode << "] TBD!!!"); + QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!"); } return; } @@ -717,8 +733,7 @@ void ManagementAgentImpl::received(Message& msg) if (opcode == 'a') handleAttachResponse(inBuffer); else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); else if (opcode == 'x') handleConsoleAddedIndication(); - else if (opcode == 'M') - QPID_LOG(warning, "Ignoring old-format QMF Method Request!!!"); + QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode)); } } diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h index 4f838d2317..399c9d7944 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -267,7 +267,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& replyTo); - void handleGetQuery (qpid::framing::Buffer& inBuffer, const std::string& cid, const std::string& replyTo); + void handleGetQuery (const std::string& body, const std::string& content_type, + const std::string& cid, const std::string& replyTo); void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); void handleConsoleAddedIndication(); diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 0adafc7dd5..bbb14f98e9 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -422,8 +422,8 @@ void ManagementAgent::sendBuffer(Buffer& buf, void ManagementAgent::sendBuffer(const std::string& data, - const uint32_t sequence, - const qpid::messaging::VariantMap headers, + const std::string& cid, + const qpid::messaging::VariantMap& headers, qpid::broker::Exchange::shared_ptr exchange, string routingKey) { @@ -451,8 +451,8 @@ void ManagementAgent::sendBuffer(const std::string& data, MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); props->setContentLength(data.length()); - if (sequence) { - props->setCorrelationId(boost::lexical_cast<std::string>(sequence)); + if (!cid.empty()) { + props->setCorrelationId(cid); } for (i = headers.begin(); i != headers.end(); ++i) { @@ -854,7 +854,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey void ManagementAgent::handleMethodRequestLH (const std::string& body, string replyTo, - uint32_t sequence, const ConnectionToken* connToken) + const std::string& cid, const ConnectionToken* connToken) { string methodName; qpid::messaging::Message inMsg(body); @@ -876,8 +876,8 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID; ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID); outMap.encode(); - sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo); - QPID_LOG(trace, "SEND MethodResponse (invalid param) to=" << replyTo << " seq=" << sequence); + sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + QPID_LOG(trace, "SEND MethodResponse (invalid param) to=" << replyTo << " seq=" << cid); return; } @@ -897,8 +897,8 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION; ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = e.what(); outMap.encode(); - sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo); - QPID_LOG(trace, "SEND MethodResponse (invalid format) to=" << replyTo << " seq=" << sequence); + sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + QPID_LOG(trace, "SEND MethodResponse (invalid format) to=" << replyTo << " seq=" << cid); return; } @@ -908,8 +908,8 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT; ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT); outMap.encode(); - sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo); - QPID_LOG(trace, "SEND MethodResponse (unknown object) to=" << replyTo << " seq=" << sequence); + sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + QPID_LOG(trace, "SEND MethodResponse (unknown object) to=" << replyTo << " seq=" << cid); return; } @@ -922,8 +922,8 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = i->second; outMap.encode(); - sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); + sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << cid); return; } @@ -937,8 +937,8 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN); outMap.encode(); - sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); + sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << cid); return; } } @@ -951,14 +951,14 @@ void ManagementAgent::handleMethodRequestLH (const std::string& body, string rep ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION; ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = e.what(); outMap.encode(); - sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo); - QPID_LOG(trace, "SEND MethodResponse (exception) to=" << replyTo << " seq=" << sequence); + sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + QPID_LOG(trace, "SEND MethodResponse (exception) to=" << replyTo << " seq=" << cid); return; } outMap.encode(); - sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo); - QPID_LOG(trace, "SEND MethodResponse to=" << replyTo << " seq=" << sequence); + sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + QPID_LOG(trace, "SEND MethodResponse to=" << replyTo << " seq=" << cid); } @@ -1333,7 +1333,8 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin content.encode(); - sendBuffer(m.getContent(), sequence, headers, dExchange, replyToKey); + sendBuffer(m.getContent(), boost::lexical_cast<std::string>(sequence), + headers, dExchange, replyToKey); QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); } } @@ -1370,7 +1371,8 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin content.encode(); - sendBuffer(m.getContent(), sequence, headers, dExchange, replyToKey); + sendBuffer(m.getContent(), boost::lexical_cast<std::string>(sequence), + headers, dExchange, replyToKey); QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); } } @@ -1379,6 +1381,110 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin sendCommandComplete(replyToKey, sequence); } + +void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& replyTo, const std::string& cid, const std::string& contentType) +{ + FieldTable ft; + FieldTable::ValuePtr value; + + moveNewObjectsLH(); + + if (contentType != "_query_v1") { + QPID_LOG(warning, "Support for QMF V2 Query format TBD!!!"); + return; + } + + qpid::messaging::Message inMsg(body); + qpid::messaging::MapView inMap(inMsg); + qpid::messaging::MapView::const_iterator i; + ::qpid::messaging::Variant::Map headers; + + QPID_LOG(trace, "RECV GetQuery: map=" << inMap << " seq=" << cid); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = std::string(agentName); + headers["partial"]; + + ::qpid::messaging::Message outMsg; + ::qpid::messaging::ListContent content(outMsg); + ::qpid::messaging::Variant::List &list_ = content.asList(); + ::qpid::messaging::Variant::Map map_; + ::qpid::messaging::Variant::Map values; + string className; + + i = inMap.find("_class"); + if (i != inMap.end()) + try { + className = i->second.asString(); + } catch(exception& e) { + className.clear(); + QPID_LOG(trace, "RCVD GetQuery: invalid format - class target ignored."); + } + + if (className.empty()) { + ObjectId objId; + i = inMap.find("_object_id"); + if (i != inMap.end()) { + + try { + objId = ObjectId(i->second.asMap()); + } catch (exception &e) { + objId = ObjectId(); // empty object id - won't find a match (I hope). + QPID_LOG(trace, "RCVD GetQuery (invalid Object Id format) to=" << replyTo << " seq=" << cid); + } + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) { + ManagementObject* object = iter->second; + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (!object->isDeleted()) { + object->mapEncodeValues(values, true, true); // write both stats and properties + map_["_values"] = values; + list_.push_back(map_); + + content.encode(); + sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + } + } + } + } else { + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (object->getClassName () == className) { + + // @todo: support multiple objects per message reply + values.clear(); + list_.clear(); + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (!object->isDeleted()) { + object->mapEncodeValues(values, true, true); // write both stats and properties + map_["_values"] = values; + list_.push_back(map_); + + content.encode(); + sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + } + } + } + } + + // end empty "non-partial" message to indicate CommandComplete + list_.clear(); + headers.erase("partial"); + content.encode(); + sendBuffer(outMsg.getContent(), cid, headers, dExchange, replyTo); + QPID_LOG(trace, "SEND GetResponse to=" << replyTo << " seq=" << cid); +} + bool ManagementAgent::authorizeAgentMessageLH(Message& msg) { Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); @@ -1521,7 +1627,8 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN); outMap.encode(); - sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyToKey); + sendBuffer(outMsg.getContent(), boost::lexical_cast<std::string>(sequence), + headers, dExchange, replyToKey); } else { @@ -1547,7 +1654,6 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) void ManagementAgent::dispatchAgentCommandLH(Message& msg) { - uint32_t sequence; string replyToKey; const framing::MessageProperties* p = @@ -1577,26 +1683,20 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg) if (headers && headers->getAsString("app_id") == "qmf2") { std::string opcode = headers->getAsString("qmf.opcode"); + std::string contentType = headers->getAsString("qmf.content"); + std::string body; + std::string cid; + + inBuffer.getRawData(body, bufferLen); - sequence = 0; if (p && p->hasCorrelationId()) { - std::string cid = p->getCorrelationId(); - if (!cid.empty()) { - try { - sequence = boost::lexical_cast<uint32_t>(cid); - } catch(const boost::bad_lexical_cast&) { - QPID_LOG(warning, "Bad correlation Id for received QMF request."); - return; - } - } + cid = p->getCorrelationId(); } - if (opcode == "_method_request") { - std::string body; - inBuffer.getRawData(body, bufferLen); - handleMethodRequestLH(body, replyToKey, sequence, msg.getPublisher()); - return; - } + if (opcode == "_method_request") + return handleMethodRequestLH(body, replyToKey, cid, msg.getPublisher()); + else if (opcode == "_query_request") + return handleGetQueryLH(body, replyToKey, cid, contentType); QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!"); return; @@ -1606,6 +1706,7 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg) while (inBuffer.getPosition() < bufferLen) { + uint32_t sequence; if (!checkHeader(inBuffer, &opcode, &sequence)) return; diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index 47f78e48bd..9ec28500c4 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -278,8 +278,8 @@ private: qpid::broker::Exchange::shared_ptr exchange, std::string routingKey); void sendBuffer(const std::string& data, - const uint32_t sequence, - const qpid::messaging::VariantMap headers, + const std::string& cid, + const qpid::messaging::VariantMap& headers, qpid::broker::Exchange::shared_ptr exchange, std::string routingKey); void moveNewObjectsLH(); @@ -314,7 +314,8 @@ private: void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - void handleMethodRequestLH (const std::string& body, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); + void handleGetQueryLH (const std::string& body, std::string& replyToKey, const std::string& cid, const std::string& contentType); + void handleMethodRequestLH (const std::string& body, std::string replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken); size_t validateSchema(framing::Buffer&, uint8_t kind); size_t validateTableSchema(framing::Buffer&); |