diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp | 24 |
1 files changed, 20 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp index 6769e5383c..d53db20598 100644 --- a/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -31,6 +31,7 @@ #include "qpid/sys/SecuritySettings.h" #include <boost/bind.hpp> +#include <boost/lexical_cast.hpp> #include <memory> #include <netdb.h> @@ -211,9 +212,10 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { if (readError) { return; } + size_t decoded = 0; try { if (codec) { - (void) codec->decode(buff->bytes(), buff->dataCount()); + decoded = codec->decode(buff->bytes(), buff->dataCount()); }else{ // Need to start protocol processing initProtocolIn(buff); @@ -228,7 +230,9 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) { framing::Buffer in(buff->bytes(), buff->dataCount()); framing::ProtocolInitiation protocolInit; + size_t decoded = 0; if (protocolInit.decode(in)) { + decoded = in.getPosition(); QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")"); codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings()); @@ -250,9 +254,10 @@ class RdmaIOProtocolFactory : public ProtocolFactory { public: RdmaIOProtocolFactory(int16_t port, int backlog); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback); + void connect(Poller::shared_ptr, const string& host, int16_t port, ConnectionCodec::Factory*, ConnectFailedCallback); uint16_t getPort() const; + string getHost() const; private: bool request(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectionCodec::Factory*); @@ -342,7 +347,18 @@ uint16_t RdmaIOProtocolFactory::getPort() const { return listeningPort; // Immutable no need for lock. } +string RdmaIOProtocolFactory::getHost() const { + //return listener.getSockname(); + return ""; +} + void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { + ::sockaddr_in sin; + + sin.sin_family = AF_INET; + sin.sin_port = htons(listeningPort); + sin.sin_addr.s_addr = INADDR_ANY; + listener.reset( new Rdma::Listener( Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES), @@ -371,7 +387,7 @@ void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connectio void RdmaIOProtocolFactory::connect( Poller::shared_ptr poller, - const std::string& host, const std::string& port, + const std::string& host, int16_t port, ConnectionCodec::Factory* f, ConnectFailedCallback failed) { @@ -383,7 +399,7 @@ void RdmaIOProtocolFactory::connect( boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1), boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2, failed)); - SocketAddress sa(host, port); + SocketAddress sa(host, boost::lexical_cast<std::string>(port)); c->start(poller, sa); } |