summaryrefslogtreecommitdiff
path: root/ndb/src/ndbapi/NdbReceiver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/ndbapi/NdbReceiver.cpp')
-rw-r--r--ndb/src/ndbapi/NdbReceiver.cpp185
1 files changed, 182 insertions, 3 deletions
diff --git a/ndb/src/ndbapi/NdbReceiver.cpp b/ndb/src/ndbapi/NdbReceiver.cpp
index 4c461698a4a..bdb5e6c7e78 100644
--- a/ndb/src/ndbapi/NdbReceiver.cpp
+++ b/ndb/src/ndbapi/NdbReceiver.cpp
@@ -16,6 +16,10 @@
#include "NdbImpl.hpp"
#include <NdbReceiver.hpp>
+#include "NdbDictionaryImpl.hpp"
+#include <NdbRecAttr.hpp>
+#include <AttributeHeader.hpp>
+#include <NdbConnection.hpp>
NdbReceiver::NdbReceiver(Ndb *aNdb) :
theMagicNumber(0),
@@ -24,8 +28,19 @@ NdbReceiver::NdbReceiver(Ndb *aNdb) :
m_type(NDB_UNINITIALIZED),
m_owner(0)
{
+ theCurrentRecAttr = theFirstRecAttr = 0;
+ m_defined_rows = 0;
+ m_rows = new NdbRecAttr*[0];
}
+NdbReceiver::~NdbReceiver()
+{
+ if (m_id != NdbObjectIdMap::InvalidId) {
+ m_ndb->theNdbObjectIdMap->unmap(m_id, this);
+ }
+ delete[] m_rows;
+}
+
void
NdbReceiver::init(ReceiverType type, void* owner)
{
@@ -36,11 +51,175 @@ NdbReceiver::init(ReceiverType type, void* owner)
if (m_ndb)
m_id = m_ndb->theNdbObjectIdMap->map(this);
}
+
+ theFirstRecAttr = NULL;
+ theCurrentRecAttr = NULL;
+}
+
+void
+NdbReceiver::release(){
+ NdbRecAttr* tRecAttr = theFirstRecAttr;
+ while (tRecAttr != NULL)
+ {
+ NdbRecAttr* tSaveRecAttr = tRecAttr;
+ tRecAttr = tRecAttr->next();
+ m_ndb->releaseRecAttr(tSaveRecAttr);
+ }
+ theFirstRecAttr = NULL;
+ theCurrentRecAttr = NULL;
}
-NdbReceiver::~NdbReceiver()
+NdbRecAttr *
+NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
+ NdbRecAttr* tRecAttr = m_ndb->getRecAttr();
+ if(tRecAttr && !tRecAttr->setup(tAttrInfo, user_dst_ptr)){
+ if (theFirstRecAttr == NULL)
+ theFirstRecAttr = tRecAttr;
+ else
+ theCurrentRecAttr->next(tRecAttr);
+ theCurrentRecAttr = tRecAttr;
+ tRecAttr->next(NULL);
+ return tRecAttr;
+ }
+ if(tRecAttr){
+ m_ndb->releaseRecAttr(tRecAttr);
+ }
+ return 0;
+}
+
+#define KEY_ATTR_ID (~0)
+
+void
+NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
+ if(rows > m_defined_rows){
+ delete[] m_rows;
+ m_defined_rows = rows;
+ m_rows = new NdbRecAttr*[rows + 1];
+ }
+ m_rows[rows] = 0;
+
+ NdbColumnImpl key;
+ if(key_size){
+ key.m_attrId = KEY_ATTR_ID;
+ key.m_arraySize = key_size+1;
+ key.m_attrSize = 4;
+ key.m_nullable = true; // So that receive works w.r.t KEYINFO20
+ }
+ m_key_info = key_size;
+
+ for(Uint32 i = 0; i<rows; i++){
+ NdbRecAttr * prev = theCurrentRecAttr;
+ assert(prev == 0 || i > 0);
+
+ // Put key-recAttr fir on each row
+ if(key_size && !getValue(&key, (char*)0)){
+ abort();
+ return ; // -1
+ }
+
+ NdbRecAttr* tRecAttr = org->theFirstRecAttr;
+ while(tRecAttr != 0){
+ if(getValue(&NdbColumnImpl::getImpl(*tRecAttr->m_column), (char*)0) != 0)
+ tRecAttr = tRecAttr->next();
+ else
+ break;
+ }
+
+ if(tRecAttr){
+ abort();
+ return ;// -1;
+ }
+
+ // Store first recAttr for each row in m_rows[i]
+ if(prev){
+ m_rows[i] = prev->next();
+ } else {
+ m_rows[i] = theFirstRecAttr;
+ }
+ }
+
+ prepareSend();
+ return ; //0;
+}
+
+void
+NdbReceiver::copyout(NdbReceiver & dstRec){
+ NdbRecAttr* src = m_rows[m_current_row++];
+ NdbRecAttr* dst = dstRec.theFirstRecAttr;
+ Uint32 tmp = m_key_info;
+ if(tmp > 0){
+ src = src->next();
+ }
+
+ while(dst){
+ Uint32 len = ((src->theAttrSize * src->theArraySize)+3)/4;
+ dst->receive_data((Uint32*)src->aRef(), src->isNULL() ? 0 : len);
+ src = src->next();
+ dst = dst->next();
+ }
+}
+
+int
+NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
{
- if (m_id != NdbObjectIdMap::InvalidId) {
- m_ndb->theNdbObjectIdMap->unmap(m_id, this);
+ bool ok = true;
+ NdbRecAttr* currRecAttr = theCurrentRecAttr;
+
+ for (Uint32 used = 0; used < aLength ; used++){
+ AttributeHeader ah(* aDataPtr++);
+ const Uint32 tAttrId = ah.getAttributeId();
+ const Uint32 tAttrSize = ah.getDataSize();
+
+ /**
+ * Set all results to NULL if not found...
+ */
+ while(currRecAttr && currRecAttr->attrId() != tAttrId){
+ ok &= currRecAttr->setNULL();
+ currRecAttr = currRecAttr->next();
+ }
+
+ if(ok && currRecAttr && currRecAttr->receive_data(aDataPtr, tAttrSize)){
+ used += tAttrSize;
+ aDataPtr += tAttrSize;
+ currRecAttr = currRecAttr->next();
+ } else {
+ ndbout_c("%p: ok: %d tAttrId: %d currRecAttr: %p",
+ this,ok, tAttrId, currRecAttr);
+ currRecAttr = theCurrentRecAttr;
+ while(currRecAttr != 0){
+ ndbout_c("%d ", currRecAttr->attrId());
+ currRecAttr = currRecAttr->next();
+ }
+ abort();
+ return -1;
+ }
}
+
+ theCurrentRecAttr = currRecAttr;
+
+ /**
+ * Update m_received_result_length
+ */
+ Uint32 tmp = m_received_result_length + aLength;
+ m_received_result_length = tmp;
+
+ return (tmp == m_expected_result_length ? 1 : 0);
+}
+
+int
+NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength)
+{
+ NdbRecAttr* currRecAttr = m_rows[m_current_row++];
+ assert(currRecAttr->attrId() == KEY_ATTR_ID);
+ currRecAttr->receive_data(aDataPtr, aLength + 1);
+
+ /**
+ * Save scanInfo in the end of keyinfo
+ */
+ ((Uint32*)currRecAttr->aRef())[aLength] = info;
+
+ Uint32 tmp = m_received_result_length + aLength;
+ m_received_result_length = tmp;
+
+ return (tmp == m_expected_result_length ? 1 : 0);
}