summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <tomas@poseidon.ndb.mysql.com>2004-11-22 13:41:46 +0000
committerunknown <tomas@poseidon.ndb.mysql.com>2004-11-22 13:41:46 +0000
commit9f90db614d56af4646a15577b532ac93cf24a700 (patch)
treee2f1062d381bc6ca98a4bac4c3e2bbc2c20f8760
parentba97d10b3cfa895888a045d6dd96ee9643332f59 (diff)
downloadmariadb-git-9f90db614d56af4646a15577b532ac93cf24a700.tar.gz
added force send interface to scan
prepared for using query cache in ndb ndb/include/ndbapi/NdbIndexScanOperation.hpp: added force send interface to scan ndb/include/ndbapi/NdbResultSet.hpp: added force send interface to scan ndb/include/ndbapi/NdbScanOperation.hpp: added force send interface to scan ndb/src/ndbapi/NdbResultSet.cpp: added force send interface to scan ndb/src/ndbapi/NdbScanOperation.cpp: added force send interface to scan
-rw-r--r--ndb/include/ndbapi/NdbIndexScanOperation.hpp6
-rw-r--r--ndb/include/ndbapi/NdbResultSet.hpp6
-rw-r--r--ndb/include/ndbapi/NdbScanOperation.hpp11
-rw-r--r--ndb/src/ndbapi/NdbResultSet.cpp12
-rw-r--r--ndb/src/ndbapi/NdbScanOperation.cpp50
-rw-r--r--sql/ha_ndbcluster.cc33
-rw-r--r--sql/ha_ndbcluster.h2
7 files changed, 74 insertions, 46 deletions
diff --git a/ndb/include/ndbapi/NdbIndexScanOperation.hpp b/ndb/include/ndbapi/NdbIndexScanOperation.hpp
index 66b3fc9d43b..a3388f62f58 100644
--- a/ndb/include/ndbapi/NdbIndexScanOperation.hpp
+++ b/ndb/include/ndbapi/NdbIndexScanOperation.hpp
@@ -113,7 +113,7 @@ public:
* Reset bounds and put operation in list that will be
* sent on next execute
*/
- int reset_bounds();
+ int reset_bounds(bool forceSend = false);
bool getSorted() const { return m_ordered; }
private:
@@ -127,8 +127,8 @@ private:
virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char*);
void fix_get_values();
- int next_result_ordered(bool fetchAllowed);
- int send_next_scan_ordered(Uint32 idx);
+ int next_result_ordered(bool fetchAllowed, bool forceSend = false);
+ int send_next_scan_ordered(Uint32 idx, bool forceSend = false);
int compare(Uint32 key, Uint32 cols, const NdbReceiver*, const NdbReceiver*);
Uint32 m_sort_columns;
diff --git a/ndb/include/ndbapi/NdbResultSet.hpp b/ndb/include/ndbapi/NdbResultSet.hpp
index 478daf8aad2..dc0288a380c 100644
--- a/ndb/include/ndbapi/NdbResultSet.hpp
+++ b/ndb/include/ndbapi/NdbResultSet.hpp
@@ -89,17 +89,17 @@ public:
* - 1: if there are no more tuples to scan.
* - 2: if there are no more cached records in NdbApi
*/
- int nextResult(bool fetchAllowed = true);
+ int nextResult(bool fetchAllowed = true, bool forceSend = false);
/**
* Close result set (scan)
*/
- void close();
+ void close(bool forceSend = false);
/**
* Restart
*/
- int restart();
+ int restart(bool forceSend = false);
/**
* Transfer scan operation to an updating transaction. Use this function
diff --git a/ndb/include/ndbapi/NdbScanOperation.hpp b/ndb/include/ndbapi/NdbScanOperation.hpp
index 2e4d173ac75..3c95c79e776 100644
--- a/ndb/include/ndbapi/NdbScanOperation.hpp
+++ b/ndb/include/ndbapi/NdbScanOperation.hpp
@@ -90,11 +90,11 @@ protected:
NdbScanOperation(Ndb* aNdb);
virtual ~NdbScanOperation();
- int nextResult(bool fetchAllowed = true);
+ int nextResult(bool fetchAllowed = true, bool forceSend = false);
virtual void release();
- void closeScan();
- int close_impl(class TransporterFacade*);
+ void closeScan(bool forceSend = false);
+ int close_impl(class TransporterFacade*, bool forceSend = false);
// Overloaded methods from NdbCursorOperation
int executeCursor(int ProcessorId);
@@ -103,6 +103,7 @@ protected:
int init(const NdbTableImpl* tab, NdbConnection* myConnection);
int prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId);
int doSend(int ProcessorId);
+ void checkForceSend(bool forceSend);
virtual void setErrorCode(int aErrorCode);
virtual void setErrorCodeAbort(int aErrorCode);
@@ -138,7 +139,7 @@ protected:
Uint32 m_sent_receivers_count; // NOTE needs mutex to access
NdbReceiver** m_sent_receivers; // receive thread puts them here
- int send_next_scan(Uint32 cnt, bool close);
+ int send_next_scan(Uint32 cnt, bool close, bool forceSend = false);
void receiver_delivered(NdbReceiver*);
void receiver_completed(NdbReceiver*);
void execCLOSE_SCAN_REP();
@@ -148,7 +149,7 @@ protected:
Uint32 m_ordered;
- int restart();
+ int restart(bool forceSend = false);
};
inline
diff --git a/ndb/src/ndbapi/NdbResultSet.cpp b/ndb/src/ndbapi/NdbResultSet.cpp
index f270584d227..d9d71464026 100644
--- a/ndb/src/ndbapi/NdbResultSet.cpp
+++ b/ndb/src/ndbapi/NdbResultSet.cpp
@@ -44,10 +44,10 @@ void NdbResultSet::init()
{
}
-int NdbResultSet::nextResult(bool fetchAllowed)
+int NdbResultSet::nextResult(bool fetchAllowed, bool forceSend)
{
int res;
- if ((res = m_operation->nextResult(fetchAllowed)) == 0) {
+ if ((res = m_operation->nextResult(fetchAllowed, forceSend)) == 0) {
// handle blobs
NdbBlob* tBlob = m_operation->theBlobList;
while (tBlob != 0) {
@@ -67,9 +67,9 @@ int NdbResultSet::nextResult(bool fetchAllowed)
return res;
}
-void NdbResultSet::close()
+void NdbResultSet::close(bool forceSend)
{
- m_operation->closeScan();
+ m_operation->closeScan(forceSend);
}
NdbOperation*
@@ -98,6 +98,6 @@ NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){
}
int
-NdbResultSet::restart(){
- return m_operation->restart();
+NdbResultSet::restart(bool forceSend){
+ return m_operation->restart(forceSend);
}
diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp
index 4b10ebb10cd..33fa826e470 100644
--- a/ndb/src/ndbapi/NdbScanOperation.cpp
+++ b/ndb/src/ndbapi/NdbScanOperation.cpp
@@ -447,10 +447,11 @@ NdbScanOperation::executeCursor(int nodeId){
#define DEBUG_NEXT_RESULT 0
-int NdbScanOperation::nextResult(bool fetchAllowed)
+int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
{
if(m_ordered)
- return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed);
+ return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed,
+ forceSend);
/**
* Check current receiver
@@ -487,7 +488,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence;
- if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0){
+ if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false,
+ forceSend) == 0){
idx = m_current_api_receiver;
last = m_api_receivers_count;
@@ -578,7 +580,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
}
int
-NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
+NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
+ bool forceSend){
if(cnt > 0 || stopScanFlag){
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
@@ -618,6 +621,8 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
ret = tp->sendSignal(&tSignal, nodeId);
}
+ if (!ret) checkForceSend(forceSend);
+
m_sent_receivers_count = last + cnt + stopScanFlag;
m_api_receivers_count -= cnt;
m_current_api_receiver = 0;
@@ -627,6 +632,15 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
return 0;
}
+void NdbScanOperation::checkForceSend(bool forceSend)
+{
+ if (forceSend) {
+ TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);
+ } else {
+ TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);
+ }//if
+}
+
int
NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId)
{
@@ -642,7 +656,7 @@ NdbScanOperation::doSend(int ProcessorId)
return 0;
}
-void NdbScanOperation::closeScan()
+void NdbScanOperation::closeScan(bool forceSend)
{
if(m_transConnection){
if(DEBUG_NEXT_RESULT)
@@ -657,7 +671,7 @@ void NdbScanOperation::closeScan()
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
- close_impl(tp);
+ close_impl(tp, forceSend);
} while(0);
@@ -1293,7 +1307,8 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
}
int
-NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
+NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
+ bool forceSend){
Uint32 u_idx = 0, u_last = 0;
Uint32 s_idx = m_current_api_receiver; // first sorted
@@ -1319,7 +1334,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
- if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx)){
+ if(seq == tp->getNodeSequence(nodeId) &&
+ !send_next_scan_ordered(s_idx, forceSend)){
Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){
@@ -1408,7 +1424,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
}
int
-NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
+NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){
if(idx == theParallelism)
return 0;
@@ -1440,11 +1456,13 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
tSignal.setLength(4+1);
- return tp->sendSignal(&tSignal, nodeId);
+ int ret= tp->sendSignal(&tSignal, nodeId);
+ if (!ret) checkForceSend(forceSend);
+ return ret;
}
int
-NdbScanOperation::close_impl(TransporterFacade* tp){
+NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
@@ -1473,7 +1491,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
if(m_api_receivers_count+m_conf_receivers_count){
// Send close scan
- if(send_next_scan(0, true) == -1){ // Close scan
+ if(send_next_scan(0, true, forceSend) == -1){ // Close scan
theNdbCon->theReleaseOnClose = true;
return -1;
}
@@ -1520,7 +1538,7 @@ NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
}
int
-NdbScanOperation::restart()
+NdbScanOperation::restart(bool forceSend)
{
TransporterFacade* tp = TransporterFacade::instance();
@@ -1529,7 +1547,7 @@ NdbScanOperation::restart()
{
int res;
- if((res= close_impl(tp)))
+ if((res= close_impl(tp, forceSend)))
{
return res;
}
@@ -1548,13 +1566,13 @@ NdbScanOperation::restart()
}
int
-NdbIndexScanOperation::reset_bounds(){
+NdbIndexScanOperation::reset_bounds(bool forceSend){
int res;
{
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
- res= close_impl(tp);
+ res= close_impl(tp, forceSend);
}
if(!res)
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index bec4dfd9401..6f7940caf75 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -1247,7 +1247,7 @@ inline int ha_ndbcluster::next_result(byte *buf)
m_ops_pending= 0;
m_blobs_pending= FALSE;
}
- check= cursor->nextResult(contact_ndb);
+ check= cursor->nextResult(contact_ndb, m_force_send);
if (check == 0)
{
// One more record found
@@ -1540,7 +1540,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_ASSERT(op->getSorted() == sorted);
DBUG_ASSERT(op->getLockMode() ==
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type));
- if(op->reset_bounds())
+ if(op->reset_bounds(m_force_send))
DBUG_RETURN(ndb_err(m_active_trans));
}
@@ -2367,7 +2367,7 @@ int ha_ndbcluster::index_last(byte *buf)
int res;
if((res= ordered_index_scan(0, 0, TRUE, buf)) == 0){
NdbResultSet *cursor= m_active_cursor;
- while((res= cursor->nextResult(TRUE)) == 0);
+ while((res= cursor->nextResult(TRUE, m_force_send)) == 0);
if(res == 1){
unpack_record(buf);
table->status= 0;
@@ -2453,7 +2453,7 @@ int ha_ndbcluster::rnd_init(bool scan)
{
if (!scan)
DBUG_RETURN(1);
- int res= cursor->restart();
+ int res= cursor->restart(m_force_send);
DBUG_ASSERT(res == 0);
}
index_init(table->primary_key);
@@ -2484,7 +2484,7 @@ int ha_ndbcluster::close_scan()
m_ops_pending= 0;
}
- cursor->close();
+ cursor->close(m_force_send);
m_active_cursor= NULL;
DBUG_RETURN(0);
}
@@ -3004,6 +3004,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
m_transaction_on= FALSE;
else
m_transaction_on= thd->variables.ndb_use_transactions;
+ // m_use_local_query_cache= thd->variables.ndb_use_local_query_cache;
m_active_trans= thd->transaction.all.ndb_tid ?
(NdbConnection*)thd->transaction.all.ndb_tid:
@@ -3728,7 +3729,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_ha_not_exact_count(FALSE),
m_force_send(TRUE),
m_autoincrement_prefetch(32),
- m_transaction_on(TRUE)
+ m_transaction_on(TRUE),
+ m_use_local_query_cache(FALSE)
{
int i;
@@ -4415,7 +4417,7 @@ bool ha_ndbcluster::low_byte_first() const
}
bool ha_ndbcluster::has_transactions()
{
- return TRUE;
+ return m_transaction_on;
}
const char* ha_ndbcluster::index_type(uint key_number)
{
@@ -4432,7 +4434,10 @@ const char* ha_ndbcluster::index_type(uint key_number)
}
uint8 ha_ndbcluster::table_cache_type()
{
- return HA_CACHE_TBL_NOCACHE;
+ if (m_use_local_query_cache)
+ return HA_CACHE_TBL_TRANSACT;
+ else
+ return HA_CACHE_TBL_NOCACHE;
}
/*
@@ -4600,13 +4605,12 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
{
DBUG_ENTER("ndb_get_table_statistics");
DBUG_PRINT("enter", ("table: %s", table));
-
+ NdbConnection* pTrans= ndb->startTransaction();
do
{
- NdbConnection* pTrans= ndb->startTransaction();
if (pTrans == NULL)
break;
-
+
NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
if (pOp == NULL)
break;
@@ -4623,13 +4627,13 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&rows);
pOp->getValue(NdbDictionary::Column::COMMIT_COUNT, (char*)&commits);
- check= pTrans->execute(NoCommit);
+ check= pTrans->execute(NoCommit, AbortOnError, TRUE);
if (check == -1)
break;
Uint64 sum_rows= 0;
Uint64 sum_commits= 0;
- while((check= rs->nextResult(TRUE)) == 0)
+ while((check= rs->nextResult(TRUE, TRUE)) == 0)
{
sum_rows+= rows;
sum_commits+= commits;
@@ -4638,6 +4642,8 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
if (check == -1)
break;
+ rs->close(TRUE);
+
ndb->closeTransaction(pTrans);
if(row_count)
* row_count= sum_rows;
@@ -4647,6 +4653,7 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
DBUG_RETURN(0);
} while(0);
+ ndb->closeTransaction(pTrans);
DBUG_PRINT("exit", ("failed"));
DBUG_RETURN(-1);
}
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
index 9d7cba459cb..6b878681c05 100644
--- a/sql/ha_ndbcluster.h
+++ b/sql/ha_ndbcluster.h
@@ -238,10 +238,12 @@ class ha_ndbcluster: public handler
char *m_blobs_buffer;
uint32 m_blobs_buffer_size;
uint m_dupkey;
+ // set from thread variables at external lock
bool m_ha_not_exact_count;
bool m_force_send;
ha_rows m_autoincrement_prefetch;
bool m_transaction_on;
+ bool m_use_local_query_cache;
void set_rec_per_key();
void records_update();