summaryrefslogtreecommitdiff
path: root/ndb/src/ndbapi/NdbBlob.cpp
diff options
context:
space:
mode:
authorunknown <pekka@mysql.com>2004-10-31 15:33:56 +0100
committerunknown <pekka@mysql.com>2004-10-31 15:33:56 +0100
commite4f0614cf40ae894290b2a792d22ed6b0992c6c5 (patch)
tree6eec39e305bbb9a82a9a4f1a5d4ba6697559f1a1 /ndb/src/ndbapi/NdbBlob.cpp
parent2e7b38016c951d982261f24fc23ca542f7edb39b (diff)
downloadmariadb-git-e4f0614cf40ae894290b2a792d22ed6b0992c6c5.tar.gz
NDB bug-6018 support writeTuple with blobs
mysql-test/r/ndb_blob.result: bug-6018 mysql-test/t/ndb_blob.test: bug-6018 ndb/include/ndbapi/NdbBlob.hpp: bug-6018 ndb/include/ndbapi/NdbConnection.hpp: bug-6018 ndb/include/ndbapi/NdbIndexOperation.hpp: bug-6018 ndb/include/ndbapi/NdbOperation.hpp: bug-6018 ndb/src/ndbapi/NdbBlob.cpp: bug-6018 ndb/src/ndbapi/NdbConnection.cpp: bug-6018 ndb/src/ndbapi/NdbDictionaryImpl.cpp: bug-6018 ndb/src/ndbapi/NdbIndexOperation.cpp: bug-6018 ndb/src/ndbapi/NdbOperation.cpp: bug-6018 ndb/src/ndbapi/NdbOperationExec.cpp: bug-6018 ndb/test/ndbapi/testBlobs.cpp: bug-6018
Diffstat (limited to 'ndb/src/ndbapi/NdbBlob.cpp')
-rw-r--r--ndb/src/ndbapi/NdbBlob.cpp359
1 files changed, 247 insertions, 112 deletions
diff --git a/ndb/src/ndbapi/NdbBlob.cpp b/ndb/src/ndbapi/NdbBlob.cpp
index feab95d8ca5..5b193870558 100644
--- a/ndb/src/ndbapi/NdbBlob.cpp
+++ b/ndb/src/ndbapi/NdbBlob.cpp
@@ -33,21 +33,24 @@
ndbout << prefix << " " << hex << (void*)this << " " << cname; \
ndbout << " " << dec << __LINE__ << " " << x << " " << *this << endl; \
} while (0)
-#else
-#define DBG(x)
-#endif
static char*
ndb_blob_debug(const Uint32* data, unsigned size)
{
- static char buf[128 + 1]; // MT irrelevant
+ static char buf[200]; // MT irrelevant
buf[0] = 0;
- for (unsigned i = 0; i < size && i < 128 / 4; i++) {
- sprintf(buf + strlen(buf), "%*s%08x", i != 0, "", data[i]);
+ for (unsigned i = 0; i < size; i++) {
+ unsigned n = strlen(buf);
+ if (n + 10 < sizeof(buf))
+ sprintf(buf + n, "%*s%08x", i != 0, "", data[i]);
}
return buf;
}
+#else
+#define DBG(x)
+#endif
+
/*
* Reading index table directly (as a table) is faster but there are
* bugs or limitations. Keep the code and make possible to choose.
@@ -162,6 +165,7 @@ NdbBlob::init()
theHead = NULL;
theInlineData = NULL;
theHeadInlineRecAttr = NULL;
+ theHeadInlineReadOp = NULL;
theHeadInlineUpdateFlag = false;
theNullFlag = -1;
theLength = 0;
@@ -206,6 +210,13 @@ NdbBlob::Buf::alloc(unsigned n)
#endif
}
+void
+NdbBlob::Buf::copyfrom(const NdbBlob::Buf& src)
+{
+ assert(size == src.size);
+ memcpy(data, src.data, size);
+}
+
// classify operations (inline)
inline bool
@@ -226,6 +237,7 @@ NdbBlob::isKeyOp()
return
theNdbOp->theOperationType == NdbOperation::InsertRequest ||
theNdbOp->theOperationType == NdbOperation::UpdateRequest ||
+ theNdbOp->theOperationType == NdbOperation::WriteRequest ||
theNdbOp->theOperationType == NdbOperation::ReadRequest ||
theNdbOp->theOperationType == NdbOperation::ReadExclusive ||
theNdbOp->theOperationType == NdbOperation::DeleteRequest;
@@ -254,6 +266,13 @@ NdbBlob::isUpdateOp()
}
inline bool
+NdbBlob::isWriteOp()
+{
+ return
+ theNdbOp->theOperationType == NdbOperation::WriteRequest;
+}
+
+inline bool
NdbBlob::isDeleteOp()
{
return
@@ -289,7 +308,7 @@ inline Uint32
NdbBlob::getDistKey(Uint32 part)
{
assert(theStripeSize != 0);
- return (part / theStripeSize) % theStripeSize;
+ return part / theStripeSize;
}
// getters and setters
@@ -401,7 +420,7 @@ NdbBlob::getHeadFromRecAttr()
theNullFlag = theHeadInlineRecAttr->isNULL();
assert(theNullFlag != -1);
theLength = ! theNullFlag ? theHead->length : 0;
- DBG("getHeadFromRecAttr out");
+ DBG("getHeadFromRecAttr [out]");
}
int
@@ -453,7 +472,7 @@ NdbBlob::setValue(const void* data, Uint32 bytes)
setErrorCode(ErrState);
return -1;
}
- if (! isInsertOp() && ! isUpdateOp()) {
+ if (! isInsertOp() && ! isUpdateOp() && ! isWriteOp()) {
setErrorCode(ErrUsage);
return -1;
}
@@ -466,11 +485,12 @@ NdbBlob::setValue(const void* data, Uint32 bytes)
theGetSetBytes = bytes;
if (isInsertOp()) {
// write inline part now
- if (theSetBuf != 0) {
- unsigned n = theGetSetBytes;
+ if (theSetBuf != NULL) {
+ Uint32 n = theGetSetBytes;
if (n > theInlineSize)
n = theInlineSize;
- if (writeDataPrivate(0, theSetBuf, n) == -1)
+ assert(thePos == 0);
+ if (writeDataPrivate(theSetBuf, n) == -1)
return -1;
} else {
theNullFlag = true;
@@ -555,7 +575,7 @@ NdbBlob::getLength(Uint64& len)
int
NdbBlob::truncate(Uint64 length)
{
- DBG("truncate length=" << length);
+ DBG("truncate [in] length=" << length);
if (theNullFlag == -1) {
setErrorCode(ErrState);
return -1;
@@ -573,7 +593,10 @@ NdbBlob::truncate(Uint64 length)
}
theLength = length;
theHeadInlineUpdateFlag = true;
+ if (thePos > length)
+ thePos = length;
}
+ DBG("truncate [out]");
return 0;
}
@@ -610,32 +633,20 @@ NdbBlob::setPos(Uint64 pos)
int
NdbBlob::readData(void* data, Uint32& bytes)
{
- if (readData(thePos, data, bytes) == -1)
- return -1;
- thePos += bytes;
- assert(thePos <= theLength);
- return 0;
-}
-
-int
-NdbBlob::readData(Uint64 pos, void* data, Uint32& bytes)
-{
if (theState != Active) {
setErrorCode(ErrState);
return -1;
}
char* buf = static_cast<char*>(data);
- return readDataPrivate(pos, buf, bytes);
+ return readDataPrivate(buf, bytes);
}
int
-NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes)
+NdbBlob::readDataPrivate(char* buf, Uint32& bytes)
{
- DBG("readData pos=" << pos << " bytes=" << bytes);
- if (pos > theLength) {
- setErrorCode(ErrSeek);
- return -1;
- }
+ DBG("readData [in] bytes=" << bytes);
+ assert(thePos <= theLength);
+ Uint64 pos = thePos;
if (bytes > theLength - pos)
bytes = theLength - pos;
Uint32 len = bytes;
@@ -709,38 +720,29 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes)
len -= n;
}
assert(len == 0);
- return 0;
-}
-
-int
-NdbBlob::writeData(const void* data, Uint32 bytes)
-{
- if (writeData(thePos, data, bytes) == -1)
- return -1;
- thePos += bytes;
+ thePos = pos;
assert(thePos <= theLength);
+ DBG("readData [out]");
return 0;
}
int
-NdbBlob::writeData(Uint64 pos, const void* data, Uint32 bytes)
+NdbBlob::writeData(const void* data, Uint32 bytes)
{
if (theState != Active) {
setErrorCode(ErrState);
return -1;
}
const char* buf = static_cast<const char*>(data);
- return writeDataPrivate(pos, buf, bytes);
+ return writeDataPrivate(buf, bytes);
}
int
-NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes)
+NdbBlob::writeDataPrivate(const char* buf, Uint32 bytes)
{
- DBG("writeData pos=" << pos << " bytes=" << bytes);
- if (pos > theLength) {
- setErrorCode(ErrSeek);
- return -1;
- }
+ DBG("writeData [in] bytes=" << bytes);
+ assert(thePos <= theLength);
+ Uint64 pos = thePos;
Uint32 len = bytes;
// any write makes blob not NULL
if (theNullFlag) {
@@ -778,7 +780,7 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes)
if (readParts(thePartBuf.data, part, 1) == -1)
return -1;
// need result now
- DBG("execute pending part reafs");
+ DBG("execute pending part reads");
if (executePendingBlobReads() == -1)
return -1;
Uint32 n = thePartSize - off;
@@ -855,14 +857,16 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes)
theLength = pos;
theHeadInlineUpdateFlag = true;
}
- DBG("writeData out");
+ thePos = pos;
+ assert(thePos <= theLength);
+ DBG("writeData [out]");
return 0;
}
int
NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
{
- DBG("readParts part=" << part << " count=" << count);
+ DBG("readParts [in] part=" << part << " count=" << count);
Uint32 n = 0;
while (n < count) {
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
@@ -873,6 +877,7 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
setErrorCode(tOp);
return -1;
}
+ tOp->m_abortOption = AbortOnError;
buf += thePartSize;
n++;
thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
@@ -884,7 +889,7 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
int
NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
{
- DBG("insertParts part=" << part << " count=" << count);
+ DBG("insertParts [in] part=" << part << " count=" << count);
Uint32 n = 0;
while (n < count) {
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
@@ -895,6 +900,7 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
setErrorCode(tOp);
return -1;
}
+ tOp->m_abortOption = AbortOnError;
buf += thePartSize;
n++;
thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
@@ -906,7 +912,7 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
int
NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count)
{
- DBG("updateParts part=" << part << " count=" << count);
+ DBG("updateParts [in] part=" << part << " count=" << count);
Uint32 n = 0;
while (n < count) {
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
@@ -917,6 +923,7 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count)
setErrorCode(tOp);
return -1;
}
+ tOp->m_abortOption = AbortOnError;
buf += thePartSize;
n++;
thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
@@ -928,7 +935,7 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count)
int
NdbBlob::deleteParts(Uint32 part, Uint32 count)
{
- DBG("deleteParts part=" << part << " count=" << count);
+ DBG("deleteParts [in] part=" << part << " count=" << count);
Uint32 n = 0;
while (n < count) {
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
@@ -938,6 +945,7 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count)
setErrorCode(tOp);
return -1;
}
+ tOp->m_abortOption = AbortOnError;
n++;
thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
theNdbCon->thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
@@ -945,6 +953,57 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count)
return 0;
}
+/*
+ * Number of blob parts not known. Used to check for race condition
+ * when writeTuple is used for insert. Deletes all parts found.
+ */
+int
+NdbBlob::deletePartsUnknown(Uint32 part)
+{
+ DBG("deletePartsUnknown [in] part=" << part << " count=all");
+ static const unsigned maxbat = 256;
+ static const unsigned minbat = 1;
+ unsigned bat = minbat;
+ NdbOperation* tOpList[maxbat];
+ Uint32 count = 0;
+ while (true) {
+ Uint32 n;
+ n = 0;
+ while (n < bat) {
+ NdbOperation*& tOp = tOpList[n]; // ref
+ tOp = theNdbCon->getNdbOperation(theBlobTable);
+ if (tOp == NULL ||
+ tOp->deleteTuple() == -1 ||
+ setPartKeyValue(tOp, part + count + n) == -1) {
+ setErrorCode(tOp);
+ return -1;
+ }
+ tOp->m_abortOption = IgnoreError;
+ n++;
+ if (theNdbCon->executeNoBlobs(NoCommit) == -1)
+ return -1;
+ }
+ n = 0;
+ while (n < bat) {
+ NdbOperation* tOp = tOpList[n];
+ if (tOp->theError.code != 0) {
+ if (tOp->theError.code != 626) {
+ setErrorCode(tOp);
+ return -1;
+ }
+ // first non-existent part
+ DBG("deletePartsUnknown [out] count=" << count);
+ return 0;
+ }
+ n++;
+ count++;
+ }
+ bat *= 4;
+ if (bat > maxbat)
+ bat = maxbat;
+ }
+}
+
// pending ops
int
@@ -1007,7 +1066,7 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl*
theTable = anOp->m_currentTable;
theAccessTable = anOp->m_accessTable;
theColumn = aColumn;
- DBG("atPrepare");
+ DBG("atPrepare [in]");
NdbDictionary::Column::Type partType = NdbDictionary::Column::Undefined;
switch (theColumn->getType()) {
case NdbDictionary::Column::Blob:
@@ -1046,6 +1105,7 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl*
theKeyBuf.alloc(theTable->m_sizeOfKeysInWords << 2);
theAccessKeyBuf.alloc(theAccessTable->m_sizeOfKeysInWords << 2);
theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize);
+ theHeadInlineCopyBuf.alloc(sizeof(Head) + theInlineSize);
thePartBuf.alloc(thePartSize);
theHead = (Head*)theHeadInlineBuf.data;
theInlineData = theHeadInlineBuf.data + sizeof(Head);
@@ -1080,6 +1140,12 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl*
theNullFlag = true;
theLength = 0;
}
+ if (isWriteOp()) {
+ // becomes NULL unless set before execute
+ theNullFlag = true;
+ theLength = 0;
+ theHeadInlineUpdateFlag = true;
+ }
supportedOp = true;
}
if (isScanOp()) {
@@ -1093,19 +1159,21 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl*
return -1;
}
setState(Prepared);
- DBG("atPrepare out");
+ DBG("atPrepare [out]");
return 0;
}
/*
* Before execute of prepared operation. May add new operations before
* this one. May ask that this operation and all before it (a "batch")
- * is executed immediately in no-commit mode.
+ * is executed immediately in no-commit mode. In this case remaining
+ * prepared operations are saved in a separate list. They are added
+ * back after postExecute.
*/
int
NdbBlob::preExecute(ExecType anExecType, bool& batch)
{
- DBG("preExecute");
+ DBG("preExecute [in]");
if (theState == Invalid)
return -1;
assert(theState == Prepared);
@@ -1120,11 +1188,11 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
if (isInsertOp()) {
if (theSetFlag && theGetSetBytes > theInlineSize) {
// add ops to write rest of a setValue
- assert(theSetBuf != 0);
- Uint64 pos = theInlineSize;
+ assert(theSetBuf != NULL);
const char* buf = theSetBuf + theInlineSize;
Uint32 bytes = theGetSetBytes - theInlineSize;
- if (writeDataPrivate(pos, buf, bytes) == -1)
+ assert(thePos == theInlineSize);
+ if (writeDataPrivate(buf, bytes) == -1)
return -1;
if (theHeadInlineUpdateFlag) {
// add an operation to update head+inline
@@ -1136,11 +1204,12 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
setErrorCode(ErrAbort);
return -1;
}
+ DBG("add op to update head+inline");
}
}
}
if (isTableOp()) {
- if (isUpdateOp() || isDeleteOp()) {
+ if (isUpdateOp() || isWriteOp() || isDeleteOp()) {
// add operation before this one to read head+inline
NdbOperation* tOp = theNdbCon->getNdbOperation(theTable, theNdbOp);
if (tOp == NULL ||
@@ -1150,8 +1219,13 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
setErrorCode(tOp);
return -1;
}
+ if (isWriteOp()) {
+ tOp->m_abortOption = IgnoreError;
+ }
+ theHeadInlineReadOp = tOp;
// execute immediately
batch = true;
+ DBG("add op before to read head+inline");
}
}
if (isIndexOp()) {
@@ -1180,6 +1254,7 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
}
}
}
+ DBG("added op before to read table key");
if (isUpdateOp() || isDeleteOp()) {
// add op before this one to read head+inline via index
NdbIndexOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp);
@@ -1190,15 +1265,43 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
setErrorCode(tOp);
return -1;
}
+ if (isWriteOp()) {
+ tOp->m_abortOption = IgnoreError;
+ }
+ theHeadInlineReadOp = tOp;
// execute immediately
batch = true;
+ DBG("added index op before to read head+inline");
+ }
+ if (isWriteOp()) {
+ // XXX until IgnoreError fixed for index op
+ batch = true;
+ }
+ }
+ if (isWriteOp()) {
+ if (theSetFlag) {
+ // write head+inline now
+ theNullFlag = true;
+ theLength = 0;
+ if (theSetBuf != NULL) {
+ Uint32 n = theGetSetBytes;
+ if (n > theInlineSize)
+ n = theInlineSize;
+ assert(thePos == 0);
+ if (writeDataPrivate(theSetBuf, n) == -1)
+ return -1;
+ }
+ if (setHeadInlineValue(theNdbOp) == -1)
+ return -1;
+ // the read op before us may overwrite
+ theHeadInlineCopyBuf.copyfrom(theHeadInlineBuf);
}
}
if (theActiveHook != NULL) {
// need blob head for callback
batch = true;
}
- DBG("preExecute out batch=" << batch);
+ DBG("preExecute [out] batch=" << batch);
return 0;
}
@@ -1211,15 +1314,16 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
int
NdbBlob::postExecute(ExecType anExecType)
{
- DBG("postExecute type=" << anExecType);
+ DBG("postExecute [in] type=" << anExecType);
if (theState == Invalid)
return -1;
if (theState == Active) {
setState(anExecType == NoCommit ? Active : Closed);
- DBG("postExecute skip");
+ DBG("postExecute [skip]");
return 0;
}
assert(theState == Prepared);
+ setState(anExecType == NoCommit ? Active : Closed);
assert(isKeyOp());
if (isIndexOp()) {
NdbBlob* tFirstBlob = theNdbOp->theBlobList;
@@ -1231,22 +1335,13 @@ NdbBlob::postExecute(ExecType anExecType)
}
if (isReadOp()) {
getHeadFromRecAttr();
- if (theGetFlag && theGetSetBytes > 0) {
- // copy inline bytes to user buffer
- assert(theGetBuf != NULL);
- unsigned n = theGetSetBytes;
- if (n > theInlineSize)
- n = theInlineSize;
- memcpy(theGetBuf, theInlineData, n);
- }
- if (theGetFlag && theGetSetBytes > theInlineSize) {
- // add ops to read rest of a getValue
- assert(anExecType == NoCommit);
- assert(theGetBuf != 0);
- Uint64 pos = theInlineSize;
- char* buf = theGetBuf + theInlineSize;
- Uint32 bytes = theGetSetBytes - theInlineSize;
- if (readDataPrivate(pos, buf, bytes) == -1)
+ if (setPos(0) == -1)
+ return -1;
+ if (theGetFlag) {
+ assert(theGetSetBytes == 0 || theGetBuf != 0);
+ assert(theGetSetBytes <= theInlineSize || anExecType == NoCommit);
+ Uint32 bytes = theGetSetBytes;
+ if (readDataPrivate(theGetBuf, bytes) == -1)
return -1;
}
}
@@ -1255,10 +1350,11 @@ NdbBlob::postExecute(ExecType anExecType)
getHeadFromRecAttr();
if (theSetFlag) {
// setValue overwrites everything
- if (theSetBuf != 0) {
+ if (theSetBuf != NULL) {
if (truncate(0) == -1)
return -1;
- if (writeDataPrivate(0, theSetBuf, theGetSetBytes) == -1)
+ assert(thePos == 0);
+ if (writeDataPrivate(theSetBuf, theGetSetBytes) == -1)
return -1;
} else {
if (setNull() == -1)
@@ -1266,6 +1362,57 @@ NdbBlob::postExecute(ExecType anExecType)
}
}
}
+ if (isWriteOp() && isTableOp()) {
+ assert(anExecType == NoCommit);
+ if (theHeadInlineReadOp->theError.code == 0) {
+ int tNullFlag = theNullFlag;
+ Uint64 tLength = theLength;
+ Uint64 tPos = thePos;
+ getHeadFromRecAttr();
+ DBG("tuple found");
+ if (truncate(0) == -1)
+ return -1;
+ // restore previous head+inline
+ theHeadInlineBuf.copyfrom(theHeadInlineCopyBuf);
+ theNullFlag = tNullFlag;
+ theLength = tLength;
+ thePos = tPos;
+ } else {
+ if (theHeadInlineReadOp->theError.code != 626) {
+ setErrorCode(theHeadInlineReadOp);
+ return -1;
+ }
+ DBG("tuple not found");
+ /*
+ * Read found no tuple but it is possible that a tuple was
+ * created after the read by another transaction. Delete all
+ * blob parts which may exist.
+ */
+ if (deletePartsUnknown(0) == -1)
+ return -1;
+ }
+ if (theSetFlag && theGetSetBytes > theInlineSize) {
+ assert(theSetBuf != NULL);
+ const char* buf = theSetBuf + theInlineSize;
+ Uint32 bytes = theGetSetBytes - theInlineSize;
+ assert(thePos == theInlineSize);
+ if (writeDataPrivate(buf, bytes) == -1)
+ return -1;
+ }
+ }
+ if (isWriteOp() && isIndexOp()) {
+ // XXX until IgnoreError fixed for index op
+ if (deletePartsUnknown(0) == -1)
+ return -1;
+ if (theSetFlag && theGetSetBytes > theInlineSize) {
+ assert(theSetBuf != NULL);
+ const char* buf = theSetBuf + theInlineSize;
+ Uint32 bytes = theGetSetBytes - theInlineSize;
+ assert(thePos == theInlineSize);
+ if (writeDataPrivate(buf, bytes) == -1)
+ return -1;
+ }
+ }
if (isDeleteOp()) {
assert(anExecType == NoCommit);
getHeadFromRecAttr();
@@ -1278,7 +1425,7 @@ NdbBlob::postExecute(ExecType anExecType)
if (invokeActiveHook() == -1)
return -1;
}
- DBG("postExecute out");
+ DBG("postExecute [out]");
return 0;
}
@@ -1289,12 +1436,12 @@ NdbBlob::postExecute(ExecType anExecType)
int
NdbBlob::preCommit()
{
- DBG("preCommit");
+ DBG("preCommit [in]");
if (theState == Invalid)
return -1;
assert(theState == Active);
assert(isKeyOp());
- if (isInsertOp() || isUpdateOp()) {
+ if (isInsertOp() || isUpdateOp() || isWriteOp()) {
if (theHeadInlineUpdateFlag) {
// add an operation to update head+inline
NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
@@ -1305,9 +1452,11 @@ NdbBlob::preCommit()
setErrorCode(ErrAbort);
return -1;
}
+ tOp->m_abortOption = AbortOnError;
+ DBG("added op to update head+inline");
}
}
- DBG("preCommit out");
+ DBG("preCommit [out]");
return 0;
}
@@ -1317,13 +1466,10 @@ NdbBlob::preCommit()
int
NdbBlob::atNextResult()
{
- DBG("atNextResult");
+ DBG("atNextResult [in]");
if (theState == Invalid)
return -1;
assert(isScanOp());
- getHeadFromRecAttr();
- // reset position
- thePos = 0;
// get primary key
{ Uint32* data = (Uint32*)theKeyBuf.data;
unsigned size = theTable->m_sizeOfKeysInWords;
@@ -1332,26 +1478,14 @@ NdbBlob::atNextResult()
return -1;
}
}
- if (! theNullFlag) {
- if (theGetFlag && theGetSetBytes > 0) {
- // copy inline bytes to user buffer
- assert(theGetBuf != NULL);
- unsigned n = theGetSetBytes;
- if (n > theLength)
- n = theLength;
- if (n > theInlineSize)
- n = theInlineSize;
- memcpy(theGetBuf, theInlineData, n);
- }
- if (theGetFlag && theGetSetBytes > theInlineSize && theLength > theInlineSize) {
- // add ops to read rest of a getValue
- assert(theGetBuf != 0);
- Uint64 pos = theInlineSize;
- char* buf = theGetBuf + theInlineSize;
- Uint32 bytes = theGetSetBytes - theInlineSize;
- if (readDataPrivate(pos, buf, bytes) == -1)
- return -1;
- }
+ getHeadFromRecAttr();
+ if (setPos(0) == -1)
+ return -1;
+ if (theGetFlag) {
+ assert(theGetSetBytes == 0 || theGetBuf != 0);
+ Uint32 bytes = theGetSetBytes;
+ if (readDataPrivate(theGetBuf, bytes) == -1)
+ return -1;
}
setState(Active);
// activation callback
@@ -1359,7 +1493,7 @@ NdbBlob::atNextResult()
if (invokeActiveHook() == -1)
return -1;
}
- DBG("atNextResult out");
+ DBG("atNextResult [out]");
return 0;
}
@@ -1444,7 +1578,8 @@ operator<<(NdbOut& out, const NdbBlob& blob)
ndbout << dec << " n=" << blob.theNullFlag;;
ndbout << dec << " l=" << blob.theLength;
ndbout << dec << " p=" << blob.thePos;
- ndbout << dec << " u=" << (Uint32) blob.theHeadInlineUpdateFlag;
+ ndbout << dec << " u=" << (Uint32)blob.theHeadInlineUpdateFlag;
+ ndbout << dec << " g=" << (Uint32)blob.theGetSetBytes;
return out;
}
#endif