diff options
author | Ted Ross <tross@apache.org> | 2009-08-28 22:03:26 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2009-08-28 22:03:26 +0000 |
commit | dc6068ce7a6bd0d9467886b44cd6252ba491e615 (patch) | |
tree | 7bf0b42a58d7f020fc61e81ae53ba7415e8fe91a /cpp | |
parent | a435a75437cd389c6fa08ae171f7d25b1d3a7e77 (diff) | |
download | qpid-python-dc6068ce7a6bd0d9467886b44cd6252ba491e615.tar.gz |
Major work in the QMF engine.
- The console framework now establishes connectivity with the broker.
- The Ruby binding for console is tracking the engine development.
- Overall improvements (thread safety in Ruby, etc.) have been added.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@809042 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/bindings/qmf/qmfengine.i | 4 | ||||
-rw-r--r-- | cpp/bindings/qmf/ruby/qmf.rb | 298 | ||||
-rwxr-xr-x | cpp/bindings/qmf/tests/agent_ruby.rb | 10 | ||||
-rwxr-xr-x | cpp/bindings/qmf/tests/python_console.py | 12 | ||||
-rwxr-xr-x | cpp/bindings/qmf/tests/ruby_console.rb | 43 | ||||
-rwxr-xr-x | cpp/bindings/qmf/tests/run_interop_tests | 13 | ||||
-rw-r--r-- | cpp/src/qmf.mk | 1 | ||||
-rw-r--r-- | cpp/src/qmf/AgentEngine.cpp | 44 | ||||
-rw-r--r-- | cpp/src/qmf/AgentEngine.h | 4 | ||||
-rw-r--r-- | cpp/src/qmf/ConsoleEngine.cpp | 488 | ||||
-rw-r--r-- | cpp/src/qmf/ConsoleEngine.h | 178 | ||||
-rw-r--r-- | cpp/src/qmf/ObjectImpl.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qmf/ResilientConnection.cpp | 22 | ||||
-rw-r--r-- | cpp/src/qmf/ResilientConnection.h | 11 | ||||
-rw-r--r-- | cpp/src/qmf/Schema.h | 23 | ||||
-rw-r--r-- | cpp/src/qmf/SchemaImpl.cpp | 75 | ||||
-rw-r--r-- | cpp/src/qmf/SchemaImpl.h | 27 |
17 files changed, 1096 insertions, 163 deletions
diff --git a/cpp/bindings/qmf/qmfengine.i b/cpp/bindings/qmf/qmfengine.i index 8ae28730e5..d3500c9b8f 100644 --- a/cpp/bindings/qmf/qmfengine.i +++ b/cpp/bindings/qmf/qmfengine.i @@ -20,7 +20,8 @@ %{ #include "qmf/AgentEngine.h" -#include <qmf/ResilientConnection.h> +#include "qmf/ConsoleEngine.h" +#include "qmf/ResilientConnection.h" %} @@ -28,6 +29,7 @@ %include <qmf/Query.h> %include <qmf/Message.h> %include <qmf/AgentEngine.h> +%include <qmf/ConsoleEngine.h> %include <qmf/ConnectionSettings.h> %include <qmf/ResilientConnection.h> %include <qmf/Typecode.h> diff --git a/cpp/bindings/qmf/ruby/qmf.rb b/cpp/bindings/qmf/ruby/qmf.rb index badd53942d..38c42b5aa7 100644 --- a/cpp/bindings/qmf/ruby/qmf.rb +++ b/cpp/bindings/qmf/ruby/qmf.rb @@ -20,6 +20,7 @@ require 'qmfengine' require 'thread' require 'socket' +require 'monitor' module Qmf @@ -70,39 +71,18 @@ module Qmf def sess_event_recv(context, message); end end - class Query - attr_reader :impl - def initialize(i) - @impl = i - end - - def package_name - @impl.getPackage - end - - def class_name - @impl.getClass - end - - def object_id - objid = @impl.getObjectId - if objid.class == NilClass - return nil - end - return ObjectId.new(objid) - end - end - class Connection + include MonitorMixin + attr_reader :impl def initialize(settings) + super() @impl = Qmfengine::ResilientConnection.new(settings.impl) @sockEngine, @sock = Socket::socketpair(Socket::PF_UNIX, Socket::SOCK_STREAM, 0) @impl.setNotifyFd(@sockEngine.fileno) @new_conn_handlers = Array.new @conn_handlers = Array.new - @sess_handlers = Array.new @thread = Thread.new do run @@ -110,47 +90,56 @@ module Qmf end def add_conn_handler(handler) - @new_conn_handlers.push(handler) + synchronize do + @new_conn_handlers.push(handler) + end @sockEngine.write("x") end - def add_sess_handler(handler) - @sess_handlers.push(handler) - end - def run() - event = Qmfengine::ResilientConnectionEvent.new + eventImpl = Qmfengine::ResilientConnectionEvent.new connected = nil + new_handlers = nil + bt_count = 0 + while :true @sock.read(1) - @new_conn_handlers.each do |nh| + synchronize do + new_handlers = @new_conn_handlers + @new_conn_handlers = Array.new + end + + new_handlers.each do |nh| @conn_handlers.push(nh) nh.conn_event_connected() if connected end - @new_conn_handlers = Array.new + new_handlers = nil - valid = @impl.getEvent(event) + valid = @impl.getEvent(eventImpl) while valid begin - case event.kind + case eventImpl.kind when Qmfengine::ResilientConnectionEvent::CONNECTED connected = :true @conn_handlers.each { |h| h.conn_event_connected() } when Qmfengine::ResilientConnectionEvent::DISCONNECTED connected = nil - @conn_handlers.each { |h| h.conn_event_disconnected(event.errorText) } + @conn_handlers.each { |h| h.conn_event_disconnected(eventImpl.errorText) } when Qmfengine::ResilientConnectionEvent::SESSION_CLOSED - event.sessionContext.handler.sess_event_session_closed(event.sessionContext, event.errorText) + eventImpl.sessionContext.handler.sess_event_session_closed(eventImpl.sessionContext, eventImpl.errorText) when Qmfengine::ResilientConnectionEvent::RECV - event.sessionContext.handler.sess_event_recv(event.sessionContext, event.message) + eventImpl.sessionContext.handler.sess_event_recv(eventImpl.sessionContext, eventImpl.message) end rescue Exception => ex puts "Event Exception: #{ex}" - puts ex.backtrace + if bt_count < 2 + puts ex.backtrace + bt_count += 1 + end end @impl.popEvent - valid = @impl.getEvent(event) + valid = @impl.getEvent(eventImpl) end end end @@ -164,9 +153,12 @@ module Qmf @label = label @handler = handler @handle = Qmfengine::SessionHandle.new - @conn.add_sess_handler(@handler) result = @conn.impl.createSession(label, self, @handle) end + + def destroy() + @conn.impl.destroySession(@handle) + end end ##============================================================================== @@ -262,6 +254,30 @@ module Qmf end end + class ConsoleObject < QmfObject + attr_reader :current_time, :create_time, :delete_time + + def initialize(cls) + super(cls) + end + + def update() + end + + def mergeUpdate(newObject) + end + + def deleted?() + @delete_time > 0 + end + + def index() + end + + def method_missing(name, *args) + end + end + class ObjectId attr_reader :impl def initialize(impl=nil) @@ -357,6 +373,29 @@ module Qmf end end + class Query + attr_reader :impl + def initialize(i) + @impl = i + end + + def package_name + @impl.getPackage + end + + def class_name + @impl.getClass + end + + def object_id + objid = @impl.getObjectId + if objid.class == NilClass + return nil + end + return ObjectId.new(objid) + end + end + ##============================================================================== ## SCHEMA ##============================================================================== @@ -410,6 +449,21 @@ module Qmf end end + class SchemaClassKey + attr_reader :impl + def initialize(i) + @impl = i + end + + def get_package() + @impl.getPackageName() + end + + def get_class() + @impl.getClassName() + end + end + class SchemaObjectClass attr_reader :impl def initialize(package, name, kwargs={}) @@ -467,6 +521,170 @@ module Qmf ## CONSOLE ##============================================================================== + class ConsoleHandler + def agent_added(agent); end + def agent_deleted(agent); end + def new_package(package); end + def new_class(class_key); end + def object_update(object, hasProps, hasStats); end + def event_received(event); end + def agent_heartbeat(agent, timestamp); end + def method_response(resp); end + def broker_info(broker); end + end + + class Console + attr_reader :impl + + def initialize(handler, kwargs={}) + @handler = handler + @impl = Qmfengine::ConsoleEngine.new + @event = Qmfengine::ConsoleEvent.new + @broker_list = Array.new + end + + def add_connection(conn) + broker = Broker.new(self, conn) + @broker_list.push(broker) + return broker + end + + def del_connection(broker) + end + + def get_packages() + end + + def get_classes(package) + end + + def get_schema(class_key) + end + + def bind_package(package) + end + + def bind_class(kwargs = {}) + end + + def get_agents(broker = nil) + end + + def get_objects(query, kwargs = {}) + end + + def start_sync(query) + end + + def touch_sync(sync) + end + + def end_sync(sync) + end + + def do_console_events() + count = 0 + valid = @impl.getEvent(@event) + while valid + count += 1 + case @event.kind + when Qmfengine::ConsoleEvent::AGENT_ADDED + when Qmfengine::ConsoleEvent::AGENT_DELETED + when Qmfengine::ConsoleEvent::NEW_PACKAGE + when Qmfengine::ConsoleEvent::NEW_CLASS + when Qmfengine::ConsoleEvent::OBJECT_UPDATE + when Qmfengine::ConsoleEvent::EVENT_RECEIVED + when Qmfengine::ConsoleEvent::AGENT_HEARTBEAT + when Qmfengine::ConsoleEvent::METHOD_RESPONSE + end + @impl.popEvent + valid = @impl.getEvent(@event) + end + return count + end + end + + class Broker < ConnectionHandler + attr_reader :impl + + def initialize(console, conn) + @console = console + @conn = conn + @session = nil + @event = Qmfengine::BrokerEvent.new + @xmtMessage = Qmfengine::Message.new + @impl = Qmfengine::BrokerProxy.new(@console.impl) + @console.impl.addConnection(@impl, self) + @conn.add_conn_handler(self) + end + + def do_broker_events() + count = 0 + valid = @impl.getEvent(@event) + while valid + count += 1 + puts "Broker Event: #{@event.kind}" + case @event.kind + when Qmfengine::BrokerEvent::BROKER_INFO + when Qmfengine::BrokerEvent::DECLARE_QUEUE + @conn.impl.declareQueue(@session.handle, @event.name) + when Qmfengine::BrokerEvent::DELETE_QUEUE + @conn.impl.deleteQueue(@session.handle, @event.name) + when Qmfengine::BrokerEvent::BIND + @conn.impl.bind(@session.handle, @event.exchange, @event.name, @event.bindingKey) + when Qmfengine::BrokerEvent::UNBIND + @conn.impl.unbind(@session.handle, @event.exchange, @event.name, @event.bindingKey) + when Qmfengine::BrokerEvent::SETUP_COMPLETE + @impl.startProtocol + end + @impl.popEvent + valid = @impl.getEvent(@event) + end + return count + end + + def do_broker_messages() + count = 0 + valid = @impl.getXmtMessage(@xmtMessage) + while valid + count += 1 + @conn.impl.sendMessage(@session.handle, @xmtMessage) + @impl.popXmt + valid = @impl.getXmtMessage(@xmtMessage) + end + return count + end + + def do_events() + begin + ccnt = @console.do_console_events + bcnt = do_broker_events + mcnt = do_broker_messages + end until ccnt == 0 and bcnt == 0 and mcnt == 0 + end + + def conn_event_connected() + puts "Console Connection Established..." + @session = Session.new(@conn, "qmfc-%s.%d" % [Socket.gethostname, Process::pid], self) + @impl.sessionOpened(@session.handle) + do_events + end + + def conn_event_disconnected(error) + puts "Console Connection Lost" + end + + def sess_event_session_closed(context, error) + puts "Console Session Lost" + @impl.sessionClosed() + end + + def sess_event_recv(context, message) + @impl.handleRcvMessage(message) + do_events + end + end + ##============================================================================== ## AGENT ##============================================================================== diff --git a/cpp/bindings/qmf/tests/agent_ruby.rb b/cpp/bindings/qmf/tests/agent_ruby.rb index 0f85c90f37..75de2b5fa1 100755 --- a/cpp/bindings/qmf/tests/agent_ruby.rb +++ b/cpp/bindings/qmf/tests/agent_ruby.rb @@ -53,7 +53,10 @@ class Model method = Qmf::SchemaMethod.new("create_child", :desc => "Create a new child object") method.add_argument(Qmf::SchemaArgument.new("child_name", Qmf::TYPE_LSTR, :dir => Qmf::DIR_IN)) method.add_argument(Qmf::SchemaArgument.new("child_ref", Qmf::TYPE_REF, :dir => Qmf::DIR_OUT)) + @parent_class.add_method(method) + method = Qmf::SchemaMethod.new("probe_userid", :desc => "Return the user-id for this method call") + method.add_argument(Qmf::SchemaArgument.new("userid", Qmf::TYPE_SSTR, :dir => Qmf::DIR_OUT)) @parent_class.add_method(method) @child_class = Qmf::SchemaObjectClass.new("org.apache.qpid.qmf", "child") @@ -136,6 +139,13 @@ class App < Qmf::AgentHandler @child.set_attr("name", args.by_key("child_name")) @child.set_object_id(oid) @agent.method_response(context, 0, "OK", args) + + elsif name == "probe_userid" + args['userid'] = userId + @agent.method_response(context, 0, "OK", args) + + else + @agent.method_response(context, 1, "Unimplemented Method: #{name}", args) end end diff --git a/cpp/bindings/qmf/tests/python_console.py b/cpp/bindings/qmf/tests/python_console.py index 365f2ac33a..bcd3063fe3 100755 --- a/cpp/bindings/qmf/tests/python_console.py +++ b/cpp/bindings/qmf/tests/python_console.py @@ -128,6 +128,18 @@ class QmfInteropTests(TestBase010): self.assertEqual(parent.int16val, -1000) self.assertEqual(parent.int8val, -100) + def test_D_userid_for_method(self): + self.startQmf(); + qmf = self.qmf + + parents = qmf.getObjects(_class="parent") + self.assertEqual(len(parents), 1) + parent = parents[0] + + result = parent.probe_userid() + self.assertEqual(result.status, 0) + self.assertEqual(result.userid, "guest") + def getProperty(self, msg, name): for h in msg.headers: if hasattr(h, name): return getattr(h, name) diff --git a/cpp/bindings/qmf/tests/ruby_console.rb b/cpp/bindings/qmf/tests/ruby_console.rb new file mode 100755 index 0000000000..c7ee9c3686 --- /dev/null +++ b/cpp/bindings/qmf/tests/ruby_console.rb @@ -0,0 +1,43 @@ +#!/usr/bin/ruby + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'qmf' +require 'socket' + +class App < Qmf::ConsoleHandler + + def main + @settings = Qmf::ConnectionSettings.new + @settings.set_attr("host", ARGV[0]) if ARGV.size > 0 + @settings.set_attr("port", ARGV[1].to_i) if ARGV.size > 1 + @connection = Qmf::Connection.new(@settings) + @qmf = Qmf::Console.new(self) + + @qmf.add_connection(@connection) + + sleep + end +end + +app = App.new +app.main + + diff --git a/cpp/bindings/qmf/tests/run_interop_tests b/cpp/bindings/qmf/tests/run_interop_tests index e6fc872dbb..f3f78185c7 100755 --- a/cpp/bindings/qmf/tests/run_interop_tests +++ b/cpp/bindings/qmf/tests/run_interop_tests @@ -54,12 +54,23 @@ if test -d ${PYTHON_DIR} ; then echo "Running qmf interop tests using broker on port $BROKER_PORT" PYTHONPATH=${PYTHON_DIR}:${MY_DIR} export PYTHONPATH - echo " Ruby Agent vs. Pure-Python Console" + echo " Ruby Agent (external storage) vs. Pure-Python Console" start_ruby_agent echo " Ruby agent started at pid $AGENT_PID" ${PYTHON_DIR}/qpid-python-test -m python_console -b localhost:$BROKER_PORT $@ RETCODE=$? stop_ruby_agent + + # Also against the Pure-Python console: + # Ruby agent (internal storage) + # Python agent (external and internal) + # C++ agent (external and internal) + # + # Other consoles against the same set of agents: + # Wrapped Python console + # Ruby console + # C++ console + stop_broker if test x$RETCODE != x0; then echo "FAIL qmf interop tests"; exit 1; diff --git a/cpp/src/qmf.mk b/cpp/src/qmf.mk index 9d24709b6e..a9c2f24922 100644 --- a/cpp/src/qmf.mk +++ b/cpp/src/qmf.mk @@ -37,6 +37,7 @@ nobase_include_HEADERS += \ libqmfcommon_la_SOURCES = \ qmf/ConnectionSettingsImpl.cpp \ qmf/ConnectionSettingsImpl.h \ + qmf/ConsoleEngine.cpp \ qmf/ConsoleEngine.h \ qmf/Event.h \ qmf/Message.h \ diff --git a/cpp/src/qmf/AgentEngine.cpp b/cpp/src/qmf/AgentEngine.cpp index bef8b3d102..ec5b117337 100644 --- a/cpp/src/qmf/AgentEngine.cpp +++ b/cpp/src/qmf/AgentEngine.cpp @@ -85,9 +85,9 @@ namespace qmf { void setStoreDir(const char* path); void setTransferDir(const char* path); void handleRcvMessage(Message& message); - bool getXmtMessage(Message& item); + bool getXmtMessage(Message& item) const; void popXmt(); - bool getEvent(AgentEvent& event); + bool getEvent(AgentEvent& event) const; void popEvent(); void newSession(); void startProtocol(); @@ -103,7 +103,7 @@ namespace qmf { void raiseEvent(Event& event); private: - Mutex lock; + mutable Mutex lock; Mutex addLock; string label; string queueName; @@ -134,13 +134,13 @@ namespace qmf { # define MA_BUFFER_SIZE 65536 char outputBuffer[MA_BUFFER_SIZE]; - struct SchemaClassKey { + struct AgentClassKey { string name; uint8_t hash[16]; - SchemaClassKey(const string& n, const uint8_t* h) : name(n) { + AgentClassKey(const string& n, const uint8_t* h) : name(n) { memcpy(hash, h, 16); } - SchemaClassKey(Buffer& buffer) { + AgentClassKey(Buffer& buffer) { buffer.getShortString(name); buffer.getBin128(hash); } @@ -149,8 +149,8 @@ namespace qmf { } }; - struct SchemaClassKeyComp { - bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const + struct AgentClassKeyComp { + bool operator() (const AgentClassKey& lhs, const AgentClassKey& rhs) const { if (lhs.name != rhs.name) return lhs.name < rhs.name; @@ -162,8 +162,8 @@ namespace qmf { } }; - typedef map<SchemaClassKey, SchemaObjectClassImpl*, SchemaClassKeyComp> ObjectClassMap; - typedef map<SchemaClassKey, SchemaEventClassImpl*, SchemaClassKeyComp> EventClassMap; + typedef map<AgentClassKey, SchemaObjectClassImpl*, AgentClassKeyComp> ObjectClassMap; + typedef map<AgentClassKey, SchemaEventClassImpl*, AgentClassKeyComp> EventClassMap; struct ClassMaps { ObjectClassMap objectClasses; @@ -185,7 +185,7 @@ namespace qmf { void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey); void sendPackageIndicationLH(const string& packageName); - void sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key); + void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key); void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq, uint32_t code = 0, const string& text = "OK"); void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text=""); @@ -277,7 +277,7 @@ void AgentEngineImpl::handleRcvMessage(Message& message) } } -bool AgentEngineImpl::getXmtMessage(Message& item) +bool AgentEngineImpl::getXmtMessage(Message& item) const { Mutex::ScopedLock _lock(lock); if (xmtQueue.empty()) @@ -293,7 +293,7 @@ void AgentEngineImpl::popXmt() xmtQueue.pop_front(); } -bool AgentEngineImpl::getEvent(AgentEvent& event) +bool AgentEngineImpl::getEvent(AgentEvent& event) const { Mutex::ScopedLock _lock(lock); if (eventQueue.empty()) @@ -423,11 +423,11 @@ void AgentEngineImpl::registerClass(SchemaObjectClass* cls) map<string, ClassMaps>::iterator iter = packages.find(impl->package); if (iter == packages.end()) { packages[impl->package] = ClassMaps(); - iter = packages.find(impl->package); + iter = packages.find(impl->getClassKey()->getPackageName()); // TODO: Indicate this package if connected } - SchemaClassKey key(impl->name, impl->getHash()); + AgentClassKey key(impl->getClassKey()->getClassName(), impl->getClassKey()->getHash()); iter->second.objectClasses[key] = impl; // TODO: Indicate this schema if connected. @@ -441,11 +441,11 @@ void AgentEngineImpl::registerClass(SchemaEventClass* cls) map<string, ClassMaps>::iterator iter = packages.find(impl->package); if (iter == packages.end()) { packages[impl->package] = ClassMaps(); - iter = packages.find(impl->package); + iter = packages.find(impl->getClassKey()->getPackageName()); // TODO: Indicate this package if connected } - SchemaClassKey key(impl->name, impl->getHash()); + AgentClassKey key(impl->getClassKey()->getClassName(), impl->getClassKey()->getHash()); iter->second.eventClasses[key] = impl; // TODO: Indicate this schema if connected. @@ -576,7 +576,7 @@ void AgentEngineImpl::sendPackageIndicationLH(const string& packageName) QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName); } -void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key) +void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key) { Buffer buffer(outputBuffer, MA_BUFFER_SIZE); encodeHeader(buffer, 'q'); @@ -690,7 +690,7 @@ void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, string rKey(replyKey); string packageName; inBuffer.getShortString(packageName); - SchemaClassKey key(inBuffer); + AgentClassKey key(inBuffer); if (rExchange.empty()) rExchange = QMF_EXCHANGE; @@ -791,7 +791,7 @@ void AgentEngineImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, con ObjectIdImpl* oidImpl = new ObjectIdImpl(buffer); boost::shared_ptr<ObjectId> oid(oidImpl->envelope); buffer.getShortString(pname); - SchemaClassKey classKey(buffer); + AgentClassKey classKey(buffer); buffer.getShortString(method); map<string, ClassMaps>::const_iterator pIter = packages.find(pname); @@ -876,7 +876,7 @@ void AgentEngine::handleRcvMessage(Message& message) impl->handleRcvMessage(message); } -bool AgentEngine::getXmtMessage(Message& item) +bool AgentEngine::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); } @@ -886,7 +886,7 @@ void AgentEngine::popXmt() impl->popXmt(); } -bool AgentEngine::getEvent(AgentEvent& event) +bool AgentEngine::getEvent(AgentEvent& event) const { return impl->getEvent(event); } diff --git a/cpp/src/qmf/AgentEngine.h b/cpp/src/qmf/AgentEngine.h index d18a104e96..bbfbada80c 100644 --- a/cpp/src/qmf/AgentEngine.h +++ b/cpp/src/qmf/AgentEngine.h @@ -101,7 +101,7 @@ namespace qmf { *@param item The Message structure describing the message to be produced. *@return true if the Message is valid, false if there are no messages to send. */ - bool getXmtMessage(Message& item); + bool getXmtMessage(Message& item) const; /** * Remove and discard one message from the head of the transmit queue. @@ -113,7 +113,7 @@ namespace qmf { *@param event The event iff the return value is true *@return true if event is valid, false if there are no events to process */ - bool getEvent(AgentEvent& event); + bool getEvent(AgentEvent& event) const; /** * Remove and discard one event from the head of the event queue. diff --git a/cpp/src/qmf/ConsoleEngine.cpp b/cpp/src/qmf/ConsoleEngine.cpp new file mode 100644 index 0000000000..28b2852d67 --- /dev/null +++ b/cpp/src/qmf/ConsoleEngine.cpp @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/ConsoleEngine.h" +#include "qmf/MessageImpl.h" +#include "qmf/SchemaImpl.h" +#include "qmf/Typecode.h" +#include "qmf/ObjectImpl.h" +#include "qmf/ObjectIdImpl.h" +#include "qmf/QueryImpl.h" +#include "qmf/ValueImpl.h" +#include <qpid/framing/Buffer.h> +#include <qpid/framing/Uuid.h> +#include <qpid/framing/FieldTable.h> +#include <qpid/framing/FieldValue.h> +#include <qpid/sys/Mutex.h> +#include <qpid/log/Statement.h> +#include <qpid/sys/Time.h> +#include <string.h> +#include <string> +#include <deque> +#include <map> +#include <iostream> +#include <fstream> +#include <boost/shared_ptr.hpp> + +using namespace std; +using namespace qmf; +using namespace qpid::framing; +using namespace qpid::sys; + +namespace qmf { + + struct MethodResponseImpl { + typedef boost::shared_ptr<MethodResponseImpl> Ptr; + MethodResponse* envelope; + uint32_t status; + auto_ptr<Value> exception; + auto_ptr<Value> arguments; + + MethodResponseImpl(Buffer& buf); + ~MethodResponseImpl() {} + uint32_t getStatus() const { return status; } + const Value* getException() const { return exception.get(); } + const Value* getArgs() const { return arguments.get(); } + }; + + struct ConsoleEventImpl { + typedef boost::shared_ptr<ConsoleEventImpl> Ptr; + ConsoleEvent::EventKind kind; + boost::shared_ptr<AgentProxyImpl> agent; + string name; + boost::shared_ptr<SchemaClassKey> classKey; + Object* object; + void* context; + Event* event; + uint64_t timestamp; + uint32_t methodHandle; + MethodResponseImpl::Ptr methodResponse; + + ConsoleEventImpl(ConsoleEvent::EventKind k) : + kind(k), object(0), context(0), event(0), timestamp(0), methodHandle(0) {} + ~ConsoleEventImpl() {} + ConsoleEvent copy(); + }; + + struct BrokerEventImpl { + typedef boost::shared_ptr<BrokerEventImpl> Ptr; + BrokerEvent::EventKind kind; + string name; + string exchange; + string bindingKey; + + BrokerEventImpl(BrokerEvent::EventKind k) : kind(k) {} + ~BrokerEventImpl() {} + BrokerEvent copy(); + }; + + struct BrokerProxyImpl { + typedef boost::shared_ptr<BrokerProxyImpl> Ptr; + mutable Mutex lock; + BrokerProxy* envelope; + ConsoleEngineImpl* console; + string queueName; + deque<MessageImpl::Ptr> xmtQueue; + deque<BrokerEventImpl::Ptr> eventQueue; + + static const char* QMF_EXCHANGE; + static const char* DIR_EXCHANGE; + static const char* BROKER_KEY; + + BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console); + ~BrokerProxyImpl() {} + + void sessionOpened(SessionHandle& sh); + void sessionClosed(); + void startProtocol(); + + void handleRcvMessage(Message& message); + bool getXmtMessage(Message& item) const; + void popXmt(); + + bool getEvent(BrokerEvent& event) const; + void popEvent(); + + BrokerEventImpl::Ptr eventDeclareQueue(const string& queueName); + BrokerEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); + BrokerEventImpl::Ptr eventSetupComplete(); + }; + + struct AgentProxyImpl { + typedef boost::shared_ptr<AgentProxyImpl> Ptr; + AgentProxy* envelope; + ConsoleEngineImpl* console; + + AgentProxyImpl(AgentProxy* e, ConsoleEngine& _console) : + envelope(e), console(_console.impl) {} + ~AgentProxyImpl() {} + }; + + class ConsoleEngineImpl { + public: + ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& settings = ConsoleSettings()); + ~ConsoleEngineImpl(); + + void handleRcvMessage(BrokerProxy& broker, Message& message); + bool getXmtMessage(Message& item) const; + void popXmt(); + + bool getEvent(ConsoleEvent& event) const; + void popEvent(); + + void addConnection(BrokerProxy& broker, void* context); + void delConnection(BrokerProxy& broker); + + uint32_t packageCount() const; + const string& getPackageName(uint32_t idx) const; + + uint32_t classCount(const char* packageName) const; + const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const; + + ClassKind getClassKind(const SchemaClassKey& key) const; + const SchemaObjectClass* getObjectClass(const SchemaClassKey& key) const; + const SchemaEventClass* getEventClass(const SchemaClassKey& key) const; + + void bindPackage(const char* packageName); + void bindClass(const SchemaClassKey& key); + void bindClass(const char* packageName, const char* className); + + uint32_t agentCount() const; + const AgentProxy* getAgent(uint32_t idx) const; + + void sendQuery(const Query& query, void* context); + + /* + void startSync(const Query& query, void* context, SyncQuery& sync); + void touchSync(SyncQuery& sync); + void endSync(SyncQuery& sync); + */ + + private: + ConsoleEngine* envelope; + const ConsoleSettings& settings; + mutable Mutex lock; + deque<ConsoleEventImpl::Ptr> eventQueue; + }; +} + +const char* BrokerProxyImpl::QMF_EXCHANGE = "qpid.management"; +const char* BrokerProxyImpl::DIR_EXCHANGE = "amq.direct"; +const char* BrokerProxyImpl::BROKER_KEY = "broker"; + + +#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} + +ConsoleEvent ConsoleEventImpl::copy() +{ + ConsoleEvent item; + + ::memset(&item, 0, sizeof(ConsoleEvent)); + item.kind = kind; + item.agent = agent.get() ? agent->envelope : 0; + item.classKey = classKey.get(); + item.object = object; + item.context = context; + item.event = event; + item.timestamp = timestamp; + item.methodHandle = methodHandle; + item.methodResponse = methodResponse.get() ? methodResponse->envelope : 0; + + STRING_REF(name); + + return item; +} + +BrokerEvent BrokerEventImpl::copy() +{ + BrokerEvent item; + + ::memset(&item, 0, sizeof(BrokerEvent)); + item.kind = kind; + + STRING_REF(name); + STRING_REF(exchange); + STRING_REF(bindingKey); + + return item; +} + +BrokerProxyImpl::BrokerProxyImpl(BrokerProxy* e, ConsoleEngine& _console) : + envelope(e), console(_console.impl), queueName("qmfc-") +{ + // TODO: Give the queue name a unique suffix +} + +void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/) +{ + Mutex::ScopedLock _lock(lock); + eventQueue.clear(); + xmtQueue.clear(); + eventQueue.push_back(eventDeclareQueue(queueName)); + eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName)); + eventQueue.push_back(eventSetupComplete()); + + // TODO: Store session handle +} + +void BrokerProxyImpl::sessionClosed() +{ + Mutex::ScopedLock _lock(lock); + eventQueue.clear(); + xmtQueue.clear(); +} + +void BrokerProxyImpl::startProtocol() +{ + cout << "BrokerProxyImpl::startProtocol" << endl; +} + +void BrokerProxyImpl::handleRcvMessage(Message& /*message*/) +{ + // TODO: Dispatch the messages types +} + +bool BrokerProxyImpl::getXmtMessage(Message& item) const +{ + Mutex::ScopedLock _lock(lock); + if (xmtQueue.empty()) + return false; + item = xmtQueue.front()->copy(); + return true; +} + +void BrokerProxyImpl::popXmt() +{ + Mutex::ScopedLock _lock(lock); + if (!xmtQueue.empty()) + xmtQueue.pop_front(); +} + +bool BrokerProxyImpl::getEvent(BrokerEvent& event) const +{ + Mutex::ScopedLock _lock(lock); + if (eventQueue.empty()) + return false; + event = eventQueue.front()->copy(); + return true; +} + +void BrokerProxyImpl::popEvent() +{ + Mutex::ScopedLock _lock(lock); + if (!eventQueue.empty()) + eventQueue.pop_front(); +} + +BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName) +{ + BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE)); + event->name = queueName; + return event; +} + +BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key) +{ + BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::BIND)); + event->name = queue; + event->exchange = exchange; + event->bindingKey = key; + + return event; +} + +BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete() +{ + BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE)); + return event; +} + +MethodResponseImpl::MethodResponseImpl(Buffer& buf) : envelope(new MethodResponse(this)) +{ + string text; + + status = buf.getLong(); + buf.getMediumString(text); + exception.reset(new Value(TYPE_LSTR)); + exception->setString(text.c_str()); + + // TODO: Parse schema-specific output arguments. + arguments.reset(new Value(TYPE_MAP)); +} + +ConsoleEngineImpl::ConsoleEngineImpl(ConsoleEngine* e, const ConsoleSettings& s) : + envelope(e), settings(s) +{ +} + +ConsoleEngineImpl::~ConsoleEngineImpl() +{ + // This function intentionally left blank. +} + +bool ConsoleEngineImpl::getEvent(ConsoleEvent& event) const +{ + Mutex::ScopedLock _lock(lock); + if (eventQueue.empty()) + return false; + event = eventQueue.front()->copy(); + return true; +} + +void ConsoleEngineImpl::popEvent() +{ + Mutex::ScopedLock _lock(lock); + if (!eventQueue.empty()) + eventQueue.pop_front(); +} + +void ConsoleEngineImpl::addConnection(BrokerProxy& /*broker*/, void* /*context*/) +{ +} + +void ConsoleEngineImpl::delConnection(BrokerProxy& /*broker*/) +{ +} + +uint32_t ConsoleEngineImpl::packageCount() const +{ + return 0; +} + +const string& ConsoleEngineImpl::getPackageName(uint32_t /*idx*/) const +{ + static string temp; + return temp; +} + +uint32_t ConsoleEngineImpl::classCount(const char* /*packageName*/) const +{ + return 0; +} + +const SchemaClassKey* ConsoleEngineImpl::getClass(const char* /*packageName*/, uint32_t /*idx*/) const +{ + return 0; +} + +ClassKind ConsoleEngineImpl::getClassKind(const SchemaClassKey& /*key*/) const +{ + return CLASS_OBJECT; +} + +const SchemaObjectClass* ConsoleEngineImpl::getObjectClass(const SchemaClassKey& /*key*/) const +{ + return 0; +} + +const SchemaEventClass* ConsoleEngineImpl::getEventClass(const SchemaClassKey& /*key*/) const +{ + return 0; +} + +void ConsoleEngineImpl::bindPackage(const char* /*packageName*/) +{ +} + +void ConsoleEngineImpl::bindClass(const SchemaClassKey& /*key*/) +{ +} + +void ConsoleEngineImpl::bindClass(const char* /*packageName*/, const char* /*className*/) +{ +} + +uint32_t ConsoleEngineImpl::agentCount() const +{ + return 0; +} + +const AgentProxy* ConsoleEngineImpl::getAgent(uint32_t /*idx*/) const +{ + return 0; +} + +void ConsoleEngineImpl::sendQuery(const Query& /*query*/, void* /*context*/) +{ +} + +/* +void ConsoleEngineImpl::startSync(const Query& query, void* context, SyncQuery& sync) +{ +} + +void ConsoleEngineImpl::touchSync(SyncQuery& sync) +{ +} + +void ConsoleEngineImpl::endSync(SyncQuery& sync) +{ +} +*/ + + + +//================================================================== +// Wrappers +//================================================================== + +BrokerProxy::BrokerProxy(ConsoleEngine& console) : impl(new BrokerProxyImpl(this, console)) {} +BrokerProxy::~BrokerProxy() { delete impl; } +void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); } +void BrokerProxy::sessionClosed() { impl->sessionClosed(); } +void BrokerProxy::startProtocol() { impl->startProtocol(); } +void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); } +bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); } +void BrokerProxy::popXmt() { impl->popXmt(); } +bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); } +void BrokerProxy::popEvent() { impl->popEvent(); } + +AgentProxy::AgentProxy(ConsoleEngine& console) : impl(new AgentProxyImpl(this, console)) {} +AgentProxy::~AgentProxy() { delete impl; } + +MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {} +MethodResponse::~MethodResponse() { delete impl; } // TODO: correct to delete here? +uint32_t MethodResponse::getStatus() const { return impl->getStatus(); } +const Value* MethodResponse::getException() const { return impl->getException(); } +const Value* MethodResponse::getArgs() const { return impl->getArgs(); } + +ConsoleEngine::ConsoleEngine(const ConsoleSettings& settings) : impl(new ConsoleEngineImpl(this, settings)) {} +ConsoleEngine::~ConsoleEngine() { delete impl; } +bool ConsoleEngine::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); } +void ConsoleEngine::popEvent() { impl->popEvent(); } +void ConsoleEngine::addConnection(BrokerProxy& broker, void* context) { impl->addConnection(broker, context); } +void ConsoleEngine::delConnection(BrokerProxy& broker) { impl->delConnection(broker); } +uint32_t ConsoleEngine::packageCount() const { return impl->packageCount(); } +const char* ConsoleEngine::getPackageName(uint32_t idx) const { return impl->getPackageName(idx).c_str(); } +uint32_t ConsoleEngine::classCount(const char* packageName) const { return impl->classCount(packageName); } +const SchemaClassKey* ConsoleEngine::getClass(const char* packageName, uint32_t idx) const { return impl->getClass(packageName, idx); } +ClassKind ConsoleEngine::getClassKind(const SchemaClassKey& key) const { return impl->getClassKind(key); } +const SchemaObjectClass* ConsoleEngine::getObjectClass(const SchemaClassKey& key) const { return impl->getObjectClass(key); } +const SchemaEventClass* ConsoleEngine::getEventClass(const SchemaClassKey& key) const { return impl->getEventClass(key); } +void ConsoleEngine::bindPackage(const char* packageName) { impl->bindPackage(packageName); } +void ConsoleEngine::bindClass(const SchemaClassKey& key) { impl->bindClass(key); } +void ConsoleEngine::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); } +uint32_t ConsoleEngine::agentCount() const { return impl->agentCount(); } +const AgentProxy* ConsoleEngine::getAgent(uint32_t idx) const { return impl->getAgent(idx); } +void ConsoleEngine::sendQuery(const Query& query, void* context) { impl->sendQuery(query, context); } +//void ConsoleEngine::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); } +//void ConsoleEngine::touchSync(SyncQuery& sync) { impl->touchSync(sync); } +//void ConsoleEngine::endSync(SyncQuery& sync) { impl->endSync(sync); } + + diff --git a/cpp/src/qmf/ConsoleEngine.h b/cpp/src/qmf/ConsoleEngine.h index 823e281b14..4ee52f6114 100644 --- a/cpp/src/qmf/ConsoleEngine.h +++ b/cpp/src/qmf/ConsoleEngine.h @@ -20,62 +20,178 @@ * under the License. */ -#include <qmf/ManagedConnection.h> -#include <qmf/Broker.h> -#include <qmf/Package.h> -#include <qmf/SchemaClassTable.h> +#include <qmf/ResilientConnection.h> +#include <qmf/Schema.h> +#include <qmf/ObjectId.h> #include <qmf/Object.h> -#include <qmf/ConsoleHandler.h> -#include <set> -#include <vector> -#include <string> +#include <qmf/Event.h> +#include <qmf/Query.h> +#include <qmf/Value.h> +#include <qmf/Message.h> namespace qmf { + class ConsoleEngine; + class ConsoleEngineImpl; + class BrokerProxyImpl; + class AgentProxy; + class AgentProxyImpl; + class MethodResponseImpl; + + /** + * + */ + class MethodResponse { + public: + MethodResponse(MethodResponseImpl* impl); + ~MethodResponse(); + uint32_t getStatus() const; + const Value* getException() const; + const Value* getArgs() const; + + private: + friend class ConsoleEngineImpl; + MethodResponseImpl* impl; + }; + + /** + * + */ + struct ConsoleEvent { + enum EventKind { + AGENT_ADDED = 1, + AGENT_DELETED = 2, + NEW_PACKAGE = 3, + NEW_CLASS = 4, + OBJECT_UPDATE = 5, + QUERY_COMPLETE = 6, + EVENT_RECEIVED = 7, + AGENT_HEARTBEAT = 8, + METHOD_RESPONSE = 9 + }; + + EventKind kind; + AgentProxy* agent; // (AGENT_[ADDED|DELETED|HEARTBEAT]) + char* name; // (NEW_PACKAGE) + SchemaClassKey* classKey; // (NEW_CLASS) + Object* object; // (OBJECT_UPDATE) + void* context; // (OBJECT_UPDATE, QUERY_COMPLETE) + Event* event; // (EVENT_RECEIVED) + uint64_t timestamp; // (AGENT_HEARTBEAT) + uint32_t methodHandle; // (METHOD_RESPONSE) + MethodResponse* methodResponse; // (METHOD_RESPONSE) + }; + + /** + * + */ + struct BrokerEvent { + enum EventKind { + BROKER_INFO = 10, + DECLARE_QUEUE = 11, + DELETE_QUEUE = 12, + BIND = 13, + UNBIND = 14, + SETUP_COMPLETE = 15 + }; + + EventKind kind; + char* name; // ([DECLARE|DELETE]_QUEUE, [UN]BIND) + char* exchange; // ([UN]BIND) + char* bindingKey; // ([UN]BIND) + }; + + /** + * + */ + class BrokerProxy { + public: + BrokerProxy(ConsoleEngine& console); + ~BrokerProxy(); + + void sessionOpened(SessionHandle& sh); + void sessionClosed(); + void startProtocol(); + + void handleRcvMessage(Message& message); + bool getXmtMessage(Message& item) const; + void popXmt(); + + bool getEvent(BrokerEvent& event) const; + void popEvent(); + + private: + friend class ConsoleEngineImpl; + BrokerProxyImpl* impl; + }; + + /** + * + */ + class AgentProxy { + public: + AgentProxy(ConsoleEngine& console); + ~AgentProxy(); + + private: + friend class ConsoleEngineImpl; + AgentProxyImpl* impl; + }; + + // TODO - move this to a public header struct ConsoleSettings { bool rcvObjects; bool rcvEvents; bool rcvHeartbeats; bool userBindings; - uint32_t methodTimeout; - uint32_t getTimeout; ConsoleSettings() : rcvObjects(true), rcvEvents(true), rcvHeartbeats(true), - userBindings(false), - methodTimeout(20), - getTimeout(20) {} + userBindings(false) {} }; class ConsoleEngine { public: - ConsoleEngine(ConsoleHandler* handler = 0, ConsoleSettings settings = ConsoleSettings()); + ConsoleEngine(const ConsoleSettings& settings = ConsoleSettings()); ~ConsoleEngine(); - Broker* addConnection(ManagedConnection& connection); - void delConnection(Broker* broker); - void delConnection(ManagedConnection& connection); + bool getEvent(ConsoleEvent& event) const; + void popEvent(); + + void addConnection(BrokerProxy& broker, void* context); + void delConnection(BrokerProxy& broker); + + uint32_t packageCount() const; + const char* getPackageName(uint32_t idx) const; + + uint32_t classCount(const char* packageName) const; + const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const; - const PackageMap& getPackages() const; + ClassKind getClassKind(const SchemaClassKey& key) const; + const SchemaObjectClass* getObjectClass(const SchemaClassKey& key) const; + const SchemaEventClass* getEventClass(const SchemaClassKey& key) const; - void bindPackage(const Package& package); - void bindPackage(const std::string& packageName); - void bindClass(const SchemaClass& otype); - void bindClass(const std::string& packageName, const std::string& className); + void bindPackage(const char* packageName); + void bindClass(const SchemaClassKey& key); + void bindClass(const char* packageName, const char* className); + + uint32_t agentCount() const; + const AgentProxy* getAgent(uint32_t idx) const; + + void sendQuery(const Query& query, void* context); /* - void getAgents(std::set<Agent>& agents, Broker* = 0); - void getObjects(std::vector<Object>& objects, const std::string& typeName, - const std::string& packageName = "", - Broker* broker = 0, - Agent* agent = 0); - void getObjects(std::vector<Object>& objects, - const std::map<std::string, std::string>& query, - Broker* broker = 0, - Agent* agent = 0); + void startSync(const Query& query, void* context, SyncQuery& sync); + void touchSync(SyncQuery& sync); + void endSync(SyncQuery& sync); */ + + private: + friend class BrokerProxyImpl; + friend class AgentProxyImpl; + ConsoleEngineImpl* impl; }; } diff --git a/cpp/src/qmf/ObjectImpl.cpp b/cpp/src/qmf/ObjectImpl.cpp index d3882935e4..645ccd5c81 100644 --- a/cpp/src/qmf/ObjectImpl.cpp +++ b/cpp/src/qmf/ObjectImpl.cpp @@ -123,9 +123,9 @@ void ObjectImpl::parsePresenceMasks(Buffer& buffer, set<string>& excludeList) void ObjectImpl::encodeSchemaKey(qpid::framing::Buffer& buffer) const { - buffer.putShortString(objectClass->getPackage()); - buffer.putShortString(objectClass->getName()); - buffer.putBin128(const_cast<uint8_t*>(objectClass->getHash())); + buffer.putShortString(objectClass->getClassKey()->getPackageName()); + buffer.putShortString(objectClass->getClassKey()->getClassName()); + buffer.putBin128(const_cast<uint8_t*>(objectClass->getClassKey()->getHash())); } void ObjectImpl::encodeManagedObjectData(qpid::framing::Buffer& buffer) const diff --git a/cpp/src/qmf/ResilientConnection.cpp b/cpp/src/qmf/ResilientConnection.cpp index 3531e104b0..88b9169c75 100644 --- a/cpp/src/qmf/ResilientConnection.cpp +++ b/cpp/src/qmf/ResilientConnection.cpp @@ -79,7 +79,7 @@ namespace qmf { class ResilientConnectionImpl : public qpid::sys::Runnable { public: - ResilientConnectionImpl(ConnectionSettings& settings); + ResilientConnectionImpl(const ConnectionSettings& settings); ~ResilientConnectionImpl(); bool isConnected() const; @@ -108,7 +108,7 @@ namespace qmf { bool connected; bool shutdown; string lastError; - ConnectionSettings settings; + const ConnectionSettings settings; Connection connection; mutable qpid::sys::Mutex lock; int delayMin; @@ -175,7 +175,7 @@ void RCSession::received(qpid::client::Message& msg) connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg); } -ResilientConnectionImpl::ResilientConnectionImpl(ConnectionSettings& _settings) : +ResilientConnectionImpl::ResilientConnectionImpl(const ConnectionSettings& _settings) : notifyFd(-1), connected(false), shutdown(false), settings(_settings), connThread(*this) { connection.registerFailureCallback(boost::bind(&ResilientConnectionImpl::failure, this)); @@ -222,7 +222,7 @@ bool ResilientConnectionImpl::createSession(const char* name, void* sessionConte RCSession::Ptr sess = RCSession::Ptr(new RCSession(*this, name, connection, sessionContext)); - handle.handle = (void*) sess.get(); + handle.impl = (void*) sess.get(); sessions.insert(sess); return true; @@ -231,7 +231,7 @@ bool ResilientConnectionImpl::createSession(const char* name, void* sessionConte void ResilientConnectionImpl::destroySession(SessionHandle handle) { Mutex::ScopedLock _lock(lock); - RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.handle); + RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl); set<RCSession::Ptr>::iterator iter = sessions.find(sess); if (iter != sessions.end()) { for (vector<string>::iterator dIter = sess->dests.begin(); dIter != sess->dests.end(); dIter++) @@ -247,7 +247,7 @@ void ResilientConnectionImpl::destroySession(SessionHandle handle) void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::Message& message) { Mutex::ScopedLock _lock(lock); - RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.handle); + RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl); set<RCSession::Ptr>::iterator iter = sessions.find(sess); qpid::client::Message msg; string data(message.body, message.length); @@ -267,7 +267,7 @@ void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::Message& me void ResilientConnectionImpl::declareQueue(SessionHandle handle, char* queue) { Mutex::ScopedLock _lock(lock); - RCSession* sess = (RCSession*) handle.handle; + RCSession* sess = (RCSession*) handle.impl; sess->session.queueDeclare(arg::queue=queue, arg::autoDelete=true, arg::exclusive=true); sess->subscriptions->subscribe(*sess, queue, queue); @@ -277,7 +277,7 @@ void ResilientConnectionImpl::declareQueue(SessionHandle handle, char* queue) void ResilientConnectionImpl::deleteQueue(SessionHandle handle, char* queue) { Mutex::ScopedLock _lock(lock); - RCSession* sess = (RCSession*) handle.handle; + RCSession* sess = (RCSession*) handle.impl; sess->session.queueDelete(arg::queue=queue); for (vector<string>::iterator iter = sess->dests.begin(); @@ -293,7 +293,7 @@ void ResilientConnectionImpl::bind(SessionHandle handle, char* exchange, char* queue, char* key) { Mutex::ScopedLock _lock(lock); - RCSession* sess = (RCSession*) handle.handle; + RCSession* sess = (RCSession*) handle.impl; sess->session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=key); } @@ -302,7 +302,7 @@ void ResilientConnectionImpl::unbind(SessionHandle handle, char* exchange, char* queue, char* key) { Mutex::ScopedLock _lock(lock); - RCSession* sess = (RCSession*) handle.handle; + RCSession* sess = (RCSession*) handle.impl; sess->session.exchangeUnbind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=key); } @@ -396,7 +396,7 @@ void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind k // Wrappers //================================================================== -ResilientConnection::ResilientConnection(ConnectionSettings& settings) +ResilientConnection::ResilientConnection(const ConnectionSettings& settings) { impl = new ResilientConnectionImpl(settings); } diff --git a/cpp/src/qmf/ResilientConnection.h b/cpp/src/qmf/ResilientConnection.h index 6e05541253..03f1b9c0d5 100644 --- a/cpp/src/qmf/ResilientConnection.h +++ b/cpp/src/qmf/ResilientConnection.h @@ -26,6 +26,8 @@ namespace qmf { + class ResilientConnectionImpl; + /** * Represents events that occur, unsolicited, from ResilientConnection. */ @@ -43,12 +45,11 @@ namespace qmf { Message message; // RECV }; - struct SessionHandle { - void* handle; + class SessionHandle { + friend class ResilientConnectionImpl; + void* impl; }; - class ResilientConnectionImpl; - /** * ResilientConnection represents a Qpid connection that is resilient. * @@ -67,7 +68,7 @@ namespace qmf { *@param delayMax Maximum delay (in seconds) between retries. *@param delayFactor Factor to multiply retry delay by after each failure. */ - ResilientConnection(ConnectionSettings& settings); + ResilientConnection(const ConnectionSettings& settings); ~ResilientConnection(); /** diff --git a/cpp/src/qmf/Schema.h b/cpp/src/qmf/Schema.h index e3ab90e3e3..1123acc3b8 100644 --- a/cpp/src/qmf/Schema.h +++ b/cpp/src/qmf/Schema.h @@ -35,6 +35,7 @@ namespace qmf { struct SchemaStatisticImpl; struct SchemaObjectClassImpl; struct SchemaEventClassImpl; + struct SchemaClassKeyImpl; /** */ @@ -114,6 +115,20 @@ namespace qmf { /** */ + class SchemaClassKey { + public: + SchemaClassKey(SchemaClassKeyImpl* impl); + ~SchemaClassKey(); + + const char* getPackageName() const; + const char* getClassName() const; + const uint8_t* getHash() const; + + SchemaClassKeyImpl* impl; + }; + + /** + */ class SchemaObjectClass { public: SchemaObjectClass(const char* package, const char* name); @@ -123,9 +138,7 @@ namespace qmf { void addStatistic(const SchemaStatistic& statistic); void addMethod(const SchemaMethod& method); - const char* getPackage() const; - const char* getName() const; - const uint8_t* getHash() const; + const SchemaClassKey* getClassKey() const; int getPropertyCount() const; int getStatisticCount() const; int getMethodCount() const; @@ -146,9 +159,7 @@ namespace qmf { void addArgument(const SchemaArgument& argument); void setDesc(const char* desc); - const char* getPackage() const; - const char* getName() const; - const uint8_t* getHash() const; + const SchemaClassKey* getClassKey() const; int getArgumentCount() const; const SchemaArgument* getArgument(int idx) const; diff --git a/cpp/src/qmf/SchemaImpl.cpp b/cpp/src/qmf/SchemaImpl.cpp index 665c94f2a1..be30cdb642 100644 --- a/cpp/src/qmf/SchemaImpl.cpp +++ b/cpp/src/qmf/SchemaImpl.cpp @@ -240,15 +240,20 @@ void SchemaStatisticImpl::updateHash(SchemaHash& hash) const hash.update(description); } -SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : envelope(new SchemaObjectClass(this)), hasHash(true) +SchemaClassKeyImpl::SchemaClassKeyImpl(const string& p, const string& n, const SchemaHash& h) : + envelope(new SchemaClassKey(this)), package(p), name(n), hash(h) {} + +SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : + envelope(new SchemaObjectClass(this)), hasHash(true), classKey(package, name, hash) { buffer.getShortString(package); buffer.getShortString(name); hash.decode(buffer); - uint16_t propCount = buffer.getShort(); - uint16_t statCount = buffer.getShort(); - uint16_t methodCount = buffer.getShort(); + /*uint8_t hasParentClass =*/ buffer.getOctet(); // TODO: Parse parent-class indicator + uint16_t propCount = buffer.getShort(); + uint16_t statCount = buffer.getShort(); + uint16_t methodCount = buffer.getShort(); for (uint16_t idx = 0; idx < propCount; idx++) { SchemaPropertyImpl* property = new SchemaPropertyImpl(buffer); @@ -288,7 +293,7 @@ void SchemaObjectClassImpl::encode(Buffer& buffer) const (*iter)->encode(buffer); } -const uint8_t* SchemaObjectClassImpl::getHash() const +const SchemaClassKey* SchemaObjectClassImpl::getClassKey() const { if (!hasHash) { hasHash = true; @@ -305,7 +310,7 @@ const uint8_t* SchemaObjectClassImpl::getHash() const (*iter)->updateHash(hash); } - return hash.get(); + return classKey.envelope; } void SchemaObjectClassImpl::addProperty(const SchemaProperty& property) @@ -353,7 +358,8 @@ const SchemaMethod* SchemaObjectClassImpl::getMethod(int idx) const return 0; } -SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : envelope(new SchemaEventClass(this)), hasHash(true) +SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : + envelope(new SchemaEventClass(this)), hasHash(true), classKey(package, name, hash) { buffer.getShortString(package); buffer.getShortString(name); @@ -380,7 +386,7 @@ void SchemaEventClassImpl::encode(Buffer& buffer) const (*iter)->encode(buffer); } -const uint8_t* SchemaEventClassImpl::getHash() const +const SchemaClassKey* SchemaEventClassImpl::getClassKey() const { if (!hasHash) { hasHash = true; @@ -390,7 +396,7 @@ const uint8_t* SchemaEventClassImpl::getHash() const iter != arguments.end(); iter++) (*iter)->updateHash(hash); } - return hash.get(); + return classKey.envelope; } void SchemaEventClassImpl::addArgument(const SchemaArgument& argument) @@ -408,6 +414,7 @@ const SchemaArgument* SchemaEventClassImpl::getArgument(int idx) const return 0; } + //================================================================== // Wrappers //================================================================== @@ -620,6 +627,28 @@ const char* SchemaStatistic::getDesc() const return impl->getDesc().c_str(); } +SchemaClassKey::SchemaClassKey(SchemaClassKeyImpl* i) : impl(i) {} + +SchemaClassKey::~SchemaClassKey() +{ + delete impl; +} + +const char* SchemaClassKey::getPackageName() const +{ + return impl->getPackageName().c_str(); +} + +const char* SchemaClassKey::getClassName() const +{ + return impl->getClassName().c_str(); +} + +const uint8_t* SchemaClassKey::getHash() const +{ + return impl->getHash(); +} + SchemaObjectClass::SchemaObjectClass(const char* package, const char* name) { impl = new SchemaObjectClassImpl(this, package, name); @@ -647,19 +676,9 @@ void SchemaObjectClass::addMethod(const SchemaMethod& method) impl->addMethod(method); } -const char* SchemaObjectClass::getPackage() const +const SchemaClassKey* SchemaObjectClass::getClassKey() const { - return impl->getPackage().c_str(); -} - -const char* SchemaObjectClass::getName() const -{ - return impl->getName().c_str(); -} - -const uint8_t* SchemaObjectClass::getHash() const -{ - return impl->getHash(); + return impl->getClassKey(); } int SchemaObjectClass::getPropertyCount() const @@ -714,19 +733,9 @@ void SchemaEventClass::setDesc(const char* desc) impl->setDesc(desc); } -const char* SchemaEventClass::getPackage() const +const SchemaClassKey* SchemaEventClass::getClassKey() const { - return impl->getPackage().c_str(); -} - -const char* SchemaEventClass::getName() const -{ - return impl->getName().c_str(); -} - -const uint8_t* SchemaEventClass::getHash() const -{ - return impl->getHash(); + return impl->getClassKey(); } int SchemaEventClass::getArgumentCount() const diff --git a/cpp/src/qmf/SchemaImpl.h b/cpp/src/qmf/SchemaImpl.h index 2c30a8851f..3a18703ae6 100644 --- a/cpp/src/qmf/SchemaImpl.h +++ b/cpp/src/qmf/SchemaImpl.h @@ -138,27 +138,39 @@ namespace qmf { void updateHash(SchemaHash& hash) const; }; + struct SchemaClassKeyImpl { + const SchemaClassKey* envelope; + const std::string& package; + const std::string& name; + const SchemaHash& hash; + + SchemaClassKeyImpl(const std::string& package, const std::string& name, const SchemaHash& hash); + + const std::string& getPackageName() const { return package; } + const std::string& getClassName() const { return name; } + const uint8_t* getHash() const { return hash.get(); } + }; + struct SchemaObjectClassImpl { SchemaObjectClass* envelope; std::string package; std::string name; mutable SchemaHash hash; mutable bool hasHash; + SchemaClassKeyImpl classKey; std::vector<SchemaPropertyImpl*> properties; std::vector<SchemaStatisticImpl*> statistics; std::vector<SchemaMethodImpl*> methods; SchemaObjectClassImpl(SchemaObjectClass* e, const char* p, const char* n) : - envelope(e), package(p), name(n), hasHash(false) {} + envelope(e), package(p), name(n), hasHash(false), classKey(package, name, hash) {} SchemaObjectClassImpl(qpid::framing::Buffer& buffer); void encode(qpid::framing::Buffer& buffer) const; void addProperty(const SchemaProperty& property); void addStatistic(const SchemaStatistic& statistic); void addMethod(const SchemaMethod& method); - const std::string& getPackage() const { return package; } - const std::string& getName() const { return name; } - const uint8_t* getHash() const; + const SchemaClassKey* getClassKey() const; int getPropertyCount() const { return properties.size(); } int getStatisticCount() const { return statistics.size(); } int getMethodCount() const { return methods.size(); } @@ -173,19 +185,18 @@ namespace qmf { std::string name; mutable SchemaHash hash; mutable bool hasHash; + SchemaClassKeyImpl classKey; std::string description; std::vector<SchemaArgumentImpl*> arguments; SchemaEventClassImpl(SchemaEventClass* e, const char* p, const char* n) : - envelope(e), package(p), name(n), hasHash(false) {} + envelope(e), package(p), name(n), hasHash(false), classKey(package, name, hash) {} SchemaEventClassImpl(qpid::framing::Buffer& buffer); void encode(qpid::framing::Buffer& buffer) const; void addArgument(const SchemaArgument& argument); void setDesc(const char* desc) { description = desc; } - const std::string& getPackage() const { return package; } - const std::string& getName() const { return name; } - const uint8_t* getHash() const; + const SchemaClassKey* getClassKey() const; int getArgumentCount() const { return arguments.size(); } const SchemaArgument* getArgument(int idx) const; }; |