diff options
author | Olivier Bertrand <bertrandop@gmail.com> | 2017-07-02 22:29:31 +0200 |
---|---|---|
committer | Olivier Bertrand <bertrandop@gmail.com> | 2017-07-02 22:41:11 +0200 |
commit | 94e5d7de85a2964e7dd9a02e91a71522ccc66f94 (patch) | |
tree | 45164d08bff5a905f88da27762fcf3c089299cd7 /storage | |
parent | c51548d6b4234d1b9bf7548125f63c5975f37d35 (diff) | |
download | mariadb-git-94e5d7de85a2964e7dd9a02e91a71522ccc66f94.tar.gz |
- Add Support of the MongoDB Java Driver.
modified: storage/connect/CMakeLists.txt
modified: storage/connect/JavaWrappers.jar
modified: storage/connect/colblk.h
modified: storage/connect/filter.cpp
modified: storage/connect/filter.h
modified: storage/connect/ha_connect.cc
modified: storage/connect/ha_connect.h
modified: storage/connect/jdbccat.h
modified: storage/connect/jdbconn.cpp
modified: storage/connect/jdbconn.h
modified: storage/connect/mongofam.cpp
modified: storage/connect/mongofam.h
modified: storage/connect/mycat.cc
modified: storage/connect/mycat.h
modified: storage/connect/tabext.h
modified: storage/connect/tabjdbc.cpp
modified: storage/connect/tabjdbc.h
modified: storage/connect/tabjson.cpp
modified: storage/connect/tabjson.h
modified: storage/connect/tabmgo.cpp
modified: storage/connect/tabmgo.h
created: storage/connect/Mongo2Interface.java
created: storage/connect/Mongo3Interface.java
created: storage/connect/cmgoconn.cpp
created: storage/connect/cmgoconn.h
created: storage/connect/javaconn.cpp
created: storage/connect/javaconn.h
created: storage/connect/jmgfam.cpp
created: storage/connect/jmgfam.h
created: storage/connect/jmgoconn.cpp
created: storage/connect/jmgoconn.h
created: storage/connect/mongo.cpp
created: storage/connect/mongo.h
created: storage/connect/tabjmg.cpp
created: storage/connect/tabjmg.h
- tdbp not initialized when catched exception
in CntGetTDB (connect.cc line 188)
modified: storage/connect/connect.h
- CheckCleanup should sometimes doing cleanup on pure info
Sometimes MariaDB loops on info to get the size of all tables in a database.
This can sometimes fail by exhausted memory.
CheckCleanup now have a force boolean parameter (defaulting to false)
modified: storage/connect/ha_connect.cc
modified: storage/connect/user_connect.cc
modified: storage/connect/user_connect.h
Change the copyright of some source files
modified: storage/connect/connect.cc
modified: storage/connect/connect.h
modified: storage/connect/engmsg.h
modified: storage/connect/global.h
modified: storage/connect/ha_connect.cc
modified: storage/connect/ha_connect.h
modified: storage/connect/msgid.h
modified: storage/connect/mycat.cc
modified: storage/connect/mycat.h
modified: storage/connect/os.h
modified: storage/connect/osutil.c
modified: storage/connect/osutil.h
modified: storage/connect/user_connect.cc
modified: storage/connect/user_connect.h
Diffstat (limited to 'storage')
46 files changed, 5400 insertions, 2477 deletions
diff --git a/storage/connect/CMakeLists.txt b/storage/connect/CMakeLists.txt index 141117a6412..248764879ed 100644 --- a/storage/connect/CMakeLists.txt +++ b/storage/connect/CMakeLists.txt @@ -245,7 +245,7 @@ int main() { ENDIF(CONNECT_WITH_ODBC) # -# JDBC +# JDBC and MongoDB Java Driver # IF(APPLE) OPTION(CONNECT_WITH_JDBC "Compile CONNECT storage engine without JDBC support" OFF) @@ -262,9 +262,13 @@ IF(CONNECT_WITH_JDBC) INCLUDE_DIRECTORIES(${JAVA_INCLUDE_PATH2}) # SET(JDBC_LIBRARY ${JAVA_JVM_LIBRARY}) will be dynamically linked SET(CONNECT_SOURCES ${CONNECT_SOURCES} - jdbconn.cpp tabjdbc.cpp jdbconn.h tabjdbc.h jdbccat.h + javaconn.cpp jdbconn.cpp tabjdbc.cpp + jmgoconn.cpp jmgfam.cpp mongo.cpp tabjmg.cpp + jdbccat.h javaconn.h jdbconn.h tabjdbc.h + jmgoconn.h jmgfam.h mongo.h tabjmg.h JdbcInterface.java ApacheInterface.java MariadbInterface.java MysqlInterface.java OracleInterface.java PostgresqlInterface.java + Mongo2Interface.java Mongo3Interface.java JavaWrappers.jar) # TODO: Find how to compile and install the java wrapper classes # Find required libraries and include directories @@ -292,7 +296,7 @@ IF(CONNECT_WITH_ZIP) ENDIF(CONNECT_WITH_ZIP) # -# MONGO (CMAKE NOT YET WORKING) +# MONGO C Driver (CMAKE NOT YET WORKING) # #OPTION(CONNECT_WITH_MONGO "Compile CONNECT storage engine with MONGO support" ON) @@ -302,16 +306,21 @@ ENDIF(CONNECT_WITH_ZIP) # # Adding some typical places to search in # SET(PC_MONGO_INCLUDE_DIRS # C:/mongo-c-driver/include -# D:/mongo-c-driver/include) +# D:/mongo-c-driver/include) # SET(PC_MONGO_LIBRARY_DIRS # C:/mongo-c-driver/lib -# D:/mongo-c-driver/lib) +# D:/mongo-c-driver/lib) # ENDIF(WIN32) # FIND_PACKAGE(libmongoc) # IF (MONGO_FOUND) # INCLUDE_DIRECTORIES(${MONGO_INCLUDE_DIR}) # SET(MONGO_LIBRARY ${MONGO_LIBRARIES}) -# SET(CONNECT_SOURCES ${CONNECT_SOURCES} mongofam.cpp mongofam.h) +# SET(CONNECT_SOURCES ${CONNECT_SOURCES} +# cmgoconn.cpp mongofam.cpp tabmgo.cpp +# cmgoconn.h mongofam.h tabmgo.h) +# IF (NOT JAVA_FOUND AND JNI_FOUND) +# SET(CONNECT_SOURCES ${CONNECT_SOURCES} mongo.cpp mongo.h) +# ENDIF (NOT JAVA_FOUND AND JNI_FOUND) # add_definitions(-DMONGO_SUPPORT) # ENDIF(MONGO_FOUND) #ENDIF(CONNECT_WITH_MONGO) @@ -339,3 +348,4 @@ MYSQL_ADD_PLUGIN(connect ${CONNECT_SOURCES} LINK_LIBRARIES ${ZLIB_LIBRARY} ${XML_LIBRARY} ${ICONV_LIBRARY} ${ODBC_LIBRARY} ${JDBC_LIBRARY} ${IPHLPAPI_LIBRARY}) + diff --git a/storage/connect/JavaWrappers.jar b/storage/connect/JavaWrappers.jar Binary files differindex 8c01c364a3f..dec90967039 100644 --- a/storage/connect/JavaWrappers.jar +++ b/storage/connect/JavaWrappers.jar diff --git a/storage/connect/Mongo2Interface.java b/storage/connect/Mongo2Interface.java new file mode 100644 index 00000000000..106dd4a4d63 --- /dev/null +++ b/storage/connect/Mongo2Interface.java @@ -0,0 +1,437 @@ +package wrappers; + +import java.util.Date; +import java.util.List; +import java.util.Set; + +import com.mongodb.AggregationOptions; +import com.mongodb.BasicDBList; +import com.mongodb.BasicDBObject; +import com.mongodb.Cursor; +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.MongoException; +import com.mongodb.WriteConcernException; +import com.mongodb.WriteResult; +import com.mongodb.util.JSON; + +public class Mongo2Interface { + boolean DEBUG = false; + String Errmsg = "No error"; + Set<String> Colnames = null; + Cursor cursor = null; + MongoClient client = null; + DB db = null; + DBCollection coll = null; + BasicDBObject doc = null; + BasicDBObject dbq = null; + BasicDBObject dbf = null; + List<DBObject> pip = null; + AggregationOptions aop = null; + + // === Constructors/finalize ========================================= + public Mongo2Interface() { + this(false); + } // end of default constructor + + public Mongo2Interface(boolean b) { + DEBUG = b; + } // end of constructor + + protected void SetErrmsg(String str) { + if (DEBUG) + System.out.println(str); + + Errmsg = str; + } // end of SetErrmsg + + protected void SetErrmsg(Exception e) { + if (DEBUG) + System.out.println(e.getMessage()); + + Errmsg = e.toString(); + } // end of SetErrmsg + + public String GetErrmsg() { + String err = Errmsg; + + Errmsg = "No error"; + return err; + } // end of GetErrmsg + + public int MongoConnect(String[] parms) { + int rc = 0; + + if (DEBUG) + System.out.println("Mongo2: URI=" + parms[0] + " DB=" + parms[1]); + + try { + MongoClientURI uri = new MongoClientURI(parms[0]); + + client = new MongoClient(uri); + + if (DEBUG) + System.out.println("Connection " + client.toString() + " established"); + + // Now connect to your databases + db = client.getDB(parms[1]); + + if (parms[2] != null && !parms[2].isEmpty()) { + if (DEBUG) + System.out.println("user=" + parms[2] + " pwd=" + parms[3]); + + @SuppressWarnings("deprecation") + boolean auth = db.authenticate(parms[2], parms[3].toCharArray()); + + if (DEBUG) + System.out.println("Authentication: " + auth); + + } // endif user + + } catch (MongoException me) { + SetErrmsg(me); + rc = -1; + } catch (Exception e) { + SetErrmsg(e); + rc = -3; + } // end try/catch + + return rc; + } // end of MongoConnect + + public int MongoDisconnect() { + int rc = 0; + + try { + if (cursor != null) { + if (DEBUG) + System.out.println("Closing cursor"); + + cursor.close(); + cursor = null; + } // endif client + + if (client != null) { + if (DEBUG) + System.out.println("Closing connection"); + + client.close(); + client = null; + } // endif client + + } catch (MongoException se) { + SetErrmsg(se); + rc += 8; + } // end try/catch + + return rc; + } // end of MongoDisconnect + + public boolean GetCollection(String name) { + if (DEBUG) + System.out.println("GetCollection: name=" + name); + + try { + coll = db.getCollection(name); + } catch (Exception e) { + SetErrmsg(e); + return true; + } // end try/catch + + return false; + } // end of GetCollection + + public long GetCollSize() { + return (coll != null) ? coll.count() : 0; + } // end of GetCollSize + + public boolean FindColl(String query, String fields) { + if (DEBUG) + System.out.println("FindColl: query=" + query + " fields=" + fields); + + try { + if (query != null || fields != null) { + dbq = (BasicDBObject) JSON.parse((query != null) ? query : "{}"); + + if (fields != null) { + dbf = (BasicDBObject) JSON.parse(fields); + cursor = coll.find(dbq, dbf); + } else + cursor = coll.find(dbq); + + } else + cursor = coll.find(); + + } catch (Exception e) { + SetErrmsg(e); + return true; + } // end try/catch + + return false; + } // end of FindColl + + @SuppressWarnings("unchecked") + public boolean AggregateColl(String pipeline) { + if (DEBUG) + System.out.println("AggregateColl: pipeline=" + pipeline); + + try { + DBObject pipe = (DBObject) JSON.parse(pipeline); + + pip = (List<DBObject>) pipe.get("pipeline"); + aop = AggregationOptions.builder().batchSize(0).allowDiskUse(true) + .outputMode(AggregationOptions.OutputMode.CURSOR).build(); + cursor = coll.aggregate(pip, aop); + } catch (MongoException me) { + SetErrmsg(me); + return true; + } // end try/catch + + return false; + } // end of AggregateColl + + public boolean Rewind() { + if (cursor != null) + cursor.close(); + + if (pip == null) { + if (dbf != null) + cursor = coll.find(dbq, dbf); + else if (dbq != null) + cursor = coll.find(dbq); + else + cursor = coll.find(); + + } else + cursor = coll.aggregate(pip, aop); + + return (cursor == null); + } // end of Rewind + + public int ReadNext() { + try { + if (cursor.hasNext()) { + doc = (BasicDBObject) cursor.next(); + + if (DEBUG) + System.out.println("Class doc = " + doc.getClass()); + + Colnames = doc.keySet(); + return 1; + } else + return 0; + + } catch (MongoException me) { + SetErrmsg(me); + return -1; + } // end try/catch + + } // end of ReadNext + + public boolean Fetch(int row) { + if (cursor.hasNext()) { + doc = (BasicDBObject) cursor.next(); + Colnames = doc.keySet(); + return true; + } else + return false; + + } // end of Fetch + + public String GetDoc() { + return (doc != null) ? doc.toString() : null; + } // end of GetDoc + + public Set<String> GetColumns() { + if (doc != null) + return doc.keySet(); + else + return null; + + } // end of GetColumns + + public String ColumnDesc(int n, int[] val) { + // if (rsmd == null) { + // System.out.println("No result metadata"); + // return null; + // } else try { + // val[0] = rsmd.getColumnType(n); + // val[1] = rsmd.getPrecision(n); + // val[2] = rsmd.getScale(n); + // val[3] = rsmd.isNullable(n); + // return rsmd.getColumnLabel(n); + // } catch (SQLException se) { + // SetErrmsg(se); + // } //end try/catch + + return null; + } // end of ColumnDesc + + protected Object GetFieldObject(String path) { + Object o = null; + BasicDBObject dob = null; + BasicDBList lst = null; + String[] names = null; + + if (path == null || path.equals("*")) + return doc; + else if (doc instanceof BasicDBObject) + dob = doc; + // else if (o instanceof BasicDBList) + // lst = (BasicDBList) doc; + else + return doc; + + try { + names = path.split("\\."); + + for (String name : names) { + if (lst != null) { + o = lst.get(Integer.parseInt(name)); + } else + o = dob.get(name); + + if (o == null) + break; + + if (DEBUG) + System.out.println("Class o = " + o.getClass()); + + if (o instanceof BasicDBObject) { + dob = (BasicDBObject) o; + lst = null; + } else if (o instanceof BasicDBList) { + lst = (BasicDBList) o; + } else + break; + + } // endfor name + + } catch (IndexOutOfBoundsException x) { + o = null; + } catch (MongoException se) { + SetErrmsg(se); + o = null; + } // end try/catch + + return o; + } // end of GetFieldObject + + public String GetField(String path) { + Object o = GetFieldObject(path); + + if (o != null) { + if (o instanceof Date) { + Integer TS = (int) (((Date) o).getTime() / 1000); + return TS.toString(); + } // endif Date + + return o.toString(); + } else + return null; + + } // end of GetField + + public Object MakeDocument() { + return new BasicDBObject(); + } // end of MakeDocument + + public boolean DocAdd(Object bdc, String key, Object val) { + try { + ((BasicDBObject) bdc).append(key, val); + } catch (MongoException me) { + SetErrmsg(me); + return true; + } // end try/catch + + return false; + } // end of DocAdd + + public Object MakeArray() { + return new BasicDBList(); + } // end of MakeArray + + public boolean ArrayAdd(Object bar, int n, Object val) { + try { + ((BasicDBList) bar).put(n, val); + } catch (MongoException me) { + SetErrmsg(me); + return true; + } catch (Exception ex) { + SetErrmsg(ex); + return true; + } // end try/catch + + return false; + } // end of ArrayAdd + + public boolean CollInsert(Object dob) { + try { + coll.insert((BasicDBObject) dob); + } catch (MongoException me) { + SetErrmsg(me); + return true; + } catch (Exception ex) { + SetErrmsg(ex); + return true; + } // end try/catch + + return false; + } // end of CollInsert + + public long CollUpdate(Object upd) { + long n = -1; + + if (DEBUG) + System.out.println("upd: " + upd.toString()); + + try { + DBObject qry = new BasicDBObject("_id", doc.get("_id")); + + WriteResult res = coll.update(qry, (DBObject) upd); + + if (DEBUG) + System.out.println("CollUpdate: " + res.toString()); + + n = res.getN(); + } catch (MongoException me) { + SetErrmsg(me); + } catch (Exception ex) { + SetErrmsg(ex); + } // end try/catch + + return n; + } // end of CollUpdate + + public long CollDelete(boolean all) { + long n = -1; + + try { + WriteResult res; + BasicDBObject qry = new BasicDBObject(); + + if (!all) + qry.append("_id", doc.get("_id")); + + res = coll.remove(qry); + + if (DEBUG) + System.out.println("CollDelete: " + res.toString()); + + n = res.getN(); + } catch (WriteConcernException wx) { + SetErrmsg(wx); + } catch (MongoException me) { + SetErrmsg(me); + } catch (UnsupportedOperationException ux) { + SetErrmsg(ux); + n = 0; + } // end try/catch + + return n; + } // end of CollDelete + +} // end of class MongoInterface diff --git a/storage/connect/Mongo3Interface.java b/storage/connect/Mongo3Interface.java new file mode 100644 index 00000000000..f587c01b391 --- /dev/null +++ b/storage/connect/Mongo3Interface.java @@ -0,0 +1,504 @@ +package wrappers; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Set; + +import org.bson.BsonArray; +import org.bson.BsonBoolean; +import org.bson.BsonDateTime; +import org.bson.BsonDocument; +import org.bson.BsonDouble; +import org.bson.BsonInt32; +import org.bson.BsonInt64; +import org.bson.BsonNull; +import org.bson.BsonString; +import org.bson.BsonValue; +import org.bson.Document; +import org.bson.conversions.Bson; + +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.MongoException; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Filters; +import com.mongodb.client.result.DeleteResult; +import com.mongodb.client.result.UpdateResult; + +public class Mongo3Interface { + boolean DEBUG = false; + String Errmsg = "No error"; + Set<String> Colnames = null; + MongoClient client = null; + MongoDatabase db = null; + MongoCollection<BsonDocument> coll = null; + FindIterable<BsonDocument> finditer = null; + AggregateIterable<BsonDocument> aggiter = null; + MongoCursor<BsonDocument> cursor = null; + BsonDocument doc = null; + BsonDocument util = null; + BsonNull bsonull = new BsonNull(); + + // === Constructors/finalize ========================================= + public Mongo3Interface() { + this(false); + } // end of default constructor + + public Mongo3Interface(boolean b) { + DEBUG = b; + } // end of constructor + + protected void SetErrmsg(String str) { + if (DEBUG) + System.out.println(str); + + Errmsg = str; + } // end of SetErrmsg + + protected void SetErrmsg(Exception e) { + if (DEBUG) + System.out.println(e.getMessage()); + + Errmsg = e.toString(); + } // end of SetErrmsg + + public String GetErrmsg() { + String err = Errmsg; + + Errmsg = "No error"; + return err; + } // end of GetErrmsg + + public int MongoConnect(String[] parms) { + int rc = 0; + + if (DEBUG) + System.out.println("Mongo3: URI=" + parms[0] + " DB=" + parms[1]); + + try { + MongoClientURI uri = new MongoClientURI(parms[0]); + + client = new MongoClient(uri); + + if (DEBUG) + System.out.println("Connection " + client.toString() + " established"); + + // Now connect to your databases + db = client.getDatabase(parms[1]); + + // if (parms[2] != null && !parms[2].isEmpty()) { + // if (DEBUG) + // System.out.println("user=" + parms[2] + " pwd=" + parms[3]); + + // @SuppressWarnings("deprecation") + // boolean auth = db.authenticate(parms[2], parms[3].toCharArray()); + + // if (DEBUG) + // System.out.println("Authentication: " + auth); + + // } // endif user + + } catch (MongoException me) { + SetErrmsg(me); + rc = -1; + } catch (Exception e) { + SetErrmsg(e); + rc = -3; + } // end try/catch + + return rc; + } // end of MongoConnect + + public int MongoDisconnect() { + int rc = 0; + + try { + if (cursor != null) { + if (DEBUG) + System.out.println("Closing cursor"); + + cursor.close(); + cursor = null; + } // endif client + + if (client != null) { + if (DEBUG) + System.out.println("Closing connection"); + + client.close(); + client = null; + } // endif client + + } catch (MongoException se) { + SetErrmsg(se); + rc += 8; + } // end try/catch + + return rc; + } // end of MongoDisconnect + + public boolean GetCollection(String name) { + if (DEBUG) + System.out.println("GetCollection: name=" + name); + + try { + coll = db.getCollection(name).withDocumentClass(BsonDocument.class); + } catch (Exception e) { + SetErrmsg(e); + return true; + } // end try/catch + + return false; + } // end of GetCollection + + public long GetCollSize() { + return (coll != null) ? coll.count() : 0; + } // end of GetCollSize + + public boolean FindColl(String query, String fields) { + if (DEBUG) + System.out.println("FindColl: query=" + query + " fields=" + fields); + + try { + if (query != null) { + Bson dbq = Document.parse((query != null) ? query : "{}"); + finditer = coll.find(dbq); + } else + finditer = coll.find(); + + if (fields != null) { + Bson dbf = BsonDocument.parse(fields); + finditer = finditer.projection(dbf); + } // endif fields + + cursor = finditer.iterator(); + } catch (Exception e) { + SetErrmsg(e); + return true; + } // end try/catch + + return false; + } // end of FindColl + + @SuppressWarnings("unchecked") + public boolean AggregateColl(String pipeline) { + if (DEBUG) + System.out.println("AggregateColl: pipeline=" + pipeline); + + try { + Document pipe = Document.parse(pipeline); + ArrayList<?> pip = (ArrayList<?>) pipe.get("pipeline"); + + aggiter = coll.aggregate((List<? extends Bson>) pip); + cursor = aggiter.iterator(); + } catch (MongoException me) { + SetErrmsg(me); + return true; + } // end try/catch + + return false; + } // end of AggregateColl + + public boolean Rewind() { + if (cursor != null) + cursor.close(); + + if (finditer != null) + cursor = finditer.iterator(); + else if (aggiter != null) + cursor = aggiter.iterator(); + + return (cursor == null); + } // end of Rewind + + public int ReadNext() { + if (cursor.hasNext()) { + doc = cursor.next(); + + if (DEBUG) + System.out.println("Class doc = " + doc.getClass()); + + Colnames = doc.keySet(); + return 1; + } else + return 0; + + } // end of ReadNext + + public boolean Fetch(int row) { + if (cursor.hasNext()) { + doc = cursor.next(); + Colnames = doc.keySet(); + return true; + } else + return false; + + } // end of Fetch + + public String GetDoc() { + return (doc != null) ? doc.toJson() : null; + } // end of GetDoc + + public Set<String> GetColumns() { + if (doc != null) + return doc.keySet(); + else + return null; + + } // end of GetColumns + + public String ColumnName(int n) { + int i = 1; + + for (String name : Colnames) + if (i++ == n) + return name; + + return null; + } // end of ColumnName + + public int ColumnType(int n, String name) { + // if (rsmd == null) { + // System.out.println("No result metadata"); + // } else try { + // if (n == 0) + // n = rs.findColumn(name); + + // return rsmd.getColumnType(n); + // } catch (SQLException se) { + // SetErrmsg(se); + // } //end try/catch + + return 666; // Not a type + } // end of ColumnType + + public String ColumnDesc(int n, int[] val) { + // if (rsmd == null) { + // System.out.println("No result metadata"); + // return null; + // } else try { + // val[0] = rsmd.getColumnType(n); + // val[1] = rsmd.getPrecision(n); + // val[2] = rsmd.getScale(n); + // val[3] = rsmd.isNullable(n); + // return rsmd.getColumnLabel(n); + // } catch (SQLException se) { + // SetErrmsg(se); + // } //end try/catch + + return null; + } // end of ColumnDesc + + protected BsonValue GetFieldObject(String path) { + BsonValue o = doc; + BsonDocument dob = null; + BsonArray ary = null; + String[] names = null; + + if (path == null || path.equals("*")) + return doc; + else if (o instanceof BsonDocument) + dob = doc; + else if (o instanceof BsonArray) + ary = (BsonArray) o; + else + return doc; + + try { + names = path.split("\\."); + + for (String name : names) { + if (ary != null) { + o = ary.get(Integer.parseInt(name)); + } else + o = dob.get(name); + + if (o == null) + break; + + if (DEBUG) + System.out.println("Class o = " + o.getClass()); + + if (o instanceof BsonDocument) { + dob = (BsonDocument) o; + ary = null; + } else if (o instanceof BsonArray) { + ary = (BsonArray) o; + } else + break; + + } // endfor name + + } catch (IndexOutOfBoundsException x) { + o = null; + } catch (MongoException me) { + SetErrmsg(me); + o = null; + } // end try/catch + + return o; + } // end of GetFieldObject + + public String GetField(String path) { + BsonValue o = GetFieldObject(path); + + if (o != null) { + if (o.isString()) { + return o.asString().getValue(); + } else if (o.isInt32()) { + return Integer.toString(o.asInt32().getValue()); + } else if (o.isInt64()) { + return Long.toString(o.asInt64().getValue()); + } else if (o.isObjectId()) { + return o.asObjectId().getValue().toString(); + } else if (o.isDateTime()) { + Integer TS = (int) (o.asDateTime().getValue() / 1000); + return TS.toString(); + } else if (o.isDouble()) { + return Double.toString(o.asDouble().getValue()); + } else if (o.isDocument()) { + return o.asDocument().toJson(); + } else if (o.isArray()) { + util = new BsonDocument("arr", o.asArray()); + String s = util.toJson(); + int i1 = s.indexOf('['); + int i2 = s.lastIndexOf(']'); + return s.substring(i1, i2 + 1); + } else if (o.isNull()) { + return null; + } else + return o.toString(); + + } else + return null; + + } // end of GetField + + protected BsonValue ObjToBson(Object val) { + BsonValue bval = null; + + if (val == null) + bval = bsonull; + else if (val.getClass() == String.class) + bval = new BsonString((String) val); + else if (val.getClass() == Integer.class) + bval = new BsonInt32((int) val); + else if (val.getClass() == Double.class) + bval = new BsonDouble((double) val); + else if (val.getClass() == BigInteger.class) + bval = new BsonInt64((long) val); + else if (val.getClass() == Boolean.class) + bval = new BsonBoolean((Boolean) val); + else if (val.getClass() == Date.class) + bval = new BsonDateTime(((Date) val).getTime() * 1000); + else if (val.getClass() == BsonDocument.class) + bval = (BsonDocument) val; + else if (val.getClass() == BsonArray.class) + bval = (BsonArray) val; + + return bval; + } // end of ObjToBson + + public Object MakeDocument() { + return new BsonDocument(); + } // end of MakeDocument + + public boolean DocAdd(Object bdc, String key, Object val) { + try { + ((BsonDocument) bdc).append(key, ObjToBson(val)); + } catch (MongoException me) { + SetErrmsg(me); + return true; + } // end try/catch + + return false; + } // end of DocAdd + + public Object MakeArray() { + return new BsonArray(); + } // end of MakeArray + + public boolean ArrayAdd(Object bar, int n, Object val) { + try { + for (int i = ((BsonArray) bar).size(); i < n; i++) + ((BsonArray) bar).add(bsonull); + + ((BsonArray) bar).add(ObjToBson(val)); + } catch (MongoException me) { + SetErrmsg(me); + return true; + } catch (Exception ex) { + SetErrmsg(ex); + return true; + } // end try/catch + + return false; + } // end of ArrayAdd + + public boolean CollInsert(Object dob) { + try { + coll.insertOne((BsonDocument) dob); + } catch (MongoException me) { + SetErrmsg(me); + return true; + } catch (Exception ex) { + SetErrmsg(ex); + return true; + } // end try/catch + + return false; + } // end of CollInsert + + public long CollUpdate(Object upd) { + long n = -1; + + if (DEBUG) + System.out.println("upd: " + upd.toString()); + + try { + UpdateResult res = coll.updateOne(Filters.eq("_id", doc.get("_id")), (Bson) upd); + + if (DEBUG) + System.out.println("CollUpdate: " + res.toString()); + + n = res.getModifiedCount(); + } catch (MongoException me) { + SetErrmsg(me); + } catch (Exception ex) { + SetErrmsg(ex); + } // end try/catch + + return n; + } // end of CollUpdate + + public long CollDelete(boolean all) { + long n = -1; + + try { + DeleteResult res; + + if (all) + res = coll.deleteMany(new Document()); + else + res = coll.deleteOne(Filters.eq("_id", doc.get("_id"))); + + if (DEBUG) + System.out.println("CollDelete: " + res.toString()); + + n = res.getDeletedCount(); + } catch (MongoException me) { + SetErrmsg(me); + } catch (Exception ex) { + SetErrmsg(ex); + } // end try/catch + + return n; + } // end of CollDelete + +} // end of class MongoInterface diff --git a/storage/connect/cmgoconn.cpp b/storage/connect/cmgoconn.cpp new file mode 100644 index 00000000000..8367708d4ac --- /dev/null +++ b/storage/connect/cmgoconn.cpp @@ -0,0 +1,888 @@ +/************ CMgoConn C++ Functions Source Code File (.CPP) ***********/ +/* Name: CMgoConn.CPP Version 1.0 */ +/* */ +/* (C) Copyright to the author Olivier BERTRAND 2017 */ +/* */ +/* This file contains the MongoDB C connection classes functions. */ +/***********************************************************************/ + +/***********************************************************************/ +/* Include relevant MariaDB header file. */ +/***********************************************************************/ +#include <my_global.h> + +/***********************************************************************/ +/* Required objects includes. */ +/***********************************************************************/ +#include "global.h" +#include "plgdbsem.h" +#include "colblk.h" +#include "xobject.h" +#include "xtable.h" +#include "filter.h" +#include "cmgoconn.h" + +bool IsNum(PSZ s); + +// Required to initialize libmongoc's internals +void mongo_init(bool init) +{ + if (init) + mongoc_init(); + else + mongoc_cleanup(); + +} // end of mongo_init + +/* --------------------------- Class INCOL --------------------------- */ + +/***********************************************************************/ +/* Add a column in the column list. */ +/***********************************************************************/ +void INCOL::AddCol(PGLOBAL g, PCOL colp, char *jp) +{ + char *p; + PKC kp, kcp; + + if ((p = strchr(jp, '.'))) { + PINCOL icp; + + *p++ = 0; + + for (kp = Klist; kp; kp = kp->Next) + if (kp->Incolp && !strcmp(jp, kp->Key)) + break; + + if (!kp) { + icp = new(g) INCOL(IsNum(p)); + kcp = (PKC)PlugSubAlloc(g, NULL, sizeof(KEYCOL)); + kcp->Next = NULL; + kcp->Incolp = icp; + kcp->Colp = NULL; + kcp->Key = PlugDup(g, jp); + + if (Klist) { + for (kp = Klist; kp->Next; kp = kp->Next); + + kp->Next = kcp; + } else + Klist = kcp; + + } else + icp = kp->Incolp; + + *(p - 1) = '.'; + icp->AddCol(g, colp, p); + } else { + kcp = (PKC)PlugSubAlloc(g, NULL, sizeof(KEYCOL)); + + kcp->Next = NULL; + kcp->Incolp = NULL; + kcp->Colp = colp; + kcp->Key = jp; + + if (Klist) { + for (kp = Klist; kp->Next; kp = kp->Next); + + kp->Next = kcp; + } else + Klist = kcp; + + } // endif jp + +} // end of AddCol + +/* -------------------------- Class CMgoConn ------------------------- */ + +/***********************************************************************/ +/* Implementation of the CMgoConn class. */ +/***********************************************************************/ +CMgoConn::CMgoConn(PGLOBAL g, PCPARM pcg) +{ + Pcg = pcg; + Uri = NULL; + Pool = NULL; + Client = NULL; + Database = NULL; + Collection = NULL; + Cursor = NULL; + Query = NULL; + Opts = NULL; + Fpc = NULL; + m_Connected = false; +} // end of CMgoConn standard constructor + +/***********************************************************************/ +/* Connect to the MongoDB server and get the collection. */ +/***********************************************************************/ +bool CMgoConn::Connect(PGLOBAL g) +{ + Uri = mongoc_uri_new(Pcg->Uristr); + + if (!Uri) { + sprintf(g->Message, "Failed to parse URI: \"%s\"", Pcg->Uristr); + return true; + } // endif Uri + + // Create a new client pool instance + Pool = mongoc_client_pool_new(Uri); + mongoc_client_pool_set_error_api(Pool, 2); + + // Register the application name so we can track it in the profile logs + // on the server. This can also be done from the URI. + mongoc_client_pool_set_appname(Pool, "Connect"); + + // Create a new client instance + Client = mongoc_client_pool_pop(Pool); + + if (!Client) { + sprintf(g->Message, "Failed to get Client"); + return true; + } // endif Client + + // Get a handle on the collection Coll_name + Collection = mongoc_client_get_collection(Client, Pcg->Db_name, Pcg->Coll_name); + + if (!Collection) { + sprintf(g->Message, "Failed to get Collection %s.%s", + Pcg->Db_name, Pcg->Coll_name); + return true; + } // endif Collection + + m_Connected = true; + return false; +} // end of Connect + +/***********************************************************************/ +/* CollSize: returns the number of documents in the collection. */ +/***********************************************************************/ +int CMgoConn::CollSize(PGLOBAL g) +{ + int cnt; + bson_t *query; + const char *jf = NULL; + + if (Pcg->Pipe) + return 10; + else if (Pcg->Filter) + jf = Pcg->Filter; + + if (jf) { + query = bson_new_from_json((const uint8_t *)jf, -1, &Error); + + if (!query) { + htrc("Wrong filter: %s", Error.message); + return 10; + } // endif Query + + } else + query = bson_new(); + + cnt = (int)mongoc_collection_count(Collection, + MONGOC_QUERY_NONE, query, 0, 0, NULL, &Error); + + if (cnt < 0) { + htrc("Collection count: %s", Error.message); + cnt = 2; + } // endif Cardinal + + bson_destroy(query); + return cnt; +} // end of CollSize + +/***********************************************************************/ +/* OpenDB: Data Base open routine for MONGO access method. */ +/***********************************************************************/ +bool CMgoConn::MakeCursor(PGLOBAL g) +{ + const char *p; + bool id, b = false, all = false; + PCSZ options = Pcg->Options; + PTDB tp = Pcg->Tdbp; + PCOL cp; + PSTRG s = NULL; + + id = (tp->GetMode() != MODE_READ); + + if (options && !stricmp(options, "all")) { + options = NULL; + all = true; + } // endif Options + + for (cp = tp->GetColumns(); cp; cp = cp->GetNext()) + if (!strcmp(cp->GetName(), "_id")) + id = true; + else if (cp->GetFmt() && !strcmp(cp->GetFmt(), "*") && !options) + all = true; + + if (Pcg->Pipe) { + if (trace) + htrc("Pipeline: %s\n", options); + + p = strrchr(options, ']'); + + if (!p) { + strcpy(g->Message, "Missing ] in pipeline"); + return true; + } else + *(char*)p = 0; + + s = new(g) STRING(g, 1023, (PSZ)options); + + if (tp->GetFilter()) { + s->Append(",{\"$match\":"); + + if (tp->GetFilter()->MakeSelector(g, s)) { + strcpy(g->Message, "Failed making selector"); + return true; + } else + s->Append('}'); + + tp->SetFilter(NULL); // Not needed anymore + } // endif To_Filter + + if (!all && tp->GetColumns()) { + // Project list + s->Append(",{\"$project\":{\""); + + if (!id) + s->Append("_id\":0,\""); + + for (cp = tp->GetColumns(); cp; cp = cp->GetNext()) { + if (b) + s->Append(",\""); + else + b = true; + + s->Append(cp->GetJpath(g, true)); + s->Append("\":1"); + } // endfor cp + + s->Append("}}"); + } // endif all + + s->Append("]}"); + s->Resize(s->GetLength() + 1); + *(char*)p = ']'; // Restore Colist for discovery + p = s->GetStr(); + + if (trace) + htrc("New Pipeline: %s\n", p); + + Query = bson_new_from_json((const uint8_t *)p, -1, &Error); + + if (!Query) { + sprintf(g->Message, "Wrong pipeline: %s", Error.message); + return true; + } // endif Query + + Cursor = mongoc_collection_aggregate(Collection, MONGOC_QUERY_NONE, + Query, NULL, NULL); + + if (mongoc_cursor_error(Cursor, &Error)) { + sprintf(g->Message, "Mongo aggregate Failure: %s", Error.message); + return true; + } // endif error + + } else { + if (Pcg->Filter || tp->GetFilter()) { + if (trace) { + if (Pcg->Filter) + htrc("Filter: %s\n", Pcg->Filter); + + if (tp->GetFilter()) { + char buf[512]; + + tp->GetFilter()->Prints(g, buf, 511); + htrc("To_Filter: %s\n", buf); + } // endif To_Filter + + } // endif trace + + s = new(g) STRING(g, 1023, (PSZ)Pcg->Filter); + + if (tp->GetFilter()) { + if (Pcg->Filter) + s->Append(','); + + if (tp->GetFilter()->MakeSelector(g, s)) { + strcpy(g->Message, "Failed making selector"); + return NULL; + } // endif Selector + + tp->SetFilter(NULL); // Not needed anymore + } // endif To_Filter + + if (trace) + htrc("selector: %s\n", s->GetStr()); + + s->Resize(s->GetLength() + 1); + Query = bson_new_from_json((const uint8_t *)s->GetStr(), -1, &Error); + + if (!Query) { + sprintf(g->Message, "Wrong filter: %s", Error.message); + return NULL; + } // endif Query + + } else + Query = bson_new(); + + if (!all) { + if (options && *options) { + if (trace) + htrc("options=%s\n", options); + + p = options; + } else if (tp->GetColumns()) { + // Projection list + if (s) + s->Set("{\"projection\":{\""); + else + s = new(g) STRING(g, 511, "{\"projection\":{\""); + + if (!id) + s->Append("_id\":0,\""); + + for (cp = tp->GetColumns(); cp; cp = cp->GetNext()) { + if (b) + s->Append(",\""); + else + b = true; + + s->Append(cp->GetJpath(g, true)); + s->Append("\":1"); + } // endfor cp + + s->Append("}}"); + s->Resize(s->GetLength() + 1); + p = s->GetStr(); + } else { + // count(*) ? + p = "{\"projection\":{\"_id\":1}}"; + } // endif Options + + Opts = bson_new_from_json((const uint8_t *)p, -1, &Error); + + if (!Opts) { + sprintf(g->Message, "Wrong options: %s", Error.message); + return NULL; + } // endif Opts + + } // endif all + + Cursor = mongoc_collection_find_with_opts(Collection, Query, Opts, NULL); + } // endif Pipe + + return false; +} // end of MakeCursor + +/***********************************************************************/ +/* Fetch next document. */ +/***********************************************************************/ +int CMgoConn::ReadNext(PGLOBAL g) +{ + int rc = RC_OK; + + if (!Cursor && MakeCursor(g)) { + rc = RC_FX; + } else if (mongoc_cursor_next(Cursor, &Document)) { + if (trace > 1) { + bson_iter_t iter; + ShowDocument(&iter, Document, ""); + } else if (trace == 1) + htrc("%s\n", GetDocument(g)); + + } else if (mongoc_cursor_error(Cursor, &Error)) { + sprintf(g->Message, "Mongo Cursor Failure: %s", Error.message); + rc = RC_FX; + } else + rc = RC_EF; + + return rc; +} // end of Fetch + +/***********************************************************************/ +/* Get the Json string of the current document. */ +/***********************************************************************/ +PSZ CMgoConn::GetDocument(PGLOBAL g) +{ + char *str = bson_as_json(Document, NULL); + PSZ doc = PlugDup(g, str); + + bson_free(str); + return doc; +} // end of GetDocument + +/***********************************************************************/ +/* Use to trace restaurants document contains. */ +/***********************************************************************/ +void CMgoConn::ShowDocument(bson_iter_t *iter, const bson_t *doc, const char *k) +{ + if (!doc || bson_iter_init(iter, doc)) { + const char *key; + + while (bson_iter_next(iter)) { + key = bson_iter_key(iter); + htrc("Found element key: \"%s\"\n", key); + + if (BSON_ITER_HOLDS_UTF8(iter)) + htrc("%s.%s=\"%s\"\n", k, key, bson_iter_utf8(iter, NULL)); + else if (BSON_ITER_HOLDS_INT32(iter)) + htrc("%s.%s=%d\n", k, key, bson_iter_int32(iter)); + else if (BSON_ITER_HOLDS_INT64(iter)) + htrc("%s.%s=%lld\n", k, key, bson_iter_int64(iter)); + else if (BSON_ITER_HOLDS_DOUBLE(iter)) + htrc("%s.%s=%g\n", k, key, bson_iter_double(iter)); + else if (BSON_ITER_HOLDS_DATE_TIME(iter)) + htrc("%s.%s=date(%lld)\n", k, key, bson_iter_date_time(iter)); + else if (BSON_ITER_HOLDS_OID(iter)) { + char str[25]; + + bson_oid_to_string(bson_iter_oid(iter), str); + htrc("%s.%s=%s\n", k, key, str); + } else if (BSON_ITER_HOLDS_DECIMAL128(iter)) { + char *str = NULL; + bson_decimal128_t dec; + + bson_iter_decimal128(iter, &dec); + bson_decimal128_to_string(&dec, str); + htrc("%s.%s=%s\n", k, key, str); + } else if (BSON_ITER_HOLDS_DOCUMENT(iter)) { + bson_iter_t child; + + if (bson_iter_recurse(iter, &child)) + ShowDocument(&child, NULL, key); + + } else if (BSON_ITER_HOLDS_ARRAY(iter)) { + bson_t *arr; + bson_iter_t itar; + const uint8_t *data = NULL; + uint32_t len = 0; + + bson_iter_array(iter, &len, &data); + arr = bson_new_from_data(data, len); + ShowDocument(&itar, arr, key); + } // endif's + + } // endwhile bson_iter_next + + } // endif bson_iter_init + +} // end of ShowDocument + +/***********************************************************************/ +/* Group columns for inserting or updating. */ +/***********************************************************************/ +void CMgoConn::MakeColumnGroups(PGLOBAL g) +{ + Fpc = new(g) INCOL(false); + + for (PCOL colp = Pcg->Tdbp->GetColumns(); colp; colp = colp->GetNext()) + if (!colp->IsSpecial()) + Fpc->AddCol(g, colp, colp->GetJpath(g, false)); + +} // end of MakeColumnGroups + +/***********************************************************************/ +/* DocWrite. */ +/***********************************************************************/ +bool CMgoConn::DocWrite(PGLOBAL g, PINCOL icp) +{ + for (PKC kp = icp->Klist; kp; kp = kp->Next) + if (kp->Incolp) { + bool isdoc = !kp->Incolp->Array; + + if (isdoc) + BSON_APPEND_DOCUMENT_BEGIN(&icp->Child, kp->Key, &kp->Incolp->Child); + else + BSON_APPEND_ARRAY_BEGIN(&icp->Child, kp->Key, &kp->Incolp->Child); + + if (DocWrite(g, kp->Incolp)) + return true; + + if (isdoc) + bson_append_document_end(&icp->Child, &kp->Incolp->Child); + else + bson_append_array_end(&icp->Child, &kp->Incolp->Child); + + } else if (AddValue(g, kp->Colp, &icp->Child, kp->Key, false)) + return true; + + return false; +} // end of DocWrite + +/***********************************************************************/ +/* WriteDB: Data Base write routine for DOS access method. */ +/***********************************************************************/ +int CMgoConn::Write(PGLOBAL g) +{ + int rc = RC_OK; + PTDB tp = Pcg->Tdbp; + + if (tp->GetMode() == MODE_INSERT) { + bson_init(&Fpc->Child); + + if (DocWrite(g, Fpc)) + return RC_FX; + + if (trace) { + char *str = bson_as_json(&Fpc->Child, NULL); + htrc("Inserting: %s\n", str); + bson_free(str); + } // endif trace + + if (!mongoc_collection_insert(Collection, MONGOC_INSERT_NONE, + &Fpc->Child, NULL, &Error)) { + sprintf(g->Message, "Mongo insert: %s", Error.message); + rc = RC_FX; + } // endif insert + + } else { + bool b = false; + bson_iter_t iter; + bson_t *query = bson_new(); + + bson_iter_init(&iter, Document); + + if (bson_iter_find(&iter, "_id")) { + if (BSON_ITER_HOLDS_OID(&iter)) + b = BSON_APPEND_OID(query, "_id", bson_iter_oid(&iter)); + else if (BSON_ITER_HOLDS_INT32(&iter)) + b = BSON_APPEND_INT32(query, "_id", bson_iter_int32(&iter)); + else if (BSON_ITER_HOLDS_INT64(&iter)) + b = BSON_APPEND_INT64(query, "_id", bson_iter_int64(&iter)); + else if (BSON_ITER_HOLDS_DOUBLE(&iter)) + b = BSON_APPEND_DOUBLE(query, "_id", bson_iter_double(&iter)); + else if (BSON_ITER_HOLDS_UTF8(&iter)) + b = BSON_APPEND_UTF8(query, "_id", bson_iter_utf8(&iter, NULL)); + + } // endif iter + + if (b) { + if (trace) { + char *str = bson_as_json(query, NULL); + htrc("update query: %s\n", str); + bson_free(str); + } // endif trace + + if (tp->GetMode() == MODE_UPDATE) { + bson_t child; + bson_t *update = bson_new(); + + BSON_APPEND_DOCUMENT_BEGIN(update, "$set", &child); + + for (PCOL colp = tp->GetSetCols(); colp; colp = colp->GetNext()) + if (AddValue(g, colp, &child, colp->GetJpath(g, false), true)) + rc = RC_FX; + + bson_append_document_end(update, &child); + + if (rc == RC_OK) + if (!mongoc_collection_update(Collection, MONGOC_UPDATE_NONE, + query, update, NULL, &Error)) { + sprintf(g->Message, "Mongo update: %s", Error.message); + rc = RC_FX; + } // endif update + + bson_destroy(update); + } else if (!mongoc_collection_remove(Collection, + MONGOC_REMOVE_SINGLE_REMOVE, query, NULL, &Error)) { + sprintf(g->Message, "Mongo delete: %s", Error.message); + rc = RC_FX; + } // endif remove + + } else { + strcpy(g->Message, "Mongo update: cannot find _id"); + rc = RC_FX; + } // endif b + + bson_destroy(query); + } // endif Mode + + return rc; +} // end of Write + +/***********************************************************************/ +/* Remove all documents from the collection. */ +/***********************************************************************/ +bool CMgoConn::DocDelete(PGLOBAL g) +{ + Query = bson_new(); + + if (!mongoc_collection_remove(Collection, MONGOC_REMOVE_NONE, + Query, NULL, &Error)) { + sprintf(g->Message, "Mongo remove all: %s", Error.message); + return true; + } // endif remove + + return false; +} // end of DocDelete + +/***********************************************************************/ +/* Rewind the collection. */ +/***********************************************************************/ +void CMgoConn::Rewind(void) +{ + mongoc_cursor_t *cursor = mongoc_cursor_clone(Cursor); + + mongoc_cursor_destroy(Cursor); + Cursor = cursor; +} // end of Rewind + +/***********************************************************************/ +/* Table close routine for MONGO tables. */ +/***********************************************************************/ +void CMgoConn::Close(void) +{ + if (Query) bson_destroy(Query); + if (Opts) bson_destroy(Opts); + if (Cursor) mongoc_cursor_destroy(Cursor); + if (Collection) mongoc_collection_destroy(Collection); + if (Client) mongoc_client_pool_push(Pool, Client); + if (Pool) mongoc_client_pool_destroy(Pool); + if (Uri) mongoc_uri_destroy(Uri); +} // end of Close + +/***********************************************************************/ +/* Mini: used to suppress blanks to json strings. */ +/***********************************************************************/ +char *CMgoConn::Mini(PGLOBAL g, PCOL colp, const bson_t *bson, bool b) +{ + char *s, *str = NULL; + char *Mbuf = (char*)PlugSubAlloc(g, NULL, colp->GetLength() + 1); + int i, k = 0; + bool ok = true; + + if (b) + s = str = bson_array_as_json(bson, NULL); + else + s = str = bson_as_json(bson, NULL); + + for (i = 0; i < colp->GetLength() && s[i]; i++) { + switch (s[i]) { + case ' ': + if (ok) continue; + case '"': + ok = !ok; + default: + break; + } // endswitch s[i] + + Mbuf[k++] = s[i]; + } // endfor i + + bson_free(str); + + if (i >= colp->GetLength()) { + sprintf(g->Message, "Value too long for column %s", colp->GetName()); + throw (int)TYPE_AM_MGO; + } // endif i + + Mbuf[k] = 0; + return Mbuf; +} // end of Mini + +/***********************************************************************/ +/* Retrieve the column value from the document. */ +/***********************************************************************/ +void CMgoConn::GetColumnValue(PGLOBAL g, PCOL colp) +{ + char *jpath = colp->GetJpath(g, false); + PVAL value = colp->GetValue(); + + if (!strcmp(jpath, "*")) { + value->SetValue_psz(Mini(g, colp, Document, false)); + } else if (bson_iter_init(&Iter, Document) && + bson_iter_find_descendant(&Iter, jpath, &Desc)) { + if (BSON_ITER_HOLDS_UTF8(&Desc)) + value->SetValue_psz((PSZ)bson_iter_utf8(&Desc, NULL)); + else if (BSON_ITER_HOLDS_INT32(&Desc)) + value->SetValue(bson_iter_int32(&Desc)); + else if (BSON_ITER_HOLDS_INT64(&Desc)) + value->SetValue(bson_iter_int64(&Desc)); + else if (BSON_ITER_HOLDS_DOUBLE(&Desc)) + value->SetValue(bson_iter_double(&Desc)); + else if (BSON_ITER_HOLDS_DATE_TIME(&Desc)) + value->SetValue(bson_iter_date_time(&Desc) / 1000); + else if (BSON_ITER_HOLDS_BOOL(&Desc)) { + bool b = bson_iter_bool(&Desc); + + if (value->IsTypeNum()) + value->SetValue(b ? 1 : 0); + else + value->SetValue_psz(b ? "true" : "false"); + + } else if (BSON_ITER_HOLDS_OID(&Desc)) { + char str[25]; + + bson_oid_to_string(bson_iter_oid(&Desc), str); + value->SetValue_psz(str); + } else if (BSON_ITER_HOLDS_NULL(&Iter)) { + // Apparently this does not work... + value->Reset(); + value->SetNull(true); + } else if (BSON_ITER_HOLDS_DECIMAL128(&Desc)) { + char *str = NULL; + bson_decimal128_t dec; + + bson_iter_decimal128(&Desc, &dec); + bson_decimal128_to_string(&dec, str); + value->SetValue_psz(str); + bson_free(str); + } else if (BSON_ITER_HOLDS_DOCUMENT(&Iter)) { + bson_t *doc; + const uint8_t *data = NULL; + uint32_t len = 0; + + bson_iter_document(&Desc, &len, &data); + + if (data) { + doc = bson_new_from_data(data, len); + value->SetValue_psz(Mini(g, colp, doc, false)); + bson_destroy(doc); + } else { + // ... but we can come here in case of NULL! + value->Reset(); + value->SetNull(true); + } // endif data + + } else if (BSON_ITER_HOLDS_ARRAY(&Iter)) { + bson_t *arr; + const uint8_t *data = NULL; + uint32_t len = 0; + + bson_iter_array(&Desc, &len, &data); + + if (data) { + arr = bson_new_from_data(data, len); + value->SetValue_psz(Mini(g, colp, arr, true)); + bson_destroy(arr); + } else { + // This is a bug in returning the wrong type + // This fix is only for document items + bson_t *doc; + + bson_iter_document(&Desc, &len, &data); + + if (data) { + doc = bson_new_from_data(data, len); + value->SetValue_psz(Mini(g, colp, doc, false)); + bson_destroy(doc); + } else { + // ... or we can also come here in case of NULL! + value->Reset(); + value->SetNull(true); + } // endif data + + } // endif data + + } else + value->Reset(); + + } else { + // Field does not exist + value->Reset(); + value->SetNull(true); + } // endif Iter + +} // end of GetColumnValue + +/***********************************************************************/ +/* AddValue: Add column value to the document to insert or update. */ +/***********************************************************************/ +bool CMgoConn::AddValue(PGLOBAL g, PCOL colp, bson_t *doc, char *key, bool upd) +{ + bool rc = false; + PVAL value = colp->GetValue(); + + if (value->IsNull()) { + if (upd) + rc = BSON_APPEND_NULL(doc, key); + else + return false; + + } else switch (colp->GetResultType()) { + case TYPE_STRING: + rc = BSON_APPEND_UTF8(doc, key, value->GetCharValue()); + break; + case TYPE_INT: + case TYPE_SHORT: + rc = BSON_APPEND_INT32(doc, key, value->GetIntValue()); + break; + case TYPE_TINY: + rc = BSON_APPEND_BOOL(doc, key, value->GetIntValue()); + break; + case TYPE_BIGINT: + rc = BSON_APPEND_INT64(doc, key, value->GetBigintValue()); + break; + case TYPE_DOUBLE: + rc = BSON_APPEND_DOUBLE(doc, key, value->GetFloatValue()); + break; + case TYPE_DECIM: + {bson_decimal128_t dec; + + if (bson_decimal128_from_string(value->GetCharValue(), &dec)) + rc = BSON_APPEND_DECIMAL128(doc, key, &dec); + + } break; + case TYPE_DATE: + rc = BSON_APPEND_DATE_TIME(doc, key, value->GetBigintValue() * 1000); + break; + default: + sprintf(g->Message, "Type %d not supported yet", colp->GetResultType()); + return true; + } // endswitch Buf_Type + + if (!rc) { + strcpy(g->Message, "Adding value failed"); + return true; + } else + return false; + +} // end of AddValue + +#if 0 +void *CMgoConn::mgo_alloc(size_t n) +{ + char *mst = (char*)PlgDBSubAlloc(G, NULL, n + sizeof(size_t)); + + if (mst) { + *(size_t*)mst = n; + return mst + sizeof(size_t); + } // endif mst + + return NULL; +} // end of mgo_alloc + +void *CMgoConn::mgo_calloc(size_t n, size_t sz) +{ + void *m = mgo_alloc(n * sz); + + if (m) + memset(m, 0, n * sz); + + return m; +} // end of mgo_calloc + +void *CMgoConn::mgo_realloc(void *m, size_t n) +{ + if (!m) + return n ? mgo_alloc(n) : NULL; + + size_t *osz = (size_t*)((char*)m - sizeof(size_t)); + + if (n > *osz) { + void *nwm = mgo_alloc(n); + + if (nwm) + memcpy(nwm, m, *osz); + + return nwm; + } else { + *osz = n; + return m; + } // endif n + +} // end of mgo_realloc +#endif // 0 + diff --git a/storage/connect/cmgoconn.h b/storage/connect/cmgoconn.h new file mode 100644 index 00000000000..9e1361039ab --- /dev/null +++ b/storage/connect/cmgoconn.h @@ -0,0 +1,112 @@ +/***********************************************************************/ +/* CMgoConn.h : header file for the MongoDB connection classes. */ +/***********************************************************************/ + +/***********************************************************************/ +/* Include MongoDB library header files. */ +/***********************************************************************/ +#include <bson.h> +#include <bcon.h> +#include <mongoc.h> + +// C connection to a MongoDB data source +class TDBMGO; +class MGOCOL; + +/***********************************************************************/ +/* Include MongoDB library header files. */ +/***********************************************************************/ +typedef class INCOL *PINCOL; +typedef class MGODEF *PMGODEF; +typedef class TDBMGO *PTDBMGO; +typedef class MGOCOL *PMGOCOL; + +typedef struct mongo_parms { + PTDB Tdbp; + PCSZ Uristr; // Driver URI + PCSZ Db_name; + PCSZ Coll_name; + PCSZ Options; + PCSZ Filter; + bool Pipe; +//PCSZ User; // User connect info +//PCSZ Pwd; // Password connect info +//int Fsize; // Fetch size +//bool Scrollable; // Scrollable cursor +} CMGOPARM, *PCPARM; + +typedef struct KEYCOL { + KEYCOL *Next; + PINCOL Incolp; + PCOL Colp; + char *Key; +} *PKC; + +/***********************************************************************/ +/* Used when inserting values in a MongoDB collection. */ +/***********************************************************************/ +class INCOL : public BLOCK { +public: + // Constructor + INCOL(bool ar) { Klist = NULL; Array = ar; } + + // Methods + void AddCol(PGLOBAL g, PCOL colp, char *jp); + + //Members + bson_t Child; + PKC Klist; + bool Array; +}; // end of INCOL; + +/***********************************************************************/ +/* CMgoConn class. */ +/***********************************************************************/ +class CMgoConn : public BLOCK { + friend class TDBMGO; + friend class MGODISC; +public: + // Constructor + CMgoConn(PGLOBAL g, PCPARM pcg); + + //static void *mgo_alloc(size_t n); + //static void *mgo_calloc(size_t n, size_t sz); + //static void *mgo_realloc(void *m, size_t n); + //static void mgo_free(void *) {} + + // Implementation + bool IsConnected(void) { return m_Connected; } + bool Connect(PGLOBAL g); + int CollSize(PGLOBAL g); + bool MakeCursor(PGLOBAL g); + int ReadNext(PGLOBAL g); + PSZ GetDocument(PGLOBAL g); + void ShowDocument(bson_iter_t *iter, const bson_t *doc, const char *k); + void MakeColumnGroups(PGLOBAL g); + bool DocWrite(PGLOBAL g, PINCOL icp); + int Write(PGLOBAL g); + bool DocDelete(PGLOBAL g); + void Rewind(void); + void Close(void); + PSZ Mini(PGLOBAL g, PCOL colp, const bson_t *bson, bool b); + void GetColumnValue(PGLOBAL g, PCOL colp); + bool AddValue(PGLOBAL g, PCOL colp, bson_t *doc, char *key, bool upd); + +protected: + // Members + PCPARM Pcg; + mongoc_uri_t *Uri; + mongoc_client_pool_t *Pool; // Thread safe client pool + mongoc_client_t *Client; // The MongoDB client + mongoc_database_t *Database; // The MongoDB database + mongoc_collection_t *Collection; // The MongoDB collection + mongoc_cursor_t *Cursor; + const bson_t *Document; + bson_t *Query; // MongoDB cursor filter + bson_t *Opts; // MongoDB cursor options + bson_error_t Error; + bson_iter_t Iter; // Used to retrieve column value + bson_iter_t Desc; // Descendant iter + PINCOL Fpc; // To insert INCOL classes + bool m_Connected; +}; // end of class CMgoConn diff --git a/storage/connect/colblk.h b/storage/connect/colblk.h index 608aa040787..b22933d9ebb 100644 --- a/storage/connect/colblk.h +++ b/storage/connect/colblk.h @@ -38,7 +38,8 @@ class DllExport COLBLK : public XOBJECT { virtual PTDB GetTo_Tdb(void) {return To_Tdb;} virtual int GetClustered(void) {return 0;} virtual int IsClustered(void) {return FALSE;} - PCOL GetNext(void) {return Next;} + virtual PSZ GetJpath(PGLOBAL g, bool proj) {return NULL;} + PCOL GetNext(void) {return Next;} PSZ GetName(void) {return Name;} int GetIndex(void) {return Index;} ushort GetColUse(void) {return ColUse;} diff --git a/storage/connect/connect.cc b/storage/connect/connect.cc index 00a2a0d16b6..08d35b05932 100644 --- a/storage/connect/connect.cc +++ b/storage/connect/connect.cc @@ -1,4 +1,4 @@ -/* Copyright (C) Olivier Bertrand 2004 - 2017 +/* Copyright (C) MariaDB Corporation Ab This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -185,7 +185,7 @@ bool CntInfo(PGLOBAL g, PTDB tp, PXF info) /***********************************************************************/ PTDB CntGetTDB(PGLOBAL g, LPCSTR name, MODE mode, PHC h) { - PTDB tdbp; + PTDB tdbp = NULL; PTABLE tabp; PDBUSER dup = PlgGetUser(g); volatile PCATLG cat = (dup) ? dup->Catalog : NULL; // Safe over throw diff --git a/storage/connect/connect.h b/storage/connect/connect.h index 128561b80f3..2bca8bf54cb 100644 --- a/storage/connect/connect.h +++ b/storage/connect/connect.h @@ -1,4 +1,4 @@ -/* Copyright (C) Olivier Bertrand 2004 - 2011 +/* Copyright (C) MariaDB Corporation Ab This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -15,6 +15,7 @@ /**************** Cnt H Declares Source Code File (.H) *****************/ /* Name: CONNECT.H Version 2.4 */ +/* Author Olivier BERTRAND bertrandop@gmail.com */ /* This file contains the some based classes declares. */ /***********************************************************************/ #include "filamtxt.h" diff --git a/storage/connect/engmsg.h b/storage/connect/engmsg.h index 14808758efd..c072511065e 100644 --- a/storage/connect/engmsg.h +++ b/storage/connect/engmsg.h @@ -1,3 +1,4 @@ +/* Copyright (C) MariaDB Corporation Ab */ #define MSG_ACCESS_VIOLATN "Access violation" #define MSG_ADD_BAD_TYPE "Array add value type mismatch (%s -> %s)" #define MSG_ALLOC_ERROR "Error allocating %s" diff --git a/storage/connect/filter.cpp b/storage/connect/filter.cpp index d4026f39c67..68b58004970 100644 --- a/storage/connect/filter.cpp +++ b/storage/connect/filter.cpp @@ -33,18 +33,11 @@ #include "tabcol.h" #include "xtable.h" #include "array.h" -//#include "subquery.h" #include "filter.h" -//#include "token.h" -//#include "select.h" #include "xindex.h" -#if defined(MONGO_SUPPORT) -#include "filamtxt.h" -#include "tabdos.h" -#include "tabjson.h" +#if defined(MONGO_SUPPORT) || defined(JDBC_SUPPORT) #include "tabext.h" -#include "tabmgo.h" -#endif // MONGO_SUPPORT +#endif // MONGO_SUPPORT || JDBC_SUPPORT /***********************************************************************/ /* Utility routines. */ @@ -1412,11 +1405,11 @@ PFIL FILTER::Copy(PTABS t) } // end of Copy #endif // 0 +#if defined(MONGO_SUPPORT) || defined(JDBC_SUPPORT) /***********************************************************************/ /* Make selector json representation for Mongo tables. */ /***********************************************************************/ -#if defined(MONGO_SUPPORT) -bool FILTER::MakeSelector(PGLOBAL g, PSTRG s, bool m) +bool FILTER::MakeSelector(PGLOBAL g, PSTRG s) { s->Append('{'); @@ -1428,29 +1421,21 @@ bool FILTER::MakeSelector(PGLOBAL g, PSTRG s, bool m) s->Append(Opc == OP_AND ? "and" : "or"); s->Append("\":["); - if (((PFIL)Arg(0))->MakeSelector(g, s, m)) + if (((PFIL)Arg(0))->MakeSelector(g, s)) return true; s->Append(','); - if (((PFIL)Arg(1))->MakeSelector(g, s, m)) + if (((PFIL)Arg(1))->MakeSelector(g, s)) return true; s->Append(']'); } else { - char *pth, buf[501]; - if (GetArgType(0) != TYPE_COLBLK) return true; s->Append('"'); - - if (m) - pth = ((PMGOCOL)Arg(0))->Jpath; - else if (!(pth = ((PJCOL)Arg(0))->GetJpath(g, false))) - return true; - - s->Append(pth); + s->Append(((PCOL)Arg(0))->GetJpath(g, false)); s->Append("\":{\"$"); switch (Opc) { @@ -1472,33 +1457,33 @@ bool FILTER::MakeSelector(PGLOBAL g, PSTRG s, bool m) case OP_LE: s->Append("lte"); break; - //case OP_NULL: - // s->Append("ne"); - // break; - //case OP_LIKE: - // s->Append("ne"); - // break; - //case OP_EXIST: - // s->Append("ne"); - // break; + case OP_NULL: + case OP_LIKE: + case OP_EXIST: default: return true; } // endswitch Opc s->Append("\":"); - if (GetArgType(1) == TYPE_COLBLK) - return true; + if (GetArgType(1) == TYPE_COLBLK) { + s->Append("\"$"); + s->Append(((PEXTCOL)Arg(1))->GetJpath(g, false)); + s->Append('"'); + } else { + char buf[501]; + + Arg(1)->Prints(g, buf, 500); + s->Append(buf); + } // endif Type - Arg(1)->Prints(g, buf, 500); - s->Append(buf); s->Append('}'); } // endif Opc s->Append('}'); return false; } // end of MakeSelector -#endif // MONGO_SUPPORT +#endif // MONGO_SUPPORT || JDBC_SUPPORT /*********************************************************************/ /* Make file output of FILTER contents. */ diff --git a/storage/connect/filter.h b/storage/connect/filter.h index 42cddb08655..85c75a2227e 100644 --- a/storage/connect/filter.h +++ b/storage/connect/filter.h @@ -1,7 +1,7 @@ /*************** Filter H Declares Source Code File (.H) ***************/ -/* Name: FILTER.H Version 1.2 */ +/* Name: FILTER.H Version 1.3 */ /* */ -/* (C) Copyright to the author Olivier BERTRAND 2010-2015 */ +/* (C) Copyright to the author Olivier BERTRAND 2010-2017 */ /* */ /* This file contains the FILTER and derived classes declares. */ /***********************************************************************/ @@ -61,9 +61,9 @@ class DllExport FILTER : public XOBJECT { /* Filter description block */ //virtual PXOB CheckSubQuery(PGLOBAL, PSQL); //virtual bool CheckLocal(PTDB); //virtual int CheckSpcCol(PTDB tdbp, int n); -#if defined(MONGO_SUPPORT) - bool MakeSelector(PGLOBAL g, PSTRG s, bool m); -#endif // MONGO_SUPPORT +#if defined(MONGO_SUPPORT) || defined(JDBC_SUPPORT) + bool MakeSelector(PGLOBAL g, PSTRG s); +#endif // MONGO_SUPPORT || JDBC_SUPPORT virtual void Printf(PGLOBAL g, FILE *f, uint n); virtual void Prints(PGLOBAL g, char *ps, uint z); // PFIL Linearize(bool nosep); diff --git a/storage/connect/global.h b/storage/connect/global.h index 2f76b331f50..cb756494efc 100644 --- a/storage/connect/global.h +++ b/storage/connect/global.h @@ -1,6 +1,7 @@ /***********************************************************************/ /* GLOBAL.H: Declaration file used by all CONNECT implementations. */ -/* (C) Copyright Olivier Bertrand 1993-2017 */ +/* (C) Copyright MariaDB Corporation Ab */ +/* Author Olivier Bertrand 1993-2017 */ /***********************************************************************/ /***********************************************************************/ diff --git a/storage/connect/ha_connect.cc b/storage/connect/ha_connect.cc index d6e4eff3f8d..79efdec13cc 100644 --- a/storage/connect/ha_connect.cc +++ b/storage/connect/ha_connect.cc @@ -1,4 +1,4 @@ -/* Copyright (C) Olivier Bertrand 2004 - 2017 +/* Copyright (C) MariaDB Corporation Ab This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -98,8 +98,7 @@ rnd_next signals that it has reached the end of its data. Calls to ha_connect::extra() are hints as to what will be occuring to the request. - Happy use!<br> - -Olivier + Author Olivier Bertrand */ #ifdef USE_PRAGMA_IMPLEMENTATION @@ -209,10 +208,10 @@ pthread_mutex_t parmut = PTHREAD_MUTEX_INITIALIZER; /***********************************************************************/ PQRYRES OEMColumns(PGLOBAL g, PTOS topt, char *tab, char *db, bool info); PQRYRES VirColumns(PGLOBAL g, bool info); -PQRYRES JSONColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info); +PQRYRES JSONColumns(PGLOBAL g, PCSZ db, PCSZ dsn, PTOS topt, bool info); PQRYRES XMLColumns(PGLOBAL g, char *db, char *tab, PTOS topt, bool info); #if defined(MONGO_SUPPORT) -PQRYRES MGOColumns(PGLOBAL g, char *db, PTOS topt, bool info); +PQRYRES MGOColumns(PGLOBAL g, PCSZ db, PCSZ url, PTOS topt, bool info); #endif // MONGO_SUPPORT int TranslateJDBCType(int stp, char *tn, int prec, int& len, char& v); void PushWarning(PGLOBAL g, THD *thd, int level); @@ -701,7 +700,7 @@ static int connect_init_func(void *p) DTVAL::SetTimeShift(); // Initialize time zone shift once for all BINCOL::SetEndian(); // Initialize host endian setting #if defined(JDBC_SUPPORT) - JDBConn::SetJVM(); + JAVAConn::SetJVM(); #endif // JDBC_SUPPORT DBUG_RETURN(0); } // end of connect_init_func @@ -726,7 +725,7 @@ static int connect_done_func(void *) #endif // MONGO_SUPPORT #ifdef JDBC_SUPPORT - JDBConn::ResetJVM(); + JAVAConn::ResetJVM(); #endif // JDBC_SUPPORT #if defined(__WIN__) @@ -4081,7 +4080,7 @@ int ha_connect::info(uint flag) if (xmod == MODE_ANY || xmod == MODE_ALTER) { // Pure info, not a query pure= true; - xp->CheckCleanup(); + xp->CheckCleanup(xmod == MODE_ANY && valid_query_id == 0); } // endif xmod // This is necessary for getting file length @@ -4094,8 +4093,10 @@ int ha_connect::info(uint flag) } else DBUG_RETURN(HA_ERR_INTERNAL_ERROR); // Should never happen - if (!(tdbp= GetTDB(g))) - DBUG_RETURN(HA_ERR_INTERNAL_ERROR); // Should never happen + if (!(tdbp = GetTDB(g))) { + my_message(ER_UNKNOWN_ERROR, g->Message, MYF(0)); + DBUG_RETURN(HA_ERR_INTERNAL_ERROR); + } // endif tdbp valid_info = false; } // endif tdbp @@ -5299,16 +5300,18 @@ static int connect_assisted_discovery(handlerton *, THD* thd, #if defined(ODBC_SUPPORT) POPARM sop= NULL; PCSZ ucnc= NULL; - bool cnc= false; + PCSZ tabtyp = NULL; + bool cnc= false; int cto= -1, qto= -1; #endif // ODBC_SUPPORT #if defined(JDBC_SUPPORT) PJPARM sjp= NULL; +#endif // JDBC_SUPPORT +#if defined(JDBC_SUPPORT) || defined(MONGO_SUPPORT) PCSZ driver= NULL; char *url= NULL; //char *prop= NULL; - PCSZ tabtyp= NULL; -#endif // JDBC_SUPPORT +#endif // JDBC_SUPPORT || MONGO_SUPPORT uint tm, fnc= FNC_NO, supfnc= (FNC_NO | FNC_COL); bool bif, ok= false, dbf= false; TABTYPE ttp= TAB_UNDEF; @@ -5361,19 +5364,17 @@ static int connect_assisted_discovery(handlerton *, THD* thd, #endif // __WIN__ port= atoi(GetListOption(g, "port", topt->oplist, "0")); #if defined(ODBC_SUPPORT) - mxr= atoi(GetListOption(g,"maxres", topt->oplist, "0")); + tabtyp = GetListOption(g, "Tabtype", topt->oplist, NULL); + mxr= atoi(GetListOption(g,"maxres", topt->oplist, "0")); cto= atoi(GetListOption(g,"ConnectTimeout", topt->oplist, "-1")); qto= atoi(GetListOption(g,"QueryTimeout", topt->oplist, "-1")); if ((ucnc= GetListOption(g, "UseDSN", topt->oplist))) cnc= (!*ucnc || *ucnc == 'y' || *ucnc == 'Y' || atoi(ucnc) != 0); #endif -#if defined(JDBC_SUPPORT) +#if defined(JDBC_SUPPORT) || defined(MONGO_SUPPORT) driver= GetListOption(g, "Driver", topt->oplist, NULL); -// url= GetListOption(g, "URL", topt->oplist, NULL); -// prop = GetListOption(g, "Properties", topt->oplist, NULL); - tabtyp = GetListOption(g, "Tabtype", topt->oplist, NULL); -#endif // JDBC_SUPPORT +#endif // JDBC_SUPPORT || MONGO_SUPPORT #if defined(PROMPT_OK) cop= atoi(GetListOption(g, "checkdsn", topt->oplist, "0")); #endif // PROMPT_OK @@ -5585,14 +5586,14 @@ static int connect_assisted_discovery(handlerton *, THD* thd, ok = true; break; -#if defined(MONGO_SUPPORT) +#if defined(MONGO_SUPPORT) || defined(JDBC_SUPPORT) case TAB_MONGO: if (!topt->tabname) topt->tabname = tab; ok = true; break; -#endif // MONGO_SUPPORT +#endif // MONGO_SUPPORT || JDBC_SUPPORT case TAB_VIR: ok = true; break; @@ -5733,13 +5734,36 @@ static int connect_assisted_discovery(handlerton *, THD* thd, qrp = VirColumns(g, fnc == FNC_COL); break; case TAB_JSON: - qrp = JSONColumns(g, (char*)db, dsn, topt, fnc == FNC_COL); + qrp = JSONColumns(g, db, dsn, topt, fnc == FNC_COL); break; -#if defined(MONGO_SUPPORT) +#if defined(MONGO_SUPPORT) || defined(JDBC_SUPPORT) case TAB_MONGO: - qrp = MGOColumns(g, (char*)db, topt, fnc == FNC_COL); + if (!(url = strz(g, create_info->connect_string)) || !*url) + url = "mongodb://localhost:27017"; + +#if !defined(MONGO_SUPPORT) + driver = "JAVA"; + // strcpy(g->Message, "No column discovery for Java MONGO tables yet"); + // Temporarily use the JSONColumns function + qrp = JSONColumns(g, db, url, topt, fnc == FNC_COL); +#elif !defined(JDBC_SUPPORT) + driver = "C"; + qrp = MGOColumns(g, db, url, topt, fnc == FNC_COL); +#else // MONGO_SUPPORT && JDBC_SUPPORT + if (!driver) + driver = "C"; + + if (toupper(*driver) == 'C') { + qrp = MGOColumns(g, db, url, topt, fnc == FNC_COL); + } else { + // strcpy(g->Message, "No column discovery for Java MONGO tables yet"); + // Temporarily use the JSONColumns function + qrp = JSONColumns(g, db, url, topt, fnc == FNC_COL); + } // endif driver + +#endif // MONGO_SUPPORT && JDBC_SUPPORT break; -#endif // MONGO_SUPPORT +#endif // MONGO_SUPPORT || JDBC_SUPPORT #if defined(LIBXML2_SUPPORT) || defined(DOMDOC_SUPPORT) case TAB_XML: qrp = XMLColumns(g, (char*)db, tab, topt, fnc == FNC_COL); diff --git a/storage/connect/ha_connect.h b/storage/connect/ha_connect.h index 8d8307b4bd1..9d71aaa5433 100644 --- a/storage/connect/ha_connect.h +++ b/storage/connect/ha_connect.h @@ -1,4 +1,4 @@ -/* Copyright (C) Olivier Bertrand 2004 - 2015 +/* Copyright (C) MariaDB Corporation Ab This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -14,6 +14,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */ /** @file ha_connect.h + Author Olivier Bertrand @brief The ha_connect engine is a prototype storage engine to access external data. diff --git a/storage/connect/javaconn.cpp b/storage/connect/javaconn.cpp new file mode 100644 index 00000000000..9fc44c5cbfe --- /dev/null +++ b/storage/connect/javaconn.cpp @@ -0,0 +1,599 @@ +/************ Javaconn C++ Functions Source Code File (.CPP) ***********/ +/* Name: JAVAConn.CPP Version 1.0 */ +/* */ +/* (C) Copyright to the author Olivier BERTRAND 2017 */ +/* */ +/* This file contains the JAVA connection classes functions. */ +/***********************************************************************/ + +#if defined(__WIN__) +// This is needed for RegGetValue +#define _WINVER 0x0601 +#undef _WIN32_WINNT +#define _WIN32_WINNT 0x0601 +#endif // __WIN__ + +/***********************************************************************/ +/* Include relevant MariaDB header file. */ +/***********************************************************************/ +#include <my_global.h> +#include <m_string.h> +#if defined(__WIN__) +#include <direct.h> // for getcwd +#if defined(__BORLANDC__) +#define __MFC_COMPAT__ // To define min/max as macro +#endif // __BORLANDC__ +#else // !__WIN__ +#if defined(UNIX) +#include <errno.h> +#else // !UNIX +#endif // !UNIX +#include <stdio.h> +#include <stdlib.h> // for getenv +#define NODW +#endif // !__WIN__ + +/***********************************************************************/ +/* Required objects includes. */ +/***********************************************************************/ +#include "global.h" +#include "plgdbsem.h" +#include "colblk.h" +#include "xobject.h" +#include "xtable.h" +#include "tabext.h" +#include "javaconn.h" +#include "resource.h" +#include "valblk.h" +#include "osutil.h" + +#if defined(__WIN__) +extern "C" HINSTANCE s_hModule; // Saved module handle +#endif // __WIN__ +#define nullptr 0 + +//TYPCONV GetTypeConv(); +//int GetConvSize(); +extern char *JvmPath; // The connect_jvm_path global variable value +extern char *ClassPath; // The connect_class_path global variable value + +char *GetJavaWrapper(void); // The connect_java_wrapper variable value + +/***********************************************************************/ +/* Static JAVAConn objects. */ +/***********************************************************************/ +void *JAVAConn::LibJvm = NULL; +CRTJVM JAVAConn::CreateJavaVM = NULL; +GETJVM JAVAConn::GetCreatedJavaVMs = NULL; +#if defined(_DEBUG) +GETDEF JAVAConn::GetDefaultJavaVMInitArgs = NULL; +#endif // _DEBUG + +/***********************************************************************/ +/* Some macro's (should be defined elsewhere to be more accessible) */ +/***********************************************************************/ +#if defined(_DEBUG) +#define ASSERT(f) assert(f) +#define DEBUG_ONLY(f) (f) +#else // !_DEBUG +#define ASSERT(f) ((void)0) +#define DEBUG_ONLY(f) ((void)0) +#endif // !_DEBUG + +/***********************************************************************/ +/* Allocate the structure used to refer to the result set. */ +/***********************************************************************/ +static JCATPARM *AllocCatInfo(PGLOBAL g, JCATINFO fid, PCSZ db, + PCSZ tab, PQRYRES qrp) +{ + JCATPARM *cap; + +#if defined(_DEBUG) + assert(qrp); +#endif + + if ((cap = (JCATPARM *)PlgDBSubAlloc(g, NULL, sizeof(JCATPARM)))) { + memset(cap, 0, sizeof(JCATPARM)); + cap->Id = fid; + cap->Qrp = qrp; + cap->DB = db; + cap->Tab = tab; + } // endif cap + + return cap; +} // end of AllocCatInfo + +/***********************************************************************/ +/* JAVAConn construction/destruction. */ +/***********************************************************************/ +JAVAConn::JAVAConn(PGLOBAL g, PCSZ wrapper) +{ + m_G = g; + jvm = nullptr; // Pointer to the JVM (Java Virtual Machine) + env = nullptr; // Pointer to native interface + jdi = nullptr; // Pointer to the java wrapper class + job = nullptr; // The java wrapper class object + errid = nullptr; + DiscFunc = "Disconnect"; + Msg = NULL; + m_Wrap = (wrapper) ? wrapper : GetJavaWrapper(); + + if (!strchr(m_Wrap, '/')) { + // Add the wrapper package name + char *wn = (char*)PlugSubAlloc(g, NULL, strlen(m_Wrap) + 10); + m_Wrap = strcat(strcpy(wn, "wrappers/"), m_Wrap); + } // endif m_Wrap + + m_Opened = false; + m_Connected = false; + m_Rows = 0; +//*m_ErrMsg = '\0'; +} // end of JAVAConn + +//JAVAConn::~JAVAConn() +// { +//if (Connected()) +// EndCom(); + +// } // end of ~JAVAConn + +/***********************************************************************/ +/* Screen for errors. */ +/***********************************************************************/ +bool JAVAConn::Check(jint rc) +{ + jstring s; + + if (env->ExceptionCheck()) { + jthrowable exc = env->ExceptionOccurred(); + jmethodID tid = env->GetMethodID(env->FindClass("java/lang/Object"), + "toString", "()Ljava/lang/String;"); + + if (exc != nullptr && tid != nullptr) { + jstring s = (jstring)env->CallObjectMethod(exc, tid); + const char *utf = env->GetStringUTFChars(s, (jboolean)false); + env->DeleteLocalRef(s); + Msg = PlugDup(m_G, utf); + } else + Msg = "Exception occured"; + + env->ExceptionClear(); + } else if (rc < 0) { + s = (jstring)env->CallObjectMethod(job, errid); + Msg = (char*)env->GetStringUTFChars(s, (jboolean)false); + } else + Msg = NULL; + + return (Msg != NULL); +} // end of Check + +/***********************************************************************/ +/* Get MethodID if not exists yet. */ +/***********************************************************************/ +bool JAVAConn::gmID(PGLOBAL g, jmethodID& mid, const char *name, const char *sig) +{ + if (mid == nullptr) { + mid = env->GetMethodID(jdi, name, sig); + + if (Check()) { + strcpy(g->Message, Msg); + return true; + } else + return false; + + } else + return false; + +} // end of gmID + +#if 0 +/***********************************************************************/ +/* Utility routine. */ +/***********************************************************************/ +int JAVAConn::GetMaxValue(int n) +{ + jint m; + jmethodID maxid = nullptr; + + if (gmID(m_G, maxid, "GetMaxValue", "(I)I")) + return -1; + + // call method + if (Check(m = env->CallIntMethod(job, maxid, n))) + htrc("GetMaxValue: %s", Msg); + + return (int)m; +} // end of GetMaxValue +#endif // 0 + +/***********************************************************************/ +/* Reset the JVM library. */ +/***********************************************************************/ +void JAVAConn::ResetJVM(void) +{ + if (LibJvm) { +#if defined(__WIN__) + FreeLibrary((HMODULE)LibJvm); +#else // !__WIN__ + dlclose(LibJvm); +#endif // !__WIN__ + LibJvm = NULL; + CreateJavaVM = NULL; + GetCreatedJavaVMs = NULL; +#if defined(_DEBUG) + GetDefaultJavaVMInitArgs = NULL; +#endif // _DEBUG + } // endif LibJvm + +} // end of ResetJVM + +/***********************************************************************/ +/* Dynamically link the JVM library. */ +/* The purpose of this function is to allow using the CONNECT plugin */ +/* for other table types when the Java JDK is not installed. */ +/***********************************************************************/ +bool JAVAConn::GetJVM(PGLOBAL g) +{ + int ntry; + + if (!LibJvm) { + char soname[512]; + +#if defined(__WIN__) + for (ntry = 0; !LibJvm && ntry < 3; ntry++) { + if (!ntry && JvmPath) { + strcat(strcpy(soname, JvmPath), "\\jvm.dll"); + ntry = 3; // No other try + } else if (ntry < 2 && getenv("JAVA_HOME")) { + strcpy(soname, getenv("JAVA_HOME")); + + if (ntry == 1) + strcat(soname, "\\jre"); + + strcat(soname, "\\bin\\client\\jvm.dll"); + } else { + // Try to find it through the registry + char version[16]; + char javaKey[64] = "SOFTWARE\\JavaSoft\\Java Runtime Environment"; + LONG rc; + DWORD BufferSize = 16; + + strcpy(soname, "jvm.dll"); // In case it fails + + if ((rc = RegGetValue(HKEY_LOCAL_MACHINE, javaKey, "CurrentVersion", + RRF_RT_ANY, NULL, (PVOID)&version, &BufferSize)) == ERROR_SUCCESS) { + strcat(strcat(javaKey, "\\"), version); + BufferSize = sizeof(soname); + + if ((rc = RegGetValue(HKEY_LOCAL_MACHINE, javaKey, "RuntimeLib", + RRF_RT_ANY, NULL, (PVOID)&soname, &BufferSize)) != ERROR_SUCCESS) + printf("RegGetValue: rc=%ld\n", rc); + + } // endif rc + + ntry = 3; // Try this only once + } // endelse + + // Load the desired shared library + LibJvm = LoadLibrary(soname); + } // endfor ntry + + // Get the needed entries + if (!LibJvm) { + char buf[256]; + DWORD rc = GetLastError(); + + sprintf(g->Message, MSG(DLL_LOAD_ERROR), rc, soname); + FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, NULL, rc, 0, + (LPTSTR)buf, sizeof(buf), NULL); + strcat(strcat(g->Message, ": "), buf); + } else if (!(CreateJavaVM = (CRTJVM)GetProcAddress((HINSTANCE)LibJvm, + "JNI_CreateJavaVM"))) { + sprintf(g->Message, MSG(PROCADD_ERROR), GetLastError(), "JNI_CreateJavaVM"); + FreeLibrary((HMODULE)LibJvm); + LibJvm = NULL; + } else if (!(GetCreatedJavaVMs = (GETJVM)GetProcAddress((HINSTANCE)LibJvm, + "JNI_GetCreatedJavaVMs"))) { + sprintf(g->Message, MSG(PROCADD_ERROR), GetLastError(), "JNI_GetCreatedJavaVMs"); + FreeLibrary((HMODULE)LibJvm); + LibJvm = NULL; +#if defined(_DEBUG) + } else if (!(GetDefaultJavaVMInitArgs = (GETDEF)GetProcAddress((HINSTANCE)LibJvm, + "JNI_GetDefaultJavaVMInitArgs"))) { + sprintf(g->Message, MSG(PROCADD_ERROR), GetLastError(), + "JNI_GetDefaultJavaVMInitArgs"); + FreeLibrary((HMODULE)LibJvm); + LibJvm = NULL; +#endif // _DEBUG + } // endif LibJvm +#else // !__WIN__ + const char *error = NULL; + + for (ntry = 0; !LibJvm && ntry < 2; ntry++) { + if (!ntry && JvmPath) { + strcat(strcpy(soname, JvmPath), "/libjvm.so"); + ntry = 2; + } else if (!ntry && getenv("JAVA_HOME")) { + // TODO: Replace i386 by a better guess + strcat(strcpy(soname, getenv("JAVA_HOME")), "/jre/lib/i386/client/libjvm.so"); + } else { // Will need LD_LIBRARY_PATH to be set + strcpy(soname, "libjvm.so"); + ntry = 2; + } // endelse + + LibJvm = dlopen(soname, RTLD_LAZY); + } // endfor ntry + + // Load the desired shared library + if (!LibJvm) { + error = dlerror(); + sprintf(g->Message, MSG(SHARED_LIB_ERR), soname, SVP(error)); + } else if (!(CreateJavaVM = (CRTJVM)dlsym(LibJvm, "JNI_CreateJavaVM"))) { + error = dlerror(); + sprintf(g->Message, MSG(GET_FUNC_ERR), "JNI_CreateJavaVM", SVP(error)); + dlclose(LibJvm); + LibJvm = NULL; + } else if (!(GetCreatedJavaVMs = (GETJVM)dlsym(LibJvm, "JNI_GetCreatedJavaVMs"))) { + error = dlerror(); + sprintf(g->Message, MSG(GET_FUNC_ERR), "JNI_GetCreatedJavaVMs", SVP(error)); + dlclose(LibJvm); + LibJvm = NULL; +#if defined(_DEBUG) + } else if (!(GetDefaultJavaVMInitArgs = (GETDEF)dlsym(LibJvm, + "JNI_GetDefaultJavaVMInitArgs"))) { + error = dlerror(); + sprintf(g->Message, MSG(GET_FUNC_ERR), "JNI_GetDefaultJavaVMInitArgs", SVP(error)); + dlclose(LibJvm); + LibJvm = NULL; +#endif // _DEBUG + } // endif LibJvm +#endif // !__WIN__ + + } // endif LibJvm + + return LibJvm == NULL; +} // end of GetJVM + +/***********************************************************************/ +/* Open: connect to a data source. */ +/***********************************************************************/ +bool JAVAConn::Open(PGLOBAL g) +{ + bool brc = true, err = false; + jboolean jt = (trace > 0); + + // Link or check whether jvm library was linked + if (GetJVM(g)) + return true; + + // Firstly check whether the jvm was already created + JavaVM* jvms[1]; + jsize jsz; + jint rc = GetCreatedJavaVMs(jvms, 1, &jsz); + + if (rc == JNI_OK && jsz == 1) { + // jvm already existing + jvm = jvms[0]; + rc = jvm->AttachCurrentThread((void**)&env, nullptr); + + if (rc != JNI_OK) { + strcpy(g->Message, "Cannot attach jvm to the current thread"); + return true; + } // endif rc + + } else { + /*******************************************************************/ + /* Create a new jvm */ + /*******************************************************************/ + PSTRG jpop = new(g)STRING(g, 512, "-Djava.class.path=."); + char *cp = NULL; + char sep; + +#if defined(__WIN__) + sep = ';'; +#define N 1 + //#define N 2 + //#define N 3 +#else + sep = ':'; +#define N 1 +#endif + + // Add wrappers jar files + AddJars(jpop, sep); + + //================== prepare loading of Java VM ============================ + JavaVMInitArgs vm_args; // Initialization arguments + JavaVMOption* options = new JavaVMOption[N]; // JVM invocation options + + // where to find java .class + if (ClassPath && *ClassPath) { + jpop->Append(sep); + jpop->Append(ClassPath); + } // endif ClassPath + + // Java source will be compiled as a jar file installed in the plugin dir + jpop->Append(sep); + jpop->Append(GetPluginDir()); + jpop->Append("JdbcInterface.jar"); + + // All wrappers are pre-compiled in JavaWrappers.jar in the plugin dir + jpop->Append(sep); + jpop->Append(GetPluginDir()); + jpop->Append("JavaWrappers.jar"); + + if ((cp = getenv("CLASSPATH"))) { + jpop->Append(sep); + jpop->Append(cp); + } // endif cp + + if (trace) { + htrc("ClassPath=%s\n", ClassPath); + htrc("CLASSPATH=%s\n", cp); + htrc("%s\n", jpop->GetStr()); + } // endif trace + + options[0].optionString = jpop->GetStr(); +#if N == 2 + options[1].optionString = "-Xcheck:jni"; +#endif +#if N == 3 + options[1].optionString = "-Xms256M"; + options[2].optionString = "-Xmx512M"; +#endif +#if defined(_DEBUG) + vm_args.version = JNI_VERSION_1_2; // minimum Java version + rc = GetDefaultJavaVMInitArgs(&vm_args); +#else + vm_args.version = JNI_VERSION_1_6; // minimum Java version +#endif // _DEBUG + vm_args.nOptions = N; // number of options + vm_args.options = options; + vm_args.ignoreUnrecognized = false; // invalid options make the JVM init fail + + //=============== load and initialize Java VM and JNI interface ============= + rc = CreateJavaVM(&jvm, (void**)&env, &vm_args); // YES !! + delete options; // we then no longer need the initialisation options. + + switch (rc) { + case JNI_OK: + strcpy(g->Message, "VM successfully created"); + brc = false; + break; + case JNI_ERR: + strcpy(g->Message, "Initialising JVM failed: unknown error"); + break; + case JNI_EDETACHED: + strcpy(g->Message, "Thread detached from the VM"); + break; + case JNI_EVERSION: + strcpy(g->Message, "JNI version error"); + break; + case JNI_ENOMEM: + strcpy(g->Message, "Not enough memory"); + break; + case JNI_EEXIST: + strcpy(g->Message, "VM already created"); + break; + case JNI_EINVAL: + strcpy(g->Message, "Invalid arguments"); + break; + default: + sprintf(g->Message, "Unknown return code %d", (int)rc); + break; + } // endswitch rc + + if (trace) + htrc("%s\n", g->Message); + + if (brc) + return true; + + //=============== Display JVM version =============== + jint ver = env->GetVersion(); + printf("JVM Version %d.%d\n", ((ver >> 16) & 0x0f), (ver & 0x0f)); + } // endif rc + + // try to find the java wrapper class + jdi = env->FindClass(m_Wrap); + + if (jdi == nullptr) { + sprintf(g->Message, "ERROR: class %s not found!", m_Wrap); + return true; + } // endif jdi + +#if 0 // Suppressed because it does not make any usable change + if (b && jpath && *jpath) { + // Try to add that path the the jvm class path + jmethodID alp = env->GetStaticMethodID(jdi, "addLibraryPath", + "(Ljava/lang/String;)I"); + + if (alp == nullptr) { + env->ExceptionDescribe(); + env->ExceptionClear(); + } else { + char *msg; + jstring path = env->NewStringUTF(jpath); + rc = env->CallStaticIntMethod(jdi, alp, path); + + if ((msg = Check(rc))) { + strcpy(g->Message, msg); + env->DeleteLocalRef(path); + return RC_FX; + } else switch (rc) { + case JNI_OK: + printf("jpath added\n"); + break; + case JNI_EEXIST: + printf("jpath already exist\n"); + break; + case JNI_ERR: + default: + strcpy(g->Message, "Error adding jpath"); + env->DeleteLocalRef(path); + return RC_FX; + } // endswitch rc + + env->DeleteLocalRef(path); + } // endif alp + + } // endif jpath +#endif // 0 + + // if class found, continue + jmethodID ctor = env->GetMethodID(jdi, "<init>", "(Z)V"); + + if (ctor == nullptr) { + sprintf(g->Message, "ERROR: %s constructor not found!", m_Wrap); + return true; + } else + job = env->NewObject(jdi, ctor, jt); + + if (job == nullptr) { + sprintf(g->Message, "%s class object not constructed!", m_Wrap); + return true; + } // endif job + + // If the object is successfully constructed, + // we can then search for the method we want to call, + // and invoke it for the object: + errid = env->GetMethodID(jdi, "GetErrmsg", "()Ljava/lang/String;"); + + if (env->ExceptionCheck()) { + strcpy(g->Message, "ERROR: method GetErrmsg() not found!"); + env->ExceptionDescribe(); + env->ExceptionClear(); + return true; + } // endif Check + + m_Opened = true; + return false; +} // end of Open + +/***********************************************************************/ +/* Disconnect connection */ +/***********************************************************************/ +void JAVAConn::Close() +{ + jint rc; + + if (m_Connected) { + jmethodID did = nullptr; + + // Could have been detached in case of join + rc = jvm->AttachCurrentThread((void**)&env, nullptr); + + if (gmID(m_G, did, DiscFunc, "()I")) + printf("%s\n", Msg); + else if (Check(env->CallIntMethod(job, did))) + printf("%s: %s\n", DiscFunc, Msg); + + m_Connected = false; + } // endif m_Connected + + if ((rc = jvm->DetachCurrentThread()) != JNI_OK) + printf("DetachCurrentThread: rc=%d\n", (int)rc); + + m_Opened = false; +} // end of Close diff --git a/storage/connect/javaconn.h b/storage/connect/javaconn.h new file mode 100644 index 00000000000..fba633945f0 --- /dev/null +++ b/storage/connect/javaconn.h @@ -0,0 +1,129 @@ +/***********************************************************************/ +/* JavaConn.h : header file for the Java connection classes. */ +/***********************************************************************/ + +/***********************************************************************/ +/* Included C-definition files required by the interface. */ +/***********************************************************************/ +#include "block.h" +#include "jdbccat.h" + +/***********************************************************************/ +/* Java native interface. */ +/***********************************************************************/ +#include <jni.h> + +/***********************************************************************/ +/* Constants and defines. */ +/***********************************************************************/ +// Miscellaneous sizing info +#define MAX_NUM_OF_MSG 10 // Max number of error messages +//efine MAX_CURRENCY 30 // Max size of Currency($) string +#define MAX_TNAME_LEN 32 // Max size of table names +//efine MAX_FNAME_LEN 256 // Max size of field names +//efine MAX_STRING_INFO 256 // Max size of string from SQLGetInfo +//efine MAX_DNAME_LEN 256 // Max size of Recordset names +//efine MAX_CONNECT_LEN 512 // Max size of Connect string +//efine MAX_CURSOR_NAME 18 // Max size of a cursor name +#define DEFAULT_FIELD_TYPE 0 // TYPE_NULL + +#if !defined(__WIN__) +typedef unsigned char *PUCHAR; +#endif // !__WIN__ + +enum JCATINFO { + CAT_TAB = 1, // JDBC Tables + CAT_COL = 2, // JDBC Columns + CAT_KEY = 3, // JDBC PrimaryKeys +//CAT_STAT = 4, // SQLStatistics +//CAT_SPC = 5 // SQLSpecialColumns +}; + +/***********************************************************************/ +/* This structure is used to control the catalog functions. */ +/***********************************************************************/ +typedef struct tagJCATPARM { + JCATINFO Id; // Id to indicate function + PQRYRES Qrp; // Result set pointer + PCSZ DB; // Database (Schema) + PCSZ Tab; // Table name or pattern + PCSZ Pat; // Table type or column pattern +} JCATPARM; + +typedef jint(JNICALL *CRTJVM) (JavaVM **, void **, void *); +typedef jint(JNICALL *GETJVM) (JavaVM **, jsize, jsize *); +#if defined(_DEBUG) +typedef jint(JNICALL *GETDEF) (void *); +#endif // _DEBUG + +class JAVAConn; + +/***********************************************************************/ +/* JAVAConn class. */ +/***********************************************************************/ +class JAVAConn : public BLOCK { + friend class TDBJMG; +private: + JAVAConn(); // Standard (unused) constructor + +public: + // Constructor + JAVAConn(PGLOBAL g, PCSZ wrapper); + + // Set static variables + static void SetJVM(void) { + LibJvm = NULL; + CreateJavaVM = NULL; + GetCreatedJavaVMs = NULL; +#if defined(_DEBUG) + GetDefaultJavaVMInitArgs = NULL; +#endif // _DEBUG + } // end of SetJVM + + static void ResetJVM(void); + static bool GetJVM(PGLOBAL g); + + // Implementation +public: + //virtual ~JAVAConn(); + bool IsOpen(void) { return m_Opened; } + bool IsConnected(void) { return m_Connected; } + + // Java operations +protected: + bool gmID(PGLOBAL g, jmethodID& mid, const char *name, const char *sig); + bool Check(jint rc = 0); + +public: + virtual void AddJars(PSTRG jpop, char sep) = 0; + virtual bool Connect(PJPARM sop) = 0; + virtual bool Open(PGLOBAL g); + virtual bool MakeCursor(PGLOBAL g, PTDB tdbp, PCSZ options, + PCSZ filter, bool pipe) = 0; + virtual void Close(void); + +protected: + // Members +#if defined(__WIN__) + static HANDLE LibJvm; // Handle to the jvm DLL +#else // !__WIN__ + static void *LibJvm; // Handle for the jvm shared library +#endif // !__WIN__ + static CRTJVM CreateJavaVM; + static GETJVM GetCreatedJavaVMs; +#if defined(_DEBUG) + static GETDEF GetDefaultJavaVMInitArgs; +#endif // _DEBUG + PGLOBAL m_G; + JavaVM *jvm; // Pointer to the JVM (Java Virtual Machine) + JNIEnv *env; // Pointer to native interface + jclass jdi; // Pointer to the java wrapper class + jobject job; // The java wrapper class object + jmethodID errid; // The GetErrmsg method ID + bool m_Opened; + bool m_Connected; + PCSZ DiscFunc; + PCSZ Msg; + PCSZ m_Wrap; + int m_Rows; +}; // end of JAVAConn class definition diff --git a/storage/connect/jdbccat.h b/storage/connect/jdbccat.h index 0b87df8bb51..1210aff77d8 100644 --- a/storage/connect/jdbccat.h +++ b/storage/connect/jdbccat.h @@ -1,3 +1,6 @@ +#ifndef __JDBCCAT_H +#define __JDBCCAT_H + // Timeout and net wait defaults #define DEFAULT_LOGIN_TIMEOUT -1 // means do not set #define DEFAULT_QUERY_TIMEOUT -1 // means do not set @@ -8,9 +11,9 @@ typedef struct jdbc_parms { PCSZ Url; // Driver URL PCSZ User; // User connect info PCSZ Pwd; // Password connect info -//char *Properties; // Connection property list //int Cto; // Connect timeout //int Qto; // Query timeout + int Version; // Driver version int Fsize; // Fetch size bool Scrollable; // Scrollable cursor } JDBCPARM, *PJPARM; @@ -28,3 +31,5 @@ PQRYRES JDBCSrcCols(PGLOBAL g, PCSZ src, PJPARM sop); PQRYRES JDBCTables(PGLOBAL g, PCSZ db, PCSZ tabpat, PCSZ tabtyp, int maxres, bool info, PJPARM sop); PQRYRES JDBCDrivers(PGLOBAL g, int maxres, bool info); + +#endif // __JDBCCAT_H diff --git a/storage/connect/jdbconn.cpp b/storage/connect/jdbconn.cpp index 22e89a41efb..8cc35b9007d 100644 --- a/storage/connect/jdbconn.cpp +++ b/storage/connect/jdbconn.cpp @@ -53,38 +53,28 @@ #include "osutil.h" -#if defined(__WIN__) -extern "C" HINSTANCE s_hModule; // Saved module handle -#endif // __WIN__ -#define nullptr 0 +//#if defined(__WIN__) +//extern "C" HINSTANCE s_hModule; // Saved module handle +//#endif // __WIN__ +//#define nullptr 0 TYPCONV GetTypeConv(); int GetConvSize(); -extern char *JvmPath; // The connect_jvm_path global variable value -extern char *ClassPath; // The connect_class_path global variable value +//extern char *JvmPath; // The connect_jvm_path global variable value +//extern char *ClassPath; // The connect_class_path global variable value -char *GetJavaWrapper(void); // The connect_java_wrapper variable value - -/***********************************************************************/ -/* Static JDBConn objects. */ -/***********************************************************************/ -void *JDBConn::LibJvm = NULL; -CRTJVM JDBConn::CreateJavaVM = NULL; -GETJVM JDBConn::GetCreatedJavaVMs = NULL; -#if defined(_DEBUG) -GETDEF JDBConn::GetDefaultJavaVMInitArgs = NULL; -#endif // _DEBUG +//char *GetJavaWrapper(void); // The connect_java_wrapper variable value /***********************************************************************/ /* Some macro's (should be defined elsewhere to be more accessible) */ /***********************************************************************/ -#if defined(_DEBUG) -#define ASSERT(f) assert(f) -#define DEBUG_ONLY(f) (f) -#else // !_DEBUG -#define ASSERT(f) ((void)0) -#define DEBUG_ONLY(f) ((void)0) -#endif // !_DEBUG +//#if defined(_DEBUG) +//#define ASSERT(f) assert(f) +//#define DEBUG_ONLY(f) (f) +//#else // !_DEBUG +//#define ASSERT(f) ((void)0) +//#define DEBUG_ONLY(f) ((void)0) +//#endif // !_DEBUG // To avoid gcc warning int TranslateJDBCType(int stp, char *tn, int prec, int& len, char& v); @@ -239,11 +229,11 @@ PQRYRES JDBCColumns(PGLOBAL g, PCSZ db, PCSZ table, PCSZ colpat, FLD_SCALE, FLD_RADIX, FLD_NULL, FLD_REM}; unsigned int length[] = {0, 0, 0, 0, 6, 0, 10, 10, 6, 6, 6, 0}; bool b[] = {true, true, false, false, false, false, false, false, true, true, false, true}; - int i, n, ncol = 12; - PCOLRES crp; - PQRYRES qrp; + int i, n, ncol = 12; + PCOLRES crp; + PQRYRES qrp; JCATPARM *cap; - JDBConn *jcp = NULL; + JDBConn *jcp = NULL; /************************************************************************/ /* Do an evaluation of the result size. */ @@ -251,7 +241,7 @@ PQRYRES JDBCColumns(PGLOBAL g, PCSZ db, PCSZ table, PCSZ colpat, if (!info) { jcp = new(g)JDBConn(g, NULL); - if (jcp->Open(sjp) != RC_OK) // openReadOnly + noJDBCdialog + if (jcp->Connect(sjp)) // openReadOnly + noJDBCdialog return NULL; if (table && !strchr(table, '%')) { @@ -337,7 +327,7 @@ PQRYRES JDBCSrcCols(PGLOBAL g, PCSZ src, PJPARM sjp) PQRYRES qrp; JDBConn *jcp = new(g)JDBConn(g, NULL); - if (jcp->Open(sjp)) + if (jcp->Connect(sjp)) return NULL; if (strstr(src, "%s")) { @@ -379,7 +369,7 @@ PQRYRES JDBCTables(PGLOBAL g, PCSZ db, PCSZ tabpat, PCSZ tabtyp, /**********************************************************************/ jcp = new(g)JDBConn(g, NULL); - if (jcp->Open(sjp) == RC_FX) + if (jcp->Connect(sjp)) return NULL; if (!maxres) @@ -523,37 +513,16 @@ PQRYRES JDBCDrivers(PGLOBAL g, int maxres, bool info) /***********************************************************************/ /* JDBConn construction/destruction. */ /***********************************************************************/ -JDBConn::JDBConn(PGLOBAL g, TDBJDBC *tdbp) +JDBConn::JDBConn(PGLOBAL g, PCSZ wrapper) : JAVAConn(g, wrapper) { - m_G = g; - m_Tdb = tdbp; - jvm = nullptr; // Pointer to the JVM (Java Virtual Machine) - env= nullptr; // Pointer to native interface - jdi = nullptr; // Pointer to the java wrapper class - job = nullptr; // The java wrapper class object xqid = xuid = xid = grs = readid = fetchid = typid = errid = nullptr; prepid = xpid = pcid = nullptr; chrfldid = intfldid = dblfldid = fltfldid = bigfldid = nullptr; objfldid = datfldid = timfldid = tspfldid = nullptr; - //m_LoginTimeout = DEFAULT_LOGIN_TIMEOUT; -//m_QueryTimeout = DEFAULT_QUERY_TIMEOUT; -//m_UpdateOptions = 0; - Msg = NULL; - m_Wrap = (tdbp && tdbp->WrapName) ? tdbp->WrapName : GetJavaWrapper(); - - if (!strchr(m_Wrap, '/')) { - // Add the wrapper package name - char *wn = (char*)PlugSubAlloc(g, NULL, strlen(m_Wrap) + 10); - m_Wrap = strcat(strcpy(wn, "wrappers/"), m_Wrap); - } // endif m_Wrap - -//m_Driver = NULL; -//m_Url = NULL; -//m_User = NULL; -//m_Pwd = NULL; + DiscFunc = "JdbcDisconnect"; m_Ncol = 0; m_Aff = 0; - m_Rows = 0; + //m_Rows = 0; m_Fetch = 0; m_RowsetSize = 0; m_Updatable = true; @@ -563,7 +532,6 @@ JDBConn::JDBConn(PGLOBAL g, TDBJDBC *tdbp) m_Opened = false; m_IDQuoteChar[0] = '"'; m_IDQuoteChar[1] = 0; - //*m_ErrMsg = '\0'; } // end of JDBConn //JDBConn::~JDBConn() @@ -574,55 +542,6 @@ JDBConn::JDBConn(PGLOBAL g, TDBJDBC *tdbp) // } // end of ~JDBConn /***********************************************************************/ -/* Screen for errors. */ -/***********************************************************************/ -bool JDBConn::Check(jint rc) -{ - jstring s; - - if (env->ExceptionCheck()) { - jthrowable exc = env->ExceptionOccurred(); - jmethodID tid = env->GetMethodID(env->FindClass("java/lang/Object"), - "toString", "()Ljava/lang/String;"); - - if (exc != nullptr && tid != nullptr) { - jstring s = (jstring)env->CallObjectMethod(exc, tid); - const char *utf = env->GetStringUTFChars(s, (jboolean)false); - env->DeleteLocalRef(s); - Msg = PlugDup(m_G, utf); - } else - Msg = "Exception occured"; - - env->ExceptionClear(); - } else if (rc < 0) { - s = (jstring)env->CallObjectMethod(job, errid); - Msg = (char*)env->GetStringUTFChars(s, (jboolean)false); - } else - Msg = NULL; - - return (Msg != NULL); -} // end of Check - -/***********************************************************************/ -/* Get MethodID if not exists yet. */ -/***********************************************************************/ -bool JDBConn::gmID(PGLOBAL g, jmethodID& mid, const char *name, const char *sig) -{ - if (mid == nullptr) { - mid = env->GetMethodID(jdi, name, sig); - - if (Check()) { - strcpy(g->Message, Msg); - return true; - } else - return false; - - } else - return false; - -} // end of gmID - -/***********************************************************************/ /* Utility routine. */ /***********************************************************************/ int JDBConn::GetMaxValue(int n) @@ -641,381 +560,52 @@ int JDBConn::GetMaxValue(int n) } // end of GetMaxValue /***********************************************************************/ -/* Reset the JVM library. */ +/* AddJars: add some jar file to the Class path. */ /***********************************************************************/ -void JDBConn::ResetJVM(void) +void JDBConn::AddJars(PSTRG jpop, char sep) { - if (LibJvm) { -#if defined(__WIN__) - FreeLibrary((HMODULE)LibJvm); -#else // !__WIN__ - dlclose(LibJvm); -#endif // !__WIN__ - LibJvm = NULL; - CreateJavaVM = NULL; - GetCreatedJavaVMs = NULL; -#if defined(_DEBUG) - GetDefaultJavaVMInitArgs = NULL; -#endif // _DEBUG - } // endif LibJvm - -} // end of ResetJVM +#if defined(DEVELOPMENT) + jpop->Append( + ";C:/Jconnectors/postgresql-9.4.1208.jar" + ";C:/Oracle/ojdbc7.jar" + ";C:/Apache/commons-dbcp2-2.1.1/commons-dbcp2-2.1.1.jar" + ";C:/Apache/commons-pool2-2.4.2/commons-pool2-2.4.2.jar" + ";C:/Apache/commons-logging-1.2/commons-logging-1.2.jar" + ";C:/Jconnectors/mysql-connector-java-6.0.2-bin.jar" + ";C:/Jconnectors/mariadb-java-client-2.0.1.jar" + ";C:/Jconnectors/sqljdbc42.jar"); +#endif // DEVELOPMENT +} // end of AddJars /***********************************************************************/ -/* Dynamically link the JVM library. */ -/* The purpose of this function is to allow using the CONNECT plugin */ -/* for other table types when the Java JDK is not installed. */ +/* Connect: connect to a data source. */ /***********************************************************************/ -bool JDBConn::GetJVM(PGLOBAL g) -{ - int ntry; - - if (!LibJvm) { - char soname[512]; - -#if defined(__WIN__) - for (ntry = 0; !LibJvm && ntry < 3; ntry++) { - if (!ntry && JvmPath) { - strcat(strcpy(soname, JvmPath), "\\jvm.dll"); - ntry = 3; // No other try - } else if (ntry < 2 && getenv("JAVA_HOME")) { - strcpy(soname, getenv("JAVA_HOME")); - - if (ntry == 1) - strcat(soname, "\\jre"); - - strcat(soname, "\\bin\\client\\jvm.dll"); - } else { - // Try to find it through the registry - char version[16]; - char javaKey[64] = "SOFTWARE\\JavaSoft\\Java Runtime Environment"; - LONG rc; - DWORD BufferSize = 16; - - strcpy(soname, "jvm.dll"); // In case it fails - - if ((rc = RegGetValue(HKEY_LOCAL_MACHINE, javaKey, "CurrentVersion", - RRF_RT_ANY, NULL, (PVOID)&version, &BufferSize)) == ERROR_SUCCESS) { - strcat(strcat(javaKey, "\\"), version); - BufferSize = sizeof(soname); - - if ((rc = RegGetValue(HKEY_LOCAL_MACHINE, javaKey, "RuntimeLib", - RRF_RT_ANY, NULL, (PVOID)&soname, &BufferSize)) != ERROR_SUCCESS) - printf("RegGetValue: rc=%ld\n", rc); - - } // endif rc - - ntry = 3; // Try this only once - } // endelse - - // Load the desired shared library - LibJvm = LoadLibrary(soname); - } // endfor ntry - - // Get the needed entries - if (!LibJvm) { - char buf[256]; - DWORD rc = GetLastError(); - - sprintf(g->Message, MSG(DLL_LOAD_ERROR), rc, soname); - FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | - FORMAT_MESSAGE_IGNORE_INSERTS, NULL, rc, 0, - (LPTSTR)buf, sizeof(buf), NULL); - strcat(strcat(g->Message, ": "), buf); - } else if (!(CreateJavaVM = (CRTJVM)GetProcAddress((HINSTANCE)LibJvm, - "JNI_CreateJavaVM"))) { - sprintf(g->Message, MSG(PROCADD_ERROR), GetLastError(), "JNI_CreateJavaVM"); - FreeLibrary((HMODULE)LibJvm); - LibJvm = NULL; - } else if (!(GetCreatedJavaVMs = (GETJVM)GetProcAddress((HINSTANCE)LibJvm, - "JNI_GetCreatedJavaVMs"))) { - sprintf(g->Message, MSG(PROCADD_ERROR), GetLastError(), "JNI_GetCreatedJavaVMs"); - FreeLibrary((HMODULE)LibJvm); - LibJvm = NULL; -#if defined(_DEBUG) - } else if (!(GetDefaultJavaVMInitArgs = (GETDEF)GetProcAddress((HINSTANCE)LibJvm, - "JNI_GetDefaultJavaVMInitArgs"))) { - sprintf(g->Message, MSG(PROCADD_ERROR), GetLastError(), - "JNI_GetDefaultJavaVMInitArgs"); - FreeLibrary((HMODULE)LibJvm); - LibJvm = NULL; -#endif // _DEBUG - } // endif LibJvm -#else // !__WIN__ - const char *error = NULL; - - for (ntry = 0; !LibJvm && ntry < 2; ntry++) { - if (!ntry && JvmPath) { - strcat(strcpy(soname, JvmPath), "/libjvm.so"); - ntry = 2; - } else if (!ntry && getenv("JAVA_HOME")) { - // TODO: Replace i386 by a better guess - strcat(strcpy(soname, getenv("JAVA_HOME")), "/jre/lib/i386/client/libjvm.so"); - } else { // Will need LD_LIBRARY_PATH to be set - strcpy(soname, "libjvm.so"); - ntry = 2; - } // endelse - - LibJvm = dlopen(soname, RTLD_LAZY); - } // endfor ntry - - // Load the desired shared library - if (!LibJvm) { - error = dlerror(); - sprintf(g->Message, MSG(SHARED_LIB_ERR), soname, SVP(error)); - } else if (!(CreateJavaVM = (CRTJVM)dlsym(LibJvm, "JNI_CreateJavaVM"))) { - error = dlerror(); - sprintf(g->Message, MSG(GET_FUNC_ERR), "JNI_CreateJavaVM", SVP(error)); - dlclose(LibJvm); - LibJvm = NULL; - } else if (!(GetCreatedJavaVMs = (GETJVM)dlsym(LibJvm, "JNI_GetCreatedJavaVMs"))) { - error = dlerror(); - sprintf(g->Message, MSG(GET_FUNC_ERR), "JNI_GetCreatedJavaVMs", SVP(error)); - dlclose(LibJvm); - LibJvm = NULL; -#if defined(_DEBUG) - } else if (!(GetDefaultJavaVMInitArgs = (GETDEF)dlsym(LibJvm, - "JNI_GetDefaultJavaVMInitArgs"))) { - error = dlerror(); - sprintf(g->Message, MSG(GET_FUNC_ERR), "JNI_GetDefaultJavaVMInitArgs", SVP(error)); - dlclose(LibJvm); - LibJvm = NULL; -#endif // _DEBUG - } // endif LibJvm -#endif // !__WIN__ - - } // endif LibJvm - - return LibJvm == NULL; -} // end of GetJVM - -/***********************************************************************/ -/* Open: connect to a data source. */ -/***********************************************************************/ -int JDBConn::Open(PJPARM sop) +bool JDBConn::Connect(PJPARM sop) { int irc = RC_FX; bool err = false; + jint rc; jboolean jt = (trace > 0); PGLOBAL& g = m_G; - // Link or check whether jvm library was linked - if (GetJVM(g)) - return RC_FX; - - // Firstly check whether the jvm was already created - JavaVM* jvms[1]; - jsize jsz; - jint rc = GetCreatedJavaVMs(jvms, 1, &jsz); - - if (rc == JNI_OK && jsz == 1) { - // jvm already existing - jvm = jvms[0]; - rc = jvm->AttachCurrentThread((void**)&env, nullptr); - - if (rc != JNI_OK) { - strcpy(g->Message, "Cannot attach jvm to the current thread"); - return RC_FX; - } // endif rc - - } else { - /*******************************************************************/ - /* Create a new jvm */ - /*******************************************************************/ - PSTRG jpop = new(g)STRING(g, 512, "-Djava.class.path=."); - char *cp = NULL; - char sep; - -#if defined(__WIN__) - sep = ';'; -#define N 1 -//#define N 2 -//#define N 3 -#else - sep = ':'; -#define N 1 -#endif - - // Java source will be compiled as a jar file installed in the plugin dir - jpop->Append(sep); - jpop->Append(GetPluginDir()); - jpop->Append("JdbcInterface.jar"); - - // All wrappers are pre-compiled in JavaWrappers.jar in the plugin dir - jpop->Append(sep); - jpop->Append(GetPluginDir()); - jpop->Append("JavaWrappers.jar"); - - //================== prepare loading of Java VM ============================ - JavaVMInitArgs vm_args; // Initialization arguments - JavaVMOption* options = new JavaVMOption[N]; // JVM invocation options - - // where to find java .class - if (ClassPath && *ClassPath) { - jpop->Append(sep); - jpop->Append(ClassPath); - } // endif ClassPath - - if ((cp = getenv("CLASSPATH"))) { - jpop->Append(sep); - jpop->Append(cp); - } // endif cp - - if (trace) { - htrc("ClassPath=%s\n", ClassPath); - htrc("CLASSPATH=%s\n", cp); - htrc("%s\n", jpop->GetStr()); - } // endif trace - - options[0].optionString = jpop->GetStr(); -#if N == 2 - options[1].optionString = "-Xcheck:jni"; -#endif -#if N == 3 - options[1].optionString = "-Xms256M"; - options[2].optionString = "-Xmx512M"; -#endif -#if defined(_DEBUG) - vm_args.version = JNI_VERSION_1_2; // minimum Java version - rc = GetDefaultJavaVMInitArgs(&vm_args); -#else - vm_args.version = JNI_VERSION_1_6; // minimum Java version -#endif // _DEBUG - vm_args.nOptions = N; // number of options - vm_args.options = options; - vm_args.ignoreUnrecognized = false; // invalid options make the JVM init fail - - //=============== load and initialize Java VM and JNI interface ============= - rc = CreateJavaVM(&jvm, (void**)&env, &vm_args); // YES !! - delete options; // we then no longer need the initialisation options. - - switch (rc) { - case JNI_OK: - strcpy(g->Message, "VM successfully created"); - irc = RC_OK; - break; - case JNI_ERR: - strcpy(g->Message, "Initialising JVM failed: unknown error"); - break; - case JNI_EDETACHED: - strcpy(g->Message, "Thread detached from the VM"); - break; - case JNI_EVERSION: - strcpy(g->Message, "JNI version error"); - break; - case JNI_ENOMEM: - strcpy(g->Message, "Not enough memory"); - break; - case JNI_EEXIST: - strcpy(g->Message, "VM already created"); - break; - case JNI_EINVAL: - strcpy(g->Message, "Invalid arguments"); - break; - default: - sprintf(g->Message, "Unknown return code %d", (int)rc); - break; - } // endswitch rc - - if (trace) - htrc("%s\n", g->Message); - - if (irc != RC_OK) - return irc; - - //=============== Display JVM version =============== - jint ver = env->GetVersion(); - printf("JVM Version %d.%d\n", ((ver>>16)&0x0f), (ver&0x0f)); - } // endif rc - - // try to find the java wrapper class - jdi = env->FindClass(m_Wrap); - - if (jdi == nullptr) { - sprintf(g->Message, "ERROR: class %s not found!", m_Wrap); - return RC_FX; - } // endif jdi - -#if 0 // Suppressed because it does not make any usable change - if (b && jpath && *jpath) { - // Try to add that path the the jvm class path - jmethodID alp = env->GetStaticMethodID(jdi, "addLibraryPath", - "(Ljava/lang/String;)I"); - - if (alp == nullptr) { - env->ExceptionDescribe(); - env->ExceptionClear(); - } else { - char *msg; - jstring path = env->NewStringUTF(jpath); - rc = env->CallStaticIntMethod(jdi, alp, path); - - if ((msg = Check(rc))) { - strcpy(g->Message, msg); - env->DeleteLocalRef(path); - return RC_FX; - } else switch (rc) { - case JNI_OK: - printf("jpath added\n"); - break; - case JNI_EEXIST: - printf("jpath already exist\n"); - break; - case JNI_ERR: - default: - strcpy(g->Message, "Error adding jpath"); - env->DeleteLocalRef(path); - return RC_FX; - } // endswitch rc - - env->DeleteLocalRef(path); - } // endif alp - - } // endif jpath -#endif // 0 - - // if class found, continue - jmethodID ctor = env->GetMethodID(jdi, "<init>", "(Z)V"); - - if (ctor == nullptr) { - sprintf(g->Message, "ERROR: %s constructor not found!", m_Wrap); - return RC_FX; - } else - job = env->NewObject(jdi, ctor, jt); - - // If the object is successfully constructed, - // we can then search for the method we want to call, - // and invoke it for the object: - if (job == nullptr) { - sprintf(g->Message, "%s class object not constructed!", m_Wrap); - return RC_FX; - } // endif job - - errid = env->GetMethodID(jdi, "GetErrmsg", "()Ljava/lang/String;"); - - if (env->ExceptionCheck()) { - strcpy(g->Message, "ERROR: method GetErrmsg() not found!"); - env->ExceptionDescribe(); - env->ExceptionClear(); - return RC_FX; - } // endif Check + /*******************************************************************/ + /* Create or attach a JVM. */ + /*******************************************************************/ + if (Open(g)) + return true; if (!sop) // DRIVER catalog table - return RC_OK; + return false; jmethodID cid = nullptr; if (gmID(g, cid, "JdbcConnect", "([Ljava/lang/String;IZ)I")) - return RC_FX; + return true; // Build the java string array jobjectArray parms = env->NewObjectArray(4, // constructs java array of 4 env->FindClass("java/lang/String"), NULL); // Strings -//m_Driver = sop->Driver; -//m_Url = sop->Url; -//m_User = sop->User; -//m_Pwd = sop->Pwd; m_Scrollable = sop->Scrollable; m_RowsetSize = sop->Fsize; //m_LoginTimeout = sop->Cto; @@ -1035,9 +625,6 @@ int JDBConn::Open(PJPARM sop) if (sop->Pwd) env->SetObjectArrayElement(parms, 3, env->NewStringUTF(sop->Pwd)); -//if (sop->Properties) -// env->SetObjectArrayElement(parms, 4, env->NewStringUTF(sop->Properties)); - // call method rc = env->CallIntMethod(job, cid, parms, m_RowsetSize, m_Scrollable); err = Check(rc); @@ -1045,7 +632,7 @@ int JDBConn::Open(PJPARM sop) if (err) { sprintf(g->Message, "Connecting: %s rc=%d", Msg, (int)rc); - return RC_FX; + return true; } // endif Msg jmethodID qcid = nullptr; @@ -1064,17 +651,18 @@ int JDBConn::Open(PJPARM sop) } // endif qcid if (gmID(g, typid, "ColumnType", "(ILjava/lang/String;)I")) - return RC_FX; + return true; else m_Opened = true; - return RC_OK; -} // end of Open + return false; +} // end of Connect + /***********************************************************************/ /* Execute an SQL command. */ /***********************************************************************/ -int JDBConn::ExecSQLcommand(PCSZ sql) +int JDBConn::ExecuteCommand(PCSZ sql) { int rc; jint n; @@ -1110,7 +698,7 @@ int JDBConn::ExecSQLcommand(PCSZ sql) } // endif ncol return rc; -} // end of ExecSQLcommand +} // end of ExecuteCommand /***********************************************************************/ /* Fetch next row. */ @@ -1170,39 +758,13 @@ int JDBConn::Rewind(PCSZ sql) jboolean b = env->CallBooleanMethod(job, fetchid, 0); rbuf = m_Rows; - } else if (ExecSQLcommand(sql) != RC_FX) + } else if (ExecuteCommand(sql) != RC_FX) rbuf = 0; return rbuf; } // end of Rewind /***********************************************************************/ -/* Disconnect connection */ -/***********************************************************************/ -void JDBConn::Close() -{ - if (m_Opened) { - jint rc; - jmethodID did = nullptr; - - // Could have been detached in case of join - rc = jvm->AttachCurrentThread((void**)&env, nullptr); - - if (gmID(m_G, did, "JdbcDisconnect", "()I")) - printf("%s\n", Msg); - else if (Check(env->CallIntMethod(job, did))) - printf("jdbcDisconnect: %s\n", Msg); - - if ((rc = jvm->DetachCurrentThread()) != JNI_OK) - printf("DetachCurrentThread: rc=%d\n", (int)rc); - - //rc = jvm->DestroyJavaVM(); - m_Opened = false; - } // endif m_Opened - -} // end of Close - -/***********************************************************************/ /* Retrieve and set the column value from the result set. */ /***********************************************************************/ void JDBConn::SetColumnValue(int rank, PSZ name, PVAL val) @@ -1424,7 +986,7 @@ int JDBConn::ExecuteUpdate(PCSZ sql) /***********************************************************************/ /* Get the number of lines of the result set. */ /***********************************************************************/ -int JDBConn::GetResultSize(PCSZ sql, JDBCCOL *colp) +int JDBConn::GetResultSize(PCSZ sql, PCOL colp) { int rc, n = 0; @@ -1565,53 +1127,6 @@ bool JDBConn::SetParam(JDBCCOL *colp) return rc; } // end of SetParam -#if 0 - /***********************************************************************/ - /* Get the list of Data Sources and set it in qrp. */ - /***********************************************************************/ - bool JDBConn::GetDataSources(PQRYRES qrp) - { - bool rv = false; - UCHAR *dsn, *des; - UWORD dir = SQL_FETCH_FIRST; - SWORD n1, n2, p1, p2; - PCOLRES crp1 = qrp->Colresp, crp2 = qrp->Colresp->Next; - RETCODE rc; - - n1 = crp1->Clen; - n2 = crp2->Clen; - - try { - rc = SQLAllocEnv(&m_henv); - - if (!Check(rc)) - ThrowDJX(rc, "SQLAllocEnv"); // Fatal - - for (int i = 0; i < qrp->Maxres; i++) { - dsn = (UCHAR*)crp1->Kdata->GetValPtr(i); - des = (UCHAR*)crp2->Kdata->GetValPtr(i); - rc = SQLDataSources(m_henv, dir, dsn, n1, &p1, des, n2, &p2); - - if (rc == SQL_NO_DATA_FOUND) - break; - else if (!Check(rc)) - ThrowDJX(rc, "SQLDataSources"); - - qrp->Nblin++; - dir = SQL_FETCH_NEXT; - } // endfor i - - } - catch (DJX *x) { - sprintf(m_G->Message, "%s: %s", x->m_Msg, x->GetErrorMessage(0)); - rv = true; - } // end try/catch - - Close(); - return rv; - } // end of GetDataSources -#endif // 0 - /***********************************************************************/ /* Get the list of Drivers and set it in qrp. */ /***********************************************************************/ @@ -1677,7 +1192,7 @@ bool JDBConn::SetParam(JDBCCOL *colp) jint *n = nullptr; jstring label; jmethodID colid = nullptr; - int rc = ExecSQLcommand(src); + int rc = ExecuteCommand(src); if (rc == RC_NF) { strcpy(g->Message, "Srcdef is not returning a result set"); @@ -2002,10 +1517,10 @@ bool JDBConn::SetParam(JDBCCOL *colp) /***********************************************************************/ /* Allocate a CONNECT result structure from the JDBC result. */ /***********************************************************************/ - PQRYRES JDBConn::AllocateResult(PGLOBAL g) + PQRYRES JDBConn::AllocateResult(PGLOBAL g, PTDB tdbp) { bool uns; - PJDBCCOL colp; + PCOL colp; PCOLRES *pcrp, crp; PQRYRES qrp; @@ -2030,8 +1545,7 @@ bool JDBConn::SetParam(JDBCCOL *colp) qrp->Nblin = 0; qrp->Cursor = 0; - for (colp = (PJDBCCOL)m_Tdb->Columns; colp; - colp = (PJDBCCOL)colp->GetNext()) + for (colp = tdbp->GetColumns(); colp; colp = colp->GetNext()) if (!colp->IsSpecial()) { *pcrp = (PCOLRES)PlugSubAlloc(g, NULL, sizeof(COLRES)); crp = *pcrp; @@ -2059,10 +1573,9 @@ bool JDBConn::SetParam(JDBCCOL *colp) memset(crp->Nulls, ' ', m_Rows); } // endelse Nullable - colp->SetCrp(crp); + ((EXTCOL*)colp)->SetCrp(crp); } // endif colp *pcrp = NULL; - //qrp->Nblin = n; return qrp; } // end of AllocateResult diff --git a/storage/connect/jdbconn.h b/storage/connect/jdbconn.h index 73271c8f5be..56f318d238b 100644 --- a/storage/connect/jdbconn.h +++ b/storage/connect/jdbconn.h @@ -1,61 +1,7 @@ /***********************************************************************/ /* JDBConn.h : header file for the JDBC connection classes. */ /***********************************************************************/ -//nclude <windows.h> /* Windows include file */ -//nclude <windowsx.h> /* Message crackers */ - -/***********************************************************************/ -/* Included C-definition files required by the interface. */ -/***********************************************************************/ -#include "block.h" - -/***********************************************************************/ -/* JDBC interface. */ -/***********************************************************************/ -#include <jni.h> - -/***********************************************************************/ -/* Constants and defines. */ -/***********************************************************************/ -// Miscellaneous sizing info -#define MAX_NUM_OF_MSG 10 // Max number of error messages -//efine MAX_CURRENCY 30 // Max size of Currency($) string -#define MAX_TNAME_LEN 32 // Max size of table names -//efine MAX_FNAME_LEN 256 // Max size of field names -//efine MAX_STRING_INFO 256 // Max size of string from SQLGetInfo -//efine MAX_DNAME_LEN 256 // Max size of Recordset names -//efine MAX_CONNECT_LEN 512 // Max size of Connect string -//efine MAX_CURSOR_NAME 18 // Max size of a cursor name -#define DEFAULT_FIELD_TYPE 0 // TYPE_NULL - -#if !defined(__WIN__) -typedef unsigned char *PUCHAR; -#endif // !__WIN__ - -enum JCATINFO { - CAT_TAB = 1, // JDBC Tables - CAT_COL = 2, // JDBC Columns - CAT_KEY = 3, // JDBC PrimaryKeys -//CAT_STAT = 4, // SQLStatistics -//CAT_SPC = 5 // SQLSpecialColumns -}; - -/***********************************************************************/ -/* This structure is used to control the catalog functions. */ -/***********************************************************************/ -typedef struct tagJCATPARM { - JCATINFO Id; // Id to indicate function - PQRYRES Qrp; // Result set pointer - PCSZ DB; // Database (Schema) - PCSZ Tab; // Table name or pattern - PCSZ Pat; // Table type or column pattern -} JCATPARM; - -typedef jint(JNICALL *CRTJVM) (JavaVM **, void **, void *); -typedef jint(JNICALL *GETJVM) (JavaVM **, jsize, jsize *); -#if defined(_DEBUG) -typedef jint(JNICALL *GETDEF) (void *); -#endif // _DEBUG +#include "javaconn.h" // JDBC connection to a data source class TDBJDBC; @@ -66,7 +12,7 @@ class TDBXJDC; /***********************************************************************/ /* JDBConn class. */ /***********************************************************************/ -class JDBConn : public BLOCK { +class JDBConn : public JAVAConn { friend class TDBJDBC; friend class TDBXJDC; //friend PQRYRES GetColumnInfo(PGLOBAL, char*&, char *, int, PVBLK&); @@ -74,118 +20,80 @@ private: JDBConn(); // Standard (unused) constructor public: - JDBConn(PGLOBAL g, TDBJDBC *tdbp); + // Constructor + JDBConn(PGLOBAL g, PCSZ wrapper); - int Open(PJPARM sop); - int Rewind(PCSZ sql); - void Close(void); - PQRYRES AllocateResult(PGLOBAL g); + virtual void AddJars(PSTRG jpop, char sep); + PQRYRES AllocateResult(PGLOBAL g, PTDB tdbp); // Attributes public: - char *GetQuoteChar(void) { return m_IDQuoteChar; } - // Database successfully opened? - bool IsOpen(void) { return m_Opened; } -//PSZ GetStringInfo(ushort infotype); - int GetMaxValue(int infotype); -//PSZ GetConnect(void) { return m_Connect; } + char *GetQuoteChar(void) { return m_IDQuoteChar; } + virtual int GetMaxValue(int infotype); public: // Operations - //void SetLoginTimeout(DWORD sec) {m_LoginTimeout = sec;} - //void SetQueryTimeout(DWORD sec) {m_QueryTimeout = sec;} - //void SetUserName(PSZ user) {m_User = user;} - //void SetUserPwd(PSZ pwd) {m_Pwd = pwd;} - int GetResultSize(PCSZ sql, JDBCCOL *colp); - int ExecuteQuery(PCSZ sql); - int ExecuteUpdate(PCSZ sql); - int Fetch(int pos = 0); + virtual bool Connect(PJPARM sop); + virtual bool MakeCursor(PGLOBAL g, PTDB tdbp, PCSZ options, + PCSZ filter, bool pipe) {return true;} + virtual int GetResultSize(PCSZ sql, PCOL colp); + virtual int ExecuteCommand(PCSZ sql); + virtual int ExecuteQuery(PCSZ sql); + virtual int ExecuteUpdate(PCSZ sql); + virtual int Fetch(int pos = 0); + virtual void SetColumnValue(int rank, PSZ name, PVAL val); + + // Jdbc operations bool PrepareSQL(PCSZ sql); - int ExecuteSQL(void); + int ExecuteSQL(void); // Prepared statement bool SetParam(JDBCCOL *colp); - int ExecSQLcommand(PCSZ sql); - void SetColumnValue(int rank, PSZ name, PVAL val); int GetCatInfo(JCATPARM *cap); - //bool GetDataSources(PQRYRES qrp); bool GetDrivers(PQRYRES qrp); PQRYRES GetMetaData(PGLOBAL g, PCSZ src); - -public: - // Set static variables - static void SetJVM(void) { - LibJvm = NULL; - CreateJavaVM = NULL; - GetCreatedJavaVMs = NULL; -#if defined(_DEBUG) - GetDefaultJavaVMInitArgs = NULL; -#endif // _DEBUG - } // end of SetJVM - - static void ResetJVM(void); - static bool GetJVM(PGLOBAL g); + int Rewind(PCSZ sql); // Implementation public: //virtual ~JDBConn(); - // JDBC operations -protected: - bool gmID(PGLOBAL g, jmethodID& mid, const char *name, const char *sig); - bool Check(jint rc = 0); -//void ThrowDJX(int rc, PSZ msg/*, HSTMT hstmt = SQL_NULL_HSTMT*/); -//void ThrowDJX(PSZ msg); -//void Free(void); - protected: // Members -#if defined(__WIN__) - static HANDLE LibJvm; // Handle to the jvm DLL -#else // !__WIN__ - static void *LibJvm; // Handle for the jvm shared library -#endif // !__WIN__ - static CRTJVM CreateJavaVM; - static GETJVM GetCreatedJavaVMs; -#if defined(_DEBUG) - static GETDEF GetDefaultJavaVMInitArgs; -#endif // _DEBUG - PGLOBAL m_G; - TDBJDBC *m_Tdb; - JavaVM *jvm; // Pointer to the JVM (Java Virtual Machine) - JNIEnv *env; // Pointer to native interface - jclass jdi; // Pointer to the java wrapper class - jobject job; // The java wrapper class object - jmethodID xqid; // The ExecuteQuery method ID - jmethodID xuid; // The ExecuteUpdate method ID - jmethodID xid; // The Execute method ID - jmethodID grs; // The GetResult method ID - jmethodID readid; // The ReadNext method ID - jmethodID fetchid; // The Fetch method ID - jmethodID typid; // The ColumnType method ID - jmethodID prepid; // The CreatePrepStmt method ID - jmethodID xpid; // The ExecutePrep method ID - jmethodID pcid; // The ClosePrepStmt method ID - jmethodID errid; // The GetErrmsg method ID - jmethodID objfldid; // The ObjectField method ID - jmethodID chrfldid; // The StringField method ID - jmethodID intfldid; // The IntField method ID - jmethodID dblfldid; // The DoubleField method ID - jmethodID fltfldid; // The FloatField method ID - jmethodID datfldid; // The DateField method ID - jmethodID timfldid; // The TimeField method ID - jmethodID tspfldid; // The TimestampField method ID - jmethodID bigfldid; // The BigintField method ID - PCSZ Msg; - char *m_Wrap; +#if 0 + JavaVM *jvm; // Pointer to the JVM (Java Virtual Machine) + JNIEnv *env; // Pointer to native interface + jclass jdi; // Pointer to the java wrapper class + jobject job; // The java wrapper class object + jmethodID errid; // The GetErrmsg method ID +#endif // 0 + jmethodID xqid; // The ExecuteQuery method ID + jmethodID xuid; // The ExecuteUpdate method ID + jmethodID xid; // The Execute method ID + jmethodID grs; // The GetResult method ID + jmethodID readid; // The ReadNext method ID + jmethodID fetchid; // The Fetch method ID + jmethodID typid; // The ColumnType method ID + jmethodID prepid; // The CreatePrepStmt method ID + jmethodID xpid; // The ExecutePrep method ID + jmethodID pcid; // The ClosePrepStmt method ID + jmethodID objfldid; // The ObjectField method ID + jmethodID chrfldid; // The StringField method ID + jmethodID intfldid; // The IntField method ID + jmethodID dblfldid; // The DoubleField method ID + jmethodID fltfldid; // The FloatField method ID + jmethodID datfldid; // The DateField method ID + jmethodID timfldid; // The TimeField method ID + jmethodID tspfldid; // The TimestampField method ID + jmethodID bigfldid; // The BigintField method ID +// PCSZ Msg; +// PCSZ m_Wrap; char m_IDQuoteChar[2]; PCSZ m_Pwd; int m_Ncol; int m_Aff; - int m_Rows; int m_Fetch; int m_RowsetSize; jboolean m_Updatable; jboolean m_Transact; jboolean m_Scrollable; - bool m_Opened; bool m_Full; }; // end of JDBConn class definition diff --git a/storage/connect/jmgfam.cpp b/storage/connect/jmgfam.cpp new file mode 100644 index 00000000000..c7115cdd720 --- /dev/null +++ b/storage/connect/jmgfam.cpp @@ -0,0 +1,357 @@ +/************ JMONGO FAM C++ Program Source Code File (.CPP) ***********/ +/* PROGRAM NAME: jmgfam.cpp */ +/* ------------- */ +/* Version 1.0 */ +/* */ +/* COPYRIGHT: */ +/* ---------- */ +/* (C) Copyright to the author Olivier BERTRAND 20017 */ +/* */ +/* WHAT THIS PROGRAM DOES: */ +/* ----------------------- */ +/* This program are the Java MongoDB access method classes. */ +/* */ +/***********************************************************************/ + +/***********************************************************************/ +/* Include relevant sections of the System header files. */ +/***********************************************************************/ +#include "my_global.h" +#if defined(__WIN__) +//#include <io.h> +//#include <fcntl.h> +//#include <errno.h> +#if defined(__BORLANDC__) +#define __MFC_COMPAT__ // To define min/max as macro +#endif // __BORLANDC__ +//#include <windows.h> +#else // !__WIN__ +#if defined(UNIX) || defined(UNIV_LINUX) +//#include <errno.h> +#include <unistd.h> +//#if !defined(sun) // Sun has the ftruncate fnc. +//#define USETEMP // Force copy mode for DELETE +//#endif // !sun +#else // !UNIX +//#include <io.h> +#endif // !UNIX +//#include <fcntl.h> +#endif // !__WIN__ + +/***********************************************************************/ +/* Include application header files: */ +/* global.h is header containing all global declarations. */ +/* plgdbsem.h is header containing the DB application declarations. */ +/* filamtxt.h is header containing the file AM classes declarations. */ +/***********************************************************************/ +#include "global.h" +#include "plgdbsem.h" +#include "reldef.h" +#include "filamtxt.h" +#include "tabdos.h" +#include "tabjson.h" +#include "jmgfam.h" + +#if defined(UNIX) || defined(UNIV_LINUX) +#include "osutil.h" +//#define _fileno fileno +//#define _O_RDONLY O_RDONLY +#endif + +/* --------------------------- Class JMGFAM -------------------------- */ + +/***********************************************************************/ +/* Constructors. */ +/***********************************************************************/ +JMGFAM::JMGFAM(PJDEF tdp) : DOSFAM((PDOSDEF)NULL) +{ + Jcp = NULL; + //Client = NULL; + //Database = NULL; + //Collection = NULL; + //Cursor = NULL; + //Query = NULL; + //Opts = NULL; + Ops.Driver = tdp->Schema; + Ops.Url = tdp->Uri; + Ops.User = NULL; + Ops.Pwd = NULL; + Ops.Scrollable = false; + Ops.Fsize = 0; + Ops.Version = tdp->Version; + To_Fbt = NULL; + Mode = MODE_ANY; + Uristr = tdp->Uri; + Db_name = tdp->Schema; + Coll_name = tdp->Collname; + Options = tdp->Options; + Filter = tdp->Filter; + Wrapname = tdp->Wrapname; + Done = false; + Pipe = tdp->Pipe; + Version = tdp->Version; + Lrecl = tdp->Lrecl + tdp->Ending; + Curpos = 0; +} // end of JMGFAM standard constructor + +JMGFAM::JMGFAM(PJMGFAM tdfp) : DOSFAM(tdfp) +{ + //Client = tdfp->Client; + //Database = NULL; + //Collection = tdfp->Collection; + //Cursor = tdfp->Cursor; + //Query = tdfp->Query; + //Opts = tdfp->Opts; + Ops = tdfp->Ops; + To_Fbt = tdfp->To_Fbt; + Mode = tdfp->Mode; + Uristr = tdfp->Uristr; + Db_name = tdfp->Db_name; + Coll_name = tdfp->Coll_name; + Options = tdfp->Options; + Filter = NULL; + Wrapname = tdfp->Wrapname; + Done = tdfp->Done; + Pipe = tdfp->Pipe; + Version = tdfp->Version; +} // end of JMGFAM copy constructor + +/***********************************************************************/ +/* Reset: reset position values at the beginning of file. */ +/***********************************************************************/ +void JMGFAM::Reset(void) +{ + TXTFAM::Reset(); + Fpos = Tpos = Spos = 0; +} // end of Reset + +/***********************************************************************/ +/* MGO GetFileLength: returns file size in number of bytes. */ +/***********************************************************************/ +int JMGFAM::GetFileLength(PGLOBAL g) +{ + return 0; +} // end of GetFileLength + +/***********************************************************************/ +/* Cardinality: returns table cardinality in number of rows. */ +/* This function can be called with a null argument to test the */ +/* availability of Cardinality implementation (1 yes, 0 no). */ +/***********************************************************************/ +int JMGFAM::Cardinality(PGLOBAL g) +{ + if (!g) + return 1; + + return (!Init(g)) ? Jcp->CollSize(g) : 0; +} // end of Cardinality + +/***********************************************************************/ +/* Note: This function is not really implemented yet. */ +/***********************************************************************/ +int JMGFAM::MaxBlkSize(PGLOBAL, int s) +{ + return s; +} // end of MaxBlkSize + +/***********************************************************************/ +/* Init: initialize MongoDB processing. */ +/***********************************************************************/ +bool JMGFAM::Init(PGLOBAL g) +{ + if (Done) + return false; + + /*********************************************************************/ + /* Open an JDBC connection for this table. */ + /* Note: this may not be the proper way to do. Perhaps it is better */ + /* to test whether a connection is already open for this datasource */ + /* and if so to allocate just a new result set. But this only for */ + /* drivers allowing concurency in getting results ??? */ + /*********************************************************************/ + if (!Jcp) + Jcp = new(g) JMgoConn(g, Coll_name, Wrapname); + else if (Jcp->IsOpen()) + Jcp->Close(); + + if (Jcp->Connect(&Ops)) + return true; + + Done = true; + return false; +} // end of Init + +/***********************************************************************/ +/* OpenTableFile: Open a MongoDB table. */ +/***********************************************************************/ +bool JMGFAM::OpenTableFile(PGLOBAL g) +{ + Mode = Tdbp->GetMode(); + + if (Pipe && Mode != MODE_READ) { + strcpy(g->Message, "Pipeline tables are read only"); + return true; + } // endif Pipe + + if (Init(g)) + return true; + + if (Jcp->GetMethodId(g, Mode)) + return true; + + if (Mode == MODE_DELETE && !Tdbp->GetNext()) { + // Delete all documents + if (!Jcp->MakeCursor(g, Tdbp, "all", Filter, false)) + if (Jcp->DocDelete(g, true) == RC_OK) + return false; + + return true; + } // endif Mode + + if (Mode == MODE_INSERT) + Jcp->MakeColumnGroups(g, Tdbp); + + if (Mode != MODE_UPDATE) + return Jcp->MakeCursor(g, Tdbp, Options, Filter, Pipe); + + return false; + } // end of OpenTableFile + +/***********************************************************************/ +/* GetRowID: return the RowID of last read record. */ +/***********************************************************************/ +int JMGFAM::GetRowID(void) +{ + return Rows; +} // end of GetRowID + +/***********************************************************************/ +/* GetPos: return the position of last read record. */ +/***********************************************************************/ +int JMGFAM::GetPos(void) +{ + return Fpos; +} // end of GetPos + +/***********************************************************************/ +/* GetNextPos: return the position of next record. */ +/***********************************************************************/ +int JMGFAM::GetNextPos(void) +{ + return Fpos; // TODO +} // end of GetNextPos + +/***********************************************************************/ +/* SetPos: Replace the table at the specified position. */ +/***********************************************************************/ +bool JMGFAM::SetPos(PGLOBAL g, int pos) +{ + Fpos = pos; + Placed = true; + return false; +} // end of SetPos + +/***********************************************************************/ +/* Record file position in case of UPDATE or DELETE. */ +/***********************************************************************/ +bool JMGFAM::RecordPos(PGLOBAL g) +{ + strcpy(g->Message, "JMGFAM::RecordPos NIY"); + return true; +} // end of RecordPos + +/***********************************************************************/ +/* Initialize Fpos and the current position for indexed DELETE. */ +/***********************************************************************/ +int JMGFAM::InitDelete(PGLOBAL g, int fpos, int spos) +{ + strcpy(g->Message, "JMGFAM::InitDelete NIY"); + return RC_FX; +} // end of InitDelete + +/***********************************************************************/ +/* Skip one record in file. */ +/***********************************************************************/ +int JMGFAM::SkipRecord(PGLOBAL g, bool header) +{ + return RC_OK; // Dummy +} // end of SkipRecord + +/***********************************************************************/ +/* ReadBuffer: Get next document from a collection. */ +/***********************************************************************/ +int JMGFAM::ReadBuffer(PGLOBAL g) +{ + int rc = RC_FX; + + if (!Curpos && Mode == MODE_UPDATE) + if (Jcp->MakeCursor(g, Tdbp, Options, Filter, Pipe)) + return RC_FX; + + if (++CurNum >= Rbuf) { + Rbuf = Jcp->Fetch(); + Curpos++; + CurNum = 0; + } // endif CurNum + + if (Rbuf > 0) { + PSZ str = Jcp->GetDocument(); + + if (str) { + if (trace == 1) + htrc("%s\n", str); + + strncpy(Tdbp->GetLine(), str, Lrecl); + rc = RC_OK; + } else + strcpy(g->Message, "Null document"); + + } else if (!Rbuf) + rc = RC_EF; + + return rc; +} // end of ReadBuffer + +/***********************************************************************/ +/* WriteBuffer: File write routine for MGO access method. */ +/***********************************************************************/ +int JMGFAM::WriteBuffer(PGLOBAL g) +{ + int rc = RC_OK; + + if (Mode == MODE_INSERT) { + rc = Jcp->DocWrite(g); + } else if (Mode == MODE_DELETE) { + rc = Jcp->DocDelete(g, false); + } else if (Mode == MODE_UPDATE) { + rc = Jcp->DocUpdate(g, Tdbp); + } // endif Mode + + return rc; +} // end of WriteBuffer + +/***********************************************************************/ +/* Data Base delete line routine for MGO and BLK access methods. */ +/***********************************************************************/ +int JMGFAM::DeleteRecords(PGLOBAL g, int irc) +{ + return (irc == RC_OK) ? WriteBuffer(g) : RC_OK; +} // end of DeleteRecords + +/***********************************************************************/ +/* Table file close routine for MGO access method. */ +/***********************************************************************/ +void JMGFAM::CloseTableFile(PGLOBAL g, bool) +{ + Jcp->Close(); + Done = false; +} // end of CloseTableFile + +/***********************************************************************/ +/* Rewind routine for MGO access method. */ +/***********************************************************************/ +void JMGFAM::Rewind(void) +{ + Jcp->Rewind(); +} // end of Rewind + diff --git a/storage/connect/jmgfam.h b/storage/connect/jmgfam.h new file mode 100644 index 00000000000..5c80d993833 --- /dev/null +++ b/storage/connect/jmgfam.h @@ -0,0 +1,79 @@ +/************** MongoFam H Declares Source Code File (.H) **************/ +/* Name: jmgfam.h Version 1.0 */ +/* */ +/* (C) Copyright to the author Olivier BERTRAND 2017 */ +/* */ +/* This file contains the JAVA MongoDB access method classes declares */ +/***********************************************************************/ +#pragma once + +/***********************************************************************/ +/* Include MongoDB library header files. */ +/***********************************************************************/ +#include "block.h" +//#include "mongo.h" +#include "jmgoconn.h" + +typedef class JMGFAM *PJMGFAM; +typedef class MGODEF *PMGODEF; + +/***********************************************************************/ +/* This is the Java MongoDB Access Method class declaration. */ +/***********************************************************************/ +class DllExport JMGFAM : public DOSFAM { + friend void mongo_init(bool); +public: + // Constructor + JMGFAM(PJDEF tdp); + JMGFAM(PJMGFAM txfp); + + // Implementation + virtual AMT GetAmType(void) { return TYPE_AM_MGO; } + virtual bool GetUseTemp(void) { return false; } + virtual int GetPos(void); + virtual int GetNextPos(void); + virtual PTXF Duplicate(PGLOBAL g) { return (PTXF)new(g) JMGFAM(this); } + void SetLrecl(int lrecl) { Lrecl = lrecl; } + + // Methods + virtual void Reset(void); + virtual int GetFileLength(PGLOBAL g); + virtual int Cardinality(PGLOBAL g); + virtual int MaxBlkSize(PGLOBAL g, int s); + virtual bool AllocateBuffer(PGLOBAL g) { return false; } + virtual int GetRowID(void); + virtual bool RecordPos(PGLOBAL g); + virtual bool SetPos(PGLOBAL g, int recpos); + virtual int SkipRecord(PGLOBAL g, bool header); + virtual bool OpenTableFile(PGLOBAL g); + virtual int ReadBuffer(PGLOBAL g); + virtual int WriteBuffer(PGLOBAL g); + virtual int DeleteRecords(PGLOBAL g, int irc); + virtual void CloseTableFile(PGLOBAL g, bool abort); + virtual void Rewind(void); + +protected: + virtual bool OpenTempFile(PGLOBAL g) { return false; } + virtual bool MoveIntermediateLines(PGLOBAL g, bool *b) { return false; } + virtual int RenameTempFile(PGLOBAL g) { return RC_OK; } + virtual int InitDelete(PGLOBAL g, int fpos, int spos); + bool Init(PGLOBAL g); +//bool MakeCursor(PGLOBAL g); + + // Members + JMgoConn *Jcp; // Points to a Mongo connection class + JDBCPARM Ops; // Additional parameters + PFBLOCK To_Fbt; // Pointer to temp file block + MODE Mode; + PCSZ Uristr; + PCSZ Db_name; + PCSZ Coll_name; + PCSZ Options; + PCSZ Filter; + PSZ Wrapname; + bool Done; // Init done + bool Pipe; + int Version; + int Curpos; // Cursor position of last fetch +}; // end of class JMGFAM + diff --git a/storage/connect/jmgoconn.cpp b/storage/connect/jmgoconn.cpp new file mode 100644 index 00000000000..cb0729ed1f7 --- /dev/null +++ b/storage/connect/jmgoconn.cpp @@ -0,0 +1,811 @@ +/************ JMgoConn C++ Functions Source Code File (.CPP) ***********/ +/* Name: JMgoConn.CPP Version 1.1 */ +/* */ +/* (C) Copyright to the author Olivier BERTRAND 2017 */ +/* */ +/* This file contains the MongoDB Java connection classes functions. */ +/***********************************************************************/ + +/***********************************************************************/ +/* Include relevant MariaDB header file. */ +/***********************************************************************/ +#include <my_global.h> + +/***********************************************************************/ +/* Required objects includes. */ +/***********************************************************************/ +#include "global.h" +#include "plgdbsem.h" +#include "colblk.h" +#include "xobject.h" +#include "xtable.h" +#include "filter.h" +#include "jmgoconn.h" + +#define nullptr 0 + +bool IsNum(PSZ s); + +/* --------------------------- Class JNCOL --------------------------- */ + +/***********************************************************************/ +/* Add a column in the column list. */ +/***********************************************************************/ +void JNCOL::AddCol(PGLOBAL g, PCOL colp, PSZ jp) +{ + char *p; + PJKC kp, kcp; + + if ((p = strchr(jp, '.'))) { + PJNCOL icp; + + *p++ = 0; + + for (kp = Klist; kp; kp = kp->Next) + if (kp->Jncolp && !strcmp(jp, kp->Key)) + break; + + if (!kp) { + icp = new(g) JNCOL(IsNum(p)); + kcp = (PJKC)PlugSubAlloc(g, NULL, sizeof(JKCOL)); + kcp->Next = NULL; + kcp->Jncolp = icp; + kcp->Colp = NULL; + + if (Array) { + kcp->Key = NULL; + kcp->N = atoi(p); + } else { + kcp->Key = PlugDup(g, jp); + kcp->N = 0; + } // endif Array + + if (Klist) { + for (kp = Klist; kp->Next; kp = kp->Next); + + kp->Next = kcp; + } else + Klist = kcp; + + } else + icp = kp->Jncolp; + + *(p - 1) = '.'; + icp->AddCol(g, colp, p); + } else { + kcp = (PJKC)PlugSubAlloc(g, NULL, sizeof(JKCOL)); + + kcp->Next = NULL; + kcp->Jncolp = NULL; + kcp->Colp = colp; + + if (Array) { + kcp->Key = NULL; + kcp->N = atoi(jp); + } else { + kcp->Key = jp; + kcp->N = 0; + } // endif Array + + if (Klist) { + for (kp = Klist; kp->Next; kp = kp->Next); + + kp->Next = kcp; + } else + Klist = kcp; + + } // endif jp + +} // end of AddCol + +/***********************************************************************/ +/* JMgoConn construction/destruction. */ +/***********************************************************************/ +JMgoConn::JMgoConn(PGLOBAL g, PCSZ collname, PCSZ wrapper) + : JAVAConn(g, wrapper) +{ + CollName = collname; + readid = fetchid = getdocid = objfldid = fcollid = acollid = + mkdocid = docaddid = mkarid = araddid = insertid = updateid = + deleteid = gcollid = countid = rewindid = nullptr; + DiscFunc = "MongoDisconnect"; + Fpc = NULL; + m_Version = 0; + m_Fetch = 0; + m_Version = 0; +} // end of JMgoConn + +/***********************************************************************/ +/* AddJars: add some jar file to the Class path. */ +/***********************************************************************/ +void JMgoConn::AddJars(PSTRG jpop, char sep) +{ +#if defined(DEVELOPMENT) + if (m_Version == 2) { + //jpop->Append(sep); + //jpop->Append("C:/Eclipse/workspace/MongoWrap2/bin"); + jpop->Append(sep); + jpop->Append("C:/mongo-java-driver/mongo-java-driver-2.13.3.jar"); + } else { + //jpop->Append(sep); + //jpop->Append("C:/Eclipse/workspace/MongoWrap3/bin"); + jpop->Append(sep); + jpop->Append("C:/mongo-java-driver/mongo-java-driver-3.4.2.jar"); + } // endif m_Version +#endif // DEVELOPMENT +} // end of AddJars + +/***********************************************************************/ +/* Connect: connect to a data source. */ +/***********************************************************************/ +bool JMgoConn::Connect(PJPARM sop) +{ + bool err = false; + jint rc; + jboolean brc; + jstring cln; + PGLOBAL& g = m_G; + + m_Version = sop->Version; + + /*******************************************************************/ + /* Create or attach a JVM. */ + /*******************************************************************/ + if (Open(g)) + return true; + + /*******************************************************************/ + /* Connect to MongoDB. */ + /*******************************************************************/ + jmethodID cid = nullptr; + + if (gmID(g, cid, "MongoConnect", "([Ljava/lang/String;)I")) + return true; + + // Build the java string array + jobjectArray parms = env->NewObjectArray(4, // constructs java array of 4 + env->FindClass("java/lang/String"), NULL); // Strings + + //m_Scrollable = sop->Scrollable; + //m_RowsetSize = sop->Fsize; + + // change some elements + if (sop->Driver) + env->SetObjectArrayElement(parms, 0, env->NewStringUTF(sop->Url)); + + if (sop->Url) + env->SetObjectArrayElement(parms, 1, env->NewStringUTF(sop->Driver)); + + if (sop->User) + env->SetObjectArrayElement(parms, 2, env->NewStringUTF(sop->User)); + + if (sop->Pwd) + env->SetObjectArrayElement(parms, 3, env->NewStringUTF(sop->Pwd)); + + // call method + rc = env->CallIntMethod(job, cid, parms); + err = Check(rc); + env->DeleteLocalRef(parms); // Not used anymore + + if (err) { + sprintf(g->Message, "Connecting: %s rc=%d", Msg, (int)rc); + return true; + } // endif Msg + + /*********************************************************************/ + /* Get the collection. */ + /*********************************************************************/ + if (gmID(g, gcollid, "GetCollection", "(Ljava/lang/String;)Z")) + return true; + + cln = env->NewStringUTF(CollName); + brc = env->CallBooleanMethod(job, gcollid, cln); + env->DeleteLocalRef(cln); + + if (Check(brc ? -1 : 0)) { + sprintf(g->Message, "GetCollection: %s", Msg); + return true; + } // endif Msg + + m_Connected = true; + return false; +} // end of Connect + +/***********************************************************************/ +/* CollSize: returns the number of documents in the collection. */ +/***********************************************************************/ +int JMgoConn::CollSize(PGLOBAL g) +{ + if (!gmID(g, countid, "GetCollSize", "()J")) { + jlong card = env->CallLongMethod(job, countid); + + return (int)card; + } else + return 2; // Make MariaDB happy + +} // end of CollSize + +/***********************************************************************/ +/* OpenDB: Data Base open routine for MONGO access method. */ +/***********************************************************************/ +bool JMgoConn::MakeCursor(PGLOBAL g, PTDB tdbp, PCSZ options, + PCSZ filter, bool pipe) +{ + const char *p; + bool b = false, id = (tdbp->GetMode() != MODE_READ), all = false; + uint len; + PCOL cp; + PSZ jp; + PCSZ op = NULL, sf = NULL, Options = options; + PSTRG s = NULL; + + if (Options && !stricmp(Options, "all")) { + Options = NULL; + all = true; + } // endif Options + + for (cp = tdbp->GetColumns(); cp; cp = cp->GetNext()) + if (!strcmp(cp->GetName(), "_id")) + id = true; + else if (cp->GetFmt() && !strcmp(cp->GetFmt(), "*") && (!Options || pipe)) + all = true; + + if (pipe && Options) { + if (trace) + htrc("Pipeline: %s\n", Options); + + p = strrchr(Options, ']'); + + if (!p) { + strcpy(g->Message, "Missing ] in pipeline"); + return true; + } else + *(char*)p = 0; + + s = new(g) STRING(g, 1023, (PSZ)Options); + + if (tdbp->GetFilter()) { + s->Append(",{\"$match\":"); + + if (tdbp->GetFilter()->MakeSelector(g, s)) { + strcpy(g->Message, "Failed making selector"); + return NULL; + } else + s->Append('}'); + + tdbp->SetFilter(NULL); // Not needed anymore + } // endif To_Filter + + if (!all && tdbp->GetColumns()) { + // Project list + len = s->GetLength(); + s->Append(",{\"$project\":{\""); + + if (!id) + s->Append("_id\":0,\""); + + for (PCOL cp = tdbp->GetColumns(); cp; cp = cp->GetNext()) { + if (b) + s->Append(",\""); + else + b = true; + + if ((jp = cp->GetJpath(g, true))) + s->Append(jp); + else { + s->Truncate(len); + goto nop; + } // endif Jpath + + s->Append("\":1"); + } // endfor cp + + s->Append("}}"); + } // endif all + + nop: + s->Append("]}"); + s->Resize(s->GetLength() + 1); + *(char*)p = ']'; // Restore Colist for discovery + p = s->GetStr(); + + if (trace) + htrc("New Pipeline: %s\n", p); + + return AggregateCollection(p); + } else { + if (filter || tdbp->GetFilter()) { + if (trace) { + if (filter) + htrc("Filter: %s\n", filter); + + if (tdbp->GetFilter()) { + char buf[512]; + + tdbp->GetFilter()->Prints(g, buf, 511); + htrc("To_Filter: %s\n", buf); + } // endif To_Filter + + } // endif trace + + s = new(g) STRING(g, 1023, (PSZ)filter); + len = s->GetLength(); + + if (tdbp->GetFilter()) { + if (filter) + s->Append(','); + + if (tdbp->GetFilter()->MakeSelector(g, s)) { + strcpy(g->Message, "Failed making selector"); + return NULL; + } // endif Selector + + tdbp->SetFilter(NULL); // Not needed anymore + } // endif To_Filter + + if (trace) + htrc("selector: %s\n", s->GetStr()); + + s->Resize(s->GetLength() + 1); + sf = PlugDup(g, s->GetStr()); + } // endif Filter + + if (!all) { + if (Options && *Options) { + if (trace) + htrc("options=%s\n", Options); + + op = Options; + } else if (tdbp->GetColumns()) { + // Projection list + if (s) + s->Set("{\""); + else + s = new(g) STRING(g, 511, "{\""); + + if (!id) + s->Append("_id\":0,\""); + + for (PCOL cp = tdbp->GetColumns(); cp; cp = cp->GetNext()) { + if (b) + s->Append(",\""); + else + b = true; + + if ((jp = cp->GetJpath(g, true))) + s->Append(jp); + else { + // Can this happen? + htrc("Fail getting projection path of %s\n", cp->GetName()); + goto nope; + } // endif Jpath + + s->Append("\":1"); + } // endfor cp + + s->Append("}"); + s->Resize(s->GetLength() + 1); + op = s->GetStr(); + } else { + // count(*) ? + op = "{\"_id\":1}"; + } // endif Options + + } // endif all + + nope: + return FindCollection(sf, op); + } // endif Pipe + +} // end of MakeCursor + +/***********************************************************************/ +/* Find a collection and make cursor. */ +/***********************************************************************/ +bool JMgoConn::FindCollection(PCSZ query, PCSZ proj) +{ + bool rc = true; + jboolean brc; + jstring qry = nullptr, prj = nullptr; + PGLOBAL& g = m_G; + + // Get the methods used to execute a query and get the result + if (!gmID(g, fcollid, "FindColl", "(Ljava/lang/String;Ljava/lang/String;)Z")) { + if (query) + qry = env->NewStringUTF(query); + + if (proj) + prj = env->NewStringUTF(proj); + + brc = env->CallBooleanMethod(job, fcollid, qry, prj); + + if (!Check(brc ? -1 : 0)) { + rc = false; + } else + sprintf(g->Message, "FindColl: %s", Msg); + + if (query) + env->DeleteLocalRef(qry); + + if (proj) + env->DeleteLocalRef(prj); + + } // endif xqid + + return rc; +} // end of FindCollection + +/***********************************************************************/ +/* Find a collection and make cursor. */ +/***********************************************************************/ +bool JMgoConn::AggregateCollection(PCSZ pipeline) +{ + bool rc = true; + jboolean brc; + jstring pip = nullptr; + PGLOBAL& g = m_G; + + // Get the methods used to execute a query and get the result + if (!gmID(g, acollid, "AggregateColl", "(Ljava/lang/String;)Z")) { + pip = env->NewStringUTF(pipeline); + + brc = env->CallBooleanMethod(job, acollid, pip); + + if (!Check(brc ? -1 : 0)) { + rc = false; + } else + sprintf(g->Message, "AggregateColl: %s", Msg); + + env->DeleteLocalRef(pip); + } // endif acollid + + return rc; +} // end of AggregateCollection + +/***********************************************************************/ +/* Fetch next row. */ +/***********************************************************************/ +int JMgoConn::Fetch(int pos) +{ + jint rc = JNI_ERR; + PGLOBAL& g = m_G; + + //if (m_Full) // Result set has one row + // return 1; + + //if (pos) { + // if (!m_Scrollable) { + // strcpy(g->Message, "Cannot fetch(pos) if FORWARD ONLY"); + // return rc; + // } else if (gmID(m_G, fetchid, "Fetch", "(I)Z")) + // return rc; + + // if (env->CallBooleanMethod(job, fetchid, pos)) + // rc = m_Rows; + + //} else { + if (gmID(g, readid, "ReadNext", "()I")) + return (int)rc; + + rc = env->CallIntMethod(job, readid); + + if (!Check(rc)) { + //if (rc == 0) + // m_Full = (m_Fetch == 1); + //else + // m_Fetch++; + + m_Rows += (int)rc; + } else + sprintf(g->Message, "Fetch: %s", Msg); + + //} // endif pos + + return (int)rc; +} // end of Fetch + +/***********************************************************************/ +/* Get the Json string of the current document. */ +/***********************************************************************/ +PSZ JMgoConn::GetDocument(void) +{ + PGLOBAL& g = m_G; + PSZ doc = NULL; + jstring jdc; + + if (!gmID(g, getdocid, "GetDoc", "()Ljava/lang/String;")) { + jdc = (jstring)env->CallObjectMethod(job, getdocid); + + if (jdc) + doc = (PSZ)env->GetStringUTFChars(jdc, (jboolean)false); + + } // endif getdocid + + return doc; + } // end of GetDocument + +/***********************************************************************/ +/* Group columns for inserting or updating. */ +/***********************************************************************/ +void JMgoConn::MakeColumnGroups(PGLOBAL g, PTDB tdbp) +{ + Fpc = new(g) JNCOL(false); + + for (PCOL colp = tdbp->GetColumns(); colp; colp = colp->GetNext()) + if (!colp->IsSpecial()) + Fpc->AddCol(g, colp, colp->GetJpath(g, false)); + +} // end of MakeColumnGroups + +/***********************************************************************/ +/* Get additional method ID. */ +/***********************************************************************/ +bool JMgoConn::GetMethodId(PGLOBAL g, MODE mode) +{ + if (mode == MODE_UPDATE) { + if (gmID(g, mkdocid, "MakeDocument", "()Ljava/lang/Object;")) + return true; + + if (gmID(g, docaddid, "DocAdd", + "(Ljava/lang/Object;Ljava/lang/String;Ljava/lang/Object;)Z")) + return true; + + if (gmID(g, updateid, "CollUpdate", "(Ljava/lang/Object;)J")) + return true; + + } else if (mode == MODE_INSERT) { + if (gmID(g, mkdocid, "MakeDocument", "()Ljava/lang/Object;")) + return true; + + if (gmID(g, docaddid, "DocAdd", + "(Ljava/lang/Object;Ljava/lang/String;Ljava/lang/Object;)Z")) + return true; + + if (gmID(g, mkarid, "MakeArray", "()Ljava/lang/Object;")) + return true; + + if (gmID(g, araddid, "ArrayAdd", "(Ljava/lang/Object;ILjava/lang/Object;)Z")) + return true; + + if (gmID(g, insertid, "CollInsert", "(Ljava/lang/Object;)Z")) + return true; + + } else if (mode == MODE_DELETE) + if (gmID(g, deleteid, "CollDelete", "(Z)J")) + return true; + + return gmID(g, rewindid, "Rewind", "()Z"); +} // end of GetMethodId + +/***********************************************************************/ +/* MakeObject. */ +/***********************************************************************/ +jobject JMgoConn::MakeObject(PGLOBAL g, PCOL colp, bool&error ) +{ + jclass cls; + jmethodID cns = nullptr; // Constructor + jobject val = nullptr; + PVAL valp = colp->GetValue(); + + error = false; + + if (valp->IsNull()) + return NULL; + + try { + switch (valp->GetType()) { + case TYPE_STRING: + val = env->NewStringUTF(valp->GetCharValue()); + break; + case TYPE_INT: + case TYPE_SHORT: + cls = env->FindClass("java/lang/Integer"); + cns = env->GetMethodID(cls, "<init>", "(I)V"); + val = env->NewObject(cls, cns, valp->GetIntValue()); + break; + case TYPE_TINY: + cls = env->FindClass("java/lang/Boolean"); + cns = env->GetMethodID(cls, "<init>", "(Z)V"); + val = env->NewObject(cls, cns, (valp->GetIntValue() != 0)); + break; + case TYPE_BIGINT: + cls = env->FindClass("java/lang/Long"); + cns = env->GetMethodID(cls, "<init>", "(J)V"); + val = env->NewObject(cls, cns, valp->GetBigintValue()); + break; + case TYPE_DOUBLE: + cls = env->FindClass("java/lang/Double"); + cns = env->GetMethodID(cls, "<init>", "(D)V"); + val = env->NewObject(cls, cns, valp->GetFloatValue()); + break; + default: + sprintf(g->Message, "Cannot make object from %d type", valp->GetType()); + error = true; + break; + } // endswitch Type + + } catch (...) { + sprintf(g->Message, "Cannot make object from %s value", colp->GetName()); + error = true; + } // end try/catch + + return val; +} // end of MakeObject + +/***********************************************************************/ +/* MakeDoc. */ +/***********************************************************************/ +jobject JMgoConn::MakeDoc(PGLOBAL g, PJNCOL jcp) +{ + bool error = false; + jobject parent, child, val; + jstring jkey; + + if (jcp->Array) + parent = env->CallObjectMethod(job, mkarid); + else + parent = env->CallObjectMethod(job, mkdocid); + + for (PJKC kp = jcp->Klist; kp; kp = kp->Next) + if (kp->Jncolp) { + if (!(child = MakeDoc(g, kp->Jncolp))) + return NULL; + + if (!jcp->Array) { + jkey = env->NewStringUTF(kp->Key); + + if (env->CallBooleanMethod(job, docaddid, parent, jkey, child)) + return NULL; + + env->DeleteLocalRef(jkey); + } else + if (env->CallBooleanMethod(job, araddid, parent, kp->N, child)) + return NULL; + + } else { + if (!(val = MakeObject(g, kp->Colp, error))) { + if (error) + return NULL; + + } else if (!jcp->Array) { + jkey = env->NewStringUTF(kp->Key); + + if (env->CallBooleanMethod(job, docaddid, parent, jkey, val)) + return NULL; + + env->DeleteLocalRef(jkey); + } else if (env->CallBooleanMethod(job, araddid, parent, kp->N, val)) { + if (Check(-1)) + sprintf(g->Message, "ArrayAdd: %s", Msg); + else + sprintf(g->Message, "ArrayAdd: unknown error"); + + return NULL; + } // endif ArrayAdd + + } // endif Jncolp + + return parent; +} // end of MakeDoc + +/***********************************************************************/ +/* Insert a new document in the collation. */ +/***********************************************************************/ +int JMgoConn::DocWrite(PGLOBAL g) +{ + jobject doc; + + if (!Fpc || !(doc = MakeDoc(g, Fpc))) + return RC_FX; + + if (env->CallBooleanMethod(job, insertid, doc)) { + if (Check(-1)) + sprintf(g->Message, "CollInsert: %s", Msg); + else + sprintf(g->Message, "CollInsert: unknown error"); + + return RC_FX; + } // endif Insert + + return RC_OK; +} // end of DocWrite + +/***********************************************************************/ +/* Update the current document from the collection. */ +/***********************************************************************/ +int JMgoConn::DocUpdate(PGLOBAL g, PTDB tdbp) +{ + int rc = RC_OK; + bool error; + PCOL colp; + jstring jkey; + jobject val, upd, updlist = env->CallObjectMethod(job, mkdocid); + + // Make the list of changes to do + for (colp = tdbp->GetSetCols(); colp; colp = colp->GetNext()) { + jkey = env->NewStringUTF(colp->GetJpath(g, false)); + val = MakeObject(g, colp, error); + + if (error) + return RC_FX; + + if (env->CallBooleanMethod(job, docaddid, updlist, jkey, val)) + return NULL; + + env->DeleteLocalRef(jkey); + } // endfor colp + + // Make the update parameter + upd = env->CallObjectMethod(job, mkdocid); + jkey = env->NewStringUTF("$set"); + + if (env->CallBooleanMethod(job, docaddid, upd, jkey, updlist)) + return NULL; + + env->DeleteLocalRef(jkey); + + jlong ar = env->CallLongMethod(job, updateid, upd); + + if (trace) + htrc("DocUpdate: ar = %ld\n", ar); + + if (Check((int)ar)) { + sprintf(g->Message, "CollUpdate: %s", Msg); + rc = RC_FX; + } // endif ar + + return rc; +} // end of DocUpdate + +/***********************************************************************/ +/* Remove all or only the current document from the collection. */ +/***********************************************************************/ +int JMgoConn::DocDelete(PGLOBAL g, bool all) +{ + int rc = RC_OK; + jlong ar = env->CallLongMethod(job, deleteid, all); + + if (trace) + htrc("DocDelete: ar = %ld\n", ar); + + if (Check((int)ar)) { + sprintf(g->Message, "CollDelete: %s", Msg); + rc = RC_FX; + } // endif ar + + return rc; +} // end of DocDelete + +/***********************************************************************/ +/* Rewind the collection. */ +/***********************************************************************/ +bool JMgoConn::Rewind(void) +{ + return env->CallBooleanMethod(job, rewindid); +} // end of Rewind + +/***********************************************************************/ +/* Retrieve the column string value from the document. */ +/***********************************************************************/ +PSZ JMgoConn::GetColumnValue(PSZ path) +{ + PGLOBAL& g = m_G; + PSZ fld = NULL; + jstring fn, jn = nullptr; + + if (!path || (jn = env->NewStringUTF(path)) == nullptr) { + sprintf(g->Message, "Fail to allocate jstring %s", SVP(path)); + throw (int)TYPE_AM_MGO; + } // endif name + + if (!gmID(g, objfldid, "GetField", "(Ljava/lang/String;)Ljava/lang/String;")) { + fn = (jstring)env->CallObjectMethod(job, objfldid, jn); + + if (fn) + fld = (PSZ)env->GetStringUTFChars(fn, (jboolean)false); + + } // endif objfldid + + return fld; +} // end of GetColumnValue + diff --git a/storage/connect/jmgoconn.h b/storage/connect/jmgoconn.h new file mode 100644 index 00000000000..8e8577efe97 --- /dev/null +++ b/storage/connect/jmgoconn.h @@ -0,0 +1,136 @@ +/***********************************************************************/ +/* JMgoConn.h : header file for the MongoDB connection classes. */ +/***********************************************************************/ + +/***********************************************************************/ +/* Java interface. */ +/***********************************************************************/ +#include "javaconn.h" + +// Java connection to a MongoDB data source +class TDBJMG; +class JMGCOL; + +/***********************************************************************/ +/* Include MongoDB library header files. */ +/***********************************************************************/ +typedef class JNCOL *PJNCOL; +typedef class MGODEF *PMGODEF; +typedef class TDBJMG *PTDBJMG; +typedef class JMGCOL *PJMGCOL; + +#if 0 +/***********************************************************************/ +/* Class used to get the columns of a mongo collection. */ +/***********************************************************************/ +class MGODISC : public BLOCK { +public: + // Constructor + MGODISC(PGLOBAL g, int *lg); + + // Functions + int GetColumns(PGLOBAL g, char *db, PTOS topt); + bool FindInDoc(PGLOBAL g, bson_iter_t *iter, const bson_t *doc, + char *pcn, char *pfmt, int i, int k, bool b); + + // Members + BCOL bcol; + PBCOL bcp, fbcp, pbcp; + PMGODEF tdp; + TDBJMG *tmgp; + int *length; + int n, k, lvl; + bool all; +}; // end of MGODISC +#endif // 0 + +typedef struct JKCOL { + JKCOL *Next; + PJNCOL Jncolp; + PCOL Colp; + char *Key; + int N; +} *PJKC; + +/***********************************************************************/ +/* Used when inserting values in a MongoDB collection. */ +/***********************************************************************/ +class JNCOL : public BLOCK { +public: + // Constructor + JNCOL(bool ar) { Klist = NULL; Array = ar; } + + // Methods + void AddCol(PGLOBAL g, PCOL colp, PSZ jp); + + //Members + PJKC Klist; + bool Array; +}; // end of JNCOL; + +/***********************************************************************/ +/* JMgoConn class. */ +/***********************************************************************/ +class JMgoConn : public JAVAConn { + friend class TDBJMG; + //friend class TDBXJDC; + //friend PQRYRES GetColumnInfo(PGLOBAL, char*&, char *, int, PVBLK&); +private: + JMgoConn(); // Standard (unused) constructor + +public: + // Constructor + JMgoConn(PGLOBAL g, PCSZ collname, PCSZ wrapper); + + // Implementation +public: + virtual void AddJars(PSTRG jpop, char sep); + virtual bool Connect(PJPARM sop); + virtual bool MakeCursor(PGLOBAL g, PTDB tdbp, PCSZ options, PCSZ filter, bool pipe); +// PQRYRES AllocateResult(PGLOBAL g, TDBEXT *tdbp, int n); + + // Attributes +public: +// virtual int GetMaxValue(int infotype); + +public: + // Operations + virtual int Fetch(int pos = 0); + virtual PSZ GetColumnValue(PSZ name); + + int CollSize(PGLOBAL g); + bool FindCollection(PCSZ query, PCSZ proj); + bool AggregateCollection(PCSZ pipeline); + void MakeColumnGroups(PGLOBAL g, PTDB tdbp); + bool GetMethodId(PGLOBAL g, MODE mode); + jobject MakeObject(PGLOBAL g, PCOL colp, bool& error); + jobject MakeDoc(PGLOBAL g, PJNCOL jcp); + int DocWrite(PGLOBAL g); + int DocUpdate(PGLOBAL g, PTDB tdbp); + int DocDelete(PGLOBAL g, bool all); + bool Rewind(void); + PSZ GetDocument(void); + +protected: + // Members + PCSZ CollName; // The collation name + jmethodID gcollid; // The GetCollection method ID + jmethodID countid; // The GetCollSize method ID + jmethodID fcollid; // The FindColl method ID + jmethodID acollid; // The AggregateColl method ID + jmethodID readid; // The ReadNext method ID + jmethodID fetchid; // The Fetch method ID + jmethodID rewindid; // The Rewind method ID + jmethodID getdocid; // The GetDoc method ID + jmethodID objfldid; // The ObjectField method ID + jmethodID mkdocid; // The MakeDocument method ID + jmethodID docaddid; // The DocAdd method ID + jmethodID mkarid; // The MakeArray method ID + jmethodID araddid; // The ArrayAdd method ID + jmethodID insertid; // The CollInsert method ID + jmethodID updateid; // The CollUpdate method ID + jmethodID deleteid; // The CollDelete method ID + PJNCOL Fpc; // To JNCOL classes + int m_Fetch; + int m_Version; // Java driver version (2 or 3) +}; // end of JMgoConn class definition diff --git a/storage/connect/mongo.cpp b/storage/connect/mongo.cpp new file mode 100644 index 00000000000..d86d0e0b38e --- /dev/null +++ b/storage/connect/mongo.cpp @@ -0,0 +1,99 @@ +/************** mongo C++ Program Source Code File (.CPP) **************/ +/* PROGRAM NAME: mongo Version 1.0 */ +/* (C) Copyright to the author Olivier BERTRAND 2017 */ +/* These programs are the MGODEF class execution routines. */ +/***********************************************************************/ + +/***********************************************************************/ +/* Include relevant sections of the MariaDB header file. */ +/***********************************************************************/ +#include <my_global.h> + +/***********************************************************************/ +/* Include application header files: */ +/* global.h is header containing all global declarations. */ +/* plgdbsem.h is header containing the DB application declarations. */ +/***********************************************************************/ +#include "global.h" +#include "plgdbsem.h" +#include "xtable.h" +#include "tabext.h" +#if defined(MONGO_SUPPORT) +#include "tabmgo.h" +#endif // MONGO_SUPPORT +#if defined(JDBC_SUPPORT) +#include "tabjmg.h" +#endif // JDBC_SUPPORT + +/* -------------------------- Class MGODEF --------------------------- */ + +MGODEF::MGODEF(void) +{ + Driver = NULL; + Uri = NULL; + Colist = NULL; + Filter = NULL; + Level = 0; + Base = 0; + Version = 0; + Pipe = false; +} // end of MGODEF constructor + +/***********************************************************************/ +/* DefineAM: define specific AM block values. */ +/***********************************************************************/ +bool MGODEF::DefineAM(PGLOBAL g, LPCSTR, int poff) +{ + if (EXTDEF::DefineAM(g, "MGO", poff)) + return true; + else if (!Tabschema) + Tabschema = GetStringCatInfo(g, "Dbname", "*"); + +# if !defined(JDBC_SUPPORT) + Driver = "C"; +#elif !defined(MONGO_SUPPORT) + Driver = "JAVA"; +#else + Driver = GetStringCatInfo(g, "Driver", "C"); +#endif + Uri = GetStringCatInfo(g, "Connect", "mongodb://localhost:27017"); + Colist = GetStringCatInfo(g, "Colist", NULL); + Filter = GetStringCatInfo(g, "Filter", NULL); + Base = GetIntCatInfo("Base", 0) ? 1 : 0; + Version = GetIntCatInfo("Version", 3); + + if (Version == 2) + Wrapname = GetStringCatInfo(g, "Wrapper", "Mongo2Interface"); + else + Wrapname = GetStringCatInfo(g, "Wrapper", "Mongo3Interface"); + + Pipe = GetBoolCatInfo("Pipeline", false); + return false; +} // end of DefineAM + +/***********************************************************************/ +/* GetTable: makes a new Table Description Block. */ +/***********************************************************************/ +PTDB MGODEF::GetTable(PGLOBAL g, MODE m) +{ + if (Catfunc == FNC_COL) { +#if defined(MONGO_SUPPORT) + if (Driver && toupper(*Driver) == 'C') + return new(g)TDBGOL(this); +#endif // MONGO_SUPPORT + strcpy(g->Message, "No column find for Java Mongo yet"); + return NULL; + } // endif Catfunc + +#if defined(MONGO_SUPPORT) + if (Driver && toupper(*Driver) == 'C') + return new(g) TDBMGO(this); +#endif // MONGO_SUPPORT +#if defined(JDBC_SUPPORT) + return new(g) TDBJMG(this); +#else // !JDBC_SUPPORT + strcpy(g->Message, "No MONGO nor Java support"); + return NULL; +#endif // !JDBC_SUPPORT +} // end of GetTable + diff --git a/storage/connect/mongo.h b/storage/connect/mongo.h new file mode 100644 index 00000000000..c155a276a8a --- /dev/null +++ b/storage/connect/mongo.h @@ -0,0 +1,62 @@ +/**************** mongo H Declares Source Code File (.H) ***************/ +/* Name: mongo.h Version 1.0 */ +/* */ +/* (C) Copyright to the author Olivier BERTRAND 2017 */ +/* */ +/* This file contains the common MongoDB classes declares. */ +/***********************************************************************/ +#ifndef __MONGO_H +#define __MONGO_H + +#include "osutil.h" +#include "block.h" +#include "colblk.h" + +typedef class MGODEF *PMGODEF; + +typedef struct _bncol { + struct _bncol *Next; + char *Name; + char *Fmt; + int Type; + int Len; + int Scale; + bool Cbn; + bool Found; +} BCOL, *PBCOL; + +/***********************************************************************/ +/* MongoDB table. */ +/***********************************************************************/ +class DllExport MGODEF : public EXTDEF { /* Table description */ + friend class TDBMGO; + friend class TDBJMG; + friend class TDBGOL; + friend class MGOFAM; + friend class MGODISC; + friend PQRYRES MGOColumns(PGLOBAL, PCSZ, PCSZ, PTOS, bool); +public: + // Constructor + MGODEF(void); + + // Implementation + virtual const char *GetType(void) { return "MONGO"; } + + // Methods + virtual bool DefineAM(PGLOBAL g, LPCSTR am, int poff); + virtual PTDB GetTable(PGLOBAL g, MODE m); + +protected: + // Members + PCSZ Driver; /* MongoDB Driver (C or JAVA) */ + PCSZ Uri; /* MongoDB connection URI */ + PSZ Wrapname; /* Java wrapper name */ + PCSZ Colist; /* Options list */ + PCSZ Filter; /* Filtering query */ + int Level; /* Used for catalog table */ + int Base; /* The array index base */ + int Version; /* The Java driver version */ + bool Pipe; /* True is Colist is a pipeline */ +}; // end of MGODEF + +#endif // __MONGO_H diff --git a/storage/connect/mongofam.cpp b/storage/connect/mongofam.cpp index 2452d10e114..e40ed422d06 100644 --- a/storage/connect/mongofam.cpp +++ b/storage/connect/mongofam.cpp @@ -1,7 +1,7 @@ /************ MONGO FAM C++ Program Source Code File (.CPP) ************/ /* PROGRAM NAME: mongofam.cpp */ /* ------------- */ -/* Version 1.1 */ +/* Version 1.3 */ /* */ /* COPYRIGHT: */ /* ---------- */ @@ -17,26 +17,6 @@ /* Include relevant sections of the System header files. */ /***********************************************************************/ #include "my_global.h" -#if defined(__WIN__) -//#include <io.h> -//#include <fcntl.h> -//#include <errno.h> -#if defined(__BORLANDC__) -#define __MFC_COMPAT__ // To define min/max as macro -#endif // __BORLANDC__ -//#include <windows.h> -#else // !__WIN__ -#if defined(UNIX) || defined(UNIV_LINUX) -//#include <errno.h> -#include <unistd.h> -//#if !defined(sun) // Sun has the ftruncate fnc. -//#define USETEMP // Force copy mode for DELETE -//#endif // !sun -#else // !UNIX -//#include <io.h> -#endif // !UNIX -//#include <fcntl.h> -#endif // !__WIN__ /***********************************************************************/ /* Include application header files: */ @@ -54,20 +34,8 @@ #if defined(UNIX) || defined(UNIV_LINUX) #include "osutil.h" -//#define _fileno fileno -//#define _O_RDONLY O_RDONLY #endif -// Required to initialize libmongoc's internals -void mongo_init(bool init) -{ - if (init) - mongoc_init(); - else - mongoc_cleanup(); - -} // end of mongo_init - /* --------------------------- Class MGOFAM -------------------------- */ /***********************************************************************/ @@ -75,88 +43,39 @@ void mongo_init(bool init) /***********************************************************************/ MGOFAM::MGOFAM(PJDEF tdp) : DOSFAM((PDOSDEF)NULL) { - Client = NULL; - Database = NULL; - Collection = NULL; - Cursor = NULL; - Query = NULL; - Opts = NULL; + Cmgp = NULL; + Pcg.Tdbp = NULL; + + if (tdp) { + Pcg.Uristr = tdp->Uri; + Pcg.Db_name = tdp->Schema; + Pcg.Coll_name = tdp->Collname; + Pcg.Options = tdp->Options; + Pcg.Filter = tdp->Filter; + Pcg.Pipe = tdp->Pipe && tdp->Options != NULL; + } else { + Pcg.Uristr = NULL; + Pcg.Db_name = NULL; + Pcg.Coll_name = NULL; + Pcg.Options = NULL; + Pcg.Filter = NULL; + Pcg.Pipe = false; + } // endif tdp + To_Fbt = NULL; Mode = MODE_ANY; - Uristr = tdp->Uri; - Db_name = tdp->Schema; - Coll_name = tdp->Collname; - Options = tdp->Options; - Filter = tdp->Filter; Done = false; - Pipe = tdp->Pipe; Lrecl = tdp->Lrecl + tdp->Ending; } // end of MGOFAM standard constructor MGOFAM::MGOFAM(PMGOFAM tdfp) : DOSFAM(tdfp) { - Client = tdfp->Client; - Database = NULL; - Collection = tdfp->Collection; - Cursor = tdfp->Cursor; - Query = tdfp->Query; - Opts = tdfp->Opts; + Pcg = tdfp->Pcg; To_Fbt = tdfp->To_Fbt; Mode = tdfp->Mode; - Uristr = tdfp->Uristr; - Db_name = tdfp->Db_name; - Coll_name = tdfp->Coll_name; - Options = tdfp->Options; - Filter = NULL; Done = tdfp->Done; - Pipe = tdfp->Pipe; } // end of MGOFAM copy constructor -#if 0 -void *MGOFAM::mgo_alloc(size_t n) -{ - char *mst = (char*)PlgDBSubAlloc(G, NULL, n + sizeof(size_t)); - - if (mst) { - *(size_t*)mst = n; - return mst + sizeof(size_t); - } // endif mst - - return NULL; -} // end of mgo_alloc - -void *MGOFAM::mgo_calloc(size_t n, size_t sz) -{ - void *m = mgo_alloc(n * sz); - - if (m) - memset(m, 0, n * sz); - - return m; -} // end of mgo_calloc - -void *MGOFAM::mgo_realloc(void *m, size_t n) -{ - if (!m) - return n ? mgo_alloc(n) : NULL; - - size_t *osz = (size_t*)((char*)m - sizeof(size_t)); - - if (n > *osz) { - void *nwm = mgo_alloc(n); - - if (nwm) - memcpy(nwm, m, *osz); - - return nwm; - } else { - *osz = n; - return m; - } // endif n - -} // end of mgo_realloc -#endif // 0 - /***********************************************************************/ /* Reset: reset position values at the beginning of file. */ /***********************************************************************/ @@ -175,47 +94,16 @@ int MGOFAM::GetFileLength(PGLOBAL g) } // end of GetFileLength /***********************************************************************/ -/* Cardinality: returns table cardinality in number of rows. */ +/* Cardinality: returns the number of documents in the collection. */ /* This function can be called with a null argument to test the */ /* availability of Cardinality implementation (1 yes, 0 no). */ /***********************************************************************/ int MGOFAM::Cardinality(PGLOBAL g) { - if (g) { - if (!Init(g)) { - bson_t *query; - const char *jf = NULL; - - if (Pipe) - return 10; - else if (Filter) - jf = Filter; - - if (jf) { - query = bson_new_from_json((const uint8_t *)jf, -1, &Error); - - if (!query) { - htrc("Wrong filter: %s", Error.message); - return 10; - } // endif Query - - } else - query = bson_new(); - - int64_t card = (int)mongoc_collection_count(Collection, - MONGOC_QUERY_NONE, query, 0, 0, NULL, &Error); - - if (card < 0) - sprintf(g->Message, "Collection count: %s", Error.message); - - bson_destroy(query); - return card; - } else - return -1; - - } else + if (!g) return 1; + return (!Init(g)) ? Cmgp->CollSize(g) : 0; } // end of Cardinality /***********************************************************************/ @@ -234,276 +122,30 @@ bool MGOFAM::Init(PGLOBAL g) if (Done) return false; - Uri = mongoc_uri_new(Uristr); + /*********************************************************************/ + /* Open an C connection for this table. */ + /*********************************************************************/ + if (!Cmgp) { + Pcg.Tdbp = Tdbp; + Cmgp = new(g) CMgoConn(g, &Pcg); + } else if (Cmgp->IsConnected()) + Cmgp->Close(); - if (!Uri) { - sprintf(g->Message, "Failed to parse URI: \"%s\"", Uristr); + if (Cmgp->Connect(g)) return true; - } // endif Uri - - // Create a new client pool instance - Pool = mongoc_client_pool_new(Uri); - mongoc_client_pool_set_error_api(Pool, 2); - - // Register the application name so we can track it in the profile logs - // on the server. This can also be done from the URI. - mongoc_client_pool_set_appname(Pool, "Connect"); - - // Create a new client instance - Client = mongoc_client_pool_pop(Pool); - //Client = mongoc_client_new(uristr); - - if (!Client) { - sprintf(g->Message, "Failed to get Client"); - return true; - } // endif Client - - //mongoc_client_set_error_api(Client, 2); - - // Register the application name so we can track it in the profile logs - // on the server. This can also be done from the URI. - //mongoc_client_set_appname(Client, "Connect"); - - // Get a handle on the database Db_name and collection Coll_name - // Database = mongoc_client_get_database(Client, Db_name); - // Collection = mongoc_database_get_collection(Database, Coll_name); - Collection = mongoc_client_get_collection(Client, Db_name, Coll_name); - - if (!Collection) { - sprintf(g->Message, "Failed to get Collection %s.%s", Db_name, Coll_name); - return true; - } // endif Collection Done = true; return false; } // end of Init /***********************************************************************/ -/* OpenDB: Data Base open routine for MONGO access method. */ -/***********************************************************************/ -bool MGOFAM::MakeCursor(PGLOBAL g) -{ - const char *p; - bool b = false, id = (Mode != MODE_READ), all = false; - uint len; - PSZ jp; - PCOL cp; - PSTRG s = NULL; - - if (Options && !stricmp(Options, "all")) { - Options = NULL; - all = true; - } // endif Options - - for (cp = Tdbp->GetColumns(); cp; cp = cp->GetNext()) - if (!strcmp(cp->GetName(), "_id")) - id = true; - else if (cp->GetFmt() && !strcmp(cp->GetFmt(), "*") && !Options) - all = true; - - if (Pipe) { - if (trace) - htrc("Pipeline: %s\n", Options); - - p = strrchr(Options, ']'); - - if (!p) { - strcpy(g->Message, "Missing ] in pipeline"); - return true; - } else - *(char*)p = 0; - - s = new(g) STRING(g, 1023, (PSZ)Options); - len = s->GetLength(); - - if (Tdbp->GetFilter()) { - s->Append(",{\"$match\":"); - - if (!Tdbp->GetFilter()->MakeSelector(g, s, false)) { - s->Append('}'); - Tdbp->SetFilter(NULL); // Not needed anymore - } else { - if (((TDBJSN*)Tdbp)->Xcol) - Tdbp->SetFilter(NULL); // Incompatible - - htrc("Failed making selector\n"); - s->Truncate(len); - } // endif Selector - - } // endif To_Filter - - if (!all) { - // Project list - len = s->GetLength(); - - if (Tdbp->GetColumns()) { - s->Append(",{\"$project\":{\""); - - if (!id) - s->Append("_id\":0,\""); - - for (PCOL cp = Tdbp->GetColumns(); cp; cp = cp->GetNext()) { - if (b) - s->Append(",\""); - else - b = true; - - if ((jp = ((PJCOL)cp)->GetJpath(g, true))) - s->Append(jp); - else { - s->Truncate(len); - goto nop; - } // endif Jpath - - s->Append("\":1"); - } // endfor cp - - } else - s->Append(",{\"$project\":{\"_id\":1}}"); - - s->Append("}}"); - } // endif all - - nop: - s->Append("]}"); - s->Resize(s->GetLength() + 1); - p = s->GetStr(); - - if (trace) - htrc("New Pipeline: %s\n", p); - - Query = bson_new_from_json((const uint8_t *)p, -1, &Error); - - if (!Query) { - sprintf(g->Message, "Wrong pipeline: %s", Error.message); - return true; - } // endif Query - - Cursor = mongoc_collection_aggregate(Collection, MONGOC_QUERY_NONE, - Query, NULL, NULL); - - if (mongoc_cursor_error(Cursor, &Error)) { - sprintf(g->Message, "Mongo aggregate Failure: %s", Error.message); - return true; - } // endif cursor - - } else { - if (Filter || Tdbp->GetFilter()) { - if (trace) { - if (Filter) - htrc("Filter: %s\n", Filter); - - if (Tdbp->GetFilter()) { - char buf[512]; - - Tdbp->GetFilter()->Prints(g, buf, 511); - htrc("To_Filter: %s\n", buf); - } // endif To_Filter - - } // endif trace - - s = new(g) STRING(g, 1023, (PSZ)Filter); - len = s->GetLength(); - - if (Tdbp->GetFilter()) { - if (Filter) - s->Append(','); - - if (Tdbp->GetFilter()->MakeSelector(g, s, false)) { - if (((TDBJSN*)Tdbp)->Xcol) - Tdbp->SetFilter(NULL); // Incompatible - - htrc("Cannot make selector\n"); - s->Truncate(len); - } else - Tdbp->SetFilter(NULL); // Not needed anymore - - } // endif To_Filter - - if (trace) - htrc("selector: %s\n", s->GetStr()); - - s->Resize(s->GetLength() + 1); - - if (s->GetLength()) { - Query = bson_new_from_json((const uint8_t *)s->GetStr(), -1, &Error); - - if (!Query) { - sprintf(g->Message, "Wrong filter: %s", Error.message); - return true; - } // endif Query - - } else - Query = bson_new(); - - } else - Query = bson_new(); - - if (!all) { - if (Options && *Options) { - if (trace) - htrc("options=%s\n", Options); - - p = Options; - } else if (Tdbp->GetColumns()) { - // Projection list - if (s) - s->Set("{\"projection\":{\""); - else - s = new(g) STRING(g, 511, "{\"projection\":{\""); - - if (!id) - s->Append("_id\":0,\""); - - for (PCOL cp = Tdbp->GetColumns(); cp; cp = cp->GetNext()) { - if (b) - s->Append(",\""); - else - b = true; - - if ((jp = ((PJCOL)cp)->GetJpath(g, true))) - s->Append(jp); - else { - s->Reset(); - s->Resize(0); - goto nope; - } // endif Jpath - - s->Append("\":1"); - } // endfor cp - - s->Append("}}"); - s->Resize(s->GetLength() + 1); - p = s->GetStr(); - } else { - // count(*) ? - p = "{\"projection\":{\"_id\":1}}"; - } // endif Options - - Opts = bson_new_from_json((const uint8_t *)p, -1, &Error); - - if (!Opts) { - sprintf(g->Message, "Wrong options: %s", Error.message); - return true; - } // endif Opts - - } // endif all - - nope: - Cursor = mongoc_collection_find_with_opts(Collection, Query, Opts, NULL); - } // endif Pipe - - return false; -} // end of MakeCursor - -/***********************************************************************/ /* OpenTableFile: Open a MongoDB table. */ /***********************************************************************/ bool MGOFAM::OpenTableFile(PGLOBAL g) { Mode = Tdbp->GetMode(); - if (Pipe && Mode != MODE_READ) { + if (Pcg.Pipe && Mode != MODE_READ) { strcpy(g->Message, "Pipeline tables are read only"); return true; } // endif Pipe @@ -511,20 +153,11 @@ bool MGOFAM::OpenTableFile(PGLOBAL g) if (Init(g)) return true; - if (Mode == MODE_DELETE && !Tdbp->GetNext()) { - // Store the number of deleted lines - DelRows = Cardinality(g); - + if (Mode == MODE_DELETE && !Tdbp->GetNext()) // Delete all documents - if (!mongoc_collection_remove(Collection, MONGOC_REMOVE_NONE, - Query, NULL, &Error)) { - sprintf(g->Message, "Remove all: %s", Error.message); - return true; - } // endif remove - - } else if (Mode != MODE_INSERT) - if (MakeCursor(g)) - return true; + return Cmgp->DocDelete(g); + else if (Mode == MODE_INSERT) + Cmgp->MakeColumnGroups(g); return false; } // end of OpenTableFile @@ -590,90 +223,17 @@ int MGOFAM::SkipRecord(PGLOBAL g, bool header) } // end of SkipRecord /***********************************************************************/ -/* Use to trace restaurants document contains. */ -/***********************************************************************/ -void MGOFAM::ShowDocument(bson_iter_t *iter, const bson_t *doc, const char *k) -{ - - if (!doc || bson_iter_init(iter, doc)) { - const char *key; - - while (bson_iter_next(iter)) { - key = bson_iter_key(iter); - htrc("Found element key: \"%s\"\n", key); - - if (BSON_ITER_HOLDS_UTF8(iter)) - htrc("%s.%s=\"%s\"\n", k, key, bson_iter_utf8(iter, NULL)); - else if (BSON_ITER_HOLDS_INT32(iter)) - htrc("%s.%s=%d\n", k, key, bson_iter_int32(iter)); - else if (BSON_ITER_HOLDS_INT64(iter)) - htrc("%s.%s=%lld\n", k, key, bson_iter_int64(iter)); - else if (BSON_ITER_HOLDS_DOUBLE(iter)) - htrc("%s.%s=%g\n", k, key, bson_iter_double(iter)); - else if (BSON_ITER_HOLDS_DATE_TIME(iter)) - htrc("%s.%s=date(%lld)\n", k, key, bson_iter_date_time(iter)); - else if (BSON_ITER_HOLDS_OID(iter)) { - char str[25]; - - bson_oid_to_string(bson_iter_oid(iter), str); - htrc("%s.%s=%s\n", k, key, str); - } else if (BSON_ITER_HOLDS_DECIMAL128(iter)) { - char *str = NULL; - bson_decimal128_t dec; - - bson_iter_decimal128(iter, &dec); - bson_decimal128_to_string(&dec, str); - htrc("%s.%s=%s\n", k, key, str); - } else if (BSON_ITER_HOLDS_DOCUMENT(iter)) { - bson_iter_t child; - - if (bson_iter_recurse(iter, &child)) - ShowDocument(&child, NULL, key); - - } else if (BSON_ITER_HOLDS_ARRAY(iter)) { - bson_t *arr; - bson_iter_t itar; - const uint8_t *data = NULL; - uint32_t len = 0; - - bson_iter_array(iter, &len, &data); - arr = bson_new_from_data(data, len); - ShowDocument(&itar, arr, key); - } // endif's - - } // endwhile bson_iter_next - - } // endif bson_iter_init - -} // end of ShowDocument - -/***********************************************************************/ /* ReadBuffer: Get next document from a collection. */ /***********************************************************************/ int MGOFAM::ReadBuffer(PGLOBAL g) { - int rc = RC_OK; - - if (mongoc_cursor_next(Cursor, &Document)) { - char *str = bson_as_json(Document, NULL); - - if (trace > 1) { - bson_iter_t iter; - ShowDocument(&iter, Document, ""); - } else if (trace == 1) - htrc("%s\n", str); - - strncpy(Tdbp->GetLine(), str, Lrecl); - bson_free(str); - } else if (mongoc_cursor_error(Cursor, &Error)) { - sprintf(g->Message, "Mongo Cursor Failure: %s", Error.message); - rc = RC_FX; - } else { - //mongoc_cursor_destroy(Cursor); - rc = RC_EF; - } // endif's Cursor + int rc = Cmgp->ReadNext(g); - return rc; + if (rc != RC_OK) + return rc; + + strncpy(Tdbp->GetLine(), Cmgp->GetDocument(g), Lrecl); + return RC_OK; } // end of ReadBuffer /***********************************************************************/ @@ -681,84 +241,7 @@ int MGOFAM::ReadBuffer(PGLOBAL g) /***********************************************************************/ int MGOFAM::WriteBuffer(PGLOBAL g) { - int rc = RC_OK; - bson_t *doc; - - //if (Mode == MODE_INSERT && !Collection) { - // if ((Database = mongoc_client_get_database(Client, db_name))) - // Collection = mongoc_database_create_collection(Database, coll_name, - // NULL, &Error); - - // if (!Collection) - // if (Database) { - // sprintf(g->Message, "Create collection: %s", Error.message); - // return RC_FX; - // } else { - // sprintf(g->Message, "Fail to get database %s", db_name); - // return RC_FX; - // } // endif Database - - //} // endif Collection - - if (!(doc = bson_new_from_json((const uint8_t *)Tdbp->GetLine(), - -1, &Error))) { - sprintf(g->Message, "Wrong document: %s", Error.message); - return RC_FX; - } // endif doc - - if (Mode != MODE_INSERT) { - bool b = false; - bson_iter_t iter; - bson_t *selector = bson_new(); - - bson_iter_init(&iter, Document); - - if (bson_iter_find(&iter, "_id")) { - if (BSON_ITER_HOLDS_OID(&iter)) - b = BSON_APPEND_OID(selector, "_id", bson_iter_oid(&iter)); - else if (BSON_ITER_HOLDS_INT32(&iter)) - b = BSON_APPEND_INT32(selector, "_id", bson_iter_int32(&iter)); - else if (BSON_ITER_HOLDS_INT64(&iter)) - b = BSON_APPEND_INT64(selector, "_id", bson_iter_int64(&iter)); - else if (BSON_ITER_HOLDS_DOUBLE(&iter)) - b = BSON_APPEND_DOUBLE(selector, "_id", bson_iter_double(&iter)); - else if (BSON_ITER_HOLDS_UTF8(&iter)) - b = BSON_APPEND_UTF8(selector, "_id", bson_iter_utf8(&iter, NULL)); - - } // endif iter - - if (!b) { - strcpy(g->Message, "Cannot find _id"); - return RC_FX; - } // endif oid - - if (Mode == MODE_DELETE) { - if (!mongoc_collection_remove(Collection, MONGOC_REMOVE_NONE, - selector, NULL, &Error)) { - sprintf(g->Message, "Remove: %s", Error.message); - bson_destroy(selector); - return RC_FX; - } // endif remove - - } else { - if (!mongoc_collection_update(Collection, MONGOC_UPDATE_NONE, - selector, doc, NULL, &Error)) { - sprintf(g->Message, "Update: %s", Error.message); - bson_destroy(selector); - return RC_FX; - } // endif remove - - } // endif Mode - - bson_destroy(selector); - } else if (!mongoc_collection_insert(Collection, MONGOC_INSERT_NONE, - doc, NULL, &Error)) { - sprintf(g->Message, "Inserting: %s", Error.message); - rc = RC_FX; - } // endif insert - - bson_destroy(doc); - return rc; + return Cmgp->Write(g); } // end of WriteBuffer /***********************************************************************/ @@ -774,18 +257,7 @@ int MGOFAM::DeleteRecords(PGLOBAL g, int irc) /***********************************************************************/ void MGOFAM::CloseTableFile(PGLOBAL g, bool) { - if (Query) bson_destroy(Query); - if (Opts) bson_destroy(Opts); - if (Cursor) mongoc_cursor_destroy(Cursor); - if (Collection) mongoc_collection_destroy(Collection); - // mongoc_database_destroy(Database); - // mongoc_client_destroy(Client); - if (Client) mongoc_client_pool_push(Pool, Client); - if (Pool) mongoc_client_pool_destroy(Pool); - if (Uri) mongoc_uri_destroy(Uri); -//bson_mem_restore_vtable(); -//mongoc_cleanup(); -//G = NULL; + Cmgp->Close(); Done = false; } // end of CloseTableFile @@ -794,9 +266,6 @@ void MGOFAM::CloseTableFile(PGLOBAL g, bool) /***********************************************************************/ void MGOFAM::Rewind(void) { - mongoc_cursor_t *cursor = mongoc_cursor_clone(Cursor); - - mongoc_cursor_destroy(Cursor); - Cursor = cursor; + Cmgp->Rewind(); } // end of Rewind diff --git a/storage/connect/mongofam.h b/storage/connect/mongofam.h index f3b7ea3d05f..99cc6128ffd 100644 --- a/storage/connect/mongofam.h +++ b/storage/connect/mongofam.h @@ -1,21 +1,11 @@ /************** MongoFam H Declares Source Code File (.H) **************/ -/* Name: mongofam.h Version 1.3 */ +/* Name: mongofam.h Version 1.4 */ /* */ /* (C) Copyright to the author Olivier BERTRAND 2017 */ /* */ /* This file contains the MongoDB access method classes declares. */ /***********************************************************************/ -#pragma once - -/***********************************************************************/ -/* Include MongoDB library header files. */ -/***********************************************************************/ -#include <bson.h> -#include <bcon.h> -#include <mongoc.h> - -#include "block.h" -//#include "array.h" +#include "cmgoconn.h" typedef class TXTFAM *PTXF; typedef class MGOFAM *PMGOFAM; @@ -37,6 +27,7 @@ public: virtual bool GetUseTemp(void) { return false; } virtual int GetPos(void); virtual int GetNextPos(void); + void SetTdbp(PTDBDOS tdbp) { Tdbp = tdbp; } virtual PTXF Duplicate(PGLOBAL g) { return (PTXF)new(g) MGOFAM(this); } void SetLrecl(int lrecl) { Lrecl = lrecl; } @@ -63,35 +54,12 @@ protected: virtual int RenameTempFile(PGLOBAL g) { return RC_OK; } virtual int InitDelete(PGLOBAL g, int fpos, int spos); bool Init(PGLOBAL g); - bool MakeCursor(PGLOBAL g); - void ShowDocument(bson_iter_t *i, const bson_t *b, const char *k); - //static void *mgo_alloc(size_t n); - //static void *mgo_calloc(size_t n, size_t sz); - //static void *mgo_realloc(void *m, size_t n); - //static void mgo_free(void *) {} - // Members -//static PGLOBAL G; - mongoc_uri_t *Uri; - mongoc_client_pool_t *Pool; // Thread safe client pool - mongoc_client_t *Client; // The MongoDB client - mongoc_database_t *Database; // The MongoDB database - mongoc_collection_t *Collection; // The MongoDB collection - mongoc_cursor_t *Cursor; - const bson_t *Document; -//bson_mem_vtable_t Vtable; - bson_t *Query; // MongoDB cursor filter - bson_t *Opts; // MongoDB cursor options - bson_error_t Error; - PFBLOCK To_Fbt; // Pointer to temp file block - MODE Mode; - const char *Uristr; - const char *Db_name; - const char *Coll_name; - const char *Options; - const char *Filter; - bool Done; // Init done - bool Pipe; + CMgoConn *Cmgp; // Points to a C Mongo connection class + CMGOPARM Pcg; // Parms passed to Cmgp + PFBLOCK To_Fbt; // Pointer to temp file block + MODE Mode; + bool Done; // Init done }; // end of class MGOFAM diff --git a/storage/connect/msgid.h b/storage/connect/msgid.h index 0e9c036dc49..cee78aa1783 100644 --- a/storage/connect/msgid.h +++ b/storage/connect/msgid.h @@ -1,3 +1,4 @@ +/* Copyright (C) MariaDB Corporation Ab */ #define MSG_ACCESS_VIOLATN 200 #define MSG_ADD_BAD_TYPE 201 #define MSG_ALLOC_ERROR 202 diff --git a/storage/connect/mycat.cc b/storage/connect/mycat.cc index 0104213ed92..906ac734157 100644 --- a/storage/connect/mycat.cc +++ b/storage/connect/mycat.cc @@ -1,4 +1,4 @@ -/* Copyright (C) Olivier Bertrand 2004 - 2017 +/* Copyright (C) MariaDB Corporation Ab This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -86,7 +86,7 @@ #if defined(JDBC_SUPPORT) #define NJDBC #include "tabjdbc.h" -#endif // ODBC_SUPPORT +#endif // JDBC_SUPPORT #if defined(PIVOT_SUPPORT) #include "tabpivot.h" #endif // PIVOT_SUPPORT @@ -96,9 +96,9 @@ #if defined(XML_SUPPORT) #include "tabxml.h" #endif // XML_SUPPORT -#if defined(MONGO_SUPPORT) -#include "tabmgo.h" -#endif // MONGO_SUPPORT +#if defined(MONGO_SUPPORT) || defined(JDBC_SUPPORT) +#include "mongo.h" +#endif // MONGO_SUPPORT || JDBC_SUPPORT #if defined(ZIP_SUPPORT) #include "tabzip.h" #endif // ZIP_SUPPORT @@ -133,21 +133,21 @@ TABTYPE GetTypeID(const char *type) : (!stricmp(type, "CSV")) ? TAB_CSV : (!stricmp(type, "FMT")) ? TAB_FMT : (!stricmp(type, "DBF")) ? TAB_DBF -#ifdef XML_SUPPORT +#if defined(XML_SUPPORT) : (!stricmp(type, "XML")) ? TAB_XML #endif : (!stricmp(type, "INI")) ? TAB_INI : (!stricmp(type, "VEC")) ? TAB_VEC -#ifdef ODBC_SUPPORT +#if defined(ODBC_SUPPORT) : (!stricmp(type, "ODBC")) ? TAB_ODBC #endif -#ifdef JDBC_SUPPORT +#if defined(JDBC_SUPPORT) : (!stricmp(type, "JDBC")) ? TAB_JDBC #endif : (!stricmp(type, "MYSQL")) ? TAB_MYSQL : (!stricmp(type, "MYPRX")) ? TAB_MYSQL : (!stricmp(type, "DIR")) ? TAB_DIR -#ifdef __WIN__ +#if defined(__WIN__) : (!stricmp(type, "MAC")) ? TAB_MAC : (!stricmp(type, "WMI")) ? TAB_WMI #endif @@ -156,15 +156,15 @@ TABTYPE GetTypeID(const char *type) : (!stricmp(type, "OCCUR")) ? TAB_OCCUR : (!stricmp(type, "CATLG")) ? TAB_PRX // Legacy : (!stricmp(type, "PROXY")) ? TAB_PRX -#ifdef PIVOT_SUPPORT +#if defined(PIVOT_SUPPORT) : (!stricmp(type, "PIVOT")) ? TAB_PIVOT #endif : (!stricmp(type, "VIR")) ? TAB_VIR : (!stricmp(type, "JSON")) ? TAB_JSON -#ifdef ZIP_SUPPORT +#if defined(ZIP_SUPPORT) : (!stricmp(type, "ZIP")) ? TAB_ZIP #endif -#ifdef MONGO_SUPPORT +#if defined(MONGO_SUPPORT) || defined(JDBC_SUPPORT) : (!stricmp(type, "MONGO")) ? TAB_MONGO #endif : (!stricmp(type, "OEM")) ? TAB_OEM : TAB_NIY; @@ -557,9 +557,9 @@ PRELDEF MYCAT::MakeTableDesc(PGLOBAL g, PTABLE tablep, LPCSTR am) #endif // PIVOT_SUPPORT case TAB_VIR: tdp= new(g) VIRDEF; break; case TAB_JSON: tdp= new(g) JSONDEF; break; -#if defined(MONGO_SUPPORT) +#if defined(MONGO_SUPPORT) || defined(JDBC_SUPPORT) case TAB_MONGO: tdp = new(g) MGODEF; break; -#endif // MONGO_SUPPORT +#endif // MONGO_SUPPORT || JDBC_SUPPORT #if defined(ZIP_SUPPORT) case TAB_ZIP: tdp= new(g) ZIPDEF; break; #endif // ZIP_SUPPORT diff --git a/storage/connect/mycat.h b/storage/connect/mycat.h index 75dd0e5d150..f0f889722dd 100644 --- a/storage/connect/mycat.h +++ b/storage/connect/mycat.h @@ -1,4 +1,4 @@ -/* Copyright (C) Olivier Bertrand 2004 - 2015 +/* Copyright (C) MariaDB Corporation Ab This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -15,7 +15,7 @@ /**************** MYCAT H Declares Source Code File (.H) ***************/ /* Name: MYCAT.H Version 2.3 */ -/* */ +/* Author: Olivier Bertrand */ /* This file contains the CONNECT plugin MYCAT class definitions. */ /***********************************************************************/ #ifndef __MYCAT__H diff --git a/storage/connect/os.h b/storage/connect/os.h index 8056a272990..e2b165fb3f5 100644 --- a/storage/connect/os.h +++ b/storage/connect/os.h @@ -1,3 +1,4 @@ +/* Copyright (C) MariaDB Corporation Ab */ #ifndef _OS_H_INCLUDED #define _OS_H_INCLUDED diff --git a/storage/connect/osutil.c b/storage/connect/osutil.c index 66743c7403b..da896fec50e 100644 --- a/storage/connect/osutil.c +++ b/storage/connect/osutil.c @@ -1,3 +1,4 @@ +/* Copyright (C) MariaDB Corporation Ab */ #include "my_global.h" #include <stdlib.h> #include <string.h> diff --git a/storage/connect/osutil.h b/storage/connect/osutil.h index ac63d4ee973..7e6b8823b9b 100644 --- a/storage/connect/osutil.h +++ b/storage/connect/osutil.h @@ -1,3 +1,4 @@ +/* Copyright (C) MariaDB Corporation Ab */ #ifndef __OSUTIL_H__ #define __OSUTIL_H__ diff --git a/storage/connect/plgdbsem.h b/storage/connect/plgdbsem.h index 7308ab6ad8b..a66e2e9832d 100644 --- a/storage/connect/plgdbsem.h +++ b/storage/connect/plgdbsem.h @@ -138,12 +138,12 @@ enum AMT {TYPE_AM_ERROR = 0, /* Type not defined */ TYPE_AM_VIR = 171, /* Virtual tables am type no */ TYPE_AM_DMY = 172, /* DMY Dummy tables am type no */ TYPE_AM_SET = 180, /* SET Set tables am type no */ - TYPE_AM_MYSQL = 192, /* MYSQL access method type no */ - TYPE_AM_MYX = 193, /* MYSQL EXEC access method type */ - TYPE_AM_CAT = 195, /* Catalog access method type no */ - TYPE_AM_ZIP = 198, /* ZIP access method type no */ - TYPE_AM_MGO = 199, /* MGO access method type no */ - TYPE_AM_OUT = 200}; /* Output relations (storage) */ + TYPE_AM_MYSQL = 190, /* MYSQL access method type no */ + TYPE_AM_MYX = 191, /* MYSQL EXEC access method type */ + TYPE_AM_CAT = 192, /* Catalog access method type no */ + TYPE_AM_ZIP = 193, /* ZIP access method type no */ + TYPE_AM_MGO = 194, /* MGO access method type no */ + TYPE_AM_OUT = 200}; /* Output relations (storage) */ enum RECFM {RECFM_NAF = -2, /* Not a file */ RECFM_OEM = -1, /* OEM file access method */ diff --git a/storage/connect/tabext.h b/storage/connect/tabext.h index a7f5fb9d856..162fb516400 100644 --- a/storage/connect/tabext.h +++ b/storage/connect/tabext.h @@ -104,6 +104,8 @@ protected: /* This is the base class for all external tables. */ /***********************************************************************/ class DllExport TDBEXT : public TDB { + friend class JAVAConn; + friend class JMgoConn; public: // Constructors TDBEXT(EXTDEF *tdp); @@ -164,7 +166,7 @@ protected: }; // end of class TDBEXT /***********************************************************************/ -/* Virual class EXTCOL: external column. */ +/* Virtual class EXTCOL: external column. */ /***********************************************************************/ class DllExport EXTCOL : public COLBLK { friend class TDBEXT; diff --git a/storage/connect/tabjdbc.cpp b/storage/connect/tabjdbc.cpp index 7c82a2fc138..b6a1487955b 100644 --- a/storage/connect/tabjdbc.cpp +++ b/storage/connect/tabjdbc.cpp @@ -223,10 +223,10 @@ bool JDBCDEF::DefineAM(PGLOBAL g, LPCSTR am, int poff) } // endif Connect if (Url) - rc = ParseURL(g, Url); - - if (rc == RC_FX) // Error - return true; + if ((rc = ParseURL(g, Url)) == RC_FX) { + sprintf(g->Message, "Wrong JDBC URL %s", Url); + return true; + } // endif rc Wrapname = GetStringCatInfo(g, "Wrapper", NULL); return false; @@ -305,12 +305,12 @@ TDBJDBC::TDBJDBC(PJDBCDEF tdp) : TDBEXT(tdp) if (tdp) { Ops.Driver = tdp->Driver; Ops.Url = tdp->Url; - WrapName = tdp->Wrapname; + Wrapname = tdp->Wrapname; Ops.User = tdp->Username; Ops.Pwd = tdp->Password; Ops.Scrollable = tdp->Scrollable; } else { - WrapName = NULL; + Wrapname = NULL; Ops.Driver = NULL; Ops.Url = NULL; Ops.User = NULL; @@ -328,7 +328,7 @@ TDBJDBC::TDBJDBC(PTDBJDBC tdbp) : TDBEXT(tdbp) { Jcp = tdbp->Jcp; // is that right ? Cnp = tdbp->Cnp; - WrapName = tdbp->WrapName; + Wrapname = tdbp->Wrapname; Ops = tdbp->Ops; Prepared = tdbp->Prepared; Werr = tdbp->Werr; @@ -562,7 +562,7 @@ bool TDBJDBC::OpenDB(PGLOBAL g) /* Table already open, just replace it at its beginning. */ /*******************************************************************/ if (Memory == 1) { - if ((Qrp = Jcp->AllocateResult(g))) + if ((Qrp = Jcp->AllocateResult(g, this))) Memory = 2; // Must be filled else Memory = 0; // Allocation failed, don't use it @@ -596,11 +596,11 @@ bool TDBJDBC::OpenDB(PGLOBAL g) /* drivers allowing concurency in getting results ??? */ /*********************************************************************/ if (!Jcp) - Jcp = new(g)JDBConn(g, this); + Jcp = new(g)JDBConn(g, Wrapname); else if (Jcp->IsOpen()) Jcp->Close(); - if (Jcp->Open(&Ops) == RC_FX) + if (Jcp->Connect(&Ops)) return true; else if (Quoted) Quote = Jcp->GetQuoteChar(); @@ -608,7 +608,7 @@ bool TDBJDBC::OpenDB(PGLOBAL g) Use = USE_OPEN; // Do it now in case we are recursively called /*********************************************************************/ - /* Make the command and allocate whatever is used for getting results. */ + /* Make the command and allocate whatever is used for getting results*/ /*********************************************************************/ if (Mode == MODE_READ || Mode == MODE_READX) { if (Memory > 1 && !Srcdef) { @@ -625,7 +625,7 @@ bool TDBJDBC::OpenDB(PGLOBAL g) } else if (n) { Jcp->m_Rows = n; - if ((Qrp = Jcp->AllocateResult(g))) + if ((Qrp = Jcp->AllocateResult(g, this))) Memory = 2; // Must be filled else { strcpy(g->Message, "Result set memory allocation failed"); @@ -1134,11 +1134,11 @@ bool TDBXJDC::OpenDB(PGLOBAL g) /* drivers allowing concurency in getting results ??? */ /*********************************************************************/ if (!Jcp) { - Jcp = new(g) JDBConn(g, this); + Jcp = new(g) JDBConn(g, Wrapname); } else if (Jcp->IsOpen()) Jcp->Close(); - if (Jcp->Open(&Ops) == RC_FX) + if (Jcp->Connect(&Ops)) return true; Use = USE_OPEN; // Do it now in case we are recursively called @@ -1173,7 +1173,7 @@ int TDBXJDC::ReadDB(PGLOBAL g) else Query->Set(Cmdlist->Cmd); - if ((rc = Jcp->ExecSQLcommand(Query->GetStr())) == RC_FX) + if ((rc = Jcp->ExecuteCommand(Query->GetStr())) == RC_FX) Nerr++; if (rc == RC_NF) diff --git a/storage/connect/tabjdbc.h b/storage/connect/tabjdbc.h index 8fda3cec90b..d422ed26ef2 100644 --- a/storage/connect/tabjdbc.h +++ b/storage/connect/tabjdbc.h @@ -41,7 +41,7 @@ protected: // Members PSZ Driver; /* JDBC driver */ PSZ Url; /* JDBC driver URL */ - PSZ Wrapname; /* Java wrapper name */ + PSZ Wrapname; /* Java driver name */ }; // end of JDBCDEF #if !defined(NJDBC) @@ -89,7 +89,7 @@ protected: JDBConn *Jcp; // Points to a JDBC connection class JDBCCOL *Cnp; // Points to count(*) column JDBCPARM Ops; // Additional parameters - char *WrapName; // Points to Java wrapper name + PSZ Wrapname; // Points to Java wrapper name bool Prepared; // True when using prepared statement bool Werr; // Write error bool Rerr; // Rewind error diff --git a/storage/connect/tabjmg.cpp b/storage/connect/tabjmg.cpp new file mode 100644 index 00000000000..c784fa9b3c0 --- /dev/null +++ b/storage/connect/tabjmg.cpp @@ -0,0 +1,505 @@ +/************** tabjmg C++ Program Source Code File (.CPP) *************/ +/* PROGRAM NAME: tabjmg Version 1.2 */ +/* (C) Copyright to the author Olivier BERTRAND 2017 */ +/* This file contains the MongoDB classes using the Java Driver. */ +/***********************************************************************/ + +/***********************************************************************/ +/* Include relevant sections of the MariaDB header file. */ +/***********************************************************************/ +#include <my_global.h> + +/***********************************************************************/ +/* Include application header files: */ +/* global.h is header containing all global declarations. */ +/* plgdbsem.h is header containing the DB application declarations. */ +/***********************************************************************/ +#include "global.h" +#include "plgdbsem.h" +#include "xtable.h" +#include "maputil.h" +#include "filamtxt.h" +#include "tabext.h" +#include "tabjmg.h" +#include "tabmul.h" +#include "checklvl.h" +#include "resource.h" +#include "mycat.h" // for FNC_COL +#include "filter.h" + +/***********************************************************************/ +/* This should be an option. */ +/***********************************************************************/ +#define MAXCOL 200 /* Default max column nb in result */ +#define TYPE_UNKNOWN 12 /* Must be greater than other types */ + +/* --------------------------- Class TDBJMG -------------------------- */ + +/***********************************************************************/ +/* Implementation of the TDBJMG class. */ +/***********************************************************************/ +TDBJMG::TDBJMG(PMGODEF tdp) : TDBEXT(tdp) +{ + Jcp = NULL; +//Cnp = NULL; + + if (tdp) { + Ops.Driver = tdp->Tabschema; + Ops.Url = tdp->Uri; + Ops.Version = tdp->Version; + Uri = tdp->Uri; + Db_name = tdp->Tabschema; + Wrapname = tdp->Wrapname; + Coll_name = tdp->Tabname; + Options = tdp->Colist; + Filter = tdp->Filter; + B = tdp->Base ? 1 : 0; + Pipe = tdp->Pipe && Options != NULL; + } else { + Ops.Driver = NULL; + Ops.Url = NULL; + Ops.Version = 0; + Uri = NULL; + Db_name = NULL; + Coll_name = NULL; + Options = NULL; + Filter = NULL; + B = 0; + Pipe = false; + } // endif tdp + + Ops.User = NULL; + Ops.Pwd = NULL; + Ops.Scrollable = false; + Ops.Fsize = Ops.CheckSize(Rows); + Fpos = -1; + N = 0; + Done = false; +} // end of TDBJMG standard constructor + +TDBJMG::TDBJMG(TDBJMG *tdbp) : TDBEXT(tdbp) +{ + Uri = tdbp->Uri; +//Pool = tdbp->Pool; +//Client = tdbp->Client; +//Database = NULL; +//Collection = tdbp->Collection; +//Cursor = tdbp->Cursor; +//Query = tdbp->Query; +//Opts = tdbp->Opts; +//Fpc = tdbp->Fpc; +//Cnd = tdbp->Cnd; +//Uristr = tdbp->Uristr; + Db_name = tdbp->Db_name;; + Coll_name = tdbp->Coll_name; + Options = tdbp->Options; + Filter = tdbp->Filter; + B = tdbp->B; + Fpos = tdbp->Fpos; + N = tdbp->N; + Done = tdbp->Done; + Pipe = tdbp->Pipe; +} // end of TDBJMG copy constructor + +// Used for update +PTDB TDBJMG::Clone(PTABS t) +{ + PTDB tp; + PJMGCOL cp1, cp2; + PGLOBAL g = t->G; + + tp = new(g) TDBJMG(this); + + for (cp1 = (PJMGCOL)Columns; cp1; cp1 = (PJMGCOL)cp1->GetNext()) + if (!cp1->IsSpecial()) { + cp2 = new(g) JMGCOL(cp1, tp); // Make a copy + NewPointer(t, cp1, cp2); + } // endif cp1 + + return tp; +} // end of Clone + +/***********************************************************************/ +/* Allocate JSN column description block. */ +/***********************************************************************/ +PCOL TDBJMG::MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n) +{ + PJMGCOL colp = new(g) JMGCOL(g, cdp, this, cprec, n); + +//colp->Mbuf = (char*)PlugSubAlloc(g, NULL, colp->Long + 1); + return colp; + //return (colp->ParseJpath(g)) ? NULL : colp; +} // end of MakeCol + +/***********************************************************************/ +/* InsertSpecialColumn: Put a special column ahead of the column list.*/ +/***********************************************************************/ +PCOL TDBJMG::InsertSpecialColumn(PCOL colp) +{ + if (!colp->IsSpecial()) + return NULL; + + colp->SetNext(Columns); + Columns = colp; + return colp; +} // end of InsertSpecialColumn + +/***********************************************************************/ +/* MONGO Cardinality: returns table size in number of rows. */ +/***********************************************************************/ +int TDBJMG::Cardinality(PGLOBAL g) +{ + if (!g) + return 1; + else if (Cardinal < 0) + Cardinal = (!Init(g)) ? Jcp->CollSize(g) : 0; + + return Cardinal; +} // end of Cardinality + +/***********************************************************************/ +/* MONGO GetMaxSize: returns collection size estimate. */ +/***********************************************************************/ +int TDBJMG::GetMaxSize(PGLOBAL g) +{ + if (MaxSize < 0) + MaxSize = Cardinality(g); + + return MaxSize; +} // end of GetMaxSize + +/***********************************************************************/ +/* Init: initialize MongoDB processing. */ +/***********************************************************************/ +bool TDBJMG::Init(PGLOBAL g) +{ + if (Done) + return false; + + /*********************************************************************/ + /* Open an JDBC connection for this table. */ + /* Note: this may not be the proper way to do. Perhaps it is better */ + /* to test whether a connection is already open for this datasource */ + /* and if so to allocate just a new result set. But this only for */ + /* drivers allowing concurency in getting results ??? */ + /*********************************************************************/ + if (!Jcp) + Jcp = new(g) JMgoConn(g, Coll_name, Wrapname); + else if (Jcp->IsOpen()) + Jcp->Close(); + + if (Jcp->Connect(&Ops)) + return true; + + Done = true; + return false; +} // end of Init + +/***********************************************************************/ +/* OpenDB: Data Base open routine for MONGO access method. */ +/***********************************************************************/ +bool TDBJMG::OpenDB(PGLOBAL g) +{ + if (Use == USE_OPEN) { + /*******************************************************************/ + /* Table already open replace it at its beginning. */ + /*******************************************************************/ + if (Jcp->Rewind()) + return true; + + Fpos = -1; + return false; + } // endif Use + + /*********************************************************************/ + /* First opening. */ + /*********************************************************************/ + if (Pipe && Mode != MODE_READ) { + strcpy(g->Message, "Pipeline tables are read only"); + return true; + } // endif Pipe + + if (Init(g)) + return true; + + if (Jcp->GetMethodId(g, Mode)) + return true; + + if (Mode == MODE_DELETE && !Next) { + // Delete all documents + if (!Jcp->MakeCursor(g, this, "all", Filter, false)) + if (Jcp->DocDelete(g, true) == RC_OK) + return false; + + return true; + } // endif Mode + + if (Mode == MODE_INSERT) + Jcp->MakeColumnGroups(g, this); + + if (Mode != MODE_UPDATE) + return Jcp->MakeCursor(g, this, Options, Filter, Pipe); + + return false; +} // end of OpenDB + +/***********************************************************************/ +/* Data Base indexed read routine for ODBC access method. */ +/***********************************************************************/ +bool TDBJMG::ReadKey(PGLOBAL g, OPVAL op, const key_range *kr) +{ + strcpy(g->Message, "MONGO tables are not indexable"); + return true; +} // end of ReadKey + +/***********************************************************************/ +/* ReadDB: Get next document from a collection. */ +/***********************************************************************/ +int TDBJMG::ReadDB(PGLOBAL g) +{ + int rc = RC_OK; + + if (!N && Mode == MODE_UPDATE) + if (Jcp->MakeCursor(g, this, Options, Filter, Pipe)) + return RC_FX; + + if (++CurNum >= Rbuf) { + Rbuf = Jcp->Fetch(); + Curpos = Fpos + 1; + CurNum = 0; + N++; + } // endif CurNum + + rc = (Rbuf > 0) ? RC_OK : (Rbuf == 0) ? RC_EF : RC_FX; + + return rc; +} // end of ReadDB + +/***********************************************************************/ +/* WriteDB: Data Base write routine for DOS access method. */ +/***********************************************************************/ +int TDBJMG::WriteDB(PGLOBAL g) +{ + int rc = RC_OK; + + if (Mode == MODE_INSERT) { + rc = Jcp->DocWrite(g); + } else if (Mode == MODE_DELETE) { + rc = Jcp->DocDelete(g, false); + } else if (Mode == MODE_UPDATE) { + rc = Jcp->DocUpdate(g, this); + } // endif Mode + + return rc; +} // end of WriteDB + +/***********************************************************************/ +/* Data Base delete line routine for ODBC access method. */ +/***********************************************************************/ +int TDBJMG::DeleteDB(PGLOBAL g, int irc) +{ + return (irc == RC_OK) ? WriteDB(g) : RC_OK; +} // end of DeleteDB + +/***********************************************************************/ +/* Table close routine for MONGO tables. */ +/***********************************************************************/ +void TDBJMG::CloseDB(PGLOBAL g) +{ + Jcp->Close(); + Done = false; +} // end of CloseDB + +/* ----------------------------- JMGCOL ------------------------------ */ + +/***********************************************************************/ +/* JMGCOL public constructor. */ +/***********************************************************************/ +JMGCOL::JMGCOL(PGLOBAL g, PCOLDEF cdp, PTDB tdbp, PCOL cprec, int i) + : EXTCOL(cdp, tdbp, cprec, i, "MGO") +{ + Tmgp = (PTDBJMG)(tdbp->GetOrig() ? tdbp->GetOrig() : tdbp); + Jpath = cdp->GetFmt() ? cdp->GetFmt() : cdp->GetName(); +//Mbuf = NULL; +} // end of JMGCOL constructor + +/***********************************************************************/ +/* JMGCOL constructor used for copying columns. */ +/* tdbp is the pointer to the new table descriptor. */ +/***********************************************************************/ +JMGCOL::JMGCOL(JMGCOL *col1, PTDB tdbp) : EXTCOL(col1, tdbp) +{ + Tmgp = col1->Tmgp; + Jpath = col1->Jpath; +//Mbuf = col1->Mbuf; +} // end of JMGCOL copy constructor + +/***********************************************************************/ +/* Get path when proj is false or projection path when proj is true. */ +/***********************************************************************/ +PSZ JMGCOL::GetJpath(PGLOBAL g, bool proj) +{ + if (Jpath) { + if (proj) { + char *p1, *p2, *projpath = PlugDup(g, Jpath); + int i = 0; + + for (p1 = p2 = projpath; *p1; p1++) + if (*p1 == '.') { + if (!i) + *p2++ = *p1; + + i = 1; + } else if (i) { + if (!isdigit(*p1)) { + *p2++ = *p1; + i = 0; + } // endif p1 + + } else + *p2++ = *p1; + + *p2 = 0; + return projpath; + } else + return Jpath; + + } else + return Name; + +} // end of GetJpath + +#if 0 +/***********************************************************************/ +/* Mini: used to suppress blanks to json strings. */ +/***********************************************************************/ +char *JMGCOL::Mini(PGLOBAL g, const bson_t *bson, bool b) +{ + char *s, *str = NULL; + int i, k = 0; + bool ok = true; + + if (b) + s = str = bson_array_as_json(bson, NULL); + else + s = str = bson_as_json(bson, NULL); + + for (i = 0; i < Long && s[i]; i++) { + switch (s[i]) { + case ' ': + if (ok) continue; + case '"': + ok = !ok; + default: + break; + } // endswitch s[i] + + Mbuf[k++] = s[i]; + } // endfor i + + bson_free(str); + + if (i >= Long) { + sprintf(g->Message, "Value too long for column %s", Name); + throw (int)TYPE_AM_MGO; + } // endif i + + Mbuf[k] = 0; + return Mbuf; +} // end of Mini +#endif // 0 + +/***********************************************************************/ +/* ReadColumn: */ +/***********************************************************************/ +void JMGCOL::ReadColumn(PGLOBAL g) +{ + Value->SetValue_psz(Tmgp->Jcp->GetColumnValue(Jpath)); +} // end of ReadColumn + +/***********************************************************************/ +/* WriteColumn: */ +/***********************************************************************/ +void JMGCOL::WriteColumn(PGLOBAL g) +{ + // Check whether this node must be written + if (Value != To_Val) + Value->SetValue_pval(To_Val, FALSE); // Convert the updated value + +} // end of WriteColumn + +#if 0 +/***********************************************************************/ +/* AddValue: Add column value to the document to insert or update. */ +/***********************************************************************/ +bool JMGCOL::AddValue(PGLOBAL g, bson_t *doc, char *key, bool upd) +{ + bool rc = false; + + if (Value->IsNull()) { + if (upd) + rc = BSON_APPEND_NULL(doc, key); + else + return false; + + } else switch (Buf_Type) { + case TYPE_STRING: + rc = BSON_APPEND_UTF8(doc, key, Value->GetCharValue()); + break; + case TYPE_INT: + case TYPE_SHORT: + rc = BSON_APPEND_INT32(doc, key, Value->GetIntValue()); + break; + case TYPE_TINY: + rc = BSON_APPEND_BOOL(doc, key, Value->GetIntValue()); + break; + case TYPE_BIGINT: + rc = BSON_APPEND_INT64(doc, key, Value->GetBigintValue()); + break; + case TYPE_DOUBLE: + rc = BSON_APPEND_DOUBLE(doc, key, Value->GetFloatValue()); + break; + case TYPE_DECIM: + {bson_decimal128_t dec; + + if (bson_decimal128_from_string(Value->GetCharValue(), &dec)) + rc = BSON_APPEND_DECIMAL128(doc, key, &dec); + + } break; + case TYPE_DATE: + rc = BSON_APPEND_DATE_TIME(doc, key, Value->GetBigintValue() * 1000); + break; + default: + sprintf(g->Message, "Type %d not supported yet", Buf_Type); + return true; + } // endswitch Buf_Type + + if (!rc) { + strcpy(g->Message, "Adding value failed"); + return true; + } else + return false; + +} // end of AddValue + +/* ---------------------------TDBGOL class --------------------------- */ + +/***********************************************************************/ +/* TDBGOL class constructor. */ +/***********************************************************************/ +TDBGOL::TDBGOL(PMGODEF tdp) : TDBCAT(tdp) +{ + Topt = tdp->GetTopt(); + Db = (char*)tdp->GetTabschema(); +} // end of TDBJCL constructor + + /***********************************************************************/ + /* GetResult: Get the list the JSON file columns. */ + /***********************************************************************/ +PQRYRES TDBGOL::GetResult(PGLOBAL g) +{ + return MGOColumns(g, Db, Topt, false); +} // end of GetResult +#endif // 0 + + /* -------------------------- End of mongo --------------------------- */ diff --git a/storage/connect/tabjmg.h b/storage/connect/tabjmg.h new file mode 100644 index 00000000000..92b5df2419f --- /dev/null +++ b/storage/connect/tabjmg.h @@ -0,0 +1,122 @@ +/**************** tabjmg H Declares Source Code File (.H) **************/ +/* Name: tabjmg.h Version 1.0 */ +/* */ +/* (C) Copyright to the author Olivier BERTRAND 2017 */ +/* */ +/* This file contains the MongoDB classes using the Java Driver. */ +/***********************************************************************/ +#include "mongo.h" +#include "jmgoconn.h" +#include "jdbccat.h" + +/* -------------------------- TDBJMG class --------------------------- */ + +/***********************************************************************/ +/* This is the MongoDB Table Type using the Java Driver. */ +/* The table is a collection, each record being a document. */ +/***********************************************************************/ +class DllExport TDBJMG : public TDBEXT { + friend class JMGCOL; + friend class MGODEF; + friend class MGODISC; + friend class JAVAConn; + friend PQRYRES MGOColumns(PGLOBAL, PCSZ, PCSZ, PTOS, bool); +public: + // Constructor + TDBJMG(PMGODEF tdp); + TDBJMG(TDBJMG *tdbp); + + // Implementation + virtual AMT GetAmType(void) { return TYPE_AM_MGO; } + virtual PTDB Duplicate(PGLOBAL g) { return (PTDB)new(g) TDBJMG(this); } + + // Methods + virtual PTDB Clone(PTABS t); + virtual PCOL MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n); + virtual PCOL InsertSpecialColumn(PCOL colp); +//virtual void SetFilter(PFIL fp); + virtual int RowNumber(PGLOBAL g, bool b = FALSE) { return N; } + + // Database routines + virtual int Cardinality(PGLOBAL g); + virtual int GetMaxSize(PGLOBAL g); + virtual bool OpenDB(PGLOBAL g); + virtual int ReadDB(PGLOBAL g); + virtual int WriteDB(PGLOBAL g); + virtual int DeleteDB(PGLOBAL g, int irc); + virtual void CloseDB(PGLOBAL g); + virtual bool ReadKey(PGLOBAL g, OPVAL op, const key_range *kr); + +protected: + bool Init(PGLOBAL g); + + // Members + JMgoConn *Jcp; // Points to a Mongo connection class +//JMGCOL *Cnp; // Points to count(*) column + JDBCPARM Ops; // Additional parameters + PCSZ Uri; + PCSZ Db_name; + PCSZ Coll_name; + PCSZ Options; // The MongoDB options + PCSZ Filter; // The filtering query + PSZ Wrapname; // Java wrapper name + int Fpos; // The current row index + int N; // The current Rownum + int B; // Array index base + bool Done; // Init done + bool Pipe; // True for pipeline +}; // end of class TDBJMG + +/* --------------------------- JMGCOL class -------------------------- */ + +/***********************************************************************/ +/* Class JMGCOL: MongoDB access method column descriptor. */ +/***********************************************************************/ +class DllExport JMGCOL : public EXTCOL { + friend class TDBJMG; + friend class FILTER; +public: + // Constructors + JMGCOL(PGLOBAL g, PCOLDEF cdp, PTDB tdbp, PCOL cprec, int i); + JMGCOL(JMGCOL *colp, PTDB tdbp); // Constructor used in copy process + + // Implementation + virtual int GetAmType(void) {return Tmgp->GetAmType();} + + // Methods + //virtual bool SetBuffer(PGLOBAL g, PVAL value, bool ok, bool check); + virtual PSZ GetJpath(PGLOBAL g, bool proj); + virtual void ReadColumn(PGLOBAL g); + virtual void WriteColumn(PGLOBAL g); +//bool AddValue(PGLOBAL g, bson_t *doc, char *key, bool upd); + +protected: + // Default constructor not to be used + JMGCOL(void) {} +//char *GetProjPath(PGLOBAL g); +//char *Mini(PGLOBAL g, const bson_t *bson, bool b); + + // Members + TDBJMG *Tmgp; // To the MGO table block + char *Jpath; // The json path +//char *Mbuf; // The Mini buffer +}; // end of class JMGCOL + +#if 0 +/***********************************************************************/ +/* This is the class declaration for the MONGO catalog table. */ +/***********************************************************************/ +class DllExport TDBGOL : public TDBCAT { +public: + // Constructor + TDBGOL(PMGODEF tdp); + +protected: + // Specific routines + virtual PQRYRES GetResult(PGLOBAL g); + + // Members + PTOS Topt; + char *Db; +}; // end of class TDBGOL +#endif 0 diff --git a/storage/connect/tabjson.cpp b/storage/connect/tabjson.cpp index e4246b7cf29..2deb3e845c3 100644 --- a/storage/connect/tabjson.cpp +++ b/storage/connect/tabjson.cpp @@ -31,6 +31,9 @@ #if defined(ZIP_SUPPORT) #include "filamzip.h" #endif // ZIP_SUPPORT +#if defined(JDBC_SUPPORT) +#include "jmgfam.h" +#endif // JDBC_SUPPORT #if defined(MONGO_SUPPORT) #include "mongofam.h" #endif // MONGO_SUPPORT @@ -68,7 +71,7 @@ typedef struct _jncol { /* JSONColumns: construct the result blocks containing the description */ /* of all the columns of a table contained inside a JSON file. */ /***********************************************************************/ -PQRYRES JSONColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info) +PQRYRES JSONColumns(PGLOBAL g, PCSZ db, PCSZ dsn, PTOS topt, bool info) { static int buftyp[] = {TYPE_STRING, TYPE_SHORT, TYPE_STRING, TYPE_INT, TYPE_INT, TYPE_SHORT, TYPE_SHORT, TYPE_STRING}; @@ -78,7 +81,8 @@ PQRYRES JSONColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info) char *p, colname[65], fmt[129]; int i, j, lvl, n = 0; int ncol = sizeof(buftyp) / sizeof(int); - PCSZ sep; + bool mgo = (GetTypeID(topt->type) == TAB_MONGO); + PCSZ sep, level; PVAL valp; JCOL jcol; PJCL jcp, fjcp = NULL, pjcp = NULL; @@ -108,8 +112,14 @@ PQRYRES JSONColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info) /*********************************************************************/ /* Open the input file. */ /*********************************************************************/ - lvl = GetIntegerTableOption(g, topt, "Level", 0); - lvl = (lvl < 0) ? 0 : (lvl > 16) ? 16 : lvl; + level = GetStringTableOption(g, topt, "Level", NULL); + + if (level) { + lvl = atoi(level); + lvl = (lvl > 16) ? 16 : lvl; + } else + lvl = 0; + sep = GetStringTableOption(g, topt, "Separator", "."); tdp = new(g) JSONDEF; @@ -119,11 +129,6 @@ PQRYRES JSONColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info) #endif // ZIP_SUPPORT tdp->Fn = GetStringTableOption(g, topt, "Filename", NULL); - if (!tdp->Fn && !dsn) { - strcpy(g->Message, MSG(MISSING_FNAME)); - return NULL; - } // endif Fn - if (!(tdp->Database = SetPath(g, db))) return NULL; @@ -133,22 +138,30 @@ PQRYRES JSONColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info) tdp->Xcol = GetStringTableOption(g, topt, "Expand", NULL); tdp->Uri = (dsn && *dsn ? dsn : NULL); - if (trace) + if (!tdp->Fn && !tdp->Uri) { + strcpy(g->Message, MSG(MISSING_FNAME)); + return NULL; + } // endif Fn + + if (trace) htrc("File %s objname=%s pretty=%d lvl=%d\n", tdp->Fn, tdp->Objname, tdp->Pretty, lvl); if (tdp->Uri) { -#if defined(MONGO_SUPPORT) +#if defined(MONGO_SUPPORT) || defined(JDBC_SUPPORT) tdp->Collname = GetStringTableOption(g, topt, "Name", NULL); tdp->Collname = GetStringTableOption(g, topt, "Tabname", tdp->Collname); tdp->Schema = GetStringTableOption(g, topt, "Dbname", "test"); - tdp->Options = PlugDup(g, "all"); -// tdp->Pipe = GetBooleanTableOption(g, topt, "Pipeline", false); + tdp->Options = (PSZ)GetStringTableOption(g, topt, "Colist", "all"); + tdp->Pipe = GetBooleanTableOption(g, topt, "Pipeline", false); + tdp->Version = GetIntegerTableOption(g, topt, "Version", 3); + tdp->Wrapname = (PSZ)GetStringTableOption(g, topt, "Wrapper", + (tdp->Version == 2) ? "Mongo2Interface" : "Mongo3Interface"); tdp->Pretty = 0; -#else // !MONGO_SUPPORT +#else // !MONGO_SUPPORT || JDBC_SUPPORT sprintf(g->Message, MSG(NO_FEAT_SUPPORT), "MONGO"); return NULL; -#endif // !MONGO_SUPPORT +#endif // !MONGO_SUPPORT || JDBC_SUPPORT } // endif Uri if (tdp->Pretty == 2) { @@ -167,10 +180,12 @@ PQRYRES JSONColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info) jsp = (tjsp->GetDoc()) ? tjsp->GetDoc()->GetValue(0) : NULL; } else { - if (!(tdp->Lrecl = GetIntegerTableOption(g, topt, "Lrecl", 0))) { - sprintf(g->Message, "LRECL must be specified for pretty=%d", tdp->Pretty); - return NULL; - } // endif lrecl + if (!(tdp->Lrecl = GetIntegerTableOption(g, topt, "Lrecl", 0))) + if (!mgo) { + sprintf(g->Message, "LRECL must be specified for pretty=%d", tdp->Pretty); + return NULL; + } else + tdp->Lrecl = 8192; // Should be enough tdp->Ending = GetIntegerTableOption(g, topt, "Ending", CRLF); @@ -182,12 +197,21 @@ PQRYRES JSONColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info) return NULL; #endif // !ZIP_SUPPORT } else if (tdp->Uri) { -#if defined(MONGO_SUPPORT) +#if defined(MONGO_SUPPORT) || defined(JDBC_SUPPORT) +#if !defined(JDBC_SUPPORT) tjnp = new(g) TDBJSN(tdp, new(g) MGOFAM(tdp)); -#else // !MONGO_SUPPORT - sprintf(g->Message, MSG(NO_FEAT_SUPPORT), "MONGO"); +#elif !defined(MONGO_SUPPORT) + tjnp = new(g) TDBJSN(tdp, new(g) JMGFAM(tdp)); +#else + if (tdp->Driver && toupper(*tdp->Driver) == 'C') + tjnp = new(g) TDBJSN(tdp, new(g) MGOFAM(tdp)); + else + tjnp = new(g) TDBJSN(tdp, new(g) JMGFAM(tdp)); +#endif +#else // !MONGO_SUPPORT && !JDBC_SUPPORT + sprintf(g->Message, "No MongoDB support"); return NULL; -#endif // !MONGO_SUPPORT +#endif // MONGO_SUPPORT || JDBC_SUPPORT } else tjnp = new(g) TDBJSN(tdp, new(g) DOSFAM(tdp)); @@ -229,15 +253,15 @@ PQRYRES JSONColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info) jcol.Found = true; colname[64] = 0; fmt[128] = 0; - *fmt = '$'; if (!tdp->Uri) { + *fmt = '$'; fmt[1] = '.'; p = fmt + 2; } else - p = fmt + 1; + p = fmt; - jrp = (PJPR*)PlugSubAlloc(g, NULL, sizeof(PJPR) * lvl); + jrp = (PJPR*)PlugSubAlloc(g, NULL, sizeof(PJPR) * MY_MAX(lvl, 0)); /*********************************************************************/ /* Analyse the JSON tree and define columns. */ @@ -300,12 +324,13 @@ PQRYRES JSONColumns(PGLOBAL g, char *db, char *dsn, PTOS topt, bool info) } // endswitch jsp goto retry; - } else { + } else if (lvl >= 0) { jcol.Type = TYPE_STRING; jcol.Len = 256; jcol.Scale = 0; jcol.Cbn = true; - } // endif's + } else + continue; // Check whether this column was already found for (jcp = fjcp; jcp; jcp = jcp->Next) @@ -461,11 +486,16 @@ JSONDEF::JSONDEF(void) Base = 0; Strict = false; Sep = '.'; -#if defined(MONGO_SUPPORT) +#if defined(MONGO_SUPPORT) || defined(JDBC_SUPPORT) Uri = NULL; Collname = Schema = Options = Filter = NULL; Pipe = false; -#endif // MONGO_SUPPORT + Driver = NULL; + Version = 0; +#if defined(JDBC_SUPPORT) + Wrapname = NULL; +#endif // JDBC_SUPPORT +#endif // !MONGO_SUPPORT && !JDBC_SUPPORT } // end of JSONDEF constructor /***********************************************************************/ @@ -482,7 +512,7 @@ bool JSONDEF::DefineAM(PGLOBAL g, LPCSTR, int poff) Sep = *GetStringCatInfo(g, "Separator", "."); if (Uri = GetStringCatInfo(g, "Connect", NULL)) { -#if defined(MONGO_SUPPORT) +#if defined(MONGO_SUPPORT) || defined(JDBC_SUPPORT) Collname = GetStringCatInfo(g, "Name", (Catfunc & (FNC_TABLE | FNC_COL)) ? NULL : Name); Collname = GetStringCatInfo(g, "Tabname", Collname); @@ -490,11 +520,19 @@ bool JSONDEF::DefineAM(PGLOBAL g, LPCSTR, int poff) Options = GetStringCatInfo(g, "Colist", NULL); Filter = GetStringCatInfo(g, "Filter", NULL); Pipe = GetBoolCatInfo("Pipeline", false); + Driver = GetStringCatInfo(g, "Driver", NULL); + Version = GetIntCatInfo("Version", 3); Pretty = 0; -#else // !MONGO_SUPPORT +#if defined(JDBC_SUPPORT) + if (Version == 2) + Wrapname = GetStringCatInfo(g, "Wrapper", "Mongo2Interface"); + else + Wrapname = GetStringCatInfo(g, "Wrapper", "Mongo3Interface"); +#endif // JDBC_SUPPORT +#else // !MONGO_SUPPORT && !JDBC_SUPPORT sprintf(g->Message, MSG(NO_FEAT_SUPPORT), "MONGO"); return true; -#endif // !MONGO_SUPPORT +#endif // !MONGO_SUPPORT && !JDBC_SUPPORT } // endif Uri return DOSDEF::DefineAM(g, (Uri ? "XMGO" : "DOS"), poff); @@ -520,12 +558,18 @@ PTDB JSONDEF::GetTable(PGLOBAL g, MODE m) (m == MODE_UPDATE || m == MODE_DELETE)); if (Uri) { -#if defined(MONGO_SUPPORT) +#if defined(MONGO_SUPPORT) || defined(JDBC_SUPPORT) +#if !defined(JDBC_SUPPORT) txfp = new(g) MGOFAM(this); -#else // !MONGO_SUPPORT - sprintf(g->Message, MSG(NO_FEAT_SUPPORT), "MONGO"); - return NULL; -#endif // !MONGO_SUPPORT +#elif !defined(MONGO_SUPPORT) + txfp = new(g) JMGFAM(this); +#else + if (Driver && toupper(*Driver) == 'C') + txfp = new(g) MGOFAM(this); + else + txfp = new(g) JMGFAM(this); +#endif +#endif // MONGO_SUPPORT || JDBC_SUPPORT } else if (Zipped) { #if defined(ZIP_SUPPORT) if (m == MODE_READ || m == MODE_ANY || m == MODE_ALTER) { @@ -559,14 +603,19 @@ PTDB JSONDEF::GetTable(PGLOBAL g, MODE m) tdbp = new(g) TDBJSN(this, txfp); #if USE_G - // Allocate the parse work memory - PGLOBAL G = (PGLOBAL)PlugSubAlloc(g, NULL, sizeof(GLOBAL)); - memset(G, 0, sizeof(GLOBAL)); - G->Sarea_Size = Lrecl * 10; - G->Sarea = PlugSubAlloc(g, NULL, G->Sarea_Size); - PlugSubSet(G, G->Sarea, G->Sarea_Size); - G->jump_level = 0; - ((TDBJSN*)tdbp)->G = G; + if (Lrecl) { + // Allocate the parse work memory + PGLOBAL G = (PGLOBAL)PlugSubAlloc(g, NULL, sizeof(GLOBAL)); + memset(G, 0, sizeof(GLOBAL)); + G->Sarea_Size = Lrecl * 10; + G->Sarea = PlugSubAlloc(g, NULL, G->Sarea_Size); + PlugSubSet(G, G->Sarea, G->Sarea_Size); + G->jump_level = 0; + ((TDBJSN*)tdbp)->G = G; + } else { + strcpy(g->Message, "LRECL is not defined"); + return NULL; + } // endif Lrecl #else ((TDBJSN*)tdbp)->G = g; #endif @@ -791,13 +840,16 @@ bool TDBJSN::OpenDB(PGLOBAL g) return true; } // endswitch Jmode - if (Xcol && Txfp->GetAmType() != TYPE_AM_MGO) - To_Filter = NULL; // Imcompatible - } // endif Use - return TDBDOS::OpenDB(g); - } // end of OpenDB + if (TDBDOS::OpenDB(g)) + return true; + + if (Xcol) + To_Filter = NULL; // Imcompatible + + return false; +} // end of OpenDB /***********************************************************************/ /* SkipHeader: Physically skip first header line if applicable. */ @@ -1282,7 +1334,7 @@ fin: /***********************************************************************/ /* Get Jpath converted to Mongo path. */ /***********************************************************************/ -char *JSONCOL::GetJpath(PGLOBAL g, bool proj) +PSZ JSONCOL::GetJpath(PGLOBAL g, bool proj) { if (Jpath) { char *p1, *p2, *mgopath; @@ -2204,8 +2256,8 @@ void TDBJSON::CloseDB(PGLOBAL g) TDBJCL::TDBJCL(PJDEF tdp) : TDBCAT(tdp) { Topt = tdp->GetTopt(); - Db = (char*)tdp->GetDB(); - Dsn = (char*)tdp->Uri; + Db = tdp->GetDB(); + Dsn = tdp->Uri; } // end of TDBJCL constructor /***********************************************************************/ diff --git a/storage/connect/tabjson.h b/storage/connect/tabjson.h index c87bf7ff903..6cdd2993e1f 100644 --- a/storage/connect/tabjson.h +++ b/storage/connect/tabjson.h @@ -36,10 +36,13 @@ class DllExport JSONDEF : public DOSDEF { /* Table description */ friend class TDBJSON; friend class TDBJSN; friend class TDBJCL; - friend PQRYRES JSONColumns(PGLOBAL, char*, char*, PTOS, bool); +#if defined(JDBC_SUPPORT) + friend class JMGFAM; +#endif // JDBC_SUPPORT #if defined(MONGO_SUPPORT) friend class MGOFAM; #endif // MONGO_SUPPORT + friend PQRYRES JSONColumns(PGLOBAL, PCSZ, PCSZ, PTOS, bool); public: // Constructor JSONDEF(void); @@ -63,13 +66,18 @@ public: bool Strict; /* Strict syntax checking */ char Sep; /* The Jpath separator */ const char *Uri; /* MongoDB connection URI */ -#if defined(MONGO_SUPPORT) +#if defined(MONGO_SUPPORT) || defined(JDBC_SUPPORT) PCSZ Collname; /* External collection name */ PCSZ Schema; /* External schema (DB) name */ PSZ Options; /* Colist ; Pipe */ PSZ Filter; /* Filter */ + PSZ Driver; /* MongoDB Driver (C or JAVA) */ bool Pipe; /* True if Colist is a pipeline */ -#endif // MONGO_SUPPORT + int Version; /* Driver version */ +#if defined(JDBC_SUPPORT) + PSZ Wrapname; /* MongoDB java wrapper name */ +#endif // JDBC_SUPPORT +#endif // MONGO_SUPPORT || JDBC_SUPPORT }; // end of JSONDEF /* -------------------------- TDBJSN class --------------------------- */ @@ -81,6 +89,9 @@ public: class DllExport TDBJSN : public TDBDOS { friend class JSONCOL; friend class JSONDEF; +#if defined(JDBC_SUPPORT) + friend class JMGFAM; +#endif // JDBC_SUPPORT #if defined(MONGO_SUPPORT) friend class MGOFAM; #endif // MONGO_SUPPORT @@ -148,8 +159,13 @@ public: class DllExport JSONCOL : public DOSCOL { friend class TDBJSN; friend class TDBJSON; +#if defined(JDBC_SUPPORT) + friend class JMGFAM; +#endif // JDBC_SUPPORT +#if defined(MONGO_SUPPORT) friend class MGOFAM; - public: +#endif // MONGO_SUPPORT +public: // Constructors JSONCOL(PGLOBAL g, PCOLDEF cdp, PTDB tdbp, PCOL cprec, int i); JSONCOL(JSONCOL *colp, PTDB tdbp); // Constructor used in copy process @@ -160,7 +176,7 @@ class DllExport JSONCOL : public DOSCOL { // Methods virtual bool SetBuffer(PGLOBAL g, PVAL value, bool ok, bool check); bool ParseJpath(PGLOBAL g); - char *GetJpath(PGLOBAL g, bool proj); + virtual PSZ GetJpath(PGLOBAL g, bool proj); virtual void ReadColumn(PGLOBAL g); virtual void WriteColumn(PGLOBAL g); @@ -252,7 +268,7 @@ class DllExport TDBJCL : public TDBCAT { virtual PQRYRES GetResult(PGLOBAL g); // Members - PTOS Topt; - char *Db; - char *Dsn; + PTOS Topt; + PCSZ Db; + PCSZ Dsn; }; // end of class TDBJCL diff --git a/storage/connect/tabmgo.cpp b/storage/connect/tabmgo.cpp index 639affaac1c..810785ede80 100644 --- a/storage/connect/tabmgo.cpp +++ b/storage/connect/tabmgo.cpp @@ -35,11 +35,13 @@ #define MAXCOL 200 /* Default max column nb in result */ #define TYPE_UNKNOWN 12 /* Must be greater than other types */ +bool IsNum(PSZ s); + /***********************************************************************/ /* MGOColumns: construct the result blocks containing the description */ /* of all the columns of a document contained inside MongoDB. */ /***********************************************************************/ -PQRYRES MGOColumns(PGLOBAL g, char *db, PTOS topt, bool info) +PQRYRES MGOColumns(PGLOBAL g, PCSZ db, PCSZ uri, PTOS topt, bool info) { static int buftyp[] = {TYPE_STRING, TYPE_SHORT, TYPE_STRING, TYPE_INT, TYPE_INT, TYPE_SHORT, TYPE_SHORT, TYPE_STRING}; @@ -64,7 +66,7 @@ PQRYRES MGOColumns(PGLOBAL g, char *db, PTOS topt, bool info) /*********************************************************************/ mgd = new(g) MGODISC(g, (int*)length); - if ((n = mgd->GetColumns(g, db, topt)) < 0) + if ((n = mgd->GetColumns(g, db, uri, topt)) < 0) goto err; skipit: @@ -142,7 +144,7 @@ MGODISC::MGODISC(PGLOBAL g, int *lg) { /***********************************************************************/ /* Class used to get the columns of a mongo collection. */ /***********************************************************************/ -int MGODISC::GetColumns(PGLOBAL g, char *db, PTOS topt) +int MGODISC::GetColumns(PGLOBAL g, PCSZ db, PCSZ uri, PTOS topt) { PCSZ level; bson_iter_t iter; @@ -164,7 +166,7 @@ int MGODISC::GetColumns(PGLOBAL g, char *db, PTOS topt) /* Open the MongoDB collection. */ /*********************************************************************/ tdp = new(g) MGODEF; - tdp->Uri = GetStringTableOption(g, topt, "Connect", "mongodb://localhost:27017"); + tdp->Uri = uri; tdp->Tabname = GetStringTableOption(g, topt, "Name", NULL); tdp->Tabname = GetStringTableOption(g, topt, "Tabname", tdp->Tabname); tdp->Tabschema = GetStringTableOption(g, topt, "Dbname", db); @@ -200,7 +202,7 @@ int MGODISC::GetColumns(PGLOBAL g, char *db, PTOS topt) case RC_FX: return -1; default: - doc = tmgp->Document; + doc = tmgp->Cmgp->Document; } // endswitch ReadDB if (FindInDoc(g, &iter, doc, NULL, NULL, i, k, false)) @@ -368,140 +370,33 @@ bool MGODISC::FindInDoc(PGLOBAL g, bson_iter_t *iter, const bson_t *doc, return false; } // end of FindInDoc -/* -------------------------- Class MGODEF --------------------------- */ - -MGODEF::MGODEF(void) -{ - Uri = NULL; - Colist = NULL; - Filter = NULL; - Level = 0; - Base = 0; - Pipe = false; -} // end of MGODEF constructor - -/***********************************************************************/ -/* DefineAM: define specific AM block values. */ -/***********************************************************************/ -bool MGODEF::DefineAM(PGLOBAL g, LPCSTR, int poff) -{ - if (EXTDEF::DefineAM(g, "MGO", poff)) - return true; - else if (!Tabschema) - Tabschema = GetStringCatInfo(g, "Dbname", "*"); - - Uri = GetStringCatInfo(g, "Connect", "mongodb://localhost:27017"); - Colist = GetStringCatInfo(g, "Colist", NULL); - Filter = GetStringCatInfo(g, "Filter", NULL); - Base = GetIntCatInfo("Base", 0) ? 1 : 0; - Pipe = GetBoolCatInfo("Pipeline", false); - return false; -} // end of DefineAM - -/***********************************************************************/ -/* GetTable: makes a new Table Description Block. */ -/***********************************************************************/ -PTDB MGODEF::GetTable(PGLOBAL g, MODE m) -{ - if (Catfunc == FNC_COL) - return new(g)TDBGOL(this); - - return new(g) TDBMGO(this); -} // end of GetTable - -/* --------------------------- Class INCOL --------------------------- */ - -/***********************************************************************/ -/* Add a column in the column list. */ -/***********************************************************************/ -void INCOL::AddCol(PGLOBAL g, PCOL colp, char *jp) -{ - char *p; - PKC kp, kcp; - - if ((p = strchr(jp, '.'))) { - PINCOL icp; - - *p = 0; - - for (kp = Klist; kp; kp = kp->Next) - if (kp->Incolp && !strcmp(jp, kp->Key)) - break; - - if (!kp) { - icp = new(g) INCOL; - kcp = (PKC)PlugSubAlloc(g, NULL, sizeof(KEYCOL)); - kcp->Next = NULL; - kcp->Incolp = icp; - kcp->Colp = NULL; - kcp->Key = PlugDup(g, jp); - - if (Klist) { - for (kp = Klist; kp->Next; kp = kp->Next); - - kp->Next = kcp; - } else - Klist = kcp; - - } else - icp = kp->Incolp; - - *p = '.'; - icp->AddCol(g, colp, p + 1); - } else { - kcp = (PKC)PlugSubAlloc(g, NULL, sizeof(KEYCOL)); - - kcp->Next = NULL; - kcp->Incolp = NULL; - kcp->Colp = colp; - kcp->Key = jp; - - if (Klist) { - for (kp = Klist; kp->Next; kp = kp->Next); - - kp->Next = kcp; - } else - Klist = kcp; - - } // endif jp - -} // end of AddCol - /* --------------------------- Class TDBMGO -------------------------- */ /***********************************************************************/ /* Implementation of the TDBMGO class. */ /***********************************************************************/ -TDBMGO::TDBMGO(PMGODEF tdp) : TDBEXT(tdp) +TDBMGO::TDBMGO(MGODEF *tdp) : TDBEXT(tdp) { - G = NULL; - Uri = NULL; - Pool = NULL; - Client = NULL; - Database = NULL; - Collection = NULL; - Cursor = NULL; - Query = NULL; - Opts = NULL; - Fpc = NULL; + Cmgp = NULL; Cnd = NULL; + Pcg.Tdbp = this; if (tdp) { - Uristr = tdp->Uri; - Db_name = tdp->Tabschema; - Coll_name = tdp->Tabname; - Options = tdp->Colist; - Filter = tdp->Filter; + Pcg.Uristr = tdp->Uri; + Pcg.Db_name = tdp->Tabschema; + Pcg.Coll_name = tdp->Tabname; + Pcg.Options = tdp->Colist; + Pcg.Filter = tdp->Filter; + Pcg.Pipe = tdp->Pipe && Options != NULL; B = tdp->Base ? 1 : 0; - Pipe = tdp->Pipe && Options != NULL; } else { - Uristr = NULL; - Db_name = NULL; - Coll_name = NULL; - Options = NULL; - Filter = NULL; + Pcg.Uristr = NULL; + Pcg.Db_name = NULL; + Pcg.Coll_name = NULL; + Pcg.Options = NULL; + Pcg.Filter = NULL; + Pcg.Pipe = false; B = 0; - Pipe = false; } // endif tdp Fpos = -1; @@ -511,27 +406,13 @@ TDBMGO::TDBMGO(PMGODEF tdp) : TDBEXT(tdp) TDBMGO::TDBMGO(TDBMGO *tdbp) : TDBEXT(tdbp) { - G = tdbp->G; - Uri = tdbp->Uri; - Pool = tdbp->Pool; - Client = tdbp->Client; - Database = NULL; - Collection = tdbp->Collection; - Cursor = tdbp->Cursor; - Query = tdbp->Query; - Opts = tdbp->Opts; - Fpc = tdbp->Fpc; + Cmgp = tdbp->Cmgp; Cnd = tdbp->Cnd; - Uristr = tdbp->Uristr; - Db_name = tdbp->Db_name;; - Coll_name = tdbp->Coll_name; - Options = tdbp->Options; - Filter = tdbp->Filter; + Pcg = tdbp->Pcg; B = tdbp->B; Fpos = tdbp->Fpos; N = tdbp->N; Done = tdbp->Done; - Pipe = tdbp->Pipe; } // end of TDBMGO copy constructor // Used for update @@ -559,9 +440,7 @@ PCOL TDBMGO::MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n) { PMGOCOL colp = new(g) MGOCOL(g, cdp, this, cprec, n); - colp->Mbuf = (char*)PlugSubAlloc(g, NULL, colp->Long + 1); return colp; - //return (colp->ParseJpath(g)) ? NULL : colp; } // end of MakeCol /***********************************************************************/ @@ -578,60 +457,6 @@ PCOL TDBMGO::InsertSpecialColumn(PCOL colp) } // end of InsertSpecialColumn /***********************************************************************/ -/* MONGO Cardinality: returns table size in number of rows. */ -/***********************************************************************/ -int TDBMGO::Cardinality(PGLOBAL g) -{ - if (!g) - return 1; - else if (Cardinal < 0) - if (!Init(g)) { - bson_t *query; - const char *jf = NULL; - - if (Pipe) - return 10; - else if (Filter) - jf = Filter; - - if (jf) { - query = bson_new_from_json((const uint8_t *)jf, -1, &Error); - - if (!query) { - htrc("Wrong filter: %s", Error.message); - return 10; - } // endif Query - - } else - query = bson_new(); - - Cardinal = (int)mongoc_collection_count(Collection, - MONGOC_QUERY_NONE, query, 0, 0, NULL, &Error); - - if (Cardinal < 0) { - htrc("Collection count: %s", Error.message); - Cardinal = 10; - } // endif Cardinal - - bson_destroy(query); - } else - return 10; - - return Cardinal; -} // end of Cardinality - -/***********************************************************************/ -/* MONGO GetMaxSize: returns collection size estimate. */ -/***********************************************************************/ -int TDBMGO::GetMaxSize(PGLOBAL g) -{ - if (MaxSize < 0) - MaxSize = Cardinality(g); - - return MaxSize; -} // end of GetMaxSize - -/***********************************************************************/ /* Init: initialize MongoDB processing. */ /***********************************************************************/ bool TDBMGO::Init(PGLOBAL g) @@ -639,257 +464,44 @@ bool TDBMGO::Init(PGLOBAL g) if (Done) return false; - G = g; - - Uri = mongoc_uri_new(Uristr); - - if (!Uri) { - sprintf(g->Message, "Failed to parse URI: \"%s\"", Uristr); - return true; - } // endif Uri - - // Create a new client pool instance - Pool = mongoc_client_pool_new(Uri); - mongoc_client_pool_set_error_api(Pool, 2); - - // Register the application name so we can track it in the profile logs - // on the server. This can also be done from the URI. - mongoc_client_pool_set_appname(Pool, "Connect"); - - // Create a new client instance - Client = mongoc_client_pool_pop(Pool); - //Client = mongoc_client_new(uristr); - - if (!Client) { - sprintf(g->Message, "Failed to get Client"); - return true; - } // endif Client - - //mongoc_client_set_error_api(Client, 2); - - // Register the application name so we can track it in the profile logs - // on the server. This can also be done from the URI. - //mongoc_client_set_appname(Client, "Connect"); - - // Get a handle on the database Db_name and collection Coll_name - // Database = mongoc_client_get_database(Client, Db_name); - // Collection = mongoc_database_get_collection(Database, Coll_name); - Collection = mongoc_client_get_collection(Client, Db_name, Coll_name); + /*********************************************************************/ + /* Open an C connection for this table. */ + /*********************************************************************/ + if (!Cmgp) + Cmgp = new(g) CMgoConn(g, &Pcg); + else if (Cmgp->IsConnected()) + Cmgp->Close(); - if (!Collection) { - sprintf(g->Message, "Failed to get Collection %s.%s", Db_name, Coll_name); + if (Cmgp->Connect(g)) return true; - } // endif Collection Done = true; return false; } // end of Init /***********************************************************************/ -/* On update the filter can be made by Cond_Push after MakeCursor. */ +/* MONGO Cardinality: returns table size in number of rows. */ /***********************************************************************/ -void TDBMGO::SetFilter(PFIL fp) +int TDBMGO::Cardinality(PGLOBAL g) { - To_Filter = fp; - - if (fp && Cursor && Cnd != Cond) { - mongoc_cursor_t *cursor = MakeCursor(G); - - if (cursor) { - mongoc_cursor_destroy(Cursor); - Cursor = cursor; - } else - htrc("SetFilter: %s\n", G->Message); - - } // endif Cursor + if (!g) + return 1; + else if (Cardinal < 0) + Cardinal = (!Init(g)) ? Cmgp->CollSize(g) : 0; -} // end of SetFilter + return Cardinal; +} // end of Cardinality /***********************************************************************/ -/* OpenDB: Data Base open routine for MONGO access method. */ +/* MONGO GetMaxSize: returns collection size estimate. */ /***********************************************************************/ -mongoc_cursor_t *TDBMGO::MakeCursor(PGLOBAL g) +int TDBMGO::GetMaxSize(PGLOBAL g) { - const char *p; - bool b = false, id = (Mode != MODE_READ), all = false; - mongoc_cursor_t *cursor; - PCOL cp; - PSTRG s = NULL; - - if (Options && !stricmp(Options, "all")) { - Options = NULL; - all = true; - } // endif Options - - for (cp = Columns; cp; cp = cp->GetNext()) - if (!strcmp(cp->GetName(), "_id")) - id = true; - else if (cp->GetFmt() && !strcmp(cp->GetFmt(), "*") && !Options) - all = true; - - if (Pipe) { - if (trace) - htrc("Pipeline: %s\n", Options); - - p = strrchr(Options, ']'); - - if (!p) { - strcpy(g->Message, "Missing ] in pipeline"); - return NULL; - } else - *(char*)p = 0; - - s = new(g) STRING(g, 1023, (PSZ)Options); - - if (To_Filter) { - s->Append(",{\"$match\":"); - - if (To_Filter->MakeSelector(g, s, true)) { - strcpy(g->Message, "Failed making selector"); - return NULL; - } else - s->Append('}'); - - To_Filter = NULL; // Not needed anymore - } // endif To_Filter - - if (!all) { - // Project list - if (Columns) { - s->Append(",{\"$project\":{\""); - - if (!id) - s->Append("_id\":0,\""); - - for (cp = Columns; cp; cp = cp->GetNext()) { - if (b) - s->Append(",\""); - else - b = true; - - s->Append(((PMGOCOL)cp)->GetProjPath(g)); - s->Append("\":1"); - } // endfor cp - - s->Append("}}"); - } else - s->Append(",{\"$project\":{\"_id\":1}}"); - - } // endif all - - s->Append("]}"); - s->Resize(s->GetLength() + 1); - p = s->GetStr(); - - if (trace) - htrc("New Pipeline: %s\n", p); - - Query = bson_new_from_json((const uint8_t *)p, -1, &Error); - - if (!Query) { - sprintf(g->Message, "Wrong pipeline: %s", Error.message); - return NULL; - } // endif Query - - cursor = mongoc_collection_aggregate(Collection, MONGOC_QUERY_NONE, - Query, NULL, NULL); - - if (mongoc_cursor_error(cursor, &Error)) { - sprintf(g->Message, "Mongo aggregate Failure: %s", Error.message); - return NULL; - } // endif cursor - - } else { - if (Filter || To_Filter) { - if (trace) { - if (Filter) - htrc("Filter: %s\n", Filter); - - if (To_Filter) { - char buf[512]; - - To_Filter->Prints(g, buf, 511); - htrc("To_Filter: %s\n", buf); - } // endif To_Filter - - } // endif trace - - s = new(g) STRING(g, 1023, (PSZ)Filter); - - if (To_Filter) { - if (Filter) - s->Append(','); - - if (To_Filter->MakeSelector(g, s, true)) { - strcpy(g->Message, "Failed making selector"); - return NULL; - } // endif Selector - - To_Filter = NULL; // Not needed anymore - } // endif To_Filter - - if (trace) - htrc("selector: %s\n", s->GetStr()); - - s->Resize(s->GetLength() + 1); - Query = bson_new_from_json((const uint8_t *)s->GetStr(), -1, &Error); - - if (!Query) { - sprintf(g->Message, "Wrong filter: %s", Error.message); - return NULL; - } // endif Query - - } else - Query = bson_new(); - - if (!all) { - if (Options && *Options) { - if (trace) - htrc("options=%s\n", Options); - - p = Options; - } else if (Columns) { - // Projection list - if (s) - s->Set("{\"projection\":{\""); - else - s = new(g) STRING(g, 511, "{\"projection\":{\""); - - if (!id) - s->Append("_id\":0,\""); - - for (cp = Columns; cp; cp = cp->GetNext()) { - if (b) - s->Append(",\""); - else - b = true; - - s->Append(((PMGOCOL)cp)->GetProjPath(g)); - s->Append("\":1"); - } // endfor cp - - s->Append("}}"); - s->Resize(s->GetLength() + 1); - p = s->GetStr(); - } else { - // count(*) ? - p = "{\"projection\":{\"_id\":1}}"; - } // endif Options - - Opts = bson_new_from_json((const uint8_t *)p, -1, &Error); - - if (!Opts) { - sprintf(g->Message, "Wrong options: %s", Error.message); - return NULL; - } // endif Opts - - } // endif all - - cursor = mongoc_collection_find_with_opts(Collection, Query, Opts, NULL); - } // endif Pipe + if (MaxSize < 0) + MaxSize = Cardinality(g); - return cursor; -} // end of MakeCursor + return MaxSize; +} // end of GetMaxSize /***********************************************************************/ /* OpenDB: Data Base open routine for MONGO access method. */ @@ -900,10 +512,7 @@ bool TDBMGO::OpenDB(PGLOBAL g) /*******************************************************************/ /* Table already open replace it at its beginning. */ /*******************************************************************/ - mongoc_cursor_t *cursor = mongoc_cursor_clone(Cursor); - - mongoc_cursor_destroy(Cursor); - Cursor = cursor; + Cmgp->Rewind(); Fpos = -1; return false; } // endif Use @@ -911,7 +520,7 @@ bool TDBMGO::OpenDB(PGLOBAL g) /*********************************************************************/ /* First opening. */ /*********************************************************************/ - if (Pipe && Mode != MODE_READ) { + if (Pcg.Pipe && Mode != MODE_READ) { strcpy(g->Message, "Pipeline tables are read only"); return true; } // endif Pipe @@ -919,20 +528,11 @@ bool TDBMGO::OpenDB(PGLOBAL g) if (Init(g)) return true; - if (Mode == MODE_DELETE && !Next) { + if (Mode == MODE_DELETE && !Next) // Delete all documents - Query = bson_new(); - - if (!mongoc_collection_remove(Collection, MONGOC_REMOVE_NONE, - Query, NULL, &Error)) { - sprintf(g->Message, "Mongo remove all: %s", Error.message); - return true; - } // endif remove - - } else if (Mode == MODE_INSERT) - MakeColumnGroups(g); - else if (!(Cursor = MakeCursor(g))) - return true; + return Cmgp->DocDelete(g); + else if (Mode == MODE_INSERT) + Cmgp->MakeColumnGroups(g); return false; } // end of OpenDB @@ -951,211 +551,19 @@ bool TDBMGO::ReadKey(PGLOBAL g, OPVAL op, const key_range *kr) /***********************************************************************/ int TDBMGO::ReadDB(PGLOBAL g) { - int rc = RC_OK; - - if (mongoc_cursor_next(Cursor, &Document)) { - - if (trace > 1) { - bson_iter_t iter; - ShowDocument(&iter, Document, ""); - } else if (trace == 1) { - char *str = bson_as_json(Document, NULL); - htrc("%s\n", str); - bson_free(str); - } // endif trace - - } else if (mongoc_cursor_error(Cursor, &Error)) { - sprintf(g->Message, "Mongo Cursor Failure: %s", Error.message); - rc = RC_FX; - } else { - //mongoc_cursor_destroy(Cursor); - rc = RC_EF; - } // endif's Cursor - - return rc; + return Cmgp->ReadNext(g); } // end of ReadDB /***********************************************************************/ -/* Use to trace restaurants document contains. */ -/***********************************************************************/ -void TDBMGO::ShowDocument(bson_iter_t *iter, const bson_t *doc, const char *k) -{ - if (!doc || bson_iter_init(iter, doc)) { - const char *key; - - while (bson_iter_next(iter)) { - key = bson_iter_key(iter); - htrc("Found element key: \"%s\"\n", key); - - if (BSON_ITER_HOLDS_UTF8(iter)) - htrc("%s.%s=\"%s\"\n", k, key, bson_iter_utf8(iter, NULL)); - else if (BSON_ITER_HOLDS_INT32(iter)) - htrc("%s.%s=%d\n", k, key, bson_iter_int32(iter)); - else if (BSON_ITER_HOLDS_INT64(iter)) - htrc("%s.%s=%lld\n", k, key, bson_iter_int64(iter)); - else if (BSON_ITER_HOLDS_DOUBLE(iter)) - htrc("%s.%s=%g\n", k, key, bson_iter_double(iter)); - else if (BSON_ITER_HOLDS_DATE_TIME(iter)) - htrc("%s.%s=date(%lld)\n", k, key, bson_iter_date_time(iter)); - else if (BSON_ITER_HOLDS_OID(iter)) { - char str[25]; - - bson_oid_to_string(bson_iter_oid(iter), str); - htrc("%s.%s=%s\n", k, key, str); - } else if (BSON_ITER_HOLDS_DECIMAL128(iter)) { - char *str = NULL; - bson_decimal128_t dec; - - bson_iter_decimal128(iter, &dec); - bson_decimal128_to_string(&dec, str); - htrc("%s.%s=%s\n", k, key, str); - } else if (BSON_ITER_HOLDS_DOCUMENT(iter)) { - bson_iter_t child; - - if (bson_iter_recurse(iter, &child)) - ShowDocument(&child, NULL, key); - - } else if (BSON_ITER_HOLDS_ARRAY(iter)) { - bson_t *arr; - bson_iter_t itar; - const uint8_t *data = NULL; - uint32_t len = 0; - - bson_iter_array(iter, &len, &data); - arr = bson_new_from_data(data, len); - ShowDocument(&itar, arr, key); - } // endif's - - } // endwhile bson_iter_next - - } // endif bson_iter_init - -} // end of ShowDocument - -/***********************************************************************/ -/* Group columns for inserting or updating. */ -/***********************************************************************/ -void TDBMGO::MakeColumnGroups(PGLOBAL g) -{ - Fpc = new(g) INCOL; - - for (PCOL colp = Columns; colp; colp = colp->GetNext()) - if (!colp->IsSpecial()) - Fpc->AddCol(g, colp, ((PMGOCOL)colp)->Jpath); - -} // end of MakeColumnGroups - -/***********************************************************************/ -/* DocWrite. */ -/***********************************************************************/ -bool TDBMGO::DocWrite(PGLOBAL g, PINCOL icp) -{ - for (PKC kp = icp->Klist; kp; kp = kp->Next) - if (kp->Incolp) { - BSON_APPEND_DOCUMENT_BEGIN(&icp->Child, kp->Key, &kp->Incolp->Child); - - if (DocWrite(g, kp->Incolp)) - return true; - - bson_append_document_end(&icp->Child, &kp->Incolp->Child); - } else if (((PMGOCOL)kp->Colp)->AddValue(g, &icp->Child, kp->Key, false)) - return true; - - return false; -} // end of DocWrite - -/***********************************************************************/ -/* WriteDB: Data Base write routine for DOS access method. */ +/* WriteDB: Data Base write routine for MGO access method. */ /***********************************************************************/ int TDBMGO::WriteDB(PGLOBAL g) { - int rc = RC_OK; - - if (Mode == MODE_INSERT) { - bson_init(&Fpc->Child); - - if (DocWrite(g, Fpc)) - return RC_FX; - - if (trace) { - char *str = bson_as_json(&Fpc->Child, NULL); - htrc("Inserting: %s\n", str); - bson_free(str); - } // endif trace - - if (!mongoc_collection_insert(Collection, MONGOC_INSERT_NONE, - &Fpc->Child, NULL, &Error)) { - sprintf(g->Message, "Mongo insert: %s", Error.message); - rc = RC_FX; - } // endif insert - - } else { - bool b = false; - bson_iter_t iter; - bson_t *query = bson_new(); - - bson_iter_init(&iter, Document); - - if (bson_iter_find(&iter, "_id")) { - if (BSON_ITER_HOLDS_OID(&iter)) - b = BSON_APPEND_OID(query, "_id", bson_iter_oid(&iter)); - else if (BSON_ITER_HOLDS_INT32(&iter)) - b = BSON_APPEND_INT32(query, "_id", bson_iter_int32(&iter)); - else if (BSON_ITER_HOLDS_INT64(&iter)) - b = BSON_APPEND_INT64(query, "_id", bson_iter_int64(&iter)); - else if (BSON_ITER_HOLDS_DOUBLE(&iter)) - b = BSON_APPEND_DOUBLE(query, "_id", bson_iter_double(&iter)); - else if (BSON_ITER_HOLDS_UTF8(&iter)) - b = BSON_APPEND_UTF8(query, "_id", bson_iter_utf8(&iter, NULL)); - - } // endif iter - - if (b) { - if (trace) { - char *str = bson_as_json(query, NULL); - htrc("update query: %s\n", str); - bson_free(str); - } // endif trace - - if (Mode == MODE_UPDATE) { - bson_t child; - bson_t *update = bson_new(); - - BSON_APPEND_DOCUMENT_BEGIN(update, "$set", &child); - - for (PCOL colp = To_SetCols; colp; colp = colp->GetNext()) - if (((PMGOCOL)colp)->AddValue(g, &child, ((PMGOCOL)colp)->Jpath, true)) - rc = RC_FX; - - bson_append_document_end(update, &child); - - if (rc == RC_OK) - if (!mongoc_collection_update(Collection, MONGOC_UPDATE_NONE, - query, update, NULL, &Error)) { - sprintf(g->Message, "Mongo update: %s", Error.message); - rc = RC_FX; - } // endif update - - bson_destroy(update); - } else if (!mongoc_collection_remove(Collection, - MONGOC_REMOVE_SINGLE_REMOVE, query, NULL, &Error)) { - sprintf(g->Message, "Mongo delete: %s", Error.message); - rc = RC_FX; - } // endif remove - - } else { - strcpy(g->Message, "Mongo update: cannot find _id"); - rc = RC_FX; - } // endif b - - bson_destroy(query); - } // endif Mode - - return rc; + return Cmgp->Write(g); } // end of WriteDB /***********************************************************************/ -/* Data Base delete line routine for ODBC access method. */ +/* Data Base delete line routine for MGO access method. */ /***********************************************************************/ int TDBMGO::DeleteDB(PGLOBAL g, int irc) { @@ -1167,15 +575,7 @@ int TDBMGO::DeleteDB(PGLOBAL g, int irc) /***********************************************************************/ void TDBMGO::CloseDB(PGLOBAL g) { - if (Query) bson_destroy(Query); - if (Opts) bson_destroy(Opts); - if (Cursor) mongoc_cursor_destroy(Cursor); - if (Collection) mongoc_collection_destroy(Collection); - // mongoc_database_destroy(Database); - // mongoc_client_destroy(Client); - if (Client) mongoc_client_pool_push(Pool, Client); - if (Pool) mongoc_client_pool_destroy(Pool); - if (Uri) mongoc_uri_destroy(Uri); + Cmgp->Close(); Done = false; } // end of CloseDB @@ -1189,7 +589,6 @@ MGOCOL::MGOCOL(PGLOBAL g, PCOLDEF cdp, PTDB tdbp, PCOL cprec, int i) { Tmgp = (PTDBMGO)(tdbp->GetOrig() ? tdbp->GetOrig() : tdbp); Jpath = cdp->GetFmt() ? cdp->GetFmt() : cdp->GetName(); - Mbuf = NULL; } // end of MGOCOL constructor /***********************************************************************/ @@ -1200,173 +599,49 @@ MGOCOL::MGOCOL(MGOCOL *col1, PTDB tdbp) : EXTCOL(col1, tdbp) { Tmgp = col1->Tmgp; Jpath = col1->Jpath; - Mbuf = col1->Mbuf; } // end of MGOCOL copy constructor /***********************************************************************/ -/* Get projection path. */ +/* Get path when proj is false or projection path when proj is true. */ /***********************************************************************/ -char *MGOCOL::GetProjPath(PGLOBAL g) +PSZ MGOCOL::GetJpath(PGLOBAL g, bool proj) { if (Jpath) { - char *p1, *p2, *projpath = PlugDup(g, Jpath); - int i = 0; - - for (p1 = p2 = projpath; *p1; p1++) - if (*p1 == '.') { - if (!i) - *p2++ = *p1; - - i = 1; - } else if (i) { - if (!isdigit(*p1)) { + if (proj) { + char *p1, *p2, *projpath = PlugDup(g, Jpath); + int i = 0; + + for (p1 = p2 = projpath; *p1; p1++) + if (*p1 == '.') { + if (!i) + *p2++ = *p1; + + i = 1; + } else if (i) { + if (!isdigit(*p1)) { + *p2++ = *p1; + i = 0; + } // endif p1 + + } else *p2++ = *p1; - i = 0; - } // endif p1 - } else - *p2++ = *p1; + *p2 = 0; + return projpath; + } else + return Jpath; - *p2 = 0; - return projpath; } else - return NULL; + return Name; -} // end of GetProjPath - -/***********************************************************************/ -/* Mini: used to suppress blanks to json strings. */ -/***********************************************************************/ -char *MGOCOL::Mini(PGLOBAL g, const bson_t *bson, bool b) -{ - char *s, *str = NULL; - int i, k = 0; - bool ok = true; - - if (b) - s = str = bson_array_as_json(bson, NULL); - else - s = str = bson_as_json(bson, NULL); - - for (i = 0; i < Long && s[i]; i++) { - switch (s[i]) { - case ' ': - if (ok) continue; - case '"': - ok = !ok; - default: - break; - } // endswitch s[i] - - Mbuf[k++] = s[i]; - } // endfor i - - bson_free(str); - - if (i >= Long) { - sprintf(g->Message, "Value too long for column %s", Name); - throw (int)TYPE_AM_MGO; - } // endif i - - Mbuf[k] = 0; - return Mbuf; -} // end of Mini +} // end of GetJpath /***********************************************************************/ /* ReadColumn: */ /***********************************************************************/ void MGOCOL::ReadColumn(PGLOBAL g) { - - if (!strcmp(Jpath, "*")) { - Value->SetValue_psz(Mini(g, Tmgp->Document, false)); - } else if (bson_iter_init(&Iter, Tmgp->Document) && - bson_iter_find_descendant(&Iter, Jpath, &Desc)) { - if (BSON_ITER_HOLDS_UTF8(&Desc)) - Value->SetValue_psz((PSZ)bson_iter_utf8(&Desc, NULL)); - else if (BSON_ITER_HOLDS_INT32(&Desc)) - Value->SetValue(bson_iter_int32(&Desc)); - else if (BSON_ITER_HOLDS_INT64(&Desc)) - Value->SetValue(bson_iter_int64(&Desc)); - else if (BSON_ITER_HOLDS_DOUBLE(&Desc)) - Value->SetValue(bson_iter_double(&Desc)); - else if (BSON_ITER_HOLDS_DATE_TIME(&Desc)) - Value->SetValue(bson_iter_date_time(&Desc) / 1000); - else if (BSON_ITER_HOLDS_BOOL(&Desc)) { - bool b = bson_iter_bool(&Desc); - - if (Value->IsTypeNum()) - Value->SetValue(b ? 1 : 0); - else - Value->SetValue_psz(b ? "true" : "false"); - - } else if (BSON_ITER_HOLDS_OID(&Desc)) { - char str[25]; - - bson_oid_to_string(bson_iter_oid(&Desc), str); - Value->SetValue_psz(str); - } else if (BSON_ITER_HOLDS_DECIMAL128(&Desc)) { - char *str = NULL; - bson_decimal128_t dec; - - bson_iter_decimal128(&Desc, &dec); - bson_decimal128_to_string(&dec, str); - Value->SetValue_psz(str); - bson_free(str); - } else if (BSON_ITER_HOLDS_DOCUMENT(&Iter)) { - bson_t *doc; - const uint8_t *data = NULL; - uint32_t len = 0; - - bson_iter_document(&Desc, &len, &data); - - if (data) { - doc = bson_new_from_data(data, len); - Value->SetValue_psz(Mini(g, doc, false)); - bson_destroy(doc); - } else - Value->Reset(); - - } else if (BSON_ITER_HOLDS_ARRAY(&Iter)) { - bson_t *arr; - const uint8_t *data = NULL; - uint32_t len = 0; - - bson_iter_array(&Desc, &len, &data); - - if (data) { - arr = bson_new_from_data(data, len); - Value->SetValue_psz(Mini(g, arr, true)); - bson_destroy(arr); - } else { - // This is a bug in returning the wrong type - // This fix is only for document items - bson_t *doc; - - bson_iter_document(&Desc, &len, &data); - - if (data) { - doc = bson_new_from_data(data, len); - Value->SetValue_psz(Mini(g, doc, false)); - bson_destroy(doc); - } else { - //strcpy(g->Message, "bson_iter_array failed (data is null)"); - //throw (int)TYPE_AM_MGO; - Value->Reset(); - } // endif data - - } // endif data - - } else - Value->Reset(); - - } else - Value->Reset(); - - // Set null when applicable - if (Nullable) - Value->SetNull(Value->IsZero()); - + Tmgp->Cmgp->GetColumnValue(g, this); } // end of ReadColumn /***********************************************************************/ @@ -1380,59 +655,6 @@ void MGOCOL::WriteColumn(PGLOBAL g) } // end of WriteColumn -/***********************************************************************/ -/* AddValue: Add column value to the document to insert or update. */ -/***********************************************************************/ -bool MGOCOL::AddValue(PGLOBAL g, bson_t *doc, char *key, bool upd) -{ - bool rc = false; - - if (Value->IsNull()) { - if (upd) - rc = BSON_APPEND_NULL(doc, key); - else - return false; - - } else switch (Buf_Type) { - case TYPE_STRING: - rc = BSON_APPEND_UTF8(doc, key, Value->GetCharValue()); - break; - case TYPE_INT: - case TYPE_SHORT: - rc = BSON_APPEND_INT32(doc, key, Value->GetIntValue()); - break; - case TYPE_TINY: - rc = BSON_APPEND_BOOL(doc, key, Value->GetIntValue()); - break; - case TYPE_BIGINT: - rc = BSON_APPEND_INT64(doc, key, Value->GetBigintValue()); - break; - case TYPE_DOUBLE: - rc = BSON_APPEND_DOUBLE(doc, key, Value->GetFloatValue()); - break; - case TYPE_DECIM: - {bson_decimal128_t dec; - - if (bson_decimal128_from_string(Value->GetCharValue(), &dec)) - rc = BSON_APPEND_DECIMAL128(doc, key, &dec); - - } break; - case TYPE_DATE: - rc = BSON_APPEND_DATE_TIME(doc, key, Value->GetBigintValue() * 1000); - break; - default: - sprintf(g->Message, "Type %d not supported yet", Buf_Type); - return true; - } // endswitch Buf_Type - - if (!rc) { - strcpy(g->Message, "Adding value failed"); - return true; - } else - return false; - -} // end of AddValue - /* ---------------------------TDBGOL class --------------------------- */ /***********************************************************************/ @@ -1441,7 +663,8 @@ bool MGOCOL::AddValue(PGLOBAL g, bson_t *doc, char *key, bool upd) TDBGOL::TDBGOL(PMGODEF tdp) : TDBCAT(tdp) { Topt = tdp->GetTopt(); - Db = (char*)tdp->GetTabschema(); + Uri = tdp->Uri; + Db = tdp->GetTabschema(); } // end of TDBJCL constructor /***********************************************************************/ @@ -1449,7 +672,7 @@ TDBGOL::TDBGOL(PMGODEF tdp) : TDBCAT(tdp) /***********************************************************************/ PQRYRES TDBGOL::GetResult(PGLOBAL g) { - return MGOColumns(g, Db, Topt, false); + return MGOColumns(g, Db, Uri, Topt, false); } // end of GetResult /* -------------------------- End of mongo --------------------------- */ diff --git a/storage/connect/tabmgo.h b/storage/connect/tabmgo.h index 66676794e47..9a4e537eadf 100644 --- a/storage/connect/tabmgo.h +++ b/storage/connect/tabmgo.h @@ -1,43 +1,12 @@ /**************** tabmgo H Declares Source Code File (.H) **************/ -/* Name: tabmgo.h Version 1.0 */ +/* Name: tabmgo.h Version 1.1 */ /* */ /* (C) Copyright to the author Olivier BERTRAND 2017 */ /* */ /* This file contains the MongoDB classes declares. */ /***********************************************************************/ -#include "osutil.h" -#include "block.h" -#include "colblk.h" - -/***********************************************************************/ -/* Include MongoDB library header files. */ -/***********************************************************************/ -#include <bson.h> -#include <bcon.h> -#include <mongoc.h> - -typedef class MGODEF *PMGODEF; -typedef class TDBMGO *PTDBMGO; -typedef class MGOCOL *PMGOCOL; -typedef class INCOL *PINCOL; - -typedef struct _bncol { - struct _bncol *Next; - char *Name; - char *Fmt; - int Type; - int Len; - int Scale; - bool Cbn; - bool Found; -} BCOL, *PBCOL; - -typedef struct KEYCOL { - KEYCOL *Next; - PINCOL Incolp; - PCOL Colp; - char *Key; -} *PKC; +#include "mongo.h" +#include "cmgoconn.h" /***********************************************************************/ /* Class used to get the columns of a mongo collection. */ @@ -48,7 +17,7 @@ public: MGODISC(PGLOBAL g, int *lg); // Functions - int GetColumns(PGLOBAL g, char *db, PTOS topt); + int GetColumns(PGLOBAL g, PCSZ db, PCSZ uri, PTOS topt); bool FindInDoc(PGLOBAL g, bson_iter_t *iter, const bson_t *doc, char *pcn, char *pfmt, int i, int k, bool b); @@ -62,52 +31,7 @@ public: bool all; }; // end of MGODISC -/***********************************************************************/ -/* MongoDB table. */ -/***********************************************************************/ -class DllExport MGODEF : public EXTDEF { /* Table description */ - friend class TDBMGO; - friend class MGOFAM; - friend class MGODISC; - friend PQRYRES MGOColumns(PGLOBAL, char *, PTOS, bool); -public: - // Constructor - MGODEF(void); - - // Implementation - virtual const char *GetType(void) { return "MONGO"; } - - // Methods - virtual bool DefineAM(PGLOBAL g, LPCSTR am, int poff); - virtual PTDB GetTable(PGLOBAL g, MODE m); - -protected: - // Members - PCSZ Uri; /* MongoDB connection URI */ - PCSZ Colist; /* Options list */ - PCSZ Filter; /* Filtering query */ - int Level; /* Used for catalog table */ - int Base; /* The array index base */ - bool Pipe; /* True is Colist is a pipeline */ -}; // end of MGODEF - -/* ------------------------- TDBMGO classes -------------------------- */ - -/***********************************************************************/ -/* Used when inserting values in a MongoDB collection. */ -/***********************************************************************/ -class INCOL : public BLOCK { -public: - // Constructor - INCOL(void) { Klist = NULL; } - - // Methods - void AddCol(PGLOBAL g, PCOL colp, char *jp); - - //Members - bson_t Child; - PKC Klist; -}; // end of INCOL; +/* -------------------------- TDBMGO class --------------------------- */ /***********************************************************************/ /* This is the MongoDB Table Type class declaration. */ @@ -117,10 +41,10 @@ class DllExport TDBMGO : public TDBEXT { friend class MGOCOL; friend class MGODEF; friend class MGODISC; - friend PQRYRES MGOColumns(PGLOBAL, char *, PTOS, bool); + friend PQRYRES MGOColumns(PGLOBAL, PCSZ, PCSZ, PTOS, bool); public: // Constructor - TDBMGO(PMGODEF tdp); + TDBMGO(MGODEF *tdp); TDBMGO(TDBMGO *tdbp); // Implementation @@ -131,7 +55,6 @@ public: virtual PTDB Clone(PTABS t); virtual PCOL MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n); virtual PCOL InsertSpecialColumn(PCOL colp); - virtual void SetFilter(PFIL fp); virtual int RowNumber(PGLOBAL g, bool b = FALSE) {return N;} // Database routines @@ -146,35 +69,15 @@ public: protected: bool Init(PGLOBAL g); - mongoc_cursor_t *MakeCursor(PGLOBAL g); - void ShowDocument(bson_iter_t *i, const bson_t *b, const char *k); - void MakeColumnGroups(PGLOBAL g); - bool DocWrite(PGLOBAL g, PINCOL icp); // Members - PGLOBAL G; // Needed by SetFilter - mongoc_uri_t *Uri; - mongoc_client_pool_t *Pool; // Thread safe client pool - mongoc_client_t *Client; // The MongoDB client - mongoc_database_t *Database; // The MongoDB database - mongoc_collection_t *Collection; // The MongoDB collection - mongoc_cursor_t *Cursor; - const bson_t *Document; - bson_t *Query; // MongoDB cursor filter - bson_t *Opts; // MongoDB cursor options - bson_error_t Error; - PINCOL Fpc; // To insert INCOL classes + CMgoConn *Cmgp; // Points to a C Mongo connection class + CMGOPARM Pcg; // Parms passed to Cmgp const Item *Cnd; // The first condition - const char *Uristr; - const char *Db_name; - const char *Coll_name; - const char *Options; // The MongoDB options - const char *Filter; // The filtering query int Fpos; // The current row index int N; // The current Rownum int B; // Array index base bool Done; // Init done - bool Pipe; // True for pipeline }; // end of class TDBMGO /* --------------------------- MGOCOL class -------------------------- */ @@ -191,26 +94,20 @@ public: MGOCOL(MGOCOL *colp, PTDB tdbp); // Constructor used in copy process // Implementation - virtual int GetAmType(void) { return Tmgp->GetAmType(); } + virtual int GetAmType(void) { return Tmgp->GetAmType(); } // Methods -//virtual bool SetBuffer(PGLOBAL g, PVAL value, bool ok, bool check); - virtual void ReadColumn(PGLOBAL g); - virtual void WriteColumn(PGLOBAL g); - bool AddValue(PGLOBAL g, bson_t *doc, char *key, bool upd); + virtual PSZ GetJpath(PGLOBAL g, bool proj); + virtual void ReadColumn(PGLOBAL g); + virtual void WriteColumn(PGLOBAL g); protected: // Default constructor not to be used MGOCOL(void) {} - char *GetProjPath(PGLOBAL g); - char *Mini(PGLOBAL g, const bson_t *bson, bool b); // Members TDBMGO *Tmgp; // To the MGO table block - bson_iter_t Iter; // Used to retrieve column value - bson_iter_t Desc; // Descendant iter char *Jpath; // The json path - char *Mbuf; // The Mini buffer }; // end of class MGOCOL /***********************************************************************/ @@ -226,6 +123,7 @@ protected: virtual PQRYRES GetResult(PGLOBAL g); // Members - PTOS Topt; - char *Db; + PTOS Topt; + PCSZ Uri; + PCSZ Db; }; // end of class TDBGOL diff --git a/storage/connect/user_connect.cc b/storage/connect/user_connect.cc index ca3557666a4..32119c34900 100644 --- a/storage/connect/user_connect.cc +++ b/storage/connect/user_connect.cc @@ -1,4 +1,4 @@ -/* Copyright (C) Olivier Bertrand 2004 - 2015 +/* Copyright (C) MariaDB Corporation Ab This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -24,7 +24,7 @@ that is a connection with its personnal memory allocation. @note - + Author Olivier Bertrand */ /****************************************************************************/ @@ -144,9 +144,9 @@ void user_connect::SetHandler(ha_connect *hc) /****************************************************************************/ /* Check whether we begin a new query and if so cleanup the previous one. */ /****************************************************************************/ -bool user_connect::CheckCleanup(void) +bool user_connect::CheckCleanup(bool force) { - if (thdp->query_id > last_query_id) { + if (thdp->query_id > last_query_id || force) { uint worksize= GetWorkSize(); PlugCleanup(g, true); @@ -171,7 +171,7 @@ bool user_connect::CheckCleanup(void) g->Mrr = 0; last_query_id= thdp->query_id; - if (trace) + if (trace && !force) printf("=====> Begin new query %llu\n", last_query_id); return true; diff --git a/storage/connect/user_connect.h b/storage/connect/user_connect.h index a883eb85934..983d9adc478 100644 --- a/storage/connect/user_connect.h +++ b/storage/connect/user_connect.h @@ -1,4 +1,4 @@ -/* Copyright (C) Olivier Bertrand 2004 - 2011 +/* Copyright (C) MariaDB Corporation Ab This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -19,6 +19,7 @@ Declaration of the user_connect class. @note + Author Olivier Bertrand @see /sql/handler.h and /storage/connect/user_connect.cc @@ -53,7 +54,7 @@ public: // Implementation bool user_init(); void SetHandler(ha_connect *hc); - bool CheckCleanup(void); + bool CheckCleanup(bool force = false); bool CheckQueryID(void) {return thdp->query_id > last_query_id;} bool CheckQuery(query_id_t vid) {return last_query_id > vid;} |