summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <mysql@mc04.(none)>2004-09-20 12:03:18 +0200
committerunknown <mysql@mc04.(none)>2004-09-20 12:03:18 +0200
commit48e56f47a62829480803a7bdfe46d6ebbea17c47 (patch)
tree46f14c64edbc5848886ed45427b38d0937b74c06
parenta562315e3fc1fb84ac1352960b79fb21e39ca48d (diff)
downloadmariadb-git-48e56f47a62829480803a7bdfe46d6ebbea17c47.tar.gz
Restored old shared memory buffer implementation (used by SCI and SHM).
Improved Default SCI config params Added missing SCI libraries in ndb_mgm and atrt Added max of 1024 signals per receive on transporter (to improve real-time bahaviour and to ensure no job buffer explosion, still some more work left on avoiding job buffer explosion in the general case) ndb/src/common/transporter/Packer.cpp: Fix for job buffer explosion and real-time behaviour also in high load scenarios. ndb/src/common/transporter/SCI_Transporter.cpp: Restored old Shared memory buffer implementation. Changed condition slightly on when to send SCI buffer. ndb/src/common/transporter/SCI_Transporter.hpp: Changed back to old shared memory implementation ndb/src/common/transporter/SHM_Buffer.hpp: Changed back to old shared memory implementation ndb/src/common/transporter/SHM_Transporter.cpp: Changed back to old shared memory implementation ndb/src/common/transporter/SHM_Transporter.hpp: Changed back to old shared memory implementation ndb/src/common/transporter/TransporterRegistry.cpp: Changed back to old shared memory implementation ndb/src/kernel/vm/FastScheduler.hpp: Spelling error ndb/src/mgmclient/Makefile.am: Missing SCI library ndb/src/mgmsrv/ConfigInfo.cpp: Changed to more proper config parameters ndb/test/run-test/Makefile.am: Added missing SCI library
-rw-r--r--ndb/src/common/transporter/Packer.cpp21
-rw-r--r--ndb/src/common/transporter/SCI_Transporter.cpp13
-rw-r--r--ndb/src/common/transporter/SCI_Transporter.hpp9
-rw-r--r--ndb/src/common/transporter/SHM_Buffer.hpp26
-rw-r--r--ndb/src/common/transporter/SHM_Transporter.cpp9
-rw-r--r--ndb/src/common/transporter/SHM_Transporter.hpp8
-rw-r--r--ndb/src/common/transporter/TransporterRegistry.cpp16
-rw-r--r--ndb/src/kernel/vm/FastScheduler.hpp2
-rw-r--r--ndb/src/mgmclient/Makefile.am2
-rw-r--r--ndb/src/mgmsrv/ConfigInfo.cpp4
-rw-r--r--ndb/test/run-test/Makefile.am2
11 files changed, 45 insertions, 67 deletions
diff --git a/ndb/src/common/transporter/Packer.cpp b/ndb/src/common/transporter/Packer.cpp
index 645517a4b1a..9eba335330d 100644
--- a/ndb/src/common/transporter/Packer.cpp
+++ b/ndb/src/common/transporter/Packer.cpp
@@ -21,6 +21,7 @@
#include <TransporterCallback.hpp>
#include <RefConvert.hpp>
+#define MAX_RECEIVED_SIGNALS 1024
Uint32
TransporterRegistry::unpack(Uint32 * readPtr,
Uint32 sizeOfData,
@@ -30,12 +31,15 @@ TransporterRegistry::unpack(Uint32 * readPtr,
LinearSectionPtr ptr[3];
Uint32 usedData = 0;
-
+ Uint32 loop_count = 0;
+
if(state == NoHalt || state == HaltOutput){
- while(sizeOfData >= 4 + sizeof(Protocol6)){
+ while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
+ (loop_count < MAX_RECEIVED_SIGNALS)) {
Uint32 word1 = readPtr[0];
Uint32 word2 = readPtr[1];
Uint32 word3 = readPtr[2];
+ loop_count++;
#if 0
if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
@@ -112,10 +116,12 @@ TransporterRegistry::unpack(Uint32 * readPtr,
} else {
/** state = HaltIO || state == HaltInput */
- while(sizeOfData >= 4 + sizeof(Protocol6)){
+ while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
+ (loop_count < MAX_RECEIVED_SIGNALS)) {
Uint32 word1 = readPtr[0];
Uint32 word2 = readPtr[1];
Uint32 word3 = readPtr[2];
+ loop_count++;
#if 0
if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
@@ -208,12 +214,13 @@ TransporterRegistry::unpack(Uint32 * readPtr,
IOState state) {
static SignalHeader signalHeader;
static LinearSectionPtr ptr[3];
+ Uint32 loop_count = 0;
if(state == NoHalt || state == HaltOutput){
- while(readPtr < eodPtr){
+ while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
Uint32 word1 = readPtr[0];
Uint32 word2 = readPtr[1];
Uint32 word3 = readPtr[2];
-
+ loop_count++;
#if 0
if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
//Do funky stuff
@@ -280,11 +287,11 @@ TransporterRegistry::unpack(Uint32 * readPtr,
} else {
/** state = HaltIO || state == HaltInput */
- while(readPtr < eodPtr){
+ while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
Uint32 word1 = readPtr[0];
Uint32 word2 = readPtr[1];
Uint32 word3 = readPtr[2];
-
+ loop_count++;
#if 0
if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
//Do funky stuff
diff --git a/ndb/src/common/transporter/SCI_Transporter.cpp b/ndb/src/common/transporter/SCI_Transporter.cpp
index 465d7827069..73fbb064599 100644
--- a/ndb/src/common/transporter/SCI_Transporter.cpp
+++ b/ndb/src/common/transporter/SCI_Transporter.cpp
@@ -530,7 +530,6 @@ void SCI_Transporter::setupLocalSegment()
Uint32 * localReadIndex =
(Uint32*)m_SourceSegm[m_ActiveAdapterId].mappedMemory;
Uint32 * localWriteIndex = (Uint32*)(localReadIndex+ 1);
- Uint32 * localEndWriteIndex = (Uint32*)(localReadIndex + 2);
m_localStatusFlag = (Uint32*)(localReadIndex + 3);
char * localStartOfBuf = (char*)
@@ -538,7 +537,6 @@ void SCI_Transporter::setupLocalSegment()
* localReadIndex = 0;
* localWriteIndex = 0;
- * localEndWriteIndex = 0;
const Uint32 slack = MAX_MESSAGE_SIZE;
@@ -546,7 +544,6 @@ void SCI_Transporter::setupLocalSegment()
sizeOfBuffer,
slack,
localReadIndex,
- localEndWriteIndex,
localWriteIndex);
reader->clear();
@@ -570,7 +567,6 @@ void SCI_Transporter::setupRemoteSegment()
Uint32 * remoteReadIndex = (Uint32*)segPtr;
Uint32 * remoteWriteIndex = (Uint32*)(segPtr + 1);
- Uint32 * remoteEndWriteIndex = (Uint32*) (segPtr + 2);
m_remoteStatusFlag = (Uint32*)(segPtr + 3);
char * remoteStartOfBuf = ( char*)((char*)segPtr+(sharedSize));
@@ -579,7 +575,6 @@ void SCI_Transporter::setupRemoteSegment()
sizeOfBuffer,
slack,
remoteReadIndex,
- remoteEndWriteIndex,
remoteWriteIndex);
writer->clear();
@@ -598,7 +593,6 @@ void SCI_Transporter::setupRemoteSegment()
Uint32 * remoteReadIndex2 = (Uint32*)segPtr;
Uint32 * remoteWriteIndex2 = (Uint32*) (segPtr + 1);
- Uint32 * remoteEndWriteIndex2 = (Uint32*) (segPtr + 2);
m_remoteStatusFlag2 = (Uint32*)(segPtr + 3);
char * remoteStartOfBuf2 = ( char*)((char *)segPtr+sharedSize);
@@ -613,12 +607,10 @@ void SCI_Transporter::setupRemoteSegment()
sizeOfBuffer,
slack,
remoteReadIndex2,
- remoteEndWriteIndex2,
remoteWriteIndex2);
* remoteReadIndex = 0;
* remoteWriteIndex = 0;
- * remoteEndWriteIndex = 0;
writer2->clear();
m_TargetSegm[1].writer=writer2;
if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) {
@@ -918,14 +910,13 @@ SCI_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio)
Uint32 send_buf_size = m_sendBuffer.m_sendBufferSize;
Uint32 curr_data_size = m_sendBuffer.m_dataSize << 2;
Uint32 new_curr_data_size = curr_data_size + lenBytes;
- if ((new_curr_data_size >= send_buf_size) ||
+ if ((curr_data_size >= send_buf_size) ||
(curr_data_size >= sci_buffer_remaining)) {
/**
* The new message will not fit in the send buffer. We need to
* send the send buffer before filling it up with the new
* signal data. If current data size will spill over buffer edge
- * we will also send to avoid writing larger than possible in
- * buffer.
+ * we will also send to ensure correct operation.
*/
if (!doSend()) {
/**
diff --git a/ndb/src/common/transporter/SCI_Transporter.hpp b/ndb/src/common/transporter/SCI_Transporter.hpp
index adc94f8bb4b..e2f2dfcaf99 100644
--- a/ndb/src/common/transporter/SCI_Transporter.hpp
+++ b/ndb/src/common/transporter/SCI_Transporter.hpp
@@ -297,13 +297,12 @@ private:
*/
bool sendIsPossible(struct timeval * timeout);
-
- void getReceivePtr(Uint32 ** ptr, Uint32 &size){
- size = reader->getReadPtr(* ptr);
+ void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){
+ reader->getReadPtr(* ptr, * eod);
}
- void updateReceivePtr(Uint32 size){
- reader->updateReadPtr(size);
+ void updateReceivePtr(Uint32 *ptr){
+ reader->updateReadPtr(ptr);
}
/**
diff --git a/ndb/src/common/transporter/SHM_Buffer.hpp b/ndb/src/common/transporter/SHM_Buffer.hpp
index b0dbd3362a8..f49b4fe73cb 100644
--- a/ndb/src/common/transporter/SHM_Buffer.hpp
+++ b/ndb/src/common/transporter/SHM_Buffer.hpp
@@ -42,13 +42,11 @@ public:
Uint32 _sizeOfBuffer,
Uint32 _slack,
Uint32 * _readIndex,
- Uint32 * _endWriteIndex,
Uint32 * _writeIndex) :
m_startOfBuffer(_startOfBuffer),
m_totalBufferSize(_sizeOfBuffer),
m_bufferSize(_sizeOfBuffer - _slack),
m_sharedReadIndex(_readIndex),
- m_sharedEndWriteIndex(_endWriteIndex),
m_sharedWriteIndex(_writeIndex)
{
}
@@ -68,12 +66,12 @@ public:
* returns ptr - where to start reading
* sz - how much can I read
*/
- inline Uint32 getReadPtr(Uint32 * & ptr);
+ inline void getReadPtr(Uint32 * & ptr, Uint32 * & eod);
/**
* Update read ptr
*/
- inline void updateReadPtr(Uint32 size);
+ inline void updateReadPtr(Uint32 *ptr);
private:
char * const m_startOfBuffer;
@@ -82,7 +80,6 @@ private:
Uint32 m_readIndex;
Uint32 * m_sharedReadIndex;
- Uint32 * m_sharedEndWriteIndex;
Uint32 * m_sharedWriteIndex;
};
@@ -100,22 +97,19 @@ SHM_Reader::empty() const{
* sz - how much can I read
*/
inline
-Uint32
-SHM_Reader::getReadPtr(Uint32 * & ptr)
+void
+SHM_Reader::getReadPtr(Uint32 * & ptr, Uint32 * & eod)
{
- Uint32 *eod;
Uint32 tReadIndex = m_readIndex;
Uint32 tWriteIndex = * m_sharedWriteIndex;
- Uint32 tEndWriteIndex = * m_sharedEndWriteIndex;
ptr = (Uint32*)&m_startOfBuffer[tReadIndex];
if(tReadIndex <= tWriteIndex){
eod = (Uint32*)&m_startOfBuffer[tWriteIndex];
} else {
- eod = (Uint32*)&m_startOfBuffer[tEndWriteIndex];
+ eod = (Uint32*)&m_startOfBuffer[m_bufferSize];
}
- return (Uint32)((char*)eod - (char*)ptr);
}
/**
@@ -123,10 +117,10 @@ SHM_Reader::getReadPtr(Uint32 * & ptr)
*/
inline
void
-SHM_Reader::updateReadPtr(Uint32 size)
+SHM_Reader::updateReadPtr(Uint32 *ptr)
{
- Uint32 tReadIndex = m_readIndex;
- tReadIndex += size;
+ Uint32 tReadIndex = ((char*)ptr) - m_startOfBuffer;
+
assert(tReadIndex < m_totalBufferSize);
if(tReadIndex >= m_bufferSize){
@@ -145,13 +139,11 @@ public:
Uint32 _sizeOfBuffer,
Uint32 _slack,
Uint32 * _readIndex,
- Uint32 * _endWriteIndex,
Uint32 * _writeIndex) :
m_startOfBuffer(_startOfBuffer),
m_totalBufferSize(_sizeOfBuffer),
m_bufferSize(_sizeOfBuffer - _slack),
m_sharedReadIndex(_readIndex),
- m_sharedEndWriteIndex(_endWriteIndex),
m_sharedWriteIndex(_writeIndex)
{
}
@@ -176,7 +168,6 @@ private:
Uint32 m_writeIndex;
Uint32 * m_sharedReadIndex;
- Uint32 * m_sharedEndWriteIndex;
Uint32 * m_sharedWriteIndex;
};
@@ -215,7 +206,6 @@ SHM_Writer::updateWritePtr(Uint32 sz){
assert(tWriteIndex < m_totalBufferSize);
if(tWriteIndex >= m_bufferSize){
- * m_sharedEndWriteIndex = tWriteIndex;
tWriteIndex = 0;
}
diff --git a/ndb/src/common/transporter/SHM_Transporter.cpp b/ndb/src/common/transporter/SHM_Transporter.cpp
index 7c801658dbd..ab161d8c18c 100644
--- a/ndb/src/common/transporter/SHM_Transporter.cpp
+++ b/ndb/src/common/transporter/SHM_Transporter.cpp
@@ -82,14 +82,12 @@ SHM_Transporter::setupBuffers(){
Uint32 * sharedReadIndex1 = base1;
Uint32 * sharedWriteIndex1 = base1 + 1;
- Uint32 * sharedEndWriteIndex1 = base1 + 2;
serverStatusFlag = base1 + 4;
char * startOfBuf1 = shmBuf+sharedSize;
Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize);
Uint32 * sharedReadIndex2 = base2;
Uint32 * sharedWriteIndex2 = base2 + 1;
- Uint32 * sharedEndWriteIndex2 = base2 + 2;
clientStatusFlag = base2 + 4;
char * startOfBuf2 = ((char *)base2)+sharedSize;
@@ -99,23 +97,19 @@ SHM_Transporter::setupBuffers(){
sizeOfBuffer,
slack,
sharedReadIndex1,
- sharedEndWriteIndex1,
sharedWriteIndex1);
writer = new SHM_Writer(startOfBuf2,
sizeOfBuffer,
slack,
sharedReadIndex2,
- sharedEndWriteIndex2,
sharedWriteIndex2);
* sharedReadIndex1 = 0;
* sharedWriteIndex1 = 0;
- * sharedEndWriteIndex1 = 0;
* sharedReadIndex2 = 0;
* sharedWriteIndex2 = 0;
- * sharedEndWriteIndex2 = 0;
reader->clear();
writer->clear();
@@ -148,19 +142,16 @@ SHM_Transporter::setupBuffers(){
sizeOfBuffer,
slack,
sharedReadIndex2,
- sharedEndWriteIndex2,
sharedWriteIndex2);
writer = new SHM_Writer(startOfBuf1,
sizeOfBuffer,
slack,
sharedReadIndex1,
- sharedEndWriteIndex1,
sharedWriteIndex1);
* sharedReadIndex2 = 0;
* sharedWriteIndex1 = 0;
- * sharedEndWriteIndex1 = 0;
reader->clear();
writer->clear();
diff --git a/ndb/src/common/transporter/SHM_Transporter.hpp b/ndb/src/common/transporter/SHM_Transporter.hpp
index 892acbb7ac4..27692209ffe 100644
--- a/ndb/src/common/transporter/SHM_Transporter.hpp
+++ b/ndb/src/common/transporter/SHM_Transporter.hpp
@@ -61,12 +61,12 @@ public:
writer->updateWritePtr(lenBytes);
}
- void getReceivePtr(Uint32 ** ptr, Uint32 sz){
- sz = reader->getReadPtr(* ptr);
+ void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){
+ reader->getReadPtr(* ptr, * eod);
}
- void updateReceivePtr(Uint32 sz){
- reader->updateReadPtr(sz);
+ void updateReceivePtr(Uint32 * ptr){
+ reader->updateReadPtr(ptr);
}
protected:
diff --git a/ndb/src/common/transporter/TransporterRegistry.cpp b/ndb/src/common/transporter/TransporterRegistry.cpp
index ca574b19dbc..5ca88211f54 100644
--- a/ndb/src/common/transporter/TransporterRegistry.cpp
+++ b/ndb/src/common/transporter/TransporterRegistry.cpp
@@ -857,11 +857,11 @@ TransporterRegistry::performReceive(){
const NodeId nodeId = t->getRemoteNodeId();
if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected()){
- Uint32 * readPtr;
+ Uint32 * readPtr, * eodPtr;
Uint32 sz = 0;
- t->getReceivePtr(&readPtr, sz);
- Uint32 szUsed = unpack(readPtr, sz, nodeId, ioStates[nodeId]);
- t->updateReceivePtr(szUsed);
+ t->getReceivePtr(&readPtr, &eodPtr);
+ Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
+ t->updateReceivePtr(newPtr);
}
}
}
@@ -873,11 +873,11 @@ TransporterRegistry::performReceive(){
const NodeId nodeId = t->getRemoteNodeId();
if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected()){
- Uint32 * readPtr;
+ Uint32 * readPtr, * eodPtr;
Uint32 sz = 0;
- t->getReceivePtr(&readPtr, sz);
- Uint32 szUsed = unpack(readPtr, sz, nodeId, ioStates[nodeId]);
- t->updateReceivePtr(szUsed);
+ t->getReceivePtr(&readPtr, &eodPtr);
+ Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
+ t->updateReceivePtr(newPtr);
}
}
}
diff --git a/ndb/src/kernel/vm/FastScheduler.hpp b/ndb/src/kernel/vm/FastScheduler.hpp
index 9749dab5d85..dc707e47eef 100644
--- a/ndb/src/kernel/vm/FastScheduler.hpp
+++ b/ndb/src/kernel/vm/FastScheduler.hpp
@@ -141,7 +141,7 @@ int
FastScheduler::checkDoJob()
{
/*
- * Joob buffer overload protetction
+ * Job buffer overload protetction
* If the job buffer B is filled over a certain limit start
* to execute the signals in the job buffer's
*/
diff --git a/ndb/src/mgmclient/Makefile.am b/ndb/src/mgmclient/Makefile.am
index 72ddc9d098b..e271c7bed53 100644
--- a/ndb/src/mgmclient/Makefile.am
+++ b/ndb/src/mgmclient/Makefile.am
@@ -16,7 +16,7 @@ LDADD_LOC = $(top_builddir)/ndb/src/libndbclient.la \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a \
- @TERMCAP_LIB@
+ @TERMCAP_LIB@ @NDB_SCI_LIBS@
ndb_mgm_LDFLAGS = @ndb_bin_am_ldflags@
diff --git a/ndb/src/mgmsrv/ConfigInfo.cpp b/ndb/src/mgmsrv/ConfigInfo.cpp
index 7cb438bd2dd..3bbceb0113e 100644
--- a/ndb/src/mgmsrv/ConfigInfo.cpp
+++ b/ndb/src/mgmsrv/ConfigInfo.cpp
@@ -1944,7 +1944,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED,
false,
ConfigInfo::INT,
- "2K",
+ "8K",
"128",
"32K" },
@@ -1956,7 +1956,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED,
false,
ConfigInfo::INT,
- "192K",
+ "1M",
"64K",
STR_VALUE(MAX_INT_RNIL) },
diff --git a/ndb/test/run-test/Makefile.am b/ndb/test/run-test/Makefile.am
index 03b53509f05..3bf2edde47a 100644
--- a/ndb/test/run-test/Makefile.am
+++ b/ndb/test/run-test/Makefile.am
@@ -16,7 +16,7 @@ LDADD_LOC = $(top_builddir)/ndb/src/mgmclient/CpcClient.o \
$(top_builddir)/ndb/src/libndbclient.la \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
- $(top_builddir)/strings/libmystrings.a
+ $(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
wrappersdir=$(prefix)/bin
wrappers_SCRIPTS=atrt-testBackup atrt-mysql-test-run