summaryrefslogtreecommitdiff
path: root/Final/cpp/lib/client/Connector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'Final/cpp/lib/client/Connector.cpp')
-rw-r--r--Final/cpp/lib/client/Connector.cpp195
1 files changed, 195 insertions, 0 deletions
diff --git a/Final/cpp/lib/client/Connector.cpp b/Final/cpp/lib/client/Connector.cpp
new file mode 100644
index 0000000000..a99360b840
--- /dev/null
+++ b/Final/cpp/lib/client/Connector.cpp
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <QpidError.h>
+#include <sys/Time.h>
+#include "Connector.h"
+
+using namespace qpid::sys;
+using namespace qpid::client;
+using namespace qpid::framing;
+using qpid::QpidError;
+
+Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug, u_int32_t buffer_size) :
+ debug(_debug),
+ receive_buffer_size(buffer_size),
+ send_buffer_size(buffer_size),
+ version(pVersion),
+ closed(true),
+ lastIn(0), lastOut(0),
+ timeout(0),
+ idleIn(0), idleOut(0),
+ timeoutHandler(0),
+ shutdownHandler(0),
+ inbuf(receive_buffer_size),
+ outbuf(send_buffer_size){ }
+
+Connector::~Connector(){ }
+
+void Connector::connect(const std::string& host, int port, bool tcpNoDelay){
+ socket = Socket::createTcp();
+ if (tcpNoDelay) {
+ socket.setTcpNoDelay(true);
+ }
+ socket.connect(host, port);
+ closed = false;
+ receiver = Thread(this);
+}
+
+void Connector::init(ProtocolInitiation* header){
+ writeBlock(header);
+ delete header;
+}
+
+void Connector::close(){
+ if (markClosed()) {
+ socket.close();
+ receiver.join();
+ }
+}
+
+void Connector::setInputHandler(InputHandler* handler){
+ input = handler;
+}
+
+void Connector::setShutdownHandler(ShutdownHandler* handler){
+ shutdownHandler = handler;
+}
+
+OutputHandler* Connector::getOutputHandler(){
+ return this;
+}
+
+void Connector::send(AMQFrame* frame){
+ writeBlock(frame);
+ if(debug) std::cout << "SENT: " << *frame << std::endl;
+ delete frame;
+}
+
+void Connector::writeBlock(AMQDataBlock* data){
+ Mutex::ScopedLock l(writeLock);
+ data->encode(outbuf);
+ //transfer data to wire
+ outbuf.flip();
+ writeToSocket(outbuf.start(), outbuf.available());
+ outbuf.clear();
+}
+
+void Connector::writeToSocket(char* data, size_t available){
+ size_t written = 0;
+ while(written < available && !closed){
+ ssize_t sent = socket.send(data + written, available-written);
+ if(sent > 0) {
+ lastOut = now() * TIME_MSEC;
+ written += sent;
+ }
+ }
+}
+
+void Connector::handleClosed(){
+ if (markClosed()) {
+ socket.close();
+ if(shutdownHandler) shutdownHandler->shutdown();
+ }
+}
+
+bool Connector::markClosed(){
+ if (closed) {
+ return false;
+ } else {
+ closed = true;
+ return true;
+ }
+}
+
+void Connector::checkIdle(ssize_t status){
+ if(timeoutHandler){
+ Time t = now() * TIME_MSEC;
+ if(status == Socket::SOCKET_TIMEOUT) {
+ if(idleIn && (t - lastIn > idleIn)){
+ timeoutHandler->idleIn();
+ }
+ }else if(status == Socket::SOCKET_EOF){
+ handleClosed();
+ }else{
+ lastIn = t;
+ }
+ if(idleOut && (t - lastOut > idleOut)){
+ timeoutHandler->idleOut();
+ }
+ }
+}
+
+void Connector::setReadTimeout(u_int16_t t){
+ idleIn = t * 1000;//t is in secs
+ if(idleIn && (!timeout || idleIn < timeout)){
+ timeout = idleIn;
+ setSocketTimeout();
+ }
+
+}
+
+void Connector::setWriteTimeout(u_int16_t t){
+ idleOut = t * 1000;//t is in secs
+ if(idleOut && (!timeout || idleOut < timeout)){
+ timeout = idleOut;
+ setSocketTimeout();
+ }
+}
+
+void Connector::setSocketTimeout(){
+ socket.setTimeout(timeout*TIME_MSEC);
+}
+
+void Connector::setTimeoutHandler(TimeoutHandler* handler){
+ timeoutHandler = handler;
+}
+
+void Connector::run(){
+ try{
+ while(!closed){
+ ssize_t available = inbuf.available();
+ if(available < 1){
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
+ }
+ ssize_t received = socket.recv(inbuf.start(), available);
+ checkIdle(received);
+
+ if(!closed && received > 0){
+ inbuf.move(received);
+ inbuf.flip();//position = 0, limit = total data read
+
+ AMQFrame frame(version);
+ while(frame.decode(inbuf)){
+ if(debug) std::cout << "RECV: " << frame << std::endl;
+ input->received(&frame);
+ }
+ //need to compact buffer to preserve any 'extra' data
+ inbuf.compact();
+ }
+ }
+ }catch(QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg
+ << " (" << error.location.file << ":" << error.location.line
+ << ")" << std::endl;
+ handleClosed();
+ }
+}