summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/Connection.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-13 02:41:14 +0000
committerAlan Conway <aconway@apache.org>2007-02-13 02:41:14 +0000
commit9517deedff9691dbe3429b0b917dfd4208b0b1b8 (patch)
treef8868a2fbc63e92c770b401eeff2aee3a522697a /cpp/lib/broker/Connection.cpp
parentd26ea3376f66f69486fe214c8a7a8b96a7605c99 (diff)
downloadqpid-python-9517deedff9691dbe3429b0b917dfd4208b0b1b8.tar.gz
* gentools/templ.cpp/*Proxy*, CppGenerator.java: Changes to Proxy
classes to make them directly usable as an API for low-level AMQP access. - Proxies hold reference to a ChannelAdapter not just an output handler. - Removed MethodContext parameter, makes no sense on requester end. - Return RequestId from request methods so caller can correlate incoming responses. - Add RequestId parameter to response methods so caller can provide correlation for outgoing responses. - No longer inherit from *Operations classes as the signatures no longer match. Proxy is for caller (client/requester) and Operations is for callee (server/responder) * cpp/lib/client/ClientChannel.h: Channel provides a raw proxy to the broker. Normal users will still use the Channel API to deal with the broker, but advanced users (incl ourselves!) can use the raw API to directly send and receive any AMQP message. * cpp/lib/broker/BrokerChannel,BrokerAdapter: Refactor for new proxies. broker::Channel is also a ClientProxy * Sundry files: - Pass ProtcolVersion by value, it is only two bytes. - Misc. const correctness fixes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@506823 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/Connection.cpp')
-rw-r--r--cpp/lib/broker/Connection.cpp43
1 files changed, 23 insertions, 20 deletions
diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp
index 000199a65e..3d9e5cdaf8 100644
--- a/cpp/lib/broker/Connection.cpp
+++ b/cpp/lib/broker/Connection.cpp
@@ -22,6 +22,9 @@
#include <assert.h>
#include "Connection.h"
+#include "BrokerChannel.h"
+#include "AMQP_ClientProxy.h"
+#include "BrokerAdapter.h"
using namespace boost;
using namespace qpid::sys;
@@ -33,12 +36,15 @@ namespace broker {
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
broker(broker_),
- settings(broker.getTimeout(), broker.getStagingThreshold()),
out(out_),
framemax(65536),
- heartbeat(0)
+ heartbeat(0),
+ client(0),
+ timeout(broker.getTimeout()),
+ stagingThreshold(broker.getStagingThreshold())
{}
+
Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){
Queue::shared_ptr queue;
if (name.empty()) {
@@ -59,31 +65,27 @@ Exchange::shared_ptr Connection::findExchange(const string& name){
}
-void Connection::received(qpid::framing::AMQFrame* frame){
+void Connection::received(framing::AMQFrame* frame){
getChannel(frame->getChannel()).handleBody(frame->getBody());
}
-void Connection::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId){
- client->getConnection().close(MethodContext(&getChannel(0)), code, text, classId, methodId);
+void Connection::close(
+ ReplyCode code, const string& text, ClassId classId, MethodId methodId)
+{
+ client->close(code, text, classId, methodId);
getOutput().close();
}
-// TODO aconway 2007-02-02: Should be delegated to the BrokerAdapter
-// as it is part of the protocol.
-void Connection::initiated(qpid::framing::ProtocolInitiation* header) {
- if (client.get())
- // TODO aconway 2007-01-16: correct error code.
- throw ConnectionException(0, "Connection initiated twice");
- client.reset(new qpid::framing::AMQP_ClientProxy(
- out, header->getMajor(), header->getMinor()));
+void Connection::initiated(framing::ProtocolInitiation* header) {
+ version = ProtocolVersion(header->getMajor(), header->getMinor());
FieldTable properties;
string mechanisms("PLAIN");
string locales("en_US");
- client->getConnection().start(
- MethodContext(&getChannel(0)),
+ getChannel(0).init(0, *out, getVersion());
+ client = &getChannel(0).getAdatper().getProxy().getConnection();
+ client->start(
header->getMajor(), header->getMinor(),
properties, mechanisms, locales);
- getChannel(0).init(0, *out, client->getProtocolVersion());
}
void Connection::idleOut(){}
@@ -103,9 +105,10 @@ void Connection::closed(){
}
}
-void Connection::closeChannel(u_int16_t channel) {
- getChannel(channel).close();
- channels.erase(channels.find(channel));
+void Connection::closeChannel(u_int16_t id) {
+ ChannelMap::iterator i = channels.find(id);
+ if (i != channels.end())
+ i->close();
}
@@ -115,7 +118,7 @@ Channel& Connection::getChannel(ChannelId id) {
i = channels.insert(
id, new Channel(
*this, id, framemax, broker.getQueues().getStore(),
- settings.stagingThreshold)).first;
+ broker.getStagingThreshold())).first;
}
return *i;
}