diff options
Diffstat (limited to 'qpid/cpp/lib/client/Connection.cpp')
-rw-r--r-- | qpid/cpp/lib/client/Connection.cpp | 242 |
1 files changed, 0 insertions, 242 deletions
diff --git a/qpid/cpp/lib/client/Connection.cpp b/qpid/cpp/lib/client/Connection.cpp deleted file mode 100644 index ad8aa1d0dd..0000000000 --- a/qpid/cpp/lib/client/Connection.cpp +++ /dev/null @@ -1,242 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <Connection.h> -#include <ClientChannel.h> -#include <ClientMessage.h> -#include <QpidError.h> -#include <iostream> -#include <MethodBodyInstances.h> - -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid::sys; -using namespace qpid::sys; - -u_int16_t Connection::channelIdCounter; - -Connection::Connection( bool debug, u_int32_t _max_frame_size, qpid::framing::ProtocolVersion* _version) : max_frame_size(_max_frame_size), closed(true), - version(_version->getMajor(),_version->getMinor()) -{ - connector = new Connector(version, debug, _max_frame_size); -} - -Connection::~Connection(){ - delete connector; -} - -void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){ - host = _host; - port = _port; - connector->setInputHandler(this); - connector->setTimeoutHandler(this); - connector->setShutdownHandler(this); - out = connector->getOutputHandler(); - connector->connect(host, port); - - ProtocolInitiation* header = new ProtocolInitiation(version); - responses.expect(); - connector->init(header); - responses.receive(method_bodies.connection_start); - - FieldTable props; - string mechanism("PLAIN"); - string response = ((char)0) + uid + ((char)0) + pwd; - string locale("en_US"); - responses.expect(); - out->send(new AMQFrame(version, 0, new ConnectionStartOkBody(version, props, mechanism, response, locale))); - - /** - * Assume for now that further challenges will not be required - //receive connection.secure - responses.receive(connection_secure)); - //send connection.secure-ok - out->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); - **/ - - responses.receive(method_bodies.connection_tune); - - ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse()); - out->send(new AMQFrame(version, 0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); - - u_int16_t heartbeat = proposal->getHeartbeat(); - connector->setReadTimeout(heartbeat * 2); - connector->setWriteTimeout(heartbeat); - - //send connection.open - string capabilities; - string vhost = virtualhost; - responses.expect(); - out->send(new AMQFrame(version, 0, new ConnectionOpenBody(version, vhost, capabilities, true))); - //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true). - responses.waitForResponse(); - if(responses.validate(method_bodies.connection_open_ok)){ - //ok - }else if(responses.validate(method_bodies.connection_redirect)){ - //ignore for now - ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse())); - std::cout << "Received redirection to " << redirect->getHost() << std::endl; - }else{ - THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); - } - -} - -void Connection::close(){ - if(!closed){ - u_int16_t code(200); - string text("Ok"); - u_int16_t classId(0); - u_int16_t methodId(0); - - sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok); - connector->close(); - } -} - -void Connection::openChannel(Channel* channel){ - channel->con = this; - channel->id = ++channelIdCounter; - channel->out = out; - channels[channel->id] = channel; - //now send frame to open channel and wait for response - string oob; - channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok); - channel->setQos(); - channel->closed = false; -} - -void Connection::closeChannel(Channel* channel){ - //send frame to close channel - u_int16_t code(200); - string text("Ok"); - u_int16_t classId(0); - u_int16_t methodId(0); - closeChannel(channel, code, text, classId, methodId); -} - -void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){ - //send frame to close channel - channel->cancelAll(); - channel->closed = true; - channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok); - channel->con = 0; - channel->out = 0; - removeChannel(channel); -} - -void Connection::removeChannel(Channel* channel){ - //send frame to close channel - - channels.erase(channel->id); - channel->out = 0; - channel->id = 0; - channel->con = 0; -} - -void Connection::received(AMQFrame* frame){ - u_int16_t channelId = frame->getChannel(); - - if(channelId == 0){ - this->handleBody(frame->getBody()); - }else{ - Channel* channel = channels[channelId]; - if(channel == 0){ - error(504, "Unknown channel"); - }else{ - try{ - channel->handleBody(frame->getBody()); - }catch(qpid::QpidError e){ - channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); - } - } - } -} - -void Connection::handleMethod(AMQMethodBody::shared_ptr body){ - //connection.close, basic.deliver, basic.return or a response to a synchronous request - if(responses.isWaiting()){ - responses.signalResponse(body); - }else if(method_bodies.connection_close.match(body.get())){ - //send back close ok - //close socket - ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get()); - std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl; - connector->close(); - }else{ - std::cout << "Unhandled method for connection: " << *body << std::endl; - error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId()); - } -} - -void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){ - error(504, "Channel error: received header body with channel 0."); -} - -void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){ - error(504, "Channel error: received content body with channel 0."); -} - -void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ -} - -void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ - responses.expect(); - out->send(frame); - responses.receive(body); -} - -void Connection::error(int code, const string& msg, int classid, int methodid){ - std::cout << "Connection exception generated: " << code << msg; - if(classid || methodid){ - std::cout << " [" << methodid << ":" << classid << "]"; - } - std::cout << std::endl; - sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok); - connector->close(); -} - -void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){ - std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl; - int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500; - string msg = e.msg; - if(method == 0){ - closeChannel(channel, code, msg); - }else{ - closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId()); - } -} - -void Connection::idleIn(){ - std::cout << "Connection timed out due to abscence of heartbeat." << std::endl; - connector->close(); -} - -void Connection::idleOut(){ - out->send(new AMQFrame(version, 0, new AMQHeartbeatBody())); -} - -void Connection::shutdown(){ - closed = true; - //close all channels - for(iterator i = channels.begin(); i != channels.end(); i++){ - i->second->stop(); - } -} |