summaryrefslogtreecommitdiff
path: root/cpp/lib/client
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2007-01-09 19:44:50 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2007-01-09 19:44:50 +0000
commit879413783bf64537e3a1c7d036e2fb34700cc4e5 (patch)
treedd10e99b938ed82523bf878d05edcc6e06f90231 /cpp/lib/client
parentcb148a3cf74760e2af234896825cc117f13c506e (diff)
downloadqpid-python-879413783bf64537e3a1c7d036e2fb34700cc4e5.tar.gz
Most of remaining version changes for C++. Still need to deal with AMQFrame
defualt constructor and do some clean up here and there.. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@494540 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client')
-rw-r--r--cpp/lib/client/ClientChannel.cpp38
-rw-r--r--cpp/lib/client/Connection.cpp18
-rw-r--r--cpp/lib/client/Connector.cpp7
-rw-r--r--cpp/lib/client/Connector.h4
4 files changed, 35 insertions, 32 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
index c7b8e39ae5..3d0b547b07 100644
--- a/cpp/lib/client/ClientChannel.cpp
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -56,9 +56,9 @@ void Channel::setPrefetch(u_int16_t _prefetch){
void Channel::setQos(){
// AMQP version management change - kpvdr 2006-11-20
// TODO: Make this class version-aware and link these hard-wired numbers to that version
- sendAndReceive(new AMQFrame(id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok);
+ sendAndReceive(new AMQFrame(version, id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok);
if(transactional){
- sendAndReceive(new AMQFrame(id, new TxSelectBody(version)), method_bodies.tx_select_ok);
+ sendAndReceive(new AMQFrame(version, id, new TxSelectBody(version)), method_bodies.tx_select_ok);
}
}
@@ -66,7 +66,7 @@ void Channel::declareExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
string type = exchange.getType();
FieldTable args;
- AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args));
+ AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args));
if(synch){
sendAndReceive(frame, method_bodies.exchange_declare_ok);
}else{
@@ -76,7 +76,7 @@ void Channel::declareExchange(Exchange& exchange, bool synch){
void Channel::deleteExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
- AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(version, 0, name, false, !synch));
+ AMQFrame* frame = new AMQFrame(version, id, new ExchangeDeleteBody(version, 0, name, false, !synch));
if(synch){
sendAndReceive(frame, method_bodies.exchange_delete_ok);
}else{
@@ -87,7 +87,7 @@ void Channel::deleteExchange(Exchange& exchange, bool synch){
void Channel::declareQueue(Queue& queue, bool synch){
string name = queue.getName();
FieldTable args;
- AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(version, 0, name, false, false,
+ AMQFrame* frame = new AMQFrame(version, id, new QueueDeclareBody(version, 0, name, false, false,
queue.isExclusive(),
queue.isAutoDelete(), !synch, args));
if(synch){
@@ -105,7 +105,7 @@ void Channel::declareQueue(Queue& queue, bool synch){
void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
//ticket, queue, ifunused, ifempty, nowait
string name = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
+ AMQFrame* frame = new AMQFrame(version, id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
if(synch){
sendAndReceive(frame, method_bodies.queue_delete_ok);
}else{
@@ -116,7 +116,7 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch)
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new QueueBindBody(version, 0, q, e, key,!synch, args));
+ AMQFrame* frame = new AMQFrame(version, id, new QueueBindBody(version, 0, q, e, key,!synch, args));
if(synch){
sendAndReceive(frame, method_bodies.queue_bind_ok);
}else{
@@ -130,7 +130,7 @@ void Channel::consume(
{
string q = queue.getName();
AMQFrame* frame =
- new AMQFrame(
+ new AMQFrame(version,
id,
new BasicConsumeBody(
version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch,
@@ -152,10 +152,10 @@ void Channel::consume(
void Channel::cancel(std::string& tag, bool synch){
Consumer* c = consumers[tag];
if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(id, new BasicAckBody(version, c->lastDeliveryTag, true)));
+ out->send(new AMQFrame(version, id, new BasicAckBody(version, c->lastDeliveryTag, true)));
}
- AMQFrame* frame = new AMQFrame(id, new BasicCancelBody(version, (string&) tag, !synch));
+ AMQFrame* frame = new AMQFrame(version, id, new BasicCancelBody(version, (string&) tag, !synch));
if(synch){
sendAndReceive(frame, method_bodies.basic_cancel_ok);
}else{
@@ -171,7 +171,7 @@ void Channel::cancelAll(){
for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){
Consumer* c = i->second;
if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){
- out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true)));
+ out->send(new AMQFrame(version, id, new BasicAckBody(c->lastDeliveryTag, true)));
}
consumers.erase(i);
delete c;
@@ -193,7 +193,7 @@ void Channel::retrieve(Message& msg){
bool Channel::get(Message& msg, const Queue& queue, int ackMode){
string name = queue.getName();
- AMQFrame* frame = new AMQFrame(id, new BasicGetBody(version, 0, name, ackMode));
+ AMQFrame* frame = new AMQFrame(version, id, new BasicGetBody(version, 0, name, ackMode));
responses.expect();
out->send(frame);
responses.waitForResponse();
@@ -219,25 +219,25 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string&
string e = exchange.getName();
string key = routingKey;
- out->send(new AMQFrame(id, new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
+ out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
//break msg up into header frame and content frame(s) and send these
string data = msg.getData();
msg.header->setContentSize(data.length());
AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header));
- out->send(new AMQFrame(id, body));
+ out->send(new AMQFrame(version, id, body));
u_int64_t data_length = data.length();
if(data_length > 0){
u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes
if(data_length < frag_size){
- out->send(new AMQFrame(id, new AMQContentBody(data)));
+ out->send(new AMQFrame(version, id, new AMQContentBody(data)));
}else{
u_int32_t offset = 0;
u_int32_t remaining = data_length - offset;
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(data.substr(offset, length));
- out->send(new AMQFrame(id, new AMQContentBody(frag)));
+ out->send(new AMQFrame(version, id, new AMQContentBody(frag)));
offset += length;
remaining = data_length - offset;
@@ -247,12 +247,12 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string&
}
void Channel::commit(){
- AMQFrame* frame = new AMQFrame(id, new TxCommitBody(version));
+ AMQFrame* frame = new AMQFrame(version, id, new TxCommitBody(version));
sendAndReceive(frame, method_bodies.tx_commit_ok);
}
void Channel::rollback(){
- AMQFrame* frame = new AMQFrame(id, new TxRollbackBody(version));
+ AMQFrame* frame = new AMQFrame(version, id, new TxRollbackBody(version));
sendAndReceive(frame, method_bodies.tx_rollback_ok);
}
@@ -377,7 +377,7 @@ void Channel::deliver(Consumer* consumer, Message& msg){
if(++(consumer->count) < prefetch) break;
//else drop-through
case AUTO_ACK:
- out->send(new AMQFrame(id, new BasicAckBody(msg.getDeliveryTag(), multiple)));
+ out->send(new AMQFrame(version, id, new BasicAckBody(msg.getDeliveryTag(), multiple)));
consumer->lastDeliveryTag = 0;
}
}
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp
index 78aeafb37b..ad8aa1d0dd 100644
--- a/cpp/lib/client/Connection.cpp
+++ b/cpp/lib/client/Connection.cpp
@@ -35,7 +35,7 @@ u_int16_t Connection::channelIdCounter;
Connection::Connection( bool debug, u_int32_t _max_frame_size, qpid::framing::ProtocolVersion* _version) : max_frame_size(_max_frame_size), closed(true),
version(_version->getMajor(),_version->getMinor())
{
- connector = new Connector(debug, _max_frame_size);
+ connector = new Connector(version, debug, _max_frame_size);
}
Connection::~Connection(){
@@ -61,7 +61,7 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui
string response = ((char)0) + uid + ((char)0) + pwd;
string locale("en_US");
responses.expect();
- out->send(new AMQFrame(0, new ConnectionStartOkBody(version, props, mechanism, response, locale)));
+ out->send(new AMQFrame(version, 0, new ConnectionStartOkBody(version, props, mechanism, response, locale)));
/**
* Assume for now that further challenges will not be required
@@ -74,7 +74,7 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui
responses.receive(method_bodies.connection_tune);
ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
- out->send(new AMQFrame(0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
+ out->send(new AMQFrame(version, 0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
u_int16_t heartbeat = proposal->getHeartbeat();
connector->setReadTimeout(heartbeat * 2);
@@ -84,7 +84,7 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui
string capabilities;
string vhost = virtualhost;
responses.expect();
- out->send(new AMQFrame(0, new ConnectionOpenBody(version, vhost, capabilities, true)));
+ out->send(new AMQFrame(version, 0, new ConnectionOpenBody(version, vhost, capabilities, true)));
//receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
responses.waitForResponse();
if(responses.validate(method_bodies.connection_open_ok)){
@@ -106,7 +106,7 @@ void Connection::close(){
u_int16_t classId(0);
u_int16_t methodId(0);
- sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok);
+ sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok);
connector->close();
}
}
@@ -118,7 +118,7 @@ void Connection::openChannel(Channel* channel){
channels[channel->id] = channel;
//now send frame to open channel and wait for response
string oob;
- channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok);
+ channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok);
channel->setQos();
channel->closed = false;
}
@@ -136,7 +136,7 @@ void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_
//send frame to close channel
channel->cancelAll();
channel->closed = true;
- channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok);
+ channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok);
channel->con = 0;
channel->out = 0;
removeChannel(channel);
@@ -209,7 +209,7 @@ void Connection::error(int code, const string& msg, int classid, int methodid){
std::cout << " [" << methodid << ":" << classid << "]";
}
std::cout << std::endl;
- sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok);
+ sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok);
connector->close();
}
@@ -230,7 +230,7 @@ void Connection::idleIn(){
}
void Connection::idleOut(){
- out->send(new AMQFrame(0, new AMQHeartbeatBody()));
+ out->send(new AMQFrame(version, 0, new AMQHeartbeatBody()));
}
void Connection::shutdown(){
diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp
index 2bd77c1bcd..b34e66fd94 100644
--- a/cpp/lib/client/Connector.cpp
+++ b/cpp/lib/client/Connector.cpp
@@ -28,10 +28,11 @@ using namespace qpid::client;
using namespace qpid::framing;
using qpid::QpidError;
-Connector::Connector(bool _debug, u_int32_t buffer_size) :
- debug(_debug),
+Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug, u_int32_t buffer_size) :
+ debug(_debug),
receive_buffer_size(buffer_size),
send_buffer_size(buffer_size),
+ version(pVersion),
closed(true),
lastIn(0), lastOut(0),
timeout(0),
@@ -162,7 +163,7 @@ void Connector::run(){
inbuf.move(received);
inbuf.flip();//position = 0, limit = total data read
- AMQFrame frame;
+ AMQFrame frame(version);
while(frame.decode(inbuf)){
if(debug) std::cout << "RECV: " << frame << std::endl;
input->received(&frame);
diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h
index c64472bd53..f9e50f3216 100644
--- a/cpp/lib/client/Connector.h
+++ b/cpp/lib/client/Connector.h
@@ -26,6 +26,7 @@
#include <framing/OutputHandler.h>
#include <framing/InitiationHandler.h>
#include <framing/ProtocolInitiation.h>
+#include <ProtocolVersion.h>
#include <sys/ShutdownHandler.h>
#include <sys/TimeoutHandler.h>
#include <sys/Thread.h>
@@ -41,6 +42,7 @@ namespace client {
const bool debug;
const int receive_buffer_size;
const int send_buffer_size;
+ qpid::framing::ProtocolVersion version;
bool closed;
@@ -73,7 +75,7 @@ namespace client {
void handleClosed();
public:
- Connector(bool debug = false, u_int32_t buffer_size = 1024);
+ Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug = false, u_int32_t buffer_size = 1024);
virtual ~Connector();
virtual void connect(const std::string& host, int port);
virtual void init(qpid::framing::ProtocolInitiation* header);