summaryrefslogtreecommitdiff
path: root/storage/connect/tabtbl.cpp
diff options
context:
space:
mode:
authorOlivier Bertrand <bertrandop@gmail.com>2013-06-04 17:18:33 +0200
committerOlivier Bertrand <bertrandop@gmail.com>2013-06-04 17:18:33 +0200
commit9df57eba9f7b9bbee19bba5e880cbf611ef82d5b (patch)
tree5bc13d213d7fda2315b958595c714c9746cbabbb /storage/connect/tabtbl.cpp
parent0a01953c1a6e586f286d44222f9371a46da1ac45 (diff)
downloadmariadb-git-9df57eba9f7b9bbee19bba5e880cbf611ef82d5b.tar.gz
- Adding parallelism to the TBL table type
modified: storage/connect/tabcol.h storage/connect/tabtbl.cpp storage/connect/tabtbl.h storage/connect/value.cpp
Diffstat (limited to 'storage/connect/tabtbl.cpp')
-rw-r--r--storage/connect/tabtbl.cpp286
1 files changed, 280 insertions, 6 deletions
diff --git a/storage/connect/tabtbl.cpp b/storage/connect/tabtbl.cpp
index 9207ad636cf..61f9c8628d9 100644
--- a/storage/connect/tabtbl.cpp
+++ b/storage/connect/tabtbl.cpp
@@ -1,7 +1,7 @@
/************* TabTbl C++ Program Source Code File (.CPP) **************/
/* PROGRAM NAME: TABTBL */
/* ------------- */
-/* Version 1.5 */
+/* Version 1.6 */
/* */
/* COPYRIGHT: */
/* ---------- */
@@ -77,6 +77,16 @@
#include "ha_connect.h"
#include "mycat.h" // For GetHandler
+#if defined(WIN32)
+#if defined(__BORLANDC__)
+#define SYSEXIT void _USERENTRY
+#else
+#define SYSEXIT void
+#endif
+#else // !WIN32
+#define SYSEXIT void *
+#endif // !WIN32
+
extern "C" int trace;
/* ---------------------------- Class TBLDEF ---------------------------- */
@@ -87,6 +97,9 @@ extern "C" int trace;
TBLDEF::TBLDEF(void)
{
//To_Tables = NULL;
+ Accept = false;
+ Thread = false;
+ Maxerr = 0;
Ntables = 0;
Pseudo = 3;
} // end of TBLDEF constructor
@@ -143,8 +156,9 @@ bool TBLDEF::DefineAM(PGLOBAL g, LPCSTR am, int poff)
} // endfor pdb
Maxerr = Cat->GetIntCatInfo("Maxerr", 0);
- Accept = (Cat->GetBoolCatInfo("Accept", 0) != 0);
- } // endif fsec || tablist
+ Accept = Cat->GetBoolCatInfo("Accept", false);
+ Thread = Cat->GetBoolCatInfo("Thread", false);
+ } // endif tablist
return FALSE;
} // end of DefineAM
@@ -156,6 +170,8 @@ PTDB TBLDEF::GetTable(PGLOBAL g, MODE m)
{
if (Catfunc == FNC_COL)
return new(g) TDBTBC(this);
+ else if (Thread)
+ return new(g) TDBTBM(this);
else
return new(g) TDBTBL(this);
@@ -173,7 +189,7 @@ TDBTBL::TDBTBL(PTBLDEF tdp) : TDBPRX(tdp)
//Tdbp = NULL;
Accept = tdp->Accept;
Maxerr = tdp->Maxerr;
- Nbf = 0;
+ Nbc = 0;
Rows = 0;
Crp = 0;
// NTables = 0;
@@ -227,7 +243,7 @@ bool TDBTBL::InitTableList(PGLOBAL g)
// Get the table description block of this table
if (!(Tdbp = GetSubTable(g, tabp))) {
- if (++Nbf > Maxerr)
+ if (++Nbc > Maxerr)
return TRUE; // Error return
else
continue; // Skip this table
@@ -389,7 +405,7 @@ bool TDBTBL::OpenDB(PGLOBAL g)
/*********************************************************************/
if (To_Filter && Tablist) {
Tablist = NULL;
- Nbf = 0;
+ Nbc = 0;
} // endif To_Filter
/*********************************************************************/
@@ -497,4 +513,262 @@ void TBTBLK::ReadColumn(PGLOBAL g)
} // end of ReadColumn
+/* ------------------------- Class TDBTBM ---------------------------- */
+
+/***********************************************************************/
+/* Thread routine that check and open one remote connection. */
+/***********************************************************************/
+pthread_handler_t ThreadOpen(void *p)
+ {
+ PTBMT cmp = (PTBMT)p;
+
+ if (!my_thread_init()) {
+ set_current_thd(cmp->Thd);
+
+ // Try to open the connection
+ if (!cmp->Tap->GetTo_Tdb()->OpenDB(cmp->G)) {
+ cmp->Ready = true;
+ } else
+ cmp->Rc = RC_FX;
+
+ my_thread_end();
+ } else
+ cmp->Rc = RC_FX;
+
+ return NULL;
+ } // end of ThreadOpen
+
+/***********************************************************************/
+/* TDBTBM constructors. */
+/***********************************************************************/
+TDBTBM::TDBTBM(PTBLDEF tdp) : TDBTBL(tdp)
+ {
+ Tmp = NULL; // To data table TBMT structures
+ Cmp = NULL; // Current data table TBMT
+ Bmp = NULL; // To bad (unconnected) TBMT structures
+ Done = false; // TRUE after first GetAllResults
+ Nrc = 0; // Number of remote connections
+ Nlc = 0; // Number of local connections
+ } // end of TDBTBL standard constructor
+
+/***********************************************************************/
+/* Reset read/write position values. */
+/***********************************************************************/
+void TDBTBM::ResetDB(void)
+ {
+ for (PCOL colp = Columns; colp; colp = colp->GetNext())
+ if (colp->GetAmType() == TYPE_AM_TABID)
+ colp->COLBLK::Reset();
+
+ for (PTABLE tabp = Tablist; tabp; tabp = tabp->GetNext())
+ ((PTDBASE)tabp->GetTo_Tdb())->ResetDB();
+
+ Tdbp = (PTDBASE)Tablist->GetTo_Tdb();
+ Crp = 0;
+ } // end of ResetDB
+
+/***********************************************************************/
+/* Returns RowId if b is false or Rownum if b is true. */
+/***********************************************************************/
+int TDBTBM::RowNumber(PGLOBAL g, bool b)
+ {
+ return Tdbp->RowNumber(g) + ((b) ? 0 : Rows);
+ } // end of RowNumber
+
+/***********************************************************************/
+/* Initialyze table parallel processing. */
+/***********************************************************************/
+bool TDBTBM::OpenTables(PGLOBAL g)
+ {
+ int k;
+ THD *thd = current_thd;
+ PTABLE tabp, *ptabp = &Tablist;
+ PTBMT tp, *ptp = &Tmp;
+
+ // Allocates the TBMT blocks for the tables
+ for (tabp = Tablist; tabp; tabp = tabp->Next)
+ if (tabp->GetTo_Tdb()->GetAmType() == TYPE_AM_MYSQL) {
+ // Remove remote table from the local list
+ *ptabp = tabp->Next;
+
+ // Make the remote table block
+ tp = (PTBMT)PlugSubAlloc(g, NULL, sizeof(TBMT));
+ memset(tp, 0, sizeof(TBMT));
+ tp->G = g;
+ tp->Tap = tabp;
+ tp->Thd = thd;
+
+ // Create the thread that will do the table opening.
+ pthread_attr_init(&tp->attr);
+// pthread_attr_setdetachstate(&tp->attr, PTHREAD_CREATE_JOINABLE);
+
+ if ((k = pthread_create(&tp->Tid, &tp->attr, ThreadOpen, tp))) {
+ sprintf(g->Message, "pthread_create error %d", k);
+ Nbc++;
+ continue;
+ } // endif k
+
+ // Add it to the remote list
+ *ptp = tp;
+ ptp = &tp->Next;
+ Nrc++; // Number of remote connections
+ } else {
+ ptabp = &tabp->Next;
+ Nlc++; // Number of local connections
+ } // endif Type
+
+ return false;
+ } // end of OpenTables
+
+/***********************************************************************/
+/* TBL Access Method opening routine. */
+/* Open first file, other will be opened sequencially when reading. */
+/***********************************************************************/
+bool TDBTBM::OpenDB(PGLOBAL g)
+ {
+ if (trace)
+ htrc("TBM OpenDB: tdbp=%p tdb=R%d use=%d key=%p mode=%d\n",
+ this, Tdb_No, Use, To_Key_Col, Mode);
+
+ if (Use == USE_OPEN) {
+ /*******************************************************************/
+ /* Table already open, replace it at its beginning. */
+ /*******************************************************************/
+ ResetDB();
+ return Tdbp->OpenDB(g); // Re-open fist table
+ } // endif use
+
+#if 0
+ /*********************************************************************/
+ /* When GetMaxsize was called, To_Filter was not set yet. */
+ /*********************************************************************/
+ if (To_Filter && Tablist) {
+ Tablist = NULL;
+ Nbc = 0;
+ } // endif To_Filter
+#endif // 0
+
+ /*********************************************************************/
+ /* Make the table list. */
+ /*********************************************************************/
+ if (/*!Tablist &&*/ InitTableList(g))
+ return TRUE;
+
+ /*********************************************************************/
+ /* Open all remote tables of the list. */
+ /*********************************************************************/
+ if (OpenTables(g))
+ return TRUE;
+
+ /*********************************************************************/
+ /* Proceed with local tables. */
+ /*********************************************************************/
+ if ((CurTable = Tablist)) {
+ Tdbp = (PTDBASE)CurTable->GetTo_Tdb();
+ Tdbp->SetMode(Mode);
+
+ // Check and initialize the subtable columns
+ for (PCOL cp = Columns; cp; cp = cp->GetNext())
+ if (cp->GetAmType() == TYPE_AM_TABID)
+ cp->COLBLK::Reset();
+ else if (((PPRXCOL)cp)->Init(g) && !Accept)
+ return TRUE;
+
+ if (trace)
+ htrc("Opening subtable %s\n", Tdbp->GetName());
+
+ // Now we can safely open the table
+ if (Tdbp->OpenDB(g))
+ return TRUE;
+
+ } // endif *Tablist
+
+ Use = USE_OPEN;
+ return FALSE;
+ } // end of OpenDB
+
+/***********************************************************************/
+/* ReadDB: Data Base read routine for MUL access method. */
+/***********************************************************************/
+int TDBTBM::ReadDB(PGLOBAL g)
+ {
+ int rc;
+
+ if (!Done) {
+ // Get result from local tables
+ if ((rc = TDBTBL::ReadDB(g)) != RC_EF)
+ return rc;
+ else if ((rc = ReadNextRemote(g)) != RC_OK)
+ return rc;
+
+ Done = true;
+ } // endif Done
+
+ /*********************************************************************/
+ /* Now start the reading process of remote tables. */
+ /*********************************************************************/
+ retry:
+ rc = Tdbp->ReadDB(g);
+
+ if (rc == RC_EF) {
+ // Total number of rows met so far
+ Rows += Tdbp->RowNumber(g) - 1;
+ Crp += Tdbp->GetProgMax(g);
+ Cmp->Complete = true;
+
+ if ((rc = ReadNextRemote(g)) == RC_OK)
+ goto retry;
+
+ } else if (rc == RC_FX)
+ strcat(strcat(strcat(g->Message, " ("), Tdbp->GetName()), ")");
+
+ return rc;
+ } // end of ReadDB
+
+/***********************************************************************/
+/* ReadNext: Continue reading from next table. */
+/***********************************************************************/
+int TDBTBM::ReadNextRemote(PGLOBAL g)
+ {
+ bool b = false;
+
+ if (Tdbp)
+ Tdbp->CloseDB(g);
+
+ Cmp = NULL;
+
+ retry:
+ // Search for a remote table having its result set
+ for (PTBMT tp = Tmp; tp; tp = tp->Next)
+ if (tp->Ready) {
+ if (!tp->Complete)
+ Cmp = tp;
+
+ } else
+ b = true;
+
+ if (!Cmp) {
+ if (b) { // more result to come
+// sleep(20);
+ goto retry;
+ } else
+ return RC_EF;
+
+ } // endif Curtable
+
+ Tdbp = (PTDBASE)Cmp->Tap->GetTo_Tdb();
+
+ // Check and initialize the subtable columns
+ for (PCOL cp = Columns; cp; cp = cp->GetNext())
+ if (cp->GetAmType() == TYPE_AM_TABID)
+ cp->COLBLK::Reset();
+ else if (((PPRXCOL)cp)->Init(g) && !Accept)
+ return RC_FX;
+
+ if (trace)
+ htrc("Reading subtable %s\n", Tdbp->GetName());
+
+ return RC_OK;
+ } // end of ReadNextRemote
+
/* ------------------------------------------------------------------- */