summaryrefslogtreecommitdiff
path: root/cpp/src/qmf/engine/BrokerProxyImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qmf/engine/BrokerProxyImpl.cpp')
-rw-r--r--cpp/src/qmf/engine/BrokerProxyImpl.cpp827
1 files changed, 0 insertions, 827 deletions
diff --git a/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/cpp/src/qmf/engine/BrokerProxyImpl.cpp
deleted file mode 100644
index 5fc71979fd..0000000000
--- a/cpp/src/qmf/engine/BrokerProxyImpl.cpp
+++ /dev/null
@@ -1,827 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/BrokerProxyImpl.h"
-#include "qmf/engine/ConsoleImpl.h"
-#include "qmf/engine/Protocol.h"
-#include "qpid/Address.h"
-#include "qpid/sys/SystemInfo.h"
-#include <qpid/log/Statement.h>
-#include <qpid/StringUtils.h>
-#include <string.h>
-#include <iostream>
-#include <fstream>
-
-using namespace std;
-using namespace qmf::engine;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-namespace {
- const char* QMF_EXCHANGE = "qpid.management";
- const char* DIR_EXCHANGE = "amq.direct";
- const char* BROKER_KEY = "broker";
- const char* BROKER_PACKAGE = "org.apache.qpid.broker";
- const char* AGENT_CLASS = "agent";
- const char* BROKER_AGENT_KEY = "agent.1.0";
-}
-
-const Object* QueryResponseImpl::getObject(uint32_t idx) const
-{
- vector<ObjectPtr>::const_iterator iter = results.begin();
-
- while (idx > 0) {
- if (iter == results.end())
- return 0;
- iter++;
- idx--;
- }
-
- return iter->get();
-}
-
-#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
-
-BrokerEvent BrokerEventImpl::copy()
-{
- BrokerEvent item;
-
- ::memset(&item, 0, sizeof(BrokerEvent));
- item.kind = kind;
-
- STRING_REF(name);
- STRING_REF(exchange);
- STRING_REF(bindingKey);
- item.context = context;
- item.queryResponse = queryResponse.get();
- item.methodResponse = methodResponse.get();
-
- return item;
-}
-
-BrokerProxyImpl::BrokerProxyImpl(BrokerProxy& pub, Console& _console) : publicObject(pub), console(_console)
-{
- stringstream qn;
- qpid::Address addr;
-
- SystemInfo::getLocalHostname(addr);
- qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId();
- queueName = qn.str();
-
- seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
-}
-
-void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
-{
- Mutex::ScopedLock _lock(lock);
- agentList.clear();
- eventQueue.clear();
- xmtQueue.clear();
- eventQueue.push_back(eventDeclareQueue(queueName));
- eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName));
- eventQueue.push_back(eventSetupComplete());
-
- // TODO: Store session handle
-}
-
-void BrokerProxyImpl::sessionClosed()
-{
- Mutex::ScopedLock _lock(lock);
- agentList.clear();
- eventQueue.clear();
- xmtQueue.clear();
-}
-
-void BrokerProxyImpl::startProtocol()
-{
- AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker"));
- {
- Mutex::ScopedLock _lock(lock);
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- agentList[0] = agent;
-
- requestsOutstanding = 1;
- topicBound = false;
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
- }
-
- console.impl->eventAgentAdded(agent);
-}
-
-void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
-{
- uint32_t length = buf.getPosition();
- MessageImpl::Ptr message(new MessageImpl);
-
- buf.reset();
- buf.getRawData(message->body, length);
- message->destination = destination;
- message->routingKey = routingKey;
- message->replyExchange = DIR_EXCHANGE;
- message->replyKey = queueName;
-
- xmtQueue.push_back(message);
-}
-
-void BrokerProxyImpl::handleRcvMessage(Message& message)
-{
- Buffer inBuffer(message.body, message.length);
- uint8_t opcode;
- uint32_t sequence;
-
- while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
- seqMgr.dispatch(opcode, sequence, message.routingKey ? string(message.routingKey) : string(), inBuffer);
-}
-
-bool BrokerProxyImpl::getXmtMessage(Message& item) const
-{
- Mutex::ScopedLock _lock(lock);
- if (xmtQueue.empty())
- return false;
- item = xmtQueue.front()->copy();
- return true;
-}
-
-void BrokerProxyImpl::popXmt()
-{
- Mutex::ScopedLock _lock(lock);
- if (!xmtQueue.empty())
- xmtQueue.pop_front();
-}
-
-bool BrokerProxyImpl::getEvent(BrokerEvent& event) const
-{
- Mutex::ScopedLock _lock(lock);
- if (eventQueue.empty())
- return false;
- event = eventQueue.front()->copy();
- return true;
-}
-
-void BrokerProxyImpl::popEvent()
-{
- Mutex::ScopedLock _lock(lock);
- if (!eventQueue.empty())
- eventQueue.pop_front();
-}
-
-uint32_t BrokerProxyImpl::agentCount() const
-{
- Mutex::ScopedLock _lock(lock);
- return agentList.size();
-}
-
-const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
-{
- Mutex::ScopedLock _lock(lock);
- for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin();
- iter != agentList.end(); iter++)
- if (idx-- == 0)
- return iter->second.get();
- return 0;
-}
-
-void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent)
-{
- SequenceContext::Ptr queryContext(new QueryContext(*this, context));
- Mutex::ScopedLock _lock(lock);
- bool sent = false;
- if (agent != 0) {
- if (sendGetRequestLH(queryContext, query, agent))
- sent = true;
- } else {
- // TODO (optimization) only send queries to agents that have the requested class+package
- for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin();
- iter != agentList.end(); iter++) {
- if (sendGetRequestLH(queryContext, query, iter->second.get()))
- sent = true;
- }
- }
-
- if (!sent) {
- queryContext->reserve();
- queryContext->release();
- }
-}
-
-bool BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent)
-{
- if (query.impl->singleAgent()) {
- if (query.impl->agentBank() != agent->getAgentBank())
- return false;
- }
- stringstream key;
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve(queryContext));
- agent->impl->addSequence(sequence);
-
- Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
- query.impl->encode(outBuffer);
- key << "agent.1." << agent->impl->agentBank;
- sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
- QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str());
- return true;
-}
-
-string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const Value* argmap, Buffer& buffer)
-{
- int argCount = schema->getArgumentCount();
-
- if (argmap == 0 || !argmap->isMap())
- return string("Arguments must be in a map value");
-
- for (int aIdx = 0; aIdx < argCount; aIdx++) {
- const SchemaArgument* arg(schema->getArgument(aIdx));
- if (arg->getDirection() == DIR_IN || arg->getDirection() == DIR_IN_OUT) {
- if (argmap->keyInMap(arg->getName())) {
- const Value* argVal(argmap->byKey(arg->getName()));
- if (argVal->getType() != arg->getType())
- return string("Argument is the wrong type: ") + arg->getName();
- argVal->impl->encode(buffer);
- } else {
- Value defaultValue(arg->getType());
- defaultValue.impl->encode(buffer);
- }
- }
- }
-
- return string();
-}
-
-string BrokerProxyImpl::encodedSizeMethodArguments(const SchemaMethod* schema, const Value* argmap, uint32_t& size)
-{
- int argCount = schema->getArgumentCount();
-
- if (argmap == 0 || !argmap->isMap())
- return string("Arguments must be in a map value");
-
- for (int aIdx = 0; aIdx < argCount; aIdx++) {
- const SchemaArgument* arg(schema->getArgument(aIdx));
- if (arg->getDirection() == DIR_IN || arg->getDirection() == DIR_IN_OUT) {
- if (argmap->keyInMap(arg->getName())) {
- const Value* argVal(argmap->byKey(arg->getName()));
- if (argVal->getType() != arg->getType())
- return string("Argument is the wrong type: ") + arg->getName();
- size += argVal->impl->encodedSize();
- } else {
- Value defaultValue(arg->getType());
- size += defaultValue.impl->encodedSize();
- }
- }
- }
-
- return string();
-}
-
-void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls,
- const string& methodName, const Value* args, void* userContext)
-{
- int methodCount = cls->getMethodCount();
- int idx;
- for (idx = 0; idx < methodCount; idx++) {
- const SchemaMethod* method = cls->getMethod(idx);
- if (string(method->getName()) == methodName) {
- Mutex::ScopedLock _lock(lock);
- SequenceContext::Ptr methodContext(new MethodContext(*this, userContext, method));
- stringstream key;
- char* buf(outputBuffer);
- uint32_t bufLen(1024);
- bool allocated(false);
-
- string argErrorString = encodedSizeMethodArguments(method, args, bufLen);
- if (!argErrorString.empty()) {
- MethodResponsePtr argError(MethodResponseImpl::factory(1, argErrorString));
- eventQueue.push_back(eventMethodResponse(userContext, argError));
- return;
- }
-
- if (bufLen > MA_BUFFER_SIZE) {
- buf = (char*) malloc(bufLen);
- allocated = true;
- }
-
- Buffer outBuffer(buf, bufLen);
- uint32_t sequence(seqMgr.reserve(methodContext));
-
- Protocol::encodeHeader(outBuffer, Protocol::OP_METHOD_REQUEST, sequence);
- oid->impl->encode(outBuffer);
- cls->getClassKey()->impl->encode(outBuffer);
- outBuffer.putShortString(methodName);
-
- encodeMethodArguments(method, args, outBuffer);
- key << "agent.1." << oid->impl->getAgentBank();
- sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
- QPID_LOG(trace, "SENT MethodRequest seq=" << sequence << " method=" << methodName << " key=" << key.str());
-
- if (allocated)
- free(buf);
-
- return;
- }
- }
-
- MethodResponsePtr error(MethodResponseImpl::factory(1, string("Unknown method: ") + methodName));
- Mutex::ScopedLock _lock(lock);
- eventQueue.push_back(eventMethodResponse(userContext, error));
-}
-
-void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
-{
- Mutex::ScopedLock _lock(lock);
- eventQueue.push_back(eventBind(exchange, queueName, key));
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE));
- event->name = queueName;
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::BIND));
- event->name = queue;
- event->exchange = exchange;
- event->bindingKey = key;
-
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete()
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE));
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
-{
- QPID_LOG(trace, "Console Link to Broker Stable");
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE));
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponsePtr response)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE));
- event->context = context;
- event->queryResponse = response;
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventMethodResponse(void* context, MethodResponsePtr response)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::METHOD_RESPONSE));
- event->context = context;
- event->methodResponse = response;
- return event;
-}
-
-void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
-{
- brokerId.decode(inBuffer);
- QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
- Mutex::ScopedLock _lock(lock);
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- incOutstandingLH();
- Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
-}
-
-void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
-{
- string package;
-
- inBuffer.getShortString(package);
- QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);
- console.impl->learnPackage(package);
-
- Mutex::ScopedLock _lock(lock);
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- incOutstandingLH();
- Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence);
- outBuffer.putShortString(package);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package);
-}
-
-void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
-{
- string text;
- uint32_t code = inBuffer.getLong();
- inBuffer.getShortString(text);
- QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
-}
-
-void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
-{
- uint8_t kind = inBuffer.getOctet();
- auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
-
- QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey->impl->str());
-
- if (!console.impl->haveClass(classKey.get())) {
- Mutex::ScopedLock _lock(lock);
- incOutstandingLH();
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence);
- classKey->impl->encode(outBuffer);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey->impl->str());
- }
-}
-
-MethodResponsePtr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32_t seq, const SchemaMethod* schema)
-{
- MethodResponsePtr response(MethodResponseImpl::factory(inBuffer, schema));
-
- QPID_LOG(trace, "RCVD MethodResponse seq=" << seq << " status=" << response->getStatus() << " text=" <<
- response->getException()->asString());
-
- return response;
-}
-
-void BrokerProxyImpl::handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq, const string& routingKey)
-{
- vector<string> tokens = qpid::split(routingKey, ".");
- uint32_t agentBank;
- uint64_t timestamp;
-
- if (routingKey.empty() || tokens.size() != 4)
- agentBank = 0;
- else
- agentBank = ::atoi(tokens[3].c_str());
-
- timestamp = inBuffer.getLongLong();
- map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank);
- if (iter != agentList.end()) {
- console.impl->eventAgentHeartbeat(iter->second, timestamp);
- }
- QPID_LOG(trace, "RCVD HeartbeatIndication seq=" << seq << " agentBank=" << agentBank);
-}
-
-void BrokerProxyImpl::handleEventIndication(Buffer& inBuffer, uint32_t seq)
-{
- auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
- const SchemaEventClass *schema = console.impl->getEventClass(classKey.get());
- if (schema == 0) {
- QPID_LOG(trace, "No Schema Found for EventIndication. seq=" << seq << " key=" << classKey->impl->str());
- return;
- }
-
- EventPtr eptr(EventImpl::factory(schema, inBuffer));
-
- console.impl->eventEventReceived(eptr);
- QPID_LOG(trace, "RCVD EventIndication seq=" << seq << " key=" << classKey->impl->str());
-}
-
-void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
-{
- SchemaObjectClass* oClassPtr;
- SchemaEventClass* eClassPtr;
- uint8_t kind = inBuffer.getOctet();
- const SchemaClassKey* key;
- if (kind == CLASS_OBJECT) {
- oClassPtr = SchemaObjectClassImpl::factory(inBuffer);
- console.impl->learnClass(oClassPtr);
- key = oClassPtr->getClassKey();
- QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str());
-
- //
- // If we have just learned about the org.apache.qpid.broker:agent class, send a get
- // request for the current list of agents so we can have it on-hand before we declare
- // this session "stable".
- //
- if (key->impl->getClassName() == AGENT_CLASS && key->impl->getPackageName() == BROKER_PACKAGE) {
- Mutex::ScopedLock _lock(lock);
- incOutstandingLH();
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
- FieldTable ft;
- ft.setString("_class", AGENT_CLASS);
- ft.setString("_package", BROKER_PACKAGE);
- ft.encode(outBuffer);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
- QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
- }
- } else if (kind == CLASS_EVENT) {
- eClassPtr = SchemaEventClassImpl::factory(inBuffer);
- console.impl->learnClass(eClassPtr);
- key = eClassPtr->getClassKey();
- QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->impl->str());
- }
- else {
- QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
- }
-}
-
-ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
-{
- auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
- QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey->impl->str());
-
- SchemaObjectClass* schema = console.impl->getSchema(classKey.get());
- if (schema == 0) {
- QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey->impl->str());
- return ObjectPtr();
- }
-
- ObjectPtr optr(ObjectImpl::factory(schema, this, inBuffer, prop, stat, true));
- if (prop && classKey->impl->getPackageName() == BROKER_PACKAGE && classKey->impl->getClassName() == AGENT_CLASS) {
- //
- // We've intercepted information about a remote agent... update the agent list accordingly
- //
- updateAgentList(optr);
- }
- return optr;
-}
-
-void BrokerProxyImpl::updateAgentList(ObjectPtr obj)
-{
- Value* value = obj->getValue("agentBank");
- Mutex::ScopedLock _lock(lock);
- if (value != 0 && value->isUint()) {
- uint32_t agentBank = value->asUint();
- if (obj->isDeleted()) {
- map<uint32_t, AgentProxyPtr>::iterator iter = agentList.find(agentBank);
- if (iter != agentList.end()) {
- AgentProxyPtr agent(iter->second);
- console.impl->eventAgentDeleted(agent);
- agentList.erase(agentBank);
- QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list");
-
- //
- // Release all sequence numbers for requests in-flight to this agent.
- // Since the agent is no longer connected, these requests would not
- // otherwise complete.
- //
- agent->impl->releaseInFlight(seqMgr);
- }
- } else {
- Value* str = obj->getValue("label");
- string label;
- if (str != 0 && str->isString())
- label = str->asString();
- map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank);
- if (iter == agentList.end()) {
- AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, agentBank, label));
- agentList[agentBank] = agent;
- console.impl->eventAgentAdded(agent);
- QPID_LOG(trace, "Agent '" << label << "' found at bank " << agentBank);
- }
- }
- }
-}
-
-void BrokerProxyImpl::incOutstandingLH()
-{
- requestsOutstanding++;
-}
-
-void BrokerProxyImpl::decOutstanding()
-{
- Mutex::ScopedLock _lock(lock);
- requestsOutstanding--;
- if (requestsOutstanding == 0 && !topicBound) {
- topicBound = true;
- for (vector<pair<string, string> >::const_iterator iter = console.impl->bindingList.begin();
- iter != console.impl->bindingList.end(); iter++) {
- string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);
- string key(iter->second);
- eventQueue.push_back(eventBind(exchange, queueName, key));
- }
- eventQueue.push_back(eventStable());
- }
-}
-
-MethodResponseImpl::MethodResponseImpl(const MethodResponseImpl& from) :
- status(from.status), schema(from.schema)
-{
- if (from.exception.get())
- exception.reset(new Value(*(from.exception)));
- if (from.arguments.get())
- arguments.reset(new Value(*(from.arguments)));
-}
-
-MethodResponseImpl::MethodResponseImpl(Buffer& buf, const SchemaMethod* s) : schema(s)
-{
- string text;
-
- status = buf.getLong();
- buf.getMediumString(text);
- exception.reset(new Value(TYPE_LSTR));
- exception->setString(text.c_str());
-
- if (status != 0)
- return;
-
- arguments.reset(new Value(TYPE_MAP));
- int argCount(schema->getArgumentCount());
- for (int idx = 0; idx < argCount; idx++) {
- const SchemaArgument* arg = schema->getArgument(idx);
- if (arg->getDirection() == DIR_OUT || arg->getDirection() == DIR_IN_OUT) {
- Value* value(ValueImpl::factory(arg->getType(), buf));
- arguments->insert(arg->getName(), value);
- }
- }
-}
-
-MethodResponseImpl::MethodResponseImpl(uint32_t s, const string& text) : schema(0)
-{
- status = s;
- exception.reset(new Value(TYPE_LSTR));
- exception->setString(text.c_str());
-}
-
-MethodResponse* MethodResponseImpl::factory(Buffer& buf, const SchemaMethod* schema)
-{
- MethodResponseImpl* impl(new MethodResponseImpl(buf, schema));
- return new MethodResponse(impl);
-}
-
-MethodResponse* MethodResponseImpl::factory(uint32_t status, const std::string& text)
-{
- MethodResponseImpl* impl(new MethodResponseImpl(status, text));
- return new MethodResponse(impl);
-}
-
-bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& routingKey, Buffer& buffer)
-{
- ObjectPtr object;
- bool completeContext = false;
-
- if (opcode == Protocol::OP_BROKER_RESPONSE) {
- broker.handleBrokerResponse(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_COMMAND_COMPLETE) {
- broker.handleCommandComplete(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_SCHEMA_RESPONSE) {
- broker.handleSchemaResponse(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_PACKAGE_INDICATION)
- broker.handlePackageIndication(buffer, sequence);
- else if (opcode == Protocol::OP_CLASS_INDICATION)
- broker.handleClassIndication(buffer, sequence);
- else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
- broker.handleHeartbeatIndication(buffer, sequence, routingKey);
- else if (opcode == Protocol::OP_EVENT_INDICATION)
- broker.handleEventIndication(buffer, sequence);
- else if (opcode == Protocol::OP_PROPERTY_INDICATION) {
- object = broker.handleObjectIndication(buffer, sequence, true, false);
- broker.console.impl->eventObjectUpdate(object, true, false);
- }
- else if (opcode == Protocol::OP_STATISTIC_INDICATION) {
- object = broker.handleObjectIndication(buffer, sequence, false, true);
- broker.console.impl->eventObjectUpdate(object, false, true);
- }
- else if (opcode == Protocol::OP_OBJECT_INDICATION) {
- object = broker.handleObjectIndication(buffer, sequence, true, true);
- broker.console.impl->eventObjectUpdate(object, true, true);
- }
- else {
- QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
- completeContext = true;
- }
-
- return completeContext;
-}
-
-void QueryContext::reserve()
-{
- Mutex::ScopedLock _lock(lock);
- requestsOutstanding++;
-}
-
-void QueryContext::release()
-{
- {
- Mutex::ScopedLock _lock(lock);
- if (--requestsOutstanding > 0)
- return;
- }
-
- Mutex::ScopedLock _block(broker.lock);
- broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
-}
-
-bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer)
-{
- bool completeContext = false;
- ObjectPtr object;
-
- if (opcode == Protocol::OP_COMMAND_COMPLETE) {
- broker.handleCommandComplete(buffer, sequence);
- completeContext = true;
-
- //
- // Visit each agent and remove the sequence from that agent's in-flight list.
- // This could be made more efficient because only one agent will have this sequence
- // in its list.
- //
- map<uint32_t, AgentProxyPtr> copy;
- {
- Mutex::ScopedLock _block(broker.lock);
- copy = broker.agentList;
- }
- for (map<uint32_t, AgentProxyPtr>::iterator iter = copy.begin(); iter != copy.end(); iter++)
- iter->second->impl->delSequence(sequence);
- }
- else if (opcode == Protocol::OP_OBJECT_INDICATION) {
- object = broker.handleObjectIndication(buffer, sequence, true, true);
- if (object.get() != 0)
- queryResponse->impl->results.push_back(object);
- }
- else {
- QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
- completeContext = true;
- }
-
- return completeContext;
-}
-
-void MethodContext::release()
-{
- Mutex::ScopedLock _block(broker.lock);
- broker.eventQueue.push_back(broker.eventMethodResponse(userContext, methodResponse));
-}
-
-bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer)
-{
- if (opcode == Protocol::OP_METHOD_RESPONSE)
- methodResponse = broker.handleMethodResponse(buffer, sequence, schema);
- else
- QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
-
- return true;
-}
-
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {}
-AgentProxy::AgentProxy(const AgentProxy& from) : impl(new AgentProxyImpl(*(from.impl))) {}
-AgentProxy::~AgentProxy() { delete impl; }
-const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); }
-uint32_t AgentProxy::getBrokerBank() const { return impl->getBrokerBank(); }
-uint32_t AgentProxy::getAgentBank() const { return impl->getAgentBank(); }
-
-BrokerProxy::BrokerProxy(Console& console) : impl(new BrokerProxyImpl(*this, console)) {}
-BrokerProxy::~BrokerProxy() { delete impl; }
-void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
-void BrokerProxy::sessionClosed() { impl->sessionClosed(); }
-void BrokerProxy::startProtocol() { impl->startProtocol(); }
-void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
-bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
-void BrokerProxy::popXmt() { impl->popXmt(); }
-bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
-void BrokerProxy::popEvent() { impl->popEvent(); }
-uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }
-const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
-void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); }
-
-MethodResponse::MethodResponse(const MethodResponse& from) : impl(new MethodResponseImpl(*(from.impl))) {}
-MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {}
-MethodResponse::~MethodResponse() {}
-uint32_t MethodResponse::getStatus() const { return impl->getStatus(); }
-const Value* MethodResponse::getException() const { return impl->getException(); }
-const Value* MethodResponse::getArgs() const { return impl->getArgs(); }
-
-QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {}
-QueryResponse::~QueryResponse() {}
-uint32_t QueryResponse::getStatus() const { return impl->getStatus(); }
-const Value* QueryResponse::getException() const { return impl->getException(); }
-uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); }
-const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); }
-