diff options
Diffstat (limited to 'cpp/src/qmf/ConsoleEngine.cpp')
-rw-r--r-- | cpp/src/qmf/ConsoleEngine.cpp | 1091 |
1 files changed, 0 insertions, 1091 deletions
diff --git a/cpp/src/qmf/ConsoleEngine.cpp b/cpp/src/qmf/ConsoleEngine.cpp deleted file mode 100644 index e7991328ee..0000000000 --- a/cpp/src/qmf/ConsoleEngine.cpp +++ /dev/null @@ -1,1091 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/ConsoleEngine.h" -#include "qmf/MessageImpl.h" -#include "qmf/SchemaImpl.h" -#include "qmf/Typecode.h" -#include "qmf/ObjectImpl.h" -#include "qmf/ObjectIdImpl.h" -#include "qmf/QueryImpl.h" -#include "qmf/ValueImpl.h" -#include "qmf/Protocol.h" -#include "qmf/SequenceManager.h" -#include <qpid/framing/Buffer.h> -#include <qpid/framing/Uuid.h> -#include <qpid/framing/FieldTable.h> -#include <qpid/framing/FieldValue.h> -#include <qpid/sys/Mutex.h> -#include <qpid/log/Statement.h> -#include <qpid/sys/Time.h> -#include <qpid/sys/SystemInfo.h> -#include <string.h> -#include <string> -#include <deque> -#include <map> -#include <vector> -#include <iostream> -#include <fstream> -#include <boost/shared_ptr.hpp> - -using namespace std; -using namespace qmf; -using namespace qpid::framing; -using namespace qpid::sys; - -namespace qmf { - - struct MethodResponseImpl { - typedef boost::shared_ptr<MethodResponseImpl> Ptr; - MethodResponse* envelope; - uint32_t status; - auto_ptr<Value> exception; - auto_ptr<Value> arguments; - - MethodResponseImpl(Buffer& buf); - ~MethodResponseImpl() { delete envelope; } - uint32_t getStatus() const { return status; } - const Value* getException() const { return exception.get(); } - const Value* getArgs() const { return arguments.get(); } - }; - - struct QueryResponseImpl { - typedef boost::shared_ptr<QueryResponseImpl> Ptr; - QueryResponse *envelope; - uint32_t status; - auto_ptr<Value> exception; - vector<ObjectImpl::Ptr> results; - - QueryResponseImpl() : envelope(new QueryResponse(this)), status(0) {} - ~QueryResponseImpl() { delete envelope; } - uint32_t getStatus() const { return status; } - const Value* getException() const { return exception.get(); } - uint32_t getObjectCount() const { return results.size(); } - const Object* getObject(uint32_t idx) const; - }; - - struct ConsoleEventImpl { - typedef boost::shared_ptr<ConsoleEventImpl> Ptr; - ConsoleEvent::EventKind kind; - boost::shared_ptr<AgentProxyImpl> agent; - string name; - boost::shared_ptr<SchemaClassKey> classKey; - Object* object; - void* context; - Event* event; - uint64_t timestamp; - uint32_t methodHandle; - MethodResponseImpl::Ptr methodResponse; - - ConsoleEventImpl(ConsoleEvent::EventKind k) : - kind(k), object(0), context(0), event(0), timestamp(0), methodHandle(0) {} - ~ConsoleEventImpl() {} - ConsoleEvent copy(); - }; - - struct BrokerEventImpl { - typedef boost::shared_ptr<BrokerEventImpl> Ptr; - BrokerEvent::EventKind kind; - string name; - string exchange; - string bindingKey; - void* context; - QueryResponseImpl::Ptr queryResponse; - - BrokerEventImpl(BrokerEvent::EventKind k) : kind(k) {} - ~BrokerEventImpl() {} - BrokerEvent copy(); - }; - - struct AgentProxyImpl { - typedef boost::shared_ptr<AgentProxyImpl> Ptr; - AgentProxy* envelope; - ConsoleEngineImpl* console; - BrokerProxyImpl* broker; - uint32_t agentBank; - string label; - - AgentProxyImpl(ConsoleEngineImpl* c, BrokerProxyImpl* b, uint32_t ab, const string& l) : - envelope(new AgentProxy(this)), console(c), broker(b), agentBank(ab), label(l) {} - ~AgentProxyImpl() {} - const string& getLabel() const { return label; } - }; - - class BrokerProxyImpl { - public: - typedef boost::shared_ptr<BrokerProxyImpl> Ptr; - - BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console); - ~BrokerProxyImpl() {} - - void sessionOpened(SessionHandle& sh); - void sessionClosed(); - void startProtocol(); - - void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey); - void handleRcvMessage(Message& message); - bool getXmtMessage(Message& item) const; - void popXmt(); - - bool getEvent(BrokerEvent& event) const; - void popEvent(); - - uint32_t agentCount() const; - const AgentProxy* getAgent(uint32_t idx) const; - void sendQuery(const Query& query, void* context, const AgentProxy* agent); - void sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent); - - void addBinding(const string& exchange, const string& key); - void staticRelease() { decOutstanding(); } - - private: - friend class StaticContext; - friend class QueryContext; - mutable Mutex lock; - BrokerProxy* envelope; - ConsoleEngineImpl* console; - string queueName; - Uuid brokerId; - SequenceManager seqMgr; - uint32_t requestsOutstanding; - bool topicBound; - vector<AgentProxyImpl::Ptr> agentList; - deque<MessageImpl::Ptr> xmtQueue; - deque<BrokerEventImpl::Ptr> eventQueue; - -# define MA_BUFFER_SIZE 65536 - char outputBuffer[MA_BUFFER_SIZE]; - - BrokerEventImpl::Ptr eventDeclareQueue(const string& queueName); - BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); - BrokerEventImpl::Ptr eventSetupComplete(); - BrokerEventImpl::Ptr eventStable(); - BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponseImpl::Ptr response); - - void handleBrokerResponse(Buffer& inBuffer, uint32_t seq); - void handlePackageIndication(Buffer& inBuffer, uint32_t seq); - void handleCommandComplete(Buffer& inBuffer, uint32_t seq); - void handleClassIndication(Buffer& inBuffer, uint32_t seq); - void handleMethodResponse(Buffer& inBuffer, uint32_t seq); - void handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq); - void handleEventIndication(Buffer& inBuffer, uint32_t seq); - void handleSchemaResponse(Buffer& inBuffer, uint32_t seq); - ObjectImpl::Ptr handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat); - void incOutstandingLH(); - void decOutstanding(); - }; - - struct StaticContext : public SequenceContext { - StaticContext(BrokerProxyImpl& b) : broker(b) {} - ~StaticContext() {} - void reserve() {} - void release() { broker.staticRelease(); } - bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer); - BrokerProxyImpl& broker; - }; - - struct QueryContext : public SequenceContext { - QueryContext(BrokerProxyImpl& b, void* u) : - broker(b), userContext(u), requestsOutstanding(0), queryResponse(new QueryResponseImpl()) {} - ~QueryContext() {} - void reserve(); - void release(); - bool handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer); - - mutable Mutex lock; - BrokerProxyImpl& broker; - void* userContext; - uint32_t requestsOutstanding; - QueryResponseImpl::Ptr queryResponse; - }; - - class ConsoleEngineImpl { - public: - ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& settings = ConsoleSettings()); - ~ConsoleEngineImpl(); - - bool getEvent(ConsoleEvent& event) const; - void popEvent(); - - void addConnection(BrokerProxy& broker, void* context); - void delConnection(BrokerProxy& broker); - - uint32_t packageCount() const; - const string& getPackageName(uint32_t idx) const; - - uint32_t classCount(const char* packageName) const; - const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const; - - ClassKind getClassKind(const SchemaClassKey* key) const; - const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const; - const SchemaEventClass* getEventClass(const SchemaClassKey* key) const; - - void bindPackage(const char* packageName); - void bindClass(const SchemaClassKey* key); - void bindClass(const char* packageName, const char* className); - - /* - void startSync(const Query& query, void* context, SyncQuery& sync); - void touchSync(SyncQuery& sync); - void endSync(SyncQuery& sync); - */ - - private: - friend class BrokerProxyImpl; - ConsoleEngine* envelope; - const ConsoleSettings& settings; - mutable Mutex lock; - deque<ConsoleEventImpl::Ptr> eventQueue; - vector<BrokerProxyImpl*> brokerList; - vector<pair<string, string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE) - - // Declare a compare class for the class maps that compares the dereferenced - // class key pointers. The default behavior would be to compare the pointer - // addresses themselves. - struct KeyCompare { - bool operator()(const SchemaClassKeyImpl* left, const SchemaClassKeyImpl* right) const { - return *left < *right; - } - }; - - typedef map<const SchemaClassKeyImpl*, SchemaObjectClassImpl::Ptr, KeyCompare> ObjectClassList; - typedef map<const SchemaClassKeyImpl*, SchemaEventClassImpl::Ptr, KeyCompare> EventClassList; - typedef map<string, pair<ObjectClassList, EventClassList> > PackageList; - - PackageList packages; - - void learnPackage(const string& packageName); - void learnClass(SchemaObjectClassImpl::Ptr cls); - void learnClass(SchemaEventClassImpl::Ptr cls); - bool haveClass(const SchemaClassKeyImpl& key) const; - SchemaObjectClassImpl::Ptr getSchema(const SchemaClassKeyImpl& key) const; - }; -} - -namespace { - const char* QMF_EXCHANGE = "qpid.management"; - const char* DIR_EXCHANGE = "amq.direct"; - const char* BROKER_KEY = "broker"; - const char* BROKER_PACKAGE = "org.apache.qpid.broker"; - const char* AGENT_CLASS = "agent"; - const char* BROKER_AGENT_KEY = "agent.1.0"; -} - -const Object* QueryResponseImpl::getObject(uint32_t idx) const -{ - vector<ObjectImpl::Ptr>::const_iterator iter = results.begin(); - - while (idx > 0) { - if (iter == results.end()) - return 0; - iter++; - idx--; - } - - return (*iter)->envelope; -} - -#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} - -ConsoleEvent ConsoleEventImpl::copy() -{ - ConsoleEvent item; - - ::memset(&item, 0, sizeof(ConsoleEvent)); - item.kind = kind; - item.agent = agent.get() ? agent->envelope : 0; - item.classKey = classKey.get(); - item.object = object; - item.context = context; - item.event = event; - item.timestamp = timestamp; - item.methodHandle = methodHandle; - item.methodResponse = methodResponse.get() ? methodResponse->envelope : 0; - - STRING_REF(name); - - return item; -} - -BrokerEvent BrokerEventImpl::copy() -{ - BrokerEvent item; - - ::memset(&item, 0, sizeof(BrokerEvent)); - item.kind = kind; - - STRING_REF(name); - STRING_REF(exchange); - STRING_REF(bindingKey); - item.context = context; - item.queryResponse = queryResponse.get() ? queryResponse->envelope : 0; - - return item; -} - -BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) : - envelope(e), console(_console.impl) -{ - stringstream qn; - qpid::TcpAddress addr; - - SystemInfo::getLocalHostname(addr); - qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId(); - queueName = qn.str(); - - seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this))); -} - -void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/) -{ - Mutex::ScopedLock _lock(lock); - agentList.clear(); - eventQueue.clear(); - xmtQueue.clear(); - eventQueue.push_back(eventDeclareQueue(queueName)); - eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName)); - eventQueue.push_back(eventSetupComplete()); - - // TODO: Store session handle -} - -void BrokerProxyImpl::sessionClosed() -{ - Mutex::ScopedLock _lock(lock); - agentList.clear(); - eventQueue.clear(); - xmtQueue.clear(); -} - -void BrokerProxyImpl::startProtocol() -{ - Mutex::ScopedLock _lock(lock); - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - agentList.push_back(AgentProxyImpl::Ptr(new AgentProxyImpl(console, this, 0, "Agent embedded in broker"))); - - requestsOutstanding = 1; - topicBound = false; - uint32_t sequence(seqMgr.reserve()); - Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence); - sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence); -} - -void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) -{ - uint32_t length = buf.getPosition(); - MessageImpl::Ptr message(new MessageImpl); - - buf.reset(); - buf.getRawData(message->body, length); - message->destination = destination; - message->routingKey = routingKey; - message->replyExchange = DIR_EXCHANGE; - message->replyKey = queueName; - - xmtQueue.push_back(message); -} - -void BrokerProxyImpl::handleRcvMessage(Message& message) -{ - Buffer inBuffer(message.body, message.length); - uint8_t opcode; - uint32_t sequence; - - while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) - seqMgr.dispatch(opcode, sequence, inBuffer); -} - -bool BrokerProxyImpl::getXmtMessage(Message& item) const -{ - Mutex::ScopedLock _lock(lock); - if (xmtQueue.empty()) - return false; - item = xmtQueue.front()->copy(); - return true; -} - -void BrokerProxyImpl::popXmt() -{ - Mutex::ScopedLock _lock(lock); - if (!xmtQueue.empty()) - xmtQueue.pop_front(); -} - -bool BrokerProxyImpl::getEvent(BrokerEvent& event) const -{ - Mutex::ScopedLock _lock(lock); - if (eventQueue.empty()) - return false; - event = eventQueue.front()->copy(); - return true; -} - -void BrokerProxyImpl::popEvent() -{ - Mutex::ScopedLock _lock(lock); - if (!eventQueue.empty()) - eventQueue.pop_front(); -} - -uint32_t BrokerProxyImpl::agentCount() const -{ - Mutex::ScopedLock _lock(lock); - return agentList.size(); -} - -const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const -{ - Mutex::ScopedLock _lock(lock); - for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin(); - iter != agentList.end(); iter++) - if (idx-- == 0) - return (*iter)->envelope; - return 0; -} - -void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent) -{ - SequenceContext::Ptr queryContext(new QueryContext(*this, context)); - Mutex::ScopedLock _lock(lock); - if (agent != 0) { - sendGetRequestLH(queryContext, query, agent->impl); - } else { - // TODO (optimization) only send queries to agents that have the requested class+package - for (vector<AgentProxyImpl::Ptr>::const_iterator iter = agentList.begin(); - iter != agentList.end(); iter++) { - sendGetRequestLH(queryContext, query, (*iter).get()); - } - } -} - -void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxyImpl* agent) -{ - stringstream key; - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve(queryContext)); - - Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); - query.impl->encode(outBuffer); - key << "agent.1." << agent->agentBank; - sendBufferLH(outBuffer, QMF_EXCHANGE, key.str()); - QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str()); -} - -void BrokerProxyImpl::addBinding(const string& exchange, const string& key) -{ - eventQueue.push_back(eventBind(exchange, queueName, key)); -} - -BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName) -{ - BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE)); - event->name = queueName; - return event; -} - -BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key) -{ - BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::BIND)); - event->name = queue; - event->exchange = exchange; - event->bindingKey = key; - - return event; -} - -BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete() -{ - BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE)); - return event; -} - -BrokerEventImpl::Ptr BrokerProxyImpl::eventStable() -{ - BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE)); - return event; -} - -BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponseImpl::Ptr response) -{ - BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE)); - event->context = context; - event->queryResponse = response; - return event; -} - -void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq) -{ - brokerId.decode(inBuffer); - QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId); - Mutex::ScopedLock _lock(lock); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve()); - incOutstandingLH(); - Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence); - sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT PackageRequest seq=" << sequence); -} - -void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq) -{ - string package; - - inBuffer.getShortString(package); - QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package); - console->learnPackage(package); - - Mutex::ScopedLock _lock(lock); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve()); - incOutstandingLH(); - Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence); - outBuffer.putShortString(package); - sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package); -} - -void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq) -{ - string text; - uint32_t code = inBuffer.getLong(); - inBuffer.getShortString(text); - QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text); -} - -void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq) -{ - uint8_t kind = inBuffer.getOctet(); - SchemaClassKeyImpl classKey(inBuffer); - - QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey.str()); - - if (!console->haveClass(classKey)) { - Mutex::ScopedLock _lock(lock); - incOutstandingLH(); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve()); - Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence); - classKey.encode(outBuffer); - sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey.str()); - } -} - -void BrokerProxyImpl::handleMethodResponse(Buffer& /*inBuffer*/, uint32_t /*seq*/) -{ - // TODO -} - -void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/) -{ - // TODO -} - -void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/) -{ - // TODO -} - -void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq) -{ - SchemaObjectClassImpl::Ptr oClassPtr; - SchemaEventClassImpl::Ptr eClassPtr; - uint8_t kind = inBuffer.getOctet(); - const SchemaClassKeyImpl* key; - if (kind == CLASS_OBJECT) { - oClassPtr.reset(new SchemaObjectClassImpl(inBuffer)); - console->learnClass(oClassPtr); - key = oClassPtr->getClassKey()->impl; - QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->str()); - - // - // If we have just learned about the org.apache.qpid.broker:agent class, send a get - // request for the current list of agents so we can have it on-hand before we declare - // this session "stable". - // - if (key->getClassName() == AGENT_CLASS && key->getPackageName() == BROKER_PACKAGE) { - Mutex::ScopedLock _lock(lock); - incOutstandingLH(); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve()); - Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); - FieldTable ft; - ft.setString("_class", AGENT_CLASS); - ft.setString("_package", BROKER_PACKAGE); - ft.encode(outBuffer); - sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY); - QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY); - } - } else if (kind == CLASS_EVENT) { - eClassPtr.reset(new SchemaEventClassImpl(inBuffer)); - console->learnClass(eClassPtr); - key = eClassPtr->getClassKey()->impl; - QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->str()); - } - else { - QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind); - } -} - -ObjectImpl::Ptr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat) -{ - SchemaClassKeyImpl classKey(inBuffer); - QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey.str()); - - SchemaObjectClassImpl::Ptr schema = console->getSchema(classKey); - if (schema.get() == 0) { - QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey.str()); - return ObjectImpl::Ptr(); - } - - return ObjectImpl::Ptr(new ObjectImpl(schema->envelope, inBuffer, prop, stat, true)); -} - -void BrokerProxyImpl::incOutstandingLH() -{ - requestsOutstanding++; -} - -void BrokerProxyImpl::decOutstanding() -{ - Mutex::ScopedLock _lock(lock); - requestsOutstanding--; - if (requestsOutstanding == 0 && !topicBound) { - topicBound = true; - for (vector<pair<string, string> >::const_iterator iter = console->bindingList.begin(); - iter != console->bindingList.end(); iter++) { - string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first); - string key(iter->second); - eventQueue.push_back(eventBind(exchange, queueName, key)); - } - eventQueue.push_back(eventStable()); - } -} - -MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodResponse(this)) -{ - string text; - - status = buf.getLong(); - buf.getMediumString(text); - exception.reset(new Value(TYPE_LSTR)); - exception->setString(text.c_str()); - - // TODO: Parse schema-specific output arguments. - arguments.reset(new Value(TYPE_MAP)); -} - -bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer) -{ - bool completeContext = false; - if (opcode == Protocol::OP_BROKER_RESPONSE) { - broker.handleBrokerResponse(buffer, sequence); - completeContext = true; - } - else if (opcode == Protocol::OP_COMMAND_COMPLETE) { - broker.handleCommandComplete(buffer, sequence); - completeContext = true; - } - else if (opcode == Protocol::OP_SCHEMA_RESPONSE) { - broker.handleSchemaResponse(buffer, sequence); - completeContext = true; - } - else if (opcode == Protocol::OP_PACKAGE_INDICATION) - broker.handlePackageIndication(buffer, sequence); - else if (opcode == Protocol::OP_CLASS_INDICATION) - broker.handleClassIndication(buffer, sequence); - else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) - broker.handleHeartbeatIndication(buffer, sequence); - else if (opcode == Protocol::OP_EVENT_INDICATION) - broker.handleEventIndication(buffer, sequence); - else if (opcode == Protocol::OP_PROPERTY_INDICATION) - broker.handleObjectIndication(buffer, sequence, true, false); - else if (opcode == Protocol::OP_STATISTIC_INDICATION) - broker.handleObjectIndication(buffer, sequence, false, true); - else if (opcode == Protocol::OP_OBJECT_INDICATION) - broker.handleObjectIndication(buffer, sequence, true, true); - else { - QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode); - completeContext = true; - } - - return completeContext; -} - -void QueryContext::reserve() -{ - Mutex::ScopedLock _lock(lock); - requestsOutstanding++; -} - -void QueryContext::release() -{ - Mutex::ScopedLock _lock(lock); - if (--requestsOutstanding == 0) { - broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse)); - } -} - -bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer) -{ - bool completeContext = false; - ObjectImpl::Ptr object; - - if (opcode == Protocol::OP_COMMAND_COMPLETE) { - broker.handleCommandComplete(buffer, sequence); - completeContext = true; - } - else if (opcode == Protocol::OP_OBJECT_INDICATION) { - object = broker.handleObjectIndication(buffer, sequence, true, true); - if (object.get() != 0) - queryResponse->results.push_back(object); - } - else { - QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode); - completeContext = true; - } - - return completeContext; -} - -ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) : - envelope(e), settings(s) -{ - bindingList.push_back(pair<string, string>(string(), "schema.#")); - if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) { - bindingList.push_back(pair<string, string>(string(), "console.#")); - } else { - if (settings.rcvObjects && !settings.userBindings) - bindingList.push_back(pair<string, string>(string(), "console.obj.#")); - else - bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent")); - if (settings.rcvEvents) - bindingList.push_back(pair<string, string>(string(), "console.event.#")); - if (settings.rcvHeartbeats) - bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#")); - } -} - -ConsoleEngineImpl::~ConsoleEngineImpl() -{ - // This function intentionally left blank. -} - -bool ConsoleEngineImpl::getEvent(ConsoleEvent& event) const -{ - Mutex::ScopedLock _lock(lock); - if (eventQueue.empty()) - return false; - event = eventQueue.front()->copy(); - return true; -} - -void ConsoleEngineImpl::popEvent() -{ - Mutex::ScopedLock _lock(lock); - if (!eventQueue.empty()) - eventQueue.pop_front(); -} - -void ConsoleEngineImpl::addConnection(BrokerProxy& broker, void* /*context*/) -{ - Mutex::ScopedLock _lock(lock); - brokerList.push_back(broker.impl); -} - -void ConsoleEngineImpl::delConnection(BrokerProxy& broker) -{ - Mutex::ScopedLock _lock(lock); - for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); - iter != brokerList.end(); iter++) - if (*iter == broker.impl) { - brokerList.erase(iter); - break; - } -} - -uint32_t ConsoleEngineImpl::packageCount() const -{ - Mutex::ScopedLock _lock(lock); - return packages.size(); -} - -const string& ConsoleEngineImpl::getPackageName(uint32_t idx) const -{ - const static string empty; - - Mutex::ScopedLock _lock(lock); - if (idx >= packages.size()) - return empty; - - PackageList::const_iterator iter = packages.begin(); - for (uint32_t i = 0; i < idx; i++) iter++; - return iter->first; -} - -uint32_t ConsoleEngineImpl::classCount(const char* packageName) const -{ - Mutex::ScopedLock _lock(lock); - PackageList::const_iterator pIter = packages.find(packageName); - if (pIter == packages.end()) - return 0; - - const ObjectClassList& oList = pIter->second.first; - const EventClassList& eList = pIter->second.second; - - return oList.size() + eList.size(); -} - -const SchemaClassKey* ConsoleEngineImpl::getClass(const char* packageName, uint32_t idx) const -{ - Mutex::ScopedLock _lock(lock); - PackageList::const_iterator pIter = packages.find(packageName); - if (pIter == packages.end()) - return 0; - - const ObjectClassList& oList = pIter->second.first; - const EventClassList& eList = pIter->second.second; - uint32_t count = 0; - - for (ObjectClassList::const_iterator oIter = oList.begin(); - oIter != oList.end(); oIter++) { - if (count == idx) - return oIter->second->getClassKey(); - count++; - } - - for (EventClassList::const_iterator eIter = eList.begin(); - eIter != eList.end(); eIter++) { - if (count == idx) - return eIter->second->getClassKey(); - count++; - } - - return 0; -} - -ClassKind ConsoleEngineImpl::getClassKind(const SchemaClassKey* key) const -{ - Mutex::ScopedLock _lock(lock); - PackageList::const_iterator pIter = packages.find(key->getPackageName()); - if (pIter == packages.end()) - return CLASS_OBJECT; - - const EventClassList& eList = pIter->second.second; - if (eList.find(key->impl) != eList.end()) - return CLASS_EVENT; - return CLASS_OBJECT; -} - -const SchemaObjectClass* ConsoleEngineImpl::getObjectClass(const SchemaClassKey* key) const -{ - Mutex::ScopedLock _lock(lock); - PackageList::const_iterator pIter = packages.find(key->getPackageName()); - if (pIter == packages.end()) - return 0; - - const ObjectClassList& oList = pIter->second.first; - ObjectClassList::const_iterator iter = oList.find(key->impl); - if (iter == oList.end()) - return 0; - return iter->second->envelope; -} - -const SchemaEventClass* ConsoleEngineImpl::getEventClass(const SchemaClassKey* key) const -{ - Mutex::ScopedLock _lock(lock); - PackageList::const_iterator pIter = packages.find(key->getPackageName()); - if (pIter == packages.end()) - return 0; - - const EventClassList& eList = pIter->second.second; - EventClassList::const_iterator iter = eList.find(key->impl); - if (iter == eList.end()) - return 0; - return iter->second->envelope; -} - -void ConsoleEngineImpl::bindPackage(const char* packageName) -{ - stringstream key; - key << "console.obj.*.*." << packageName << ".#"; - Mutex::ScopedLock _lock(lock); - bindingList.push_back(pair<string, string>(string(), key.str())); - for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); - iter != brokerList.end(); iter++) - (*iter)->addBinding(QMF_EXCHANGE, key.str()); -} - -void ConsoleEngineImpl::bindClass(const SchemaClassKey* classKey) -{ - stringstream key; - key << "console.obj.*.*." << classKey->getPackageName() << "." << classKey->getClassName() << ".#"; - Mutex::ScopedLock _lock(lock); - bindingList.push_back(pair<string, string>(string(), key.str())); - for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); - iter != brokerList.end(); iter++) - (*iter)->addBinding(QMF_EXCHANGE, key.str()); -} - -void ConsoleEngineImpl::bindClass(const char* packageName, const char* className) -{ - stringstream key; - key << "console.obj.*.*." << packageName << "." << className << ".#"; - Mutex::ScopedLock _lock(lock); - bindingList.push_back(pair<string, string>(string(), key.str())); - for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin(); - iter != brokerList.end(); iter++) - (*iter)->addBinding(QMF_EXCHANGE, key.str()); -} - -/* -void ConsoleEngineImpl::startSync(const Query& query, void* context, SyncQuery& sync) -{ -} - -void ConsoleEngineImpl::touchSync(SyncQuery& sync) -{ -} - -void ConsoleEngineImpl::endSync(SyncQuery& sync) -{ -} -*/ - -void ConsoleEngineImpl::learnPackage(const string& packageName) -{ - Mutex::ScopedLock _lock(lock); - if (packages.find(packageName) == packages.end()) - packages.insert(pair<string, pair<ObjectClassList, EventClassList> > - (packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList()))); -} - -void ConsoleEngineImpl::learnClass(SchemaObjectClassImpl::Ptr cls) -{ - Mutex::ScopedLock _lock(lock); - const SchemaClassKey* key = cls->getClassKey(); - PackageList::iterator pIter = packages.find(key->getPackageName()); - if (pIter == packages.end()) - return; - - ObjectClassList& list = pIter->second.first; - if (list.find(key->impl) == list.end()) - list[key->impl] = cls; -} - -void ConsoleEngineImpl::learnClass(SchemaEventClassImpl::Ptr cls) -{ - Mutex::ScopedLock _lock(lock); - const SchemaClassKey* key = cls->getClassKey(); - PackageList::iterator pIter = packages.find(key->getPackageName()); - if (pIter == packages.end()) - return; - - EventClassList& list = pIter->second.second; - if (list.find(key->impl) == list.end()) - list[key->impl] = cls; -} - -bool ConsoleEngineImpl::haveClass(const SchemaClassKeyImpl& key) const -{ - Mutex::ScopedLock _lock(lock); - PackageList::const_iterator pIter = packages.find(key.getPackageName()); - if (pIter == packages.end()) - return false; - - const ObjectClassList& oList = pIter->second.first; - const EventClassList& eList = pIter->second.second; - - return oList.find(&key) != oList.end() || eList.find(&key) != eList.end(); -} - -SchemaObjectClassImpl::Ptr ConsoleEngineImpl::getSchema(const SchemaClassKeyImpl& key) const -{ - Mutex::ScopedLock _lock(lock); - PackageList::const_iterator pIter = packages.find(key.getPackageName()); - if (pIter == packages.end()) - return SchemaObjectClassImpl::Ptr(); - - const ObjectClassList& oList = pIter->second.first; - ObjectClassList::const_iterator iter = oList.find(&key); - if (iter == oList.end()) - return SchemaObjectClassImpl::Ptr(); - - return iter->second; -} - -//================================================================== -// Wrappers -//================================================================== - -AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {} -AgentProxy::~AgentProxy() { delete impl; } -const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); } - -BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {} -BrokerProxy::~BrokerProxy() { delete impl; } -void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); } -void BrokerProxy::sessionClosed() { impl->sessionClosed(); } -void BrokerProxy::startProtocol() { impl->startProtocol(); } -void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); } -bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); } -void BrokerProxy::popXmt() { impl->popXmt(); } -bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); } -void BrokerProxy::popEvent() { impl->popEvent(); } -uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); } -const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); } -void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); } - -MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {} -MethodResponse::~MethodResponse() {} -uint32_t MethodResponse::getStatus() const { return impl->getStatus(); } -const Value* MethodResponse::getException() const { return impl->getException(); } -const Value* MethodResponse::getArgs() const { return impl->getArgs(); } - -QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {} -QueryResponse::~QueryResponse() {} -uint32_t QueryResponse::getStatus() const { return impl->getStatus(); } -const Value* QueryResponse::getException() const { return impl->getException(); } -uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); } -const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); } - -ConsoleEngine::ConsoleEngine(const ConsoleSettings& settings) : impl(new ConsoleEngineImpl(this, settings)) {} -ConsoleEngine::~ConsoleEngine() { delete impl; } -bool ConsoleEngine::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); } -void ConsoleEngine::popEvent() { impl->popEvent(); } -void ConsoleEngine::addConnection(BrokerProxy& broker, void* context) { impl->addConnection(broker, context); } -void ConsoleEngine::delConnection(BrokerProxy& broker) { impl->delConnection(broker); } -uint32_t ConsoleEngine::packageCount() const { return impl->packageCount(); } -const char* ConsoleEngine::getPackageName(uint32_t idx) const { return impl->getPackageName(idx).c_str(); } -uint32_t ConsoleEngine::classCount(const char* packageName) const { return impl->classCount(packageName); } -const SchemaClassKey* ConsoleEngine::getClass(const char* packageName, uint32_t idx) const { return impl->getClass(packageName, idx); } -ClassKind ConsoleEngine::getClassKind(const SchemaClassKey* key) const { return impl->getClassKind(key); } -const SchemaObjectClass* ConsoleEngine::getObjectClass(const SchemaClassKey* key) const { return impl->getObjectClass(key); } -const SchemaEventClass* ConsoleEngine::getEventClass(const SchemaClassKey* key) const { return impl->getEventClass(key); } -void ConsoleEngine::bindPackage(const char* packageName) { impl->bindPackage(packageName); } -void ConsoleEngine::bindClass(const SchemaClassKey* key) { impl->bindClass(key); } -void ConsoleEngine::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); } -//void ConsoleEngine::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); } -//void ConsoleEngine::touchSync(SyncQuery& sync) { impl->touchSync(sync); } -//void ConsoleEngine::endSync(SyncQuery& sync) { impl->endSync(sync); } - - |