diff options
author | Gordon Sim <gsim@apache.org> | 2007-08-05 13:25:36 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-08-05 13:25:36 +0000 |
commit | b2efcb6ed3e1e2104836928cda81ed69f2f24559 (patch) | |
tree | 392ae403dcb0d32da3edaeaf8a1f497679d9102c /cpp | |
parent | b2fadec5d86e278d96112e915e67aec934e91046 (diff) | |
download | qpid-python-b2efcb6ed3e1e2104836928cda81ed69f2f24559.tar.gz |
Added first cut of generated client interface.
Old channel interface still supported; shares SessionCore with the new interface.
Todo: allow applications to signal completion of received commands; keywrod args for interface.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562866 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
23 files changed, 732 insertions, 311 deletions
diff --git a/cpp/rubygen/amqpgen.rb b/cpp/rubygen/amqpgen.rb index 5cad9d6336..0da1bfe824 100755 --- a/cpp/rubygen/amqpgen.rb +++ b/cpp/rubygen/amqpgen.rb @@ -76,6 +76,10 @@ end class AmqpMethod < AmqpElement def initialize(xml, amqp) super; end + def content() + attributes["content"] + end + def index() attributes["index"]; end def fields() @@ -84,7 +88,7 @@ class AmqpMethod < AmqpElement # Responses to this method (0-9) def responses() - @cache_responses ||= elements.collect("response") { |el| new AmqpMethod(el,self) } + @cache_responses ||= elements.collect("response") { |el| AmqpMethod.new(el,self) } end # Methods this method responds to (0-9) @@ -178,7 +182,7 @@ class Generator if (@outdir != "-") path=Pathname.new "#{@outdir}/#{file}" path.parent.mkpath - path.open('w') { |@out| yield } + path.open('w') { |@out| yield } end end diff --git a/cpp/rubygen/cppgen.rb b/cpp/rubygen/cppgen.rb index f58ce3a539..d369d23da1 100755 --- a/cpp/rubygen/cppgen.rb +++ b/cpp/rubygen/cppgen.rb @@ -128,7 +128,7 @@ class CppGen < Generator # Write a .cpp file. def cpp_file(path) - file (path) do + file(path) do gen Copyright yield end diff --git a/cpp/rubygen/samples/Operations.rb b/cpp/rubygen/samples/Operations.rb index 4d84e33b9f..1c245ca188 100755 --- a/cpp/rubygen/samples/Operations.rb +++ b/cpp/rubygen/samples/Operations.rb @@ -80,6 +80,6 @@ EOS end end -OperationsGen.new("client",ARGV[0], amqp).generate() -OperationsGen.new("server",ARGV[0], amqp).generate() +OperationsGen.new("client",ARGV[0], Amqp).generate() +OperationsGen.new("server",ARGV[0], Amqp).generate() diff --git a/cpp/rubygen/samples/Proxy.rb b/cpp/rubygen/samples/Proxy.rb index c63a2a9799..f7765f3729 100755 --- a/cpp/rubygen/samples/Proxy.rb +++ b/cpp/rubygen/samples/Proxy.rb @@ -148,6 +148,6 @@ EOS end -ProxyGen.new("client", ARGV[0], amqp).generate; -ProxyGen.new("server", ARGV[0], amqp).generate; +ProxyGen.new("client", ARGV[0], Amqp).generate; +ProxyGen.new("server", ARGV[0], Amqp).generate; diff --git a/cpp/rubygen/templates/Session.rb b/cpp/rubygen/templates/Session.rb new file mode 100644 index 0000000000..5289a6af30 --- /dev/null +++ b/cpp/rubygen/templates/Session.rb @@ -0,0 +1,136 @@ +#!/usr/bin/env ruby +# Usage: output_directory xml_spec_file [xml_spec_file...] +# +$: << '..' +require 'cppgen' + +class SessionGen < CppGen + + def initialize(outdir, amqp) + super(outdir, amqp) + @chassis="server" + @classname="Session" + end + + def declare_method (m) + gen "Response #{m.amqp_parent.name.lcaps}#{m.cppname.caps}(" + if (m.content()) + params=m.signature + ["const MethodContent& content"] + else + params=m.signature + end + indent { gen params.join(",\n") } + gen ");\n\n" + end + + def declare_class(c) + c.methods_on(@chassis).each { |m| declare_method(m) } + end + + def define_method (m) + gen "Response Session::#{m.amqp_parent.name.lcaps}#{m.cppname.caps}(" + if (m.content()) + params=m.signature + ["const MethodContent& content"] + else + params=m.signature + end + indent { gen params.join(",\n") } + gen "){\n\n" + indent (2) { + gen "return impl->send(AMQMethodBody::shared_ptr(new #{m.body_name}(" + params = ["version"] + m.param_names + gen params.join(", ") + other_params=[] + if (m.content()) + other_params << "content" + end + if m.responses().empty? + other_params << "false" + else + other_params << "true" + end + gen ")), #{other_params.join(", ")});\n" + } + gen "}\n\n" + end + + def define_class(c) + c.methods_on(@chassis).each { |m| define_method(m) } + end + + def generate() + excludes = ["channel", "connection", "session", "execution"] + + h_file("qpid/client/#{@classname}.h") { + gen <<EOS +#include <sstream> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/framing/MethodContent.h" +#include "ConnectionImpl.h" +#include "Response.h" +#include "SessionCore.h" + +namespace qpid { +namespace client { + +using std::string; +using framing::Content; +using framing::FieldTable; +using framing::MethodContent; +using framing::SequenceNumberSet; + +class #{@classname} { + ConnectionImpl::shared_ptr parent; + SessionCore::shared_ptr impl; + framing::ProtocolVersion version; +public: + #{@classname}(ConnectionImpl::shared_ptr, SessionCore::shared_ptr); + ~#{@classname}(); + + ReceivedContent::shared_ptr get() { return impl->get(); } + void close() { impl->close(); parent->released(impl); } + +EOS + indent { @amqp.classes.each { |c| declare_class(c) if !excludes.include?(c.name) } } + gen <<EOS +}; /* class #{@classname} */ +} +} +EOS +} + + # .cpp file + cpp_file("qpid/client/#{@classname}.cpp") { + gen <<EOS +#include "#{@classname}.h" +#include "qpid/framing/AMQMethodBody.h" + +using std::string; +using namespace qpid::framing; + +namespace qpid { +namespace client { + +#{@classname}::#{@classname}(ConnectionImpl::shared_ptr _parent, SessionCore::shared_ptr _impl) : parent(_parent), impl(_impl) {} + +#{@classname}::~#{@classname}() +{ + impl->stop(); + parent->released(impl); +} + +EOS + + @amqp.classes.each { |c| define_class(c) if !excludes.include?(c.name) } + + gen <<EOS +}} // namespace qpid::client +EOS + } + + end +end + +SessionGen.new(ARGV[0], Amqp).generate() + diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index abb3587952..ad720754c1 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -46,10 +46,10 @@ rgen_tdir=$(rgen_dir)/templates rgen_script=$(rgen_dir)/generate rgen_cmd=ruby -I $(rgen_dir) $(rgen_script) -rgen_templates=$(rgen_tdir)/frame_body_lists.rb +rgen_templates=$(rgen_tdir)/frame_body_lists.rb $(rgen_tdir)/Session.rb rubygen.mk: $(rgen_script) $(specs) $(rgen_templates) - gen=`$(rgen_cmd) . $(specs) $(rgen_templates)` ; echo Generated $$gen; echo "rgen_srcs=$$gen" > $@ + gen=`$(rgen_cmd) . $(specs) $(rgen_templates)` ; echo Generated $$gen; echo rgen_srcs=$$gen > $@ $(rgen_srcs): rubygen.mk @@ -228,6 +228,7 @@ libqpidclient_la_SOURCES = \ qpid/client/ClientChannel.cpp \ qpid/client/ClientExchange.cpp \ qpid/client/ClientQueue.cpp \ + qpid/client/ConnectionImpl.cpp \ qpid/client/Connector.cpp \ qpid/client/MessageListener.cpp \ qpid/client/ResponseHandler.cpp \ @@ -241,6 +242,8 @@ libqpidclient_la_SOURCES = \ qpid/client/FutureResponse.cpp \ qpid/client/FutureFactory.cpp \ qpid/client/ReceivedContent.cpp \ + qpid/client/Session.cpp \ + qpid/client/SessionCore.cpp \ qpid/client/StateManager.cpp @@ -319,6 +322,7 @@ nobase_include_HEADERS = \ qpid/client/ClientMessage.h \ qpid/client/ClientQueue.h \ qpid/client/Connection.h \ + qpid/client/ConnectionImpl.h \ qpid/client/Connector.h \ qpid/client/MessageChannel.h \ qpid/client/MessageListener.h \ @@ -336,6 +340,9 @@ nobase_include_HEADERS = \ qpid/client/FutureResponse.h \ qpid/client/FutureFactory.h \ qpid/client/ReceivedContent.h \ + qpid/client/Response.h \ + qpid/client/Session.h \ + qpid/client/SessionCore.h \ qpid/client/StateManager.h \ qpid/framing/AMQBody.h \ qpid/framing/AMQContentBody.h \ @@ -356,6 +363,7 @@ nobase_include_HEADERS = \ qpid/framing/HeaderProperties.h \ qpid/framing/InitiationHandler.h \ qpid/framing/InputHandler.h \ + qpid/framing/MethodContent.h \ qpid/framing/MethodContext.h \ qpid/framing/OutputHandler.h \ qpid/framing/ProtocolInitiation.h \ diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index 8b85017ba0..f407b5a2f9 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -49,28 +49,19 @@ const std::string empty; }} Channel::Channel(bool _transactional, u_int16_t _prefetch) : - connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false) + prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false) { } -Channel::~Channel(){ - closeInternal(); -} +Channel::~Channel(){} -void Channel::open(ChannelId id, Connection& con) +void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s) { if (isOpen()) - THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id); - connection = &con; - channelId = id; - //link up handlers: - channelHandler.out = boost::bind(&ConnectionHandler::outgoing, &(connection->handler), _1); - channelHandler.in = boost::bind(&ExecutionHandler::handle, &executionHandler, _1); - executionHandler.out = boost::bind(&ChannelHandler::outgoing, &channelHandler, _1); - //set up close notification: - channelHandler.onClose = boost::bind(&Channel::peerClose, this, _1, _2); - - channelHandler.open(id); + THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); + + connection = c; + session = s; } bool Channel::isOpen() const { @@ -79,10 +70,10 @@ bool Channel::isOpen() const { } void Channel::setQos() { - executionHandler.send(make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false))); + sendSync(false, make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false))); if(isTransactional()) { //I think this is wrong! should only send TxSelect once... - executionHandler.send(make_shared_ptr(new TxSelectBody(version))); + sendSync(false, make_shared_ptr(new TxSelectBody(version))); } } @@ -133,63 +124,52 @@ void Channel::bind(const Exchange& exchange, const Queue& queue, const std::stri } void Channel::commit(){ - executionHandler.send(make_shared_ptr(new TxCommitBody(version))); + sendSync(false, make_shared_ptr(new TxCommitBody(version))); } void Channel::rollback(){ - executionHandler.send(make_shared_ptr(new TxRollbackBody(version))); + sendSync(false, make_shared_ptr(new TxRollbackBody(version))); } void Channel::close() { - channelHandler.close(); + session->close(); { Mutex::ScopedLock l(lock); if (connection); { - connection->erase(channelId); - connection = 0; + connection->released(session); + connection.reset(); } } stop(); } - // Channel closed by peer. void Channel::peerClose(uint16_t code, const std::string& message) { assert(isOpen()); //record reason: errorCode = code; errorText = message; - closeInternal(); stop(); - futures.close(code, message); -} - -void Channel::closeInternal() { - Mutex::ScopedLock l(lock); - if (connection); - { - connection = 0; - } } AMQMethodBody::shared_ptr Channel::sendAndReceive(AMQMethodBody::shared_ptr toSend, ClassId /*c*/, MethodId /*m*/) { - - boost::shared_ptr<FutureResponse> fr(futures.createResponse()); - executionHandler.send(toSend, boost::bind(&FutureResponse::completed, fr), boost::bind(&FutureResponse::received, fr, _1)); - return fr->getResponse(); + session->setSync(true); + Response r = session->send(toSend, true); + session->setSync(false); + return r.getPtr(); } void Channel::sendSync(bool sync, AMQMethodBody::shared_ptr command) { if(sync) { - boost::shared_ptr<FutureCompletion> fc(futures.createCompletion()); - executionHandler.send(command, boost::bind(&FutureCompletion::completed, fc)); - fc->waitForCompletion(); + session->setSync(true); + session->send(command, false); + session->setSync(false); } else { - executionHandler.send(command); + session->send(command); } } @@ -199,7 +179,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceiveSync( if(sync) return sendAndReceive(body, c, m); else { - executionHandler.send(body); + session->send(body); return AMQMethodBody::shared_ptr(); } } @@ -246,8 +226,8 @@ void Channel::cancel(const std::string& tag, bool synch) { bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { AMQMethodBody::shared_ptr request(new BasicGetBody(version, 0, queue.getName(), ackMode)); - AMQMethodBody::shared_ptr response = sendAndReceive(request); - if (response && response->isA<BasicGetEmptyBody>()) { + Response response = session->send(request, true); + if (response.isA<BasicGetEmptyBody>()) { return false; } else { ReceivedContent::shared_ptr content = gets.pop(); @@ -263,38 +243,7 @@ void Channel::publish(const Message& msg, const Exchange& exchange, const string e = exchange.getName(); string key = routingKey; - executionHandler.sendContent(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)), - msg, msg.getData(), connection->getMaxFrameSize());//sending framesize here is horrible, fix this! - /* - // Make a header for the message - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - BasicHeaderProperties::copy( - *static_cast<BasicHeaderProperties*>(header->getProperties()), msg); - header->setContentSize(msg.getData().size()); - - executionHandler.send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate))); - executionHandler.sendContent(header); - string data = msg.getData(); - u_int64_t data_length = data.length(); - if(data_length > 0){ - //frame itself uses 8 bytes - u_int32_t frag_size = connection->getMaxFrameSize() - 8; - if(data_length < frag_size){ - executionHandler.sendContent(make_shared_ptr(new AMQContentBody(data))); - }else{ - u_int32_t offset = 0; - u_int32_t remaining = data_length - offset; - while (remaining > 0) { - u_int32_t length = remaining > frag_size ? frag_size : remaining; - string frag(data.substr(offset, length)); - executionHandler.sendContent(make_shared_ptr(new AMQContentBody(frag))); - - offset += length; - remaining = data_length - offset; - } - } - } - */ + session->send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)), msg, false); } void Channel::start(){ @@ -303,7 +252,7 @@ void Channel::start(){ } void Channel::stop() { - executionHandler.received.close(); + session->stop(); gets.close(); Mutex::ScopedLock l(stopLock); if(running) { @@ -315,7 +264,7 @@ void Channel::stop() { void Channel::run() { try { while (true) { - ReceivedContent::shared_ptr content = executionHandler.received.pop(); + ReceivedContent::shared_ptr content = session->get(); //need to dispatch this to the relevant listener: if (content->isA<BasicDeliverBody>()) { ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag()); diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h index 4853603281..5feba6262f 100644 --- a/cpp/src/qpid/client/ClientChannel.h +++ b/cpp/src/qpid/client/ClientChannel.h @@ -26,9 +26,8 @@ #include "ClientExchange.h" #include "ClientMessage.h" #include "ClientQueue.h" -#include "ChannelHandler.h" -#include "ExecutionHandler.h" -#include "FutureFactory.h" +#include "ConnectionImpl.h" +#include "SessionCore.h" #include "qpid/Exception.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Runnable.h" @@ -71,7 +70,6 @@ class Channel : private sys::Runnable typedef std::map<std::string, Consumer> ConsumerMap; mutable sys::Mutex lock; - Connection* connection; sys::Thread dispatcher; uint16_t prefetch; @@ -85,11 +83,10 @@ class Channel : private sys::Runnable bool running; ConsumerMap consumers; - ExecutionHandler executionHandler; - ChannelHandler channelHandler; + ConnectionImpl::shared_ptr connection; + SessionCore::shared_ptr session; framing::ChannelId channelId; BlockingQueue<ReceivedContent::shared_ptr> gets; - FutureFactory futures; void stop(); @@ -121,7 +118,7 @@ class Channel : private sys::Runnable sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID)); } - void open(framing::ChannelId, Connection&); + void open(ConnectionImpl::shared_ptr, SessionCore::shared_ptr); void closeInternal(); void peerClose(uint16_t, const std::string&); @@ -257,9 +254,6 @@ class Channel : private sys::Runnable /** True if the channel is open */ bool isOpen() const; - /** Get the connection associated with this channel */ - Connection& getConnection() { return *connection; } - /** Return the protocol version */ framing::ProtocolVersion getVersion() const { return version ; } diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp index c998ec30df..3ae1478152 100644 --- a/cpp/src/qpid/client/ClientConnection.cpp +++ b/cpp/src/qpid/client/ClientConnection.cpp @@ -41,31 +41,20 @@ using namespace qpid::sys; namespace qpid { namespace client { -const std::string Connection::OK("OK"); - -Connection::Connection( - bool _debug, uint32_t _max_frame_size, - framing::ProtocolVersion _version - ) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), - defaultConnector(version, _debug, _max_frame_size), - isOpen(false), debug(_debug) -{ - setConnector(defaultConnector); - - handler.maxFrameSize = _max_frame_size; -} +Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) : + channelIdCounter(0), version(_version), + max_frame_size(_max_frame_size), + impl(new ConnectionImpl(boost::shared_ptr<Connector>(new Connector(_version, _debug)))), + isOpen(false) {} + +Connection::Connection(boost::shared_ptr<Connector> c) : + channelIdCounter(0), version(framing::highestProtocolVersion), + max_frame_size(65536), + impl(new ConnectionImpl(c)), + isOpen(false) {} Connection::~Connection(){} -void Connection::setConnector(Connector& con) -{ - connector = &con; - connector->setInputHandler(&handler); - connector->setTimeoutHandler(this); - connector->setShutdownHandler(this); - out = connector->getOutputHandler(); -} - void Connection::open( const std::string& host, int port, const std::string& uid, const std::string& pwd, const std::string& vhost) @@ -73,97 +62,28 @@ void Connection::open( if (isOpen) THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); - //wire up the handler: - handler.in = boost::bind(&Connection::received, this, _1); - handler.out = boost::bind(&Connector::send, connector, _1); - handler.onClose = boost::bind(&Connection::closeChannels, this); - - handler.uid = uid; - handler.pwd = pwd; - handler.vhost = vhost; - - connector->connect(host, port); - connector->init(); - handler.waitForOpen(); + impl->open(host, port, uid, pwd, vhost); isOpen = true; } -void Connection::shutdown() { - //this indicates that the socket to the server has closed we do - //not want to send a close request (or any other requests) - if(markClosed()) { - QPID_LOG(info, "Connection to peer closed!"); - closeChannels(); - } -} - -void Connection::close( - ReplyCode /*code*/, const string& /*msg*/, ClassId /*classId*/, MethodId /*methodId*/ -) -{ - if(markClosed()) { - try { - handler.close(); - } catch (const std::exception& e) { - QPID_LOG(error, "Exception closing channel: " << e.what()); - } - closeChannels(); - connector->close(); - } -} - -bool Connection::markClosed() -{ - Mutex::ScopedLock locker(shutdownLock); - if (isOpen) { - isOpen = false; - return true; - } else { - return false; - } -} - -void Connection::closeChannels() -{ - using boost::bind; - for_each(channels.begin(), channels.end(), - bind(&Channel::closeInternal, - bind(&ChannelMap::value_type::second, _1))); - channels.clear(); -} - void Connection::openChannel(Channel& channel) { ChannelId id = ++channelIdCounter; - assert (channels.find(id) == channels.end()); - assert(out); - channels[id] = &channel; - channel.open(id, *this); -} - -void Connection::erase(ChannelId id) { - channels.erase(id); + SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size)); + impl->allocated(session); + channel.open(impl, session); + session->open(); } -void Connection::received(AMQFrame& frame){ - ChannelId id = frame.getChannel(); - Channel* channel = channels[id]; - if (channel == 0) { - throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str()); - } - channel->channelHandler.incoming(frame); -} - -void Connection::send(AMQFrame& frame) { - out->send(frame); -} - -void Connection::idleIn(){ - connector->close(); +Session Connection::newSession() { + ChannelId id = ++channelIdCounter; + SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size)); + impl->allocated(session); + return Session(impl, session); } -void Connection::idleOut(){ - AMQFrame frame(version, 0, new AMQHeartbeatBody()); - out->send(frame); +void Connection::close() +{ + impl->close(); } }} // namespace qpid::client diff --git a/cpp/src/qpid/client/ClientMessage.h b/cpp/src/qpid/client/ClientMessage.h index 3de3a693b9..fd33fbc830 100644 --- a/cpp/src/qpid/client/ClientMessage.h +++ b/cpp/src/qpid/client/ClientMessage.h @@ -23,6 +23,7 @@ */ #include <string> #include "qpid/framing/BasicHeaderProperties.h" +#include "qpid/framing/MethodContent.h" namespace qpid { namespace client { @@ -35,11 +36,11 @@ namespace client { */ // FIXME aconway 2007-04-05: Should be based on MessageTransfer properties not // basic header properties. -class Message : public framing::BasicHeaderProperties { +class Message : public framing::BasicHeaderProperties, public framing::MethodContent { public: Message(const std::string& data_=std::string()) : data(data_) {} - std::string getData() const { return data; } + const std::string& getData() const { return data; } void setData(const std::string& _data) { data = _data; } std::string getDestination() const { return destination; } @@ -52,6 +53,8 @@ class Message : public framing::BasicHeaderProperties { bool isRedelivered() const { return redelivered; } void setRedelivered(bool _redelivered){ redelivered = _redelivered; } + const HeaderProperties& getMethodHeaders() const { return *this; } + private: std::string data; std::string destination; diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index 4d32456c40..6f58986f25 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -25,11 +25,8 @@ #include <string> #include "qpid/QpidError.h" #include "ClientChannel.h" -#include "Connector.h" -#include "ConnectionHandler.h" -#include "qpid/sys/Mutex.h" -#include "qpid/sys/ShutdownHandler.h" -#include "qpid/sys/TimeoutHandler.h" +#include "ConnectionImpl.h" +#include "Session.h" namespace qpid { @@ -42,22 +39,6 @@ namespace qpid { */ namespace client { -/** - * \internal provide access to selected private channel functions - * for the Connection without making it a friend of the entire channel. - */ -class ConnectionForChannel : - public framing::InputHandler, - public framing::OutputHandler, - public sys::TimeoutHandler, - public sys::ShutdownHandler - -{ - private: - friend class Channel; - virtual void erase(framing::ChannelId) = 0; -}; - /** * \defgroup clientapi Application API for an AMQP client @@ -70,30 +51,17 @@ class ConnectionForChannel : * * \ingroup clientapi */ -class Connection : public ConnectionForChannel +class Connection { - typedef std::map<framing::ChannelId, Channel*> ChannelMap; - framing::ChannelId channelIdCounter; - static const std::string OK; - framing::ProtocolVersion version; const uint32_t max_frame_size; - ChannelMap channels; - ConnectionHandler handler; - Connector defaultConnector; - Connector* connector; - framing::OutputHandler* out; + ConnectionImpl::shared_ptr impl; bool isOpen; - sys::Mutex shutdownLock; bool debug; - - void erase(framing::ChannelId); - void closeChannels(); - bool markClosed(); // TODO aconway 2007-01-26: too many friendships, untagle these classes. - friend class Channel; + friend class Channel; public: /** @@ -111,6 +79,7 @@ class Connection : public ConnectionForChannel */ Connection(bool debug = false, uint32_t max_frame_size = 65536, framing::ProtocolVersion=framing::highestProtocolVersion); + Connection(boost::shared_ptr<Connector>); ~Connection(); /** @@ -136,13 +105,12 @@ class Connection : public ConnectionForChannel const std::string& virtualhost = "/"); /** - * Close the connection with optional error information for the peer. + * Close the connection * * Any further use of this connection (without reopening it) will * not succeed. */ - void close(framing::ReplyCode=200, const std::string& msg=OK, - framing::ClassId = 0, framing::MethodId = 0); + void close(); /** * Associate a Channel with this connection and open it for use. @@ -156,24 +124,7 @@ class Connection : public ConnectionForChannel */ void openChannel(Channel&); - - // TODO aconway 2007-01-26: can these be private? - void send(framing::AMQFrame&); - void received(framing::AMQFrame&); - void idleOut(); - void idleIn(); - void shutdown(); - - /**\internal used for testing */ - void setConnector(Connector& connector); - - /** - * @return the maximum frame size in use on this connection - */ - inline uint32_t getMaxFrameSize(){ return max_frame_size; } - - /** @return protocol version in use on this connection. */ - //framing::ProtocolVersion getVersion() const { return version; } + Session newSession(); }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index ada3fa4fb0..f20f597d1f 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -175,8 +175,9 @@ void ConnectionHandler::handle(AMQMethodBody::shared_ptr method) if (method->isA<ConnectionCloseBody>()) { send(make_shared_ptr(new ConnectionCloseOkBody(version))); setState(CLOSED); - if (onClose) { - onClose(); + if (onError) { + ConnectionCloseBody::shared_ptr c(shared_polymorphic_cast<ConnectionCloseBody>(method)); + onError(c->getReplyCode(), c->getReplyText()); } } else { error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId()); diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h index 50618b50b1..464d0ca26d 100644 --- a/cpp/src/qpid/client/ConnectionHandler.h +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -61,6 +61,7 @@ class ConnectionHandler : private StateManager, public: typedef boost::function<void()> CloseListener; + typedef boost::function<void(uint16_t, const std::string&)> ErrorListener; ConnectionHandler(); @@ -73,6 +74,7 @@ public: void close(); CloseListener onClose; + ErrorListener onError; }; }} diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp new file mode 100644 index 0000000000..887790e4f0 --- /dev/null +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionImpl.h" +#include <boost/bind.hpp> +#include <boost/format.hpp> + +using namespace qpid::client; +using namespace qpid::framing; + +ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c) +{ + handler.in = boost::bind(&ConnectionImpl::incoming, this, _1); + handler.out = boost::bind(&Connector::send, connector, _1); + handler.onClose = boost::bind(&ConnectionImpl::closed, this); + connector->setInputHandler(&handler); + connector->setTimeoutHandler(this); + connector->setShutdownHandler(this); +} + +void ConnectionImpl::allocated(SessionCore::shared_ptr session) +{ + if (sessions.find(session->getId()) != sessions.end()) { + throw Exception("Id already in use."); + } + sessions[session->getId()] = session; +} + +void ConnectionImpl::released(SessionCore::shared_ptr session) +{ + SessionMap::iterator i = sessions.find(session->getId()); + if (i == sessions.end()) { + throw Exception("Id not in use."); + } + sessions.erase(i); +} + +void ConnectionImpl::handle(framing::AMQFrame& frame) +{ + handler.outgoing(frame); +} + +void ConnectionImpl::incoming(framing::AMQFrame& frame) +{ + uint16_t id = frame.getChannel(); + SessionCore::shared_ptr session = sessions[id]; + if (!session) { + throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str()); + } + session->handle(frame); +} + +void ConnectionImpl::open(const std::string& host, int port, + const std::string& uid, const std::string& pwd, + const std::string& vhost) +{ + //TODO: better management of connection properties + handler.uid = uid; + handler.pwd = pwd; + handler.vhost = vhost; + + connector->connect(host, port); + connector->init(); + handler.waitForOpen(); +} + +void ConnectionImpl::close() +{ + handler.close(); +} + +void ConnectionImpl::closed() +{ + closed(200, "OK"); +} + +void ConnectionImpl::closed(uint16_t code, const std::string& text) +{ + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) { + i->second->closed(code, text); + } + sessions.clear(); + connector->close(); +} + +void ConnectionImpl::idleIn() +{ + connector->close(); +} + +void ConnectionImpl::idleOut() +{ + AMQFrame frame(version, 0, new AMQHeartbeatBody()); + connector->send(frame); +} + +void ConnectionImpl::shutdown() { + //this indicates that the socket to the server has closed + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) { + i->second->closed(0, "Unexpected scoket closure."); + } + sessions.clear(); +} diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h new file mode 100644 index 0000000000..8b46d774bf --- /dev/null +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _ConnectionImpl_ +#define _ConnectionImpl_ + +#include <map> +#include <boost/shared_ptr.hpp> +#include "qpid/framing/FrameHandler.h" +#include "qpid/sys/ShutdownHandler.h" +#include "qpid/sys/TimeoutHandler.h" +#include "ConnectionHandler.h" +#include "Connector.h" +#include "SessionCore.h" + +namespace qpid { +namespace client { + +class ConnectionImpl : public framing::FrameHandler, + public sys::TimeoutHandler, + public sys::ShutdownHandler + +{ + typedef std::map<uint16_t, SessionCore::shared_ptr> SessionMap; + SessionMap sessions; + ConnectionHandler handler; + boost::shared_ptr<Connector> connector; + framing::ProtocolVersion version; + + void incoming(framing::AMQFrame& frame); + void closed(); + void closed(uint16_t, const std::string&); + void idleOut(); + void idleIn(); + void shutdown(); +public: + typedef boost::shared_ptr<ConnectionImpl> shared_ptr; + + ConnectionImpl(boost::shared_ptr<Connector> c); + void allocated(SessionCore::shared_ptr); + void released(SessionCore::shared_ptr); + void open(const std::string& host, int port = 5672, + const std::string& uid = "guest", + const std::string& pwd = "guest", + const std::string& virtualhost = "/"); + void close(); + void handle(framing::AMQFrame& frame); +}; + +}} + + +#endif diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index e4270f4e98..abfce4f9d1 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -50,7 +50,8 @@ bool invoke(AMQBody::shared_ptr body, Invocable* target) return body->type() == METHOD_BODY && shared_polymorphic_cast<AMQMethodBody>(body)->invoke(target); } -ExecutionHandler::ExecutionHandler() : version(framing::highestProtocolVersion) {} +ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) : + version(framing::highestProtocolVersion), maxFrameSize(_maxFrameSize) {} //incoming: void ExecutionHandler::handle(AMQFrame& frame) @@ -97,6 +98,12 @@ void ExecutionHandler::flush() //make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()))); } +void ExecutionHandler::sendFlush() +{ + AMQFrame frame(version, 0, make_shared_ptr(new ExecutionFlushBody(version))); + out(frame); +} + void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::Listener f, Correlator::Listener g) { //allocate id: @@ -111,21 +118,9 @@ void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::List AMQFrame frame(version, 0/*id will be filled in be channel handler*/, command); out(frame); - - if (f) { - AMQFrame frame(version, 0, make_shared_ptr(new ExecutionFlushBody(version))); - out(frame); - } -} - -void ExecutionHandler::sendContent(framing::AMQBody::shared_ptr content) -{ - AMQFrame frame(version, 0/*id will be filled in be channel handler*/, content); - out(frame); } void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeaderProperties& headers, const std::string& data, - uint64_t frameSize, CompletionTracker::Listener f, Correlator::Listener g) { send(command, f, g); @@ -139,7 +134,7 @@ void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeade u_int64_t data_length = data.length(); if(data_length > 0){ //frame itself uses 8 bytes - u_int32_t frag_size = frameSize - 8; + u_int32_t frag_size = maxFrameSize - 8; if(data_length < frag_size){ AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(data))); out(frame); diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index 99b0f4b915..f62598ef95 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -43,6 +43,7 @@ class ExecutionHandler : Correlator correlation; CompletionTracker completion; framing::ProtocolVersion version; + uint64_t maxFrameSize; void complete(uint32_t mark, framing::SequenceNumberSet range); void flush(); @@ -50,7 +51,9 @@ class ExecutionHandler : public: BlockingQueue<ReceivedContent::shared_ptr> received; - ExecutionHandler(); + ExecutionHandler(uint64_t maxFrameSize = 65536); + + void setMaxFrameSize(uint64_t size) { maxFrameSize = size; } void handle(framing::AMQFrame& frame); void send(framing::AMQBody::shared_ptr command, @@ -58,11 +61,9 @@ public: Correlator::Listener g = Correlator::Listener()); void sendContent(framing::AMQBody::shared_ptr command, const framing::BasicHeaderProperties& headers, const std::string& data, - uint64_t frameSize, CompletionTracker::Listener f = CompletionTracker::Listener(), Correlator::Listener g = Correlator::Listener()); - - void sendContent(framing::AMQBody::shared_ptr content); + void sendFlush(); }; }} diff --git a/cpp/src/qpid/client/Response.h b/cpp/src/qpid/client/Response.h new file mode 100644 index 0000000000..f44cd72783 --- /dev/null +++ b/cpp/src/qpid/client/Response.h @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _Response_ +#define _Response_ + +#include <boost/shared_ptr.hpp> +#include "qpid/framing/amqp_framing.h" +#include "FutureResponse.h" + +namespace qpid { +namespace client { + +class Response +{ + boost::shared_ptr<FutureResponse> future; + +public: + Response(boost::shared_ptr<FutureResponse> f) : future(f) {} + + template <class T> T& as() + { + framing::AMQMethodBody::shared_ptr response(future->getResponse()); + return boost::shared_polymorphic_cast<T>(*response); + } + template <class T> bool isA() + { + return future->getResponse()->isA<T>(); + } + + void sync() + { + return future->waitForCompletion(); + } + + //TODO: only exposed for old channel class, may want to hide this eventually + framing::AMQMethodBody::shared_ptr getPtr() + { + return future->getResponse(); + } +}; + +}} + +#endif diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp new file mode 100644 index 0000000000..391dcd909d --- /dev/null +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -0,0 +1,115 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SessionCore.h" +#include <boost/bind.hpp> + +using namespace qpid::client; +using namespace qpid::framing; + +SessionCore::SessionCore(uint16_t _id, boost::shared_ptr<framing::FrameHandler> out, + uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false) +{ + l2.out = boost::bind(&FrameHandler::handle, out, _1); + l2.in = boost::bind(&ExecutionHandler::handle, &l3, _1); + l3.out = boost::bind(&ChannelHandler::outgoing, &l2, _1); + l2.onClose = boost::bind(&SessionCore::closed, this, _1, _2); +} + +void SessionCore::open() +{ + l2.open(id); +} + +void SessionCore::flush() +{ + l3.sendFlush(); +} + +Response SessionCore::send(AMQMethodBody::shared_ptr method, bool expectResponse) +{ + boost::shared_ptr<FutureResponse> f(futures.createResponse()); + if (expectResponse) { + l3.send(method, boost::bind(&FutureResponse::completed, f), boost::bind(&FutureResponse::received, f, _1)); + } else { + l3.send(method, boost::bind(&FutureResponse::completed, f)); + } + if (sync) { + flush(); + f->waitForCompletion(); + } + return Response(f); +} + +Response SessionCore::send(AMQMethodBody::shared_ptr method, const MethodContent& content, bool expectResponse) +{ + //TODO: lots of duplication between these two send methods; refactor + boost::shared_ptr<FutureResponse> f(futures.createResponse()); + if (expectResponse) { + l3.sendContent(method, dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData(), + boost::bind(&FutureResponse::completed, f), boost::bind(&FutureResponse::received, f, _1)); + } else { + l3.sendContent(method, dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData(), + boost::bind(&FutureResponse::completed, f)); + } + if (sync) { + flush(); + f->waitForCompletion(); + } + return Response(f); +} + +ReceivedContent::shared_ptr SessionCore::get() +{ + return l3.received.pop(); +} + +void SessionCore::setSync(bool s) +{ + sync = s; +} + +bool SessionCore::isSync() +{ + return sync; +} + +void SessionCore::close() +{ + l2.close(); + l3.received.close(); +} + +void SessionCore::stop() +{ + l3.received.close(); +} + +void SessionCore::handle(AMQFrame& frame) +{ + l2.incoming(frame); +} + +void SessionCore::closed(uint16_t code, const std::string& text) +{ + l3.received.close(); + futures.close(code, text); +} diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h new file mode 100644 index 0000000000..15cd36114f --- /dev/null +++ b/cpp/src/qpid/client/SessionCore.h @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _SessionCore_ +#define _SessionCore_ + +#include <boost/shared_ptr.hpp> +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/MethodContent.h" +#include "ChannelHandler.h" +#include "ExecutionHandler.h" +#include "FutureFactory.h" +#include "ReceivedContent.h" +#include "Response.h" + +namespace qpid { +namespace client { + +class SessionCore : public framing::FrameHandler +{ + ExecutionHandler l3; + ChannelHandler l2; + FutureFactory futures; + const uint16_t id; + bool sync; + +public: + typedef boost::shared_ptr<SessionCore> shared_ptr; + + SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize); + Response send(framing::AMQMethodBody::shared_ptr method, bool expectResponse = false); + Response send(framing::AMQMethodBody::shared_ptr method, const framing::MethodContent& content, bool expectResponse = false); + ReceivedContent::shared_ptr get(); + uint16_t getId() const { return id; } + void setSync(bool); + bool isSync(); + void flush(); + void open(); + void close(); + void stop(); + void closed(uint16_t code, const std::string& text); + + //for incoming frames: + void handle(framing::AMQFrame& frame); +}; + +} +} + + +#endif diff --git a/cpp/src/qpid/framing/MethodContent.h b/cpp/src/qpid/framing/MethodContent.h new file mode 100644 index 0000000000..11d8d42cab --- /dev/null +++ b/cpp/src/qpid/framing/MethodContent.h @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _MethodContent_ +#define _MethodContent_ + +#include "HeaderProperties.h" + +namespace qpid { +namespace framing { + +class MethodContent +{ +public: + virtual ~MethodContent() {} + //TODO: rethink this interface + virtual const HeaderProperties& getMethodHeaders() const = 0; + virtual const std::string& getData() const = 0; +}; + +}} +#endif diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp index 582c7d6e55..f172d1765e 100644 --- a/cpp/src/tests/FramingTest.cpp +++ b/cpp/src/tests/FramingTest.cpp @@ -33,6 +33,7 @@ #include "qpid/framing/Responder.h" #include "InProcessBroker.h" #include "qpid/client/Connection.h" +#include "qpid/client/Connector.h" #include "qpid/client/ClientExchange.h" #include "qpid/client/ClientQueue.h" #include "qpid/framing/Correlator.h" @@ -386,9 +387,8 @@ class FramingTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(string(expect, sizeof(expect)-1), boost::lexical_cast<string>(frame)) void testRequestResponseRoundtrip() { - broker::InProcessBroker ibroker(version); - client::Connection clientConnection; - clientConnection.setConnector(ibroker); + boost::shared_ptr<broker::InProcessBroker> ibroker(new broker::InProcessBroker(version)); + client::Connection clientConnection(boost::static_pointer_cast<client::Connector>(ibroker)); clientConnection.open(""); client::Channel c; clientConnection.openChannel(c); @@ -399,7 +399,9 @@ class FramingTest : public CppUnit::TestCase c.declareExchange(exchange); c.declareQueue(queue); c.bind(exchange, queue, "MyTopic", framing::FieldTable()); - broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin(); + c.close(); + clientConnection.close(); + broker::InProcessBroker::Conversation::const_iterator i = ibroker->conversation.begin(); ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++); ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++); ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++); diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h index 48ac80d30a..9f30ee584d 100644 --- a/cpp/src/tests/InProcessBroker.h +++ b/cpp/src/tests/InProcessBroker.h @@ -134,30 +134,7 @@ std::ostream& operator<<( } } // namespace broker - - -namespace client { -/** An in-process client+broker all in one. */ -class InProcessBrokerClient : public client::Connection { - public: - broker::InProcessBroker broker; - broker::InProcessBroker::Conversation& conversation; - - /** Constructor creates broker and opens client connection. */ - InProcessBrokerClient( - u_int32_t max_frame_size=65536, - framing::ProtocolVersion version= framing::highestProtocolVersion - ) : client::Connection(false, max_frame_size, version), - broker(version), - conversation(broker.conversation) - { - setConnector(broker); - open(""); - } -}; - - -}} // namespace qpid::client +} // namespace qpid #endif // _tests_InProcessBroker_h |