diff options
author | unknown <mysql@mc04.(none)> | 2004-09-20 12:03:18 +0200 |
---|---|---|
committer | unknown <mysql@mc04.(none)> | 2004-09-20 12:03:18 +0200 |
commit | 48e56f47a62829480803a7bdfe46d6ebbea17c47 (patch) | |
tree | 46f14c64edbc5848886ed45427b38d0937b74c06 | |
parent | a562315e3fc1fb84ac1352960b79fb21e39ca48d (diff) | |
download | mariadb-git-48e56f47a62829480803a7bdfe46d6ebbea17c47.tar.gz |
Restored old shared memory buffer implementation (used by SCI and SHM).
Improved Default SCI config params
Added missing SCI libraries in ndb_mgm and atrt
Added max of 1024 signals per receive on transporter (to improve
real-time bahaviour and to ensure no job buffer explosion, still
some more work left on avoiding job buffer explosion in the general
case)
ndb/src/common/transporter/Packer.cpp:
Fix for job buffer explosion and real-time behaviour also in
high load scenarios.
ndb/src/common/transporter/SCI_Transporter.cpp:
Restored old Shared memory buffer implementation.
Changed condition slightly on when to send SCI buffer.
ndb/src/common/transporter/SCI_Transporter.hpp:
Changed back to old shared memory implementation
ndb/src/common/transporter/SHM_Buffer.hpp:
Changed back to old shared memory implementation
ndb/src/common/transporter/SHM_Transporter.cpp:
Changed back to old shared memory implementation
ndb/src/common/transporter/SHM_Transporter.hpp:
Changed back to old shared memory implementation
ndb/src/common/transporter/TransporterRegistry.cpp:
Changed back to old shared memory implementation
ndb/src/kernel/vm/FastScheduler.hpp:
Spelling error
ndb/src/mgmclient/Makefile.am:
Missing SCI library
ndb/src/mgmsrv/ConfigInfo.cpp:
Changed to more proper config parameters
ndb/test/run-test/Makefile.am:
Added missing SCI library
-rw-r--r-- | ndb/src/common/transporter/Packer.cpp | 21 | ||||
-rw-r--r-- | ndb/src/common/transporter/SCI_Transporter.cpp | 13 | ||||
-rw-r--r-- | ndb/src/common/transporter/SCI_Transporter.hpp | 9 | ||||
-rw-r--r-- | ndb/src/common/transporter/SHM_Buffer.hpp | 26 | ||||
-rw-r--r-- | ndb/src/common/transporter/SHM_Transporter.cpp | 9 | ||||
-rw-r--r-- | ndb/src/common/transporter/SHM_Transporter.hpp | 8 | ||||
-rw-r--r-- | ndb/src/common/transporter/TransporterRegistry.cpp | 16 | ||||
-rw-r--r-- | ndb/src/kernel/vm/FastScheduler.hpp | 2 | ||||
-rw-r--r-- | ndb/src/mgmclient/Makefile.am | 2 | ||||
-rw-r--r-- | ndb/src/mgmsrv/ConfigInfo.cpp | 4 | ||||
-rw-r--r-- | ndb/test/run-test/Makefile.am | 2 |
11 files changed, 45 insertions, 67 deletions
diff --git a/ndb/src/common/transporter/Packer.cpp b/ndb/src/common/transporter/Packer.cpp index 645517a4b1a..9eba335330d 100644 --- a/ndb/src/common/transporter/Packer.cpp +++ b/ndb/src/common/transporter/Packer.cpp @@ -21,6 +21,7 @@ #include <TransporterCallback.hpp> #include <RefConvert.hpp> +#define MAX_RECEIVED_SIGNALS 1024 Uint32 TransporterRegistry::unpack(Uint32 * readPtr, Uint32 sizeOfData, @@ -30,12 +31,15 @@ TransporterRegistry::unpack(Uint32 * readPtr, LinearSectionPtr ptr[3]; Uint32 usedData = 0; - + Uint32 loop_count = 0; + if(state == NoHalt || state == HaltOutput){ - while(sizeOfData >= 4 + sizeof(Protocol6)){ + while ((sizeOfData >= 4 + sizeof(Protocol6)) && + (loop_count < MAX_RECEIVED_SIGNALS)) { Uint32 word1 = readPtr[0]; Uint32 word2 = readPtr[1]; Uint32 word3 = readPtr[2]; + loop_count++; #if 0 if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){ @@ -112,10 +116,12 @@ TransporterRegistry::unpack(Uint32 * readPtr, } else { /** state = HaltIO || state == HaltInput */ - while(sizeOfData >= 4 + sizeof(Protocol6)){ + while ((sizeOfData >= 4 + sizeof(Protocol6)) && + (loop_count < MAX_RECEIVED_SIGNALS)) { Uint32 word1 = readPtr[0]; Uint32 word2 = readPtr[1]; Uint32 word3 = readPtr[2]; + loop_count++; #if 0 if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){ @@ -208,12 +214,13 @@ TransporterRegistry::unpack(Uint32 * readPtr, IOState state) { static SignalHeader signalHeader; static LinearSectionPtr ptr[3]; + Uint32 loop_count = 0; if(state == NoHalt || state == HaltOutput){ - while(readPtr < eodPtr){ + while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) { Uint32 word1 = readPtr[0]; Uint32 word2 = readPtr[1]; Uint32 word3 = readPtr[2]; - + loop_count++; #if 0 if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){ //Do funky stuff @@ -280,11 +287,11 @@ TransporterRegistry::unpack(Uint32 * readPtr, } else { /** state = HaltIO || state == HaltInput */ - while(readPtr < eodPtr){ + while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) { Uint32 word1 = readPtr[0]; Uint32 word2 = readPtr[1]; Uint32 word3 = readPtr[2]; - + loop_count++; #if 0 if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){ //Do funky stuff diff --git a/ndb/src/common/transporter/SCI_Transporter.cpp b/ndb/src/common/transporter/SCI_Transporter.cpp index 465d7827069..73fbb064599 100644 --- a/ndb/src/common/transporter/SCI_Transporter.cpp +++ b/ndb/src/common/transporter/SCI_Transporter.cpp @@ -530,7 +530,6 @@ void SCI_Transporter::setupLocalSegment() Uint32 * localReadIndex = (Uint32*)m_SourceSegm[m_ActiveAdapterId].mappedMemory; Uint32 * localWriteIndex = (Uint32*)(localReadIndex+ 1); - Uint32 * localEndWriteIndex = (Uint32*)(localReadIndex + 2); m_localStatusFlag = (Uint32*)(localReadIndex + 3); char * localStartOfBuf = (char*) @@ -538,7 +537,6 @@ void SCI_Transporter::setupLocalSegment() * localReadIndex = 0; * localWriteIndex = 0; - * localEndWriteIndex = 0; const Uint32 slack = MAX_MESSAGE_SIZE; @@ -546,7 +544,6 @@ void SCI_Transporter::setupLocalSegment() sizeOfBuffer, slack, localReadIndex, - localEndWriteIndex, localWriteIndex); reader->clear(); @@ -570,7 +567,6 @@ void SCI_Transporter::setupRemoteSegment() Uint32 * remoteReadIndex = (Uint32*)segPtr; Uint32 * remoteWriteIndex = (Uint32*)(segPtr + 1); - Uint32 * remoteEndWriteIndex = (Uint32*) (segPtr + 2); m_remoteStatusFlag = (Uint32*)(segPtr + 3); char * remoteStartOfBuf = ( char*)((char*)segPtr+(sharedSize)); @@ -579,7 +575,6 @@ void SCI_Transporter::setupRemoteSegment() sizeOfBuffer, slack, remoteReadIndex, - remoteEndWriteIndex, remoteWriteIndex); writer->clear(); @@ -598,7 +593,6 @@ void SCI_Transporter::setupRemoteSegment() Uint32 * remoteReadIndex2 = (Uint32*)segPtr; Uint32 * remoteWriteIndex2 = (Uint32*) (segPtr + 1); - Uint32 * remoteEndWriteIndex2 = (Uint32*) (segPtr + 2); m_remoteStatusFlag2 = (Uint32*)(segPtr + 3); char * remoteStartOfBuf2 = ( char*)((char *)segPtr+sharedSize); @@ -613,12 +607,10 @@ void SCI_Transporter::setupRemoteSegment() sizeOfBuffer, slack, remoteReadIndex2, - remoteEndWriteIndex2, remoteWriteIndex2); * remoteReadIndex = 0; * remoteWriteIndex = 0; - * remoteEndWriteIndex = 0; writer2->clear(); m_TargetSegm[1].writer=writer2; if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) { @@ -918,14 +910,13 @@ SCI_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio) Uint32 send_buf_size = m_sendBuffer.m_sendBufferSize; Uint32 curr_data_size = m_sendBuffer.m_dataSize << 2; Uint32 new_curr_data_size = curr_data_size + lenBytes; - if ((new_curr_data_size >= send_buf_size) || + if ((curr_data_size >= send_buf_size) || (curr_data_size >= sci_buffer_remaining)) { /** * The new message will not fit in the send buffer. We need to * send the send buffer before filling it up with the new * signal data. If current data size will spill over buffer edge - * we will also send to avoid writing larger than possible in - * buffer. + * we will also send to ensure correct operation. */ if (!doSend()) { /** diff --git a/ndb/src/common/transporter/SCI_Transporter.hpp b/ndb/src/common/transporter/SCI_Transporter.hpp index adc94f8bb4b..e2f2dfcaf99 100644 --- a/ndb/src/common/transporter/SCI_Transporter.hpp +++ b/ndb/src/common/transporter/SCI_Transporter.hpp @@ -297,13 +297,12 @@ private: */ bool sendIsPossible(struct timeval * timeout); - - void getReceivePtr(Uint32 ** ptr, Uint32 &size){ - size = reader->getReadPtr(* ptr); + void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){ + reader->getReadPtr(* ptr, * eod); } - void updateReceivePtr(Uint32 size){ - reader->updateReadPtr(size); + void updateReceivePtr(Uint32 *ptr){ + reader->updateReadPtr(ptr); } /** diff --git a/ndb/src/common/transporter/SHM_Buffer.hpp b/ndb/src/common/transporter/SHM_Buffer.hpp index b0dbd3362a8..f49b4fe73cb 100644 --- a/ndb/src/common/transporter/SHM_Buffer.hpp +++ b/ndb/src/common/transporter/SHM_Buffer.hpp @@ -42,13 +42,11 @@ public: Uint32 _sizeOfBuffer, Uint32 _slack, Uint32 * _readIndex, - Uint32 * _endWriteIndex, Uint32 * _writeIndex) : m_startOfBuffer(_startOfBuffer), m_totalBufferSize(_sizeOfBuffer), m_bufferSize(_sizeOfBuffer - _slack), m_sharedReadIndex(_readIndex), - m_sharedEndWriteIndex(_endWriteIndex), m_sharedWriteIndex(_writeIndex) { } @@ -68,12 +66,12 @@ public: * returns ptr - where to start reading * sz - how much can I read */ - inline Uint32 getReadPtr(Uint32 * & ptr); + inline void getReadPtr(Uint32 * & ptr, Uint32 * & eod); /** * Update read ptr */ - inline void updateReadPtr(Uint32 size); + inline void updateReadPtr(Uint32 *ptr); private: char * const m_startOfBuffer; @@ -82,7 +80,6 @@ private: Uint32 m_readIndex; Uint32 * m_sharedReadIndex; - Uint32 * m_sharedEndWriteIndex; Uint32 * m_sharedWriteIndex; }; @@ -100,22 +97,19 @@ SHM_Reader::empty() const{ * sz - how much can I read */ inline -Uint32 -SHM_Reader::getReadPtr(Uint32 * & ptr) +void +SHM_Reader::getReadPtr(Uint32 * & ptr, Uint32 * & eod) { - Uint32 *eod; Uint32 tReadIndex = m_readIndex; Uint32 tWriteIndex = * m_sharedWriteIndex; - Uint32 tEndWriteIndex = * m_sharedEndWriteIndex; ptr = (Uint32*)&m_startOfBuffer[tReadIndex]; if(tReadIndex <= tWriteIndex){ eod = (Uint32*)&m_startOfBuffer[tWriteIndex]; } else { - eod = (Uint32*)&m_startOfBuffer[tEndWriteIndex]; + eod = (Uint32*)&m_startOfBuffer[m_bufferSize]; } - return (Uint32)((char*)eod - (char*)ptr); } /** @@ -123,10 +117,10 @@ SHM_Reader::getReadPtr(Uint32 * & ptr) */ inline void -SHM_Reader::updateReadPtr(Uint32 size) +SHM_Reader::updateReadPtr(Uint32 *ptr) { - Uint32 tReadIndex = m_readIndex; - tReadIndex += size; + Uint32 tReadIndex = ((char*)ptr) - m_startOfBuffer; + assert(tReadIndex < m_totalBufferSize); if(tReadIndex >= m_bufferSize){ @@ -145,13 +139,11 @@ public: Uint32 _sizeOfBuffer, Uint32 _slack, Uint32 * _readIndex, - Uint32 * _endWriteIndex, Uint32 * _writeIndex) : m_startOfBuffer(_startOfBuffer), m_totalBufferSize(_sizeOfBuffer), m_bufferSize(_sizeOfBuffer - _slack), m_sharedReadIndex(_readIndex), - m_sharedEndWriteIndex(_endWriteIndex), m_sharedWriteIndex(_writeIndex) { } @@ -176,7 +168,6 @@ private: Uint32 m_writeIndex; Uint32 * m_sharedReadIndex; - Uint32 * m_sharedEndWriteIndex; Uint32 * m_sharedWriteIndex; }; @@ -215,7 +206,6 @@ SHM_Writer::updateWritePtr(Uint32 sz){ assert(tWriteIndex < m_totalBufferSize); if(tWriteIndex >= m_bufferSize){ - * m_sharedEndWriteIndex = tWriteIndex; tWriteIndex = 0; } diff --git a/ndb/src/common/transporter/SHM_Transporter.cpp b/ndb/src/common/transporter/SHM_Transporter.cpp index 7c801658dbd..ab161d8c18c 100644 --- a/ndb/src/common/transporter/SHM_Transporter.cpp +++ b/ndb/src/common/transporter/SHM_Transporter.cpp @@ -82,14 +82,12 @@ SHM_Transporter::setupBuffers(){ Uint32 * sharedReadIndex1 = base1; Uint32 * sharedWriteIndex1 = base1 + 1; - Uint32 * sharedEndWriteIndex1 = base1 + 2; serverStatusFlag = base1 + 4; char * startOfBuf1 = shmBuf+sharedSize; Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize); Uint32 * sharedReadIndex2 = base2; Uint32 * sharedWriteIndex2 = base2 + 1; - Uint32 * sharedEndWriteIndex2 = base2 + 2; clientStatusFlag = base2 + 4; char * startOfBuf2 = ((char *)base2)+sharedSize; @@ -99,23 +97,19 @@ SHM_Transporter::setupBuffers(){ sizeOfBuffer, slack, sharedReadIndex1, - sharedEndWriteIndex1, sharedWriteIndex1); writer = new SHM_Writer(startOfBuf2, sizeOfBuffer, slack, sharedReadIndex2, - sharedEndWriteIndex2, sharedWriteIndex2); * sharedReadIndex1 = 0; * sharedWriteIndex1 = 0; - * sharedEndWriteIndex1 = 0; * sharedReadIndex2 = 0; * sharedWriteIndex2 = 0; - * sharedEndWriteIndex2 = 0; reader->clear(); writer->clear(); @@ -148,19 +142,16 @@ SHM_Transporter::setupBuffers(){ sizeOfBuffer, slack, sharedReadIndex2, - sharedEndWriteIndex2, sharedWriteIndex2); writer = new SHM_Writer(startOfBuf1, sizeOfBuffer, slack, sharedReadIndex1, - sharedEndWriteIndex1, sharedWriteIndex1); * sharedReadIndex2 = 0; * sharedWriteIndex1 = 0; - * sharedEndWriteIndex1 = 0; reader->clear(); writer->clear(); diff --git a/ndb/src/common/transporter/SHM_Transporter.hpp b/ndb/src/common/transporter/SHM_Transporter.hpp index 892acbb7ac4..27692209ffe 100644 --- a/ndb/src/common/transporter/SHM_Transporter.hpp +++ b/ndb/src/common/transporter/SHM_Transporter.hpp @@ -61,12 +61,12 @@ public: writer->updateWritePtr(lenBytes); } - void getReceivePtr(Uint32 ** ptr, Uint32 sz){ - sz = reader->getReadPtr(* ptr); + void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){ + reader->getReadPtr(* ptr, * eod); } - void updateReceivePtr(Uint32 sz){ - reader->updateReadPtr(sz); + void updateReceivePtr(Uint32 * ptr){ + reader->updateReadPtr(ptr); } protected: diff --git a/ndb/src/common/transporter/TransporterRegistry.cpp b/ndb/src/common/transporter/TransporterRegistry.cpp index ca574b19dbc..5ca88211f54 100644 --- a/ndb/src/common/transporter/TransporterRegistry.cpp +++ b/ndb/src/common/transporter/TransporterRegistry.cpp @@ -857,11 +857,11 @@ TransporterRegistry::performReceive(){ const NodeId nodeId = t->getRemoteNodeId(); if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()){ - Uint32 * readPtr; + Uint32 * readPtr, * eodPtr; Uint32 sz = 0; - t->getReceivePtr(&readPtr, sz); - Uint32 szUsed = unpack(readPtr, sz, nodeId, ioStates[nodeId]); - t->updateReceivePtr(szUsed); + t->getReceivePtr(&readPtr, &eodPtr); + Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]); + t->updateReceivePtr(newPtr); } } } @@ -873,11 +873,11 @@ TransporterRegistry::performReceive(){ const NodeId nodeId = t->getRemoteNodeId(); if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()){ - Uint32 * readPtr; + Uint32 * readPtr, * eodPtr; Uint32 sz = 0; - t->getReceivePtr(&readPtr, sz); - Uint32 szUsed = unpack(readPtr, sz, nodeId, ioStates[nodeId]); - t->updateReceivePtr(szUsed); + t->getReceivePtr(&readPtr, &eodPtr); + Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]); + t->updateReceivePtr(newPtr); } } } diff --git a/ndb/src/kernel/vm/FastScheduler.hpp b/ndb/src/kernel/vm/FastScheduler.hpp index 9749dab5d85..dc707e47eef 100644 --- a/ndb/src/kernel/vm/FastScheduler.hpp +++ b/ndb/src/kernel/vm/FastScheduler.hpp @@ -141,7 +141,7 @@ int FastScheduler::checkDoJob() { /* - * Joob buffer overload protetction + * Job buffer overload protetction * If the job buffer B is filled over a certain limit start * to execute the signals in the job buffer's */ diff --git a/ndb/src/mgmclient/Makefile.am b/ndb/src/mgmclient/Makefile.am index 72ddc9d098b..e271c7bed53 100644 --- a/ndb/src/mgmclient/Makefile.am +++ b/ndb/src/mgmclient/Makefile.am @@ -16,7 +16,7 @@ LDADD_LOC = $(top_builddir)/ndb/src/libndbclient.la \ $(top_builddir)/dbug/libdbug.a \ $(top_builddir)/mysys/libmysys.a \ $(top_builddir)/strings/libmystrings.a \ - @TERMCAP_LIB@ + @TERMCAP_LIB@ @NDB_SCI_LIBS@ ndb_mgm_LDFLAGS = @ndb_bin_am_ldflags@ diff --git a/ndb/src/mgmsrv/ConfigInfo.cpp b/ndb/src/mgmsrv/ConfigInfo.cpp index 7cb438bd2dd..3bbceb0113e 100644 --- a/ndb/src/mgmsrv/ConfigInfo.cpp +++ b/ndb/src/mgmsrv/ConfigInfo.cpp @@ -1944,7 +1944,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ConfigInfo::USED, false, ConfigInfo::INT, - "2K", + "8K", "128", "32K" }, @@ -1956,7 +1956,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ConfigInfo::USED, false, ConfigInfo::INT, - "192K", + "1M", "64K", STR_VALUE(MAX_INT_RNIL) }, diff --git a/ndb/test/run-test/Makefile.am b/ndb/test/run-test/Makefile.am index 03b53509f05..3bf2edde47a 100644 --- a/ndb/test/run-test/Makefile.am +++ b/ndb/test/run-test/Makefile.am @@ -16,7 +16,7 @@ LDADD_LOC = $(top_builddir)/ndb/src/mgmclient/CpcClient.o \ $(top_builddir)/ndb/src/libndbclient.la \ $(top_builddir)/dbug/libdbug.a \ $(top_builddir)/mysys/libmysys.a \ - $(top_builddir)/strings/libmystrings.a + $(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@ wrappersdir=$(prefix)/bin wrappers_SCRIPTS=atrt-testBackup atrt-mysql-test-run |