diff options
author | Olivier Bertrand <bertrandop@gmail.com> | 2013-05-19 19:25:06 +0200 |
---|---|---|
committer | Olivier Bertrand <bertrandop@gmail.com> | 2013-05-19 19:25:06 +0200 |
commit | c035bde34c3a5a63451b51031d508a425ce9a3ae (patch) | |
tree | 40f513d803531c29a04dd18934ab20e799fba964 /storage/connect/tabpivot.cpp | |
parent | 3c76e0e2ad05229ea1718b6e9c3dce85d75aaa4d (diff) | |
download | mariadb-git-c035bde34c3a5a63451b51031d508a425ce9a3ae.tar.gz |
- Allowing views and queries as parameters for PROXY base tables
NOTE: Checking for looping references cannot be done when using views
as parameters. This should not be allowed on production servers and
should be dependant on a system variable and/or on speciel grant.
modified:
storage/connect/CMakeLists.txt
storage/connect/connect.cc
storage/connect/ha_connect.cc
storage/connect/myconn.cpp
storage/connect/myconn.h
storage/connect/mysql-test/connect/r/fmt.result
storage/connect/mysql-test/connect/r/pivot.result
storage/connect/mysql-test/connect/t/fmt.test
storage/connect/mysql-test/connect/t/pivot.test
storage/connect/plgdbsem.h
storage/connect/plugutil.c
storage/connect/tabcol.cpp
storage/connect/tabcol.h
storage/connect/tabfmt.cpp
storage/connect/tabmysql.cpp
storage/connect/tabmysql.h
storage/connect/taboccur.cpp
storage/connect/taboccur.h
storage/connect/tabpivot.cpp
storage/connect/tabpivot.h
storage/connect/tabtbl.cpp
storage/connect/tabutil.cpp
storage/connect/tabutil.h
storage/connect/xtable.h
Diffstat (limited to 'storage/connect/tabpivot.cpp')
-rw-r--r-- | storage/connect/tabpivot.cpp | 1003 |
1 files changed, 176 insertions, 827 deletions
diff --git a/storage/connect/tabpivot.cpp b/storage/connect/tabpivot.cpp index 4ae1b3ceae6..1a73f353970 100644 --- a/storage/connect/tabpivot.cpp +++ b/storage/connect/tabpivot.cpp @@ -1,7 +1,7 @@ /************ TabPivot C++ Program Source Code File (.CPP) *************/ /* PROGRAM NAME: TABPIVOT */ /* ------------- */ -/* Version 1.4 */ +/* Version 1.5 */ /* */ /* COPYRIGHT: */ /* ---------- */ @@ -41,11 +41,9 @@ #include "global.h" #include "plgdbsem.h" #include "xtable.h" -//#include "xindex.h" #include "tabcol.h" #include "colblk.h" -//#include "tabmysql.h" -#include "myconn.h" +#include "tabmysql.h" #include "csort.h" #include "tabutil.h" #include "tabpivot.h" @@ -55,381 +53,6 @@ extern "C" int trace; -#if 0 -/***********************************************************************/ -/* Prepare the source table Query. */ -/***********************************************************************/ -PQRYRES TDBPIVOT::GetSourceTable(PGLOBAL g) - { - if (Qryp) - return Qryp; // Already done - - if (Tabname) { - char *def, *colist; - size_t len = 0; - PCOL colp; - PDBUSER dup = (PDBUSER)g->Activityp->Aptr; - - if (InitTable(g)) - return NULL; - - // Evaluate the length of the column list - for (colp = Tdbp->Columns; colp; colp = colp->GetNext()) - len += (strlen(colp->GetName()) + 2); - - *(colist = (char*)PlugSubAlloc(g, NULL, len)) = 0; - - // Locate the suballocated string (size is not known yet) - def = (char*)PlugSubAlloc(g, NULL, 0); - strcpy(def, "SELECT "); - - if (!Fncol) { - for (colp = Tdbp->Columns; colp; colp = colp->GetNext()) - if (!Picol || stricmp(Picol, colp->GetName())) - Fncol = colp->GetName(); - - if (!Fncol) { - strcpy(g->Message, MSG(NO_DEF_FNCCOL)); - return NULL; - } // endif Fncol - - } else if (!(ColDB(g, Fncol, 0))) { - // Function column not found in table - sprintf(g->Message, MSG(COL_ISNOT_TABLE), Fncol, Tabname); - return NULL; - } // endif Fcolp - - if (!Picol) { - // Find default Picol as the last one not equal to Fncol - for (colp = Tdbp->Columns; colp; colp = colp->GetNext()) - if (!Fncol || stricmp(Fncol, colp->GetName())) - Picol = colp->GetName(); - - if (!Picol) { - strcpy(g->Message, MSG(NO_DEF_PIVOTCOL)); - return NULL; - } // endif Picol - - } else if (!(ColDB(g, Picol, 0))) { - // Pivot column not found in table - sprintf(g->Message, MSG(COL_ISNOT_TABLE), Picol, Tabname); - return NULL; - } // endif Xcolp - - // Make the other column list - for (colp = Columns; colp; colp = colp->GetNext()) - if (stricmp(Picol, colp->GetName()) && - stricmp(Fncol, colp->GetName())) - strcat(strcat(colist, colp->GetName()), ", "); - - // Add the Pivot column at the end of the list - strcat(strcat(def, strcat(colist, Picol)), ", "); - - // Continue making the definition - if (!GBdone) { - // Make it suitable for Pivot by doing the group by - strcat(strcat(def, Function), "("); - strcat(strcat(strcat(def, Fncol), ") "), Fncol); - strcat(strcat(def, " FROM "), Tabname); - strcat(strcat(def, " GROUP BY "), colist); - } else // Gbdone - strcat(strcat(strcat(def, Fncol), " FROM "), Tabname); - - // Now we know how much was suballocated - Tabsrc = (char*)PlugSubAlloc(g, NULL, strlen(def)); - } else { - strcpy(g->Message, MSG(SRC_TABLE_UNDEF)); - return NULL; - } // endif Tabsrc - - int w; - - // Open a MySQL connection for this table - if (Myc.Open(g, Host, Database, User, Pwd, Port)) - return NULL; - - // Send the source command to MySQL - if (Myc.ExecSQL(g, Tabsrc, &w) == RC_FX) { - Myc.Close(); - return NULL; - } // endif Exec - - // We must have a storage query to get pivot column values - Qryp = Myc.GetResult(g); - Myc.Close(); - Tqrp = new(g) TDBQRS(Qryp); - Tqrp->OpenDB(g); - - if (MakePivotColumns(g) < 0) - return NULL; - - return Qryp; - } // end of GetSourceTable - -/***********************************************************************/ -/* Allocate PIVOT columns description block. */ -/***********************************************************************/ -int TDBPIVOT::MakePivotColumns(PGLOBAL g) - { - if (Mult < 0) { - int ndif, n = 0, nblin = Qryp->Nblin; - PVAL valp; - PCOL cp; - PSRCCOL colp; - PFNCCOL cfnp; - - // Allocate all the source columns - Tqrp->ColDB(g, NULL, 0); - Columns = NULL; // Discard dummy columns blocks - - for (cp = Tqrp->GetColumns(); cp; cp = cp->GetNext()) { - if (cp->InitValue(g)) - return -1; - - if (!stricmp(cp->GetName(), Picol)) { - Xcolp = (PQRSCOL)cp; - Xresp = Xcolp->GetCrp(); - Rblkp = Xresp->Kdata; - } else if (!stricmp(cp->GetName(), Fncol)) { - Fcolp = (PQRSCOL)cp; - } else - if ((colp = new(g) SRCCOL(cp, this, ++n))->Init(g, this)) - return -1; - - } // endfor cp - - if (!Xcolp) { - sprintf(g->Message, MSG(COL_ISNOT_TABLE), - Picol, Tabname ? Tabname : "TabSrc"); - return -1; - } else if (!Fcolp) { - sprintf(g->Message, MSG(COL_ISNOT_TABLE), - Fncol, Tabname ? Tabname : "TabSrc"); - return -1; - } // endif Fcolp - - // Before calling sort, initialize all - Index.Size = nblin * sizeof(int); - Index.Sub = TRUE; // Should be small enough - - if (!PlgDBalloc(g, NULL, Index)) - return -1; - - Offset.Size = (nblin + 1) * sizeof(int); - Offset.Sub = TRUE; // Should be small enough - - if (!PlgDBalloc(g, NULL, Offset)) - return -2; - - ndif = Qsort(g, nblin); - - if (ndif < 0) { // error - return -3; - } else - Ncol = ndif; - - // Now make the functional columns - for (int i = 0; i < Ncol; i++) { - // Allocate the Value used to retieve column names - if (!(valp = AllocateValue(g, Xcolp->GetResultType(), - Xcolp->GetLengthEx(), Xcolp->GetPrecision(), - Xcolp->GetDomain(), Xcolp->GetTo_Tdb()->GetCat()))) - return -4; - - // Get the value that will be the generated column name - valp->SetValue_pvblk(Rblkp, Pex[Pof[i]]); - - // Copy the functional column with new Name and new Value - cfnp = (PFNCCOL)new(g) FNCCOL(Fcolp, this); - - // Initialize the generated column - if (cfnp->InitColumn(g, valp)) - return -5; - - } // endfor i - - // Fields must be updated for ha_connect -// if (UpdateTableFields(g, n + Ncol)) -// return -6; - - // This should be refined later - Mult = nblin; - } // endif Mult - - return Mult; - } // end of MakePivotColumns - -/***********************************************************************/ -/* Update fields in the MySQL table structure */ -/* Note: this does not work. Indeed the new rows are correctly made */ -/* but the final result still specify the unmodified table and the */ -/* returned table only contains the original column values. */ -/* In addition, a new query on the table, when it is into the cache, */ -/* specifies all the new columns and fails because they do not belong */ -/* to the original table. */ -/***********************************************************************/ -bool TDBPIVOT::UpdateTableFields(PGLOBAL g, int n) - { - uchar *trec, *srec, *tptr, *sptr; - int i = 0, k = 0; - uint len; - uint32 nmp, lwm; - size_t buffsize; - PCOL colp; - PHC hc = ((MYCAT*)((PIVOTDEF*)To_Def)->Cat)->GetHandler(); - TABLE *table = hc->GetTable(); - st_mem_root *tmr = &table->mem_root; - st_mem_root *smr = &table->s->mem_root; - Field* *field; - Field *fp, *tfncp, *sfncp; - Field* *ntf; - Field* *nsf; -//my_bitmap_map *org_bitmap; - const MY_BITMAP *map; - - // When sorting read_set selects all columns, so we use def_read_set - map= (const MY_BITMAP *)&table->def_read_set; - - // Find the function field - for (field= table->field; *field; field++) { - fp= *field; - - if (bitmap_is_set(map, fp->field_index)) - if (!stricmp(fp->field_name, Fncol)) { - tfncp = fp; - break; - } // endif Name - - } // endfor field - - for (field= table->s->field; *field; field++) { - fp= *field; - - if (bitmap_is_set(map, fp->field_index)) - if (!stricmp(fp->field_name, Fncol)) { - sfncp = fp; - break; - } // endif Name - - } // endfor field - - // Calculate the new buffer size - len = tfncp->max_data_length(); - buffsize = table->s->rec_buff_length + len * Ncol; - - // Allocate the new record space - if (!(tptr = trec = (uchar*)alloc_root(tmr, 2 * buffsize))) - return TRUE; - - if (!(sptr = srec = (uchar*)alloc_root(smr, 2 * buffsize))) - return TRUE; - - - // Allocate the array of all new table field pointers - if (!(ntf = (Field**)alloc_root(tmr, (uint)((n+1) * sizeof(Field*))))) - return TRUE; - - // Allocate the array of all new table share field pointers - if (!(nsf = (Field**)alloc_root(smr, (uint)((n+1) * sizeof(Field*))))) - return TRUE; - - // First fields are the the ones of the source columns - for (colp = Columns; colp; colp = colp->GetNext()) - if (colp->GetAmType() == TYPE_AM_SRC) { - for (field= table->field; *field; field++) { - fp= *field; - - if (bitmap_is_set(map, fp->field_index)) - if (!stricmp(colp->GetName(), fp->field_name)) { - ntf[i] = fp; - fp->field_index = i++; - fp->ptr = tptr; - tptr += fp->max_data_length(); - break; - } // endif Name - - } // endfor field - - for (field= table->s->field; *field; field++) { - fp= *field; - - if (bitmap_is_set(map, fp->field_index)) - if (!stricmp(colp->GetName(), fp->field_name)) { - nsf[k] = fp; - fp->field_index = k++; - fp->ptr = srec; - srec += fp->max_data_length(); - break; - } // endif Name - - } // endfor field - - } // endif AmType - - // Now add the pivot generated columns - for (colp = Columns; colp; colp = colp->GetNext()) - if (colp->GetAmType() == TYPE_AM_FNC) { - if ((fp = (Field*)memdup_root(tmr, (char*)tfncp, tfncp->size_of()))) { - ntf[i] = fp; - fp->ptr = tptr; - fp->field_name = colp->GetName(); - fp->field_index = i++; - fp->vcol_info = NULL; - fp->stored_in_db = TRUE; - tptr += len; - } else - return TRUE; - - if ((fp = (Field*)memdup_root(smr, (char*)sfncp, sfncp->size_of()))) { - nsf[i] = fp; - fp->ptr = sptr; - fp->field_name = colp->GetName(); - fp->field_index = k++; - fp->vcol_info = NULL; - fp->stored_in_db = TRUE; - sptr += len; - } else - return TRUE; - - } // endif AM_FNC - - // Mark end of the list - ntf[i] = NULL; - nsf[k] = NULL; - - // Update the table fields - nmp = (uint32)((1<<i) - 1); - lwm = (uint32)((-1)<<i); - table->field = ntf; - table->used_fields = i; - table->record[0] = trec; - table->record[1] = trec + buffsize; - *table->def_read_set.bitmap = nmp; - *table->def_read_set.last_word_ptr = nmp; - table->def_read_set.last_word_mask = lwm; - table->def_read_set.n_bits = i; - *table->read_set->bitmap = nmp; - *table->read_set->last_word_ptr = nmp; - table->read_set->last_word_mask = lwm; - table->read_set->n_bits = i; - table->write_set->n_bits = i; - *table->vcol_set->bitmap = 0; - table->vcol_set->n_bits = i; - - // and the share fields - table->s->field = nsf; - table->s->reclength = sptr - srec; - table->s->stored_rec_length = sptr - srec; - table->s->fields = k; - table->s->stored_fields = k; - table->s->rec_buff_length = buffsize; -//table->s->varchar_fields = ???; -//table->s->db_record_offset = ???; -//table->s->null_field_first = ???; - return FALSE; - } // end of UpdateTableFields -#endif // 0 - /* --------------- Implementation of the PIVOT classes --------------- */ /***********************************************************************/ @@ -451,13 +74,12 @@ bool PIVOTDEF::DefineAM(PGLOBAL g, LPCSTR am, int poff) char *p1, *p2; PHC hc = ((MYCAT*)Cat)->GetHandler(); - if (!PRXDEF::DefineAM(g, am, poff)) { - Tabname = (char*)Tablep->GetName(); - DB = (char*)Tablep->GetQualifier(); - } else { - DB = Cat->GetStringCatInfo(g, "Database", "*"); - Tabsrc = Cat->GetStringCatInfo(g, "SrcDef", NULL); - } // endif + if (PRXDEF::DefineAM(g, am, poff)) + return TRUE; + + Tabname = (char*)Tablep->GetName(); + DB = (char*)Tablep->GetQualifier(); + Tabsrc = (char*)Tablep->GetSrc(); Host = Cat->GetStringCatInfo(g, "Host", "localhost"); User = Cat->GetStringCatInfo(g, "User", "*"); @@ -477,7 +99,7 @@ bool PIVOTDEF::DefineAM(PGLOBAL g, LPCSTR am, int poff) GBdone = Cat->GetBoolCatInfo("Groupby", false); Accept = Cat->GetBoolCatInfo("Accept", false); Port = Cat->GetIntCatInfo("Port", 3306); - Desc = (Tabname) ? Tabname : Tabsrc; + Desc = (Tabsrc) ? Tabsrc : Tabname; return FALSE; } // end of DefineAM @@ -521,6 +143,25 @@ TDBPIVOT::TDBPIVOT(PPIVOTDEF tdp) : TDBPRX(tdp) } // end of TDBPIVOT constructor /***********************************************************************/ +/* Allocate source column description block. */ +/***********************************************************************/ +PCOL TDBPIVOT::MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n) + { + PCOL colp; + + if (cdp->GetOffset()) { + colp = new(g) FNCCOL(cdp, this, cprec, n); + + if (cdp->GetOffset() > 1) + Dcolp = colp; + + } else + colp = new(g) SRCCOL(cdp, this, cprec, n); + + return colp; + } // end of MakeCol + +/***********************************************************************/ /* Prepare the source table Query. */ /***********************************************************************/ bool TDBPIVOT::GetSourceTable(PGLOBAL g) @@ -528,72 +169,74 @@ bool TDBPIVOT::GetSourceTable(PGLOBAL g) if (Tdbp) return false; // Already done - if (Tabname) { - PTABDEF defp; - PCOLDEF cdp; - - if (InitTable(g)) + if (!Tabsrc && Tabname) { + // Get the table description block of this table + if (!(Tdbp = GetSubTable(g, ((PPIVOTDEF)To_Def)->Tablep, true))) return true; - else - defp = Tdbp->GetDef(); - if (!Fncol) { - for (cdp = defp->GetCols(); cdp; cdp = cdp->GetNext()) - if (!Picol || stricmp(Picol, cdp->GetName())) - Fncol = cdp->GetName(); + if (!Tdbp->IsView()) { + PCOLDEF cdp; + PTABDEF defp = Tdbp->GetDef(); if (!Fncol) { - strcpy(g->Message, MSG(NO_DEF_FNCCOL)); - return true; + for (cdp = defp->GetCols(); cdp; cdp = cdp->GetNext()) + if (!Picol || stricmp(Picol, cdp->GetName())) + Fncol = cdp->GetName(); + + if (!Fncol) { + strcpy(g->Message, MSG(NO_DEF_FNCCOL)); + return true; + } // endif Fncol + } // endif Fncol - - } // endif Fncol - - if (!Picol) { - // Find default Picol as the last one not equal to Fncol - for (cdp = defp->GetCols(); cdp; cdp = cdp->GetNext()) - if (!Fncol || stricmp(Fncol, cdp->GetName())) - Picol = cdp->GetName(); - + if (!Picol) { - strcpy(g->Message, MSG(NO_DEF_PIVOTCOL)); - return true; + // Find default Picol as the last one not equal to Fncol + for (cdp = defp->GetCols(); cdp; cdp = cdp->GetNext()) + if (!Fncol || stricmp(Fncol, cdp->GetName())) + Picol = cdp->GetName(); + + if (!Picol) { + strcpy(g->Message, MSG(NO_DEF_PIVOTCOL)); + return true; + } // endif Picol + } // endif Picol - - } // endif Picol - - if (!GBdone) { - char *colist; - // Locate the suballocated colist (size is not known yet) - *(colist = (char*)PlugSubAlloc(g, NULL, 0)) = 0; + if (!GBdone) { + char *colist; - // Make the column list - for (cdp = To_Def->GetCols(); cdp; cdp = cdp->GetNext()) - if (!cdp->GetOffset()) - strcat(strcat(colist, cdp->GetName()), ", "); + // Locate the suballocated colist (size is not known yet) + *(colist = (char*)PlugSubAlloc(g, NULL, 0)) = 0; - // Add the Pivot column at the end of the list - strcat(colist, Picol); + // Make the column list + for (cdp = To_Def->GetCols(); cdp; cdp = cdp->GetNext()) + if (!cdp->GetOffset()) + strcat(strcat(colist, cdp->GetName()), ", "); - // Now we know how much was suballocated - PlugSubAlloc(g, NULL, strlen(colist)); + // Add the Pivot column at the end of the list + strcat(colist, Picol); - // Locate the source string (size is not known yet) - Tabsrc = (char*)PlugSubAlloc(g, NULL, 0); - - // Start making the definition - strcat(strcat(strcpy(Tabsrc, "SELECT "), colist), ", "); + // Now we know how much was suballocated + PlugSubAlloc(g, NULL, strlen(colist)); + + // Locate the source string (size is not known yet) + Tabsrc = (char*)PlugSubAlloc(g, NULL, 0); + + // Start making the definition + strcat(strcat(strcpy(Tabsrc, "SELECT "), colist), ", "); - // Make it suitable for Pivot by doing the group by - strcat(strcat(Tabsrc, Function), "("); - strcat(strcat(strcat(Tabsrc, Fncol), ") "), Fncol); - strcat(strcat(Tabsrc, " FROM "), Tabname); - strcat(strcat(Tabsrc, " GROUP BY "), colist); + // Make it suitable for Pivot by doing the group by + strcat(strcat(Tabsrc, Function), "("); + strcat(strcat(strcat(Tabsrc, Fncol), ") "), Fncol); + strcat(strcat(Tabsrc, " FROM "), Tabname); + strcat(strcat(Tabsrc, " GROUP BY "), colist); - // Now we know how much was suballocated - PlugSubAlloc(g, NULL, strlen(Tabsrc)); - } // endif !GBdone + // Now we know how much was suballocated + PlugSubAlloc(g, NULL, strlen(Tabsrc)); + } // endif !GBdone + + } // endif IsView } else if (!Tabsrc) { strcpy(g->Message, MSG(SRC_TABLE_UNDEF)); @@ -601,26 +244,76 @@ bool TDBPIVOT::GetSourceTable(PGLOBAL g) } // endif if (Tabsrc) { - MYSQLC myc; // MySQL connection class - PQRYRES qryp; - PCOLRES crp; - int w; + // Get the new table description block of this source table + PTABLE tablep = new(g) XTAB("whatever", Tabsrc); + + tablep->SetQualifier(Database); - // Open a MySQL connection for this table - if (myc.Open(g, Host, Database, User, Pwd, Port)) + if (!(Tdbp = GetSubTable(g, tablep, true))) return true; - // Send the source command to MySQL - if (myc.ExecSQL(g, Tabsrc, &w) == RC_FX) { - myc.Close(); + } // endif Tabsrc + + return false; + } // end of GetSourceTable + +/***********************************************************************/ +/* Make the required pivot columns. */ +/***********************************************************************/ +bool TDBPIVOT::MakePivotColumns(PGLOBAL g) + { + if (!Tdbp->IsView()) { + // Now it is time to allocate the pivot and function columns + if (!(Fcolp = Tdbp->ColDB(g, Fncol, 0))) { + // Function column not found in table + sprintf(g->Message, MSG(COL_ISNOT_TABLE), Fncol, Tabname); + return true; + } else if (Fcolp->InitValue(g)) return true; - } // endif Exec - // We must have a storage query to get pivot column values - qryp = myc.GetResult(g); - myc.Close(); - Tdbp = new(g) TDBQRS(qryp); + if (!(Xcolp = Tdbp->ColDB(g, Picol, 0))) { + // Pivot column not found in table + sprintf(g->Message, MSG(COL_ISNOT_TABLE), Picol, Tabname); + return true; + } else if (Xcolp->InitValue(g)) + return true; + + // Check and initialize the subtable columns
+ for (PCOL cp = Columns; cp; cp = cp->GetNext())
+ if (cp->GetAmType() == TYPE_AM_SRC) {
+ if (((PSRCCOL)cp)->Init(g))
+ return TRUE;
+
+ } else if (cp->GetAmType() == TYPE_AM_FNC)
+ if (((PFNCCOL)cp)->InitColumn(g))
+ return TRUE;
+
+ } // endif isview + + return false; + } // end of MakePivotColumns +/***********************************************************************/ +/* Make the required pivot columns for an object view. */ +/***********************************************************************/ +bool TDBPIVOT::MakeViewColumns(PGLOBAL g) + { + if (Tdbp->IsView()) { + // Tdbp is a view ColDB cannot be used + PCOL colp, cp; + PTDBMY tdbp; + + if (Tdbp->GetAmType() != TYPE_AM_MYSQL) { + strcpy(g->Message, "View is not MySQL"); + return true; + } else + tdbp = (PTDBMY)Tdbp; + + if (!Fncol || !Picol) { + strcpy(g->Message, "Missing Function or Pivot column"); + return true; + } // endif +#if 0 if (!Fncol) { for (crp = qryp->Colresp; crp; crp = crp->Next) if (!Picol || stricmp(Picol, crp->Name)) @@ -645,45 +338,33 @@ bool TDBPIVOT::GetSourceTable(PGLOBAL g) } // endif Picol } // endif Picol +#endif // 0 - } // endif Tabsrc - - // Now it is time to allocate the pivot and function columns - if (!(Fcolp = Tdbp->ColDB(g, Fncol, 0))) { - // Function column not found in table - sprintf(g->Message, MSG(COL_ISNOT_TABLE), Fncol, Tabname); - return true; - } else if (Fcolp->InitValue(g)) - return true; - - if (!(Xcolp = Tdbp->ColDB(g, Picol, 0))) { - // Pivot column not found in table - sprintf(g->Message, MSG(COL_ISNOT_TABLE), Picol, Tabname); - return true; - } else if (Xcolp->InitValue(g)) - return true; - - return false; - } // end of GetSourceTable + // Now it is time to allocate the pivot and function columns + if (!(Fcolp = tdbp->MakeFieldColumn(g, Fncol))) + return true; -/***********************************************************************/ -/* Allocate source column description block. */ -/***********************************************************************/ -PCOL TDBPIVOT::MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n) - { - PCOL colp; + if (!(Xcolp = tdbp->MakeFieldColumn(g, Picol))) + return true; - if (cdp->GetOffset()) { - colp = new(g) FNCCOL(cdp, this, cprec, n); + // Check and initialize the subtable columns
+ for (cp = Columns; cp; cp = cp->GetNext()) + if (cp->GetAmType() == TYPE_AM_SRC) { + if ((colp = tdbp->MakeFieldColumn(g, cp->GetName()))) { + ((PSRCCOL)cp)->Colp = colp; + ((PSRCCOL)cp)->To_Val = colp->GetValue(); + cp->AddStatus(BUF_READ); // All is done here + } else + return true; - if (cdp->GetOffset() > 1) - Dcolp = colp; + } else if (cp->GetAmType() == TYPE_AM_FNC)
+ if (((PFNCCOL)cp)->InitColumn(g))
+ return TRUE;
- } else - colp = new(g) SRCCOL(cdp, this, cprec, n); + } // endif isview - return colp; - } // end of MakeCol + return false; + } // end of MakeViewColumns /***********************************************************************/ /* PIVOT GetMaxSize: returns the maximum number of rows in the table. */ @@ -696,7 +377,7 @@ int TDBPIVOT::GetMaxSize(PGLOBAL g) return MaxSize; #endif // 0 - return 0; + return 10; } // end of GetMaxSize /***********************************************************************/ @@ -713,8 +394,6 @@ int TDBPIVOT::RowNumber(PGLOBAL g, bool b) /***********************************************************************/ bool TDBPIVOT::OpenDB(PGLOBAL g) { -//PDBUSER dup = (PDBUSER)g->Activityp->Aptr; - if (Use == USE_OPEN) { /*******************************************************************/ /* Table already open, just replace it at its beginning. */ @@ -746,18 +425,10 @@ bool TDBPIVOT::OpenDB(PGLOBAL g) /*********************************************************************/ if (GetSourceTable(g)) return TRUE; - - /*********************************************************************/ - /* Check and initialize the subtable columns. */ - /*********************************************************************/ - for (PCOL cp = Columns; cp; cp = cp->GetNext()) - if (cp->GetAmType() == TYPE_AM_SRC) { - if (((PPRXCOL)cp)->Init(g)) - return TRUE; - - } else if (cp->GetAmType() == TYPE_AM_FNC) - if (((PFNCCOL)cp)->InitColumn(g)) - return TRUE; + + // For tables, columns must be allocated before opening + if (MakePivotColumns(g)) + return TRUE; /*********************************************************************/ /* Physically open the object table. */ @@ -765,7 +436,10 @@ bool TDBPIVOT::OpenDB(PGLOBAL g) if (Tdbp->OpenDB(g)) return TRUE; - return FALSE; + /*********************************************************************/ + /* Make all required pivot columns for object views. */ + /*********************************************************************/ + return MakeViewColumns(g); } // end of OpenDB /***********************************************************************/ @@ -881,17 +555,6 @@ void TDBPIVOT::CloseDB(PGLOBAL g) } // end of CloseDB -#if 0 -/***********************************************************************/ -/* TDBPIVOT: Compare routine for sorting pivot column values. */ -/***********************************************************************/ -int TDBPIVOT::Qcompare(int *i1, int *i2) - { - // TODO: the actual comparison between pivot column result values. - return Rblkp->CompVal(*i1, *i2); - } // end of Qcompare -#endif // 0 - // ------------------------ FNCCOL functions ---------------------------- /***********************************************************************/ @@ -948,8 +611,6 @@ bool FNCCOL::CompareColumn(void) SRCCOL::SRCCOL(PCOLDEF cdp, PTDB tdbp, PCOL cprec, int n) : PRXCOL(cdp, tdbp, cprec, n) { - // Set additional SRC access method information for column. -//Cnval = NULL; } // end of SRCCOL constructor /***********************************************************************/ @@ -960,9 +621,6 @@ bool SRCCOL::Init(PGLOBAL g) if (PRXCOL::Init(g)) return true; - // Will contain the last value -//Cnval = AllocateValue(g, Value, TYPE_VOID); - AddStatus(BUF_READ); // All is done here return false; } // end of SRCCOL constructor @@ -972,7 +630,6 @@ bool SRCCOL::Init(PGLOBAL g) /***********************************************************************/ void SRCCOL::SetColumn(void) { -//Cnval->SetValue_pval(Value); Value->SetValue_pval(To_Val); } // end of SetColumn @@ -982,315 +639,7 @@ void SRCCOL::SetColumn(void) bool SRCCOL::CompareLast(void) { // Compare the unconverted values -//return !Cnval->IsEqual(Colp->GetValue(), true); return !Value->IsEqual(To_Val, true); } // end of CompareColumn - -/* ------------------------------------------------------------------- */ - -/***********************************************************************/ -/* Implementation of the TDBQRS class. */ -/***********************************************************************/ -TDBQRS::TDBQRS(PTDBQRS tdbp) : TDBASE(tdbp) - { - Qrp = tdbp->Qrp; - CurPos = tdbp->CurPos; - } // end of TDBQRS copy constructor - -// Method -PTDB TDBQRS::CopyOne(PTABS t) - { - PTDB tp; - PQRSCOL cp1, cp2; - PGLOBAL g = t->G; // Is this really useful ??? - - tp = new(g) TDBQRS(this); - - for (cp1 = (PQRSCOL)Columns; cp1; cp1 = (PQRSCOL)cp1->GetNext()) { - cp2 = new(g) QRSCOL(cp1, tp); // Make a copy - NewPointer(t, cp1, cp2); - } // endfor cp1 - - return tp; - } // end of CopyOne - -#if 0 // The TDBASE functions return NULL when To_Def is NULL -/***********************************************************************/ -/* Return the pointer on the DB catalog this table belongs to. */ -/***********************************************************************/ -PCATLG TDBQRS::GetCat(void) - { - // To_Def is null for QRYRES tables - return NULL; - } // end of GetCat - -/***********************************************************************/ -/* Return the datapath of the DB this table belongs to. */ -/***********************************************************************/ -PSZ TDBQRS::GetPath(void) - { - // To_Def is null for QRYRES tables - return NULL; - } // end of GetPath -#endif // 0 - -/***********************************************************************/ -/* Initialize QRS column description block construction. */ -/* name is used to call columns by name. */ -/* num is used by LNA to construct columns by index number. */ -/* Note: name=Null and num=0 for constructing all columns (select *) */ -/***********************************************************************/ -PCOL TDBQRS::ColDB(PGLOBAL g, PSZ name, int num) - { - int i; - PCOLRES crp; - PCOL cp, colp = NULL, cprec = NULL; - - if (trace) - htrc("QRS ColDB: colname=%s tabname=%s num=%d\n", - SVP(name), Name, num); - - for (crp = Qrp->Colresp, i = 1; crp; crp = crp->Next, i++) - if ((!name && !num) || - (name && !stricmp(crp->Name, name)) || num == i) { - // Check for existence of desired column - // Also find where to insert the new block - for (cp = Columns; cp; cp = cp->GetNext()) - if (cp->GetIndex() < i) - cprec = cp; - else if (cp->GetIndex() == i) - break; - - if (trace) { - if (cp) - htrc("cp(%d).Name=%s cp=%p\n", i, cp->GetName(), cp); - else - htrc("cp(%d) cp=%p\n", i, cp); - } // endif trace - - // Now take care of Column Description Block - if (cp) - colp = cp; - else - colp = new(g) QRSCOL(g, crp, this, cprec, i); - - if (name || num) - break; - else - cprec = colp; - - } // endif Name - - return (colp); - } // end of ColDB - -/***********************************************************************/ -/* QRS GetMaxSize: returns maximum table size in number of lines. */ -/***********************************************************************/ -int TDBQRS::GetMaxSize(PGLOBAL g) - { - MaxSize = Qrp->Maxsize; - return MaxSize; - } // end of GetMaxSize - -/***********************************************************************/ -/* RowNumber: returns the current row ordinal number. */ -/***********************************************************************/ -int TDBQRS::RowNumber(PGLOBAL g, BOOL b) - { - return (CurPos + 1); - } // end of RowNumber - -/***********************************************************************/ -/* QRS Access Method opening routine. */ -/* New method now that this routine is called recursively (last table */ -/* first in reverse order): index blocks are immediately linked to */ -/* join block of next table if it exists or else are discarted. */ -/***********************************************************************/ -bool TDBQRS::OpenDB(PGLOBAL g) - { - if (trace) - htrc("QRS OpenDB: tdbp=%p tdb=R%d use=%d key=%p mode=%d\n", - this, Tdb_No, Use, To_Key_Col, Mode); - - if (Mode != MODE_READ) { - sprintf(g->Message, MSG(BAD_QUERY_OPEN), Mode); - return TRUE; - } // endif Mode - - CurPos = -1; - - if (Use == USE_OPEN) - return FALSE; - - /*********************************************************************/ - /* Open (retrieve data from) the query if not already open. */ - /*********************************************************************/ - Use = USE_OPEN; // Do it now in case we are recursively called - - return FALSE; - } // end of OpenDB - -/***********************************************************************/ -/* GetRecpos: returns current position of next sequential read. */ -/***********************************************************************/ -int TDBQRS::GetRecpos(void) - { - return (CurPos); - } // end of GetRecpos - -/***********************************************************************/ -/* ReadDB: Data Base read routine for QRS access method. */ -/***********************************************************************/ -int TDBQRS::ReadDB(PGLOBAL g) - { - int rc = RC_OK; - - if (trace) - htrc("QRS ReadDB: R%d CurPos=%d key=%p link=%p Kindex=%p\n", - GetTdb_No(), CurPos, To_Key_Col, To_Link, To_Kindex); - -#if 0 - if (To_Kindex) { - /*******************************************************************/ - /* Reading is by an index table. */ - /*******************************************************************/ - int recpos = To_Kindex->Fetch(g); - - switch (recpos) { - case -1: // End of file reached - rc = RC_EF; - break; - case -2: // No match for join - rc = RC_NF; - break; - case -3: // Same record as last non null one - rc = RC_OK; - break; - default: - /***************************************************************/ - /* Set the file position according to record to read. */ - /***************************************************************/ - CurPos = recpos; - } // endswitch recpos - - if (trace) - htrc("Position is now %d\n", CurPos); - - } else -#endif // 0 - /*******************************************************************/ - /* !To_Kindex ---> sequential reading */ - /*******************************************************************/ - rc = (++CurPos < Qrp->Nblin) ? RC_OK : RC_EF; - - return rc; - } // end of ReadDB - -/***********************************************************************/ -/* Dummy WriteDB: just send back an error return. */ -/***********************************************************************/ -int TDBQRS::WriteDB(PGLOBAL g) - { - strcpy(g->Message, MSG(QRY_READ_ONLY)); - return RC_FX; - } // end of WriteDB - -/***********************************************************************/ -/* Dummy DeleteDB routine, just returns an error code. */ -/***********************************************************************/ -int TDBQRS::DeleteDB(PGLOBAL g, int irc) - { - strcpy(g->Message, MSG(NO_QRY_DELETE)); - return RC_FX; - } // end of DeleteDB - -/***********************************************************************/ -/* Data Base close routine for QRS access method. */ -/***********************************************************************/ -void TDBQRS::CloseDB(PGLOBAL g) - { -//if (To_Kindex) { -// To_Kindex->Close(); -// To_Kindex = NULL; -// } // endif - - if (trace) - htrc("Qryres CloseDB"); - -//Qryp->Sqlp->CloseDB(); - } // end of CloseDB - -// ------------------------ QRSCOL functions ---------------------------- - -/***********************************************************************/ -/* QRSCOL public constructor. */ -/***********************************************************************/ -QRSCOL::QRSCOL(PGLOBAL g, PCOLRES crp, PTDB tdbp, PCOL cprec, int i) - : COLBLK(NULL, tdbp, i) - { - if (cprec) { - Next = cprec->GetNext(); - cprec->SetNext(this); - } else { - Next = tdbp->GetColumns(); - tdbp->SetColumns(this); - } // endif cprec - - // Set additional QRS access method information for column. - Crp = crp; - Name = Crp->Name; - Long = Crp->Clen; - Buf_Type = crp->Type; - strcpy(Format.Type, GetFormatType(Buf_Type)); - Format.Length = (short)Long; - Format.Prec = (short)Crp->Prec; - - if (trace) { - htrc("Making new QRSCOL C%d %s at %p\n", Index, Name, this); - htrc(" BufType=%d Long=%d length=%d clen=%d\n", - Buf_Type, Long, Format.Length, Crp->Clen); - } // endif trace - - } // end of QRSCOL constructor - -/***********************************************************************/ -/* QRSCOL constructor used for copying columns. */ -/* tdbp is the pointer to the new table descriptor. */ -/***********************************************************************/ -QRSCOL::QRSCOL(QRSCOL *col1, PTDB tdbp) : COLBLK(col1, tdbp) - { - Crp = col1->Crp; - } // end of QRSCOL copy constructor - -/***********************************************************************/ -/* ReadColumn: what this routine does is to extract the RESCOL block */ -/* current value and convert it to the column buffer type. */ -/***********************************************************************/ -void QRSCOL::ReadColumn(PGLOBAL g) - { - PTDBQRS tdbp = (PTDBQRS)To_Tdb; - - if (trace) - htrc("QRS RC: col %s R%d type=%d CurPos=%d Len=%d\n", - Name, tdbp->GetTdb_No(), Buf_Type, tdbp->CurPos, Crp->Clen); - - if (Crp->Kdata) - Value->SetValue_pvblk(Crp->Kdata, tdbp->CurPos); - else - Value->Reset(); - - } // end of ReadColumn - -/***********************************************************************/ -/* Make file output of a Dos column descriptor block. */ -/***********************************************************************/ -void QRSCOL::Print(PGLOBAL g, FILE *f, uint n) - { - COLBLK::Print(g, f, n); - - fprintf(f, " Crp=%p\n", Crp); - } // end of Print - /* --------------------- End of TabPivot/TabQrs ---------------------- */ |