summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-08-05 13:25:36 +0000
committerGordon Sim <gsim@apache.org>2007-08-05 13:25:36 +0000
commitb2efcb6ed3e1e2104836928cda81ed69f2f24559 (patch)
tree392ae403dcb0d32da3edaeaf8a1f497679d9102c /cpp
parentb2fadec5d86e278d96112e915e67aec934e91046 (diff)
downloadqpid-python-b2efcb6ed3e1e2104836928cda81ed69f2f24559.tar.gz
Added first cut of generated client interface.
Old channel interface still supported; shares SessionCore with the new interface. Todo: allow applications to signal completion of received commands; keywrod args for interface. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@562866 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rwxr-xr-xcpp/rubygen/amqpgen.rb8
-rwxr-xr-xcpp/rubygen/cppgen.rb2
-rwxr-xr-xcpp/rubygen/samples/Operations.rb4
-rwxr-xr-xcpp/rubygen/samples/Proxy.rb4
-rw-r--r--cpp/rubygen/templates/Session.rb136
-rw-r--r--cpp/src/Makefile.am12
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp107
-rw-r--r--cpp/src/qpid/client/ClientChannel.h16
-rw-r--r--cpp/src/qpid/client/ClientConnection.cpp128
-rw-r--r--cpp/src/qpid/client/ClientMessage.h7
-rw-r--r--cpp/src/qpid/client/Connection.h67
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp5
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.h2
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp120
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h71
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp23
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h9
-rw-r--r--cpp/src/qpid/client/Response.h63
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp115
-rw-r--r--cpp/src/qpid/client/SessionCore.h70
-rw-r--r--cpp/src/qpid/framing/MethodContent.h39
-rw-r--r--cpp/src/tests/FramingTest.cpp10
-rw-r--r--cpp/src/tests/InProcessBroker.h25
23 files changed, 732 insertions, 311 deletions
diff --git a/cpp/rubygen/amqpgen.rb b/cpp/rubygen/amqpgen.rb
index 5cad9d6336..0da1bfe824 100755
--- a/cpp/rubygen/amqpgen.rb
+++ b/cpp/rubygen/amqpgen.rb
@@ -76,6 +76,10 @@ end
class AmqpMethod < AmqpElement
def initialize(xml, amqp) super; end
+ def content()
+ attributes["content"]
+ end
+
def index() attributes["index"]; end
def fields()
@@ -84,7 +88,7 @@ class AmqpMethod < AmqpElement
# Responses to this method (0-9)
def responses()
- @cache_responses ||= elements.collect("response") { |el| new AmqpMethod(el,self) }
+ @cache_responses ||= elements.collect("response") { |el| AmqpMethod.new(el,self) }
end
# Methods this method responds to (0-9)
@@ -178,7 +182,7 @@ class Generator
if (@outdir != "-")
path=Pathname.new "#{@outdir}/#{file}"
path.parent.mkpath
- path.open('w') { |@out| yield }
+ path.open('w') { |@out| yield }
end
end
diff --git a/cpp/rubygen/cppgen.rb b/cpp/rubygen/cppgen.rb
index f58ce3a539..d369d23da1 100755
--- a/cpp/rubygen/cppgen.rb
+++ b/cpp/rubygen/cppgen.rb
@@ -128,7 +128,7 @@ class CppGen < Generator
# Write a .cpp file.
def cpp_file(path)
- file (path) do
+ file(path) do
gen Copyright
yield
end
diff --git a/cpp/rubygen/samples/Operations.rb b/cpp/rubygen/samples/Operations.rb
index 4d84e33b9f..1c245ca188 100755
--- a/cpp/rubygen/samples/Operations.rb
+++ b/cpp/rubygen/samples/Operations.rb
@@ -80,6 +80,6 @@ EOS
end
end
-OperationsGen.new("client",ARGV[0], amqp).generate()
-OperationsGen.new("server",ARGV[0], amqp).generate()
+OperationsGen.new("client",ARGV[0], Amqp).generate()
+OperationsGen.new("server",ARGV[0], Amqp).generate()
diff --git a/cpp/rubygen/samples/Proxy.rb b/cpp/rubygen/samples/Proxy.rb
index c63a2a9799..f7765f3729 100755
--- a/cpp/rubygen/samples/Proxy.rb
+++ b/cpp/rubygen/samples/Proxy.rb
@@ -148,6 +148,6 @@ EOS
end
-ProxyGen.new("client", ARGV[0], amqp).generate;
-ProxyGen.new("server", ARGV[0], amqp).generate;
+ProxyGen.new("client", ARGV[0], Amqp).generate;
+ProxyGen.new("server", ARGV[0], Amqp).generate;
diff --git a/cpp/rubygen/templates/Session.rb b/cpp/rubygen/templates/Session.rb
new file mode 100644
index 0000000000..5289a6af30
--- /dev/null
+++ b/cpp/rubygen/templates/Session.rb
@@ -0,0 +1,136 @@
+#!/usr/bin/env ruby
+# Usage: output_directory xml_spec_file [xml_spec_file...]
+#
+$: << '..'
+require 'cppgen'
+
+class SessionGen < CppGen
+
+ def initialize(outdir, amqp)
+ super(outdir, amqp)
+ @chassis="server"
+ @classname="Session"
+ end
+
+ def declare_method (m)
+ gen "Response #{m.amqp_parent.name.lcaps}#{m.cppname.caps}("
+ if (m.content())
+ params=m.signature + ["const MethodContent& content"]
+ else
+ params=m.signature
+ end
+ indent { gen params.join(",\n") }
+ gen ");\n\n"
+ end
+
+ def declare_class(c)
+ c.methods_on(@chassis).each { |m| declare_method(m) }
+ end
+
+ def define_method (m)
+ gen "Response Session::#{m.amqp_parent.name.lcaps}#{m.cppname.caps}("
+ if (m.content())
+ params=m.signature + ["const MethodContent& content"]
+ else
+ params=m.signature
+ end
+ indent { gen params.join(",\n") }
+ gen "){\n\n"
+ indent (2) {
+ gen "return impl->send(AMQMethodBody::shared_ptr(new #{m.body_name}("
+ params = ["version"] + m.param_names
+ gen params.join(", ")
+ other_params=[]
+ if (m.content())
+ other_params << "content"
+ end
+ if m.responses().empty?
+ other_params << "false"
+ else
+ other_params << "true"
+ end
+ gen ")), #{other_params.join(", ")});\n"
+ }
+ gen "}\n\n"
+ end
+
+ def define_class(c)
+ c.methods_on(@chassis).each { |m| define_method(m) }
+ end
+
+ def generate()
+ excludes = ["channel", "connection", "session", "execution"]
+
+ h_file("qpid/client/#{@classname}.h") {
+ gen <<EOS
+#include <sstream>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/framing/MethodContent.h"
+#include "ConnectionImpl.h"
+#include "Response.h"
+#include "SessionCore.h"
+
+namespace qpid {
+namespace client {
+
+using std::string;
+using framing::Content;
+using framing::FieldTable;
+using framing::MethodContent;
+using framing::SequenceNumberSet;
+
+class #{@classname} {
+ ConnectionImpl::shared_ptr parent;
+ SessionCore::shared_ptr impl;
+ framing::ProtocolVersion version;
+public:
+ #{@classname}(ConnectionImpl::shared_ptr, SessionCore::shared_ptr);
+ ~#{@classname}();
+
+ ReceivedContent::shared_ptr get() { return impl->get(); }
+ void close() { impl->close(); parent->released(impl); }
+
+EOS
+ indent { @amqp.classes.each { |c| declare_class(c) if !excludes.include?(c.name) } }
+ gen <<EOS
+}; /* class #{@classname} */
+}
+}
+EOS
+}
+
+ # .cpp file
+ cpp_file("qpid/client/#{@classname}.cpp") {
+ gen <<EOS
+#include "#{@classname}.h"
+#include "qpid/framing/AMQMethodBody.h"
+
+using std::string;
+using namespace qpid::framing;
+
+namespace qpid {
+namespace client {
+
+#{@classname}::#{@classname}(ConnectionImpl::shared_ptr _parent, SessionCore::shared_ptr _impl) : parent(_parent), impl(_impl) {}
+
+#{@classname}::~#{@classname}()
+{
+ impl->stop();
+ parent->released(impl);
+}
+
+EOS
+
+ @amqp.classes.each { |c| define_class(c) if !excludes.include?(c.name) }
+
+ gen <<EOS
+}} // namespace qpid::client
+EOS
+ }
+
+ end
+end
+
+SessionGen.new(ARGV[0], Amqp).generate()
+
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index abb3587952..ad720754c1 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -46,10 +46,10 @@ rgen_tdir=$(rgen_dir)/templates
rgen_script=$(rgen_dir)/generate
rgen_cmd=ruby -I $(rgen_dir) $(rgen_script)
-rgen_templates=$(rgen_tdir)/frame_body_lists.rb
+rgen_templates=$(rgen_tdir)/frame_body_lists.rb $(rgen_tdir)/Session.rb
rubygen.mk: $(rgen_script) $(specs) $(rgen_templates)
- gen=`$(rgen_cmd) . $(specs) $(rgen_templates)` ; echo Generated $$gen; echo "rgen_srcs=$$gen" > $@
+ gen=`$(rgen_cmd) . $(specs) $(rgen_templates)` ; echo Generated $$gen; echo rgen_srcs=$$gen > $@
$(rgen_srcs): rubygen.mk
@@ -228,6 +228,7 @@ libqpidclient_la_SOURCES = \
qpid/client/ClientChannel.cpp \
qpid/client/ClientExchange.cpp \
qpid/client/ClientQueue.cpp \
+ qpid/client/ConnectionImpl.cpp \
qpid/client/Connector.cpp \
qpid/client/MessageListener.cpp \
qpid/client/ResponseHandler.cpp \
@@ -241,6 +242,8 @@ libqpidclient_la_SOURCES = \
qpid/client/FutureResponse.cpp \
qpid/client/FutureFactory.cpp \
qpid/client/ReceivedContent.cpp \
+ qpid/client/Session.cpp \
+ qpid/client/SessionCore.cpp \
qpid/client/StateManager.cpp
@@ -319,6 +322,7 @@ nobase_include_HEADERS = \
qpid/client/ClientMessage.h \
qpid/client/ClientQueue.h \
qpid/client/Connection.h \
+ qpid/client/ConnectionImpl.h \
qpid/client/Connector.h \
qpid/client/MessageChannel.h \
qpid/client/MessageListener.h \
@@ -336,6 +340,9 @@ nobase_include_HEADERS = \
qpid/client/FutureResponse.h \
qpid/client/FutureFactory.h \
qpid/client/ReceivedContent.h \
+ qpid/client/Response.h \
+ qpid/client/Session.h \
+ qpid/client/SessionCore.h \
qpid/client/StateManager.h \
qpid/framing/AMQBody.h \
qpid/framing/AMQContentBody.h \
@@ -356,6 +363,7 @@ nobase_include_HEADERS = \
qpid/framing/HeaderProperties.h \
qpid/framing/InitiationHandler.h \
qpid/framing/InputHandler.h \
+ qpid/framing/MethodContent.h \
qpid/framing/MethodContext.h \
qpid/framing/OutputHandler.h \
qpid/framing/ProtocolInitiation.h \
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index 8b85017ba0..f407b5a2f9 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -49,28 +49,19 @@ const std::string empty;
}}
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
- connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false)
+ prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false)
{
}
-Channel::~Channel(){
- closeInternal();
-}
+Channel::~Channel(){}
-void Channel::open(ChannelId id, Connection& con)
+void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s)
{
if (isOpen())
- THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id);
- connection = &con;
- channelId = id;
- //link up handlers:
- channelHandler.out = boost::bind(&ConnectionHandler::outgoing, &(connection->handler), _1);
- channelHandler.in = boost::bind(&ExecutionHandler::handle, &executionHandler, _1);
- executionHandler.out = boost::bind(&ChannelHandler::outgoing, &channelHandler, _1);
- //set up close notification:
- channelHandler.onClose = boost::bind(&Channel::peerClose, this, _1, _2);
-
- channelHandler.open(id);
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
+
+ connection = c;
+ session = s;
}
bool Channel::isOpen() const {
@@ -79,10 +70,10 @@ bool Channel::isOpen() const {
}
void Channel::setQos() {
- executionHandler.send(make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false)));
+ sendSync(false, make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false)));
if(isTransactional()) {
//I think this is wrong! should only send TxSelect once...
- executionHandler.send(make_shared_ptr(new TxSelectBody(version)));
+ sendSync(false, make_shared_ptr(new TxSelectBody(version)));
}
}
@@ -133,63 +124,52 @@ void Channel::bind(const Exchange& exchange, const Queue& queue, const std::stri
}
void Channel::commit(){
- executionHandler.send(make_shared_ptr(new TxCommitBody(version)));
+ sendSync(false, make_shared_ptr(new TxCommitBody(version)));
}
void Channel::rollback(){
- executionHandler.send(make_shared_ptr(new TxRollbackBody(version)));
+ sendSync(false, make_shared_ptr(new TxRollbackBody(version)));
}
void Channel::close()
{
- channelHandler.close();
+ session->close();
{
Mutex::ScopedLock l(lock);
if (connection);
{
- connection->erase(channelId);
- connection = 0;
+ connection->released(session);
+ connection.reset();
}
}
stop();
}
-
// Channel closed by peer.
void Channel::peerClose(uint16_t code, const std::string& message) {
assert(isOpen());
//record reason:
errorCode = code;
errorText = message;
- closeInternal();
stop();
- futures.close(code, message);
-}
-
-void Channel::closeInternal() {
- Mutex::ScopedLock l(lock);
- if (connection);
- {
- connection = 0;
- }
}
AMQMethodBody::shared_ptr Channel::sendAndReceive(AMQMethodBody::shared_ptr toSend, ClassId /*c*/, MethodId /*m*/)
{
-
- boost::shared_ptr<FutureResponse> fr(futures.createResponse());
- executionHandler.send(toSend, boost::bind(&FutureResponse::completed, fr), boost::bind(&FutureResponse::received, fr, _1));
- return fr->getResponse();
+ session->setSync(true);
+ Response r = session->send(toSend, true);
+ session->setSync(false);
+ return r.getPtr();
}
void Channel::sendSync(bool sync, AMQMethodBody::shared_ptr command)
{
if(sync) {
- boost::shared_ptr<FutureCompletion> fc(futures.createCompletion());
- executionHandler.send(command, boost::bind(&FutureCompletion::completed, fc));
- fc->waitForCompletion();
+ session->setSync(true);
+ session->send(command, false);
+ session->setSync(false);
} else {
- executionHandler.send(command);
+ session->send(command);
}
}
@@ -199,7 +179,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
if(sync)
return sendAndReceive(body, c, m);
else {
- executionHandler.send(body);
+ session->send(body);
return AMQMethodBody::shared_ptr();
}
}
@@ -246,8 +226,8 @@ void Channel::cancel(const std::string& tag, bool synch) {
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
AMQMethodBody::shared_ptr request(new BasicGetBody(version, 0, queue.getName(), ackMode));
- AMQMethodBody::shared_ptr response = sendAndReceive(request);
- if (response && response->isA<BasicGetEmptyBody>()) {
+ Response response = session->send(request, true);
+ if (response.isA<BasicGetEmptyBody>()) {
return false;
} else {
ReceivedContent::shared_ptr content = gets.pop();
@@ -263,38 +243,7 @@ void Channel::publish(const Message& msg, const Exchange& exchange,
const string e = exchange.getName();
string key = routingKey;
- executionHandler.sendContent(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)),
- msg, msg.getData(), connection->getMaxFrameSize());//sending framesize here is horrible, fix this!
- /*
- // Make a header for the message
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- BasicHeaderProperties::copy(
- *static_cast<BasicHeaderProperties*>(header->getProperties()), msg);
- header->setContentSize(msg.getData().size());
-
- executionHandler.send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
- executionHandler.sendContent(header);
- string data = msg.getData();
- u_int64_t data_length = data.length();
- if(data_length > 0){
- //frame itself uses 8 bytes
- u_int32_t frag_size = connection->getMaxFrameSize() - 8;
- if(data_length < frag_size){
- executionHandler.sendContent(make_shared_ptr(new AMQContentBody(data)));
- }else{
- u_int32_t offset = 0;
- u_int32_t remaining = data_length - offset;
- while (remaining > 0) {
- u_int32_t length = remaining > frag_size ? frag_size : remaining;
- string frag(data.substr(offset, length));
- executionHandler.sendContent(make_shared_ptr(new AMQContentBody(frag)));
-
- offset += length;
- remaining = data_length - offset;
- }
- }
- }
- */
+ session->send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)), msg, false);
}
void Channel::start(){
@@ -303,7 +252,7 @@ void Channel::start(){
}
void Channel::stop() {
- executionHandler.received.close();
+ session->stop();
gets.close();
Mutex::ScopedLock l(stopLock);
if(running) {
@@ -315,7 +264,7 @@ void Channel::stop() {
void Channel::run() {
try {
while (true) {
- ReceivedContent::shared_ptr content = executionHandler.received.pop();
+ ReceivedContent::shared_ptr content = session->get();
//need to dispatch this to the relevant listener:
if (content->isA<BasicDeliverBody>()) {
ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());
diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h
index 4853603281..5feba6262f 100644
--- a/cpp/src/qpid/client/ClientChannel.h
+++ b/cpp/src/qpid/client/ClientChannel.h
@@ -26,9 +26,8 @@
#include "ClientExchange.h"
#include "ClientMessage.h"
#include "ClientQueue.h"
-#include "ChannelHandler.h"
-#include "ExecutionHandler.h"
-#include "FutureFactory.h"
+#include "ConnectionImpl.h"
+#include "SessionCore.h"
#include "qpid/Exception.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Runnable.h"
@@ -71,7 +70,6 @@ class Channel : private sys::Runnable
typedef std::map<std::string, Consumer> ConsumerMap;
mutable sys::Mutex lock;
- Connection* connection;
sys::Thread dispatcher;
uint16_t prefetch;
@@ -85,11 +83,10 @@ class Channel : private sys::Runnable
bool running;
ConsumerMap consumers;
- ExecutionHandler executionHandler;
- ChannelHandler channelHandler;
+ ConnectionImpl::shared_ptr connection;
+ SessionCore::shared_ptr session;
framing::ChannelId channelId;
BlockingQueue<ReceivedContent::shared_ptr> gets;
- FutureFactory futures;
void stop();
@@ -121,7 +118,7 @@ class Channel : private sys::Runnable
sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
}
- void open(framing::ChannelId, Connection&);
+ void open(ConnectionImpl::shared_ptr, SessionCore::shared_ptr);
void closeInternal();
void peerClose(uint16_t, const std::string&);
@@ -257,9 +254,6 @@ class Channel : private sys::Runnable
/** True if the channel is open */
bool isOpen() const;
- /** Get the connection associated with this channel */
- Connection& getConnection() { return *connection; }
-
/** Return the protocol version */
framing::ProtocolVersion getVersion() const { return version ; }
diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp
index c998ec30df..3ae1478152 100644
--- a/cpp/src/qpid/client/ClientConnection.cpp
+++ b/cpp/src/qpid/client/ClientConnection.cpp
@@ -41,31 +41,20 @@ using namespace qpid::sys;
namespace qpid {
namespace client {
-const std::string Connection::OK("OK");
-
-Connection::Connection(
- bool _debug, uint32_t _max_frame_size,
- framing::ProtocolVersion _version
- ) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size),
- defaultConnector(version, _debug, _max_frame_size),
- isOpen(false), debug(_debug)
-{
- setConnector(defaultConnector);
-
- handler.maxFrameSize = _max_frame_size;
-}
+Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) :
+ channelIdCounter(0), version(_version),
+ max_frame_size(_max_frame_size),
+ impl(new ConnectionImpl(boost::shared_ptr<Connector>(new Connector(_version, _debug)))),
+ isOpen(false) {}
+
+Connection::Connection(boost::shared_ptr<Connector> c) :
+ channelIdCounter(0), version(framing::highestProtocolVersion),
+ max_frame_size(65536),
+ impl(new ConnectionImpl(c)),
+ isOpen(false) {}
Connection::~Connection(){}
-void Connection::setConnector(Connector& con)
-{
- connector = &con;
- connector->setInputHandler(&handler);
- connector->setTimeoutHandler(this);
- connector->setShutdownHandler(this);
- out = connector->getOutputHandler();
-}
-
void Connection::open(
const std::string& host, int port,
const std::string& uid, const std::string& pwd, const std::string& vhost)
@@ -73,97 +62,28 @@ void Connection::open(
if (isOpen)
THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
- //wire up the handler:
- handler.in = boost::bind(&Connection::received, this, _1);
- handler.out = boost::bind(&Connector::send, connector, _1);
- handler.onClose = boost::bind(&Connection::closeChannels, this);
-
- handler.uid = uid;
- handler.pwd = pwd;
- handler.vhost = vhost;
-
- connector->connect(host, port);
- connector->init();
- handler.waitForOpen();
+ impl->open(host, port, uid, pwd, vhost);
isOpen = true;
}
-void Connection::shutdown() {
- //this indicates that the socket to the server has closed we do
- //not want to send a close request (or any other requests)
- if(markClosed()) {
- QPID_LOG(info, "Connection to peer closed!");
- closeChannels();
- }
-}
-
-void Connection::close(
- ReplyCode /*code*/, const string& /*msg*/, ClassId /*classId*/, MethodId /*methodId*/
-)
-{
- if(markClosed()) {
- try {
- handler.close();
- } catch (const std::exception& e) {
- QPID_LOG(error, "Exception closing channel: " << e.what());
- }
- closeChannels();
- connector->close();
- }
-}
-
-bool Connection::markClosed()
-{
- Mutex::ScopedLock locker(shutdownLock);
- if (isOpen) {
- isOpen = false;
- return true;
- } else {
- return false;
- }
-}
-
-void Connection::closeChannels()
-{
- using boost::bind;
- for_each(channels.begin(), channels.end(),
- bind(&Channel::closeInternal,
- bind(&ChannelMap::value_type::second, _1)));
- channels.clear();
-}
-
void Connection::openChannel(Channel& channel) {
ChannelId id = ++channelIdCounter;
- assert (channels.find(id) == channels.end());
- assert(out);
- channels[id] = &channel;
- channel.open(id, *this);
-}
-
-void Connection::erase(ChannelId id) {
- channels.erase(id);
+ SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
+ impl->allocated(session);
+ channel.open(impl, session);
+ session->open();
}
-void Connection::received(AMQFrame& frame){
- ChannelId id = frame.getChannel();
- Channel* channel = channels[id];
- if (channel == 0) {
- throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
- }
- channel->channelHandler.incoming(frame);
-}
-
-void Connection::send(AMQFrame& frame) {
- out->send(frame);
-}
-
-void Connection::idleIn(){
- connector->close();
+Session Connection::newSession() {
+ ChannelId id = ++channelIdCounter;
+ SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
+ impl->allocated(session);
+ return Session(impl, session);
}
-void Connection::idleOut(){
- AMQFrame frame(version, 0, new AMQHeartbeatBody());
- out->send(frame);
+void Connection::close()
+{
+ impl->close();
}
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/ClientMessage.h b/cpp/src/qpid/client/ClientMessage.h
index 3de3a693b9..fd33fbc830 100644
--- a/cpp/src/qpid/client/ClientMessage.h
+++ b/cpp/src/qpid/client/ClientMessage.h
@@ -23,6 +23,7 @@
*/
#include <string>
#include "qpid/framing/BasicHeaderProperties.h"
+#include "qpid/framing/MethodContent.h"
namespace qpid {
namespace client {
@@ -35,11 +36,11 @@ namespace client {
*/
// FIXME aconway 2007-04-05: Should be based on MessageTransfer properties not
// basic header properties.
-class Message : public framing::BasicHeaderProperties {
+class Message : public framing::BasicHeaderProperties, public framing::MethodContent {
public:
Message(const std::string& data_=std::string()) : data(data_) {}
- std::string getData() const { return data; }
+ const std::string& getData() const { return data; }
void setData(const std::string& _data) { data = _data; }
std::string getDestination() const { return destination; }
@@ -52,6 +53,8 @@ class Message : public framing::BasicHeaderProperties {
bool isRedelivered() const { return redelivered; }
void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
+ const HeaderProperties& getMethodHeaders() const { return *this; }
+
private:
std::string data;
std::string destination;
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index 4d32456c40..6f58986f25 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -25,11 +25,8 @@
#include <string>
#include "qpid/QpidError.h"
#include "ClientChannel.h"
-#include "Connector.h"
-#include "ConnectionHandler.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/ShutdownHandler.h"
-#include "qpid/sys/TimeoutHandler.h"
+#include "ConnectionImpl.h"
+#include "Session.h"
namespace qpid {
@@ -42,22 +39,6 @@ namespace qpid {
*/
namespace client {
-/**
- * \internal provide access to selected private channel functions
- * for the Connection without making it a friend of the entire channel.
- */
-class ConnectionForChannel :
- public framing::InputHandler,
- public framing::OutputHandler,
- public sys::TimeoutHandler,
- public sys::ShutdownHandler
-
-{
- private:
- friend class Channel;
- virtual void erase(framing::ChannelId) = 0;
-};
-
/**
* \defgroup clientapi Application API for an AMQP client
@@ -70,30 +51,17 @@ class ConnectionForChannel :
*
* \ingroup clientapi
*/
-class Connection : public ConnectionForChannel
+class Connection
{
- typedef std::map<framing::ChannelId, Channel*> ChannelMap;
-
framing::ChannelId channelIdCounter;
- static const std::string OK;
-
framing::ProtocolVersion version;
const uint32_t max_frame_size;
- ChannelMap channels;
- ConnectionHandler handler;
- Connector defaultConnector;
- Connector* connector;
- framing::OutputHandler* out;
+ ConnectionImpl::shared_ptr impl;
bool isOpen;
- sys::Mutex shutdownLock;
bool debug;
-
- void erase(framing::ChannelId);
- void closeChannels();
- bool markClosed();
// TODO aconway 2007-01-26: too many friendships, untagle these classes.
- friend class Channel;
+ friend class Channel;
public:
/**
@@ -111,6 +79,7 @@ class Connection : public ConnectionForChannel
*/
Connection(bool debug = false, uint32_t max_frame_size = 65536,
framing::ProtocolVersion=framing::highestProtocolVersion);
+ Connection(boost::shared_ptr<Connector>);
~Connection();
/**
@@ -136,13 +105,12 @@ class Connection : public ConnectionForChannel
const std::string& virtualhost = "/");
/**
- * Close the connection with optional error information for the peer.
+ * Close the connection
*
* Any further use of this connection (without reopening it) will
* not succeed.
*/
- void close(framing::ReplyCode=200, const std::string& msg=OK,
- framing::ClassId = 0, framing::MethodId = 0);
+ void close();
/**
* Associate a Channel with this connection and open it for use.
@@ -156,24 +124,7 @@ class Connection : public ConnectionForChannel
*/
void openChannel(Channel&);
-
- // TODO aconway 2007-01-26: can these be private?
- void send(framing::AMQFrame&);
- void received(framing::AMQFrame&);
- void idleOut();
- void idleIn();
- void shutdown();
-
- /**\internal used for testing */
- void setConnector(Connector& connector);
-
- /**
- * @return the maximum frame size in use on this connection
- */
- inline uint32_t getMaxFrameSize(){ return max_frame_size; }
-
- /** @return protocol version in use on this connection. */
- //framing::ProtocolVersion getVersion() const { return version; }
+ Session newSession();
};
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index ada3fa4fb0..f20f597d1f 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -175,8 +175,9 @@ void ConnectionHandler::handle(AMQMethodBody::shared_ptr method)
if (method->isA<ConnectionCloseBody>()) {
send(make_shared_ptr(new ConnectionCloseOkBody(version)));
setState(CLOSED);
- if (onClose) {
- onClose();
+ if (onError) {
+ ConnectionCloseBody::shared_ptr c(shared_polymorphic_cast<ConnectionCloseBody>(method));
+ onError(c->getReplyCode(), c->getReplyText());
}
} else {
error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId());
diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h
index 50618b50b1..464d0ca26d 100644
--- a/cpp/src/qpid/client/ConnectionHandler.h
+++ b/cpp/src/qpid/client/ConnectionHandler.h
@@ -61,6 +61,7 @@ class ConnectionHandler : private StateManager,
public:
typedef boost::function<void()> CloseListener;
+ typedef boost::function<void(uint16_t, const std::string&)> ErrorListener;
ConnectionHandler();
@@ -73,6 +74,7 @@ public:
void close();
CloseListener onClose;
+ ErrorListener onError;
};
}}
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
new file mode 100644
index 0000000000..887790e4f0
--- /dev/null
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionImpl.h"
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c)
+{
+ handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
+ handler.out = boost::bind(&Connector::send, connector, _1);
+ handler.onClose = boost::bind(&ConnectionImpl::closed, this);
+ connector->setInputHandler(&handler);
+ connector->setTimeoutHandler(this);
+ connector->setShutdownHandler(this);
+}
+
+void ConnectionImpl::allocated(SessionCore::shared_ptr session)
+{
+ if (sessions.find(session->getId()) != sessions.end()) {
+ throw Exception("Id already in use.");
+ }
+ sessions[session->getId()] = session;
+}
+
+void ConnectionImpl::released(SessionCore::shared_ptr session)
+{
+ SessionMap::iterator i = sessions.find(session->getId());
+ if (i == sessions.end()) {
+ throw Exception("Id not in use.");
+ }
+ sessions.erase(i);
+}
+
+void ConnectionImpl::handle(framing::AMQFrame& frame)
+{
+ handler.outgoing(frame);
+}
+
+void ConnectionImpl::incoming(framing::AMQFrame& frame)
+{
+ uint16_t id = frame.getChannel();
+ SessionCore::shared_ptr session = sessions[id];
+ if (!session) {
+ throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
+ }
+ session->handle(frame);
+}
+
+void ConnectionImpl::open(const std::string& host, int port,
+ const std::string& uid, const std::string& pwd,
+ const std::string& vhost)
+{
+ //TODO: better management of connection properties
+ handler.uid = uid;
+ handler.pwd = pwd;
+ handler.vhost = vhost;
+
+ connector->connect(host, port);
+ connector->init();
+ handler.waitForOpen();
+}
+
+void ConnectionImpl::close()
+{
+ handler.close();
+}
+
+void ConnectionImpl::closed()
+{
+ closed(200, "OK");
+}
+
+void ConnectionImpl::closed(uint16_t code, const std::string& text)
+{
+ for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
+ i->second->closed(code, text);
+ }
+ sessions.clear();
+ connector->close();
+}
+
+void ConnectionImpl::idleIn()
+{
+ connector->close();
+}
+
+void ConnectionImpl::idleOut()
+{
+ AMQFrame frame(version, 0, new AMQHeartbeatBody());
+ connector->send(frame);
+}
+
+void ConnectionImpl::shutdown() {
+ //this indicates that the socket to the server has closed
+ for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
+ i->second->closed(0, "Unexpected scoket closure.");
+ }
+ sessions.clear();
+}
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
new file mode 100644
index 0000000000..8b46d774bf
--- /dev/null
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _ConnectionImpl_
+#define _ConnectionImpl_
+
+#include <map>
+#include <boost/shared_ptr.hpp>
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/sys/ShutdownHandler.h"
+#include "qpid/sys/TimeoutHandler.h"
+#include "ConnectionHandler.h"
+#include "Connector.h"
+#include "SessionCore.h"
+
+namespace qpid {
+namespace client {
+
+class ConnectionImpl : public framing::FrameHandler,
+ public sys::TimeoutHandler,
+ public sys::ShutdownHandler
+
+{
+ typedef std::map<uint16_t, SessionCore::shared_ptr> SessionMap;
+ SessionMap sessions;
+ ConnectionHandler handler;
+ boost::shared_ptr<Connector> connector;
+ framing::ProtocolVersion version;
+
+ void incoming(framing::AMQFrame& frame);
+ void closed();
+ void closed(uint16_t, const std::string&);
+ void idleOut();
+ void idleIn();
+ void shutdown();
+public:
+ typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
+
+ ConnectionImpl(boost::shared_ptr<Connector> c);
+ void allocated(SessionCore::shared_ptr);
+ void released(SessionCore::shared_ptr);
+ void open(const std::string& host, int port = 5672,
+ const std::string& uid = "guest",
+ const std::string& pwd = "guest",
+ const std::string& virtualhost = "/");
+ void close();
+ void handle(framing::AMQFrame& frame);
+};
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index e4270f4e98..abfce4f9d1 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -50,7 +50,8 @@ bool invoke(AMQBody::shared_ptr body, Invocable* target)
return body->type() == METHOD_BODY && shared_polymorphic_cast<AMQMethodBody>(body)->invoke(target);
}
-ExecutionHandler::ExecutionHandler() : version(framing::highestProtocolVersion) {}
+ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) :
+ version(framing::highestProtocolVersion), maxFrameSize(_maxFrameSize) {}
//incoming:
void ExecutionHandler::handle(AMQFrame& frame)
@@ -97,6 +98,12 @@ void ExecutionHandler::flush()
//make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())));
}
+void ExecutionHandler::sendFlush()
+{
+ AMQFrame frame(version, 0, make_shared_ptr(new ExecutionFlushBody(version)));
+ out(frame);
+}
+
void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::Listener f, Correlator::Listener g)
{
//allocate id:
@@ -111,21 +118,9 @@ void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::List
AMQFrame frame(version, 0/*id will be filled in be channel handler*/, command);
out(frame);
-
- if (f) {
- AMQFrame frame(version, 0, make_shared_ptr(new ExecutionFlushBody(version)));
- out(frame);
- }
-}
-
-void ExecutionHandler::sendContent(framing::AMQBody::shared_ptr content)
-{
- AMQFrame frame(version, 0/*id will be filled in be channel handler*/, content);
- out(frame);
}
void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeaderProperties& headers, const std::string& data,
- uint64_t frameSize,
CompletionTracker::Listener f, Correlator::Listener g)
{
send(command, f, g);
@@ -139,7 +134,7 @@ void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeade
u_int64_t data_length = data.length();
if(data_length > 0){
//frame itself uses 8 bytes
- u_int32_t frag_size = frameSize - 8;
+ u_int32_t frag_size = maxFrameSize - 8;
if(data_length < frag_size){
AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(data)));
out(frame);
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index 99b0f4b915..f62598ef95 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -43,6 +43,7 @@ class ExecutionHandler :
Correlator correlation;
CompletionTracker completion;
framing::ProtocolVersion version;
+ uint64_t maxFrameSize;
void complete(uint32_t mark, framing::SequenceNumberSet range);
void flush();
@@ -50,7 +51,9 @@ class ExecutionHandler :
public:
BlockingQueue<ReceivedContent::shared_ptr> received;
- ExecutionHandler();
+ ExecutionHandler(uint64_t maxFrameSize = 65536);
+
+ void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
void handle(framing::AMQFrame& frame);
void send(framing::AMQBody::shared_ptr command,
@@ -58,11 +61,9 @@ public:
Correlator::Listener g = Correlator::Listener());
void sendContent(framing::AMQBody::shared_ptr command,
const framing::BasicHeaderProperties& headers, const std::string& data,
- uint64_t frameSize,
CompletionTracker::Listener f = CompletionTracker::Listener(),
Correlator::Listener g = Correlator::Listener());
-
- void sendContent(framing::AMQBody::shared_ptr content);
+ void sendFlush();
};
}}
diff --git a/cpp/src/qpid/client/Response.h b/cpp/src/qpid/client/Response.h
new file mode 100644
index 0000000000..f44cd72783
--- /dev/null
+++ b/cpp/src/qpid/client/Response.h
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _Response_
+#define _Response_
+
+#include <boost/shared_ptr.hpp>
+#include "qpid/framing/amqp_framing.h"
+#include "FutureResponse.h"
+
+namespace qpid {
+namespace client {
+
+class Response
+{
+ boost::shared_ptr<FutureResponse> future;
+
+public:
+ Response(boost::shared_ptr<FutureResponse> f) : future(f) {}
+
+ template <class T> T& as()
+ {
+ framing::AMQMethodBody::shared_ptr response(future->getResponse());
+ return boost::shared_polymorphic_cast<T>(*response);
+ }
+ template <class T> bool isA()
+ {
+ return future->getResponse()->isA<T>();
+ }
+
+ void sync()
+ {
+ return future->waitForCompletion();
+ }
+
+ //TODO: only exposed for old channel class, may want to hide this eventually
+ framing::AMQMethodBody::shared_ptr getPtr()
+ {
+ return future->getResponse();
+ }
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
new file mode 100644
index 0000000000..391dcd909d
--- /dev/null
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SessionCore.h"
+#include <boost/bind.hpp>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+SessionCore::SessionCore(uint16_t _id, boost::shared_ptr<framing::FrameHandler> out,
+ uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false)
+{
+ l2.out = boost::bind(&FrameHandler::handle, out, _1);
+ l2.in = boost::bind(&ExecutionHandler::handle, &l3, _1);
+ l3.out = boost::bind(&ChannelHandler::outgoing, &l2, _1);
+ l2.onClose = boost::bind(&SessionCore::closed, this, _1, _2);
+}
+
+void SessionCore::open()
+{
+ l2.open(id);
+}
+
+void SessionCore::flush()
+{
+ l3.sendFlush();
+}
+
+Response SessionCore::send(AMQMethodBody::shared_ptr method, bool expectResponse)
+{
+ boost::shared_ptr<FutureResponse> f(futures.createResponse());
+ if (expectResponse) {
+ l3.send(method, boost::bind(&FutureResponse::completed, f), boost::bind(&FutureResponse::received, f, _1));
+ } else {
+ l3.send(method, boost::bind(&FutureResponse::completed, f));
+ }
+ if (sync) {
+ flush();
+ f->waitForCompletion();
+ }
+ return Response(f);
+}
+
+Response SessionCore::send(AMQMethodBody::shared_ptr method, const MethodContent& content, bool expectResponse)
+{
+ //TODO: lots of duplication between these two send methods; refactor
+ boost::shared_ptr<FutureResponse> f(futures.createResponse());
+ if (expectResponse) {
+ l3.sendContent(method, dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData(),
+ boost::bind(&FutureResponse::completed, f), boost::bind(&FutureResponse::received, f, _1));
+ } else {
+ l3.sendContent(method, dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData(),
+ boost::bind(&FutureResponse::completed, f));
+ }
+ if (sync) {
+ flush();
+ f->waitForCompletion();
+ }
+ return Response(f);
+}
+
+ReceivedContent::shared_ptr SessionCore::get()
+{
+ return l3.received.pop();
+}
+
+void SessionCore::setSync(bool s)
+{
+ sync = s;
+}
+
+bool SessionCore::isSync()
+{
+ return sync;
+}
+
+void SessionCore::close()
+{
+ l2.close();
+ l3.received.close();
+}
+
+void SessionCore::stop()
+{
+ l3.received.close();
+}
+
+void SessionCore::handle(AMQFrame& frame)
+{
+ l2.incoming(frame);
+}
+
+void SessionCore::closed(uint16_t code, const std::string& text)
+{
+ l3.received.close();
+ futures.close(code, text);
+}
diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h
new file mode 100644
index 0000000000..15cd36114f
--- /dev/null
+++ b/cpp/src/qpid/client/SessionCore.h
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _SessionCore_
+#define _SessionCore_
+
+#include <boost/shared_ptr.hpp>
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/MethodContent.h"
+#include "ChannelHandler.h"
+#include "ExecutionHandler.h"
+#include "FutureFactory.h"
+#include "ReceivedContent.h"
+#include "Response.h"
+
+namespace qpid {
+namespace client {
+
+class SessionCore : public framing::FrameHandler
+{
+ ExecutionHandler l3;
+ ChannelHandler l2;
+ FutureFactory futures;
+ const uint16_t id;
+ bool sync;
+
+public:
+ typedef boost::shared_ptr<SessionCore> shared_ptr;
+
+ SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize);
+ Response send(framing::AMQMethodBody::shared_ptr method, bool expectResponse = false);
+ Response send(framing::AMQMethodBody::shared_ptr method, const framing::MethodContent& content, bool expectResponse = false);
+ ReceivedContent::shared_ptr get();
+ uint16_t getId() const { return id; }
+ void setSync(bool);
+ bool isSync();
+ void flush();
+ void open();
+ void close();
+ void stop();
+ void closed(uint16_t code, const std::string& text);
+
+ //for incoming frames:
+ void handle(framing::AMQFrame& frame);
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/framing/MethodContent.h b/cpp/src/qpid/framing/MethodContent.h
new file mode 100644
index 0000000000..11d8d42cab
--- /dev/null
+++ b/cpp/src/qpid/framing/MethodContent.h
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MethodContent_
+#define _MethodContent_
+
+#include "HeaderProperties.h"
+
+namespace qpid {
+namespace framing {
+
+class MethodContent
+{
+public:
+ virtual ~MethodContent() {}
+ //TODO: rethink this interface
+ virtual const HeaderProperties& getMethodHeaders() const = 0;
+ virtual const std::string& getData() const = 0;
+};
+
+}}
+#endif
diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp
index 582c7d6e55..f172d1765e 100644
--- a/cpp/src/tests/FramingTest.cpp
+++ b/cpp/src/tests/FramingTest.cpp
@@ -33,6 +33,7 @@
#include "qpid/framing/Responder.h"
#include "InProcessBroker.h"
#include "qpid/client/Connection.h"
+#include "qpid/client/Connector.h"
#include "qpid/client/ClientExchange.h"
#include "qpid/client/ClientQueue.h"
#include "qpid/framing/Correlator.h"
@@ -386,9 +387,8 @@ class FramingTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(string(expect, sizeof(expect)-1), boost::lexical_cast<string>(frame))
void testRequestResponseRoundtrip() {
- broker::InProcessBroker ibroker(version);
- client::Connection clientConnection;
- clientConnection.setConnector(ibroker);
+ boost::shared_ptr<broker::InProcessBroker> ibroker(new broker::InProcessBroker(version));
+ client::Connection clientConnection(boost::static_pointer_cast<client::Connector>(ibroker));
clientConnection.open("");
client::Channel c;
clientConnection.openChannel(c);
@@ -399,7 +399,9 @@ class FramingTest : public CppUnit::TestCase
c.declareExchange(exchange);
c.declareQueue(queue);
c.bind(exchange, queue, "MyTopic", framing::FieldTable());
- broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin();
+ c.close();
+ clientConnection.close();
+ broker::InProcessBroker::Conversation::const_iterator i = ibroker->conversation.begin();
ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++);
ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++);
ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++);
diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h
index 48ac80d30a..9f30ee584d 100644
--- a/cpp/src/tests/InProcessBroker.h
+++ b/cpp/src/tests/InProcessBroker.h
@@ -134,30 +134,7 @@ std::ostream& operator<<(
}
} // namespace broker
-
-
-namespace client {
-/** An in-process client+broker all in one. */
-class InProcessBrokerClient : public client::Connection {
- public:
- broker::InProcessBroker broker;
- broker::InProcessBroker::Conversation& conversation;
-
- /** Constructor creates broker and opens client connection. */
- InProcessBrokerClient(
- u_int32_t max_frame_size=65536,
- framing::ProtocolVersion version= framing::highestProtocolVersion
- ) : client::Connection(false, max_frame_size, version),
- broker(version),
- conversation(broker.conversation)
- {
- setConnector(broker);
- open("");
- }
-};
-
-
-}} // namespace qpid::client
+} // namespace qpid
#endif // _tests_InProcessBroker_h