summaryrefslogtreecommitdiff
path: root/ndb/src/ndbapi/NdbReceiver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/ndbapi/NdbReceiver.cpp')
-rw-r--r--ndb/src/ndbapi/NdbReceiver.cpp65
1 files changed, 62 insertions, 3 deletions
diff --git a/ndb/src/ndbapi/NdbReceiver.cpp b/ndb/src/ndbapi/NdbReceiver.cpp
index bdb5e6c7e78..14f8d4b8440 100644
--- a/ndb/src/ndbapi/NdbReceiver.cpp
+++ b/ndb/src/ndbapi/NdbReceiver.cpp
@@ -14,12 +14,15 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#include <ndb_global.h>
#include "NdbImpl.hpp"
#include <NdbReceiver.hpp>
#include "NdbDictionaryImpl.hpp"
#include <NdbRecAttr.hpp>
#include <AttributeHeader.hpp>
#include <NdbConnection.hpp>
+#include <TransporterFacade.hpp>
+#include <signaldata/TcKeyConf.hpp>
NdbReceiver::NdbReceiver(Ndb *aNdb) :
theMagicNumber(0),
@@ -35,10 +38,12 @@ NdbReceiver::NdbReceiver(Ndb *aNdb) :
NdbReceiver::~NdbReceiver()
{
+ DBUG_ENTER("NdbReceiver::~NdbReceiver");
if (m_id != NdbObjectIdMap::InvalidId) {
m_ndb->theNdbObjectIdMap->unmap(m_id, this);
}
delete[] m_rows;
+ DBUG_VOID_RETURN;
}
void
@@ -87,7 +92,52 @@ NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
return 0;
}
-#define KEY_ATTR_ID (~0)
+#define KEY_ATTR_ID (~(Uint32)0)
+
+void
+NdbReceiver::calculate_batch_size(Uint32 key_size,
+ Uint32 parallelism,
+ Uint32& batch_size,
+ Uint32& batch_byte_size,
+ Uint32& first_batch_size)
+{
+ TransporterFacade *tp= TransporterFacade::instance();
+ Uint32 max_scan_batch_size= tp->get_scan_batch_size();
+ Uint32 max_batch_byte_size= tp->get_batch_byte_size();
+ Uint32 max_batch_size= tp->get_batch_size();
+ Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
+ NdbRecAttr *rec_attr= theFirstRecAttr;
+ while (rec_attr != NULL) {
+ Uint32 attr_size= rec_attr->attrSize() * rec_attr->arraySize();
+ attr_size= ((attr_size + 7) >> 2) << 2; //Even to word + overhead
+ tot_size+= attr_size;
+ rec_attr= rec_attr->next();
+ }
+ tot_size+= 32; //include signal overhead
+
+ /**
+ * Now we calculate the batch size by trying to get upto SCAN_BATCH_SIZE
+ * bytes sent for each batch from each node. We do however ensure that
+ * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
+ * batch.
+ */
+ batch_byte_size= max_batch_byte_size;
+ if (batch_byte_size * parallelism > max_scan_batch_size) {
+ batch_byte_size= max_scan_batch_size / parallelism;
+ }
+ batch_size= batch_byte_size / tot_size;
+ if (batch_size == 0) {
+ batch_size= 1;
+ } else {
+ if (batch_size > max_batch_size) {
+ batch_size= max_batch_size;
+ } else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
+ batch_size= MAX_PARALLEL_OP_PER_SCAN;
+ }
+ }
+ first_batch_size= batch_size;
+ return;
+}
void
NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
@@ -139,7 +189,7 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
}
prepareSend();
- return ; //0;
+ return;
}
void
@@ -200,10 +250,11 @@ NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
/**
* Update m_received_result_length
*/
+ Uint32 exp = m_expected_result_length;
Uint32 tmp = m_received_result_length + aLength;
m_received_result_length = tmp;
- return (tmp == m_expected_result_length ? 1 : 0);
+ return (tmp == exp || (exp > TcKeyConf::SimpleReadBit) ? 1 : 0);
}
int
@@ -223,3 +274,11 @@ NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength)
return (tmp == m_expected_result_length ? 1 : 0);
}
+
+void
+NdbReceiver::setErrorCode(int code)
+{
+ theMagicNumber = 0;
+ NdbOperation* op = (NdbOperation*)getOwner();
+ op->setErrorCode(code);
+}