diff options
Diffstat (limited to 'cpp/lib/client')
-rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 39 | ||||
-rw-r--r-- | cpp/lib/client/ClientChannel.h | 8 | ||||
-rw-r--r-- | cpp/lib/client/Connection.cpp | 60 | ||||
-rw-r--r-- | cpp/lib/client/Connection.h | 26 | ||||
-rw-r--r-- | cpp/lib/client/Connector.h | 43 |
5 files changed, 91 insertions, 85 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index b93596ebfc..a207763aac 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -85,13 +85,9 @@ void Channel::protocolInit( connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); **/ - connection->send( - new AMQFrame( - version, 0, - new ConnectionTuneOkBody( - version, proposal->getChannelMax(), - connection->getMaxFrameSize(), - proposal->getHeartbeat()))); + (new ConnectionTuneOkBody( + version, proposal->getChannelMax(), connection->getMaxFrameSize(), + proposal->getHeartbeat()))->send(context); u_int16_t heartbeat = proposal->getHeartbeat(); connection->connector->setReadTimeout(heartbeat * 2); @@ -100,9 +96,8 @@ void Channel::protocolInit( // Send connection open. std::string capabilities; responses.expect(); - send(new AMQFrame( - version, 0, - new ConnectionOpenBody(version, vhost, capabilities, true))); + (new ConnectionOpenBody(version, vhost, capabilities, true)) + ->send(context); //receive connection.open-ok (or redirect, but ignore that for now //esp. as using force=true). responses.waitForResponse(); @@ -213,7 +208,8 @@ void Channel::cancel(const std::string& tag, bool synch) { if (i != consumers.end()) { Consumer& c = i->second; if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) - send(new BasicAckBody(version, c.lastDeliveryTag, true)); + (new BasicAckBody(version, c.lastDeliveryTag, true)) + ->send(context); sendAndReceiveSync<BasicCancelOkBody>( synch, new BasicCancelBody(version, tag, !synch)); consumers.erase(tag); @@ -231,7 +227,8 @@ void Channel::cancelAll(){ // trying the rest. NB no memory leaks if we do, // ConsumerMap holds values, not pointers. // - send(new BasicAckBody(version, c.lastDeliveryTag, true)); + (new BasicAckBody(version, c.lastDeliveryTag, true)) + ->send(context); } } } @@ -251,9 +248,8 @@ void Channel::retrieve(Message& msg){ bool Channel::get(Message& msg, const Queue& queue, int ackMode) { string name = queue.getName(); - AMQBody::shared_ptr body(new BasicGetBody(version, 0, name, ackMode)); responses.expect(); - send(body); + (new BasicGetBody(version, 0, name, ackMode))->send(context); responses.waitForResponse(); AMQMethodBody::shared_ptr response = responses.getResponse(); if(response->isA<BasicGetOkBody>()) { @@ -276,10 +272,12 @@ bool Channel::get(Message& msg, const Queue& queue, int ackMode) { void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){ + // FIXME aconway 2007-01-30: Rework for message class. + string e = exchange.getName(); string key = routingKey; - send(new BasicPublishBody(version, 0, e, key, mandatory, immediate)); + (new BasicPublishBody(version, 0, e, key, mandatory, immediate))->send(context); //break msg up into header frame and content frame(s) and send these string data = msg.getData(); msg.header->setContentSize(data.length()); @@ -428,7 +426,8 @@ void Channel::deliver(Consumer& consumer, Message& msg){ if(++(consumer.count) < prefetch) break; //else drop-through case AUTO_ACK: - send(new BasicAckBody(version, msg.getDeliveryTag(), multiple)); + (new BasicAckBody(version, msg.getDeliveryTag(), multiple)) + ->send(context); consumer.lastDeliveryTag = 0; } } @@ -510,20 +509,20 @@ void Channel::closeInternal() { dispatcher.join(); } -void Channel::sendAndReceive(AMQBody* toSend, ClassId c, MethodId m) +void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m) { responses.expect(); - send(toSend); + toSend->send(context); responses.receive(c, m); } void Channel::sendAndReceiveSync( - bool sync, AMQBody* body, ClassId c, MethodId m) + bool sync, AMQMethodBody* body, ClassId c, MethodId m) { if(sync) sendAndReceive(body, c, m); else - send(body); + body->send(context); } diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h index 67274ddfc4..a34c95d2c4 100644 --- a/cpp/lib/client/ClientChannel.h +++ b/cpp/lib/client/ClientChannel.h @@ -124,21 +124,21 @@ class Channel : public framing::ChannelAdapter, const std::string& vhost); void sendAndReceive( - framing::AMQBody*, framing::ClassId, framing::MethodId); + framing::AMQMethodBody*, framing::ClassId, framing::MethodId); void sendAndReceiveSync( bool sync, - framing::AMQBody*, framing::ClassId, framing::MethodId); + framing::AMQMethodBody*, framing::ClassId, framing::MethodId); template <class BodyType> - boost::shared_ptr<BodyType> sendAndReceive(framing::AMQBody* body) { + boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody* body) { sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID); return boost::shared_polymorphic_downcast<BodyType>( responses.getResponse()); } template <class BodyType> void sendAndReceiveSync( - bool sync, framing::AMQBody* body) { + bool sync, framing::AMQMethodBody* body) { sendAndReceiveSync( sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID); } diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index 19d5cce7db..bf6c44570d 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -27,6 +27,8 @@ #include <iostream> #include <sstream> #include <MethodBodyInstances.h> +#include <boost/bind.hpp> +#include <functional> using namespace qpid::framing; using namespace qpid::sys; @@ -41,45 +43,59 @@ ChannelId Connection::channelIdCounter; const std::string Connection::OK("OK"); Connection::Connection( - bool debug, u_int32_t _max_frame_size, + bool _debug, u_int32_t _max_frame_size, const framing::ProtocolVersion& _version -) : max_frame_size(_max_frame_size), closed(true), - version(_version) -{ - connector = new Connector(version, debug, _max_frame_size); -} +) : version(_version), max_frame_size(_max_frame_size), + defaultConnector(version, debug, max_frame_size), + connector(&defaultConnector), + isOpen(false), debug(_debug) +{} Connection::~Connection(){ - delete connector; + close(); } -void Connection::open( - const std::string& _host, int _port, const std::string& uid, - const std::string& pwd, const std::string& virtualhost) +void Connection::setConnector(Connector& con) { - - host = _host; - port = _port; + connector = &con; connector->setInputHandler(this); connector->setTimeoutHandler(this); connector->setShutdownHandler(this); out = connector->getOutputHandler(); +} + +void Connection::open( + const std::string& host, int port, + const std::string& uid, const std::string& pwd, const std::string& vhost) +{ + if (isOpen) + THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); connector->connect(host, port); - - // Open the special channel 0. channels[0] = &channel0; channel0.open(0, *this); - channel0.protocolInit(uid, pwd, virtualhost); + channel0.protocolInit(uid, pwd, vhost); + isOpen = true; } +void Connection::shutdown() { + close(); +} + void Connection::close( ReplyCode code, const string& msg, ClassId classId, MethodId methodId ) { - if(!closed) { + if(isOpen) { + // TODO aconway 2007-01-29: Exception handling - could end up + // partly closed. + isOpen = false; channel0.sendAndReceive<ConnectionCloseOkBody>( new ConnectionCloseBody( getVersion(), code, msg, classId, methodId)); + while(!channels.empty()) { + channels.begin()->second->close(); + channels.erase(channels.begin()); + } connector->close(); } } @@ -140,14 +156,4 @@ void Connection::idleOut(){ out->send(new AMQFrame(version, 0, new AMQHeartbeatBody())); } -void Connection::shutdown(){ - closed = true; - //close all channels, also removes them from the map. - while(!channels.empty()){ - Channel* channel = channels.begin()->second; - if (channel != 0) - channel->close(); - } -} - }} // namespace qpid::client diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index 6ee9e62e47..6a9a76eed2 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -1,5 +1,5 @@ -#ifndef _Connection_ -#define _Connection_ +#ifndef _client_Connection_ +#define _client_Connection_ /* * @@ -89,19 +89,19 @@ class Connection : public ConnectionForChannel static framing::ChannelId channelIdCounter; static const std::string OK; - std::string host; - int port; + framing::ProtocolVersion version; const u_int32_t max_frame_size; - ChannelMap channels; + ChannelMap channels; + Connector defaultConnector; Connector* connector; framing::OutputHandler* out; - volatile bool closed; - framing::ProtocolVersion version; + volatile bool isOpen; void erase(framing::ChannelId); void channelException( Channel&, framing::AMQMethodBody*, const QpidError&); Channel channel0; + bool debug; // TODO aconway 2007-01-26: too many friendships, untagle these classes. friend class Channel; @@ -145,10 +145,10 @@ class Connection : public ConnectionForChannel * within a single broker). */ void open(const std::string& host, int port = 5672, - const std::string& uid = "guest", const std::string& pwd = "guest", + const std::string& uid = "guest", + const std::string& pwd = "guest", const std::string& virtualhost = "/"); - /** * Close the connection with optional error information for the peer. * @@ -177,7 +177,10 @@ class Connection : public ConnectionForChannel void idleOut(); void idleIn(); void shutdown(); - + + /**\internal used for testing */ + void setConnector(Connector& connector); + /** * @return the maximum frame size in use on this connection */ @@ -187,8 +190,7 @@ class Connection : public ConnectionForChannel const framing::ProtocolVersion& getVersion() { return version; } }; -} -} +}} // namespace qpid::client #endif diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h index 40663486f2..1126e861e0 100644 --- a/cpp/lib/client/Connector.h +++ b/cpp/lib/client/Connector.h @@ -37,13 +37,13 @@ namespace qpid { namespace client { -class Connector : public qpid::framing::OutputHandler, - private qpid::sys::Runnable +class Connector : public framing::OutputHandler, + private sys::Runnable { const bool debug; const int receive_buffer_size; const int send_buffer_size; - qpid::framing::ProtocolVersion version; + framing::ProtocolVersion version; bool closed; @@ -53,22 +53,22 @@ class Connector : public qpid::framing::OutputHandler, u_int32_t idleIn; u_int32_t idleOut; - qpid::sys::TimeoutHandler* timeoutHandler; - qpid::sys::ShutdownHandler* shutdownHandler; - qpid::framing::InputHandler* input; - qpid::framing::InitiationHandler* initialiser; - qpid::framing::OutputHandler* output; + sys::TimeoutHandler* timeoutHandler; + sys::ShutdownHandler* shutdownHandler; + framing::InputHandler* input; + framing::InitiationHandler* initialiser; + framing::OutputHandler* output; - qpid::framing::Buffer inbuf; - qpid::framing::Buffer outbuf; + framing::Buffer inbuf; + framing::Buffer outbuf; - qpid::sys::Mutex writeLock; - qpid::sys::Thread receiver; + sys::Mutex writeLock; + sys::Thread receiver; - qpid::sys::Socket socket; + sys::Socket socket; void checkIdle(ssize_t status); - void writeBlock(qpid::framing::AMQDataBlock* data); + void writeBlock(framing::AMQDataBlock* data); void writeToSocket(char* data, size_t available); void setSocketTimeout(); @@ -77,23 +77,22 @@ class Connector : public qpid::framing::OutputHandler, friend class Channel; public: - Connector(const qpid::framing::ProtocolVersion& pVersion, + Connector(const framing::ProtocolVersion& pVersion, bool debug = false, u_int32_t buffer_size = 1024); virtual ~Connector(); virtual void connect(const std::string& host, int port); virtual void init(); virtual void close(); - virtual void setInputHandler(qpid::framing::InputHandler* handler); - virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler); - virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler); - virtual qpid::framing::OutputHandler* getOutputHandler(); - virtual void send(qpid::framing::AMQFrame* frame); + virtual void setInputHandler(framing::InputHandler* handler); + virtual void setTimeoutHandler(sys::TimeoutHandler* handler); + virtual void setShutdownHandler(sys::ShutdownHandler* handler); + virtual framing::OutputHandler* getOutputHandler(); + virtual void send(framing::AMQFrame* frame); virtual void setReadTimeout(u_int16_t timeout); virtual void setWriteTimeout(u_int16_t timeout); }; -} -} +}} #endif |