summaryrefslogtreecommitdiff
path: root/storage
diff options
context:
space:
mode:
authorOlivier Bertrand <bertrandop@gmail.com>2014-03-10 18:59:36 +0100
committerOlivier Bertrand <bertrandop@gmail.com>2014-03-10 18:59:36 +0100
commitd67ad26b33ea16a3b59215ef967bdd9b89345e04 (patch)
tree3f7f41b17a0678b6c01a89b59b04a99dfcfcfc4b /storage
parent85e8aee47d2e1cc58857148293f84ccd7e2ec620 (diff)
downloadmariadb-git-d67ad26b33ea16a3b59215ef967bdd9b89345e04.tar.gz
- Adding files needed for block indexing
added: storage/connect/array.cpp storage/connect/array.h storage/connect/blkfil.cpp storage/connect/blkfil.h storage/connect/filter.cpp storage/connect/filter.h
Diffstat (limited to 'storage')
-rw-r--r--storage/connect/array.cpp1095
-rw-r--r--storage/connect/array.h122
-rw-r--r--storage/connect/blkfil.cpp1080
-rw-r--r--storage/connect/blkfil.h295
-rw-r--r--storage/connect/filter.cpp1733
-rw-r--r--storage/connect/filter.h172
6 files changed, 4497 insertions, 0 deletions
diff --git a/storage/connect/array.cpp b/storage/connect/array.cpp
new file mode 100644
index 00000000000..052057ad12b
--- /dev/null
+++ b/storage/connect/array.cpp
@@ -0,0 +1,1095 @@
+/************* Array C++ Functions Source Code File (.CPP) *************/
+/* Name: ARRAY.CPP Version 2.3 */
+/* */
+/* (C) Copyright to the author Olivier BERTRAND 2005-2014 */
+/* */
+/* This file contains the XOBJECT derived class ARRAY functions. */
+/* ARRAY is used for elaborate type of processing, such as sorting */
+/* and dichotomic search (Find). This new version does not use sub */
+/* classes anymore for the different types but relies entirely on the */
+/* functionalities provided by the VALUE and VALBLK classes. */
+/* Currently the only supported types are STRING, SHORT, int, DATE, */
+/* TOKEN, DOUBLE, and Compressed Strings. */
+/***********************************************************************/
+
+/***********************************************************************/
+/* Include relevant MariaDB header file. */
+/***********************************************************************/
+#include "my_global.h"
+#include "sql_class.h"
+//#include "sql_time.h"
+
+#if defined(WIN32)
+//#include <windows.h>
+#else // !WIN32
+#include <string.h>
+#endif // !WIN32
+
+/***********************************************************************/
+/* Include required application header files */
+/* global.h is header containing all global Plug declarations. */
+/* plgdbsem.h is header containing the DB applic. declarations. */
+/* xobject.h is header containing XOBJECT derived classes declares. */
+/***********************************************************************/
+#include "global.h"
+#include "plgdbsem.h"
+#include "xtable.h"
+#include "array.h"
+//#include "select.h"
+//#include "query.h"
+//#include "token.h"
+
+/***********************************************************************/
+/* Macro definitions. */
+/***********************************************************************/
+#if defined(_DEBUG)
+#define ASSERT(B) assert(B);
+#else
+#define ASSERT(B)
+#endif
+
+/***********************************************************************/
+/* Static variables. */
+/***********************************************************************/
+extern "C" int trace;
+
+/***********************************************************************/
+/* DB static external variables. */
+/***********************************************************************/
+extern MBLOCK Nmblk; /* Used to initialize MBLOCK's */
+
+/***********************************************************************/
+/* External functions. */
+/***********************************************************************/
+BYTE OpBmp(PGLOBAL g, OPVAL opc);
+void EncodeValue(int *lp, char *strp, int n);
+
+/***********************************************************************/
+/* MakeValueArray: Makes a value array from a value list. */
+/***********************************************************************/
+PARRAY MakeValueArray(PGLOBAL g, PPARM pp)
+ {
+ int n, valtyp = 0;
+ size_t len = 0;
+ PARRAY par;
+ PPARM parmp;
+
+ if (!pp)
+ return NULL;
+
+ /*********************************************************************/
+ /* New version with values coming in a list. */
+ /*********************************************************************/
+ if ((valtyp = pp->Type) != TYPE_STRING)
+ len = 1;
+
+ if (trace)
+ htrc("valtyp=%d len=%d\n", valtyp, len);
+
+ /*********************************************************************/
+ /* Firstly check the list and count the number of values in it. */
+ /*********************************************************************/
+ for (n = 0, parmp = pp; parmp; n++, parmp = parmp->Next)
+ if (parmp->Type != valtyp) {
+ sprintf(g->Message, MSG(BAD_PARAM_TYPE), "MakeValueArray", parmp->Type);
+ return NULL;
+ } else if (valtyp == TYPE_STRING)
+ len = max(len, strlen((char*)parmp->Value));
+
+ /*********************************************************************/
+ /* Make an array object with one block of the proper size. */
+ /*********************************************************************/
+ par = new(g) ARRAY(g, valtyp, n, (int)len);
+
+ if (par->GetResultType() == TYPE_ERROR)
+ return NULL; // Memory allocation error in ARRAY
+
+ /*********************************************************************/
+ /* All is right now, fill the array block. */
+ /*********************************************************************/
+ for (parmp = pp; parmp; parmp = parmp->Next)
+ switch (valtyp) {
+ case TYPE_STRING:
+ par->AddValue(g, (PSZ)parmp->Value);
+ break;
+ case TYPE_SHORT:
+ par->AddValue(g, *(SHORT*)parmp->Value);
+ break;
+ case TYPE_INT:
+ par->AddValue(g, *(int*)parmp->Value);
+ break;
+ case TYPE_DOUBLE:
+ par->AddValue(g, *(double*)parmp->Value);
+ break;
+ } // endswitch valtyp
+
+ /*********************************************************************/
+ /* Send back resulting array. */
+ /*********************************************************************/
+ return par;
+ } // end of MakeValueArray
+
+/* -------------------------- Class ARRAY ---------------------------- */
+
+/***********************************************************************/
+/* ARRAY public constructor. */
+/***********************************************************************/
+ARRAY::ARRAY(PGLOBAL g, int type, int size, int length, int prec)
+ : CSORT(FALSE)
+ {
+ Nval = 0;
+ Ndif = 0;
+ Bot = 0;
+ Top = 0;
+ Size = size;
+ Type = type;
+ Xsize = -1;
+ Len = 1;
+
+ switch ((Type = type)) {
+ case TYPE_STRING:
+ Len = length;
+ break;
+ case TYPE_SHORT:
+ case TYPE_INT:
+ case TYPE_DOUBLE:
+ break;
+#if 0
+ case TYPE_TOKEN:
+ break;
+ case TYPE_LIST:
+ Len = 0;
+ prec = length;
+ break;
+#endif // 0
+ default: // This is illegal an causes an ill formed array building
+ sprintf(g->Message, MSG(BAD_ARRAY_TYPE), type);
+ Type = TYPE_ERROR;
+ return;
+ } // endswitch type
+
+ Valblk = new(g) MBVALS;
+ Vblp = Valblk->Allocate(g, Type, Len, prec, Size);
+
+ if (!Valblk->GetMemp() && Type != TYPE_LIST)
+ // The error message was built by PlgDBalloc
+ Type = TYPE_ERROR;
+ else
+ Value = AllocateValue(g, type, Len, prec, NULL);
+
+ Constant = TRUE;
+ } // end of ARRAY constructor
+
+#if 0
+/***********************************************************************/
+/* ARRAY public constructor from a QUERY. */
+/***********************************************************************/
+ARRAY::ARRAY(PGLOBAL g, PQUERY qryp) : CSORT(FALSE)
+ {
+ Type = qryp->GetColType(0);
+ Nval = qryp->GetNblin();
+ Ndif = 0;
+ Bot = 0;
+ Top = 0;
+ Size = Nval;
+ Xsize = -1;
+ Len = qryp->GetColLength(0);
+ X = Inf = Sup = 0;
+ Correlated = FALSE;
+
+ switch (Type) {
+ case TYPE_STRING:
+ case TYPE_SHORT:
+ case TYPE_INT:
+ case TYPE_DATE:
+ case TYPE_DOUBLE:
+// case TYPE_TOKEN:
+// case TYPE_LIST:
+// Valblk = qryp->GetCol(0)->Result;
+// Vblp = qryp->GetColBlk(0);
+// Value = qryp->GetColValue(0);
+// break;
+ default: // This is illegal an causes an ill formed array building
+ sprintf(g->Message, MSG(BAD_ARRAY_TYPE), Type);
+ Type = TYPE_ERROR;
+ } // endswitch type
+
+ if (!Valblk || (!Valblk->GetMemp() && Type != TYPE_LIST))
+ // The error message was built by ???
+ Type = TYPE_ERROR;
+
+ Constant = TRUE;
+ } // end of ARRAY constructor
+
+/***********************************************************************/
+/* ARRAY constructor from a TYPE_LIST subarray. */
+/***********************************************************************/
+ARRAY::ARRAY(PGLOBAL g, PARRAY par, int k) : CSORT(FALSE)
+ {
+ int prec;
+ LSTBLK *lp;
+
+ if (par->Type != TYPE_LIST) {
+ Type = TYPE_ERROR;
+ return;
+ } // endif Type
+
+ lp = (LSTBLK*)par->Vblp;
+
+ Nval = par->Nval;
+ Ndif = 0;
+ Bot = 0;
+ Top = 0;
+ Size = par->Size;
+ Xsize = -1;
+
+ Valblk = lp->Mbvk[k];
+ Vblp = Valblk->Vblk;
+ Type = Vblp->GetType();
+ Len = (Type == TYPE_STRING) ? Vblp->GetVlen() : 0;
+ prec = (Type == TYPE_FLOAT) ? 2 : 0;
+ Value = AllocateValue(g, Type, Len, prec, NULL);
+ Constant = TRUE;
+ } // end of ARRAY constructor
+#endif // 0
+
+/***********************************************************************/
+/* Empty: reset the array for a new use (correlated queries). */
+/* Note: this is temporary as correlated queries will not use arrays */
+/* anymore with future optimized algorithms. */
+/***********************************************************************/
+void ARRAY::Empty(void)
+ {
+ assert(Correlated);
+ Nval = Ndif = 0;
+ Bot = Top = X = Inf = Sup = 0;
+ } // end of Empty
+
+/***********************************************************************/
+/* Add a string element to an array. */
+/***********************************************************************/
+bool ARRAY::AddValue(PGLOBAL g, PSZ strp)
+ {
+ if (Type != TYPE_STRING) {
+ sprintf(g->Message, MSG(ADD_BAD_TYPE), GetTypeName(Type), "CHAR");
+ return TRUE;
+ } // endif Type
+
+ if (trace)
+ htrc(" adding string(%d): '%s'\n", Nval, strp);
+
+//Value->SetValue_psz(strp);
+//Vblp->SetValue(valp, Nval++);
+ Vblp->SetValue(strp, Nval++);
+ return FALSE;
+ } // end of AddValue
+
+/***********************************************************************/
+/* Add a SHORT integer element to an array. */
+/***********************************************************************/
+bool ARRAY::AddValue(PGLOBAL g, SHORT n)
+ {
+ if (Type != TYPE_SHORT) {
+ sprintf(g->Message, MSG(ADD_BAD_TYPE), GetTypeName(Type), "SHORT");
+ return TRUE;
+ } // endif Type
+
+ if (trace)
+ htrc(" adding SHORT(%d): %hd\n", Nval, n);
+
+//Value->SetValue(n);
+//Vblp->SetValue(valp, Nval++);
+ Vblp->SetValue(n, Nval++);
+ return FALSE;
+ } // end of AddValue
+
+/***********************************************************************/
+/* Add a int integer element to an array. */
+/***********************************************************************/
+bool ARRAY::AddValue(PGLOBAL g, int n)
+ {
+ if (Type != TYPE_INT) {
+ sprintf(g->Message, MSG(ADD_BAD_TYPE), GetTypeName(Type), "INTEGER");
+ return TRUE;
+ } // endif Type
+
+ if (trace)
+ htrc(" adding int(%d): %d\n", Nval, n);
+
+//Value->SetValue(n);
+//Vblp->SetValue(valp, Nval++);
+ Vblp->SetValue(n, Nval++);
+ return FALSE;
+ } // end of AddValue
+
+/***********************************************************************/
+/* Add a double float element to an array. */
+/***********************************************************************/
+bool ARRAY::AddValue(PGLOBAL g, double d)
+ {
+ if (Type != TYPE_DOUBLE) {
+ sprintf(g->Message, MSG(ADD_BAD_TYPE), GetTypeName(Type), "DOUBLE");
+ return TRUE;
+ } // endif Type
+
+ if (trace)
+ htrc(" adding float(%d): %lf\n", Nval, d);
+
+ Value->SetValue(d);
+ Vblp->SetValue(Value, Nval++);
+ return FALSE;
+ } // end of AddValue
+
+/***********************************************************************/
+/* Add the value of a XOBJECT block to an array. */
+/***********************************************************************/
+bool ARRAY::AddValue(PGLOBAL g, PXOB xp)
+ {
+ if (Type != xp->GetResultType()) {
+ sprintf(g->Message, MSG(ADD_BAD_TYPE),
+ GetTypeName(xp->GetResultType()), GetTypeName(Type));
+ return TRUE;
+ } // endif Type
+
+ if (trace)
+ htrc(" adding (%d) from xp=%p\n", Nval, xp);
+
+//AddValue(xp->GetValue());
+ Vblp->SetValue(xp->GetValue(), Nval++);
+ return FALSE;
+ } // end of AddValue
+
+/***********************************************************************/
+/* Add a value to an array. */
+/***********************************************************************/
+bool ARRAY::AddValue(PGLOBAL g, PVAL vp)
+ {
+ if (Type != vp->GetType()) {
+ sprintf(g->Message, MSG(ADD_BAD_TYPE),
+ GetTypeName(vp->GetType()), GetTypeName(Type));
+ return TRUE;
+ } // endif Type
+
+ if (trace)
+ htrc(" adding (%d) from vp=%p\n", Nval, vp);
+
+ Vblp->SetValue(vp, Nval++);
+ return FALSE;
+ } // end of AddValue
+
+/***********************************************************************/
+/* Retrieve the nth value of the array. */
+/***********************************************************************/
+void ARRAY::GetNthValue(PVAL valp, int n)
+ {
+ valp->SetValue_pvblk(Vblp, n);
+ } // end of GetNthValue
+
+#if 0
+/***********************************************************************/
+/* Retrieve the nth subvalue of a list array. */
+/***********************************************************************/
+bool ARRAY::GetSubValue(PGLOBAL g, PVAL valp, int *kp)
+ {
+ PVBLK vblp;
+
+ if (Type != TYPE_LIST) {
+ sprintf(g->Message, MSG(NO_SUB_VAL), Type);
+ return TRUE;
+ } // endif Type
+
+ vblp = ((LSTBLK*)Vblp)->Mbvk[kp[0]]->Vblk;
+ valp->SetValue_pvblk(vblp, kp[1]);
+ return FALSE;
+ } // end of GetNthValue
+#endif // 0
+
+/***********************************************************************/
+/* Return the nth value of a STRING array. */
+/***********************************************************************/
+char *ARRAY::GetStringValue(int n)
+ {
+ assert (Type == TYPE_STRING);
+ return Vblp->GetCharValue(n);
+ } // end of GetStringValue
+
+/***********************************************************************/
+/* Find whether a value is in an array. */
+/* Provide a conversion limited to the Value limitation. */
+/***********************************************************************/
+bool ARRAY::Find(PVAL valp)
+ {
+ register int n;
+ PVAL vp;
+
+ if (Type != valp->GetType()) {
+ Value->SetValue_pval(valp);
+ vp = Value;
+ } else
+ vp = valp;
+
+ Inf = Bot, Sup = Top;
+
+ while (Sup - Inf > 1) {
+ X = (Inf + Sup) >> 1;
+ n = Vblp->CompVal(vp, X);
+
+ if (n < 0)
+ Sup = X;
+ else if (n > 0)
+ Inf = X;
+ else
+ return TRUE;
+
+ } // endwhile
+
+ return FALSE;
+ } // end of Find
+
+/***********************************************************************/
+/* ARRAY: Compare routine for a list of values. */
+/***********************************************************************/
+BYTE ARRAY::Vcompare(PVAL vp, int n)
+ {
+ Value->SetValue_pvblk(Vblp, n);
+ return vp->TestValue(Value);
+ } // end of Vcompare
+
+/***********************************************************************/
+/* Test a filter condition on an array depending on operator and mod. */
+/* Modificator values are 1: ANY (or SOME) and 2: ALL. */
+/***********************************************************************/
+bool ARRAY::FilTest(PGLOBAL g, PVAL valp, OPVAL opc, int opm)
+ {
+ int i;
+ PVAL vp;
+ BYTE bt = OpBmp(g, opc);
+ int top = Nval - 1;
+
+ if (top < 0) // Array is empty
+ // Return TRUE for ALL because it means that there are no item that
+ // does not verify the condition, which is true indeed.
+ // Return FALSE for ANY because TRUE means that there is at least
+ // one item that verifies the condition, which is false.
+ return opm == 2;
+
+ if (valp) {
+ if (Type != valp->GetType()) {
+ Value->SetValue_pval(valp);
+ vp = Value;
+ } else
+ vp = valp;
+
+ } else if (opc != OP_EXIST) {
+ sprintf(g->Message, MSG(MISSING_ARG), opc);
+ longjmp(g->jumper[g->jump_level], TYPE_ARRAY);
+ } else // OP_EXIST
+ return Nval > 0;
+
+ if (opc == OP_IN || (opc == OP_EQ && opm == 1))
+ return Find(vp);
+ else if (opc == OP_NE && opm == 2)
+ return !Find(vp);
+ else if (opc == OP_EQ && opm == 2)
+ return (Ndif == 1) ? !(Vcompare(vp, 0) & bt) : FALSE;
+ else if (opc == OP_NE && opm == 1)
+ return (Ndif == 1) ? !(Vcompare(vp, 0) & bt) : TRUE;
+
+ if (Type != TYPE_LIST) {
+ if (opc == OP_GT || opc == OP_GE)
+ return !(Vcompare(vp, (opm == 1) ? 0 : top) & bt);
+ else
+ return !(Vcompare(vp, (opm == 2) ? 0 : top) & bt);
+
+ } // endif Type
+
+ // Case of TYPE_LIST
+ if (opm == 2) {
+ for (i = 0; i < Nval; i++)
+ if (Vcompare(vp, i) & bt)
+ return FALSE;
+
+ return TRUE;
+ } else { // opm == 1
+ for (i = 0; i < Nval; i++)
+ if (!(Vcompare(vp, i) & bt))
+ return TRUE;
+
+ return FALSE;
+ } // endif opm
+
+ } // end of FilTest
+
+/***********************************************************************/
+/* Test whether this array can be converted to TYPE_SHORT. */
+/* Must be called after the array is sorted. */
+/***********************************************************************/
+bool ARRAY::CanBeShort(void)
+ {
+ int* To_Val = (int*)Valblk->GetMemp();
+
+ if (Type != TYPE_INT || !Ndif)
+ return FALSE;
+
+ // Because the array is sorted, this is true if all the array
+ // int values are in the range of SHORT values
+ return (To_Val[0] >= -32768 && To_Val[Nval-1] < 32768);
+ } // end of CanBeShort
+
+/***********************************************************************/
+/* Convert an array to new numeric type k. */
+/* Note: conversion is always made in ascending order from STRING to */
+/* SHORT to int to double so no precision is lost in the conversion. */
+/* One exception is converting from int to SHORT compatible arrays. */
+/***********************************************************************/
+int ARRAY::Convert(PGLOBAL g, int k, PVAL vp)
+ {
+ int i, prec = 0;
+ bool b = FALSE;
+ PMBV ovblk = Valblk;
+ PVBLK ovblp = Vblp;
+
+ Type = k; // k is the new type
+ Valblk = new(g) MBVALS;
+
+ switch (Type) {
+ case TYPE_DOUBLE:
+ prec = 2;
+ case TYPE_SHORT:
+ case TYPE_INT:
+ case TYPE_DATE:
+ Len = 1;
+ break;
+ default:
+ sprintf(g->Message, MSG(BAD_CONV_TYPE), Type);
+ return TYPE_ERROR;
+ } // endswitch k
+
+ Size = Nval;
+ Nval = 0;
+ Vblp = Valblk->Allocate(g, Type, Len, 0, Size);
+
+ if (!Valblk->GetMemp())
+ // The error message was built by PlgDBalloc
+ return TYPE_ERROR;
+ else
+ Value = AllocateValue(g, Type, Len, 0, NULL);
+
+ /*********************************************************************/
+ /* Converting STRING to DATE can be done according to date format. */
+ /*********************************************************************/
+ if (Type == TYPE_DATE && ovblp->GetType() == TYPE_STRING && vp)
+ if (((DTVAL*)Value)->SetFormat(g, vp))
+ return TYPE_ERROR;
+ else
+ b = TRUE; // Sort the new array on date internal values
+
+ /*********************************************************************/
+ /* Do the actual conversion. */
+ /*********************************************************************/
+ for (i = 0; i < Size; i++) {
+ Value->SetValue_pvblk(ovblp, i);
+
+ if (AddValue(g, Value))
+ return TYPE_ERROR;
+
+ } // endfor i
+
+ /*********************************************************************/
+ /* For sorted arrays, get the initial find values. */
+ /*********************************************************************/
+ if (b)
+ Sort(g);
+
+ ovblk->Free();
+ return Type;
+ } // end of Convert
+
+/***********************************************************************/
+/* ARRAY Save: save value at i (used while rordering). */
+/***********************************************************************/
+void ARRAY::Save(int i)
+ {
+ Value->SetValue_pvblk(Vblp, i);
+ } // end of Save
+
+/***********************************************************************/
+/* ARRAY Restore: restore value to j (used while rordering). */
+/***********************************************************************/
+void ARRAY::Restore(int j)
+ {
+ Vblp->SetValue(Value, j);
+ } // end of Restore
+
+/***********************************************************************/
+/* ARRAY Move: move value from k to j (used while rordering). */
+/***********************************************************************/
+void ARRAY::Move(int j, int k)
+ {
+ Vblp->Move(k, j); // VALBLK does the opposite !!!
+ } // end of Move
+
+/***********************************************************************/
+/* ARRAY: Compare routine for one LIST value (ascending only). */
+/***********************************************************************/
+int ARRAY::Qcompare(int *i1, int *i2)
+ {
+ return Vblp->CompVal(*i1, *i2);
+ } // end of Qcompare
+
+/***********************************************************************/
+/* Mainly meant to set the character arrays case sensitiveness. */
+/***********************************************************************/
+void ARRAY::SetPrecision(PGLOBAL g, int p)
+ {
+ if (Vblp == NULL) {
+ strcpy(g->Message, MSG(PREC_VBLP_NULL));
+ longjmp(g->jumper[g->jump_level], TYPE_ARRAY);
+ } // endif Vblp
+
+ bool was = Vblp->IsCi();
+
+ if (was && !p) {
+ strcpy(g->Message, MSG(BAD_SET_CASE));
+ longjmp(g->jumper[g->jump_level], TYPE_ARRAY);
+ } // endif Vblp
+
+ if (was || !p)
+ return;
+ else
+ Vblp->SetPrec(p);
+
+ if (!was && Type == TYPE_STRING)
+ // Must be resorted to eliminate duplicate strings
+ if (Sort(g))
+ longjmp(g->jumper[g->jump_level], TYPE_ARRAY);
+
+ } // end of SetPrecision
+
+/***********************************************************************/
+/* Sort and eliminate distinct values from an array. */
+/* Note: this is done by making a sorted index on distinct values. */
+/* Returns FALSE if Ok or TRUE in case of error. */
+/***********************************************************************/
+bool ARRAY::Sort(PGLOBAL g)
+ {
+ int i, j, k;
+
+ // This is to avoid multiply allocating for correlated subqueries
+ if (Nval > Xsize) {
+ if (Xsize >= 0) {
+ // Was already allocated
+ PlgDBfree(Index);
+ PlgDBfree(Offset);
+ } // endif Xsize
+
+ // Prepare non conservative sort with offet values
+ Index.Size = Nval * sizeof(int);
+
+ if (!PlgDBalloc(g, NULL, Index))
+ goto error;
+
+ Offset.Size = (Nval + 1) * sizeof(int);
+
+ if (!PlgDBalloc(g, NULL, Offset))
+ goto error;
+
+ Xsize = Nval;
+ } // endif Nval
+
+ // Call the sort program, it returns the number of distinct values
+ Ndif = Qsort(g, Nval);
+
+ if (Ndif < 0)
+ goto error;
+
+ // Use the sort index to reorder the data in storage so it will
+ // be physically sorted and Index can be removed.
+ for (i = 0; i < Nval; i++) {
+ if (Pex[i] == i || Pex[i] == Nval)
+ // Already placed or already moved
+ continue;
+
+ Save(i);
+
+ for (j = i;; j = k) {
+ k = Pex[j];
+ Pex[j] = Nval; // Mark position as set
+
+ if (k == i) {
+ Restore(j);
+ break; // end of loop
+ } else
+ Move(j, k);
+
+ } // endfor j
+
+ } // endfor i
+
+ // Reduce the size of the To_Val array if Ndif < Nval
+ if (Ndif < Nval) {
+ for (i = 1; i < Ndif; i++)
+ if (i != Pof[i])
+ break;
+
+ for (; i < Ndif; i++)
+ Move(i, Pof[i]);
+
+ Nval = Ndif;
+ } // endif ndif
+
+ if (!Correlated) {
+ if (Size > Nval) {
+ Size = Nval;
+ Valblk->ReAllocate(g, Size);
+ } // endif Size
+
+ // Index and Offset are not used anymore
+ PlgDBfree(Index);
+ PlgDBfree(Offset);
+ Xsize = -1;
+ } // endif Correlated
+
+ Bot = -1; // For non optimized search
+ Top = Ndif; // Find searches the whole array.
+ return FALSE;
+
+ error:
+ Nval = Ndif = 0;
+ Valblk->Free();
+ PlgDBfree(Index);
+ PlgDBfree(Offset);
+ return TRUE;
+ } // end of Sort
+
+/***********************************************************************/
+/* Block filter testing for IN operator on Column/Array operands. */
+/* Here we call Find that returns TRUE if the value is in the array */
+/* with X equal to the index of the found value in the array, or */
+/* FALSE if the value is not in the array with Inf and Sup being the */
+/* indexes of the array values that are immediately below and over */
+/* the not found value. This enables to restrict the array to the */
+/* values that are between the min and max block values and to return */
+/* the indication of whether the Find will be always true, always not */
+/* true or other. */
+/***********************************************************************/
+int ARRAY::BlockTest(PGLOBAL g, int opc, int opm,
+ void *minp, void *maxp, bool s)
+ {
+ bool bin, bax, pin, pax, veq, all = (opm == 2);
+
+ if (Ndif == 0) // Array is empty
+ // Return TRUE for ALL because it means that there are no item that
+ // does not verify the condition, which is true indeed.
+ // Return FALSE for ANY because TRUE means that there is at least
+ // one item that verifies the condition, which is false.
+ return (all) ? 2 : -2;
+ else if (opc == OP_EQ && all && Ndif > 1)
+ return -2;
+ else if (opc == OP_NE && !all && Ndif > 1)
+ return 2;
+// else if (Ndif == 1)
+// all = FALSE;
+
+ // veq is true when all values in the block are equal
+ switch (Type) {
+ case TYPE_STRING: veq = (Vblp->IsCi())
+ ? !stricmp((char*)minp, (char*)maxp)
+ : !strcmp((char*)minp, (char*)maxp); break;
+ case TYPE_SHORT: veq = *(SHORT*)minp == *(SHORT*)maxp; break;
+ case TYPE_INT: veq = *(PINT)minp == *(PINT)maxp; break;
+ case TYPE_DOUBLE: veq = *(double*)minp == *(double*)maxp; break;
+ default: veq = FALSE; // Error ?
+ } // endswitch type
+
+ if (!s)
+ Bot = -1;
+
+ Top = Ndif; // Reset Top at top of list
+ Value->SetBinValue(maxp);
+ Top = (bax = Find(Value)) ? X + 1 : Sup;
+
+ if (bax) {
+ if (opc == OP_EQ)
+ return (veq) ? 1 : 0;
+ else if (opc == OP_NE)
+ return (veq) ? -1 : 0;
+
+ if (X == 0) switch (opc) {
+ // Max value is equal to min list value
+ case OP_LE: return 1; break;
+ case OP_LT: return (veq) ? -1 : 0; break;
+ case OP_GE: return (veq) ? 1 : 0; break;
+ case OP_GT: return -1; break;
+ } // endswitch opc
+
+ pax = (opc == OP_GE) ? (X < Ndif - 1) : TRUE;
+ } else if (Inf == Bot) {
+ // Max value is smaller than min list value
+ return (opc == OP_LT || opc == OP_LE || opc == OP_NE) ? 1 : -1;
+ } else
+ pax = (Sup < Ndif); // True if max value is inside the list value
+
+ if (!veq) {
+ Value->SetBinValue(minp);
+ bin = Find(Value);
+ } else
+ bin = bax;
+
+ Bot = (bin) ? X - 1 : Inf;
+
+ if (bin) {
+ if (opc == OP_EQ || opc == OP_NE)
+ return 0;
+
+ if (X == Ndif - 1) switch (opc) {
+ case OP_GE: return (s) ? 2 : 1; break;
+ case OP_GT: return (veq) ? -1 : 0; break;
+ case OP_LE: return (veq) ? 1 : 0; break;
+ case OP_LT: return (s) ? -2 : -1; break;
+ } // endswitch opc
+
+ pin = (opc == OP_LE) ? (X > 0) : TRUE;
+ } else if (Sup == Ndif) {
+ // Min value is greater than max list value
+ if (opc == OP_GT || opc == OP_GE || opc == OP_NE)
+ return (s) ? 2 : 1;
+ else
+ return (s) ? -2 : -1;
+
+ } else
+ pin = (Inf >= 0); // True if min value is inside the list value
+
+ if (Top - Bot <= 1) {
+ // No list item between min and max value
+#if defined(_DEBUG)
+ assert (!bin && !bax);
+#endif
+ switch (opc) {
+ case OP_EQ: return -1; break;
+ case OP_NE: return 1; break;
+ default: return (all) ? -1 : 1; break;
+ } // endswitch opc
+
+ } // endif
+
+#if defined(_DEBUG)
+ assert (Ndif > 1); // if Ndif = 1 we should have returned already
+#endif
+
+ // At this point, if there are no logical errors in the algorithm,
+ // the only possible overlaps between the array and the block are:
+ // Array: +-------+ +-------+ +-------+ +-----+
+ // Block: +-----+ +---+ +------+ +--------+
+ // TRUE: pax pin pax pin
+ if (all) switch (opc) {
+ case OP_GT:
+ case OP_GE: return (pax) ? -1 : 0; break;
+ case OP_LT:
+ case OP_LE: return (pin) ? -1 : 0; break;
+ } // endswitch opc
+
+ return 0;
+ } // end of BlockTest
+
+/***********************************************************************/
+/* MakeArrayList: Makes a value list from an SQL IN array (in work). */
+/***********************************************************************/
+PSZ ARRAY::MakeArrayList(PGLOBAL g)
+ {
+ char *p, *tp;
+ int i;
+ size_t z, len = 2;
+
+ if (Type == TYPE_LIST)
+ return "(???)"; // To be implemented
+
+ z = max(24, GetTypeSize(Type, Len) + 4);
+ tp = (char*)PlugSubAlloc(g, NULL, z);
+
+ for (i = 0; i < Nval; i++) {
+ Value->SetValue_pvblk(Vblp, i);
+ Value->Print(g, tp, z);
+ len += strlen(tp);
+ } // enfor i
+
+ if (trace)
+ htrc("Arraylist: len=%d\n", len);
+
+ p = (char *)PlugSubAlloc(g, NULL, len);
+ strcpy(p, "(");
+
+ for (i = 0; i < Nval;) {
+ Value->SetValue_pvblk(Vblp, i);
+ Value->Print(g, tp, z);
+ strcat(p, tp);
+ strcat(p, (++i == Nval) ? ")" : ",");
+ } // enfor i
+
+ if (trace)
+ htrc("Arraylist: newlen=%d\n", strlen(p));
+
+ return p;
+ } // end of MakeArrayList
+
+/***********************************************************************/
+/* Make file output of ARRAY contents. */
+/***********************************************************************/
+void ARRAY::Print(PGLOBAL g, FILE *f, UINT n)
+ {
+ char m[64];
+ int lim = min(Nval,10);
+
+ memset(m, ' ', n); // Make margin string
+ m[n] = '\0';
+ fprintf(f, "%sARRAY: type=%d\n", m, Type);
+ memset(m, ' ', n + 2); // Make margin string
+ m[n] = '\0';
+
+ if (Type != TYPE_LIST) {
+ fprintf(f, "%sblock=%p numval=%d\n", m, Valblk->GetMemp(), Nval);
+
+ if (Vblp)
+ for (int i = 0; i < lim; i++) {
+ Value->SetValue_pvblk(Vblp, i);
+ Value->Print(g, f, n+4);
+ } // endfor i
+
+ } else
+ fprintf(f, "%sVALLST: numval=%d\n", m, Nval);
+
+ } // end of Print
+
+/***********************************************************************/
+/* Make string output of ARRAY contents. */
+/***********************************************************************/
+void ARRAY::Print(PGLOBAL g, char *ps, UINT z)
+ {
+ if (z < 16)
+ return;
+
+ sprintf(ps, "ARRAY: type=%d\n", Type);
+ // More to be implemented later
+ } // end of Print
+
+/* -------------------------- Class MULAR ---------------------------- */
+
+/***********************************************************************/
+/* MULAR public constructor. */
+/***********************************************************************/
+MULAR::MULAR(PGLOBAL g, int n) : CSORT(FALSE)
+ {
+ Narray = n;
+ Pars = (PARRAY*)PlugSubAlloc(g, NULL, n * sizeof(PARRAY));
+ } // end of MULAR constructor
+
+/***********************************************************************/
+/* MULAR: Compare routine multiple arrays. */
+/***********************************************************************/
+int MULAR::Qcompare(int *i1, int *i2)
+ {
+ register int i, n = 0;
+
+ for (i = 0; i < Narray; i++)
+ if ((n = Pars[i]->Qcompare(i1, i2)))
+ break;
+
+ return n;
+ } // end of Qcompare
+
+/***********************************************************************/
+/* Sort and eliminate distinct values from multiple arrays. */
+/* Note: this is done by making a sorted index on distinct values. */
+/* Returns FALSE if Ok or TRUE in case of error. */
+/***********************************************************************/
+bool MULAR::Sort(PGLOBAL g)
+ {
+ int i, j, k, n, nval, ndif;
+
+ // All arrays must have the same number of values
+ nval = Pars[0]->Nval;
+
+ for (n = 1; n < Narray; n++)
+ if (Pars[n]->Nval != nval) {
+ strcpy(g->Message, MSG(BAD_ARRAY_VAL));
+ return TRUE;
+ } // endif nval
+
+ // Prepare non conservative sort with offet values
+ Index.Size = nval * sizeof(int);
+
+ if (!PlgDBalloc(g, NULL, Index))
+ goto error;
+
+ Offset.Size = (nval + 1) * sizeof(int);
+
+ if (!PlgDBalloc(g, NULL, Offset))
+ goto error;
+
+ // Call the sort program, it returns the number of distinct values
+ ndif = Qsort(g, nval);
+
+ if (ndif < 0)
+ goto error;
+
+ // Use the sort index to reorder the data in storage so it will
+ // be physically sorted and Index can be removed.
+ for (i = 0; i < nval; i++) {
+ if (Pex[i] == i || Pex[i] == nval)
+ // Already placed or already moved
+ continue;
+
+ for (n = 0; n < Narray; n++)
+ Pars[n]->Save(i);
+
+ for (j = i;; j = k) {
+ k = Pex[j];
+ Pex[j] = nval; // Mark position as set
+
+ if (k == i) {
+ for (n = 0; n < Narray; n++)
+ Pars[n]->Restore(j);
+
+ break; // end of loop
+ } else
+ for (n = 0; n < Narray; n++)
+ Pars[n]->Move(j, k);
+
+ } // endfor j
+
+ } // endfor i
+
+ // Reduce the size of the To_Val array if ndif < nval
+ if (ndif < nval) {
+ for (i = 1; i < ndif; i++)
+ if (i != Pof[i])
+ break;
+
+ for (; i < ndif; i++)
+ for (n = 0; n < Narray; n++)
+ Pars[n]->Move(i, Pof[i]);
+
+ for (n = 0; n < Narray; n++) {
+ Pars[n]->Nval = ndif;
+ Pars[n]->Size = ndif;
+ Pars[n]->Valblk->ReAllocate(g, ndif);
+ } // endfor n
+
+ } // endif ndif
+
+ // Index and Offset are not used anymore
+ PlgDBfree(Index);
+ PlgDBfree(Offset);
+
+ for (n = 0; n < Narray; n++) {
+ Pars[n]->Bot = -1; // For non optimized search
+ Pars[n]->Top = ndif; // Find searches the whole array.
+ } // endfor n
+
+ return FALSE;
+
+ error:
+ PlgDBfree(Index);
+ PlgDBfree(Offset);
+ return TRUE;
+ } // end of Sort
diff --git a/storage/connect/array.h b/storage/connect/array.h
new file mode 100644
index 00000000000..489a26ac0fd
--- /dev/null
+++ b/storage/connect/array.h
@@ -0,0 +1,122 @@
+/**************** Array H Declares Source Code File (.H) ***************/
+/* Name: ARRAY.H Version 3.1 */
+/* */
+/* (C) Copyright to the author Olivier BERTRAND 2005-2014 */
+/* */
+/* This file contains the ARRAY and VALBASE derived classes declares. */
+/***********************************************************************/
+
+/***********************************************************************/
+/* Include required application header files */
+/***********************************************************************/
+#include "xobject.h"
+#include "valblk.h"
+#include "csort.h"
+
+typedef class ARRAY *PARRAY;
+
+/***********************************************************************/
+/* Definition of class ARRAY with all its method functions. */
+/* Note: This is not a general array class that could be defined as */
+/* a template class, but rather a specific object containing a list */
+/* of values to be processed by the filter IN operator. */
+/* In addition it must act as a metaclass by being able to give back */
+/* the type of values it contains. */
+/* It must also be able to convert itself from some type to another. */
+/***********************************************************************/
+class DllExport ARRAY : public XOBJECT, public CSORT { // Array descblock
+ friend class MULAR;
+//friend class VALLST;
+//friend class SFROW;
+ public:
+ // Constructors
+ ARRAY(PGLOBAL g, int type, int size, int len = 1, int prec = 0);
+//ARRAY(PGLOBAL g, PQUERY qryp);
+//ARRAY(PGLOBAL g, PARRAY par, int k);
+
+ // Implementation
+ virtual int GetType(void) {return TYPE_ARRAY;}
+ virtual int GetResultType(void) {return Type;}
+ virtual int GetLength(void) {return Len;}
+ virtual int GetLengthEx(void) {return Len;}
+ virtual int GetScale() {return 0;}
+ int GetNval(void) {return Nval;}
+ int GetSize(void) {return Size;}
+// PVAL GetValp(void) {return Valp;}
+ void SetType(int atype) {Type = atype;}
+ void SetCorrel(bool b) {Correlated = b;}
+
+ // Methods
+ virtual void Reset(void) {Bot = -1;}
+ virtual int Qcompare(int *, int *);
+ virtual bool Compare(PXOB) {assert(FALSE); return FALSE;}
+ virtual bool SetFormat(PGLOBAL, FORMAT&) {assert(FALSE); return FALSE;}
+ virtual int CheckSpcCol(PTDB, int) {return 0;}
+ virtual void Print(PGLOBAL g, FILE *f, UINT n);
+ virtual void Print(PGLOBAL g, char *ps, UINT z);
+ void Empty(void);
+ void SetPrecision(PGLOBAL g, int p);
+ bool AddValue(PGLOBAL g, PSZ sp);
+ bool AddValue(PGLOBAL g, SHORT n);
+ bool AddValue(PGLOBAL g, int n);
+ bool AddValue(PGLOBAL g, double f);
+ bool AddValue(PGLOBAL g, PXOB xp);
+ bool AddValue(PGLOBAL g, PVAL vp);
+ void GetNthValue(PVAL valp, int n);
+ char *GetStringValue(int n);
+ BYTE Vcompare(PVAL vp, int n);
+ void Save(int);
+ void Restore(int);
+ void Move(int, int);
+ bool Sort(PGLOBAL g);
+ bool Find(PVAL valp);
+ bool FilTest(PGLOBAL g, PVAL valp, OPVAL opc, int opm);
+ int Convert(PGLOBAL g, int k, PVAL vp = NULL);
+ int BlockTest(PGLOBAL g, int opc, int opm,
+ void *minp, void *maxp, bool s);
+ PSZ MakeArrayList(PGLOBAL g);
+ bool CanBeShort(void);
+ bool GetSubValue(PGLOBAL g, PVAL valp, int *kp);
+
+ protected:
+ // Members
+ PMBV Valblk; // To the MBVALS class
+ PVBLK Vblp; // To Valblock of the data array
+//PVAL Valp; // The value used for Save and Restore is Value
+ int Size; // Size of value array
+ int Nval; // Total number of items in array
+ int Ndif; // Total number of distinct items in array
+ int Xsize; // Size of Index (used for correlated arrays)
+ int Type; // Type of individual values in the array
+ int Len; // Length of character string
+ int Bot; // Bottom of research index
+ int Top; // Top of research index
+ int X, Inf, Sup; // Used for block optimization
+ bool Correlated; // -----------> Temporary
+ }; // end of class ARRAY
+
+/***********************************************************************/
+/* Definition of class MULAR with all its method functions. */
+/* This class is used when constructing the arrays of constants used */
+/* for indexing. Its only purpose is to provide a way to sort, reduce */
+/* and reorder the arrays of multicolumn indexes as one block. Indeed */
+/* sorting the arrays independantly would break the correspondance of */
+/* column values. */
+/***********************************************************************/
+class MULAR : public CSORT, public BLOCK { // No need to be an XOBJECT
+ public:
+ // Constructor
+ MULAR(PGLOBAL g, int n);
+
+ // Implementation
+ void SetPars(PARRAY par, int i) {Pars[i] = par;}
+
+ // Methods
+ virtual int Qcompare(int *i1, int *i2); // Sort compare routine
+ bool Sort(PGLOBAL g);
+
+ protected:
+ // Members
+ int Narray; // The number of sub-arrays
+ PARRAY *Pars; // To the block of real arrays
+ }; // end of class ARRAY
diff --git a/storage/connect/blkfil.cpp b/storage/connect/blkfil.cpp
new file mode 100644
index 00000000000..d6da60166e8
--- /dev/null
+++ b/storage/connect/blkfil.cpp
@@ -0,0 +1,1080 @@
+/************* BlkFil C++ Program Source Code File (.CPP) **************/
+/* PROGRAM NAME: BLKFIL */
+/* ------------- */
+/* Version 2.5 */
+/* */
+/* COPYRIGHT: */
+/* ---------- */
+/* (C) Copyright to the author Olivier BERTRAND 2004-2014 */
+/* */
+/* WHAT THIS PROGRAM DOES: */
+/* ----------------------- */
+/* This program is the implementation of block indexing classes. */
+/* */
+/***********************************************************************/
+
+/***********************************************************************/
+/* Include relevant MariaDB header file. */
+/***********************************************************************/
+#include "my_global.h"
+#include "sql_class.h"
+//#include "sql_time.h"
+
+#if defined(WIN32)
+//#include <windows.h>
+#else // !WIN32
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#endif // !WIN32
+
+/***********************************************************************/
+/* Include application header files: */
+/***********************************************************************/
+#include "global.h" // global declarations
+#include "plgdbsem.h" // DB application declarations
+#include "xindex.h" // Key Index class declarations
+#include "filamtxt.h" // File access method dcls
+#include "tabdos.h" // TDBDOS and DOSCOL class dcls
+#include "array.h" // ARRAY classes dcls
+#include "blkfil.h" // Block Filter classes dcls
+
+/***********************************************************************/
+/* Static variables. */
+/***********************************************************************/
+extern "C" int trace;
+
+/* ------------------------ Class BLOCKFILTER ------------------------ */
+
+/***********************************************************************/
+/* BLOCKFILTER constructor. */
+/***********************************************************************/
+BLOCKFILTER::BLOCKFILTER(PTDBDOS tdbp, int op)
+ {
+ Tdbp = tdbp;
+ Correl = FALSE;
+ Opc = op;
+ Opm = 0;
+ Result = 0;
+ } // end of BLOCKFILTER constructor
+
+/***********************************************************************/
+/* Make file output of BLOCKFILTER contents. */
+/***********************************************************************/
+void BLOCKFILTER::Print(PGLOBAL g, FILE *f, UINT n)
+ {
+ char m[64];
+
+ memset(m, ' ', n); // Make margin string
+ m[n] = '\0';
+
+ fprintf(f, "%sBLOCKFILTER: at %p opc=%d opm=%d result=%d\n",
+ m, this, Opc, Opm, Result);
+ } // end of Print
+
+/***********************************************************************/
+/* Make string output of BLOCKFILTER contents. */
+/***********************************************************************/
+void BLOCKFILTER::Print(PGLOBAL g, char *ps, UINT z)
+ {
+ strncat(ps, "BlockFilter(s)", z);
+ } // end of Print
+
+
+/* ---------------------- Class BLKFILLOG ---------------------------- */
+
+/***********************************************************************/
+/* BLKFILLOG constructor. */
+/***********************************************************************/
+BLKFILLOG::BLKFILLOG(PTDBDOS tdbp, int op, PBF *bfp, int n)
+ : BLOCKFILTER(tdbp, op)
+ {
+ N = n;
+ Fil = bfp;
+
+ for (int i = 0; i < N; i++)
+ if (Fil[i])
+ Correl |= Fil[i]->Correl;
+
+ } // end of BLKFILLOG constructor
+
+/***********************************************************************/
+/* Reset: this function is used only to check the existence of a */
+/* BLKFILIN block and have it reset its Bot value for sorted columns. */
+/***********************************************************************/
+void BLKFILLOG::Reset(PGLOBAL g)
+ {
+ for (int i = 0; i < N; i++)
+ if (Fil[i])
+ Fil[i]->Reset(g);
+
+ } // end of Reset
+
+/***********************************************************************/
+/* This function is used for block filter evaluation. We use here a */
+/* fuzzy logic between the values returned by evaluation blocks: */
+/* -2: the condition will be always false for the rest of the file. */
+/* -1: the condition will be false for the whole group. */
+/* 0: the condition may be true for some of the group values. */
+/* 1: the condition will be true for the whole group. */
+/* 2: the condition will be always true for the rest of the file. */
+/***********************************************************************/
+int BLKFILLOG::BlockEval(PGLOBAL g)
+ {
+ int i, rc;
+
+ for (i = 0; i < N; i++) {
+ // 0: Means some block filter value may be True
+ rc = (Fil[i]) ? Fil[i]->BlockEval(g) : 0;
+
+ if (!i)
+ Result = (Opc == OP_NOT) ? -rc : rc;
+ else switch (Opc) {
+ case OP_AND:
+ Result = min(Result, rc);
+ break;
+ case OP_OR:
+ Result = max(Result, rc);
+ break;
+ default:
+ // Should never happen
+ Result = 0;
+ return Result;
+ } // endswitch Opc
+
+ } // endfor i
+
+ return Result;
+ } // end of BlockEval
+
+/* ---------------------- Class BLKFILARI----------------------------- */
+
+/***********************************************************************/
+/* BLKFILARI constructor. */
+/***********************************************************************/
+BLKFILARI::BLKFILARI(PGLOBAL g, PTDBDOS tdbp, int op, PXOB *xp)
+ : BLOCKFILTER(tdbp, op)
+ {
+ Colp = (PDOSCOL)xp[0];
+
+ if (xp[1]->GetType() == TYPE_COLBLK) {
+ Cpx = (PCOL)xp[1]; // Subquery pseudo constant column
+ Correl = TRUE;
+ } else
+ Cpx = NULL;
+
+ Sorted = Colp->IsSorted() > 0;
+
+ // Don't remember why this was changed. Anyway it is no good for
+ // correlated subqueries because the Value must reflect changes
+ if (Cpx)
+ Valp = xp[1]->GetValue();
+ else
+ Valp = AllocateValue(g, xp[1]->GetValue());
+
+ } // end of BLKFILARI constructor
+
+/***********************************************************************/
+/* Reset: re-eval the constant value in the case of pseudo constant */
+/* column use in a correlated subquery. */
+/***********************************************************************/
+void BLKFILARI::Reset(PGLOBAL g)
+ {
+ if (Cpx) {
+ Cpx->Reset();
+ Cpx->Eval(g);
+ MakeValueBitmap(); // Does nothing for class BLKFILARI
+ } // endif Cpx
+
+ } // end of Reset
+
+/***********************************************************************/
+/* Evaluate block filter for arithmetic operators. */
+/***********************************************************************/
+int BLKFILARI::BlockEval(PGLOBAL g)
+ {
+ int mincmp, maxcmp, n;
+
+#if defined(_DEBUG)
+ assert (Colp->IsClustered());
+#endif
+
+ n = ((PTDBDOS)Colp->GetTo_Tdb())->GetCurBlk();
+ mincmp = Colp->GetMin()->CompVal(Valp, n);
+ maxcmp = Colp->GetMax()->CompVal(Valp, n);
+
+ switch (Opc) {
+ case OP_EQ:
+ case OP_NE:
+ if (mincmp < 0) // Means minval > Val
+ Result = (Sorted) ? -2 : -1;
+ else if (maxcmp > 0) // Means maxval < Val
+ Result = -1;
+ else if (!mincmp && !maxcmp) // minval = maxval = val
+ Result = 1;
+ else
+ Result = 0;
+
+ break;
+ case OP_GT:
+ case OP_LE:
+ if (mincmp < 0) // minval > Val
+ Result = (Sorted) ? 2 : 1;
+ else if (maxcmp < 0) // maxval > Val
+ Result = 0;
+ else // maxval <= Val
+ Result = -1;
+
+ break;
+ case OP_GE:
+ case OP_LT:
+ if (mincmp <= 0) // minval >= Val
+ Result = (Sorted) ? 2 : 1;
+ else if (maxcmp <= 0) // Maxval >= Val
+ Result = 0;
+ else // Maxval < Val
+ Result = -1;
+
+ break;
+ } // endswitch Opc
+
+ switch (Opc) {
+ case OP_NE:
+ case OP_LE:
+ case OP_LT:
+ Result = -Result;
+ break;
+ } // endswitch Opc
+
+ if (trace)
+ htrc("BlockEval: op=%d n=%d rc=%d\n", Opc, n, Result);
+
+ return Result;
+ } // end of BlockEval
+
+/* ---------------------- Class BLKFILAR2----------------------------- */
+
+/***********************************************************************/
+/* BLKFILAR2 constructor. */
+/***********************************************************************/
+BLKFILAR2::BLKFILAR2(PGLOBAL g, PTDBDOS tdbp, int op, PXOB *xp)
+ : BLKFILARI(g, tdbp, op, xp)
+ {
+ MakeValueBitmap();
+ } // end of BLKFILAR2 constructor
+
+/***********************************************************************/
+/* MakeValueBitmap: Set the constant value bit map. It can be void */
+/* if the constant value is not in the column distinct values list. */
+/***********************************************************************/
+void BLKFILAR2::MakeValueBitmap(void)
+ {
+ int i; // ndv = Colp->GetNdv();
+ bool found = FALSE;
+ PVBLK dval = Colp->GetDval();
+
+ assert(dval);
+
+ /*********************************************************************/
+ /* Here we cannot use Find because we must get the index */
+ /* of where to put the value if it is not found in the array. */
+ /* This is needed by operators other than OP_EQ or OP_NE. */
+ /*********************************************************************/
+ found = dval->Locate(Valp, i);
+
+ /*********************************************************************/
+ /* Set the constant value bitmap. The bitmaps are really matching */
+ /* the OP_EQ, OP_LE, and OP_LT operator but are also used for the */
+ /* other operators for which the Result will be inverted. */
+ /* The reason the bitmaps are not directly complemented for them is */
+ /* to be able to test easily the cases of sorted columns with Bxp, */
+ /* and the case of a void bitmap, which happens if the constant */
+ /* value is not in the column distinct values list. */
+ /*********************************************************************/
+ if (found) {
+ Bmp = 1 << i; // Bit of the found value
+ Bxp = Bmp - 1; // All smaller values
+
+ if (Opc != OP_LT && Opc != OP_GE)
+ Bxp |= Bmp; // Found value must be included
+
+ } else {
+ Bmp = 0;
+ Bxp = (1 << i) - 1;
+ } // endif found
+
+ if (!(Opc == OP_EQ || Opc == OP_NE))
+ Bmp = Bxp;
+
+ } // end of MakeValueBitmap
+
+/***********************************************************************/
+/* Evaluate XDB2 block filter for arithmetic operators. */
+/***********************************************************************/
+int BLKFILAR2::BlockEval(PGLOBAL g)
+ {
+#if defined(_DEBUG)
+ assert (Colp->IsClustered());
+#endif
+
+ int n = ((PTDBDOS)Colp->GetTo_Tdb())->GetCurBlk();
+ ULONG bkmp = *(PULONG)Colp->GetBmap()->GetValPtr(n);
+ ULONG bres = Bmp & bkmp;
+
+ // Set result as if Opc were OP_EQ, OP_LT, or OP_LE
+ if (!bres) {
+ if (!Bmp)
+ Result = -2; // No good block in the table file
+ else if (!Sorted)
+ Result = -1; // No good values in this block
+ else // Sorted column, test for no more good blocks in file
+ Result = (Bxp & bkmp) ? -1 : -2;
+
+ } else
+ // Test whether all block values are good or only some ones
+ Result = (bres == bkmp) ? 1 : 0;
+
+ // For OP_NE, OP_GE, and OP_GT the result must be inverted.
+ switch (Opc) {
+ case OP_NE:
+ case OP_GE:
+ case OP_GT:
+ Result = -Result;
+ break;
+ } // endswitch Opc
+
+ if (trace)
+ htrc("BlockEval2: op=%d n=%d rc=%d\n", Opc, n, Result);
+
+ return Result;
+ } // end of BlockEval
+
+/* ---------------------- Class BLKFILMR2----------------------------- */
+
+/***********************************************************************/
+/* BLKFILMR2 constructor. */
+/***********************************************************************/
+BLKFILMR2::BLKFILMR2(PGLOBAL g, PTDBDOS tdbp, int op, PXOB *xp)
+ : BLKFILARI(g, tdbp, op, xp)
+ {
+ Nbm = Colp->GetNbm();
+ Bmp = (PULONG)PlugSubAlloc(g, NULL, Nbm * sizeof(ULONG));
+ Bxp = (PULONG)PlugSubAlloc(g, NULL, Nbm * sizeof(ULONG));
+ MakeValueBitmap();
+ } // end of BLKFILMR2 constructor
+
+/***********************************************************************/
+/* MakeValueBitmap: Set the constant value bit map. It can be void */
+/* if the constant value is not in the column distinct values list. */
+/***********************************************************************/
+void BLKFILMR2::MakeValueBitmap(void)
+ {
+ int i; // ndv = Colp->GetNdv();
+ bool found = FALSE, noteq = !(Opc == OP_EQ || Opc == OP_NE);
+ PVBLK dval = Colp->GetDval();
+
+ assert(dval);
+
+ for (i = 0; i < Nbm; i++)
+ Bmp[i] = Bxp[i] = 0;
+
+ /*********************************************************************/
+ /* Here we cannot use Find because we must get the index */
+ /* of where to put the value if it is not found in the array. */
+ /* This is needed by operators other than OP_EQ or OP_NE. */
+ /*********************************************************************/
+ found = dval->Locate(Valp, i);
+
+ /*********************************************************************/
+ /* For bitmaps larger than a ULONG, we must know where Bmp and Bxp */
+ /* are positioned in the ULONG bit map block array. */
+ /*********************************************************************/
+ N = i / MAXBMP;
+ i %= MAXBMP;
+
+ /*********************************************************************/
+ /* Set the constant value bitmaps. The bitmaps are really matching */
+ /* the OP_EQ, OP_LE, and OP_LT operator but are also used for the */
+ /* other operators for which the Result will be inverted. */
+ /* The reason the bitmaps are not directly complemented for them is */
+ /* to be able to easily test the cases of sorted columns with Bxp, */
+ /* and the case of a void bitmap, which happens if the constant */
+ /* value is not in the column distinct values list. */
+ /*********************************************************************/
+ if (found) {
+ Bmp[N] = 1 << i;
+ Bxp[N] = Bmp[N] - 1;
+
+ if (Opc != OP_LT && Opc != OP_GE)
+ Bxp[N] |= Bmp[N]; // Found value must be included
+
+ } else
+ Bxp[N] = (1 << i) - 1;
+
+ if (noteq)
+ Bmp[N] = Bxp[N];
+
+ Void = !Bmp[N]; // There are no good values in the file
+
+ for (i = 0; i < N; i++) {
+ Bxp[i] = ~0;
+
+ if (noteq)
+ Bmp[i] = Bxp[i];
+
+ Void = Void && !Bmp[i];
+ } // endfor i
+
+ if (!Bmp[N] && !Bxp[N])
+ N--;
+
+ } // end of MakeValueBitmap
+
+/***********************************************************************/
+/* Evaluate XDB2 block filter for arithmetic operators. */
+/***********************************************************************/
+int BLKFILMR2::BlockEval(PGLOBAL g)
+ {
+#if defined(_DEBUG)
+ assert (Colp->IsClustered());
+#endif
+
+ int i, n = ((PTDBDOS)Colp->GetTo_Tdb())->GetCurBlk();
+ bool fnd = FALSE, all = TRUE, gt = TRUE;
+ ULONG bres;
+ PULONG bkmp = (PULONG)Colp->GetBmap()->GetValPtr(n * Nbm);
+
+ // Set result as if Opc were OP_EQ, OP_LT, or OP_LE
+ for (i = 0; i < Nbm; i++)
+ if (i <= N) {
+ if ((bres = Bmp[i] & bkmp[i]))
+ fnd = TRUE; // Some good value(s) found in the block
+
+ if (bres != bkmp[i])
+ all = FALSE; // Not all block values are good
+
+ if (Bxp[i] & bkmp[i])
+ gt = FALSE; // Not all block values are > good value(s)
+
+ } else if (bkmp[i]) {
+ all = FALSE;
+ break;
+ } // endif's
+
+ if (!fnd) {
+ if (Void || (gt && Sorted))
+ Result = -2; // No (more) good block in file
+ else
+ Result = -1; // No good values in this block
+
+ } else
+ Result = (all) ? 1 : 0; // All block values are good
+
+ // For OP_NE, OP_GE, and OP_GT the result must be inverted.
+ switch (Opc) {
+ case OP_NE:
+ case OP_GE:
+ case OP_GT:
+ Result = -Result;
+ break;
+ } // endswitch Opc
+
+ if (trace)
+ htrc("BlockEval2: op=%d n=%d rc=%d\n", Opc, n, Result);
+
+ return Result;
+ } // end of BlockEval
+
+/***********************************************************************/
+/* BLKSPCARI constructor. */
+/***********************************************************************/
+BLKSPCARI::BLKSPCARI(PTDBDOS tdbp, int op, PXOB *xp, int bsize)
+ : BLOCKFILTER(tdbp, op)
+ {
+ if (xp[1]->GetType() == TYPE_COLBLK) {
+ Cpx = (PCOL)xp[1]; // Subquery pseudo constant column
+ Correl = TRUE;
+ } else
+ Cpx = NULL;
+
+ Valp = xp[1]->GetValue();
+ Val = (int)xp[1]->GetValue()->GetIntValue();
+ Bsize = bsize;
+ } // end of BLKFILARI constructor
+
+/***********************************************************************/
+/* Reset: re-eval the constant value in the case of pseudo constant */
+/* column use in a correlated subquery. */
+/***********************************************************************/
+void BLKSPCARI::Reset(PGLOBAL g)
+ {
+ if (Cpx) {
+ Cpx->Reset();
+ Cpx->Eval(g);
+ Val = (int)Valp->GetIntValue();
+ } // endif Cpx
+
+ } // end of Reset
+
+/***********************************************************************/
+/* Evaluate block filter for arithmetic operators (ROWID) */
+/***********************************************************************/
+int BLKSPCARI::BlockEval(PGLOBAL g)
+ {
+ int mincmp, maxcmp, n, m;
+
+ n = Tdbp->GetCurBlk();
+ m = n * Bsize + 1; // Minimum Rowid value for this block
+ mincmp = (Val > m) ? 1 : (Val < m) ? (-1) : 0;
+ m = (n + 1) * Bsize; // Maximum Rowid value for this block
+ maxcmp = (Val > m) ? 1 : (Val < m) ? (-1) : 0;
+
+ switch (Opc) {
+ case OP_EQ:
+ case OP_NE:
+ if (mincmp < 0) // Means minval > Val
+ Result = -2; // Always sorted
+ else if (maxcmp > 0) // Means maxval < Val
+ Result = -1;
+ else if (!mincmp && !maxcmp) // minval = maxval = val
+ Result = 1;
+ else
+ Result = 0;
+
+ break;
+ case OP_GT:
+ case OP_LE:
+ if (mincmp < 0) // minval > Val
+ Result = 2; // Always sorted
+ else if (maxcmp < 0) // maxval > Val
+ Result = 0;
+ else // maxval <= Val
+ Result = -1;
+
+ break;
+ case OP_GE:
+ case OP_LT:
+ if (mincmp <= 0) // minval >= Val
+ Result = 2; // Always sorted
+ else if (maxcmp <= 0) // Maxval >= Val
+ Result = 0;
+ else // Maxval < Val
+ Result = -1;
+
+ break;
+ } // endswitch Opc
+
+ switch (Opc) {
+ case OP_NE:
+ case OP_LE:
+ case OP_LT:
+ Result = -Result;
+ break;
+ } // endswitch Opc
+
+ if (trace)
+ htrc("BlockEval: op=%d n=%d rc=%d\n", Opc, n, Result);
+
+ return Result;
+ } // end of BlockEval
+
+/* ------------------------ Class BLKFILIN --------------------------- */
+
+/***********************************************************************/
+/* BLKFILIN constructor. */
+/***********************************************************************/
+BLKFILIN::BLKFILIN(PGLOBAL g, PTDBDOS tdbp, int op, int opm, PXOB *xp)
+ : BLOCKFILTER(tdbp, op)
+ {
+ if (op == OP_IN) {
+ Opc = OP_EQ;
+ Opm = 1;
+ } else {
+ Opc = op;
+ Opm = opm;
+ } // endif op
+
+ Colp = (PDOSCOL)xp[0];
+ Arap = (PARRAY)xp[1];
+ Type = Arap->GetResultType();
+
+ if (Colp->GetResultType() != Type) {
+ sprintf(g->Message, "BLKFILIN: %s", MSG(VALTYPE_NOMATCH));
+ longjmp(g->jumper[g->jump_level], 99);
+ } else if (Colp->GetValue()->IsCi())
+ Arap->SetPrecision(g, 1); // Case insensitive
+
+ Sorted = Colp->IsSorted() > 0;
+ } // end of BLKFILIN constructor
+
+/***********************************************************************/
+/* Reset: have the sorted array reset its Bot value to -1 (bottom). */
+/***********************************************************************/
+void BLKFILIN::Reset(PGLOBAL g)
+ {
+ Arap->Reset();
+// MakeValueBitmap(); // Does nothing for class BLKFILIN
+ } // end of Reset
+
+/***********************************************************************/
+/* Evaluate block filter for a IN operator on a constant array. */
+/* Note: here we need to use the GetValPtrEx function to get a zero */
+/* ended string in case of string argument. This is because the ARRAY */
+/* can have a different width than the char column. */
+/***********************************************************************/
+int BLKFILIN::BlockEval(PGLOBAL g)
+ {
+ int n = ((PTDBDOS)Colp->GetTo_Tdb())->GetCurBlk();
+ void *minp = Colp->GetMin()->GetValPtrEx(n);
+ void *maxp = Colp->GetMax()->GetValPtrEx(n);
+
+ Result = Arap->BlockTest(g, Opc, Opm, minp, maxp, Sorted);
+ return Result;
+ } // end of BlockEval
+
+/* ------------------------ Class BLKFILIN2 -------------------------- */
+
+/***********************************************************************/
+/* BLKFILIN2 constructor. */
+/* New version that takes care of all operators and modificators. */
+/* It is also ready to handle the case of correlated sub-selects. */
+/***********************************************************************/
+BLKFILIN2::BLKFILIN2(PGLOBAL g, PTDBDOS tdbp, int op, int opm, PXOB *xp)
+ : BLKFILIN(g, tdbp, op, opm, xp)
+ {
+ Nbm = Colp->GetNbm();
+ Valp = AllocateValue(g, Colp->GetValue());
+ Invert = (Opc == OP_NE || Opc == OP_GE || Opc ==OP_GT);
+ Bmp = (PULONG)PlugSubAlloc(g, NULL, Nbm * sizeof(ULONG));
+ Bxp = (PULONG)PlugSubAlloc(g, NULL, Nbm * sizeof(ULONG));
+ MakeValueBitmap();
+ } // end of BLKFILIN2 constructor
+
+/***********************************************************************/
+/* MakeValueBitmap: Set the constant values bit map. It can be void */
+/* if the constant values are not in the column distinct values list. */
+/* The bitmaps are prepared for the EQ, LE, and LT operators and */
+/* takes care of the ALL and ANY modificators. If the operators are */
+/* NE, GE, or GT the modificator is inverted and the result will be. */
+/***********************************************************************/
+void BLKFILIN2::MakeValueBitmap(void)
+ {
+ int i, k, n, ndv = Colp->GetNdv();
+ bool found, noteq = !(Opc == OP_EQ || Opc == OP_NE);
+ bool all = (!Invert) ? (Opm == 2) : (Opm != 2);
+ ULONG btp;
+ PVBLK dval = Colp->GetDval();
+
+ N = -1;
+
+ // Take care of special cases
+ if (!(n = Arap->GetNval())) {
+ // Return TRUE for ALL because it means that there are no item that
+ // does not verify the condition, which is true indeed.
+ // Return FALSE for ANY because TRUE means that there is at least
+ // one item that verifies the condition, which is false.
+ Result = (Opm == 2) ? 2 : -2;
+ return;
+ } else if (!noteq && all && n > 1) {
+ // An item cannot be equal to all different values
+ // or an item is always unequal to any different values
+ Result = (Opc == OP_EQ) ? -2 : 2;
+ return;
+ } // endif's
+
+ for (i = 0; i < Nbm; i++)
+ Bmp[i] = Bxp[i] = 0;
+
+ for (k = 0; k < n; k++) {
+ Arap->GetNthValue(Valp, k);
+ found = dval->Locate(Valp, i);
+ N = i / MAXBMP;
+ btp = 1 << (i % MAXBMP);
+
+ if (found)
+ Bmp[N] |= btp;
+
+ // For LT and LE if ALL the condition applies to the smallest item
+ // if ANY it applies to the largest item. In the case of EQ we come
+ // here only if ANY or if n == 1, so it does applies to the largest.
+ if ((!k && all) || (k == n - 1 && !all)) {
+ Bxp[N] = btp - 1;
+
+ if (found && Opc != OP_LT && Opc != OP_GE)
+ Bxp[N] |= btp; // Found value must be included
+
+ } // endif k, opm
+
+ } // endfor k
+
+ if (noteq)
+ Bmp[N] = Bxp[N];
+
+ Void = !Bmp[N]; // There are no good values in the file
+
+ for (i = 0; i < N; i++) {
+ Bxp[i] = ~0;
+
+ if (noteq) {
+ Bmp[i] = Bxp[i];
+ Void = FALSE;
+ } // endif noteq
+
+ } // endfor i
+
+ if (!Bmp[N] && !Bxp[N]) {
+ if (--N < 0)
+ // All array values are smaller than block values
+ Result = (Invert) ? 2 : -2;
+
+ } else if (N == Nbm - 1 && (signed)Bmp[N] == (1 << (ndv % MAXBMP)) - 1) {
+ // Condition will be always TRUE or FALSE for the whole file
+ Result = (Invert) ? -2 : 2;
+ N = -1;
+ } // endif's
+
+ } // end of MakeValueBitmap
+
+/***********************************************************************/
+/* Evaluate block filter for set operators on a constant array. */
+/* Note: here we need to use the GetValPtrEx function to get a zero */
+/* ended string in case of string argument. This is because the ARRAY */
+/* can have a different width than the char column. */
+/***********************************************************************/
+int BLKFILIN2::BlockEval(PGLOBAL g)
+ {
+ if (N < 0)
+ return Result; // Was set in MakeValueBitmap
+
+ int i, n = ((PTDBDOS)Colp->GetTo_Tdb())->GetCurBlk();
+ bool fnd = FALSE, all = TRUE, gt = TRUE;
+ ULONG bres;
+ PULONG bkmp = (PULONG)Colp->GetBmap()->GetValPtr(n * Nbm);
+
+ // Set result as if Opc were OP_EQ, OP_LT, or OP_LE
+ // The difference between ALL or ANY was handled in MakeValueBitmap
+ for (i = 0; i < Nbm; i++)
+ if (i <= N) {
+ if ((bres = Bmp[i] & bkmp[i]))
+ fnd = TRUE;
+
+ if (bres != bkmp[i])
+ all = FALSE;
+
+ if (Bxp[i] & bkmp[i])
+ gt = FALSE;
+
+ } else if (bkmp[i]) {
+ all = FALSE;
+ break;
+ } // endif's
+
+ if (!fnd) {
+ if (Void || (Sorted && gt))
+ Result = -2; // No more good block in file
+ else
+ Result = -1; // No good values in this block
+
+ } else if (all)
+ Result = 1; // All block values are good
+ else
+ Result = 0; // Block contains some good values
+
+ // For OP_NE, OP_GE, and OP_GT the result must be inverted.
+ switch (Opc) {
+ case OP_NE:
+ case OP_GE:
+ case OP_GT:
+ Result = -Result;
+ break;
+ } // endswitch Opc
+
+ return Result;
+ } // end of BlockEval
+
+#if 0
+/***********************************************************************/
+/* BLKFILIN2 constructor. */
+/***********************************************************************/
+BLKFILIN2::BLKFILIN2(PGLOBAL g, PTDBDOS tdbp, int op, int opm, PXOB *xp)
+ : BLKFILIN(g, tdbp, op, opm, xp)
+ {
+ // Currently, bitmap matching is only implemented for the IN operator
+ if (!(Bitmap = (op == OP_IN || (op == OP_EQ && opm != 2)))) {
+ Nbm = Colp->GetNbm();
+ N = 0;
+ return; // Revert to standard minmax method
+ } // endif minmax
+
+ int i, n;
+ ULONG btp;
+ PVAL valp = AllocateValue(g, Colp->GetValue());
+ PVBLK dval = Colp->GetDval();
+
+ Nbm = Colp->GetNbm();
+ N = -1;
+ Bmp = (PULONG)PlugSubAlloc(g, NULL, Nbm * sizeof(ULONG));
+ Bxp = (PULONG)PlugSubAlloc(g, NULL, Nbm * sizeof(ULONG));
+
+ for (i = 0; i < Nbm; i++)
+ Bmp[i] = Bxp[i] = 0;
+
+ for (n = 0; n < Arap->GetNval(); n++) {
+ Arap->GetNthValue(valp, n);
+
+ if ((i = dval->Find(valp)) >= 0)
+ Bmp[i / MAXBMP] |= 1 << (i % MAXBMP);
+
+ } // endfor n
+
+ for (i = Nbm - 1; i >= 0; i--)
+ if (Bmp[i]) {
+ for (btp = Bmp[i]; btp; btp >>= 1)
+ Bxp[i] |= btp;
+
+ for (N = i--; i >= 0; i--)
+ Bxp[i] = ~0;
+
+ break;
+ } // endif Bmp
+
+ } // end of BLKFILIN2 constructor
+
+/***********************************************************************/
+/* Evaluate block filter for a IN operator on a constant array. */
+/* Note: here we need to use the GetValPtrEx function to get a zero */
+/* ended string in case of string argument. This is because the ARRAY */
+/* can have a different width than the char column. */
+/***********************************************************************/
+int BLKFILIN2::BlockEval(PGLOBAL g)
+ {
+ if (N < 0)
+ return -2; // IN list contains no good values
+
+ int i, n = ((PTDBDOS)Colp->GetTo_Tdb())->GetCurBlk();
+ bool fnd = FALSE, all = TRUE, gt = TRUE;
+ ULONG bres;
+ PULONG bkmp = (PULONG)Colp->GetBmap()->GetValPtr(n * Nbm);
+
+ if (Bitmap) {
+ // For IN operator use the bitmap method
+ for (i = 0; i < Nbm; i++)
+ if (i <= N) {
+ if ((bres = Bmp[i] & bkmp[i]))
+ fnd = TRUE;
+
+ if (bres != bkmp[i])
+ all = FALSE;
+
+ if (Bxp[i] & bkmp[i])
+ gt = FALSE;
+
+ } else if (bkmp[i]) {
+ all = FALSE;
+ break;
+ } // endif's
+
+ if (!fnd) {
+ if (Sorted && gt)
+ Result = -2; // No more good block in file
+ else
+ Result = -1; // No good values in this block
+
+ } else if (all)
+ Result = 1; // All block values are good
+ else
+ Result = 0; // Block contains some good values
+
+ } else {
+ // For other than IN operators, revert to standard minmax method
+ int n = 0, ndv = Colp->GetNdv();
+ void *minp = NULL;
+ void *maxp = NULL;
+ ULONG btp;
+ PVBLK dval = Colp->GetDval();
+
+ for (i = 0; i < Nbm; i++)
+ for (btp = 1; btp && n < ndv; btp <<= 1, n++)
+ if (btp & bkmp[i]) {
+ if (!minp)
+ minp = dval->GetValPtrEx(n);
+
+ maxp = dval->GetValPtrEx(n);
+ } // endif btp
+
+ Result = Arap->BlockTest(g, Opc, Opm, minp, maxp, Colp->IsSorted());
+ } // endif Bitmap
+
+ return Result;
+ } // end of BlockEval
+#endif // 0
+
+/* ------------------------ Class BLKSPCIN --------------------------- */
+
+/***********************************************************************/
+/* BLKSPCIN constructor. */
+/***********************************************************************/
+BLKSPCIN::BLKSPCIN(PGLOBAL g, PTDBDOS tdbp, int op, int opm,
+ PXOB *xp, int bsize)
+ : BLOCKFILTER(tdbp, op)
+ {
+ if (op == OP_IN) {
+ Opc = OP_EQ;
+ Opm = 1;
+ } else
+ Opm = opm;
+
+ Arap = (PARRAY)xp[1];
+#if defined(_DEBUG)
+ assert (Opm);
+ assert (Arap->GetResultType() == TYPE_INT);
+#endif
+ Bsize = bsize;
+ } // end of BLKSPCIN constructor
+
+/***********************************************************************/
+/* Reset: have the sorted array reset its Bot value to -1 (bottom). */
+/***********************************************************************/
+void BLKSPCIN::Reset(PGLOBAL g)
+ {
+ Arap->Reset();
+ } // end of Reset
+
+/***********************************************************************/
+/* Evaluate block filter for a IN operator on a constant array. */
+/***********************************************************************/
+int BLKSPCIN::BlockEval(PGLOBAL g)
+ {
+ int n = Tdbp->GetCurBlk();
+ int minrow = n * Bsize + 1; // Minimum Rowid value for this block
+ int maxrow = (n + 1) * Bsize; // Maximum Rowid value for this block
+
+ Result = Arap->BlockTest(g, Opc, Opm, &minrow, &maxrow, TRUE);
+ return Result;
+ } // end of BlockEval
+
+/* ------------------------------------------------------------------- */
+
+#if 0
+/***********************************************************************/
+/* Implementation of the BLOCKINDEX class. */
+/***********************************************************************/
+BLOCKINDEX::BLOCKINDEX(PBX nx, PDOSCOL cp, PKXBASE kp)
+ {
+ Next = nx;
+ Tdbp = (cp) ? (PTDBDOS)cp->GetTo_Tdb() : NULL;
+ Colp = cp;
+ Kxp = kp;
+ Type = (cp) ? cp->GetResultType() : TYPE_ERROR;
+ Sorted = (cp) ? cp->IsSorted() > 0 : FALSE;
+ Result = 0;
+ } // end of BLOCKINDEX constructor
+
+/***********************************************************************/
+/* Reset Bot and Top values of optimized Kindex blocks. */
+/***********************************************************************/
+void BLOCKINDEX::Reset(void)
+ {
+ if (Next)
+ Next->Reset();
+
+ Kxp->Reset();
+ } // end of Reset
+
+/***********************************************************************/
+/* Evaluate block indexing test. */
+/***********************************************************************/
+int BLOCKINDEX::BlockEval(PGLOBAL g)
+ {
+#if defined(_DEBUG)
+ assert (Tdbp && Colp);
+#endif
+ int n = Tdbp->GetCurBlk();
+ void *minp = Colp->GetMin()->GetValPtr(n);
+ void *maxp = Colp->GetMax()->GetValPtr(n);
+
+ Result = Kxp->BlockTest(g, minp, maxp, Type, Sorted);
+ return Result;
+ } // end of BlockEval
+
+/***********************************************************************/
+/* Make file output of BLOCKINDEX contents. */
+/***********************************************************************/
+void BLOCKINDEX::Print(PGLOBAL g, FILE *f, UINT n)
+ {
+ char m[64];
+
+ memset(m, ' ', n); // Make margin string
+ m[n] = '\0';
+
+ fprintf(f, "%sBLOCKINDEX: at %p next=%p col=%s kxp=%p result=%d\n",
+ m, this, Next, (Colp) ? Colp->GetName() : "Rowid", Kxp, Result);
+
+ if (Next)
+ Next->Print(g, f, n);
+
+ } // end of Print
+
+/***********************************************************************/
+/* Make string output of BLOCKINDEX contents. */
+/***********************************************************************/
+void BLOCKINDEX::Print(PGLOBAL g, char *ps, UINT z)
+ {
+ strncat(ps, "BlockIndex(es)", z);
+ } // end of Print
+
+/* ------------------------------------------------------------------- */
+
+/***********************************************************************/
+/* Implementation of the BLOCKINDX2 class. */
+/***********************************************************************/
+BLOCKINDX2::BLOCKINDX2(PBX nx, PDOSCOL cp, PKXBASE kp)
+ : BLOCKINDEX(nx, cp, kp)
+ {
+ Nbm = Colp->GetNbm();
+ Dval = Colp->GetDval();
+ Bmap = Colp->GetBmap();
+#if defined(_DEBUG)
+ assert(Dval && Bmap);
+#endif
+ } // end of BLOCKINDX2 constructor
+
+/***********************************************************************/
+/* Evaluate block indexing test. */
+/***********************************************************************/
+int BLOCKINDX2::BlockEval(PGLOBAL g)
+ {
+ int n = Tdbp->GetCurBlk();
+ PUINT bmp = (PUINT)Bmap->GetValPtr(n * Nbm);
+
+ Result = Kxp->BlockTst2(g, Dval, bmp, Nbm, Type, Sorted);
+ return Result;
+ } // end of BlockEval
+
+/* ------------------------------------------------------------------- */
+
+/***********************************************************************/
+/* Implementation of the BLKSPCINDX class. */
+/***********************************************************************/
+BLKSPCINDX::BLKSPCINDX(PBX nx, PTDBDOS tp, PKXBASE kp, int bsize)
+ : BLOCKINDEX(nx, NULL, kp)
+ {
+ Tdbp = tp;
+ Bsize = bsize;
+ Type = TYPE_INT;
+ Sorted = TRUE;
+ } // end of BLKSPCINDX constructor
+
+/***********************************************************************/
+/* Evaluate block indexing test. */
+/***********************************************************************/
+int BLKSPCINDX::BlockEval(PGLOBAL g)
+ {
+ int n = Tdbp->GetCurBlk();
+ int minrow = n * Bsize + 1; // Minimum Rowid value for this block
+ int maxrow = (n + 1) * Bsize; // Maximum Rowid value for this block
+
+ Result = Kxp->BlockTest(g, &minrow, &maxrow, TYPE_INT, TRUE);
+ return Result;
+ } // end of BlockEval
+#endif // 0
diff --git a/storage/connect/blkfil.h b/storage/connect/blkfil.h
new file mode 100644
index 00000000000..a648fc2a422
--- /dev/null
+++ b/storage/connect/blkfil.h
@@ -0,0 +1,295 @@
+/*************** BlkFil H Declares Source Code File (.H) ***************/
+/* Name: BLKFIL.H Version 2.1 */
+/* */
+/* (C) Copyright to the author Olivier BERTRAND 2004-2010 */
+/* */
+/* This file contains the block optimization related classes declares */
+/***********************************************************************/
+#ifndef __BLKFIL__
+#define __BLKFIL__
+
+typedef class BLOCKFILTER *PBF;
+typedef class BLOCKINDEX *PBX;
+
+/***********************************************************************/
+/* Definition of class BLOCKFILTER. */
+/***********************************************************************/
+class DllExport BLOCKFILTER : public BLOCK { /* Block Filter */
+ friend class BLKFILLOG;
+ public:
+ // Constructors
+ BLOCKFILTER(PTDBDOS tdbp, int op);
+
+ // Implementation
+ int GetResult(void) {return Result;}
+ bool Correlated(void) {return Correl;}
+
+ // Methods
+ virtual void Reset(PGLOBAL) = 0;
+ virtual int BlockEval(PGLOBAL) = 0;
+ virtual void Print(PGLOBAL g, FILE *f, UINT n);
+ virtual void Print(PGLOBAL g, char *ps, UINT z);
+
+ protected:
+ BLOCKFILTER(void) {} // Standard constructor not to be used
+
+ // Members
+ PTDBDOS Tdbp; // Owner TDB
+ bool Correl; // TRUE for correlated subqueries
+ int Opc; // Comparison operator
+ int Opm; // Operator modificator
+ int Result; // Result from evaluation
+ }; // end of class BLOCKFILTER
+
+/***********************************************************************/
+/* Definition of class BLKFILLOG (with Op=OP_AND,OP_OR, or OP_NOT) */
+/***********************************************************************/
+class DllExport BLKFILLOG : public BLOCKFILTER { /* Logical Op Block Filter */
+ public:
+ // Constructors
+ BLKFILLOG(PTDBDOS tdbp, int op, PBF *bfp, int n);
+
+ // Methods
+ virtual void Reset(PGLOBAL g);
+ virtual int BlockEval(PGLOBAL g);
+
+ protected:
+ BLKFILLOG(void) {} // Standard constructor not to be used
+
+ // Members
+ PBF *Fil; // Points to Block filter args
+ int N;
+ }; // end of class BLKFILLOG
+
+/***********************************************************************/
+/* Definition of class BLKFILARI (with Op=OP_EQ,NE,GT,GE,LT, or LE) */
+/***********************************************************************/
+class DllExport BLKFILARI : public BLOCKFILTER { /* Arithm. Op Block Filter */
+ public:
+ // Constructors
+ BLKFILARI(PGLOBAL g, PTDBDOS tdbp, int op, PXOB *xp);
+
+ // Methods
+ virtual void Reset(PGLOBAL g);
+ virtual int BlockEval(PGLOBAL g);
+ virtual void MakeValueBitmap(void) {}
+
+ protected:
+ BLKFILARI(void) {} // Standard constructor not to be used
+
+ // Members
+ PDOSCOL Colp; // Points to column argument
+ PCOL Cpx; // Point to subquery "constant" column
+ PVAL Valp; // Points to constant argument Value
+ bool Sorted; // True if the column is sorted
+ }; // end of class BLKFILARI
+
+/***********************************************************************/
+/* Definition of class BLKFILAR2 (with Op=OP_EQ,NE,GT,GE,LT, or LE) */
+/***********************************************************************/
+class DllExport BLKFILAR2 : public BLKFILARI { /* Arithm. Op Block Filter */
+ public:
+ // Constructors
+ BLKFILAR2(PGLOBAL g, PTDBDOS tdbp, int op, PXOB *xp);
+
+ // Methods
+ virtual int BlockEval(PGLOBAL g);
+ virtual void MakeValueBitmap(void);
+
+ protected:
+ BLKFILAR2(void) {} // Standard constructor not to be used
+
+ // Members
+ ULONG Bmp; // The value bitmap used to test blocks
+ ULONG Bxp; // Bitmap used when Opc = OP_EQ
+ }; // end of class BLKFILAR2
+
+/***********************************************************************/
+/* Definition of class BLKFILAR2 (with Op=OP_EQ,NE,GT,GE,LT, or LE) */
+/* To be used when the bitmap is an array of ULONG bitmaps; */
+/***********************************************************************/
+class DllExport BLKFILMR2 : public BLKFILARI { /* Arithm. Op Block Filter */
+ public:
+ // Constructors
+ BLKFILMR2(PGLOBAL g, PTDBDOS tdbp, int op, PXOB *xp);
+
+ // Methods
+ virtual int BlockEval(PGLOBAL g);
+ virtual void MakeValueBitmap(void);
+
+ protected:
+ BLKFILMR2(void) {} // Standard constructor not to be used
+
+ // Members
+ int Nbm; // The number of ULONG bitmaps
+ int N; // The position of the leftmost ULONG
+ bool Void; // True if all file blocks can be skipped
+ PULONG Bmp; // The values bitmaps used to test blocks
+ PULONG Bxp; // Bit of values <= max value
+ }; // end of class BLKFILMR2
+
+/***********************************************************************/
+/* Definition of class BLKSPCARI (with Op=OP_EQ,NE,GT,GE,LT, or LE) */
+/***********************************************************************/
+class DllExport BLKSPCARI : public BLOCKFILTER { /* Arithm. Op Block Filter */
+ public:
+ // Constructors
+ BLKSPCARI(PTDBDOS tdbp, int op, PXOB *xp, int bsize);
+
+ // Methods
+ virtual void Reset(PGLOBAL g);
+ virtual int BlockEval(PGLOBAL g);
+
+ protected:
+ BLKSPCARI(void) {} // Standard constructor not to be used
+
+ // Members
+ PCOL Cpx; // Point to subquery "constant" column
+ PVAL Valp; // Points to constant argument Value
+ int Val; // Constant argument Value
+ int Bsize; // Table block size
+ }; // end of class BLKSPCARI
+
+/***********************************************************************/
+/* Definition of class BLKFILIN (with Op=OP_IN) */
+/***********************************************************************/
+class DllExport BLKFILIN : public BLOCKFILTER { // With array arguments.
+ public:
+ // Constructors
+ BLKFILIN(PGLOBAL g, PTDBDOS tdbp, int op, int opm, PXOB *xp);
+
+ // Methods
+ virtual void Reset(PGLOBAL g);
+ virtual int BlockEval(PGLOBAL g);
+ virtual void MakeValueBitmap(void) {}
+
+ protected:
+ // Member
+ PDOSCOL Colp; // Points to column argument
+ PARRAY Arap; // Points to array argument
+ bool Sorted; // True if the column is sorted
+ int Type; // Type of array elements
+ }; // end of class BLKFILIN
+
+/***********************************************************************/
+/* Definition of class BLKFILIN2 (with Op=OP_IN) */
+/***********************************************************************/
+class DllExport BLKFILIN2 : public BLKFILIN { // With array arguments.
+ public:
+ // Constructors
+ BLKFILIN2(PGLOBAL g, PTDBDOS tdbp, int op, int opm, PXOB *xp);
+
+ // Methods
+//virtual void Reset(PGLOBAL g);
+ virtual int BlockEval(PGLOBAL g);
+ virtual void MakeValueBitmap(void);
+
+ protected:
+ // Member
+ int Nbm; // The number of ULONG bitmaps
+ int N; // The position of the leftmost ULONG
+//bool Bitmap; // True for IN operator (temporary)
+ bool Void; // True if all file blocks can be skipped
+ bool Invert; // True when Result must be inverted
+ PULONG Bmp; // The values bitmaps used to test blocks
+ PULONG Bxp; // Bit of values <= max value
+ PVAL Valp; // Used while building the bitmaps
+ }; // end of class BLKFILIN2
+
+/***********************************************************************/
+/* Definition of class BLKSPCIN (with Op=OP_IN) Special column */
+/***********************************************************************/
+class DllExport BLKSPCIN : public BLOCKFILTER { // With array arguments.
+ public:
+ // Constructors
+ BLKSPCIN(PGLOBAL g, PTDBDOS tdbp, int op, int opm, PXOB *xp, int bsize);
+
+ // Methods
+ virtual void Reset(PGLOBAL g);
+ virtual int BlockEval(PGLOBAL g);
+
+ protected:
+ // Member
+ PARRAY Arap; // Points to array argument
+ int Bsize; // Table block size
+ }; // end of class BLKSPCIN
+
+// ---------------- Class used in block indexing testing ----------------
+
+#if 0
+/***********************************************************************/
+/* Definition of class BLOCKINDEX. */
+/* Used to test the indexing to joined tables when the foreign key is */
+/* a clustered or sorted column. If the table is joined to several */
+/* tables, blocks will be chained together. */
+/***********************************************************************/
+class DllExport BLOCKINDEX : public BLOCK { /* Indexing Test Block */
+ public:
+ // Constructors
+ BLOCKINDEX(PBX nx, PDOSCOL cp, PKXBASE kp);
+
+ // Implementation
+ PBX GetNext(void) {return Next;}
+
+ // Methods
+ void Reset(void);
+ virtual int BlockEval(PGLOBAL);
+ virtual void Print(PGLOBAL g, FILE *f, UINT n);
+ virtual void Print(PGLOBAL g, char *ps, UINT z);
+
+ protected:
+ BLOCKINDEX(void) {} // Standard constructor not to be used
+
+ // Members
+ PBX Next; // To next Index Block
+ PTDBDOS Tdbp; // To table description block
+ PDOSCOL Colp; // Clustered foreign key
+ PKXBASE Kxp; // To Kindex of joined table
+ bool Sorted; // TRUE if column is sorted
+ int Type; // Col/Index type
+ int Result; // Result from evaluation
+ }; // end of class BLOCKINDEX
+
+/***********************************************************************/
+/* Definition of class BLOCKINDX2. (XDB2) */
+/***********************************************************************/
+class DllExport BLOCKINDX2 : public BLOCKINDEX { /* Indexing Test Block */
+ public:
+ // Constructors
+ BLOCKINDX2(PBX nx, PDOSCOL cp, PKXBASE kp);
+
+ // Methods
+ virtual int BlockEval(PGLOBAL);
+
+ protected:
+ BLOCKINDX2(void) {} // Standard constructor not to be used
+
+ // Members
+ int Nbm; // The number of ULONG bitmaps
+ PVBLK Dval; // Array of column distinct values
+ PVBLK Bmap; // Array of block bitmap values
+ }; // end of class BLOCKINDX2
+
+/***********************************************************************/
+/* Definition of class BLKSPCINDX. */
+/* Used to test the indexing to joined tables when the foreign key is */
+/* the ROWID special column. If the table is joined to several */
+/* tables, blocks will be chained together. */
+/***********************************************************************/
+class DllExport BLKSPCINDX : public BLOCKINDEX { /* Indexing Test Block */
+ public:
+ // Constructors
+ BLKSPCINDX(PBX nx, PTDBDOS tp, PKXBASE kp, int bsize);
+
+ // Methods
+ virtual int BlockEval(PGLOBAL);
+
+ protected:
+ BLKSPCINDX(void) {} // Standard constructor not to be used
+
+ // Members
+ int Bsize; // Table block size
+ }; // end of class BLOCKINDEX
+#endif // 0
+
+#endif // __BLKFIL__
diff --git a/storage/connect/filter.cpp b/storage/connect/filter.cpp
new file mode 100644
index 00000000000..c2747d00948
--- /dev/null
+++ b/storage/connect/filter.cpp
@@ -0,0 +1,1733 @@
+/***************** Filter C++ Class Filter Code (.CPP) *****************/
+/* Name: FILTER.CPP Version 3.9 */
+/* */
+/* (C) Copyright to the author Olivier BERTRAND 1998-2014 */
+/* */
+/* This file contains the class FILTER function code. */
+/***********************************************************************/
+
+/***********************************************************************/
+/* Include relevant MariaDB header file. */
+/***********************************************************************/
+#include "my_global.h"
+#include "sql_class.h"
+//#include "sql_time.h"
+
+#if defined(WIN32)
+//#include <windows.h>
+#else // !WIN32
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#endif // !WIN32
+
+
+/***********************************************************************/
+/* Include required application header files */
+/* global.h is header containing all global Plug declarations. */
+/* plgdbsem.h is header containing the DB applic. declarations. */
+/* xobject.h is header containing the XOBJECT derived classes dcls. */
+/***********************************************************************/
+#include "global.h"
+#include "plgdbsem.h"
+#include "tabcol.h"
+#include "xtable.h"
+#include "array.h"
+//#include "subquery.h"
+#include "filter.h"
+//#include "token.h"
+//#include "select.h"
+#include "xindex.h"
+
+/***********************************************************************/
+/* Static variables. */
+/***********************************************************************/
+extern "C" int trace;
+
+/***********************************************************************/
+/* Utility routines. */
+/***********************************************************************/
+void PlugConvertConstant(PGLOBAL, PVOID&, SHORT&);
+PVOID PlugCopyDB(PTABS, PVOID, INT);
+void NewPointer(PTABS, PVOID, PVOID);
+void AddPointer(PTABS, PVOID);
+
+PPARM MakeParm(PGLOBAL g, PXOB xp)
+ {
+ PPARM pp = (PPARM)PlugSubAlloc(g, NULL, sizeof(PARM));
+ pp->Type = TYPE_XOBJECT;
+ pp->Value = xp;
+ pp->Domain = 0;
+ pp->Next = NULL;
+ return pp;
+ } // end of MakeParm
+
+/***********************************************************************/
+/* Routines called externally by FILTER function. */
+/***********************************************************************/
+bool PlugEvalLike(PGLOBAL, LPCSTR, LPCSTR, bool);
+//bool ReadSubQuery(PGLOBAL, PSUBQ);
+//PSUBQ OpenSubQuery(PGLOBAL, PSQL);
+void PlugCloseDB(PGLOBAL, PSQL);
+BYTE OpBmp(PGLOBAL g, OPVAL opc);
+PARRAY MakeValueArray(PGLOBAL g, PPARM pp);
+
+/***********************************************************************/
+/* Routines called externally by CondFilter. */
+/***********************************************************************/
+PFIL MakeFilter(PGLOBAL g, PFIL fp1, OPVAL vop, PFIL fp2)
+ {
+ PFIL filp = new(g) FILTER(g, vop);
+
+ filp->Arg(0) = fp1;
+ filp->Arg(1) = fp2;
+
+ if (filp->Convert(g, false))
+ return NULL;
+
+ return filp;
+ } // end of MakeFilter
+
+PFIL MakeFilter(PGLOBAL g, PCOL *colp, POPER pop, PPARM pfirst, bool neg)
+{
+ PPARM parmp, pp[2];
+ PFIL fp1, fp2, filp = NULL;
+
+ if (pop->Val == OP_IN) {
+ PARRAY par = MakeValueArray(g, pfirst);
+
+ if (par) {
+ pp[0] = MakeParm(g, colp[0]);
+ pp[1] = MakeParm(g, par);
+ fp1 = new(g) FILTER(g, pop, pp);
+
+ if (fp1->Convert(g, false))
+ return NULL;
+
+ filp = (neg) ? MakeFilter(g, fp1, OP_NOT, NULL) : fp1;
+ } // endif par
+
+ } else if (pop->Val == OP_XX) { // BETWEEN
+ if (pfirst && pfirst->Next) {
+ pp[0] = MakeParm(g, colp[0]);
+ pp[1] = pfirst;
+ fp1 = new(g) FILTER(g, neg ? OP_LT : OP_GE, pp);
+
+ if (fp1->Convert(g, false))
+ return NULL;
+
+ pp[1] = pfirst->Next;
+ fp2 = new(g) FILTER(g, neg ? OP_GT : OP_LE, pp);
+
+ if (fp2->Convert(g, false))
+ return NULL;
+
+ filp = MakeFilter(g, fp1, neg ? OP_OR : OP_AND, fp2);
+ } // endif parmp
+
+ } else {
+ parmp = pfirst;
+
+ for (int i = 0; i < 2; i++)
+ if (colp[i]) {
+ pp[i] = MakeParm(g, colp[i]);
+ } else {
+ if (!parmp || parmp->Domain != i)
+ return NULL; // Logical error, should never happen
+
+ pp[i] = parmp;
+ parmp = parmp->Next;
+ } // endif colp
+
+ filp = new(g) FILTER(g, pop, pp);
+
+ if (filp->Convert(g, false))
+ return NULL;
+
+ } // endif's Val
+
+ return filp;
+} // end of MakeFilter
+
+/* --------------------------- Class FILTER -------------------------- */
+
+/***********************************************************************/
+/* FILTER public constructors. */
+/***********************************************************************/
+FILTER::FILTER(PGLOBAL g, POPER pop, PPARM *tp)
+ {
+ Constr(g, pop->Val, pop->Mod, tp);
+ } // end of FILTER constructor
+
+FILTER::FILTER(PGLOBAL g, OPVAL opc, PPARM *tp)
+ {
+ Constr(g, opc, 0, tp);
+ } // end of FILTER constructor
+
+void FILTER::Constr(PGLOBAL g, OPVAL opc, int opm, PPARM *tp)
+ {
+ Next = NULL;
+ Opc = opc;
+ Opm = opm;
+ Bt = 0x00;
+
+ for (int i = 0; i < 2; i++) {
+ Test[i].B_T = TYPE_VOID;
+
+ if (tp && tp[i]) {
+ PlugConvertConstant(g, tp[i]->Value, tp[i]->Type);
+#if defined(_DEBUG)
+ assert(tp[i]->Type == TYPE_XOBJECT);
+#endif
+ Arg(i) = (PXOB)tp[i]->Value;
+ } else
+ Arg(i) = pXVOID;
+
+ Val(i) = NULL;
+ Test[i].Conv = FALSE;
+ } // endfor i
+
+ } // end of Constr
+
+/***********************************************************************/
+/* FILTER copy constructor. */
+/***********************************************************************/
+FILTER::FILTER(PFIL fil1)
+ {
+ Next = NULL;
+ Opc = fil1->Opc;
+ Opm = fil1->Opm;
+ Test[0] = fil1->Test[0];
+ Test[1] = fil1->Test[1];
+ } // end of FILTER copy constructor
+
+/***********************************************************************/
+/* Linearize: Does the linearization of the filter tree: */
+/* Independent filters (not implied in OR/NOT) will be separated */
+/* from others and filtering operations will be automated by */
+/* making a list of filter operations in polish operation style. */
+/* Returned value points to the first filter of the list, which ends */
+/* with the filter that was pointed by the first call argument, */
+/* except for separators, in which case a loop is needed to find it. */
+/* Note: a loop is used now in all cases (was not for OP_NOT) to be */
+/* able to handle the case of filters whose arguments are already */
+/* linearized, as it is done in LNA semantic routines. Indeed for */
+/* already linearized chains, the first filter is never an OP_AND, */
+/* OP_OR or OP_NOT filter, so this function just returns 'this'. */
+/***********************************************************************/
+PFIL FILTER::Linearize(bool nosep)
+ {
+ int i;
+ PFIL lfp[2], ffp[2] = {NULL,NULL};
+
+ switch (Opc) {
+ case OP_NOT:
+ if (GetArgType(0) == TYPE_FILTER) {
+ lfp[0] = (PFIL)Arg(0);
+ ffp[0] = lfp[0]->Linearize(TRUE);
+ } /* endif */
+
+ if (!ffp[0])
+ return NULL;
+
+ while (lfp[0]->Next) // See Note above
+ lfp[0] = lfp[0]->Next;
+
+ Arg(0) = lfp[0];
+ lfp[0]->Next = this;
+ break;
+ case OP_OR:
+ nosep = TRUE;
+ case OP_AND:
+ for (i = 0; i < 2; i++) {
+ if (GetArgType(i) == TYPE_FILTER) {
+ lfp[i] = (PFIL)Arg(i);
+ ffp[i] = lfp[i]->Linearize(nosep);
+ } /* endif */
+
+ if (!ffp[i])
+ return NULL;
+
+ while (lfp[i]->Next)
+ lfp[i] = lfp[i]->Next;
+
+ Arg(i) = lfp[i];
+ } /* endfor i */
+
+ if (nosep) {
+ lfp[0]->Next = ffp[1];
+ lfp[1]->Next = this;
+ } else {
+ lfp[0]->Next = this;
+ Opc = OP_SEP;
+ Arg(1) = pXVOID;
+ Next = ffp[1];
+ } /* endif */
+
+ break;
+ default:
+ ffp[0] = this;
+ } /* endswitch */
+
+ return (ffp[0]);
+ } // end of Linearize
+
+/***********************************************************************/
+/* Link the fil2 filter chain to the fil1(this) filter chain. */
+/***********************************************************************/
+PFIL FILTER::Link(PGLOBAL g, PFIL fil2)
+ {
+ PFIL fil1;
+
+ if (trace)
+ htrc("Linking filter %p with op=%d... to filter %p with op=%d\n",
+ this, Opc, fil2, (fil2) ? fil2->Opc : 0);
+
+ for (fil1 = this; fil1->Next; fil1 = fil1->Next) ;
+
+ if (fil1->Opc == OP_SEP)
+ fil1->Next = fil2; // Separator already exists
+ else {
+ // Create a filter separator and insert it between the chains
+ PFIL filp = new(g) FILTER(g, OP_SEP);
+
+ filp->Arg(0) = fil1;
+ filp->Next = fil2;
+ fil1->Next = filp;
+ } // endelse
+
+ return (this);
+ } // end of Link
+
+/***********************************************************************/
+/* Remove eventual last separator from a filter chain. */
+/***********************************************************************/
+PFIL FILTER::RemoveLastSep(void)
+ {
+ PFIL filp, gfp = NULL;
+
+ // Find last filter block (filp) and previous one (gfp).
+ for (filp = this; filp->Next; filp = filp->Next)
+ gfp = filp;
+
+ // If last filter is a separator, remove it
+ if (filp->Opc == OP_SEP)
+ if (gfp)
+ gfp->Next = NULL;
+ else
+ return NULL; // chain is now empty
+
+ return this;
+ } // end of RemoveLastSep
+
+/***********************************************************************/
+/* CheckColumn: Checks references to Columns in the filter and change */
+/* them into references to Col Blocks. */
+/* Returns the number of column references or -1 in case of column */
+/* not found and -2 in case of unrecoverable error. */
+/* WHERE filters are called with *aggreg == AGG_NO. */
+/* HAVING filters are called with *aggreg == AGG_ANY. */
+/***********************************************************************/
+int FILTER::CheckColumn(PGLOBAL g, PSQL sqlp, PXOB &p, int &ag)
+ {
+ char errmsg[MAX_STR] = "";
+ int agg, k, n = 0;
+
+ if (trace)
+ htrc("FILTER CheckColumn: sqlp=%p ag=%d\n", sqlp, ag);
+
+ switch (Opc) {
+ case OP_SEP:
+ case OP_AND:
+ case OP_OR:
+ case OP_NOT:
+ return 0; // This because we are called for a linearized filter
+ default:
+ break;
+ } // endswitch Opc
+
+ // Check all arguments even in case of error for when we are called
+ // from CheckHaving, where references to an alias raise an error but
+ // we must have all other arguments to be set.
+ for (int i = 0; i < 2; i++) {
+ if (GetArgType(i) == TYPE_FILTER) // Should never happen in
+ return 0; // current implementation
+
+ agg = ag;
+
+ if ((k = Arg(i)->CheckColumn(g, sqlp, Arg(i), agg)) < -1) {
+ return k;
+ } else if (k < 0) {
+ if (!*errmsg) // Keep first error message
+ strcpy(errmsg, g->Message);
+
+ } else
+ n += k;
+
+ } // endfor i
+
+ if (*errmsg) {
+ strcpy(g->Message, errmsg);
+ return -1;
+ } else
+ return n;
+
+ } // end of CheckColumn
+
+/***********************************************************************/
+/* RefNum: Find the number of references correlated sub-queries make */
+/* to the columns of the outer query (pointed by sqlp). */
+/***********************************************************************/
+int FILTER::RefNum(PSQL sqlp)
+ {
+ int n = 0;
+
+ for (int i = 0; i < 2; i++)
+ n += Arg(i)->RefNum(sqlp);
+
+ return n;
+ } // end of RefNum
+
+#if 0
+/***********************************************************************/
+/* CheckSubQuery: see SUBQUERY::CheckSubQuery for comment. */
+/***********************************************************************/
+PXOB FILTER::CheckSubQuery(PGLOBAL g, PSQL sqlp)
+ {
+ switch (Opc) {
+ case OP_SEP:
+ case OP_AND:
+ case OP_OR:
+ case OP_NOT:
+ break;
+ default:
+ for (int i = 0; i < 2; i++)
+ if (!(Arg(i) = (PXOB)Arg(i)->CheckSubQuery(g, sqlp)))
+ return NULL;
+
+ break;
+ } // endswitch Opc
+
+ return this;
+ } // end of CheckSubQuery
+
+/***********************************************************************/
+/* SortJoin: function that places ahead of the list the 'good' groups */
+/* for join filtering. These are groups with only one filter that */
+/* specify equality between two different table columns, at least */
+/* one is a table key column. Doing so the join filter will be in */
+/* general compatible with linearization of the joined table tree. */
+/* This function has been added a further sorting on column indexing. */
+/***********************************************************************/
+PFIL FILTER::SortJoin(PGLOBAL g)
+ {
+ int k;
+ PCOL cp1, cp2;
+ PTDBASE tp1, tp2;
+ PFIL fp, filp, gfp, filstart = this, filjoin = NULL, lfp = NULL;
+ bool join = TRUE, key = TRUE;
+
+ // This routine requires that the chain ends with a separator
+ // So check for it and eventually add one if necessary
+ for (filp = this; filp->Next; filp = filp->Next) ;
+
+ if (filp->Opc != OP_SEP)
+ filp->Next = new(g) FILTER(g, OP_SEP);
+
+ again:
+ for (k = (key) ? 0 : MAX_MULT_KEY; k <= MAX_MULT_KEY; k++)
+ for (gfp = NULL, fp = filp = filstart; filp; filp = filp->Next)
+ switch (filp->Opc) {
+ case OP_SEP:
+ if (join) {
+ // Put this filter group into the join filter group list.
+ if (!lfp)
+ filjoin = fp;
+ else
+ lfp->Next = fp;
+
+ if (!gfp)
+ filstart = filp->Next;
+ else
+ gfp->Next = filp->Next;
+
+ lfp = filp; // last block of join filter list
+ } else
+ gfp = filp; // last block of bad filter list
+
+ join = TRUE;
+ fp = filp->Next;
+ break;
+ case OP_LOJ:
+ case OP_ROJ:
+ case OP_DTJ:
+ join &= TRUE;
+ break;
+ case OP_EQ:
+ if (join && k > 0 // So specific join operators come first
+ && filp->GetArgType(0) == TYPE_COLBLK
+ && filp->GetArgType(1) == TYPE_COLBLK) {
+ cp1 = (PCOL)filp->Arg(0);
+ cp2 = (PCOL)filp->Arg(1);
+ tp1 = (PTDBASE)cp1->GetTo_Tdb();
+ tp2 = (PTDBASE)cp2->GetTo_Tdb();
+
+ if (tp1->GetTdb_No() != tp2->GetTdb_No()) {
+ if (key)
+ join &= (cp1->GetKey() == k || cp2->GetKey() == k);
+ else
+ join &= (tp1->GetColIndex(cp1) || tp2->GetColIndex(cp2));
+
+ } else
+ join = FALSE;
+
+ } else
+ join = FALSE;
+
+ break;
+ default:
+ join = FALSE;
+ } // endswitch filp->Opc
+
+ if (key) {
+ key = FALSE;
+ goto again;
+ } // endif key
+
+ if (filjoin) {
+ lfp->Next = filstart;
+ filstart = filjoin;
+ } // endif filjoin
+
+ // Removing last separator is perhaps unuseful, but it was so
+ return filstart->RemoveLastSep();
+ } // end of SortJoin
+
+/***********************************************************************/
+/* Check that this filter is a good join filter. */
+/* If so the opj block will be set accordingly. */
+/* opj points to the join block, fprec to the filter block to which */
+/* the rest of the chain must be linked in case of success. */
+/* teq, tek and tk2 indicates the severity of the tests: */
+/* tk2 == TRUE means both columns must be primary keys. */
+/* tc2 == TRUE means both args must be columns (not expression). */
+/* tek == TRUE means at least one column must be a primary key. */
+/* teq == TRUE means the filter operator must be OP_EQ. */
+/* tix == TRUE means at least one column must be a simple index key. */
+/* thx == TRUE means at least one column must be a leading index key. */
+/***********************************************************************/
+bool FILTER::FindJoinFilter(POPJOIN opj, PFIL fprec, bool teq, bool tek,
+ bool tk2, bool tc2, bool tix, bool thx)
+ {
+ if (trace)
+ htrc("FindJoinFilter: opj=%p fprec=%p tests=(%d,%d,%d,%d)\n",
+ opj, fprec, teq, tek, tk2, tc2);
+
+ // Firstly check that this filter is an independent filter
+ // meaning that it is the only one in its own group.
+ if (Next && Next->Opc != OP_SEP)
+ return (Opc < 0);
+
+ // Keep only equi-joins and specific joins (Outer and Distinct)
+ // Normally specific join operators comme first because they have
+ // been placed first by SortJoin.
+ if (teq && Opc > OP_EQ)
+ return FALSE;
+
+ // We have a candidate for join filter, now check that it
+ // fulfil the requirement about its operands, to point to
+ // columns of respectively the two TDB's of that join.
+ int col1 = 0, col2 = 0;
+ bool key = tk2;
+ bool idx = FALSE, ihx = FALSE;
+ PIXDEF pdx;
+
+ for (int i = 0; i < 2; i++)
+ if (GetArgType(i) == TYPE_COLBLK) {
+ PCOL colp = (PCOL)Arg(i);
+
+ if (tk2)
+ key &= (colp->IsKey());
+ else
+ key |= (colp->IsKey());
+
+ pdx = ((PTDBASE)colp->GetTo_Tdb())->GetColIndex(colp);
+ idx |= (pdx && pdx->GetNparts() == 1);
+ ihx |= (pdx != NULL);
+
+ if (colp->VerifyColumn(opj->GetTbx1()))
+ col1 = i + 1;
+ else if (colp->VerifyColumn(opj->GetTbx2()))
+ col2 = i + 1;
+
+ } else if (!tc2 && GetArgType(i) != TYPE_CONST) {
+ PXOB xp = Arg(i);
+
+ if (xp->VerifyColumn(opj->GetTbx1()))
+ col1 = i + 1;
+ else if (xp->VerifyColumn(opj->GetTbx2()))
+ col2 = i + 1;
+
+ } else
+ return (Opc < 0);
+
+ if (col1 == 0 || col2 == 0)
+ return (Opc < 0);
+
+ if (((tek && !key) || (tix && !idx) || (thx && !ihx)) && Opc != OP_DTJ)
+ return FALSE;
+
+ // This is the join filter, set the join block.
+ if (col1 == 1) {
+ opj->SetCol1(Arg(0));
+ opj->SetCol2(Arg(1));
+ } else {
+ opj->SetCol1(Arg(1));
+ opj->SetCol2(Arg(0));
+
+ switch (Opc) {
+// case OP_GT: Opc = OP_LT; break;
+// case OP_LT: Opc = OP_GT; break;
+// case OP_GE: Opc = OP_LE; break;
+// case OP_LE: Opc = OP_GE; break;
+ case OP_LOJ:
+ case OP_ROJ:
+ case OP_DTJ:
+ // For expended join operators, the filter must indicate
+ // the way the join should be done, and not the order of
+ // appearance of tables in the table list (which is kept
+ // because tables are sorted in AddTdb). Therefore the
+ // join is inversed, not the filter.
+ opj->InverseJoin();
+ default: break;
+ } // endswitch Opc
+
+ } // endif col1
+
+ if (Opc < 0) {
+ // For join operators, special processing is needed
+ int knum = 0;
+ PFIL fp;
+
+ switch (Opc) {
+ case OP_LOJ:
+ opj->SetJtype(JT_LEFT);
+ knum = opj->GetCol2()->GetKey();
+ break;
+ case OP_ROJ:
+ opj->SetJtype(JT_RIGHT);
+ knum = opj->GetCol1()->GetKey();
+ break;
+ case OP_DTJ:
+ for (knum = 1, fp = this->Next; fp; fp = fp->Next)
+ if (fp->Opc == OP_DTJ)
+ knum++;
+ else if (fp->Opc != OP_SEP)
+ break;
+
+ opj->SetJtype(JT_DISTINCT);
+ opj->GetCol2()->SetKey(knum);
+ break;
+ default:
+ break;
+ } // endswitch Opc
+
+ if (knum > 1) {
+ // Lets take care of a multiple key join
+ // We do a minimum of checking here as it will done later
+ int k = 1;
+ OPVAL op;
+ BYTE tmp[sizeof(Test[0])];
+
+ for (fp = this->Next; k < knum && fp; fp = fp->Next) {
+ switch (op = fp->Opc) {
+ case OP_SEP:
+ continue;
+ case OP_LOJ:
+ if (Opc == OP_ROJ) {
+ op = Opc;
+ memcpy(tmp, &fp->Test[0], sizeof(Test[0]));
+ fp->Test[0] = fp->Test[1];
+ memcpy(&fp->Test[1], tmp, sizeof(Test[0]));
+ } // endif Opc
+
+ k++;
+ break;
+ case OP_ROJ:
+ if (Opc == OP_LOJ) {
+ op = Opc;
+ memcpy(tmp, &fp->Test[0], sizeof(Test[0]));
+ fp->Test[0] = fp->Test[1];
+ memcpy(&fp->Test[1], tmp, sizeof(Test[0]));
+ } // endif Opc
+
+ k++;
+ break;
+ case OP_DTJ:
+ if (op == Opc && fp->GetArgType(1) == TYPE_COLBLK)
+ ((PCOL)fp->Arg(1))->SetKey(knum);
+
+ k++;
+ break;
+ default:
+ break;
+ } // endswitch op
+
+ if (op != Opc)
+ return TRUE;
+
+ fp->Opc = OP_EQ;
+ } // endfor fp
+
+ } // endif k
+
+ Opc = OP_EQ;
+ } // endif Opc
+
+ // Set the join filter operator
+ opj->SetOpc(Opc);
+
+ // Now mark the columns involved in the join filter because
+ // this information will be used by the linearize program.
+ // Note: this should be replaced in the future by something
+ // enabling to mark tables as Parent or Child.
+ opj->GetCol1()->MarkCol(U_J_EXT);
+ opj->GetCol2()->MarkCol(U_J_EXT);
+
+ // Remove the filter from the filter chain. If the filter is
+ // not last in the chain, also remove the SEP filter after it.
+ if (Next) // Next->Opc == OP_SEP
+ Next = Next->Next;
+
+ if (!fprec)
+ opj->SetFilter(Next);
+ else
+ fprec->Next = Next;
+
+ return FALSE;
+ } // end of FindJoinFilter
+
+/***********************************************************************/
+/* CheckHaving: check and process a filter of an HAVING clause. */
+/* Check references to Columns and Functions in the filter. */
+/* All these references can correspond to items existing in the */
+/* SELECT list, else if it is a function, allocate a SELECT block */
+/* to be added to the To_Sel list (non projected blocks). */
+/***********************************************************************/
+bool FILTER::CheckHaving(PGLOBAL g, PSQL sqlp)
+ {
+ int agg = AGG_ANY;
+ PXOB xp;
+
+//sqlp->SetOk(TRUE); // Ok to look into outer queries for filters
+
+ switch (Opc) {
+ case OP_SEP:
+ case OP_AND:
+ case OP_OR:
+ case OP_NOT:
+ return FALSE;
+ default:
+ if (CheckColumn(g, sqlp, xp, agg) < -1)
+ return TRUE; // Unrecovable error
+
+ break;
+ } // endswitch Opc
+
+ sqlp->SetOk(TRUE); // Ok to look into outer queries for filters
+
+ for (int i = 0; i < 2; i++)
+ if (!(xp = Arg(i)->SetSelect(g, sqlp, TRUE)))
+ return TRUE;
+ else if (xp != Arg(i)) {
+ Arg(i) = xp;
+ Val(i) = Arg(i)->GetValue();
+ } // endif
+
+ sqlp->SetOk(FALSE);
+ return FALSE;
+ } // end of CheckHaving
+#endif // 0
+
+/***********************************************************************/
+/* Used while building a table index. This function split the filter */
+/* attached to the tdbp table into the local and not local part. */
+/* The local filter is used to restrict the size of the index and the */
+/* not local part remains to be executed later. This has been added */
+/* recently and not only to improve the performance but chiefly to */
+/* avoid loosing rows when processing distinct joins. */
+/* Returns: */
+/* 0: the whole filter is local (both arguments are) */
+/* 1: the whole filter is not local */
+/* 2: the filter was split in local (attached to fp[0]) and */
+/* not local (attached to fp[1]). */
+/***********************************************************************/
+int FILTER::SplitFilter(PFIL *fp)
+ {
+ int i, rc[2];
+
+ if (Opc == OP_AND) {
+ for (i = 0; i < 2; i++)
+ rc[i] = ((PFIL)Arg(i))->SplitFilter(fp);
+
+ // Filter first argument should never be split because of the
+ // algorithm used to de-linearize the filter.
+ assert(rc[0] != 2);
+
+ if (rc[0] != rc[1]) {
+ // Splitting to be done
+ if (rc[1] == 2) {
+ // 2nd argument already split, add 1st to the proper filter
+ assert(fp[*rc]);
+ Arg(1) = fp[*rc];
+ Val(1) = fp[*rc]->GetValue();
+ fp[*rc] = this;
+ } else for (i = 0; i < 2; i++) {
+ // Split the filter arguments
+ assert(!fp[rc[i]]);
+ fp[rc[i]] = (PFIL)Arg(i);
+ } // endfor i
+
+ *rc = 2;
+ } // endif rc
+
+ } else
+ *rc = (CheckLocal(NULL)) ? 0 : 1;
+
+ return *rc;
+ } // end of SplitFilter
+
+/***********************************************************************/
+/* This function is called when making a Kindex after the filter was */
+/* split in local and nolocal part in the case of many to many joins. */
+/* Indeed the whole filter must be reconstructed to take care of next */
+/* same values when doing the explosive join. In addition, the link */
+/* must be done respecting the way filters are de-linearized, no AND */
+/* filter in the first argument of an AND filter, because this is */
+/* expected to be true if SplitFilter is used again on this filter. */
+/***********************************************************************/
+PFIL FILTER::LinkFilter(PGLOBAL g, PFIL fp2)
+ {
+ PFIL fp1, filp, filand = NULL;
+
+ assert(fp2); // Test must be made by caller
+
+ // Find where the new AND filter must be attached
+ for (fp1 = this; fp1->Opc == OP_AND; fp1 = (PFIL)fp1->Arg(1))
+ filand = fp1;
+
+ filp = new(g) FILTER(g, OP_AND);
+ filp->Arg(0) = fp1;
+ filp->Val(0) = fp1->GetValue();
+ filp->Test[0].B_T = TYPE_INT;
+ filp->Test[0].Conv = FALSE;
+ filp->Arg(1) = fp2;
+ filp->Val(1) = fp2->GetValue();
+ filp->Test[1].B_T = TYPE_INT;
+ filp->Test[1].Conv = FALSE;
+ filp->Value = AllocateValue(g, TYPE_INT);
+
+ if (filand) {
+ // filp must be inserted here
+ filand->Arg(1) = filp;
+ filand->Val(1) = filp->GetValue();
+ filp = this;
+ } // endif filand
+
+ return filp;
+ } // end of LinkFilter
+
+/***********************************************************************/
+/* Checks whether filter contains reference to a previous table that */
+/* is not logically joined to the currently openned table, or whether */
+/* it is a Sub-Select filter. In any case, local is set to FALSE. */
+/* Note: This function is now applied to de-linearized filters. */
+/***********************************************************************/
+bool FILTER::CheckLocal(PTDB tdbp)
+ {
+ bool local = TRUE;
+
+ if (trace) {
+ if (tdbp)
+ htrc("CheckLocal: filp=%p R%d\n", this, tdbp->GetTdb_No());
+ else
+ htrc("CheckLocal: filp=%p\n", this);
+ } // endif trace
+
+ for (int i = 0; local && i < 2; i++)
+ local = Arg(i)->CheckLocal(tdbp);
+
+ if (trace)
+ htrc("FCL: returning %d\n", local);
+
+ return (local);
+ } // end of CheckLocal
+
+/***********************************************************************/
+/* This routine is used to split the filter attached to the tdbp */
+/* table into the local and not local part where "local" means that */
+/* it applies "locally" to the FILEID special column with crit = 2 */
+/* and to the SERVID and/or TABID special columns with crit = 3. */
+/* Returns: */
+/* 0: the whole filter is local (both arguments are) */
+/* 1: the whole filter is not local */
+/* 2: the filter was split in local (attached to fp[0]) and */
+/* not local (attached to fp[1]). */
+/* Note: "Locally" means that the "local" filter can be evaluated */
+/* before opening the table. This implies that the special column be */
+/* compared only with constants and that this filter not to be or'ed */
+/* with a non "local" filter. */
+/***********************************************************************/
+int FILTER::SplitFilter(PFIL *fp, PTDB tp, int crit)
+ {
+ int i, rc[2];
+
+ if (Opc == OP_AND) {
+ for (i = 0; i < 2; i++)
+ rc[i] = ((PFIL)Arg(i))->SplitFilter(fp, tp, crit);
+
+ // Filter first argument should never be split because of the
+ // algorithm used to de-linearize the filter.
+ assert(rc[0] != 2);
+
+ if (rc[0] != rc[1]) {
+ // Splitting to be done
+ if (rc[1] == 2) {
+ // 2nd argument already split, add 1st to the proper filter
+ assert(fp[*rc]);
+ Arg(1) = fp[*rc];
+ Val(1) = fp[*rc]->GetValue();
+ fp[*rc] = this;
+ } else for (i = 0; i < 2; i++) {
+ // Split the filter arguments
+ assert(!fp[rc[i]]);
+ fp[rc[i]] = (PFIL)Arg(i);
+ } // endfor i
+
+ *rc = 2;
+ } // endif rc
+
+ } else
+ *rc = (CheckSpcCol(tp, crit) == 1) ? 0 : 1;
+
+ return *rc;
+ } // end of SplitFilter
+
+/***********************************************************************/
+/* Checks whether filter contains only references to FILEID, SERVID, */
+/* or TABID with constants or pseudo constants. */
+/***********************************************************************/
+int FILTER::CheckSpcCol(PTDB tdbp, int n)
+ {
+ int n1 = Arg(0)->CheckSpcCol(tdbp, n);
+ int n2 = Arg(1)->CheckSpcCol(tdbp, n);
+
+ return max(n1, n2);
+ } // end of CheckSpcCol
+
+/***********************************************************************/
+/* Reset the filter arguments to non evaluated yet. */
+/***********************************************************************/
+void FILTER::Reset(void)
+ {
+ for (int i = 0; i < 2; i++)
+ Arg(i)->Reset();
+
+ } // end of Reset
+
+/***********************************************************************/
+/* Init: called when reinitializing a query (Correlated subqueries) */
+/***********************************************************************/
+bool FILTER::Init(PGLOBAL g)
+ {
+ for (int i = 0; i < 2; i++)
+ Arg(i)->Init(g);
+
+ return FALSE;
+ } // end of Init
+
+/***********************************************************************/
+/* Convert: does all filter setting and conversions. */
+/* (having = TRUE for Having Clauses, FALSE for Where Clauses) */
+/* Note: hierarchy of types is implied by the ConvertType */
+/* function, currently FLOAT, int, STRING and TOKEN. */
+/* Returns FALSE if successful or TRUE in case of error. */
+/* Note on result type for filters: */
+/* Currently the result type is of TYPE_INT (should be TYPE_BOOL). */
+/* This avoids to introduce a new type and perhaps will permit */
+/* conversions. However the boolean operators will result in a */
+/* boolean int result, meaning that result shall be only 0 or 1 . */
+/***********************************************************************/
+bool FILTER::Convert(PGLOBAL g, bool having)
+ {
+ int i, comtype = TYPE_ERROR;
+
+ if (trace)
+ htrc("converting(?) %s %p opc=%d\n",
+ (having) ? "having" : "filter", this, Opc);
+
+ for (i = 0; i < 2; i++) {
+ switch (GetArgType(i)) {
+ case TYPE_COLBLK:
+ if (((PCOL)Arg(i))->InitValue(g))
+ return TRUE;
+
+ break;
+ case TYPE_ARRAY:
+ if ((Opc != OP_IN && !Opm) || i == 0) {
+ strcpy(g->Message, MSG(BAD_ARRAY_OPER));
+ return TRUE;
+ } // endif
+
+ if (((PARRAY)Arg(i))->Sort(g)) // Sort the array
+ return TRUE; // Error
+
+ break;
+ case TYPE_VOID:
+ if (i == 1) {
+ Val(0) = Arg(0)->GetValue();
+ goto TEST; // Filter has only one argument
+ } // endif i
+
+ strcpy(g->Message, MSG(VOID_FIRST_ARG));
+ return TRUE;
+ } // endswitch
+
+ if (trace)
+ htrc("Filter(%d): Arg type=%d\n", i, GetArgType(i));
+
+ // Set default values
+ Test[i].B_T = Arg(i)->GetResultType();
+ Test[i].Conv = FALSE;
+
+ // Special case of the LIKE operator.
+ if (Opc == OP_LIKE) {
+ if (!IsTypeChar((int)Test[i].B_T)) {
+ sprintf(g->Message, MSG(BAD_TYPE_LIKE), i, Test[i].B_T);
+ return TRUE;
+ } // endif
+
+ comtype = TYPE_STRING;
+ } else {
+ // Set the common type for both (eventually converted) arguments
+ int argtyp = Test[i].B_T;
+
+ if (GetArgType(i) == TYPE_CONST && argtyp == TYPE_INT) {
+ // If possible, downcast the type to smaller types to avoid
+ // convertion as much as possible.
+ int n = Arg(i)->GetValue()->GetIntValue();
+
+ if (n >= INT_MIN8 && n <= INT_MAX8)
+ argtyp = TYPE_TINY;
+ else if (n >= INT_MIN16 && n <= INT_MAX16)
+ argtyp = TYPE_SHORT;
+
+ } else if (GetArgType(i) == TYPE_ARRAY) {
+ // If possible, downcast int arrays target type to TYPE_SHORT
+ // to take care of filters written like shortcol in (34,35,36).
+ if (((PARRAY)Arg(i))->CanBeShort())
+ argtyp = TYPE_SHORT;
+
+ } // endif TYPE_CONST
+
+ comtype = ConvertType(comtype, argtyp, CNV_ANY);
+ } // endif Opc
+
+ if (comtype == TYPE_ERROR) {
+ strcpy(g->Message, MSG(ILL_FILTER_CONV));
+ return TRUE;
+ } // endif
+
+ if (trace)
+ htrc(" comtype=%d, B_T(%d)=%d Val(%d)=%p\n",
+ comtype, i, Test[i].B_T, i, Val(i));
+
+ } // endfor i
+
+ // Set or allocate the filter argument values and buffers
+ for (i = 0; i < 2; i++) {
+ if (trace)
+ htrc(" conv type %d ? i=%d B_T=%d comtype=%d\n",
+ GetArgType(i), i, Test[i].B_T, comtype);
+
+ if (Test[i].B_T == comtype) {
+ // No conversion, set Value to argument Value
+ Val(i) = Arg(i)->GetValue();
+#if defined(_DEBUG)
+ assert (Val(i) && Val(i)->GetType() == Test[i].B_T);
+#endif
+ } else {
+ // Conversion between filter arguments to be done.
+ // Note that the argument must be converted, not only the
+ // buffer and buffer type, so GetArgType() returns the new type.
+ switch (GetArgType(i)) {
+ case TYPE_CONST:
+ if (comtype == TYPE_DATE && Test[i].B_T == TYPE_STRING) {
+ // Convert according to the format of the other argument
+ Val(i) = AllocateValue(g, comtype, Arg(i)->GetLength());
+
+ if (((DTVAL*)Val(i))->SetFormat(g, Val(1-i)))
+ return TRUE;
+
+ Val(i)->SetValue_psz(Arg(i)->GetValue()->GetCharValue());
+ } else {
+ ((PCONST)Arg(i))->Convert(g, comtype);
+ Val(i) = Arg(i)->GetValue();
+ } // endif comtype
+
+ break;
+ case TYPE_ARRAY:
+ // Conversion PSZ or int array to int or double FLOAT.
+ if (((PARRAY)Arg(i))->Convert(g, comtype, Val(i-1)) == TYPE_ERROR)
+ return TRUE;
+
+ break;
+ case TYPE_FILTER:
+ strcpy(g->Message, MSG(UNMATCH_FIL_ARG));
+ return TRUE;
+ default:
+ // Conversion from Column, Select/Func, Expr, Scalfnc...
+ // The argument requires conversion during Eval
+ // A separate Value block must be allocated.
+ // Note: the test on comtype is to prevent unnecessary
+ // domain initialization and get the correct length in
+ // case of Token -> numeric conversion.
+ Val(i) = AllocateValue(g, comtype, (comtype == TYPE_STRING)
+ ? Arg(i)->GetLengthEx() : Arg(i)->GetLength());
+
+ if (comtype == TYPE_DATE && Test[i].B_T == TYPE_STRING)
+ // Convert according to the format of the other argument
+ if (((DTVAL*)Val(i))->SetFormat(g, Val(1 - i)))
+ return TRUE;
+
+ Test[i].Conv = TRUE;
+ break;
+ } // endswitch GetType
+
+ Test[i].B_T = comtype;
+ } // endif comtype
+
+ } // endfor i
+
+ // Last check to be sure all is correct.
+ if (Test[0].B_T != Test[1].B_T) {
+ sprintf(g->Message, MSG(BAD_FILTER_CONV), Test[0].B_T, Test[1].B_T);
+ return TRUE;
+//} else if (Test[0].B_T == TYPE_LIST &&
+// ((LSTVAL*)Val(0))->GetN() != ((LSTVAL*)Val(1))->GetN()) {
+// sprintf(g->Message, MSG(ROW_ARGNB_ERR),
+// ((LSTVAL*)Val(0))->GetN(), ((LSTVAL*)Val(1))->GetN());
+// return TRUE;
+ } // endif's B_T
+
+
+ TEST: // Test for possible Eval optimization
+
+ if (trace)
+ htrc("Filp %p op=%d argtypes=(%d,%d)\n",
+ this, Opc, GetArgType(0), GetArgType(1));
+
+ // Check whether we have a "simple" filter and in that case
+ // change its class so an optimized Eval function will be used
+ if (!Test[0].Conv && !Test[1].Conv) {
+ if (Opm) switch (Opc) {
+ case OP_EQ:
+ case OP_NE:
+ case OP_GT:
+ case OP_GE:
+ case OP_LT:
+ case OP_LE:
+ if (GetArgType(1) != TYPE_ARRAY)
+ break; // On subquery, do standard processing
+
+ // Change the FILTER class to FILTERIN
+ new(this) FILTERIN;
+ break;
+ default:
+ break;
+ } // endswitch Opc
+
+ else switch (Opc) {
+#if 0
+ case OP_EQ: new(this) FILTEREQ; break;
+ case OP_NE: new(this) FILTERNE; break;
+ case OP_GT: new(this) FILTERGT; break;
+ case OP_GE: new(this) FILTERGE; break;
+ case OP_LT: new(this) FILTERLT; break;
+ case OP_LE: new(this) FILTERLE; break;
+#endif // 0
+ case OP_EQ:
+ case OP_NE:
+ case OP_GT:
+ case OP_GE:
+ case OP_LT:
+ case OP_LE: new(this) FILTERCMP(g); break;
+ case OP_AND: new(this) FILTERAND; break;
+ case OP_OR: new(this) FILTEROR; break;
+ case OP_NOT: new(this) FILTERNOT; break;
+ case OP_EXIST:
+ if (GetArgType(1) == TYPE_VOID) {
+ // For EXISTS it is the first argument that should be null
+ Arg(1) = Arg(0);
+ Arg(0) = pXVOID;
+ } // endif void
+
+ // pass thru
+ case OP_IN:
+ // For IN operator do optimize if operand is an array
+ if (GetArgType(1) != TYPE_ARRAY)
+ break; // IN on subquery, do standard processing
+
+ // Change the FILTER class to FILTERIN
+ new(this) FILTERIN;
+ break;
+ default:
+ break;
+ } // endswitch Opc
+
+ } // endif Conv
+
+ // The result value (should be TYPE_BOOL ???)
+ Value = AllocateValue(g, TYPE_INT);
+ return FALSE;
+ } // end of Convert
+
+/***********************************************************************/
+/* Eval: Compute filter result value. */
+/* New algorithm: evaluation is now done from the root for each group */
+/* so Eval is now a recursive process for FILTER operands. */
+/***********************************************************************/
+bool FILTER::Eval(PGLOBAL g)
+ {
+ int i; // n = 0;
+//PSUBQ subp = NULL;
+ PARRAY ap = NULL;
+ PDBUSER dup = PlgGetUser(g);
+
+ if (Opc <= OP_XX)
+ for (i = 0; i < 2; i++)
+ // Evaluate the object and eventually convert it.
+ if (Arg(i)->Eval(g))
+ return TRUE;
+ else if (Test[i].Conv)
+ Val(i)->SetValue_pval(Arg(i)->GetValue());
+
+ if (trace)
+ htrc(" Filter: op=%d type=%d %d B_T=%d %d val=%p %p\n",
+ Opc, GetArgType(0), GetArgType(1), Test[0].B_T, Test[1].B_T,
+ Val(0), Val(1));
+
+ // Main switch on filtering according to operator type.
+ switch (Opc) {
+ case OP_EQ:
+ case OP_NE:
+ case OP_GT:
+ case OP_GE:
+ case OP_LT:
+ case OP_LE:
+ if (!Opm) {
+ // Comparison boolean operators.
+#if defined(_DEBUG)
+ if (Val(0)->GetType() != Val(1)->GetType())
+ goto FilterError;
+#endif
+ // Compare the two arguments
+ // New algorithm to take care of TYPE_LIST
+ Bt = OpBmp(g, Opc);
+ Value->SetValue_bool(!(Val(0)->TestValue(Val(1)) & Bt));
+ break;
+ } // endif Opm
+
+ // For modified operators, pass thru
+ case OP_IN:
+ case OP_EXIST:
+ // For IN operations, special processing is done here
+ switch (GetArgType(1)) {
+ case TYPE_ARRAY:
+ ap = (PARRAY)Arg(1);
+ break;
+ default:
+ strcpy(g->Message, MSG(IN_WITHOUT_SUB));
+ goto FilterError;
+ } // endswitch Type
+
+ if (trace) {
+ htrc(" IN filtering: ap=%p\n", ap);
+
+ if (ap)
+ htrc(" Array: type=%d size=%d other_type=%d\n",
+ ap->GetType(), ap->GetSize(), Test[0].B_T);
+
+ } // endif trace
+
+ /*****************************************************************/
+ /* Implementation note: The Find function is now able to do a */
+ /* conversion but limited to SHORT, int, and FLOAT arrays. */
+ /*****************************************************************/
+// Value->SetValue_bool(ap->Find(g, Val(0)));
+
+ if (ap)
+ Value->SetValue_bool(ap->FilTest(g, Val(0), Opc, Opm));
+
+ break;
+
+ case OP_LIKE:
+#if defined(_DEBUG)
+ if (!IsTypeChar((int)Test[0].B_T) || !IsTypeChar((int)Test[1].B_T))
+ goto FilterError;
+#endif
+ if (Arg(0)->Eval(g))
+ return TRUE;
+
+ Value->SetValue_bool(PlugEvalLike(g, Val(0)->GetCharValue(),
+ Val(1)->GetCharValue(),
+ Val(0)->IsCi()));
+ break;
+
+ case OP_AND:
+#if defined(_DEBUG)
+ if (Test[0].B_T != TYPE_INT || Test[1].B_T != TYPE_INT)
+ goto FilterError;
+#endif
+
+ if (Arg(0)->Eval(g))
+ return TRUE;
+
+ Value->SetValue(Val(0)->GetIntValue());
+
+ if (!Value->GetIntValue())
+ return FALSE; // No need to evaluate 2nd argument
+
+ if (Arg(1)->Eval(g))
+ return TRUE;
+
+ Value->SetValue(Val(1)->GetIntValue());
+ break;
+
+ case OP_OR:
+#if defined(_DEBUG)
+ if (Test[0].B_T != TYPE_INT || Test[1].B_T != TYPE_INT)
+ goto FilterError;
+#endif
+
+ if (Arg(0)->Eval(g))
+ return TRUE;
+
+ Value->SetValue(Val(0)->GetIntValue());
+
+ if (Value->GetIntValue())
+ return FALSE; // No need to evaluate 2nd argument
+
+ if (Arg(1)->Eval(g))
+ return TRUE;
+
+ Value->SetValue(Val(1)->GetIntValue());
+ break;
+
+ case OP_NOT:
+#if defined(_DEBUG)
+ if (Test[0].B_T != TYPE_INT) // Should be type bool ???
+ goto FilterError;
+#endif
+
+ if (Arg(0)->Eval(g))
+ return TRUE;
+
+ Value->SetValue_bool(!Val(0)->GetIntValue());
+ break;
+
+ case OP_SEP: // No more used while evaluating
+ default:
+ goto FilterError;
+ } // endswitch Opc
+
+ if (trace)
+ htrc("Eval: filter %p Opc=%d result=%d\n",
+ this, Opc, Value->GetIntValue());
+
+ return FALSE;
+
+ FilterError:
+ sprintf(g->Message, MSG(BAD_FILTER),
+ Opc, Test[0].B_T, Test[1].B_T, GetArgType(0), GetArgType(1));
+ return TRUE;
+ } // end of Eval
+
+#if 0
+/***********************************************************************/
+/* Called by PlugCopyDB to make a copy of a (linearized) filter chain.*/
+/***********************************************************************/
+PFIL FILTER::Copy(PTABS t)
+ {
+ int i;
+ PFIL fil1, fil2, newfilchain = NULL, fprec = NULL;
+
+ for (fil1 = this; fil1; fil1 = fil1->Next) {
+ fil2 = new(t->G) FILTER(fil1);
+
+ if (!fprec)
+ newfilchain = fil2;
+ else
+ fprec->Next = fil2;
+
+ NewPointer(t, fil1, fil2);
+
+ for (i = 0; i < 2; i++)
+ if (fil1->GetArgType(i) == TYPE_COLBLK ||
+ fil1->GetArgType(i) == TYPE_FILTER)
+ AddPointer(t, &fil2->Arg(i));
+
+ fprec = fil2;
+ } /* endfor fil1 */
+
+ return newfilchain;
+ } // end of Copy
+#endif // 0
+
+/*********************************************************************/
+/* Make file output of FILTER contents. */
+/*********************************************************************/
+void FILTER::Print(PGLOBAL g, FILE *f, UINT n)
+ {
+ char m[64];
+
+ memset(m, ' ', n); // Make margin string
+ m[n] = '\0';
+
+ bool lin = (Next != NULL); // lin == TRUE if linearized
+
+ for (PFIL fp = this; fp; fp = fp->Next) {
+ fprintf(f, "%sFILTER: at %p opc=%d lin=%d result=%d\n",
+ m, fp, fp->Opc, lin,
+ (Value) ? Value->GetIntValue() : 0);
+
+ for (int i = 0; i < 2; i++) {
+ fprintf(f, "%s Arg(%d) type=%d value=%p B_T=%d val=%p\n",
+ m, i, fp->GetArgType(i), fp->Arg(i),
+ fp->Test[i].B_T, fp->Val(i));
+
+ if (lin && fp->GetArgType(i) == TYPE_FILTER)
+ fprintf(f, "%s Filter at %p\n", m, fp->Arg(i));
+ else
+ fp->Arg(i)->Print(g, f, n + 2);
+
+ } // endfor i
+
+ } // endfor fp
+
+ } // end of Print
+
+/***********************************************************************/
+/* Make string output of TABLE contents (z should be checked). */
+/***********************************************************************/
+void FILTER::Print(PGLOBAL g, char *ps, UINT z)
+ {
+ #define FLEN 100
+
+ typedef struct _bc {
+ struct _bc *Next;
+ char Cold[FLEN+1];
+ } BC, *PBC;
+
+ char *p;
+ int n;
+ PFIL fp;
+ PBC bxp, bcp = NULL;
+
+ *ps = '\0';
+
+ for (fp = this; fp && z > 0; fp = fp->Next) {
+ if (fp->Opc < OP_CNC || fp->Opc == OP_IN || fp->Opc == OP_NULL
+ || fp->Opc == OP_LIKE || fp->Opc == OP_EXIST) {
+ if (!(bxp = new BC)) {
+ strncat(ps, "Filter(s)", z);
+ return;
+ } /* endif */
+
+ bxp->Next = bcp;
+ bcp = bxp;
+ p = bcp->Cold;
+ n = FLEN;
+ fp->Arg(0)->Print(g, p, n);
+ n = FLEN - strlen(p);
+
+ switch (fp->Opc) {
+ case OP_EQ:
+ strncat(bcp->Cold, "=", n);
+ break;
+ case OP_NE:
+ strncat(bcp->Cold, "!=", n);
+ break;
+ case OP_GT:
+ strncat(bcp->Cold, ">", n);
+ break;
+ case OP_GE:
+ strncat(bcp->Cold, ">=", n);
+ break;
+ case OP_LT:
+ strncat(bcp->Cold, "<", n);
+ break;
+ case OP_LE:
+ strncat(bcp->Cold, "<=", n);
+ break;
+ case OP_IN:
+ strncat(bcp->Cold, " in ", n);
+ break;
+ case OP_NULL:
+ strncat(bcp->Cold, " is null", n);
+ break;
+ case OP_LIKE:
+ strncat(bcp->Cold, " like ", n);
+ break;
+ case OP_EXIST:
+ strncat(bcp->Cold, " exists ", n);
+ break;
+ case OP_AND:
+ strncat(bcp->Cold, " and ", n);
+ break;
+ case OP_OR:
+ strncat(bcp->Cold, " or ", n);
+ break;
+ default:
+ strncat(bcp->Cold, "?", n);
+ } // endswitch Opc
+
+ n = FLEN - strlen(p);
+ p += strlen(p);
+ fp->Arg(1)->Print(g, p, n);
+ } else
+ if (!bcp) {
+ strncat(ps, "???", z);
+ z -= 3;
+ } else
+ switch (fp->Opc) {
+ case OP_SEP: // Filter list separator
+ strncat(ps, bcp->Cold, z);
+ z -= strlen(bcp->Cold);
+ strncat(ps, ";", z--);
+ bxp = bcp->Next;
+ delete bcp;
+ bcp = bxp;
+ break;
+ case OP_NOT: // Filter NOT operator
+ for (n = min((int)strlen(bcp->Cold), FLEN-3); n >= 0; n--)
+ bcp->Cold[n+2] = bcp->Cold[n];
+ bcp->Cold[0] = '^';
+ bcp->Cold[1] = '(';
+ strcat(bcp->Cold, ")");
+ break;
+ default:
+ for (n = min((int)strlen(bcp->Cold), FLEN-4); n >= 0; n--)
+ bcp->Cold[n+3] = bcp->Cold[n];
+ bcp->Cold[0] = ')';
+ switch (fp->Opc) {
+ case OP_AND: bcp->Cold[1] = '&'; break;
+ case OP_OR: bcp->Cold[1] = '|'; break;
+ default: bcp->Cold[1] = '?';
+ } // endswitch
+ bcp->Cold[2] = '(';
+ strcat(bcp->Cold, ")");
+ bxp = bcp->Next;
+ for (n = min((int)strlen(bxp->Cold), FLEN-1); n >= 0; n--)
+ bxp->Cold[n+1] = bxp->Cold[n];
+ bxp->Cold[0] = '(';
+ strncat(bxp->Cold, bcp->Cold, FLEN-strlen(bxp->Cold));
+ delete bcp;
+ bcp = bxp;
+ } // endswitch
+
+ } // endfor fp
+
+ n = 0;
+
+ if (!bcp)
+ strncat(ps, "Null-Filter", z);
+ else do {
+ if (z > 0) {
+ if (n++ > 0) {
+ strncat(ps, "*?*", z);
+ z = max(0, (int)z-3);
+ } // endif
+ strncat(ps, bcp->Cold, z);
+ z -= strlen(bcp->Cold);
+ } // endif
+
+ bxp = bcp->Next;
+ delete bcp;
+ bcp = bxp;
+ } while (bcp); // enddo
+
+ } // end of Print
+
+
+/* -------------------- Derived Classes Functions -------------------- */
+
+/***********************************************************************/
+/* FILTERCMP constructor. */
+/***********************************************************************/
+FILTERCMP::FILTERCMP(PGLOBAL g)
+ {
+ Bt = OpBmp(g, Opc);
+ } // end of FILTERCMP constructor
+
+/***********************************************************************/
+/* Eval: Compute result value for comparison operators. */
+/***********************************************************************/
+bool FILTERCMP::Eval(PGLOBAL g)
+ {
+ if (Arg(0)->Eval(g) || Arg(1)->Eval(g))
+ return TRUE;
+
+ Value->SetValue_bool(!(Val(0)->TestValue(Val(1)) & Bt));
+ return FALSE;
+ } // end of Eval
+
+/***********************************************************************/
+/* Eval: Compute result value for AND filters. */
+/***********************************************************************/
+bool FILTERAND::Eval(PGLOBAL g)
+ {
+ if (Arg(0)->Eval(g))
+ return TRUE;
+
+ Value->SetValue(Val(0)->GetIntValue());
+
+ if (!Value->GetIntValue())
+ return FALSE; // No need to evaluate 2nd argument
+
+ if (Arg(1)->Eval(g))
+ return TRUE;
+
+ Value->SetValue(Val(1)->GetIntValue());
+ return FALSE;
+ } // end of Eval
+
+/***********************************************************************/
+/* Eval: Compute result value for OR filters. */
+/***********************************************************************/
+bool FILTEROR::Eval(PGLOBAL g)
+ {
+ if (Arg(0)->Eval(g))
+ return TRUE;
+
+ Value->SetValue(Val(0)->GetIntValue());
+
+ if (Value->GetIntValue())
+ return FALSE; // No need to evaluate 2nd argument
+
+ if (Arg(1)->Eval(g))
+ return TRUE;
+
+ Value->SetValue(Val(1)->GetIntValue());
+ return FALSE;
+ } // end of Eval
+
+/***********************************************************************/
+/* Eval: Compute result value for NOT filters. */
+/***********************************************************************/
+bool FILTERNOT::Eval(PGLOBAL g)
+ {
+ if (Arg(0)->Eval(g))
+ return TRUE;
+
+ Value->SetValue_bool(!Val(0)->GetIntValue());
+ return FALSE;
+ } // end of Eval
+
+/***********************************************************************/
+/* Eval: Compute result value for IN filters. */
+/***********************************************************************/
+bool FILTERIN::Eval(PGLOBAL g)
+ {
+ if (Arg(0)->Eval(g))
+ return TRUE;
+
+ Value->SetValue_bool(((PARRAY)Arg(1))->FilTest(g, Val(0), Opc, Opm));
+ return FALSE;
+ } // end of Eval
+
+/***********************************************************************/
+/* FILTERTRUE does nothing and returns TRUE. */
+/***********************************************************************/
+void FILTERTRUE::Reset(void)
+ {
+ } // end of Reset
+
+bool FILTERTRUE::Eval(PGLOBAL)
+ {
+ return FALSE;
+ } // end of Eval
+
+/* ------------------------- Friend Functions ------------------------ */
+
+#if 0
+/***********************************************************************/
+/* Prepare: prepare a filter for execution. This implies two things: */
+/* 1) de-linearize the filter to be able to evaluate it recursively. */
+/* This permit to conditionally evaluate only the first argument */
+/* of OP_OR and OP_AND filters without having to pass by an */
+/* intermediate Apply function (as this has a performance cost). */
+/* 2) do all the necessary conversion for all filter block arguments. */
+/***********************************************************************/
+PFIL PrepareFilter(PGLOBAL g, PFIL fp, bool having)
+ {
+ PFIL filp = NULL;
+
+ if (trace)
+ htrc("PrepareFilter: fp=%p having=%d\n", fp, having);
+//if (fp)
+// fp->Print(g, debug, 0);
+
+ while (fp) {
+ if (fp->Opc == OP_SEP)
+ // If separator is not last transform it into an AND filter
+ if (fp->Next) {
+ filp = PrepareFilter(g, fp->Next, having);
+ fp->Arg(1) = filp;
+ fp->Opc = OP_AND;
+ fp->Next = NULL; // This will end the loop
+ } else
+ break; // Remove eventual ending separator(s)
+
+// if (fp->Convert(g, having))
+// longjmp(g->jumper[g->jump_level], TYPE_FILTER);
+
+ filp = fp;
+ fp = fp->Next;
+ filp->Next = NULL;
+ } // endwhile
+
+ if (trace)
+ htrc(" returning filp=%p\n", filp);
+//if (filp)
+// filp->Print(g, debug, 0);
+
+ return filp;
+ } // end of PrepareFilter
+#endif // 0
+
+/***********************************************************************/
+/* ApplyFilter: Apply filtering for a table (where or having clause). */
+/* New algorithm: evaluate from the root a de-linearized filter so */
+/* AND/OR clauses can be optimized throughout the whole tree. */
+/***********************************************************************/
+DllExport bool ApplyFilter(PGLOBAL g, PFIL filp, PTDB tdbp)
+ {
+ if (!filp)
+ return TRUE;
+
+ // Must be done for null tables
+ filp->Reset();
+
+//if (tdbp && tdbp->IsNull())
+// return TRUE;
+
+ if (filp->Eval(g))
+ longjmp(g->jumper[g->jump_level], TYPE_FILTER);
+
+ if (trace)
+ htrc("PlugFilter filp=%p result=%d\n",
+ filp, filp->GetResult());
+
+ return filp->GetResult();
+ } // end of ApplyFilter
diff --git a/storage/connect/filter.h b/storage/connect/filter.h
new file mode 100644
index 00000000000..a24ca18dc59
--- /dev/null
+++ b/storage/connect/filter.h
@@ -0,0 +1,172 @@
+/*************** Filter H Declares Source Code File (.H) ***************/
+/* Name: FILTER.H Version 1.2 */
+/* */
+/* (C) Copyright to the author Olivier BERTRAND 2010-2012 */
+/* */
+/* This file contains the FILTER and derived classes declares. */
+/***********************************************************************/
+
+/***********************************************************************/
+/* Include required application header files */
+/***********************************************************************/
+#include "xobject.h"
+
+/***********************************************************************/
+/* Utilities for WHERE condition building. */
+/***********************************************************************/
+PFIL MakeFilter(PGLOBAL g, PFIL filp, OPVAL vop, PFIL fp);
+PFIL MakeFilter(PGLOBAL g, PCOL *colp, POPER pop, PPARM pfirst, bool neg);
+
+/***********************************************************************/
+/* Definition of class FILTER with all its method functions. */
+/* Note: Most virtual implementation functions are not in use yet */
+/* but could be in future system evolution. */
+/***********************************************************************/
+class DllExport FILTER : public XOBJECT { /* Filter description block */
+//friend PFIL PrepareFilter(PGLOBAL, PFIL, bool);
+ friend DllExport bool ApplyFilter(PGLOBAL, PFIL, PTDB = NULL);
+ public:
+ // Constructors
+ FILTER(PGLOBAL g, POPER pop, PPARM *tp = NULL);
+ FILTER(PGLOBAL g, OPVAL opc, PPARM *tp = NULL);
+ FILTER(PFIL fil1);
+
+ // Implementation
+ virtual int GetType(void) {return TYPE_FILTER;}
+ virtual int GetResultType(void) {return TYPE_INT;}
+ virtual int GetLength(void) {return 1;}
+ virtual int GetLengthEx(void) {assert(FALSE); return 0;}
+ virtual int GetScale() {return 0;};
+ PFIL GetNext(void) {return Next;}
+ OPVAL GetOpc(void) {return Opc;}
+ int GetOpm(void) {return Opm;}
+ int GetArgType(int i) {return Arg(i)->GetType();}
+ bool GetResult(void) {return Value->GetIntValue() != 0;}
+ PXOB &Arg(int i) {return Test[i].Arg;}
+ PVAL &Val(int i) {return Test[i].Value;}
+ bool &Conv(int i) {return Test[i].Conv;}
+ void SetNext(PFIL filp) {Next = filp;}
+
+ // Methods
+ virtual void Reset(void);
+ virtual bool Compare(PXOB) {return FALSE;} // Not used yet
+ virtual bool Init(PGLOBAL);
+ virtual bool Eval(PGLOBAL);
+ virtual bool SetFormat(PGLOBAL, FORMAT&) {return TRUE;} // NUY
+ virtual int CheckColumn(PGLOBAL g, PSQL sqlp, PXOB &xp, int &ag);
+ virtual int RefNum(PSQL);
+ virtual PXOB SetSelect(PGLOBAL, PSQL, bool) {return NULL;} // NUY
+//virtual PXOB CheckSubQuery(PGLOBAL, PSQL);
+ virtual bool CheckLocal(PTDB);
+ virtual int CheckSpcCol(PTDB tdbp, int n);
+ virtual void Print(PGLOBAL g, FILE *f, UINT n);
+ virtual void Print(PGLOBAL g, char *ps, UINT z);
+ PFIL Linearize(bool nosep);
+ PFIL Link(PGLOBAL g, PFIL fil2);
+ PFIL RemoveLastSep(void);
+// PFIL SortJoin(PGLOBAL g);
+// bool FindJoinFilter(POPJOIN opj, PFIL fprec, bool teq,
+// bool tek, bool tk2, bool tc2, bool tix, bool thx);
+// bool CheckHaving(PGLOBAL g, PSQL sqlp);
+ bool Convert(PGLOBAL g, bool having);
+ int SplitFilter(PFIL *fp);
+ int SplitFilter(PFIL *fp, PTDB tp, int n);
+ PFIL LinkFilter(PGLOBAL g, PFIL fp2);
+// PFIL Copy(PTABS t);
+
+ protected:
+ FILTER(void) {} // Standard constructor not to be used
+ void Constr(PGLOBAL g, OPVAL opc, int opm, PPARM *tp);
+
+ // Members
+ PFIL Next; // Used for linearization
+ OPVAL Opc; // Comparison operator
+ int Opm; // Modificator
+ BYTE Bt; // Operator bitmap
+ struct {
+ int B_T; // Buffer type
+ PXOB Arg; // Points to argument
+ PVAL Value; // Points to argument value
+ bool Conv; // TRUE if argument must be converted
+ } Test[2];
+ }; // end of class FILTER
+
+/***********************************************************************/
+/* Derived class FILTERX: used to replace a filter by a derived class */
+/* using an Eval method optimizing the filtering evaluation. */
+/* Note: this works only if the members of the derived class are the */
+/* same than the ones of the original class (NO added members). */
+/***********************************************************************/
+class FILTERX : public FILTER {
+ public:
+ // Methods
+ virtual bool Eval(PGLOBAL) = 0; // just to prevent direct FILTERX use
+
+ // Fake operator new used to change a filter into a derived filter
+ void * operator new(size_t size, PFIL filp) {return filp;}
+#if !defined(__BORLANDC__)
+ // Avoid warning C4291 by defining a matching dummy delete operator
+ void operator delete(void *, PFIL) {}
+#endif
+ }; // end of class FILTERX
+
+/***********************************************************************/
+/* Derived class FILTEREQ: OP_EQ, no conversion and Xobject args. */
+/***********************************************************************/
+class FILTERCMP : public FILTERX {
+ public:
+ // Constructor
+ FILTERCMP(PGLOBAL g);
+
+ // Methods
+ virtual bool Eval(PGLOBAL);
+ }; // end of class FILTEREQ
+
+/***********************************************************************/
+/* Derived class FILTERAND: OP_AND, no conversion and Xobject args. */
+/***********************************************************************/
+class FILTERAND : public FILTERX {
+ public:
+ // Methods
+ virtual bool Eval(PGLOBAL);
+ }; // end of class FILTERAND
+
+/***********************************************************************/
+/* Derived class FILTEROR: OP_OR, no conversion and Xobject args. */
+/***********************************************************************/
+class FILTEROR : public FILTERX {
+ public:
+ // Methods
+ virtual bool Eval(PGLOBAL);
+ }; // end of class FILTEROR
+
+/***********************************************************************/
+/* Derived class FILTERNOT: OP_NOT, no conversion and Xobject args. */
+/***********************************************************************/
+class FILTERNOT : public FILTERX {
+ public:
+ // Methods
+ virtual bool Eval(PGLOBAL);
+ }; // end of class FILTERNOT
+
+/***********************************************************************/
+/* Derived class FILTERIN: OP_IN, no conversion and Array 2nd arg. */
+/***********************************************************************/
+class FILTERIN : public FILTERX {
+ public:
+ // Methods
+ virtual bool Eval(PGLOBAL);
+ }; // end of class FILTERIN
+
+/***********************************************************************/
+/* Derived class FILTERTRUE: Always returns TRUE. */
+/***********************************************************************/
+class FILTERTRUE : public FILTERX {
+ public:
+ // Constructor
+ FILTERTRUE(PVAL valp) {Value = valp; Value->SetValue_bool(TRUE);}
+
+ // Methods
+ virtual void Reset(void);
+ virtual bool Eval(PGLOBAL);
+ }; // end of class FILTERTRUE