diff options
author | Olivier Bertrand <bertrandop@gmail.com> | 2020-12-11 16:34:50 +0100 |
---|---|---|
committer | Olivier Bertrand <bertrandop@gmail.com> | 2020-12-11 16:34:50 +0100 |
commit | aa10789f472b975f3ffae2a5adc6514a879226ba (patch) | |
tree | 52a208cae19e381d88799c59dc53c424bf6d83e0 | |
parent | 4eeadedc77018da781eb7ea008fd3474f1a354d5 (diff) | |
download | mariadb-git-aa10789f472b975f3ffae2a5adc6514a879226ba.tar.gz |
BSON development
-rw-r--r-- | storage/connect/bson.h | 2 | ||||
-rw-r--r-- | storage/connect/bsonudf.cpp | 1 | ||||
-rw-r--r-- | storage/connect/filamtxt.cpp | 200 | ||||
-rw-r--r-- | storage/connect/filamtxt.h | 25 | ||||
-rw-r--r-- | storage/connect/tabbson.cpp | 121 | ||||
-rw-r--r-- | storage/connect/tabdos.cpp | 3 |
6 files changed, 249 insertions, 103 deletions
diff --git a/storage/connect/bson.h b/storage/connect/bson.h index a6e160a3f3b..402981befaa 100644 --- a/storage/connect/bson.h +++ b/storage/connect/bson.h @@ -145,7 +145,7 @@ public: void SetBigint(PBVAL vlp, longlong ll); void SetFloat(PBVAL vlp, double f); void SetBool(PBVAL vlp, bool b); - void Clear(PBVAL vlp) { vlp->N = 0; vlp->Nd = 0; vlp->Next = 0; vlp->Type = TYPE_NULL; } + void Clear(PBVAL vlp) { vlp->N = 0; vlp->Nd = 0; vlp->Next = 0; } bool IsValueNull(PBVAL vlp); bool IsJson(PBVAL vlp) {return (vlp->Type == TYPE_JAR || vlp->Type == TYPE_JOB);} diff --git a/storage/connect/bsonudf.cpp b/storage/connect/bsonudf.cpp index b7a778b8387..76ecce5133b 100644 --- a/storage/connect/bsonudf.cpp +++ b/storage/connect/bsonudf.cpp @@ -1737,7 +1737,6 @@ char *bfile_bjson(UDF_INIT *initid, UDF_ARGS *args, char *result, ssize_t len, newloc; size_t lrecl, binszp; PBVAL jsp; - PBJNX bnxp; PGLOBAL g = (PGLOBAL)initid->ptr; BDOC doc(g); diff --git a/storage/connect/filamtxt.cpp b/storage/connect/filamtxt.cpp index 758a4b1d8cf..ef6d3ecafca 100644 --- a/storage/connect/filamtxt.cpp +++ b/storage/connect/filamtxt.cpp @@ -1668,13 +1668,14 @@ void BLKFAM::Rewind(void) /***********************************************************************/ /* BIN GetFileLength: returns file size in number of bytes. */ /***********************************************************************/ -int BINFAM::GetFileLength(PGLOBAL g) { +int BINFAM::GetFileLength(PGLOBAL g) +{ int len; - if (!BStream) + if (!Stream) len = TXTFAM::GetFileLength(g); else - if ((len = _filelength(_fileno(BStream))) < 0) + if ((len = _filelength(_fileno(Stream))) < 0) sprintf(g->Message, MSG(FILELEN_ERROR), "_filelength", To_File); xtrc(1, "File length=%d\n", len); @@ -1686,10 +1687,12 @@ int BINFAM::GetFileLength(PGLOBAL g) { /* This function can be called with a null argument to test the */ /* availability of Cardinality implementation (1 yes, 0 no). */ /***********************************************************************/ -int BINFAM::Cardinality(PGLOBAL g) { +int BINFAM::Cardinality(PGLOBAL g) +{ return (g) ? -1 : 0; } // end of Cardinality +#if 0 /***********************************************************************/ /* OpenTableFile: Open a DOS/UNIX table file using C standard I/Os. */ /***********************************************************************/ @@ -1713,16 +1716,16 @@ bool BINFAM::OpenTableFile(PGLOBAL g) { // Now open the file stream PlugSetPath(filename, To_File, Tdbp->GetPath()); - if (!(BStream = PlugOpenFile(g, filename, opmode))) { + if (!(Stream = PlugOpenFile(g, filename, opmode))) { if (trace(1)) htrc("%s\n", g->Message); return (mode == MODE_READ && errno == ENOENT) ? PushWarning(g, Tdbp) : true; - } // endif BStream + } // endif Stream if (trace(1)) - htrc("File %s open BStream=%p mode=%s\n", filename, BStream, opmode); + htrc("File %s open Stream=%p mode=%s\n", filename, Stream, opmode); To_Fb = dbuserp->Openlist; // Keep track of File block @@ -1731,12 +1734,14 @@ bool BINFAM::OpenTableFile(PGLOBAL g) { /*********************************************************************/ return AllocateBuffer(g); } // end of OpenTableFile +#endif 0 /***********************************************************************/ /* Allocate the line buffer. For mode Delete a bigger buffer has to */ /* be allocated because is it also used to move lines into the file. */ /***********************************************************************/ -bool BINFAM::AllocateBuffer(PGLOBAL g) { +bool BINFAM::AllocateBuffer(PGLOBAL g) +{ MODE mode = Tdbp->GetMode(); // Lrecl is Ok @@ -1749,6 +1754,7 @@ bool BINFAM::AllocateBuffer(PGLOBAL g) { return false; } // end of AllocateBuffer +#if 0 /***********************************************************************/ /* GetRowID: return the RowID of last read record. */ /***********************************************************************/ @@ -1767,7 +1773,7 @@ int BINFAM::GetPos(void) { /* GetNextPos: return the position of next record. */ /***********************************************************************/ int BINFAM::GetNextPos(void) { - return ftell(BStream); + return ftell(Stream); } // end of GetNextPos /***********************************************************************/ @@ -1776,7 +1782,7 @@ int BINFAM::GetNextPos(void) { bool BINFAM::SetPos(PGLOBAL g, int pos) { Fpos = pos; - if (fseek(BStream, Fpos, SEEK_SET)) { + if (fseek(Stream, Fpos, SEEK_SET)) { sprintf(g->Message, MSG(FSETPOS_ERROR), Fpos); return true; } // endif @@ -1789,7 +1795,7 @@ bool BINFAM::SetPos(PGLOBAL g, int pos) { /* Record file position in case of UPDATE or DELETE. */ /***********************************************************************/ bool BINFAM::RecordPos(PGLOBAL g) { - if ((Fpos = ftell(BStream)) < 0) { + if ((Fpos = ftell(Stream)) < 0) { sprintf(g->Message, MSG(FTELL_ERROR), 0, strerror(errno)); // strcat(g->Message, " (possible wrong ENDING option value)"); return true; @@ -1797,14 +1803,16 @@ bool BINFAM::RecordPos(PGLOBAL g) { return false; } // end of RecordPos +#endif // 0 /***********************************************************************/ /* ReadBuffer: Read one line for a text file. */ /***********************************************************************/ -int BINFAM::ReadBuffer(PGLOBAL g) { +int BINFAM::ReadBuffer(PGLOBAL g) +{ int rc; - if (!BStream) + if (!Stream) return RC_EF; xtrc(2, "ReadBuffer: Tdbp=%p To_Line=%p Placed=%d\n", @@ -1823,11 +1831,11 @@ int BINFAM::ReadBuffer(PGLOBAL g) { Placed = false; xtrc(2, " About to read: bstream=%p To_Buf=%p Buflen=%d\n", - BStream, To_Buf, Buflen); + Stream, To_Buf, Buflen); // Read the prefix giving the row length - if (!fread(&Recsize, sizeof(size_t), 1, BStream)) { - if (!feof(BStream)) { + if (!fread(&Recsize, sizeof(size_t), 1, Stream)) { + if (!feof(Stream)) { strcpy(g->Message, "Error reading line prefix\n"); return RC_FX; } else @@ -1838,12 +1846,12 @@ int BINFAM::ReadBuffer(PGLOBAL g) { return RC_FX; } // endif Recsize - if (fread(To_Buf, Recsize, 1, BStream)) { + if (fread(To_Buf, Recsize, 1, Stream)) { xtrc(2, " Read: To_Buf=%p Recsize=%zd\n", To_Buf, Recsize); // memcpy(Tdbp->GetLine(), To_Buf, Recsize); num_read++; rc = RC_OK; - } else if (feof(BStream)) { + } else if (feof(Stream)) { rc = RC_EF; } else { #if defined(__WIN__) @@ -1863,23 +1871,24 @@ int BINFAM::ReadBuffer(PGLOBAL g) { /***********************************************************************/ /* WriteBuffer: File write routine for BIN access method. */ /***********************************************************************/ -int BINFAM::WriteBuffer(PGLOBAL g) { +int BINFAM::WriteBuffer(PGLOBAL g) +{ int curpos = 0; bool moved = true; /*********************************************************************/ /* Prepare writing the line. */ /*********************************************************************/ - memcpy(To_Buf, Tdbp->GetLine(), Recsize); +//memcpy(To_Buf, Tdbp->GetLine(), Recsize); /*********************************************************************/ /* Now start the writing process. */ /*********************************************************************/ - if (fwrite(&Recsize, sizeof(size_t), 1, BStream) != 1) { + if (fwrite(&Recsize, sizeof(size_t), 1, Stream) != 1) { sprintf(g->Message, "Error %d writing prefix to %s", errno, To_File); return RC_FX; - } else if (fwrite(To_Buf, Recsize, 1, BStream) != 1) { + } else if (fwrite(To_Buf, Recsize, 1, Stream) != 1) { sprintf(g->Message, "Error %d writing %zd bytes to %s", errno, Recsize, To_File); return RC_FX; @@ -1889,24 +1898,153 @@ int BINFAM::WriteBuffer(PGLOBAL g) { return RC_OK; } // end of WriteBuffer +#if 0 +/***********************************************************************/ +/* Data Base delete line routine for DOS and BLK access methods. */ +/***********************************************************************/ +int DOSFAM::DeleteRecords(PGLOBAL g, int irc) +{ + bool moved; + int curpos = ftell(Stream); + + /*********************************************************************/ + /* There is an alternative here: */ + /* 1 - use a temporary file in which are copied all not deleted */ + /* lines, at the end the original file will be deleted and */ + /* the temporary file renamed to the original file name. */ + /* 2 - directly move the not deleted lines inside the original */ + /* file, and at the end erase all trailing records. */ + /* This will be experimented. */ + /*********************************************************************/ + if (trace(1)) + htrc( + "DOS DeleteDB: rc=%d UseTemp=%d curpos=%d Fpos=%d Tpos=%d Spos=%d\n", + irc, UseTemp, curpos, Fpos, Tpos, Spos); + + if (irc != RC_OK) { + /*******************************************************************/ + /* EOF: position Fpos at the end-of-file position. */ + /*******************************************************************/ + fseek(Stream, 0, SEEK_END); + Fpos = ftell(Stream); + + if (trace(1)) + htrc("Fpos placed at file end=%d\n", Fpos); + + } // endif irc + + if (Tpos == Spos) { + /*******************************************************************/ + /* First line to delete, Open temporary file. */ + /*******************************************************************/ + if (UseTemp) { + if (OpenTempFile(g)) + return RC_FX; + + } else { + /*****************************************************************/ + /* Move of eventual preceding lines is not required here. */ + /* Set the target file as being the source file itself. */ + /* Set the future Tpos, and give Spos a value to block copying. */ + /*****************************************************************/ + T_Stream = Stream; + Spos = Tpos = Fpos; + } // endif UseTemp + + } // endif Tpos == Spos + + /*********************************************************************/ + /* Move any intermediate lines. */ + /*********************************************************************/ + if (MoveIntermediateLines(g, &moved)) + return RC_FX; + + if (irc == RC_OK) { + /*******************************************************************/ + /* Reposition the file pointer and set Spos. */ + /*******************************************************************/ + if (!UseTemp || moved) + if (fseek(Stream, curpos, SEEK_SET)) { + sprintf(g->Message, MSG(FSETPOS_ERROR), 0); + return RC_FX; + } // endif + + Spos = GetNextPos(); // New start position + + if (trace(1)) + htrc("after: Tpos=%d Spos=%d\n", Tpos, Spos); + + } else { + /*******************************************************************/ + /* Last call after EOF has been reached. */ + /* The UseTemp case is treated in CloseTableFile. */ + /*******************************************************************/ + if (!UseTemp & !Abort) { + /*****************************************************************/ + /* Because the chsize functionality is only accessible with a */ + /* system call we must close the file and reopen it with the */ + /* open function (_fopen for MS ??) this is still to be checked */ + /* for compatibility with Text files and other OS's. */ + /*****************************************************************/ + char filename[_MAX_PATH]; + int h; // File handle, return code + + PlugSetPath(filename, To_File, Tdbp->GetPath()); + /*rc=*/ PlugCloseFile(g, To_Fb); + + if ((h= global_open(g, MSGID_OPEN_STRERROR, filename, O_WRONLY)) <= 0) + return RC_FX; + + /*****************************************************************/ + /* Remove extra records. */ + /*****************************************************************/ +#if defined(__WIN__) + if (chsize(h, Tpos)) { + sprintf(g->Message, MSG(CHSIZE_ERROR), strerror(errno)); + close(h); + return RC_FX; + } // endif +#else + if (ftruncate(h, (off_t)Tpos)) { + sprintf(g->Message, MSG(TRUNCATE_ERROR), strerror(errno)); + close(h); + return RC_FX; + } // endif +#endif + + close(h); + + if (trace(1)) + htrc("done, h=%d irc=%d\n", h, irc); + + } // endif !UseTemp + + } // endif irc + + return RC_OK; // All is correct +} // end of DeleteRecords +#endif // 0 + /***********************************************************************/ /* Table file close routine for DOS access method. */ /***********************************************************************/ -void BINFAM::CloseTableFile(PGLOBAL g, bool abort) { - int rc; +void BINFAM::CloseTableFile(PGLOBAL g, bool abort) +{ + int rc; - Abort = abort; - rc = PlugCloseFile(g, To_Fb); - xtrc(1, "BIN Close: closing %s rc=%d\n", To_File, rc); - BStream = NULL; // So we can know whether table is open + Abort = abort; + rc = PlugCloseFile(g, To_Fb); + xtrc(1, "BIN Close: closing %s rc=%d\n", To_File, rc); + Stream = NULL; // So we can know whether table is open } // end of CloseTableFile /***********************************************************************/ /* Rewind routine for BIN access method. */ /***********************************************************************/ -void BINFAM::Rewind(void) { - if (BStream) // Can be NULL when making index on void table - rewind(BStream); +void BINFAM::Rewind(void) +{ + if (Stream) // Can be NULL when making index on void table + rewind(Stream); Rows = 0; OldBlk = CurBlk = -1; diff --git a/storage/connect/filamtxt.h b/storage/connect/filamtxt.h index 8c1fe5e7dbc..e5067b5a3e0 100644 --- a/storage/connect/filamtxt.h +++ b/storage/connect/filamtxt.h @@ -215,16 +215,16 @@ class DllExport BLKFAM : public DOSFAM { /* This is the DOS/UNIX Access Method class declaration for binary */ /* files with variable record format (BJSON) */ /***********************************************************************/ -class DllExport BINFAM : public TXTFAM { +class DllExport BINFAM : public DOSFAM { public: // Constructor - BINFAM(PDOSDEF tdp) : TXTFAM(tdp) {BStream = NULL; Recsize = 0;} - BINFAM(PBINFAM txfp) : TXTFAM(txfp) {BStream = txfp->BStream;} + BINFAM(PDOSDEF tdp) : DOSFAM(tdp) {Recsize = 0;} + BINFAM(PBINFAM txfp) : DOSFAM(txfp) {Recsize = txfp->Recsize;} // Implementation virtual AMT GetAmType(void) {return TYPE_AM_BIN;} - virtual int GetPos(void); - virtual int GetNextPos(void); +//virtual int GetPos(void); +//virtual int GetNextPos(void); virtual PTXF Duplicate(PGLOBAL g) { return (PTXF)new(g) BINFAM(this); } // Methods @@ -233,23 +233,22 @@ public: virtual int Cardinality(PGLOBAL g); virtual int MaxBlkSize(PGLOBAL g, int s) {return s;} virtual bool AllocateBuffer(PGLOBAL g); - virtual int GetRowID(void); - virtual bool RecordPos(PGLOBAL g); - virtual bool SetPos(PGLOBAL g, int recpos); +//virtual int GetRowID(void); +//virtual bool RecordPos(PGLOBAL g); +//virtual bool SetPos(PGLOBAL g, int recpos); virtual int SkipRecord(PGLOBAL g, bool header) {return 0;} - virtual bool OpenTableFile(PGLOBAL g); +//virtual bool OpenTableFile(PGLOBAL g); virtual int ReadBuffer(PGLOBAL g); virtual int WriteBuffer(PGLOBAL g); - virtual int DeleteRecords(PGLOBAL g, int irc) {return RC_FX;} +//virtual int DeleteRecords(PGLOBAL g, int irc); virtual void CloseTableFile(PGLOBAL g, bool abort); virtual void Rewind(void); -protected: +//protected: //virtual int InitDelete(PGLOBAL g, int fpos, int spos); // Members - FILE *BStream; // Points to Bin file structure - size_t Recsize; // Length of last read record + size_t Recsize; // Length of last read or next written record }; // end of class BINFAM #endif // __FILAMTXT_H diff --git a/storage/connect/tabbson.cpp b/storage/connect/tabbson.cpp index 90a49aac1d5..c1647604b63 100644 --- a/storage/connect/tabbson.cpp +++ b/storage/connect/tabbson.cpp @@ -1012,7 +1012,7 @@ PBVAL BCUTIL::GetRow(PGLOBAL g) PBVAL nwr, row = Tp->Row; for (int i = 0; i < nod && row; i++) { - if (nodes[i + 1].Op == OP_XX) + if (i < nod-1 && nodes[i+1].Op == OP_XX) break; else switch (row->Type) { case TYPE_JOB: @@ -1411,29 +1411,31 @@ int TDBBSN::EstimatedLength(void) /***********************************************************************/ bool TDBBSN::OpenDB(PGLOBAL g) { + TUSE use = Use; + + if (Pretty < 0 && Mode == MODE_UPDATE) { + sprintf(g->Message, "Mode %d NIY for Bjson", Mode); + return true; + } // endif Mode + if (Use == USE_OPEN) { /*******************************************************************/ - /* Table already open replace it at its beginning. */ + /* Table already open replace it at its beginning. ??? */ /*******************************************************************/ Fpos = -1; NextSame = 0; SameRow = 0; - } else { - /*******************************************************************/ - /* First opening. */ - /*******************************************************************/ - if (Mode == MODE_INSERT) - switch (Jmode) { - case MODE_OBJECT: Row = Bp->NewVal(TYPE_JOB); break; - case MODE_ARRAY: Row = Bp->NewVal(TYPE_JAR); break; - case MODE_VALUE: Row = Bp->NewVal(TYPE_JVAL); break; - default: - sprintf(g->Message, "Invalid Jmode %d", Jmode); - return true; - } // endswitch Jmode - } // endif Use + /*********************************************************************/ + /* Open according to logical input/output mode required. */ + /*********************************************************************/ + if (TDBDOS::OpenDB(g)) + return true; + + if (use == USE_OPEN) + return false; + if (Pretty < 0) { /*******************************************************************/ /* Binary BJSON table. */ @@ -1441,45 +1443,45 @@ bool TDBBSN::OpenDB(PGLOBAL g) xtrc(1, "JSN OpenDB: tdbp=%p tdb=R%d use=%d mode=%d\n", this, Tdb_No, Use, Mode); - if (Use == USE_OPEN) { - /*******************************************************************/ - /* Table already open, just replace it at its beginning. */ - /*******************************************************************/ - if (!To_Kindex) { - Txfp->Rewind(); // see comment in Work.log - } else // Table is to be accessed through a sorted index table - To_Kindex->Reset(); // TODO: NIY - - return false; - } // endif use - /*********************************************************************/ - /* Open according to logical input/output mode required. */ - /* Use conventionnal input/output functions. */ - /*********************************************************************/ - if (Txfp->OpenTableFile(g)) - return true; - - Use = USE_OPEN; // Do it now in case we are recursively called - - /*********************************************************************/ - /* Lrecl is Ok. */ + /* Lrecl is Ok. */ /*********************************************************************/ size_t linelen = Lrecl; - // Buffer should be the first allocated thing in G->Sarea + // Buffer must be set to G->Sarea Txfp->AllocateBuffer(Bp->G); + + if (Mode == MODE_INSERT) + Bp->SubSet(true); + else + Bp->MemSave(); + To_Line = Txfp->GetBuf(); memset(To_Line, 0, linelen); - Bp->MemSave(); xtrc(1, "OpenJSN: R%hd mode=%d To_Line=%p\n", Tdb_No, Mode, To_Line); - } else if (TDBDOS::OpenDB(g)) - return true; + } // endif Pretty + + /***********************************************************************/ + /* First opening. */ + /***********************************************************************/ + if (Mode == MODE_INSERT) { + switch (Jmode) { + case MODE_OBJECT: Row = Bp->NewVal(TYPE_JOB); break; + case MODE_ARRAY: Row = Bp->NewVal(TYPE_JAR); break; + case MODE_VALUE: Row = Bp->NewVal(TYPE_JVAL); break; + default: + sprintf(g->Message, "Invalid Jmode %d", Jmode); + return true; + } // endswitch Jmode + + Bp->MemSave(); + } // endif Mode if (Xcol) To_Filter = NULL; // Imcompatible return false; + } // end of OpenDB /***********************************************************************/ @@ -1564,26 +1566,30 @@ int TDBBSN::ReadDB(PGLOBAL g) /***********************************************************************/ bool TDBBSN::PrepareWriting(PGLOBAL g) { - PSZ s; + if (Pretty >= 0) { + PSZ s; - if (!(Top = Bp->MakeTopTree(g, Row))) - return true; + if (!(Top = Bp->MakeTopTree(g, Row))) + return true; - if ((s = Bp->SerialVal(g, Top, Pretty))) { - if (Comma) - strcat(s, ","); + if ((s = Bp->SerialVal(g, Top, Pretty))) { + if (Comma) + strcat(s, ","); - if ((signed)strlen(s) > Lrecl) { - strncpy(To_Line, s, Lrecl); - sprintf(g->Message, "Line truncated (lrecl=%d)", Lrecl); - return PushWarning(g, this); - } else - strcpy(To_Line, s); + if ((signed)strlen(s) > Lrecl) { + strncpy(To_Line, s, Lrecl); + sprintf(g->Message, "Line truncated (lrecl=%d)", Lrecl); + return PushWarning(g, this); + } else + strcpy(To_Line, s); - return false; + return false; + } else + return true; } else - return true; - + ((BINFAM*)Txfp)->Recsize = ((size_t)PlugSubAlloc(Bp->G, NULL, 0) + - (size_t)To_Line); + return false; } // end of PrepareWriting /***********************************************************************/ @@ -2034,6 +2040,7 @@ void BSONCOL::WriteColumn(PGLOBAL g) else Cp->AddArrayValue(row, jsp); + break; case TYPE_JOB: if (Nodes[Nod - 1].Key) Cp->SetKeyValue(row, jsp, Nodes[Nod - 1].Key); diff --git a/storage/connect/tabdos.cpp b/storage/connect/tabdos.cpp index a2b5204cd0a..8c57157f5a9 100644 --- a/storage/connect/tabdos.cpp +++ b/storage/connect/tabdos.cpp @@ -2148,6 +2148,9 @@ bool TDBDOS::OpenDB(PGLOBAL g) } // endif use if (Mode == MODE_DELETE && !Next && Txfp->GetAmType() != TYPE_AM_DOS +#if defined(BSON_SUPPORT) + && Txfp->GetAmType() != TYPE_AM_BIN +#endif // BSON_SUPPORT && Txfp->GetAmType() != TYPE_AM_MGO) { // Delete all lines. Not handled in MAP or block mode Txfp = new(g) DOSFAM((PDOSDEF)To_Def); |