summaryrefslogtreecommitdiff
path: root/ndb/src/ndbapi/NdbScanOperation.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/ndbapi/NdbScanOperation.cpp')
-rw-r--r--ndb/src/ndbapi/NdbScanOperation.cpp54
1 files changed, 36 insertions, 18 deletions
diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp
index 6eb5167e385..88208409c08 100644
--- a/ndb/src/ndbapi/NdbScanOperation.cpp
+++ b/ndb/src/ndbapi/NdbScanOperation.cpp
@@ -453,10 +453,12 @@ NdbScanOperation::executeCursor(int nodeId){
return -1;
}
-int NdbScanOperation::nextResult(bool fetchAllowed)
+
+int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
{
if(m_ordered)
- return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed);
+ return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed,
+ forceSend);
/**
* Check current receiver
@@ -496,7 +498,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
return -1;
Uint32 seq = theNdbCon->theNodeSequence;
- if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0){
+ if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false,
+ forceSend) == 0){
idx = m_current_api_receiver;
last = m_api_receivers_count;
@@ -587,9 +590,9 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
}
int
-NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
- if(cnt > 0)
- {
+NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
+ bool forceSend){
+ if(cnt > 0){
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
@@ -636,6 +639,8 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
}
}
+ if (!ret) checkForceSend(forceSend);
+
m_sent_receivers_count = last + sent;
m_api_receivers_count -= cnt;
m_current_api_receiver = 0;
@@ -645,6 +650,15 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
return 0;
}
+void NdbScanOperation::checkForceSend(bool forceSend)
+{
+ if (forceSend) {
+ TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);
+ } else {
+ TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);
+ }//if
+}
+
int
NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId)
{
@@ -660,7 +674,7 @@ NdbScanOperation::doSend(int ProcessorId)
return 0;
}
-void NdbScanOperation::closeScan()
+void NdbScanOperation::closeScan(bool forceSend)
{
if(m_transConnection){
if(DEBUG_NEXT_RESULT)
@@ -675,7 +689,7 @@ void NdbScanOperation::closeScan()
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
- close_impl(tp);
+ close_impl(tp, forceSend);
} while(0);
@@ -1310,7 +1324,8 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
}
int
-NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
+NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
+ bool forceSend){
Uint32 u_idx = 0, u_last = 0;
Uint32 s_idx = m_current_api_receiver; // first sorted
@@ -1338,7 +1353,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
return -1;
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
- if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx)){
+ if(seq == tp->getNodeSequence(nodeId) &&
+ !send_next_scan_ordered(s_idx, forceSend)){
Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){
@@ -1432,7 +1448,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
}
int
-NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
+NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){
if(idx == theParallelism)
return 0;
@@ -1469,11 +1485,13 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
tSignal.setLength(4+1);
- return tp->sendSignal(&tSignal, nodeId);
+ int ret= tp->sendSignal(&tSignal, nodeId);
+ if (!ret) checkForceSend(forceSend);
+ return ret;
}
int
-NdbScanOperation::close_impl(TransporterFacade* tp){
+NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
@@ -1547,7 +1565,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
}
// Send close scan
- if(send_next_scan(api+conf, true) == -1)
+ if(send_next_scan(api+conf, true, forceSend) == -1)
{
theNdbCon->theReleaseOnClose = true;
return -1;
@@ -1596,7 +1614,7 @@ NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
}
int
-NdbScanOperation::restart()
+NdbScanOperation::restart(bool forceSend)
{
TransporterFacade* tp = TransporterFacade::instance();
@@ -1605,7 +1623,7 @@ NdbScanOperation::restart()
{
int res;
- if((res= close_impl(tp)))
+ if((res= close_impl(tp, forceSend)))
{
return res;
}
@@ -1624,13 +1642,13 @@ NdbScanOperation::restart()
}
int
-NdbIndexScanOperation::reset_bounds(){
+NdbIndexScanOperation::reset_bounds(bool forceSend){
int res;
{
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
- res= close_impl(tp);
+ res= close_impl(tp, forceSend);
}
if(!res)