summaryrefslogtreecommitdiff
path: root/storage/connect/tabpivot.cpp
diff options
context:
space:
mode:
authorOlivier Bertrand <bertrandop@gmail.com>2013-05-19 19:25:06 +0200
committerOlivier Bertrand <bertrandop@gmail.com>2013-05-19 19:25:06 +0200
commitc035bde34c3a5a63451b51031d508a425ce9a3ae (patch)
tree40f513d803531c29a04dd18934ab20e799fba964 /storage/connect/tabpivot.cpp
parent3c76e0e2ad05229ea1718b6e9c3dce85d75aaa4d (diff)
downloadmariadb-git-c035bde34c3a5a63451b51031d508a425ce9a3ae.tar.gz
- Allowing views and queries as parameters for PROXY base tables
NOTE: Checking for looping references cannot be done when using views as parameters. This should not be allowed on production servers and should be dependant on a system variable and/or on speciel grant. modified: storage/connect/CMakeLists.txt storage/connect/connect.cc storage/connect/ha_connect.cc storage/connect/myconn.cpp storage/connect/myconn.h storage/connect/mysql-test/connect/r/fmt.result storage/connect/mysql-test/connect/r/pivot.result storage/connect/mysql-test/connect/t/fmt.test storage/connect/mysql-test/connect/t/pivot.test storage/connect/plgdbsem.h storage/connect/plugutil.c storage/connect/tabcol.cpp storage/connect/tabcol.h storage/connect/tabfmt.cpp storage/connect/tabmysql.cpp storage/connect/tabmysql.h storage/connect/taboccur.cpp storage/connect/taboccur.h storage/connect/tabpivot.cpp storage/connect/tabpivot.h storage/connect/tabtbl.cpp storage/connect/tabutil.cpp storage/connect/tabutil.h storage/connect/xtable.h
Diffstat (limited to 'storage/connect/tabpivot.cpp')
-rw-r--r--storage/connect/tabpivot.cpp1003
1 files changed, 176 insertions, 827 deletions
diff --git a/storage/connect/tabpivot.cpp b/storage/connect/tabpivot.cpp
index 4ae1b3ceae6..1a73f353970 100644
--- a/storage/connect/tabpivot.cpp
+++ b/storage/connect/tabpivot.cpp
@@ -1,7 +1,7 @@
/************ TabPivot C++ Program Source Code File (.CPP) *************/
/* PROGRAM NAME: TABPIVOT */
/* ------------- */
-/* Version 1.4 */
+/* Version 1.5 */
/* */
/* COPYRIGHT: */
/* ---------- */
@@ -41,11 +41,9 @@
#include "global.h"
#include "plgdbsem.h"
#include "xtable.h"
-//#include "xindex.h"
#include "tabcol.h"
#include "colblk.h"
-//#include "tabmysql.h"
-#include "myconn.h"
+#include "tabmysql.h"
#include "csort.h"
#include "tabutil.h"
#include "tabpivot.h"
@@ -55,381 +53,6 @@
extern "C" int trace;
-#if 0
-/***********************************************************************/
-/* Prepare the source table Query. */
-/***********************************************************************/
-PQRYRES TDBPIVOT::GetSourceTable(PGLOBAL g)
- {
- if (Qryp)
- return Qryp; // Already done
-
- if (Tabname) {
- char *def, *colist;
- size_t len = 0;
- PCOL colp;
- PDBUSER dup = (PDBUSER)g->Activityp->Aptr;
-
- if (InitTable(g))
- return NULL;
-
- // Evaluate the length of the column list
- for (colp = Tdbp->Columns; colp; colp = colp->GetNext())
- len += (strlen(colp->GetName()) + 2);
-
- *(colist = (char*)PlugSubAlloc(g, NULL, len)) = 0;
-
- // Locate the suballocated string (size is not known yet)
- def = (char*)PlugSubAlloc(g, NULL, 0);
- strcpy(def, "SELECT ");
-
- if (!Fncol) {
- for (colp = Tdbp->Columns; colp; colp = colp->GetNext())
- if (!Picol || stricmp(Picol, colp->GetName()))
- Fncol = colp->GetName();
-
- if (!Fncol) {
- strcpy(g->Message, MSG(NO_DEF_FNCCOL));
- return NULL;
- } // endif Fncol
-
- } else if (!(ColDB(g, Fncol, 0))) {
- // Function column not found in table
- sprintf(g->Message, MSG(COL_ISNOT_TABLE), Fncol, Tabname);
- return NULL;
- } // endif Fcolp
-
- if (!Picol) {
- // Find default Picol as the last one not equal to Fncol
- for (colp = Tdbp->Columns; colp; colp = colp->GetNext())
- if (!Fncol || stricmp(Fncol, colp->GetName()))
- Picol = colp->GetName();
-
- if (!Picol) {
- strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
- return NULL;
- } // endif Picol
-
- } else if (!(ColDB(g, Picol, 0))) {
- // Pivot column not found in table
- sprintf(g->Message, MSG(COL_ISNOT_TABLE), Picol, Tabname);
- return NULL;
- } // endif Xcolp
-
- // Make the other column list
- for (colp = Columns; colp; colp = colp->GetNext())
- if (stricmp(Picol, colp->GetName()) &&
- stricmp(Fncol, colp->GetName()))
- strcat(strcat(colist, colp->GetName()), ", ");
-
- // Add the Pivot column at the end of the list
- strcat(strcat(def, strcat(colist, Picol)), ", ");
-
- // Continue making the definition
- if (!GBdone) {
- // Make it suitable for Pivot by doing the group by
- strcat(strcat(def, Function), "(");
- strcat(strcat(strcat(def, Fncol), ") "), Fncol);
- strcat(strcat(def, " FROM "), Tabname);
- strcat(strcat(def, " GROUP BY "), colist);
- } else // Gbdone
- strcat(strcat(strcat(def, Fncol), " FROM "), Tabname);
-
- // Now we know how much was suballocated
- Tabsrc = (char*)PlugSubAlloc(g, NULL, strlen(def));
- } else {
- strcpy(g->Message, MSG(SRC_TABLE_UNDEF));
- return NULL;
- } // endif Tabsrc
-
- int w;
-
- // Open a MySQL connection for this table
- if (Myc.Open(g, Host, Database, User, Pwd, Port))
- return NULL;
-
- // Send the source command to MySQL
- if (Myc.ExecSQL(g, Tabsrc, &w) == RC_FX) {
- Myc.Close();
- return NULL;
- } // endif Exec
-
- // We must have a storage query to get pivot column values
- Qryp = Myc.GetResult(g);
- Myc.Close();
- Tqrp = new(g) TDBQRS(Qryp);
- Tqrp->OpenDB(g);
-
- if (MakePivotColumns(g) < 0)
- return NULL;
-
- return Qryp;
- } // end of GetSourceTable
-
-/***********************************************************************/
-/* Allocate PIVOT columns description block. */
-/***********************************************************************/
-int TDBPIVOT::MakePivotColumns(PGLOBAL g)
- {
- if (Mult < 0) {
- int ndif, n = 0, nblin = Qryp->Nblin;
- PVAL valp;
- PCOL cp;
- PSRCCOL colp;
- PFNCCOL cfnp;
-
- // Allocate all the source columns
- Tqrp->ColDB(g, NULL, 0);
- Columns = NULL; // Discard dummy columns blocks
-
- for (cp = Tqrp->GetColumns(); cp; cp = cp->GetNext()) {
- if (cp->InitValue(g))
- return -1;
-
- if (!stricmp(cp->GetName(), Picol)) {
- Xcolp = (PQRSCOL)cp;
- Xresp = Xcolp->GetCrp();
- Rblkp = Xresp->Kdata;
- } else if (!stricmp(cp->GetName(), Fncol)) {
- Fcolp = (PQRSCOL)cp;
- } else
- if ((colp = new(g) SRCCOL(cp, this, ++n))->Init(g, this))
- return -1;
-
- } // endfor cp
-
- if (!Xcolp) {
- sprintf(g->Message, MSG(COL_ISNOT_TABLE),
- Picol, Tabname ? Tabname : "TabSrc");
- return -1;
- } else if (!Fcolp) {
- sprintf(g->Message, MSG(COL_ISNOT_TABLE),
- Fncol, Tabname ? Tabname : "TabSrc");
- return -1;
- } // endif Fcolp
-
- // Before calling sort, initialize all
- Index.Size = nblin * sizeof(int);
- Index.Sub = TRUE; // Should be small enough
-
- if (!PlgDBalloc(g, NULL, Index))
- return -1;
-
- Offset.Size = (nblin + 1) * sizeof(int);
- Offset.Sub = TRUE; // Should be small enough
-
- if (!PlgDBalloc(g, NULL, Offset))
- return -2;
-
- ndif = Qsort(g, nblin);
-
- if (ndif < 0) { // error
- return -3;
- } else
- Ncol = ndif;
-
- // Now make the functional columns
- for (int i = 0; i < Ncol; i++) {
- // Allocate the Value used to retieve column names
- if (!(valp = AllocateValue(g, Xcolp->GetResultType(),
- Xcolp->GetLengthEx(), Xcolp->GetPrecision(),
- Xcolp->GetDomain(), Xcolp->GetTo_Tdb()->GetCat())))
- return -4;
-
- // Get the value that will be the generated column name
- valp->SetValue_pvblk(Rblkp, Pex[Pof[i]]);
-
- // Copy the functional column with new Name and new Value
- cfnp = (PFNCCOL)new(g) FNCCOL(Fcolp, this);
-
- // Initialize the generated column
- if (cfnp->InitColumn(g, valp))
- return -5;
-
- } // endfor i
-
- // Fields must be updated for ha_connect
-// if (UpdateTableFields(g, n + Ncol))
-// return -6;
-
- // This should be refined later
- Mult = nblin;
- } // endif Mult
-
- return Mult;
- } // end of MakePivotColumns
-
-/***********************************************************************/
-/* Update fields in the MySQL table structure */
-/* Note: this does not work. Indeed the new rows are correctly made */
-/* but the final result still specify the unmodified table and the */
-/* returned table only contains the original column values. */
-/* In addition, a new query on the table, when it is into the cache, */
-/* specifies all the new columns and fails because they do not belong */
-/* to the original table. */
-/***********************************************************************/
-bool TDBPIVOT::UpdateTableFields(PGLOBAL g, int n)
- {
- uchar *trec, *srec, *tptr, *sptr;
- int i = 0, k = 0;
- uint len;
- uint32 nmp, lwm;
- size_t buffsize;
- PCOL colp;
- PHC hc = ((MYCAT*)((PIVOTDEF*)To_Def)->Cat)->GetHandler();
- TABLE *table = hc->GetTable();
- st_mem_root *tmr = &table->mem_root;
- st_mem_root *smr = &table->s->mem_root;
- Field* *field;
- Field *fp, *tfncp, *sfncp;
- Field* *ntf;
- Field* *nsf;
-//my_bitmap_map *org_bitmap;
- const MY_BITMAP *map;
-
- // When sorting read_set selects all columns, so we use def_read_set
- map= (const MY_BITMAP *)&table->def_read_set;
-
- // Find the function field
- for (field= table->field; *field; field++) {
- fp= *field;
-
- if (bitmap_is_set(map, fp->field_index))
- if (!stricmp(fp->field_name, Fncol)) {
- tfncp = fp;
- break;
- } // endif Name
-
- } // endfor field
-
- for (field= table->s->field; *field; field++) {
- fp= *field;
-
- if (bitmap_is_set(map, fp->field_index))
- if (!stricmp(fp->field_name, Fncol)) {
- sfncp = fp;
- break;
- } // endif Name
-
- } // endfor field
-
- // Calculate the new buffer size
- len = tfncp->max_data_length();
- buffsize = table->s->rec_buff_length + len * Ncol;
-
- // Allocate the new record space
- if (!(tptr = trec = (uchar*)alloc_root(tmr, 2 * buffsize)))
- return TRUE;
-
- if (!(sptr = srec = (uchar*)alloc_root(smr, 2 * buffsize)))
- return TRUE;
-
-
- // Allocate the array of all new table field pointers
- if (!(ntf = (Field**)alloc_root(tmr, (uint)((n+1) * sizeof(Field*)))))
- return TRUE;
-
- // Allocate the array of all new table share field pointers
- if (!(nsf = (Field**)alloc_root(smr, (uint)((n+1) * sizeof(Field*)))))
- return TRUE;
-
- // First fields are the the ones of the source columns
- for (colp = Columns; colp; colp = colp->GetNext())
- if (colp->GetAmType() == TYPE_AM_SRC) {
- for (field= table->field; *field; field++) {
- fp= *field;
-
- if (bitmap_is_set(map, fp->field_index))
- if (!stricmp(colp->GetName(), fp->field_name)) {
- ntf[i] = fp;
- fp->field_index = i++;
- fp->ptr = tptr;
- tptr += fp->max_data_length();
- break;
- } // endif Name
-
- } // endfor field
-
- for (field= table->s->field; *field; field++) {
- fp= *field;
-
- if (bitmap_is_set(map, fp->field_index))
- if (!stricmp(colp->GetName(), fp->field_name)) {
- nsf[k] = fp;
- fp->field_index = k++;
- fp->ptr = srec;
- srec += fp->max_data_length();
- break;
- } // endif Name
-
- } // endfor field
-
- } // endif AmType
-
- // Now add the pivot generated columns
- for (colp = Columns; colp; colp = colp->GetNext())
- if (colp->GetAmType() == TYPE_AM_FNC) {
- if ((fp = (Field*)memdup_root(tmr, (char*)tfncp, tfncp->size_of()))) {
- ntf[i] = fp;
- fp->ptr = tptr;
- fp->field_name = colp->GetName();
- fp->field_index = i++;
- fp->vcol_info = NULL;
- fp->stored_in_db = TRUE;
- tptr += len;
- } else
- return TRUE;
-
- if ((fp = (Field*)memdup_root(smr, (char*)sfncp, sfncp->size_of()))) {
- nsf[i] = fp;
- fp->ptr = sptr;
- fp->field_name = colp->GetName();
- fp->field_index = k++;
- fp->vcol_info = NULL;
- fp->stored_in_db = TRUE;
- sptr += len;
- } else
- return TRUE;
-
- } // endif AM_FNC
-
- // Mark end of the list
- ntf[i] = NULL;
- nsf[k] = NULL;
-
- // Update the table fields
- nmp = (uint32)((1<<i) - 1);
- lwm = (uint32)((-1)<<i);
- table->field = ntf;
- table->used_fields = i;
- table->record[0] = trec;
- table->record[1] = trec + buffsize;
- *table->def_read_set.bitmap = nmp;
- *table->def_read_set.last_word_ptr = nmp;
- table->def_read_set.last_word_mask = lwm;
- table->def_read_set.n_bits = i;
- *table->read_set->bitmap = nmp;
- *table->read_set->last_word_ptr = nmp;
- table->read_set->last_word_mask = lwm;
- table->read_set->n_bits = i;
- table->write_set->n_bits = i;
- *table->vcol_set->bitmap = 0;
- table->vcol_set->n_bits = i;
-
- // and the share fields
- table->s->field = nsf;
- table->s->reclength = sptr - srec;
- table->s->stored_rec_length = sptr - srec;
- table->s->fields = k;
- table->s->stored_fields = k;
- table->s->rec_buff_length = buffsize;
-//table->s->varchar_fields = ???;
-//table->s->db_record_offset = ???;
-//table->s->null_field_first = ???;
- return FALSE;
- } // end of UpdateTableFields
-#endif // 0
-
/* --------------- Implementation of the PIVOT classes --------------- */
/***********************************************************************/
@@ -451,13 +74,12 @@ bool PIVOTDEF::DefineAM(PGLOBAL g, LPCSTR am, int poff)
char *p1, *p2;
PHC hc = ((MYCAT*)Cat)->GetHandler();
- if (!PRXDEF::DefineAM(g, am, poff)) {
- Tabname = (char*)Tablep->GetName();
- DB = (char*)Tablep->GetQualifier();
- } else {
- DB = Cat->GetStringCatInfo(g, "Database", "*");
- Tabsrc = Cat->GetStringCatInfo(g, "SrcDef", NULL);
- } // endif
+ if (PRXDEF::DefineAM(g, am, poff))
+ return TRUE;
+
+ Tabname = (char*)Tablep->GetName();
+ DB = (char*)Tablep->GetQualifier();
+ Tabsrc = (char*)Tablep->GetSrc();
Host = Cat->GetStringCatInfo(g, "Host", "localhost");
User = Cat->GetStringCatInfo(g, "User", "*");
@@ -477,7 +99,7 @@ bool PIVOTDEF::DefineAM(PGLOBAL g, LPCSTR am, int poff)
GBdone = Cat->GetBoolCatInfo("Groupby", false);
Accept = Cat->GetBoolCatInfo("Accept", false);
Port = Cat->GetIntCatInfo("Port", 3306);
- Desc = (Tabname) ? Tabname : Tabsrc;
+ Desc = (Tabsrc) ? Tabsrc : Tabname;
return FALSE;
} // end of DefineAM
@@ -521,6 +143,25 @@ TDBPIVOT::TDBPIVOT(PPIVOTDEF tdp) : TDBPRX(tdp)
} // end of TDBPIVOT constructor
/***********************************************************************/
+/* Allocate source column description block. */
+/***********************************************************************/
+PCOL TDBPIVOT::MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n)
+ {
+ PCOL colp;
+
+ if (cdp->GetOffset()) {
+ colp = new(g) FNCCOL(cdp, this, cprec, n);
+
+ if (cdp->GetOffset() > 1)
+ Dcolp = colp;
+
+ } else
+ colp = new(g) SRCCOL(cdp, this, cprec, n);
+
+ return colp;
+ } // end of MakeCol
+
+/***********************************************************************/
/* Prepare the source table Query. */
/***********************************************************************/
bool TDBPIVOT::GetSourceTable(PGLOBAL g)
@@ -528,72 +169,74 @@ bool TDBPIVOT::GetSourceTable(PGLOBAL g)
if (Tdbp)
return false; // Already done
- if (Tabname) {
- PTABDEF defp;
- PCOLDEF cdp;
-
- if (InitTable(g))
+ if (!Tabsrc && Tabname) {
+ // Get the table description block of this table
+ if (!(Tdbp = GetSubTable(g, ((PPIVOTDEF)To_Def)->Tablep, true)))
return true;
- else
- defp = Tdbp->GetDef();
- if (!Fncol) {
- for (cdp = defp->GetCols(); cdp; cdp = cdp->GetNext())
- if (!Picol || stricmp(Picol, cdp->GetName()))
- Fncol = cdp->GetName();
+ if (!Tdbp->IsView()) {
+ PCOLDEF cdp;
+ PTABDEF defp = Tdbp->GetDef();
if (!Fncol) {
- strcpy(g->Message, MSG(NO_DEF_FNCCOL));
- return true;
+ for (cdp = defp->GetCols(); cdp; cdp = cdp->GetNext())
+ if (!Picol || stricmp(Picol, cdp->GetName()))
+ Fncol = cdp->GetName();
+
+ if (!Fncol) {
+ strcpy(g->Message, MSG(NO_DEF_FNCCOL));
+ return true;
+ } // endif Fncol
+
} // endif Fncol
-
- } // endif Fncol
-
- if (!Picol) {
- // Find default Picol as the last one not equal to Fncol
- for (cdp = defp->GetCols(); cdp; cdp = cdp->GetNext())
- if (!Fncol || stricmp(Fncol, cdp->GetName()))
- Picol = cdp->GetName();
-
+
if (!Picol) {
- strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
- return true;
+ // Find default Picol as the last one not equal to Fncol
+ for (cdp = defp->GetCols(); cdp; cdp = cdp->GetNext())
+ if (!Fncol || stricmp(Fncol, cdp->GetName()))
+ Picol = cdp->GetName();
+
+ if (!Picol) {
+ strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
+ return true;
+ } // endif Picol
+
} // endif Picol
-
- } // endif Picol
-
- if (!GBdone) {
- char *colist;
- // Locate the suballocated colist (size is not known yet)
- *(colist = (char*)PlugSubAlloc(g, NULL, 0)) = 0;
+ if (!GBdone) {
+ char *colist;
- // Make the column list
- for (cdp = To_Def->GetCols(); cdp; cdp = cdp->GetNext())
- if (!cdp->GetOffset())
- strcat(strcat(colist, cdp->GetName()), ", ");
+ // Locate the suballocated colist (size is not known yet)
+ *(colist = (char*)PlugSubAlloc(g, NULL, 0)) = 0;
- // Add the Pivot column at the end of the list
- strcat(colist, Picol);
+ // Make the column list
+ for (cdp = To_Def->GetCols(); cdp; cdp = cdp->GetNext())
+ if (!cdp->GetOffset())
+ strcat(strcat(colist, cdp->GetName()), ", ");
- // Now we know how much was suballocated
- PlugSubAlloc(g, NULL, strlen(colist));
+ // Add the Pivot column at the end of the list
+ strcat(colist, Picol);
- // Locate the source string (size is not known yet)
- Tabsrc = (char*)PlugSubAlloc(g, NULL, 0);
-
- // Start making the definition
- strcat(strcat(strcpy(Tabsrc, "SELECT "), colist), ", ");
+ // Now we know how much was suballocated
+ PlugSubAlloc(g, NULL, strlen(colist));
+
+ // Locate the source string (size is not known yet)
+ Tabsrc = (char*)PlugSubAlloc(g, NULL, 0);
+
+ // Start making the definition
+ strcat(strcat(strcpy(Tabsrc, "SELECT "), colist), ", ");
- // Make it suitable for Pivot by doing the group by
- strcat(strcat(Tabsrc, Function), "(");
- strcat(strcat(strcat(Tabsrc, Fncol), ") "), Fncol);
- strcat(strcat(Tabsrc, " FROM "), Tabname);
- strcat(strcat(Tabsrc, " GROUP BY "), colist);
+ // Make it suitable for Pivot by doing the group by
+ strcat(strcat(Tabsrc, Function), "(");
+ strcat(strcat(strcat(Tabsrc, Fncol), ") "), Fncol);
+ strcat(strcat(Tabsrc, " FROM "), Tabname);
+ strcat(strcat(Tabsrc, " GROUP BY "), colist);
- // Now we know how much was suballocated
- PlugSubAlloc(g, NULL, strlen(Tabsrc));
- } // endif !GBdone
+ // Now we know how much was suballocated
+ PlugSubAlloc(g, NULL, strlen(Tabsrc));
+ } // endif !GBdone
+
+ } // endif IsView
} else if (!Tabsrc) {
strcpy(g->Message, MSG(SRC_TABLE_UNDEF));
@@ -601,26 +244,76 @@ bool TDBPIVOT::GetSourceTable(PGLOBAL g)
} // endif
if (Tabsrc) {
- MYSQLC myc; // MySQL connection class
- PQRYRES qryp;
- PCOLRES crp;
- int w;
+ // Get the new table description block of this source table
+ PTABLE tablep = new(g) XTAB("whatever", Tabsrc);
+
+ tablep->SetQualifier(Database);
- // Open a MySQL connection for this table
- if (myc.Open(g, Host, Database, User, Pwd, Port))
+ if (!(Tdbp = GetSubTable(g, tablep, true)))
return true;
- // Send the source command to MySQL
- if (myc.ExecSQL(g, Tabsrc, &w) == RC_FX) {
- myc.Close();
+ } // endif Tabsrc
+
+ return false;
+ } // end of GetSourceTable
+
+/***********************************************************************/
+/* Make the required pivot columns. */
+/***********************************************************************/
+bool TDBPIVOT::MakePivotColumns(PGLOBAL g)
+ {
+ if (!Tdbp->IsView()) {
+ // Now it is time to allocate the pivot and function columns
+ if (!(Fcolp = Tdbp->ColDB(g, Fncol, 0))) {
+ // Function column not found in table
+ sprintf(g->Message, MSG(COL_ISNOT_TABLE), Fncol, Tabname);
+ return true;
+ } else if (Fcolp->InitValue(g))
return true;
- } // endif Exec
- // We must have a storage query to get pivot column values
- qryp = myc.GetResult(g);
- myc.Close();
- Tdbp = new(g) TDBQRS(qryp);
+ if (!(Xcolp = Tdbp->ColDB(g, Picol, 0))) {
+ // Pivot column not found in table
+ sprintf(g->Message, MSG(COL_ISNOT_TABLE), Picol, Tabname);
+ return true;
+ } else if (Xcolp->InitValue(g))
+ return true;
+
+ // Check and initialize the subtable columns
+ for (PCOL cp = Columns; cp; cp = cp->GetNext())
+ if (cp->GetAmType() == TYPE_AM_SRC) {
+ if (((PSRCCOL)cp)->Init(g))
+ return TRUE;
+
+ } else if (cp->GetAmType() == TYPE_AM_FNC)
+ if (((PFNCCOL)cp)->InitColumn(g))
+ return TRUE;
+
+ } // endif isview
+
+ return false;
+ } // end of MakePivotColumns
+/***********************************************************************/
+/* Make the required pivot columns for an object view. */
+/***********************************************************************/
+bool TDBPIVOT::MakeViewColumns(PGLOBAL g)
+ {
+ if (Tdbp->IsView()) {
+ // Tdbp is a view ColDB cannot be used
+ PCOL colp, cp;
+ PTDBMY tdbp;
+
+ if (Tdbp->GetAmType() != TYPE_AM_MYSQL) {
+ strcpy(g->Message, "View is not MySQL");
+ return true;
+ } else
+ tdbp = (PTDBMY)Tdbp;
+
+ if (!Fncol || !Picol) {
+ strcpy(g->Message, "Missing Function or Pivot column");
+ return true;
+ } // endif
+#if 0
if (!Fncol) {
for (crp = qryp->Colresp; crp; crp = crp->Next)
if (!Picol || stricmp(Picol, crp->Name))
@@ -645,45 +338,33 @@ bool TDBPIVOT::GetSourceTable(PGLOBAL g)
} // endif Picol
} // endif Picol
+#endif // 0
- } // endif Tabsrc
-
- // Now it is time to allocate the pivot and function columns
- if (!(Fcolp = Tdbp->ColDB(g, Fncol, 0))) {
- // Function column not found in table
- sprintf(g->Message, MSG(COL_ISNOT_TABLE), Fncol, Tabname);
- return true;
- } else if (Fcolp->InitValue(g))
- return true;
-
- if (!(Xcolp = Tdbp->ColDB(g, Picol, 0))) {
- // Pivot column not found in table
- sprintf(g->Message, MSG(COL_ISNOT_TABLE), Picol, Tabname);
- return true;
- } else if (Xcolp->InitValue(g))
- return true;
-
- return false;
- } // end of GetSourceTable
+ // Now it is time to allocate the pivot and function columns
+ if (!(Fcolp = tdbp->MakeFieldColumn(g, Fncol)))
+ return true;
-/***********************************************************************/
-/* Allocate source column description block. */
-/***********************************************************************/
-PCOL TDBPIVOT::MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n)
- {
- PCOL colp;
+ if (!(Xcolp = tdbp->MakeFieldColumn(g, Picol)))
+ return true;
- if (cdp->GetOffset()) {
- colp = new(g) FNCCOL(cdp, this, cprec, n);
+ // Check and initialize the subtable columns
+ for (cp = Columns; cp; cp = cp->GetNext())
+ if (cp->GetAmType() == TYPE_AM_SRC) {
+ if ((colp = tdbp->MakeFieldColumn(g, cp->GetName()))) {
+ ((PSRCCOL)cp)->Colp = colp;
+ ((PSRCCOL)cp)->To_Val = colp->GetValue();
+ cp->AddStatus(BUF_READ); // All is done here
+ } else
+ return true;
- if (cdp->GetOffset() > 1)
- Dcolp = colp;
+ } else if (cp->GetAmType() == TYPE_AM_FNC)
+ if (((PFNCCOL)cp)->InitColumn(g))
+ return TRUE;
- } else
- colp = new(g) SRCCOL(cdp, this, cprec, n);
+ } // endif isview
- return colp;
- } // end of MakeCol
+ return false;
+ } // end of MakeViewColumns
/***********************************************************************/
/* PIVOT GetMaxSize: returns the maximum number of rows in the table. */
@@ -696,7 +377,7 @@ int TDBPIVOT::GetMaxSize(PGLOBAL g)
return MaxSize;
#endif // 0
- return 0;
+ return 10;
} // end of GetMaxSize
/***********************************************************************/
@@ -713,8 +394,6 @@ int TDBPIVOT::RowNumber(PGLOBAL g, bool b)
/***********************************************************************/
bool TDBPIVOT::OpenDB(PGLOBAL g)
{
-//PDBUSER dup = (PDBUSER)g->Activityp->Aptr;
-
if (Use == USE_OPEN) {
/*******************************************************************/
/* Table already open, just replace it at its beginning. */
@@ -746,18 +425,10 @@ bool TDBPIVOT::OpenDB(PGLOBAL g)
/*********************************************************************/
if (GetSourceTable(g))
return TRUE;
-
- /*********************************************************************/
- /* Check and initialize the subtable columns. */
- /*********************************************************************/
- for (PCOL cp = Columns; cp; cp = cp->GetNext())
- if (cp->GetAmType() == TYPE_AM_SRC) {
- if (((PPRXCOL)cp)->Init(g))
- return TRUE;
-
- } else if (cp->GetAmType() == TYPE_AM_FNC)
- if (((PFNCCOL)cp)->InitColumn(g))
- return TRUE;
+
+ // For tables, columns must be allocated before opening
+ if (MakePivotColumns(g))
+ return TRUE;
/*********************************************************************/
/* Physically open the object table. */
@@ -765,7 +436,10 @@ bool TDBPIVOT::OpenDB(PGLOBAL g)
if (Tdbp->OpenDB(g))
return TRUE;
- return FALSE;
+ /*********************************************************************/
+ /* Make all required pivot columns for object views. */
+ /*********************************************************************/
+ return MakeViewColumns(g);
} // end of OpenDB
/***********************************************************************/
@@ -881,17 +555,6 @@ void TDBPIVOT::CloseDB(PGLOBAL g)
} // end of CloseDB
-#if 0
-/***********************************************************************/
-/* TDBPIVOT: Compare routine for sorting pivot column values. */
-/***********************************************************************/
-int TDBPIVOT::Qcompare(int *i1, int *i2)
- {
- // TODO: the actual comparison between pivot column result values.
- return Rblkp->CompVal(*i1, *i2);
- } // end of Qcompare
-#endif // 0
-
// ------------------------ FNCCOL functions ----------------------------
/***********************************************************************/
@@ -948,8 +611,6 @@ bool FNCCOL::CompareColumn(void)
SRCCOL::SRCCOL(PCOLDEF cdp, PTDB tdbp, PCOL cprec, int n)
: PRXCOL(cdp, tdbp, cprec, n)
{
- // Set additional SRC access method information for column.
-//Cnval = NULL;
} // end of SRCCOL constructor
/***********************************************************************/
@@ -960,9 +621,6 @@ bool SRCCOL::Init(PGLOBAL g)
if (PRXCOL::Init(g))
return true;
- // Will contain the last value
-//Cnval = AllocateValue(g, Value, TYPE_VOID);
-
AddStatus(BUF_READ); // All is done here
return false;
} // end of SRCCOL constructor
@@ -972,7 +630,6 @@ bool SRCCOL::Init(PGLOBAL g)
/***********************************************************************/
void SRCCOL::SetColumn(void)
{
-//Cnval->SetValue_pval(Value);
Value->SetValue_pval(To_Val);
} // end of SetColumn
@@ -982,315 +639,7 @@ void SRCCOL::SetColumn(void)
bool SRCCOL::CompareLast(void)
{
// Compare the unconverted values
-//return !Cnval->IsEqual(Colp->GetValue(), true);
return !Value->IsEqual(To_Val, true);
} // end of CompareColumn
-
-/* ------------------------------------------------------------------- */
-
-/***********************************************************************/
-/* Implementation of the TDBQRS class. */
-/***********************************************************************/
-TDBQRS::TDBQRS(PTDBQRS tdbp) : TDBASE(tdbp)
- {
- Qrp = tdbp->Qrp;
- CurPos = tdbp->CurPos;
- } // end of TDBQRS copy constructor
-
-// Method
-PTDB TDBQRS::CopyOne(PTABS t)
- {
- PTDB tp;
- PQRSCOL cp1, cp2;
- PGLOBAL g = t->G; // Is this really useful ???
-
- tp = new(g) TDBQRS(this);
-
- for (cp1 = (PQRSCOL)Columns; cp1; cp1 = (PQRSCOL)cp1->GetNext()) {
- cp2 = new(g) QRSCOL(cp1, tp); // Make a copy
- NewPointer(t, cp1, cp2);
- } // endfor cp1
-
- return tp;
- } // end of CopyOne
-
-#if 0 // The TDBASE functions return NULL when To_Def is NULL
-/***********************************************************************/
-/* Return the pointer on the DB catalog this table belongs to. */
-/***********************************************************************/
-PCATLG TDBQRS::GetCat(void)
- {
- // To_Def is null for QRYRES tables
- return NULL;
- } // end of GetCat
-
-/***********************************************************************/
-/* Return the datapath of the DB this table belongs to. */
-/***********************************************************************/
-PSZ TDBQRS::GetPath(void)
- {
- // To_Def is null for QRYRES tables
- return NULL;
- } // end of GetPath
-#endif // 0
-
-/***********************************************************************/
-/* Initialize QRS column description block construction. */
-/* name is used to call columns by name. */
-/* num is used by LNA to construct columns by index number. */
-/* Note: name=Null and num=0 for constructing all columns (select *) */
-/***********************************************************************/
-PCOL TDBQRS::ColDB(PGLOBAL g, PSZ name, int num)
- {
- int i;
- PCOLRES crp;
- PCOL cp, colp = NULL, cprec = NULL;
-
- if (trace)
- htrc("QRS ColDB: colname=%s tabname=%s num=%d\n",
- SVP(name), Name, num);
-
- for (crp = Qrp->Colresp, i = 1; crp; crp = crp->Next, i++)
- if ((!name && !num) ||
- (name && !stricmp(crp->Name, name)) || num == i) {
- // Check for existence of desired column
- // Also find where to insert the new block
- for (cp = Columns; cp; cp = cp->GetNext())
- if (cp->GetIndex() < i)
- cprec = cp;
- else if (cp->GetIndex() == i)
- break;
-
- if (trace) {
- if (cp)
- htrc("cp(%d).Name=%s cp=%p\n", i, cp->GetName(), cp);
- else
- htrc("cp(%d) cp=%p\n", i, cp);
- } // endif trace
-
- // Now take care of Column Description Block
- if (cp)
- colp = cp;
- else
- colp = new(g) QRSCOL(g, crp, this, cprec, i);
-
- if (name || num)
- break;
- else
- cprec = colp;
-
- } // endif Name
-
- return (colp);
- } // end of ColDB
-
-/***********************************************************************/
-/* QRS GetMaxSize: returns maximum table size in number of lines. */
-/***********************************************************************/
-int TDBQRS::GetMaxSize(PGLOBAL g)
- {
- MaxSize = Qrp->Maxsize;
- return MaxSize;
- } // end of GetMaxSize
-
-/***********************************************************************/
-/* RowNumber: returns the current row ordinal number. */
-/***********************************************************************/
-int TDBQRS::RowNumber(PGLOBAL g, BOOL b)
- {
- return (CurPos + 1);
- } // end of RowNumber
-
-/***********************************************************************/
-/* QRS Access Method opening routine. */
-/* New method now that this routine is called recursively (last table */
-/* first in reverse order): index blocks are immediately linked to */
-/* join block of next table if it exists or else are discarted. */
-/***********************************************************************/
-bool TDBQRS::OpenDB(PGLOBAL g)
- {
- if (trace)
- htrc("QRS OpenDB: tdbp=%p tdb=R%d use=%d key=%p mode=%d\n",
- this, Tdb_No, Use, To_Key_Col, Mode);
-
- if (Mode != MODE_READ) {
- sprintf(g->Message, MSG(BAD_QUERY_OPEN), Mode);
- return TRUE;
- } // endif Mode
-
- CurPos = -1;
-
- if (Use == USE_OPEN)
- return FALSE;
-
- /*********************************************************************/
- /* Open (retrieve data from) the query if not already open. */
- /*********************************************************************/
- Use = USE_OPEN; // Do it now in case we are recursively called
-
- return FALSE;
- } // end of OpenDB
-
-/***********************************************************************/
-/* GetRecpos: returns current position of next sequential read. */
-/***********************************************************************/
-int TDBQRS::GetRecpos(void)
- {
- return (CurPos);
- } // end of GetRecpos
-
-/***********************************************************************/
-/* ReadDB: Data Base read routine for QRS access method. */
-/***********************************************************************/
-int TDBQRS::ReadDB(PGLOBAL g)
- {
- int rc = RC_OK;
-
- if (trace)
- htrc("QRS ReadDB: R%d CurPos=%d key=%p link=%p Kindex=%p\n",
- GetTdb_No(), CurPos, To_Key_Col, To_Link, To_Kindex);
-
-#if 0
- if (To_Kindex) {
- /*******************************************************************/
- /* Reading is by an index table. */
- /*******************************************************************/
- int recpos = To_Kindex->Fetch(g);
-
- switch (recpos) {
- case -1: // End of file reached
- rc = RC_EF;
- break;
- case -2: // No match for join
- rc = RC_NF;
- break;
- case -3: // Same record as last non null one
- rc = RC_OK;
- break;
- default:
- /***************************************************************/
- /* Set the file position according to record to read. */
- /***************************************************************/
- CurPos = recpos;
- } // endswitch recpos
-
- if (trace)
- htrc("Position is now %d\n", CurPos);
-
- } else
-#endif // 0
- /*******************************************************************/
- /* !To_Kindex ---> sequential reading */
- /*******************************************************************/
- rc = (++CurPos < Qrp->Nblin) ? RC_OK : RC_EF;
-
- return rc;
- } // end of ReadDB
-
-/***********************************************************************/
-/* Dummy WriteDB: just send back an error return. */
-/***********************************************************************/
-int TDBQRS::WriteDB(PGLOBAL g)
- {
- strcpy(g->Message, MSG(QRY_READ_ONLY));
- return RC_FX;
- } // end of WriteDB
-
-/***********************************************************************/
-/* Dummy DeleteDB routine, just returns an error code. */
-/***********************************************************************/
-int TDBQRS::DeleteDB(PGLOBAL g, int irc)
- {
- strcpy(g->Message, MSG(NO_QRY_DELETE));
- return RC_FX;
- } // end of DeleteDB
-
-/***********************************************************************/
-/* Data Base close routine for QRS access method. */
-/***********************************************************************/
-void TDBQRS::CloseDB(PGLOBAL g)
- {
-//if (To_Kindex) {
-// To_Kindex->Close();
-// To_Kindex = NULL;
-// } // endif
-
- if (trace)
- htrc("Qryres CloseDB");
-
-//Qryp->Sqlp->CloseDB();
- } // end of CloseDB
-
-// ------------------------ QRSCOL functions ----------------------------
-
-/***********************************************************************/
-/* QRSCOL public constructor. */
-/***********************************************************************/
-QRSCOL::QRSCOL(PGLOBAL g, PCOLRES crp, PTDB tdbp, PCOL cprec, int i)
- : COLBLK(NULL, tdbp, i)
- {
- if (cprec) {
- Next = cprec->GetNext();
- cprec->SetNext(this);
- } else {
- Next = tdbp->GetColumns();
- tdbp->SetColumns(this);
- } // endif cprec
-
- // Set additional QRS access method information for column.
- Crp = crp;
- Name = Crp->Name;
- Long = Crp->Clen;
- Buf_Type = crp->Type;
- strcpy(Format.Type, GetFormatType(Buf_Type));
- Format.Length = (short)Long;
- Format.Prec = (short)Crp->Prec;
-
- if (trace) {
- htrc("Making new QRSCOL C%d %s at %p\n", Index, Name, this);
- htrc(" BufType=%d Long=%d length=%d clen=%d\n",
- Buf_Type, Long, Format.Length, Crp->Clen);
- } // endif trace
-
- } // end of QRSCOL constructor
-
-/***********************************************************************/
-/* QRSCOL constructor used for copying columns. */
-/* tdbp is the pointer to the new table descriptor. */
-/***********************************************************************/
-QRSCOL::QRSCOL(QRSCOL *col1, PTDB tdbp) : COLBLK(col1, tdbp)
- {
- Crp = col1->Crp;
- } // end of QRSCOL copy constructor
-
-/***********************************************************************/
-/* ReadColumn: what this routine does is to extract the RESCOL block */
-/* current value and convert it to the column buffer type. */
-/***********************************************************************/
-void QRSCOL::ReadColumn(PGLOBAL g)
- {
- PTDBQRS tdbp = (PTDBQRS)To_Tdb;
-
- if (trace)
- htrc("QRS RC: col %s R%d type=%d CurPos=%d Len=%d\n",
- Name, tdbp->GetTdb_No(), Buf_Type, tdbp->CurPos, Crp->Clen);
-
- if (Crp->Kdata)
- Value->SetValue_pvblk(Crp->Kdata, tdbp->CurPos);
- else
- Value->Reset();
-
- } // end of ReadColumn
-
-/***********************************************************************/
-/* Make file output of a Dos column descriptor block. */
-/***********************************************************************/
-void QRSCOL::Print(PGLOBAL g, FILE *f, uint n)
- {
- COLBLK::Print(g, f, n);
-
- fprintf(f, " Crp=%p\n", Crp);
- } // end of Print
-
/* --------------------- End of TabPivot/TabQrs ---------------------- */