summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDwight <dmerriman@gmail.com>2007-11-05 11:29:12 -0500
committerDwight <dmerriman@gmail.com>2007-11-05 11:29:12 -0500
commit8f932515bfc74003794481e619131c908b16bf79 (patch)
tree213c194eb53818129a3964e08fc61bc17918a0c8
parent5c82a67f35083385f29ca8bc9794d623abd5d990 (diff)
downloadmongo-8f932515bfc74003794481e619131c908b16bf79.tar.gz
long msgs work
-rw-r--r--db/db.cpp12
-rw-r--r--grid/message.cpp13
-rw-r--r--grid/message.h4
3 files changed, 23 insertions, 6 deletions
diff --git a/db/db.cpp b/db/db.cpp
index 6938aab8ea8..cd69883732d 100644
--- a/db/db.cpp
+++ b/db/db.cpp
@@ -265,7 +265,8 @@ void msg(const char *m) {
p.init(29999);
// SockAddr db("127.0.0.1", MessagingPort::DBPort);
- SockAddr db("10.0.21.60", MessagingPort::DBPort);
+// SockAddr db("10.0.21.60", MessagingPort::DBPort);
+ SockAddr db("172.16.0.179", MessagingPort::DBPort);
Message send;
Message response;
@@ -288,6 +289,7 @@ int main(int argc, char* argv[], char *envp[] )
{
quicktest();
+
if( argc >= 2 ) {
if( strcmp(argv[1], "quicktest") == 0 )
return 0;
@@ -299,6 +301,13 @@ int main(int argc, char* argv[], char *envp[] )
run();
return 0;
}
+ if( strcmp(argv[1], "longmsg") == 0 ) {
+ char buf[4096];
+ memset(buf, 'a', 4095);
+ buf[4095] = 0;
+ msg(buf);
+ return 0;
+ }
}
cout << "usage:\n";
@@ -306,6 +315,7 @@ int main(int argc, char* argv[], char *envp[] )
cout << " msg [msg] send a request to the db server" << endl;
cout << " msg end shut down" << endl;
cout << " run run db" << endl;
+ cout << " longmsg send a long test message to the db server" << endl;
return 0;
}
diff --git a/grid/message.cpp b/grid/message.cpp
index 48b91d90bc8..617925f268e 100644
--- a/grid/message.cpp
+++ b/grid/message.cpp
@@ -19,6 +19,7 @@ struct Fragment {
short fragmentNo;
char data[1];
int fragmentDataLen() { return fragmentLen - 8; }
+ char* fragmentData() { return data; }
bool ok(int nRead) {
if( nRead < MinFragmentLen || fragmentLen > nRead || fragmentLen < MinFragmentLen ) {
@@ -77,6 +78,7 @@ bool MessagingPort::recv(Message& m) {
/* we'll need to read more */
char *msgData = (char *) malloc(totalLen);
+ m.setData((MsgData*) msgData, true);
char *p = msgData;
memcpy(p, somd, ff->fragmentDataLen());
int sofar = ff->fragmentDataLen();
@@ -94,8 +96,12 @@ bool MessagingPort::recv(Message& m) {
Fragment *f = (Fragment *) b;
if( !f->ok(n) )
return false;
- if( f->msgId != msgid || f->fragmentNo != expectedFragmentNo ) {
- cout << "bad fragment" << endl;
+ if( f->msgId != msgid ) {
+ cout << "bad fragment, wrong msg id, expected:" << msgid << " got:" << f->msgId << endl;
+ return false;
+ }
+ if( f->fragmentNo != expectedFragmentNo ) {
+ cout << "bad fragment, wrong fragmentNo, expected:" << expectedFragmentNo << " got:" << f->fragmentNo << endl;
return false;
}
if( from != m.from ) {
@@ -104,7 +110,7 @@ bool MessagingPort::recv(Message& m) {
return false;
}
- memcpy(p, f->startOfMsgData(), f->fragmentDataLen());
+ memcpy(p, f->fragmentData(), f->fragmentDataLen());
p += f->fragmentDataLen();
wanted -= f->fragmentDataLen();
expectedFragmentNo++;
@@ -149,5 +155,6 @@ void MessagingPort::say(SockAddr& to, Message& toSend, int responseTo) {
p += l;
left -= l;
conn.sendto(buf, l+8, to);
+ f->fragmentNo++;
}
}
diff --git a/grid/message.h b/grid/message.h
index b2ba18e7ce9..83ced78ed11 100644
--- a/grid/message.h
+++ b/grid/message.h
@@ -62,7 +62,7 @@ inline int MsgData::dataLen() { return len - MsgDataHeaderSize; }
class Message {
public:
- Message() { data = 0; }
+ Message() { data = 0; freeIt = false; }
~Message() { reset(); }
SockAddr from;
@@ -71,7 +71,7 @@ public:
void reset() {
if( freeIt && data )
free(data);
- data = 0;
+ data = 0; freeIt = false;
}
void setData(MsgData *d, bool _freeIt) {