summaryrefslogtreecommitdiff
path: root/storage/ndb/src/common/transporter/OSE_Receiver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/common/transporter/OSE_Receiver.cpp')
-rw-r--r--storage/ndb/src/common/transporter/OSE_Receiver.cpp359
1 files changed, 359 insertions, 0 deletions
diff --git a/storage/ndb/src/common/transporter/OSE_Receiver.cpp b/storage/ndb/src/common/transporter/OSE_Receiver.cpp
new file mode 100644
index 00000000000..63a33fc8f24
--- /dev/null
+++ b/storage/ndb/src/common/transporter/OSE_Receiver.cpp
@@ -0,0 +1,359 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <NdbOut.hpp>
+#include "OSE_Receiver.hpp"
+#include "OSE_Transporter.hpp"
+#include "TransporterCallback.hpp"
+#include <TransporterRegistry.hpp>
+#include "TransporterInternalDefinitions.hpp"
+
+OSE_Receiver::OSE_Receiver(TransporterRegistry * tr,
+ int _recBufSize,
+ NodeId _localNodeId) {
+ theTransporterRegistry = tr;
+
+ recBufSize = _recBufSize;
+ recBufReadIndex = 0;
+ recBufWriteIndex = 0;
+ receiveBuffer = new union SIGNAL * [recBufSize];
+
+ waitStackCount = 0;
+ waitStackSize = _recBufSize;
+ waitStack = new union SIGNAL * [waitStackSize];
+
+ nextSigId = new Uint32[MAX_NTRANSPORTERS];
+ for (int i = 0; i < MAX_NTRANSPORTERS; i++)
+ nextSigId[i] = 0;
+
+ phantomCreated = false;
+ localNodeId = _localNodeId;
+ BaseString::snprintf(localHostName, sizeof(localHostName),
+ "ndb_node%d", localNodeId);
+
+ DEBUG("localNodeId = " << localNodeId << " -> localHostName = "
+ << localHostName);
+}
+
+OSE_Receiver::~OSE_Receiver(){
+ while(recBufReadIndex != recBufWriteIndex){
+ free_buf(&receiveBuffer[recBufReadIndex]);
+ recBufReadIndex = (recBufReadIndex + 1) % recBufSize;
+ }
+ delete [] receiveBuffer;
+ destroyPhantom();
+}
+
+PROCESS
+OSE_Receiver::createPhantom(){
+ redir.sig = 1;
+ redir.pid = current_process();
+
+ if(!phantomCreated){
+ phantomPid = create_process
+ (OS_PHANTOM, // Type
+ localHostName, // Name
+ NULL, // Entry point
+ 0, // Stack size
+ 0, // Prio - Not used
+ (OSTIME)0, // Timeslice - Not used
+ 0, // Block - current block
+ &redir,
+ (OSVECTOR)0, // vector
+ (OSUSER)0); // user
+ phantomCreated = true;
+ DEBUG("Created phantom pid: " << hex << phantomPid);
+ }
+ return phantomPid;
+}
+
+void
+OSE_Receiver::destroyPhantom(){
+ if(phantomCreated){
+ DEBUG("Destroying phantom pid: " << hex << phantomPid);
+ kill_proc(phantomPid);
+ phantomCreated = false;
+ }
+}
+
+static SIGSELECT PRIO_A_SIGNALS[] = { 6,
+ NDB_TRANSPORTER_PRIO_A,
+ NDB_TRANSPORTER_HUNT,
+ NDB_TRANSPORTER_CONNECT_REQ,
+ NDB_TRANSPORTER_CONNECT_REF,
+ NDB_TRANSPORTER_CONNECT_CONF,
+ NDB_TRANSPORTER_DISCONNECT_ORD
+};
+
+static SIGSELECT PRIO_B_SIGNALS[] = { 1,
+ NDB_TRANSPORTER_DATA
+};
+
+/**
+ * Check waitstack for signals that are next in sequence
+ * Put any found signal in receive buffer
+ * Returns true if one signal is found
+ */
+bool
+OSE_Receiver::checkWaitStack(NodeId _nodeId){
+
+ for(int i = 0; i < waitStackCount; i++){
+ if (waitStack[i]->dataSignal.senderNodeId == _nodeId &&
+ waitStack[i]->dataSignal.sigId == nextSigId[_nodeId]){
+
+ ndbout_c("INFO: signal popped from waitStack, sigId = %d",
+ waitStack[i]->dataSignal.sigId);
+
+ if(isFull()){
+ ndbout_c("ERROR: receiveBuffer is full");
+ reportError(callbackObj, _nodeId, TE_RECEIVE_BUFFER_FULL);
+ return false;
+ }
+
+ // The next signal was found, put it in the receive buffer
+ insertReceiveBuffer(waitStack[i]);
+
+ // Increase sequence id, set it to the next expected id
+ nextSigId[_nodeId]++;
+
+ // Move signals below up one step
+ for(int j = i; j < waitStackCount-1; j++)
+ waitStack[j] = waitStack[j+1];
+ waitStack[waitStackCount] = NULL;
+ waitStackCount--;
+
+ // return true since signal was found
+ return true;
+ }
+ }
+ return false;
+}
+
+/**
+ * Clear waitstack for signals from node with _nodeId
+ */
+void
+OSE_Receiver::clearWaitStack(NodeId _nodeId){
+
+ for(int i = 0; i < waitStackCount; i++){
+ if (waitStack[i]->dataSignal.senderNodeId == _nodeId){
+
+ // Free signal buffer
+ free_buf(&waitStack[i]);
+
+ // Move signals below up one step
+ for(int j = i; j < waitStackCount-1; j++)
+ waitStack[j] = waitStack[j+1];
+ waitStack[waitStackCount] = NULL;
+ waitStackCount--;
+ }
+ }
+ nextSigId[_nodeId] = 0;
+}
+
+
+inline
+void
+OSE_Receiver::insertWaitStack(union SIGNAL* _sig){
+ if (waitStackCount <= waitStackSize){
+ waitStack[waitStackCount] = _sig;
+ waitStackCount++;
+ } else {
+ ndbout_c("ERROR: waitStack is full");
+ reportError(callbackObj, localNodeId, TE_WAIT_STACK_FULL);
+ }
+}
+
+bool
+OSE_Receiver::doReceive(Uint32 timeOutMillis) {
+ if(isFull())
+ return false;
+
+ union SIGNAL * sig = receive_w_tmo(0,
+ PRIO_A_SIGNALS);
+ if(sig == NIL){
+ sig = receive_w_tmo(timeOutMillis,
+ PRIO_B_SIGNALS);
+ if(sig == NIL)
+ return false;
+ }
+
+ DEBUG("Received signal: " << sig->sigNo << " "
+ << sigNo2String(sig->sigNo));
+
+ switch(sig->sigNo){
+ case NDB_TRANSPORTER_PRIO_A:
+ {
+ OSE_Transporter * t = getTransporter(sig->dataSignal.senderNodeId);
+ if (t != 0 && t->isConnected()){
+ insertReceiveBuffer(sig);
+ } else {
+ free_buf(&sig);
+ }
+ }
+ break;
+ case NDB_TRANSPORTER_DATA:
+ {
+ OSE_Transporter * t = getTransporter(sig->dataSignal.senderNodeId);
+ if (t != 0 && t->isConnected()){
+ int nodeId = sig->dataSignal.senderNodeId;
+ Uint32 currSigId = sig->dataSignal.sigId;
+
+ /**
+ * Check if signal is the next in sequence
+ * nextSigId is always set to the next sigId to wait for
+ */
+ if (nextSigId[nodeId] == currSigId){
+
+ // Insert in receive buffer
+ insertReceiveBuffer(sig);
+
+ // Increase sequence id, set it to the next expected id
+ nextSigId[nodeId]++;
+
+ // Check if there are any signal in the wait stack
+ if (waitStackCount > 0){
+ while(checkWaitStack(nodeId));
+ }
+ } else {
+ // Signal was not received in correct order
+ // Check values and put it in the waitStack
+ ndbout_c("WARNING: sigId out of order,"
+ " currSigId = %d, nextSigId = %d",
+ currSigId, nextSigId[nodeId]);
+
+ if (currSigId < nextSigId[nodeId]){
+ // Current recieved sigId was smaller than nextSigId
+ // There is no use to put it in the waitStack
+ ndbout_c("ERROR: recieved sigId was smaller than nextSigId");
+ reportError(callbackObj, nodeId, TE_TOO_SMALL_SIGID);
+ return false;
+ }
+
+ if (currSigId > (nextSigId[nodeId] + waitStackSize)){
+ // Current sigId was larger than nextSigId + size of waitStack
+ // we can never "save" so many signal's on the stack
+ ndbout_c("ERROR: currSigId > (nextSigId + size of waitStack)");
+ reportError(callbackObj, nodeId, TE_TOO_LARGE_SIGID);
+ return false;
+ }
+
+ // Insert in wait stack
+ insertWaitStack(sig);
+ }
+ } else {
+ free_buf(&sig);
+ }
+ }
+ break;
+ case NDB_TRANSPORTER_HUNT:
+ {
+ NdbTransporterHunt * s = (NdbTransporterHunt*)sig;
+ OSE_Transporter * t = getTransporter(s->remoteNodeId);
+ if(t != 0)
+ t->huntReceived(s);
+ free_buf(&sig);
+ }
+ break;
+ case NDB_TRANSPORTER_CONNECT_REQ:
+ {
+ NdbTransporterConnectReq * s = (NdbTransporterConnectReq*)sig;
+ OSE_Transporter * t = getTransporter(s->senderNodeId);
+ if(t != 0){
+ if(t->connectReq(s)){
+ clearWaitStack(s->senderNodeId);
+ clearRecvBuffer(s->senderNodeId);
+ }
+ }
+ free_buf(&sig);
+ }
+ break;
+ case NDB_TRANSPORTER_CONNECT_REF:
+ {
+ NdbTransporterConnectRef * s = (NdbTransporterConnectRef*)sig;
+ OSE_Transporter * t = getTransporter(s->senderNodeId);
+ if(t != 0){
+ if(t->connectRef(s)){
+ clearWaitStack(s->senderNodeId);
+ clearRecvBuffer(s->senderNodeId);
+ }
+ }
+ free_buf(&sig);
+ }
+ break;
+ case NDB_TRANSPORTER_CONNECT_CONF:
+ {
+ NdbTransporterConnectConf * s = (NdbTransporterConnectConf*)sig;
+ OSE_Transporter * t = getTransporter(s->senderNodeId);
+ if(t != 0){
+ if(t->connectConf(s)){
+ clearWaitStack(s->senderNodeId);
+ clearRecvBuffer(s->senderNodeId);
+ }
+ }
+ free_buf(&sig);
+ }
+ break;
+ case NDB_TRANSPORTER_DISCONNECT_ORD:
+ {
+ NdbTransporterDisconnectOrd * s = (NdbTransporterDisconnectOrd*)sig;
+ OSE_Transporter * t = getTransporter(s->senderNodeId);
+ if(t != 0){
+ if(t->disconnectOrd(s)){
+ clearWaitStack(s->senderNodeId);
+ clearRecvBuffer(s->senderNodeId);
+ }
+ }
+ free_buf(&sig);
+ }
+ }
+ return true;
+}
+
+OSE_Transporter *
+OSE_Receiver::getTransporter(NodeId nodeId){
+ if(theTransporterRegistry->theTransporterTypes[nodeId] != tt_OSE_TRANSPORTER)
+ return 0;
+ return (OSE_Transporter *)
+ theTransporterRegistry->theTransporters[nodeId];
+}
+
+void
+OSE_Receiver::clearRecvBuffer(NodeId nodeId){
+ int tmpIndex = 0;
+ union SIGNAL** tmp = new union SIGNAL * [recBufSize];
+
+ /**
+ * Put all signal that I want to keep into tmp
+ */
+ while(recBufReadIndex != recBufWriteIndex){
+ if(receiveBuffer[recBufReadIndex]->dataSignal.senderNodeId != nodeId){
+ tmp[tmpIndex] = receiveBuffer[recBufReadIndex];
+ tmpIndex++;
+ } else {
+ free_buf(&receiveBuffer[recBufReadIndex]);
+ }
+ recBufReadIndex = (recBufReadIndex + 1) % recBufSize;
+ }
+
+ /**
+ * Put all signals that I kept back into receiveBuffer
+ */
+ for(int i = 0; i<tmpIndex; i++)
+ insertReceiveBuffer(tmp[i]);
+
+ delete [] tmp;
+}