summaryrefslogtreecommitdiff
path: root/qpid/cpp/lib/client/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/lib/client/Connection.cpp')
-rw-r--r--qpid/cpp/lib/client/Connection.cpp242
1 files changed, 0 insertions, 242 deletions
diff --git a/qpid/cpp/lib/client/Connection.cpp b/qpid/cpp/lib/client/Connection.cpp
deleted file mode 100644
index ad8aa1d0dd..0000000000
--- a/qpid/cpp/lib/client/Connection.cpp
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <Connection.h>
-#include <ClientChannel.h>
-#include <ClientMessage.h>
-#include <QpidError.h>
-#include <iostream>
-#include <MethodBodyInstances.h>
-
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace qpid::sys;
-using namespace qpid::sys;
-
-u_int16_t Connection::channelIdCounter;
-
-Connection::Connection( bool debug, u_int32_t _max_frame_size, qpid::framing::ProtocolVersion* _version) : max_frame_size(_max_frame_size), closed(true),
- version(_version->getMajor(),_version->getMinor())
-{
- connector = new Connector(version, debug, _max_frame_size);
-}
-
-Connection::~Connection(){
- delete connector;
-}
-
-void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){
- host = _host;
- port = _port;
- connector->setInputHandler(this);
- connector->setTimeoutHandler(this);
- connector->setShutdownHandler(this);
- out = connector->getOutputHandler();
- connector->connect(host, port);
-
- ProtocolInitiation* header = new ProtocolInitiation(version);
- responses.expect();
- connector->init(header);
- responses.receive(method_bodies.connection_start);
-
- FieldTable props;
- string mechanism("PLAIN");
- string response = ((char)0) + uid + ((char)0) + pwd;
- string locale("en_US");
- responses.expect();
- out->send(new AMQFrame(version, 0, new ConnectionStartOkBody(version, props, mechanism, response, locale)));
-
- /**
- * Assume for now that further challenges will not be required
- //receive connection.secure
- responses.receive(connection_secure));
- //send connection.secure-ok
- out->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
- **/
-
- responses.receive(method_bodies.connection_tune);
-
- ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
- out->send(new AMQFrame(version, 0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
-
- u_int16_t heartbeat = proposal->getHeartbeat();
- connector->setReadTimeout(heartbeat * 2);
- connector->setWriteTimeout(heartbeat);
-
- //send connection.open
- string capabilities;
- string vhost = virtualhost;
- responses.expect();
- out->send(new AMQFrame(version, 0, new ConnectionOpenBody(version, vhost, capabilities, true)));
- //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
- responses.waitForResponse();
- if(responses.validate(method_bodies.connection_open_ok)){
- //ok
- }else if(responses.validate(method_bodies.connection_redirect)){
- //ignore for now
- ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse()));
- std::cout << "Received redirection to " << redirect->getHost() << std::endl;
- }else{
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
- }
-
-}
-
-void Connection::close(){
- if(!closed){
- u_int16_t code(200);
- string text("Ok");
- u_int16_t classId(0);
- u_int16_t methodId(0);
-
- sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok);
- connector->close();
- }
-}
-
-void Connection::openChannel(Channel* channel){
- channel->con = this;
- channel->id = ++channelIdCounter;
- channel->out = out;
- channels[channel->id] = channel;
- //now send frame to open channel and wait for response
- string oob;
- channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok);
- channel->setQos();
- channel->closed = false;
-}
-
-void Connection::closeChannel(Channel* channel){
- //send frame to close channel
- u_int16_t code(200);
- string text("Ok");
- u_int16_t classId(0);
- u_int16_t methodId(0);
- closeChannel(channel, code, text, classId, methodId);
-}
-
-void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){
- //send frame to close channel
- channel->cancelAll();
- channel->closed = true;
- channel->sendAndReceive(new AMQFrame(version, channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok);
- channel->con = 0;
- channel->out = 0;
- removeChannel(channel);
-}
-
-void Connection::removeChannel(Channel* channel){
- //send frame to close channel
-
- channels.erase(channel->id);
- channel->out = 0;
- channel->id = 0;
- channel->con = 0;
-}
-
-void Connection::received(AMQFrame* frame){
- u_int16_t channelId = frame->getChannel();
-
- if(channelId == 0){
- this->handleBody(frame->getBody());
- }else{
- Channel* channel = channels[channelId];
- if(channel == 0){
- error(504, "Unknown channel");
- }else{
- try{
- channel->handleBody(frame->getBody());
- }catch(qpid::QpidError e){
- channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
- }
- }
- }
-}
-
-void Connection::handleMethod(AMQMethodBody::shared_ptr body){
- //connection.close, basic.deliver, basic.return or a response to a synchronous request
- if(responses.isWaiting()){
- responses.signalResponse(body);
- }else if(method_bodies.connection_close.match(body.get())){
- //send back close ok
- //close socket
- ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get());
- std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl;
- connector->close();
- }else{
- std::cout << "Unhandled method for connection: " << *body << std::endl;
- error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId());
- }
-}
-
-void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){
- error(504, "Channel error: received header body with channel 0.");
-}
-
-void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){
- error(504, "Channel error: received content body with channel 0.");
-}
-
-void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
-}
-
-void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
- responses.expect();
- out->send(frame);
- responses.receive(body);
-}
-
-void Connection::error(int code, const string& msg, int classid, int methodid){
- std::cout << "Connection exception generated: " << code << msg;
- if(classid || methodid){
- std::cout << " [" << methodid << ":" << classid << "]";
- }
- std::cout << std::endl;
- sendAndReceive(new AMQFrame(version, 0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok);
- connector->close();
-}
-
-void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){
- std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl;
- int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500;
- string msg = e.msg;
- if(method == 0){
- closeChannel(channel, code, msg);
- }else{
- closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId());
- }
-}
-
-void Connection::idleIn(){
- std::cout << "Connection timed out due to abscence of heartbeat." << std::endl;
- connector->close();
-}
-
-void Connection::idleOut(){
- out->send(new AMQFrame(version, 0, new AMQHeartbeatBody()));
-}
-
-void Connection::shutdown(){
- closed = true;
- //close all channels
- for(iterator i = channels.begin(); i != channels.end(); i++){
- i->second->stop();
- }
-}