diff options
Diffstat (limited to 'ndb/src/ndbapi/NdbReceiver.cpp')
-rw-r--r-- | ndb/src/ndbapi/NdbReceiver.cpp | 65 |
1 files changed, 62 insertions, 3 deletions
diff --git a/ndb/src/ndbapi/NdbReceiver.cpp b/ndb/src/ndbapi/NdbReceiver.cpp index bdb5e6c7e78..14f8d4b8440 100644 --- a/ndb/src/ndbapi/NdbReceiver.cpp +++ b/ndb/src/ndbapi/NdbReceiver.cpp @@ -14,12 +14,15 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include <ndb_global.h> #include "NdbImpl.hpp" #include <NdbReceiver.hpp> #include "NdbDictionaryImpl.hpp" #include <NdbRecAttr.hpp> #include <AttributeHeader.hpp> #include <NdbConnection.hpp> +#include <TransporterFacade.hpp> +#include <signaldata/TcKeyConf.hpp> NdbReceiver::NdbReceiver(Ndb *aNdb) : theMagicNumber(0), @@ -35,10 +38,12 @@ NdbReceiver::NdbReceiver(Ndb *aNdb) : NdbReceiver::~NdbReceiver() { + DBUG_ENTER("NdbReceiver::~NdbReceiver"); if (m_id != NdbObjectIdMap::InvalidId) { m_ndb->theNdbObjectIdMap->unmap(m_id, this); } delete[] m_rows; + DBUG_VOID_RETURN; } void @@ -87,7 +92,52 @@ NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){ return 0; } -#define KEY_ATTR_ID (~0) +#define KEY_ATTR_ID (~(Uint32)0) + +void +NdbReceiver::calculate_batch_size(Uint32 key_size, + Uint32 parallelism, + Uint32& batch_size, + Uint32& batch_byte_size, + Uint32& first_batch_size) +{ + TransporterFacade *tp= TransporterFacade::instance(); + Uint32 max_scan_batch_size= tp->get_scan_batch_size(); + Uint32 max_batch_byte_size= tp->get_batch_byte_size(); + Uint32 max_batch_size= tp->get_batch_size(); + Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead + NdbRecAttr *rec_attr= theFirstRecAttr; + while (rec_attr != NULL) { + Uint32 attr_size= rec_attr->attrSize() * rec_attr->arraySize(); + attr_size= ((attr_size + 7) >> 2) << 2; //Even to word + overhead + tot_size+= attr_size; + rec_attr= rec_attr->next(); + } + tot_size+= 32; //include signal overhead + + /** + * Now we calculate the batch size by trying to get upto SCAN_BATCH_SIZE + * bytes sent for each batch from each node. We do however ensure that + * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per + * batch. + */ + batch_byte_size= max_batch_byte_size; + if (batch_byte_size * parallelism > max_scan_batch_size) { + batch_byte_size= max_scan_batch_size / parallelism; + } + batch_size= batch_byte_size / tot_size; + if (batch_size == 0) { + batch_size= 1; + } else { + if (batch_size > max_batch_size) { + batch_size= max_batch_size; + } else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) { + batch_size= MAX_PARALLEL_OP_PER_SCAN; + } + } + first_batch_size= batch_size; + return; +} void NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){ @@ -139,7 +189,7 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){ } prepareSend(); - return ; //0; + return; } void @@ -200,10 +250,11 @@ NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength) /** * Update m_received_result_length */ + Uint32 exp = m_expected_result_length; Uint32 tmp = m_received_result_length + aLength; m_received_result_length = tmp; - return (tmp == m_expected_result_length ? 1 : 0); + return (tmp == exp || (exp > TcKeyConf::SimpleReadBit) ? 1 : 0); } int @@ -223,3 +274,11 @@ NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength) return (tmp == m_expected_result_length ? 1 : 0); } + +void +NdbReceiver::setErrorCode(int code) +{ + theMagicNumber = 0; + NdbOperation* op = (NdbOperation*)getOwner(); + op->setErrorCode(code); +} |