diff options
Diffstat (limited to 'cpp/src/qpid/client/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/client/Connection.cpp | 31 |
1 files changed, 18 insertions, 13 deletions
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index 93f170742a..de324fdab4 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -23,6 +23,7 @@ #include <qpid/client/Message.h> #include <qpid/QpidError.h> #include <iostream> +#include <qpid/client/MethodBodyInstances.h> using namespace qpid::client; using namespace qpid::framing; @@ -31,7 +32,11 @@ using namespace qpid::sys; u_int16_t Connection::channelIdCounter; -Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){ +Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true), +// AMQP version management change - kpvdr 2006-11-20 +// TODO: Make this class version-aware and link these hard-wired numbers to that version + version(8, 0) +{ connector = new Connector(debug, _max_frame_size); } @@ -51,14 +56,14 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui ProtocolInitiation* header = new ProtocolInitiation(8, 0); responses.expect(); connector->init(header); - responses.receive(connection_start); + responses.receive(method_bodies.connection_start); FieldTable props; string mechanism("PLAIN"); string response = ((char)0) + uid + ((char)0) + pwd; string locale("en_US"); responses.expect(); - out->send(new AMQFrame(0, new ConnectionStartOkBody(props, mechanism, response, locale))); + out->send(new AMQFrame(0, new ConnectionStartOkBody(version, props, mechanism, response, locale))); /** * Assume for now that further challenges will not be required @@ -68,10 +73,10 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui out->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); **/ - responses.receive(connection_tune); + responses.receive(method_bodies.connection_tune); ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse()); - out->send(new AMQFrame(0, new ConnectionTuneOkBody(proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); + out->send(new AMQFrame(0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); u_int16_t heartbeat = proposal->getHeartbeat(); connector->setReadTimeout(heartbeat * 2); @@ -81,12 +86,12 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui string capabilities; string vhost = virtualhost; responses.expect(); - out->send(new AMQFrame(0, new ConnectionOpenBody(vhost, capabilities, true))); + out->send(new AMQFrame(0, new ConnectionOpenBody(version, vhost, capabilities, true))); //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true). responses.waitForResponse(); - if(responses.validate(connection_open_ok)){ + if(responses.validate(method_bodies.connection_open_ok)){ //ok - }else if(responses.validate(connection_redirect)){ + }else if(responses.validate(method_bodies.connection_redirect)){ //ignore for now ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse())); std::cout << "Received redirection to " << redirect->getHost() << std::endl; @@ -103,7 +108,7 @@ void Connection::close(){ u_int16_t classId(0); u_int16_t methodId(0); - sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, text, classId, methodId)), connection_close_ok); + sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok); connector->close(); } } @@ -115,7 +120,7 @@ void Connection::openChannel(Channel* channel){ channels[channel->id] = channel; //now send frame to open channel and wait for response string oob; - channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(oob)), channel_open_ok); + channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok); channel->setQos(); channel->closed = false; } @@ -133,7 +138,7 @@ void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_ //send frame to close channel channel->cancelAll(); channel->closed = true; - channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(code, text, classId, methodId)), channel_close_ok); + channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok); channel->con = 0; channel->out = 0; removeChannel(channel); @@ -171,7 +176,7 @@ void Connection::handleMethod(AMQMethodBody::shared_ptr body){ //connection.close, basic.deliver, basic.return or a response to a synchronous request if(responses.isWaiting()){ responses.signalResponse(body); - }else if(connection_close.match(body.get())){ + }else if(method_bodies.connection_close.match(body.get())){ //send back close ok //close socket ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get()); @@ -206,7 +211,7 @@ void Connection::error(int code, const string& msg, int classid, int methodid){ std::cout << " [" << methodid << ":" << classid << "]"; } std::cout << std::endl; - sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(code, msg, classid, methodid)), connection_close_ok); + sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok); connector->close(); } |