summaryrefslogtreecommitdiff
path: root/cpp/lib/client
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/client')
-rw-r--r--cpp/lib/client/ClientChannel.cpp39
-rw-r--r--cpp/lib/client/ClientChannel.h8
-rw-r--r--cpp/lib/client/Connection.cpp60
-rw-r--r--cpp/lib/client/Connection.h26
-rw-r--r--cpp/lib/client/Connector.h43
5 files changed, 91 insertions, 85 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
index b93596ebfc..a207763aac 100644
--- a/cpp/lib/client/ClientChannel.cpp
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -85,13 +85,9 @@ void Channel::protocolInit(
connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
**/
- connection->send(
- new AMQFrame(
- version, 0,
- new ConnectionTuneOkBody(
- version, proposal->getChannelMax(),
- connection->getMaxFrameSize(),
- proposal->getHeartbeat())));
+ (new ConnectionTuneOkBody(
+ version, proposal->getChannelMax(), connection->getMaxFrameSize(),
+ proposal->getHeartbeat()))->send(context);
u_int16_t heartbeat = proposal->getHeartbeat();
connection->connector->setReadTimeout(heartbeat * 2);
@@ -100,9 +96,8 @@ void Channel::protocolInit(
// Send connection open.
std::string capabilities;
responses.expect();
- send(new AMQFrame(
- version, 0,
- new ConnectionOpenBody(version, vhost, capabilities, true)));
+ (new ConnectionOpenBody(version, vhost, capabilities, true))
+ ->send(context);
//receive connection.open-ok (or redirect, but ignore that for now
//esp. as using force=true).
responses.waitForResponse();
@@ -213,7 +208,8 @@ void Channel::cancel(const std::string& tag, bool synch) {
if (i != consumers.end()) {
Consumer& c = i->second;
if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
- send(new BasicAckBody(version, c.lastDeliveryTag, true));
+ (new BasicAckBody(version, c.lastDeliveryTag, true))
+ ->send(context);
sendAndReceiveSync<BasicCancelOkBody>(
synch, new BasicCancelBody(version, tag, !synch));
consumers.erase(tag);
@@ -231,7 +227,8 @@ void Channel::cancelAll(){
// trying the rest. NB no memory leaks if we do,
// ConsumerMap holds values, not pointers.
//
- send(new BasicAckBody(version, c.lastDeliveryTag, true));
+ (new BasicAckBody(version, c.lastDeliveryTag, true))
+ ->send(context);
}
}
}
@@ -251,9 +248,8 @@ void Channel::retrieve(Message& msg){
bool Channel::get(Message& msg, const Queue& queue, int ackMode) {
string name = queue.getName();
- AMQBody::shared_ptr body(new BasicGetBody(version, 0, name, ackMode));
responses.expect();
- send(body);
+ (new BasicGetBody(version, 0, name, ackMode))->send(context);
responses.waitForResponse();
AMQMethodBody::shared_ptr response = responses.getResponse();
if(response->isA<BasicGetOkBody>()) {
@@ -276,10 +272,12 @@ bool Channel::get(Message& msg, const Queue& queue, int ackMode) {
void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){
+ // FIXME aconway 2007-01-30: Rework for message class.
+
string e = exchange.getName();
string key = routingKey;
- send(new BasicPublishBody(version, 0, e, key, mandatory, immediate));
+ (new BasicPublishBody(version, 0, e, key, mandatory, immediate))->send(context);
//break msg up into header frame and content frame(s) and send these
string data = msg.getData();
msg.header->setContentSize(data.length());
@@ -428,7 +426,8 @@ void Channel::deliver(Consumer& consumer, Message& msg){
if(++(consumer.count) < prefetch) break;
//else drop-through
case AUTO_ACK:
- send(new BasicAckBody(version, msg.getDeliveryTag(), multiple));
+ (new BasicAckBody(version, msg.getDeliveryTag(), multiple))
+ ->send(context);
consumer.lastDeliveryTag = 0;
}
}
@@ -510,20 +509,20 @@ void Channel::closeInternal() {
dispatcher.join();
}
-void Channel::sendAndReceive(AMQBody* toSend, ClassId c, MethodId m)
+void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m)
{
responses.expect();
- send(toSend);
+ toSend->send(context);
responses.receive(c, m);
}
void Channel::sendAndReceiveSync(
- bool sync, AMQBody* body, ClassId c, MethodId m)
+ bool sync, AMQMethodBody* body, ClassId c, MethodId m)
{
if(sync)
sendAndReceive(body, c, m);
else
- send(body);
+ body->send(context);
}
diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h
index 67274ddfc4..a34c95d2c4 100644
--- a/cpp/lib/client/ClientChannel.h
+++ b/cpp/lib/client/ClientChannel.h
@@ -124,21 +124,21 @@ class Channel : public framing::ChannelAdapter,
const std::string& vhost);
void sendAndReceive(
- framing::AMQBody*, framing::ClassId, framing::MethodId);
+ framing::AMQMethodBody*, framing::ClassId, framing::MethodId);
void sendAndReceiveSync(
bool sync,
- framing::AMQBody*, framing::ClassId, framing::MethodId);
+ framing::AMQMethodBody*, framing::ClassId, framing::MethodId);
template <class BodyType>
- boost::shared_ptr<BodyType> sendAndReceive(framing::AMQBody* body) {
+ boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody* body) {
sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID);
return boost::shared_polymorphic_downcast<BodyType>(
responses.getResponse());
}
template <class BodyType> void sendAndReceiveSync(
- bool sync, framing::AMQBody* body) {
+ bool sync, framing::AMQMethodBody* body) {
sendAndReceiveSync(
sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID);
}
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp
index 19d5cce7db..bf6c44570d 100644
--- a/cpp/lib/client/Connection.cpp
+++ b/cpp/lib/client/Connection.cpp
@@ -27,6 +27,8 @@
#include <iostream>
#include <sstream>
#include <MethodBodyInstances.h>
+#include <boost/bind.hpp>
+#include <functional>
using namespace qpid::framing;
using namespace qpid::sys;
@@ -41,45 +43,59 @@ ChannelId Connection::channelIdCounter;
const std::string Connection::OK("OK");
Connection::Connection(
- bool debug, u_int32_t _max_frame_size,
+ bool _debug, u_int32_t _max_frame_size,
const framing::ProtocolVersion& _version
-) : max_frame_size(_max_frame_size), closed(true),
- version(_version)
-{
- connector = new Connector(version, debug, _max_frame_size);
-}
+) : version(_version), max_frame_size(_max_frame_size),
+ defaultConnector(version, debug, max_frame_size),
+ connector(&defaultConnector),
+ isOpen(false), debug(_debug)
+{}
Connection::~Connection(){
- delete connector;
+ close();
}
-void Connection::open(
- const std::string& _host, int _port, const std::string& uid,
- const std::string& pwd, const std::string& virtualhost)
+void Connection::setConnector(Connector& con)
{
-
- host = _host;
- port = _port;
+ connector = &con;
connector->setInputHandler(this);
connector->setTimeoutHandler(this);
connector->setShutdownHandler(this);
out = connector->getOutputHandler();
+}
+
+void Connection::open(
+ const std::string& host, int port,
+ const std::string& uid, const std::string& pwd, const std::string& vhost)
+{
+ if (isOpen)
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
connector->connect(host, port);
-
- // Open the special channel 0.
channels[0] = &channel0;
channel0.open(0, *this);
- channel0.protocolInit(uid, pwd, virtualhost);
+ channel0.protocolInit(uid, pwd, vhost);
+ isOpen = true;
}
+void Connection::shutdown() {
+ close();
+}
+
void Connection::close(
ReplyCode code, const string& msg, ClassId classId, MethodId methodId
)
{
- if(!closed) {
+ if(isOpen) {
+ // TODO aconway 2007-01-29: Exception handling - could end up
+ // partly closed.
+ isOpen = false;
channel0.sendAndReceive<ConnectionCloseOkBody>(
new ConnectionCloseBody(
getVersion(), code, msg, classId, methodId));
+ while(!channels.empty()) {
+ channels.begin()->second->close();
+ channels.erase(channels.begin());
+ }
connector->close();
}
}
@@ -140,14 +156,4 @@ void Connection::idleOut(){
out->send(new AMQFrame(version, 0, new AMQHeartbeatBody()));
}
-void Connection::shutdown(){
- closed = true;
- //close all channels, also removes them from the map.
- while(!channels.empty()){
- Channel* channel = channels.begin()->second;
- if (channel != 0)
- channel->close();
- }
-}
-
}} // namespace qpid::client
diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h
index 6ee9e62e47..6a9a76eed2 100644
--- a/cpp/lib/client/Connection.h
+++ b/cpp/lib/client/Connection.h
@@ -1,5 +1,5 @@
-#ifndef _Connection_
-#define _Connection_
+#ifndef _client_Connection_
+#define _client_Connection_
/*
*
@@ -89,19 +89,19 @@ class Connection : public ConnectionForChannel
static framing::ChannelId channelIdCounter;
static const std::string OK;
- std::string host;
- int port;
+ framing::ProtocolVersion version;
const u_int32_t max_frame_size;
- ChannelMap channels;
+ ChannelMap channels;
+ Connector defaultConnector;
Connector* connector;
framing::OutputHandler* out;
- volatile bool closed;
- framing::ProtocolVersion version;
+ volatile bool isOpen;
void erase(framing::ChannelId);
void channelException(
Channel&, framing::AMQMethodBody*, const QpidError&);
Channel channel0;
+ bool debug;
// TODO aconway 2007-01-26: too many friendships, untagle these classes.
friend class Channel;
@@ -145,10 +145,10 @@ class Connection : public ConnectionForChannel
* within a single broker).
*/
void open(const std::string& host, int port = 5672,
- const std::string& uid = "guest", const std::string& pwd = "guest",
+ const std::string& uid = "guest",
+ const std::string& pwd = "guest",
const std::string& virtualhost = "/");
-
/**
* Close the connection with optional error information for the peer.
*
@@ -177,7 +177,10 @@ class Connection : public ConnectionForChannel
void idleOut();
void idleIn();
void shutdown();
-
+
+ /**\internal used for testing */
+ void setConnector(Connector& connector);
+
/**
* @return the maximum frame size in use on this connection
*/
@@ -187,8 +190,7 @@ class Connection : public ConnectionForChannel
const framing::ProtocolVersion& getVersion() { return version; }
};
-}
-}
+}} // namespace qpid::client
#endif
diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h
index 40663486f2..1126e861e0 100644
--- a/cpp/lib/client/Connector.h
+++ b/cpp/lib/client/Connector.h
@@ -37,13 +37,13 @@ namespace qpid {
namespace client {
-class Connector : public qpid::framing::OutputHandler,
- private qpid::sys::Runnable
+class Connector : public framing::OutputHandler,
+ private sys::Runnable
{
const bool debug;
const int receive_buffer_size;
const int send_buffer_size;
- qpid::framing::ProtocolVersion version;
+ framing::ProtocolVersion version;
bool closed;
@@ -53,22 +53,22 @@ class Connector : public qpid::framing::OutputHandler,
u_int32_t idleIn;
u_int32_t idleOut;
- qpid::sys::TimeoutHandler* timeoutHandler;
- qpid::sys::ShutdownHandler* shutdownHandler;
- qpid::framing::InputHandler* input;
- qpid::framing::InitiationHandler* initialiser;
- qpid::framing::OutputHandler* output;
+ sys::TimeoutHandler* timeoutHandler;
+ sys::ShutdownHandler* shutdownHandler;
+ framing::InputHandler* input;
+ framing::InitiationHandler* initialiser;
+ framing::OutputHandler* output;
- qpid::framing::Buffer inbuf;
- qpid::framing::Buffer outbuf;
+ framing::Buffer inbuf;
+ framing::Buffer outbuf;
- qpid::sys::Mutex writeLock;
- qpid::sys::Thread receiver;
+ sys::Mutex writeLock;
+ sys::Thread receiver;
- qpid::sys::Socket socket;
+ sys::Socket socket;
void checkIdle(ssize_t status);
- void writeBlock(qpid::framing::AMQDataBlock* data);
+ void writeBlock(framing::AMQDataBlock* data);
void writeToSocket(char* data, size_t available);
void setSocketTimeout();
@@ -77,23 +77,22 @@ class Connector : public qpid::framing::OutputHandler,
friend class Channel;
public:
- Connector(const qpid::framing::ProtocolVersion& pVersion,
+ Connector(const framing::ProtocolVersion& pVersion,
bool debug = false, u_int32_t buffer_size = 1024);
virtual ~Connector();
virtual void connect(const std::string& host, int port);
virtual void init();
virtual void close();
- virtual void setInputHandler(qpid::framing::InputHandler* handler);
- virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
- virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
- virtual qpid::framing::OutputHandler* getOutputHandler();
- virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void setInputHandler(framing::InputHandler* handler);
+ virtual void setTimeoutHandler(sys::TimeoutHandler* handler);
+ virtual void setShutdownHandler(sys::ShutdownHandler* handler);
+ virtual framing::OutputHandler* getOutputHandler();
+ virtual void send(framing::AMQFrame* frame);
virtual void setReadTimeout(u_int16_t timeout);
virtual void setWriteTimeout(u_int16_t timeout);
};
-}
-}
+}}
#endif