summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/Connector.cpp92
-rw-r--r--cpp/src/qpid/client/Connector.h11
2 files changed, 27 insertions, 76 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 08dae4105d..b25f19e4ba 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -25,12 +25,6 @@
#include "qpid/framing/AMQFrame.h"
#include "Connector.h"
-#include "qpid/sys/AsynchIO.h"
-#include "qpid/sys/Dispatcher.h"
-#include "qpid/sys/Poller.h"
-
-#include <boost/bind.hpp>
-
namespace qpid {
namespace client {
@@ -49,6 +43,7 @@ Connector::Connector(
idleIn(0), idleOut(0),
timeoutHandler(0),
shutdownHandler(0),
+ inbuf(receive_buffer_size),
outbuf(send_buffer_size)
{ }
@@ -61,7 +56,6 @@ Connector::~Connector(){
void Connector::connect(const std::string& host, int port){
socket.connect(host, port);
closed = false;
- poller = Poller::shared_ptr(new Poller);
receiver = Thread(this);
}
@@ -74,7 +68,7 @@ void Connector::init(){
bool Connector::closeInternal() {
Mutex::ScopedLock l(closedLock);
if (!closed) {
- poller->shutdown();
+ socket.close();
closed = true;
return true;
}
@@ -97,8 +91,6 @@ OutputHandler* Connector::getOutputHandler(){
return this;
}
-// TODO: astitcher 20070908: Writing still needs to be transferred to the aynchronous IO
-// framework.
void Connector::send(AMQFrame& frame){
writeBlock(&frame);
QPID_LOG(trace, "SENT: " << frame);
@@ -129,10 +121,6 @@ void Connector::handleClosed() {
shutdownHandler->shutdown();
}
-// TODO: astitcher 20070908: This version of the code can never time out, so the idle processing
-// can never be called. The timeut processing needs to be added into the underlying Dispatcher code
-//
-// TODO: astitcher 20070908: EOF is dealt with separately now via a callback to eof
void Connector::checkIdle(ssize_t status){
if(timeoutHandler){
AbsTime t = now();
@@ -178,65 +166,33 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){
timeoutHandler = handler;
}
-
-// Buffer definition
-struct Buff : public AsynchIO::Buffer {
- Buff() :
- AsynchIO::Buffer(new char[65536], 65536)
- {}
- ~Buff()
- { delete [] bytes;}
-};
-
-void Connector::readbuff(AsynchIO& aio, AsynchIO::Buffer* buff) {
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
-
- AMQFrame frame(version);
- while(frame.decode(in)){
- QPID_LOG(trace, "RECV: " << frame);
- input->received(frame);
- }
- // TODO: unreading needs to go away, and when we can cope
- // with multiple sub-buffers in the general buffer scheme, it will
- if (in.available() != 0) {
- // Adjust buffer for used bytes and then "unread them"
- buff->dataStart += buff->dataCount-in.available();
- buff->dataCount = in.available();
- aio.unread(buff);
- } else {
- // Give whole buffer back to aio subsystem
- aio.queueReadBuffer(buff);
- }
-}
-
-void Connector::eof(AsynchIO&) {
- handleClosed();
-}
-
-// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
-// will never be called
void Connector::run(){
- try {
- Dispatcher d(poller);
-
- AsynchIO* aio = new AsynchIO(socket,
- boost::bind(&Connector::readbuff, this, _1, _2),
- boost::bind(&Connector::eof, this, _1),
- boost::bind(&Connector::eof, this, _1));
-
- for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff);
+ try{
+ while(!closed){
+ ssize_t available = inbuf.available();
+ if(available < 1){
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
+ }
+ ssize_t received = socket.recv(inbuf.start(), available);
+ checkIdle(received);
+
+ if(!closed && received > 0){
+ inbuf.move(received);
+ inbuf.flip();//position = 0, limit = total data read
+
+ AMQFrame frame(version);
+ while(frame.decode(inbuf)){
+ QPID_LOG(trace, "RECV: " << frame);
+ input->received(frame);
+ }
+ //need to compact buffer to preserve any 'extra' data
+ inbuf.compact();
}
-
- aio->start(poller);
- d.run();
- aio->queueForDeletion();
- socket.close();
- } catch (const std::exception& e) {
+ }
+ } catch (const std::exception& e) {
QPID_LOG(error, e.what());
handleClosed();
}
}
-
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
index 82c9db2ef1..1577564d57 100644
--- a/cpp/src/qpid/client/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -34,10 +34,9 @@
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/Time.h"
-#include "qpid/sys/AsynchIO.h"
namespace qpid {
-
+
namespace client {
class Connector : public framing::OutputHandler,
@@ -63,6 +62,7 @@ class Connector : public framing::OutputHandler,
framing::InitiationHandler* initialiser;
framing::OutputHandler* output;
+ framing::Buffer inbuf;
framing::Buffer outbuf;
sys::Mutex writeLock;
@@ -70,8 +70,6 @@ class Connector : public framing::OutputHandler,
sys::Socket socket;
- sys::Poller::shared_ptr poller;
-
void checkIdle(ssize_t status);
void writeBlock(framing::AMQDataBlock* data);
void writeToSocket(char* data, size_t available);
@@ -80,10 +78,7 @@ class Connector : public framing::OutputHandler,
void run();
void handleClosed();
bool closeInternal();
-
- void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIO::Buffer*);
- void eof(qpid::sys::AsynchIO&);
-
+
friend class Channel;
public:
Connector(framing::ProtocolVersion pVersion,