summaryrefslogtreecommitdiff
path: root/ndb/src/ndbapi/NdbBlob.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/ndbapi/NdbBlob.cpp')
-rw-r--r--ndb/src/ndbapi/NdbBlob.cpp291
1 files changed, 196 insertions, 95 deletions
diff --git a/ndb/src/ndbapi/NdbBlob.cpp b/ndb/src/ndbapi/NdbBlob.cpp
index 8e067f770e8..7939f54d846 100644
--- a/ndb/src/ndbapi/NdbBlob.cpp
+++ b/ndb/src/ndbapi/NdbBlob.cpp
@@ -14,23 +14,25 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-#include "Ndb.hpp"
-#include "NdbDictionaryImpl.hpp"
-#include "NdbConnection.hpp"
-#include "NdbOperation.hpp"
-#include "NdbIndexOperation.hpp"
-#include "NdbRecAttr.hpp"
-#include "NdbBlob.hpp"
+#include <Ndb.hpp>
+#include <NdbDictionaryImpl.hpp>
+#include <NdbConnection.hpp>
+#include <NdbOperation.hpp>
+#include <NdbIndexOperation.hpp>
+#include <NdbRecAttr.hpp>
+#include <NdbBlob.hpp>
+#include <NdbScanOperation.hpp>
#ifdef NDB_BLOB_DEBUG
#define DBG(x) \
do { \
static const char* p = getenv("NDB_BLOB_DEBUG"); \
if (p == 0 || *p == 0 || *p == '0') break; \
- const char* cname = theColumn == NULL ? "BLOB" : theColumn->m_name.c_str(); \
- ndbout << cname << " " << __LINE__ << " " << x << " " << *this << endl; \
+ static char* prefix = "BLOB"; \
+ const char* cname = theColumn == NULL ? "-" : theColumn->m_name.c_str(); \
+ ndbout << prefix << " " << hex << (void*)this << " " << cname; \
+ ndbout << " " << dec << __LINE__ << " " << x << " " << *this << endl; \
} while (0)
-#define EXE() assert(theNdbCon->executeNoBlobs(NoCommit) == 0)
#else
#define DBG(x)
#endif
@@ -48,7 +50,7 @@ ndb_blob_debug(const Uint32* data, unsigned size)
/*
* Reading index table directly (as a table) is faster but there are
- * bugs or limitations. Keep the code but make possible to choose.
+ * bugs or limitations. Keep the code and make possible to choose.
*/
static const bool g_ndb_blob_ok_to_read_index_table = false;
@@ -81,7 +83,7 @@ NdbBlob::getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnIm
{
assert(t != 0 && c != 0 && c->getBlobType());
memset(btname, 0, BlobTableNameSize);
- sprintf(btname, "NDB$BLOB_%d_%d_%d", (int)t->m_tableId, (int)t->m_version, (int)c->m_attrId);
+ sprintf(btname, "NDB$BLOB_%d_%d", (int)t->m_tableId, (int)c->m_attrId);
}
void
@@ -115,7 +117,7 @@ NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnIm
case NdbDictionary::Column::Blob:
bc.setType(NdbDictionary::Column::Binary);
break;
- case NdbDictionary::Column::Clob:
+ case NdbDictionary::Column::Text:
bc.setType(NdbDictionary::Column::Char);
break;
default:
@@ -138,12 +140,12 @@ void
NdbBlob::init()
{
theState = Idle;
- theBlobTableName[0] = 0;
theNdb = NULL;
theNdbCon = NULL;
theNdbOp = NULL;
theTable = NULL;
theAccessTable = NULL;
+ theBlobTable = NULL;
theColumn = NULL;
theFillChar = 0;
theInlineSize = 0;
@@ -154,11 +156,13 @@ NdbBlob::init()
theSetFlag = false;
theSetBuf = NULL;
theGetSetBytes = 0;
+ thePendingBlobOps = 0;
+ theActiveHook = NULL;
+ theActiveHookArg = NULL;
theHead = NULL;
theInlineData = NULL;
theHeadInlineRecAttr = NULL;
theHeadInlineUpdateFlag = false;
- theNewPartFlag = false;
theNullFlag = -1;
theLength = 0;
thePos = 0;
@@ -269,7 +273,7 @@ NdbBlob::isScanOp()
inline Uint32
NdbBlob::getPartNumber(Uint64 pos)
{
- assert(pos >= theInlineSize);
+ assert(thePartSize != 0 && pos >= theInlineSize);
return (pos - theInlineSize) / thePartSize;
}
@@ -301,7 +305,7 @@ NdbBlob::getTableKeyValue(NdbOperation* anOp)
assert(c != NULL);
if (c->m_pk) {
unsigned len = c->m_attrSize * c->m_arraySize;
- if (anOp->getValue(c, (char*)&data[pos]) == NULL) {
+ if (anOp->getValue_impl(c, (char*)&data[pos]) == NULL) {
setErrorCode(anOp);
return -1;
}
@@ -321,10 +325,10 @@ int
NdbBlob::setTableKeyValue(NdbOperation* anOp)
{
const Uint32* data = (const Uint32*)theKeyBuf.data;
+ DBG("setTableKeyValue key=" << ndb_blob_debug(data, theTable->m_sizeOfKeysInWords));
+ const unsigned columns = theTable->m_columns.size();
unsigned pos = 0;
- const unsigned size = theTable->m_columns.size();
- DBG("setTableKeyValue key=" << ndb_blob_debug(data, size));
- for (unsigned i = 0; i < size; i++) {
+ for (unsigned i = 0; i < columns; i++) {
NdbColumnImpl* c = theTable->m_columns[i];
assert(c != NULL);
if (c->m_pk) {
@@ -344,10 +348,10 @@ int
NdbBlob::setAccessKeyValue(NdbOperation* anOp)
{
const Uint32* data = (const Uint32*)theAccessKeyBuf.data;
+ DBG("setAccessKeyValue key=" << ndb_blob_debug(data, theAccessTable->m_sizeOfKeysInWords));
+ const unsigned columns = theAccessTable->m_columns.size();
unsigned pos = 0;
- const unsigned size = theAccessTable->m_columns.size();
- DBG("setAccessKeyValue key=" << ndb_blob_debug(data, size));
- for (unsigned i = 0; i < size; i++) {
+ for (unsigned i = 0; i < columns; i++) {
NdbColumnImpl* c = theAccessTable->m_columns[i];
assert(c != NULL);
if (c->m_pk) {
@@ -382,7 +386,7 @@ int
NdbBlob::getHeadInlineValue(NdbOperation* anOp)
{
DBG("getHeadInlineValue");
- theHeadInlineRecAttr = anOp->getValue(theColumn, theHeadInlineBuf.data);
+ theHeadInlineRecAttr = anOp->getValue_impl(theColumn, theHeadInlineBuf.data);
if (theHeadInlineRecAttr == NULL) {
setErrorCode(anOp);
return -1;
@@ -478,11 +482,27 @@ NdbBlob::setValue(const void* data, Uint32 bytes)
return 0;
}
+// activation hook
+
+int
+NdbBlob::setActiveHook(ActiveHook activeHook, void* arg)
+{
+ DBG("setActiveHook hook=" << hex << (void*)activeHook << " arg=" << hex << arg);
+ if (theState != Prepared) {
+ setErrorCode(ErrState);
+ return -1;
+ }
+ theActiveHook = activeHook;
+ theActiveHookArg = arg;
+ return 0;
+}
+
// misc operations
int
NdbBlob::getNull(bool& isNull)
{
+ DBG("getNull");
if (theState == Prepared && theSetFlag) {
isNull = (theSetBuf == NULL);
return 0;
@@ -519,6 +539,7 @@ NdbBlob::setNull()
int
NdbBlob::getLength(Uint64& len)
{
+ DBG("getLength");
if (theState == Prepared && theSetFlag) {
len = theGetSetBytes;
return 0;
@@ -534,17 +555,17 @@ NdbBlob::getLength(Uint64& len)
int
NdbBlob::truncate(Uint64 length)
{
- DBG("truncate kength=" << length);
+ DBG("truncate length=" << length);
if (theNullFlag == -1) {
setErrorCode(ErrState);
return -1;
}
if (theLength > length) {
- if (length >= theInlineSize) {
- Uint32 part1 = getPartNumber(length);
+ if (length > theInlineSize) {
+ Uint32 part1 = getPartNumber(length - 1);
Uint32 part2 = getPartNumber(theLength - 1);
assert(part2 >= part1);
- if (deleteParts(part1, part2 - part1) == -1)
+ if (part2 > part1 && deleteParts(part1 + 1, part2 - part1) == -1)
return -1;
} else {
if (deleteParts(0, getPartCount()) == -1)
@@ -559,6 +580,7 @@ NdbBlob::truncate(Uint64 length)
int
NdbBlob::getPos(Uint64& pos)
{
+ DBG("getPos");
if (theNullFlag == -1) {
setErrorCode(ErrState);
return -1;
@@ -570,6 +592,7 @@ NdbBlob::getPos(Uint64& pos)
int
NdbBlob::setPos(Uint64 pos)
{
+ DBG("setPos pos=" << pos);
if (theNullFlag == -1) {
setErrorCode(ErrState);
return -1;
@@ -628,6 +651,10 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes)
len -= n;
}
}
+ if (len > 0 && thePartSize == 0) {
+ setErrorCode(ErrSeek);
+ return -1;
+ }
if (len > 0) {
assert(pos >= theInlineSize);
Uint32 off = (pos - theInlineSize) % thePartSize;
@@ -637,11 +664,10 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes)
Uint32 part = (pos - theInlineSize) / thePartSize;
if (readParts(thePartBuf.data, part, 1) == -1)
return -1;
- DBG("force execute");
- if (theNdbCon->executeNoBlobs(NoCommit) == -1) {
- setErrorCode(theNdbOp);
+ // need result now
+ DBG("execute pending part reads");
+ if (executePendingBlobReads() == -1)
return -1;
- }
Uint32 n = thePartSize - off;
if (n > len)
n = len;
@@ -672,11 +698,10 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes)
Uint32 part = (pos - theInlineSize) / thePartSize;
if (readParts(thePartBuf.data, part, 1) == -1)
return -1;
- DBG("force execute");
- if (theNdbCon->executeNoBlobs(NoCommit) == -1) {
- setErrorCode(theNdbOp);
+ // need result now
+ DBG("execute pending part reads");
+ if (executePendingBlobReads() == -1)
return -1;
- }
memcpy(buf, thePartBuf.data, len);
Uint32 n = len;
pos += n;
@@ -735,29 +760,27 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes)
len -= n;
}
}
+ if (len > 0 && thePartSize == 0) {
+ setErrorCode(ErrSeek);
+ return -1;
+ }
if (len > 0) {
assert(pos >= theInlineSize);
Uint32 off = (pos - theInlineSize) % thePartSize;
// partial first block
if (off != 0) {
DBG("partial first block pos=" << pos << " len=" << len);
- if (theNewPartFlag) {
- // must flush insert to guarantee read
- DBG("force execute");
- if (theNdbCon->executeNoBlobs(NoCommit) == -1) {
- setErrorCode(theNdbOp);
- return -1;
- }
- theNewPartFlag = false;
- }
+ // flush writes to guarantee correct read
+ DBG("execute pending part writes");
+ if (executePendingBlobWrites() == -1)
+ return -1;
Uint32 part = (pos - theInlineSize) / thePartSize;
if (readParts(thePartBuf.data, part, 1) == -1)
return -1;
- DBG("force execute");
- if (theNdbCon->executeNoBlobs(NoCommit) == -1) {
- setErrorCode(theNdbOp);
+ // need result now
+ DBG("execute pending part reafs");
+ if (executePendingBlobReads() == -1)
return -1;
- }
Uint32 n = thePartSize - off;
if (n > len) {
memset(thePartBuf.data + off + len, theFillChar, n - len);
@@ -798,22 +821,16 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes)
assert((pos - theInlineSize) % thePartSize == 0 && len < thePartSize);
Uint32 part = (pos - theInlineSize) / thePartSize;
if (theLength > pos + len) {
- if (theNewPartFlag) {
- // must flush insert to guarantee read
- DBG("force execute");
- if (theNdbCon->executeNoBlobs(NoCommit) == -1) {
- setErrorCode(theNdbOp);
- return -1;
- }
- theNewPartFlag = false;
- }
+ // flush writes to guarantee correct read
+ DBG("execute pending part writes");
+ if (executePendingBlobWrites() == -1)
+ return -1;
if (readParts(thePartBuf.data, part, 1) == -1)
return -1;
- DBG("force execute");
- if (theNdbCon->executeNoBlobs(NoCommit) == -1) {
- setErrorCode(theNdbOp);
+ // need result now
+ DBG("execute pending part reads");
+ if (executePendingBlobReads() == -1)
return -1;
- }
memcpy(thePartBuf.data, buf, len);
if (updateParts(thePartBuf.data, part, 1) == -1)
return -1;
@@ -848,7 +865,7 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
DBG("readParts part=" << part << " count=" << count);
Uint32 n = 0;
while (n < count) {
- NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTableName);
+ NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
if (tOp == NULL ||
tOp->readTuple() == -1 ||
setPartKeyValue(tOp, part + n) == -1 ||
@@ -858,6 +875,8 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
}
buf += thePartSize;
n++;
+ thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
+ theNdbCon->thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
}
return 0;
}
@@ -868,7 +887,7 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
DBG("insertParts part=" << part << " count=" << count);
Uint32 n = 0;
while (n < count) {
- NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTableName);
+ NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
if (tOp == NULL ||
tOp->insertTuple() == -1 ||
setPartKeyValue(tOp, part + n) == -1 ||
@@ -878,7 +897,8 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
}
buf += thePartSize;
n++;
- theNewPartFlag = true;
+ thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
+ theNdbCon->thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
}
return 0;
}
@@ -889,7 +909,7 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count)
DBG("updateParts part=" << part << " count=" << count);
Uint32 n = 0;
while (n < count) {
- NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTableName);
+ NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
if (tOp == NULL ||
tOp->updateTuple() == -1 ||
setPartKeyValue(tOp, part + n) == -1 ||
@@ -899,7 +919,8 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count)
}
buf += thePartSize;
n++;
- theNewPartFlag = true;
+ thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
+ theNdbCon->thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
}
return 0;
}
@@ -910,7 +931,7 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count)
DBG("deleteParts part=" << part << " count=" << count);
Uint32 n = 0;
while (n < count) {
- NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTableName);
+ NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
if (tOp == NULL ||
tOp->deleteTuple() == -1 ||
setPartKeyValue(tOp, part + n) == -1) {
@@ -918,6 +939,52 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count)
return -1;
}
n++;
+ thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
+ theNdbCon->thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
+ }
+ return 0;
+}
+
+// pending ops
+
+int
+NdbBlob::executePendingBlobReads()
+{
+ Uint8 flags = (1 << NdbOperation::ReadRequest);
+ if (thePendingBlobOps & flags) {
+ if (theNdbCon->executeNoBlobs(NoCommit) == -1)
+ return -1;
+ thePendingBlobOps = 0;
+ theNdbCon->thePendingBlobOps = 0;
+ }
+ return 0;
+}
+
+int
+NdbBlob::executePendingBlobWrites()
+{
+ Uint8 flags = 0xFF & ~(1 << NdbOperation::ReadRequest);
+ if (thePendingBlobOps & flags) {
+ if (theNdbCon->executeNoBlobs(NoCommit) == -1)
+ return -1;
+ thePendingBlobOps = 0;
+ theNdbCon->thePendingBlobOps = 0;
+ }
+ return 0;
+}
+
+// callbacks
+
+int
+NdbBlob::invokeActiveHook()
+{
+ DBG("invokeActiveHook");
+ assert(theState == Active && theActiveHook != NULL);
+ int ret = (*theActiveHook)(this, theActiveHookArg);
+ DBG("invokeActiveHook ret=" << ret);
+ if (ret != 0) {
+ // no error is set on blob level
+ return -1;
}
return 0;
}
@@ -947,7 +1014,7 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl*
partType = NdbDictionary::Column::Binary;
theFillChar = 0x0;
break;
- case NdbDictionary::Column::Clob:
+ case NdbDictionary::Column::Text:
partType = NdbDictionary::Column::Char;
theFillChar = 0x20;
break;
@@ -959,22 +1026,21 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl*
theInlineSize = theColumn->getInlineSize();
thePartSize = theColumn->getPartSize();
theStripeSize = theColumn->getStripeSize();
- // blob table sanity check
+ // sanity check
assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head));
assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize);
- getBlobTableName(theBlobTableName, theTable, theColumn);
- const NdbDictionary::Table* bt;
- const NdbDictionary::Column* bc;
- if (theInlineSize >= (1 << 16) ||
- thePartSize == 0 ||
- thePartSize >= (1 << 16) ||
- theStripeSize == 0 ||
- (bt = theNdb->theDictionary->getTable(theBlobTableName)) == NULL ||
- (bc = bt->getColumn("DATA")) == NULL ||
- bc->getType() != partType ||
- bc->getLength() != (int)thePartSize) {
- setErrorCode(ErrTable);
- return -1;
+ if (thePartSize > 0) {
+ const NdbDictionary::Table* bt = NULL;
+ const NdbDictionary::Column* bc = NULL;
+ if (theStripeSize == 0 ||
+ (bt = theColumn->getBlobTable()) == NULL ||
+ (bc = bt->getColumn("DATA")) == NULL ||
+ bc->getType() != partType ||
+ bc->getLength() != (int)thePartSize) {
+ setErrorCode(ErrTable);
+ return -1;
+ }
+ theBlobTable = &NdbTableImpl::getImpl(*bt);
}
// buffers
theKeyBuf.alloc(theTable->m_sizeOfKeysInWords << 2);
@@ -1060,7 +1126,7 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
Uint32 bytes = theGetSetBytes - theInlineSize;
if (writeDataPrivate(pos, buf, bytes) == -1)
return -1;
- if (anExecType == Commit && theHeadInlineUpdateFlag) {
+ if (theHeadInlineUpdateFlag) {
// add an operation to update head+inline
NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
if (tOp == NULL ||
@@ -1128,6 +1194,10 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
batch = true;
}
}
+ if (theActiveHook != NULL) {
+ // need blob head for callback
+ batch = true;
+ }
DBG("preExecute out batch=" << batch);
return 0;
}
@@ -1144,8 +1214,11 @@ NdbBlob::postExecute(ExecType anExecType)
DBG("postExecute type=" << anExecType);
if (theState == Invalid)
return -1;
- if (theState == Active)
+ if (theState == Active) {
+ setState(anExecType == NoCommit ? Active : Closed);
+ DBG("postExecute skip");
return 0;
+ }
assert(theState == Prepared);
assert(isKeyOp());
if (isIndexOp()) {
@@ -1199,8 +1272,12 @@ NdbBlob::postExecute(ExecType anExecType)
if (deleteParts(0, getPartCount()) == -1)
return -1;
}
- theNewPartFlag = false;
setState(anExecType == NoCommit ? Active : Closed);
+ // activation callback
+ if (theActiveHook != NULL) {
+ if (invokeActiveHook() == -1)
+ return -1;
+ }
DBG("postExecute out");
return 0;
}
@@ -1250,7 +1327,7 @@ NdbBlob::atNextResult()
// get primary key
{ Uint32* data = (Uint32*)theKeyBuf.data;
unsigned size = theTable->m_sizeOfKeysInWords;
- if (theNdbOp->getKeyFromKEYINFO20(data, size) == -1) {
+ if (((NdbScanOperation*)theNdbOp)->getKeyFromKEYINFO20(data, size) == -1) {
setErrorCode(ErrUsage);
return -1;
}
@@ -1274,20 +1351,18 @@ NdbBlob::atNextResult()
Uint32 bytes = theGetSetBytes - theInlineSize;
if (readDataPrivate(pos, buf, bytes) == -1)
return -1;
- // must also execute them
- DBG("force execute");
- if (theNdbCon->executeNoBlobs(NoCommit) == -1) {
- setErrorCode((NdbOperation*)0);
- return -1;
- }
}
}
setState(Active);
+ // activation callback
+ if (theActiveHook != NULL) {
+ if (invokeActiveHook() == -1)
+ return -1;
+ }
DBG("atNextResult out");
return 0;
}
-
// misc
const NdbDictionary::Column*
@@ -1303,6 +1378,9 @@ NdbBlob::setErrorCode(int anErrorCode, bool invalidFlag)
{
DBG("setErrorCode code=" << anErrorCode);
theError.code = anErrorCode;
+ // conditionally copy error to operation level
+ if (theNdbOp != NULL && theNdbOp->theError.code == 0)
+ theNdbOp->setErrorCode(theError.code);
if (invalidFlag)
setState(Invalid);
}
@@ -1335,11 +1413,34 @@ NdbBlob::setErrorCode(NdbConnection* aCon, bool invalidFlag)
setErrorCode(code, invalidFlag);
}
+// info about all blobs in this operation
+
+NdbBlob*
+NdbBlob::blobsFirstBlob()
+{
+ return theNdbOp->theBlobList;
+}
+
+NdbBlob*
+NdbBlob::blobsNextBlob()
+{
+ return theNext;
+}
+
+// debug
+
#ifdef VM_TRACE
+inline int
+NdbBlob::getOperationType() const
+{
+ return theNdbOp != NULL ? theNdbOp->theOperationType : -1;
+}
+
NdbOut&
operator<<(NdbOut& out, const NdbBlob& blob)
{
- ndbout << dec << "s=" << blob.theState;
+ ndbout << dec << "o=" << blob.getOperationType();
+ ndbout << dec << " s=" << blob.theState;
ndbout << dec << " n=" << blob.theNullFlag;;
ndbout << dec << " l=" << blob.theLength;
ndbout << dec << " p=" << blob.thePos;