/** * Copyright (C) 2008 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ /* message todo: authenticate; encrypt? */ #include "stdafx.h" #include "message.h" #include #include "../util/goodies.h" #include // if you want trace output: #define mmm(x) /* listener ------------------------------------------------------------------- */ void Listener::listen() { SockAddr me(port); int sock = socket(AF_INET, SOCK_STREAM, 0); if ( sock == INVALID_SOCKET ) { log() << "ERROR: listen(): invalid socket? " << errno << endl; return; } prebindOptions( sock ); if ( ::bind(sock, (sockaddr *) &me.sa, me.addressSize) != 0 ) { log() << "listen(): bind() failed errno:" << errno << endl; if ( errno == 98 ) log() << "98 == addr already in use" << endl; closesocket(sock); return; } if ( ::listen(sock, 128) != 0 ) { log() << "listen(): listen() failed " << errno << endl; closesocket(sock); return; } SockAddr from; while ( 1 ) { int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize); if ( s < 0 ) { log() << "Listener: accept() returns " << s << " errno:" << errno << endl; continue; } disableNagle(s); log() << "connection accepted from " << from.toString() << endl; accepted( new MessagingPort(s, from) ); } } /* messagingport -------------------------------------------------------------- */ MSGID NextMsgId; struct MsgStart { MsgStart() { NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis(); assert(MsgDataHeaderSize == 16); } } msgstart; // we "new" this so it guaranteed to still be around when other automatic global vars // are being destructed during termination. set& ports = *(new set()); void closeAllSockets() { for ( set::iterator i = ports.begin(); i != ports.end(); i++ ) (*i)->shutdown(); } MessagingPort::MessagingPort(int _sock, SockAddr& _far) : sock(_sock), farEnd(_far) { ports.insert(this); } MessagingPort::MessagingPort() { ports.insert(this); sock = -1; } void MessagingPort::shutdown() { if ( sock >= 0 ) { closesocket(sock); sock = -1; } } MessagingPort::~MessagingPort() { shutdown(); ports.erase(this); } #include "../util/background.h" class ConnectBG : public BackgroundJob { public: int sock; int res; SockAddr farEnd; void run() { res = ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize); } }; bool MessagingPort::connect(SockAddr& _far) { farEnd = _far; sock = socket(AF_INET, SOCK_STREAM, 0); if ( sock == INVALID_SOCKET ) { log() << "ERROR: connect(): invalid socket? " << errno << endl; return false; } #if 0 long fl = fcntl(sock, F_GETFL, 0); assert( fl >= 0 ); fl |= O_NONBLOCK; fcntl(sock, F_SETFL, fl); int res = ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize); if ( res ) { if ( errno == EINPROGRESS ) //log() << "connect(): failed errno:" << errno << ' ' << farEnd.getPort() << endl; closesocket(sock); sock = -1; return false; } #endif ConnectBG bg; bg.sock = sock; bg.farEnd = farEnd; bg.go(); // int res = ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize); if ( bg.wait(5000) ) { if ( bg.res ) { closesocket(sock); sock = -1; return false; } } else { // time out the connect closesocket(sock); sock = -1; bg.wait(); // so bg stays in scope until bg thread terminates return false; } disableNagle(sock); return true; } bool MessagingPort::recv(Message& m) { again: mmm( cout << "* recv() sock:" << this->sock << endl; ) int len = -1; char *lenbuf = (char *) &len; int lft = 4; while ( 1 ) { int x = ::recv(sock, lenbuf, lft, 0); if ( x == 0 ) { DEV cout << "MessagingPort recv() conn closed? " << farEnd.toString() << endl; m.reset(); return false; } if ( x < 0 ) { log() << "MessagingPort recv() error " << errno << ' ' << farEnd.toString()<=len); MsgData *md = (MsgData *) malloc(z); md->len = len; if ( len <= 0 ) { cout << "got a length of " << len << ", something is wrong" << endl; return false; } char *p = (char *) &md->id; int left = len -4; while ( 1 ) { int x = ::recv(sock, p, left, 0); if ( x == 0 ) { DEV cout << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl; m.reset(); return false; } if ( x < 0 ) { log() << "MessagingPort recv() error " << errno << ' ' << farEnd.toString() << endl; m.reset(); return false; } left -= x; p += x; if ( left <= 0 ) break; } m.setData(md, true); return true; } void MessagingPort::reply(Message& received, Message& response) { say(/*received.from, */response, received.data->id); } void MessagingPort::reply(Message& received, Message& response, MSGID responseTo) { say(/*received.from, */response, responseTo); } bool MessagingPort::call(Message& toSend, Message& response) { mmm( cout << "*call()" << endl; ) MSGID old = toSend.data->id; say(/*to,*/ toSend); while ( 1 ) { bool ok = recv(response); if ( !ok ) return false; //cout << "got response: " << response.data->responseTo << endl; if ( response.data->responseTo == toSend.data->id ) break; cout << "********************" << endl; cout << "ERROR: MessagingPort::call() wrong id got:" << response.data->responseTo << " expect:" << toSend.data->id << endl; cout << " old:" << old << endl; cout << " response msgid:" << response.data->id << endl; cout << " response len: " << response.data->len << endl; assert(false); response.reset(); } mmm( cout << "*call() end" << endl; ) return true; } void MessagingPort::say(Message& toSend, int responseTo) { mmm( cout << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; ) MSGID msgid = NextMsgId; ++NextMsgId; toSend.data->id = msgid; toSend.data->responseTo = responseTo; int x = ::send(sock, (char *) toSend.data, toSend.data->len, 0); if ( x <= 0 ) { log() << "MessagingPort say send() error " << errno << ' ' << farEnd.toString() << endl; } }