summaryrefslogtreecommitdiff
path: root/storage/ndb/src/ndbapi/Ndb.cpp
diff options
context:
space:
mode:
authorcmiller@zippy.cornsilk.net <>2007-10-17 14:05:43 -0400
committercmiller@zippy.cornsilk.net <>2007-10-17 14:05:43 -0400
commitf3d77c1979bad93a304cbb5b93d672178815df00 (patch)
tree892096ddfa11f5fea89ad26528d5c64fbc05ffce /storage/ndb/src/ndbapi/Ndb.cpp
parentc6b5c6af86cb7fed9ad86690136d668071b6f6cc (diff)
parent571200d48750beba84a2936d727afcb347e1b704 (diff)
downloadmariadb-git-f3d77c1979bad93a304cbb5b93d672178815df00.tar.gz
Merge zippy.cornsilk.net:/home/cmiller/work/mysql/mysql-5.1-comeng-unification
into zippy.cornsilk.net:/home/cmiller/work/mysql/mysql-5.1-recentcommmerge
Diffstat (limited to 'storage/ndb/src/ndbapi/Ndb.cpp')
-rw-r--r--storage/ndb/src/ndbapi/Ndb.cpp274
1 files changed, 248 insertions, 26 deletions
diff --git a/storage/ndb/src/ndbapi/Ndb.cpp b/storage/ndb/src/ndbapi/Ndb.cpp
index 78b7af5522b..15647861eef 100644
--- a/storage/ndb/src/ndbapi/Ndb.cpp
+++ b/storage/ndb/src/ndbapi/Ndb.cpp
@@ -37,6 +37,7 @@ Name: Ndb.cpp
#include "API.hpp"
#include <NdbEnv.h>
#include <BaseString.hpp>
+#include <NdbSqlUtil.hpp>
/****************************************************************************
void connect();
@@ -201,9 +202,10 @@ Ndb::NDB_connect(Uint32 tNode)
DBUG_PRINT("info",
("unsuccessful connect tReturnCode %d, tNdbCon->Status() %d",
tReturnCode, tNdbCon->Status()));
- if (theError.code == 299)
+ if (theError.code == 299 || // single user mode
+ theError.code == 281 ) // cluster shutdown in progress
{
- // single user mode so no need to retry with other node
+ // no need to retry with other node
DBUG_RETURN(-1);
}
DBUG_RETURN(3);
@@ -304,6 +306,180 @@ Return Value: Returns a pointer to a connection object.
Return NULL otherwise.
Remark: Start transaction. Synchronous.
*****************************************************************************/
+int
+Ndb::computeHash(Uint32 *retval,
+ const NdbDictionary::Table *table,
+ const struct Key_part_ptr * keyData,
+ void* buf, Uint32 bufLen)
+{
+ Uint32 j = 0;
+ Uint32 sumlen = 0; // Needed len
+ const NdbTableImpl* impl = &NdbTableImpl::getImpl(*table);
+ const NdbColumnImpl* const * cols = impl->m_columns.getBase();
+ Uint32 len;
+ char* pos;
+
+ Uint32 colcnt = impl->m_columns.size();
+ Uint32 parts = impl->m_noOfDistributionKeys;
+ if (parts == 0)
+ {
+ parts = impl->m_noOfKeys;
+ }
+
+ for (Uint32 i = 0; i<parts; i++)
+ {
+ if (unlikely(keyData[i].ptr == 0))
+ goto enullptr;
+ }
+
+ if (unlikely(keyData[parts].ptr != 0))
+ goto emissingnullptr;
+
+ const NdbColumnImpl* partcols[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY];
+ for (Uint32 i = 0; i<colcnt && j < parts; i++)
+ {
+ if (cols[i]->m_distributionKey)
+ {
+ // wl3717_todo
+ // char allowed now as dist key so this case should be tested
+ partcols[j++] = cols[i];
+ }
+ }
+
+ for (Uint32 i = 0; i<parts; i++)
+ {
+ Uint32 lb, len;
+ if (unlikely(!NdbSqlUtil::get_var_length(partcols[i]->m_type,
+ keyData[i].ptr,
+ keyData[i].len,
+ lb, len)))
+ goto emalformedkey;
+
+ if (unlikely(keyData[i].len < (lb + len)))
+ goto elentosmall;
+
+ Uint32 maxlen = (partcols[i]->m_attrSize * partcols[i]->m_arraySize);
+
+ if (unlikely(lb == 0 && keyData[i].len != maxlen))
+ goto emalformedkey;
+
+ if (partcols[i]->m_cs)
+ {
+ Uint32 xmul = partcols[i]->m_cs->strxfrm_multiply;
+ xmul = xmul ? xmul : 1;
+ len = xmul * (maxlen - lb);
+ }
+
+ len = (lb + len + 3) & ~(Uint32)3;
+ sumlen += len;
+
+ }
+
+ if (buf)
+ {
+ UintPtr org = UintPtr(buf);
+ UintPtr use = (org + 7) & ~(UintPtr)7;
+
+ buf = (void*)use;
+ bufLen -= (use - org);
+
+ if (unlikely(sumlen > bufLen))
+ goto ebuftosmall;
+ }
+ else
+ {
+ buf = malloc(sumlen);
+ if (unlikely(buf == 0))
+ goto enomem;
+ bufLen = 0;
+ assert((UintPtr(buf) & 7) == 0);
+ }
+
+ pos = (char*)buf;
+ for (Uint32 i = 0; i<parts; i++)
+ {
+ Uint32 lb, len;
+ NdbSqlUtil::get_var_length(partcols[i]->m_type,
+ keyData[i].ptr, keyData[i].len, lb, len);
+ CHARSET_INFO* cs;
+ if ((cs = partcols[i]->m_cs))
+ {
+ Uint32 xmul = cs->strxfrm_multiply;
+ if (xmul == 0)
+ xmul = 1;
+ /*
+ * Varchar end-spaces are ignored in comparisons. To get same hash
+ * we blank-pad to maximum length via strnxfrm.
+ */
+ Uint32 maxlen = (partcols[i]->m_attrSize * partcols[i]->m_arraySize);
+ Uint32 dstLen = xmul * (maxlen - lb);
+ int n = NdbSqlUtil::strnxfrm_bug7284(cs,
+ (unsigned char*)pos,
+ dstLen,
+ ((unsigned char*)keyData[i].ptr)+lb,
+ len);
+
+ if (unlikely(n == -1))
+ goto emalformedstring;
+
+ while ((n & 3) != 0)
+ {
+ pos[n++] = 0;
+ }
+ pos += n;
+ }
+ else
+ {
+ len += lb;
+ memcpy(pos, keyData[i].ptr, len);
+ while (len & 3)
+ {
+ * (pos + len++) = 0;
+ }
+ pos += len;
+ }
+ }
+ len = UintPtr(pos) - UintPtr(buf);
+ assert((len & 3) == 0);
+
+ Uint32 values[4];
+ md5_hash(values, (const Uint64*)buf, len >> 2);
+
+ if (retval)
+ {
+ * retval = values[1];
+ }
+
+ if (bufLen == 0)
+ free(buf);
+
+ return 0;
+
+enullptr:
+ return 4316;
+
+emissingnullptr:
+ return 4276;
+
+elentosmall:
+ return 4277;
+
+ebuftosmall:
+ return 4278;
+
+emalformedstring:
+ if (bufLen == 0)
+ free(buf);
+
+ return 4279;
+
+emalformedkey:
+ return 4280;
+
+enomem:
+ return 4000;
+}
+
NdbTransaction*
Ndb::startTransaction(const NdbDictionary::Table *table,
const char * keyData, Uint32 keyLen)
@@ -754,17 +930,27 @@ Ndb::getNodeId()
}
/****************************************************************************
-Uint64 getTupleIdFromNdb( Uint32 aTableId, Uint32 cacheSize );
-
-Parameters: aTableId : The TableId.
- cacheSize: Prefetch this many values
-Remark: Returns a new TupleId to the application.
- The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId.
- It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp.
+Uint64 getAutoIncrementValue( const char* aTableName,
+ Uint64 & autoValue,
+ Uint32 cacheSize,
+ Uint64 step,
+ Uint64 start);
+
+Parameters: aTableName (IN) : The table name.
+ autoValue (OUT) : Returns new autoincrement value
+ cacheSize (IN) : Prefetch this many values
+ step (IN) : Specifies the step between the
+ autoincrement values.
+ start (IN) : Start value for first value
+Remark: Returns a new autoincrement value to the application.
+ The autoincrement values can be increased by steps
+ (default 1) and a number of values can be prefetched
+ by specifying cacheSize (default 10).
****************************************************************************/
int
Ndb::getAutoIncrementValue(const char* aTableName,
- Uint64 & tupleId, Uint32 cacheSize)
+ Uint64 & autoValue, Uint32 cacheSize,
+ Uint64 step, Uint64 start)
{
DBUG_ENTER("Ndb::getAutoIncrementValue");
ASSERT_NOT_MYSQLD;
@@ -778,15 +964,16 @@ Ndb::getAutoIncrementValue(const char* aTableName,
}
const NdbTableImpl* table = info->m_table_impl;
TupleIdRange & range = info->m_tuple_id_range;
- if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
+ if (getTupleIdFromNdb(table, range, autoValue, cacheSize, step, start) == -1)
DBUG_RETURN(-1);
- DBUG_PRINT("info", ("value %lu", (ulong) tupleId));
+ DBUG_PRINT("info", ("value %lu", (ulong) autoValue));
DBUG_RETURN(0);
}
int
Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
- Uint64 & tupleId, Uint32 cacheSize)
+ Uint64 & autoValue, Uint32 cacheSize,
+ Uint64 step, Uint64 start)
{
DBUG_ENTER("Ndb::getAutoIncrementValue");
ASSERT_NOT_MYSQLD;
@@ -801,51 +988,86 @@ Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
DBUG_RETURN(-1);
}
TupleIdRange & range = info->m_tuple_id_range;
- if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
+ if (getTupleIdFromNdb(table, range, autoValue, cacheSize, step, start) == -1)
DBUG_RETURN(-1);
- DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
+ DBUG_PRINT("info", ("value %lu", (ulong)autoValue));
DBUG_RETURN(0);
}
int
Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
- TupleIdRange & range, Uint64 & tupleId,
- Uint32 cacheSize)
+ TupleIdRange & range, Uint64 & autoValue,
+ Uint32 cacheSize, Uint64 step, Uint64 start)
{
DBUG_ENTER("Ndb::getAutoIncrementValue");
assert(aTable != 0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
- if (getTupleIdFromNdb(table, range, tupleId, cacheSize) == -1)
+ if (getTupleIdFromNdb(table, range, autoValue, cacheSize, step, start) == -1)
DBUG_RETURN(-1);
- DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
+ DBUG_PRINT("info", ("value %lu", (ulong)autoValue));
DBUG_RETURN(0);
}
int
Ndb::getTupleIdFromNdb(const NdbTableImpl* table,
- TupleIdRange & range, Uint64 & tupleId, Uint32 cacheSize)
+ TupleIdRange & range, Uint64 & tupleId,
+ Uint32 cacheSize, Uint64 step, Uint64 start)
{
+/*
+ Returns a new TupleId to the application.
+ The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId.
+ It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp.
+ In most cases step= start= 1, in which case we get:
+ 1,2,3,4,5,...
+ If step=10 and start=5 and first number is 1, we get:
+ 5,15,25,35,...
+*/
DBUG_ENTER("Ndb::getTupleIdFromNdb");
- if (range.m_first_tuple_id != range.m_last_tuple_id)
+ /*
+ Check if the next value can be taken from the pre-fetched
+ sequence.
+ */
+ if (range.m_first_tuple_id != range.m_last_tuple_id &&
+ range.m_first_tuple_id + step <= range.m_last_tuple_id)
{
assert(range.m_first_tuple_id < range.m_last_tuple_id);
- tupleId = ++range.m_first_tuple_id;
- DBUG_PRINT("info", ("next cached value %lu", (ulong)tupleId));
+ range.m_first_tuple_id += step;
+ tupleId = range.m_first_tuple_id;
+ DBUG_PRINT("info", ("Next cached value %lu", (ulong) tupleId));
}
else
{
+ /*
+ If start value is greater than step it is ignored
+ */
+ Uint64 offset = (start > step) ? 1 : start;
+
+ /*
+ Pre-fetch a number of values depending on cacheSize
+ */
if (cacheSize == 0)
cacheSize = 1;
+
DBUG_PRINT("info", ("reading %u values from database", (uint)cacheSize));
/*
* reserve next cacheSize entries in db. adds cacheSize to NEXTID
- * and returns first tupleId in the new range.
+ * and returns first tupleId in the new range. If tupleId's are
+ * incremented in steps then multiply the cacheSize with step size.
*/
- Uint64 opValue = cacheSize;
+ Uint64 opValue = cacheSize * step;
+
if (opTupleIdOnNdb(table, range, opValue, 0) == -1)
DBUG_RETURN(-1);
- tupleId = opValue;
+ DBUG_PRINT("info", ("Next value fetched from database %lu", (ulong) opValue));
+ DBUG_PRINT("info", ("Increasing %lu by offset %lu, increment is %lu", (ulong) (ulong) opValue, (ulong) offset, (ulong) step));
+ Uint64 current, next;
+ Uint64 div = ((Uint64) (opValue + step - offset)) / step;
+ next = div * step + offset;
+ current = (next < step) ? next : next - step;
+ tupleId = (opValue <= current) ? current : next;
+ DBUG_PRINT("info", ("Returning %lu", (ulong) tupleId));
+ range.m_first_tuple_id = tupleId;
}
DBUG_RETURN(0);
}