summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <mskold@mysql.com>2006-06-12 09:37:19 +0200
committerunknown <mskold@mysql.com>2006-06-12 09:37:19 +0200
commite3a975c5d6b92f951c2e2e811c2ae4f3e6fe61a3 (patch)
tree5b2d6993076fe52b35248ed22e5af5f8e3de027f
parentb2d3ac1b6bd14493c03f6d58c624b47a3557e918 (diff)
downloadmariadb-git-e3a975c5d6b92f951c2e2e811c2ae4f3e6fe61a3.tar.gz
Fix for Bug #18184 SELECT ... FOR UPDATE does not work..: Adapted to 5.0 code changes
-rw-r--r--ndb/include/ndbapi/NdbIndexScanOperation.hpp7
-rw-r--r--ndb/include/ndbapi/NdbScanOperation.hpp32
-rw-r--r--ndb/src/ndbapi/NdbScanOperation.cpp2
-rw-r--r--sql/ha_ndbcluster.cc91
4 files changed, 117 insertions, 15 deletions
diff --git a/ndb/include/ndbapi/NdbIndexScanOperation.hpp b/ndb/include/ndbapi/NdbIndexScanOperation.hpp
index e9f92d84d1c..638ecb17779 100644
--- a/ndb/include/ndbapi/NdbIndexScanOperation.hpp
+++ b/ndb/include/ndbapi/NdbIndexScanOperation.hpp
@@ -61,11 +61,14 @@ public:
Uint32 parallel,
bool order_by,
bool order_desc = false,
- bool read_range_no = false) {
+ bool read_range_no = false,
+ bool keyinfo = false) {
Uint32 scan_flags =
(SF_OrderBy & -(Int32)order_by) |
(SF_Descending & -(Int32)order_desc) |
- (SF_ReadRangeNo & -(Int32)read_range_no);
+ (SF_ReadRangeNo & -(Int32)read_range_no) |
+ (SF_KeyInfo & -(Int32)keyinfo);
+
return readTuples(lock_mode, scan_flags, parallel);
}
#endif
diff --git a/ndb/include/ndbapi/NdbScanOperation.hpp b/ndb/include/ndbapi/NdbScanOperation.hpp
index 6a393223c4c..4a8425852b9 100644
--- a/ndb/include/ndbapi/NdbScanOperation.hpp
+++ b/ndb/include/ndbapi/NdbScanOperation.hpp
@@ -44,7 +44,8 @@ public:
SF_TupScan = (1 << 16), // scan TUP - only LM_CommittedRead
SF_OrderBy = (1 << 24), // index scan in order
SF_Descending = (2 << 24), // index scan in descending order
- SF_ReadRangeNo = (4 << 24) // enable @ref get_range_no
+ SF_ReadRangeNo = (4 << 24), // enable @ref get_range_no
+ SF_KeyInfo = 1 // request KeyInfo to be sent back
};
/**
@@ -68,7 +69,7 @@ public:
*/
#ifdef ndb_readtuples_impossible_overload
int readTuples(LockMode lock_mode = LM_Read,
- Uint32 batch = 0, Uint32 parallel = 0);
+ Uint32 batch = 0, Uint32 parallel = 0, bool keyinfo = false);
#endif
inline int readTuples(int parallell){
@@ -141,6 +142,20 @@ public:
void close(bool forceSend = false, bool releaseOp = false);
/**
+ * Lock current tuple
+ *
+ * @return an NdbOperation or NULL.
+ */
+ NdbOperation* lockCurrentTuple();
+ /**
+ * Lock current tuple
+ *
+ * @param lockTrans Transaction that should perform the lock
+ *
+ * @return an NdbOperation or NULL.
+ */
+ NdbOperation* lockCurrentTuple(NdbTransaction* lockTrans);
+ /**
* Update current tuple
*
* @return an NdbOperation or NULL.
@@ -250,6 +265,19 @@ protected:
inline
NdbOperation*
+NdbScanOperation::lockCurrentTuple(){
+ return lockCurrentTuple(m_transConnection);
+}
+
+inline
+NdbOperation*
+NdbScanOperation::lockCurrentTuple(NdbTransaction* takeOverTrans){
+ return takeOverScanOp(NdbOperation::ReadRequest,
+ takeOverTrans);
+}
+
+inline
+NdbOperation*
NdbScanOperation::updateCurrentTuple(){
return updateCurrentTuple(m_transConnection);
}
diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp
index 8278264fca2..7d0712e117d 100644
--- a/ndb/src/ndbapi/NdbScanOperation.cpp
+++ b/ndb/src/ndbapi/NdbScanOperation.cpp
@@ -160,7 +160,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
return -1;
}
- m_keyInfo = (keyinfo || lockExcl) ? 1 : 0;
+ m_keyInfo = ((scan_flags & SF_KeyInfo) || lockExcl) ? 1 : 0;
bool rangeScan = false;
if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex)
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index dc04d60e868..31a33aab5c0 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -1174,12 +1174,23 @@ void ha_ndbcluster::release_metadata()
int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
{
+ DBUG_ENTER("ha_ndbcluster::get_ndb_lock_type");
if (type >= TL_WRITE_ALLOW_WRITE)
- return NdbOperation::LM_Exclusive;
- else if (uses_blob_value(m_retrieve_all_fields))
- return NdbOperation::LM_Read;
+ {
+ DBUG_PRINT("info", ("Using exclusive lock"));
+ DBUG_RETURN(NdbOperation::LM_Exclusive);
+ }
+ else if (type == TL_READ_WITH_SHARED_LOCKS ||
+ uses_blob_value(m_retrieve_all_fields))
+ {
+ DBUG_PRINT("info", ("Using read lock"));
+ DBUG_RETURN(NdbOperation::LM_Read);
+ }
else
- return NdbOperation::LM_CommittedRead;
+ {
+ DBUG_PRINT("info", ("Using committed read"));
+ DBUG_RETURN(NdbOperation::LM_CommittedRead);
+ }
}
static const ulong index_type_flags[]=
@@ -1679,7 +1690,30 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
int check;
NdbTransaction *trans= m_active_trans;
- bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE;
+ if (m_lock_tuple)
+ {
+ /*
+ Lock level m_lock.type either TL_WRITE_ALLOW_WRITE
+ (SELECT FOR UPDATE) or TL_READ_WITH_SHARED_LOCKS (SELECT
+ LOCK WITH SHARE MODE) and row was not explictly unlocked
+ with unlock_row() call
+ */
+ NdbConnection *trans= m_active_trans;
+ NdbOperation *op;
+ // Lock row
+ DBUG_PRINT("info", ("Keeping lock on scanned row"));
+
+ if (!(op= m_active_cursor->lockCurrentTuple()))
+ {
+ m_lock_tuple= false;
+ ERR_RETURN(trans->getNdbError());
+ }
+ m_ops_pending++;
+ }
+ m_lock_tuple= false;
+
+ bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE &&
+ m_lock.type != TL_READ_WITH_SHARED_LOCKS;
do {
DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
/*
@@ -1695,6 +1729,13 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
if ((check= cursor->nextResult(contact_ndb, m_force_send)) == 0)
{
+ /*
+ Explicitly lock tuple if "select for update" or
+ "select lock in share mode"
+ */
+ m_lock_tuple= (m_lock.type == TL_WRITE_ALLOW_WRITE
+ ||
+ m_lock.type == TL_READ_WITH_SHARED_LOCKS);
DBUG_RETURN(0);
}
else if (check == 1 || check == 2)
@@ -1983,10 +2024,11 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
restart= FALSE;
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+ bool need_pk = (lm == NdbOperation::LM_Read);
if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
m_index[active_index].index,
(const NDBTAB *) m_table)) ||
- op->readTuples(lm, 0, parallelism, sorted, descending))
+ op->readTuples(lm, 0, parallelism, sorted, descending, need_pk))
ERR_RETURN(trans->getNdbError());
m_active_cursor= op;
} else {
@@ -2036,8 +2078,11 @@ int ha_ndbcluster::full_table_scan(byte *buf)
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+ bool need_pk = (lm == NdbOperation::LM_Read);
if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
- op->readTuples(lm, 0, parallelism))
+ op->readTuples(lm,
+ (need_pk)?NdbScanOperation::SF_KeyInfo:0,
+ parallelism))
ERR_RETURN(trans->getNdbError());
m_active_cursor= op;
if (generate_scan_filter(m_cond_stack, op))
@@ -2327,6 +2372,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
DBUG_PRINT("info", ("Calling updateTuple on cursor"));
if (!(op= cursor->updateCurrentTuple()))
ERR_RETURN(trans->getNdbError());
+ m_lock_tuple= false;
m_ops_pending++;
if (uses_blob_value(FALSE))
m_blobs_pending= TRUE;
@@ -2406,6 +2452,7 @@ int ha_ndbcluster::delete_row(const byte *record)
DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
if (cursor->deleteCurrentTuple() != 0)
ERR_RETURN(trans->getNdbError());
+ m_lock_tuple= false;
m_ops_pending++;
no_uncommitted_rows_update(-1);
@@ -2529,9 +2576,9 @@ void ha_ndbcluster::unpack_record(byte* buf)
const NdbRecAttr* rec= m_value[hidden_no].rec;
DBUG_ASSERT(rec);
DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no,
- hidden_col->getName(), rec->u_64_value()));
+ hidden_col->getName(), rec->u_64_value()));
}
- //print_results();
+ print_results();
#endif
DBUG_VOID_RETURN;
}
@@ -2605,6 +2652,12 @@ int ha_ndbcluster::index_init(uint index)
{
DBUG_ENTER("ha_ndbcluster::index_init");
DBUG_PRINT("enter", ("index: %u", index));
+ /*
+ Locks are are explicitly released in scan
+ unless m_lock.type == TL_READ_HIGH_PRIORITY
+ and no sub-sequent call to unlock_row()
+ */
+ m_lock_tuple= false;
DBUG_RETURN(handler::index_init(index));
}
@@ -3614,6 +3667,22 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
}
/*
+ Unlock the last row read in an open scan.
+ Rows are unlocked by default in ndb, but
+ for SELECT FOR UPDATE and SELECT LOCK WIT SHARE MODE
+ locks are kept if unlock_row() is not called.
+*/
+
+void ha_ndbcluster::unlock_row()
+{
+ DBUG_ENTER("unlock_row");
+
+ DBUG_PRINT("info", ("Unlocking row"));
+ m_lock_tuple= false;
+ DBUG_VOID_RETURN;
+}
+
+/*
Start a transaction for running a statement if one is not
already running in a transaction. This will be the case in
a BEGIN; COMMIT; block
@@ -5897,6 +5966,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
byte *end_of_buffer= (byte*)buffer->buffer_end;
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+ bool need_pk = (lm == NdbOperation::LM_Read);
const NDBTAB *tab= (const NDBTAB *) m_table;
const NDBINDEX *unique_idx= (NDBINDEX *) m_index[active_index].unique_index;
const NDBINDEX *idx= (NDBINDEX *) m_index[active_index].index;
@@ -5963,7 +6033,8 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
end_of_buffer -= reclength;
}
else if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab))
- &&!scanOp->readTuples(lm, 0, parallelism, sorted, FALSE, TRUE)
+ &&!scanOp->readTuples(lm, 0, parallelism, sorted,
+ FALSE, TRUE, need_pk)
&&!generate_scan_filter(m_cond_stack, scanOp)
&&!define_read_attrs(end_of_buffer-reclength, scanOp))
{