diff options
Diffstat (limited to 'util/db_load.c')
-rw-r--r-- | util/db_load.c | 207 |
1 files changed, 183 insertions, 24 deletions
diff --git a/util/db_load.c b/util/db_load.c index af4c7800..aac774fc 100644 --- a/util/db_load.c +++ b/util/db_load.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 1996, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 1996, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -14,7 +14,7 @@ #ifndef lint static const char copyright[] = - "Copyright (c) 1996, 2012 Oracle and/or its affiliates. All rights reserved.\n"; + "Copyright (c) 1996, 2015 Oracle and/or its affiliates. All rights reserved.\n"; #endif typedef struct { /* XXX: Globals. */ @@ -25,10 +25,12 @@ typedef struct { /* XXX: Globals. */ int endodata; /* Reached the end of a database. */ int endofile; /* Reached the end of the input. */ int version; /* Input version. */ + char *blob_dir; /* Blob directory. */ char *home; /* Env home. */ char *passwd; /* Env passwd. */ int private; /* Private env. */ u_int32_t cache; /* Env cache size. */ + u_int32_t blob_threshold; /* Blob threshold. */ } LDG; int badend __P((DB_ENV *)); @@ -36,7 +38,7 @@ void badnum __P((DB_ENV *)); int configure __P((DB_ENV *, DB *, char **, char **, int *)); int convprintable __P((DB_ENV *, char *, char **)); int db_init __P((DB_ENV *, char *, u_int32_t, int *)); -int dbt_rdump __P((DB_ENV *, DBT *)); +int dbt_rdump __P((DB_ENV *, DBT *, u_int32_t, int *)); int dbt_rprint __P((DB_ENV *, DBT *)); int dbt_rrecno __P((DB_ENV *, DBT *, int)); int dbt_to_recno __P((DB_ENV *, DBT *, db_recno_t *)); @@ -44,6 +46,7 @@ int env_create __P((DB_ENV **, LDG *)); void free_keys __P((DBT *part_keys)); int load __P((DB_ENV *, char *, DBTYPE, char **, u_int, LDG *, int *)); int main __P((int, char *[])); +int putdata __P((DB *, DBC *, DB_TXN *, DBT *, DBT *, u_int32_t, int)); int rheader __P((DB_ENV *, DB *, DBTYPE *, char **, int *, int *, DBT **)); int usage __P((void)); int version_check __P((void)); @@ -56,6 +59,7 @@ const char *progname; #define LDF_NOHEADER 0x01 /* No dump header. */ #define LDF_NOOVERWRITE 0x02 /* Don't overwrite existing rows. */ #define LDF_PASSWORD 0x04 /* Encrypt created databases. */ +#define BLOB_LOADING_SIZE 1048576 /* Load blob files X bytes at a time.*/ int main(argc, argv) @@ -86,6 +90,8 @@ main(argc, argv) ldg.hdrbuf = NULL; ldg.home = NULL; ldg.passwd = NULL; + ldg.blob_dir = NULL; + ldg.blob_threshold = 0; if ((exitval = version_check()) != 0) goto done; @@ -95,6 +101,17 @@ main(argc, argv) exitval = existed = 0; dbtype = DB_UNKNOWN; + /* + * We will allocate (argc + 1) bytes memory. + * Check if (argc + 1) will introduce interger overflow error. + */ + if (argc == INT_MAX) { + fprintf(stderr, "%s: %s\n", ldg.progname, "The number of\ + arguments exceeds the maximum unsigned integer value"); + exitval = 1; + goto done; + } + /* Allocate enough room for configuration arguments. */ if ((clp = clist = (char **)calloc((size_t)argc + 1, sizeof(char *))) == NULL) { @@ -110,8 +127,11 @@ main(argc, argv) * db_load because we don't have a better place to put it, and we * don't want to create a new utility for just that functionality. */ - while ((ch = getopt(argc, argv, "c:f:h:nP:r:Tt:V")) != EOF) + while ((ch = getopt(argc, argv, "b:c:f:h:o:nP:r:Tt:V")) != EOF) switch (ch) { + case 'b': + ldg.blob_dir = optarg; + break; case 'c': if (mode != NOTSET && mode != STANDARD_LOAD) { exitval = usage(); @@ -148,6 +168,9 @@ main(argc, argv) ldf |= LDF_NOOVERWRITE; break; + case 'o': + ldg.blob_threshold = (u_int32_t)atoi(optarg); + break; case 'P': ldg.passwd = strdup(optarg); memset(optarg, 0, strlen(optarg)); @@ -303,7 +326,8 @@ load(dbenv, name, argtype, clist, flags, ldg, existedp) DB_TXN *ctxn, *txn; db_recno_t recno, datarecno; u_int32_t put_flags; - int ascii_recno, checkprint, hexkeys, keyflag, keys, resize, ret, rval; + int ascii_recno, checkprint, hexkeys; + int keyflag, keys, resize, ret, rval, streaming; char *subdb; put_flags = LF_ISSET(LDF_NOOVERWRITE) ? DB_NOOVERWRITE : 0; @@ -313,6 +337,7 @@ load(dbenv, name, argtype, clist, flags, ldg, existedp) subdb = NULL; ctxn = txn = NULL; part_keys = NULL; + streaming = 0; memset(&key, 0, sizeof(DBT)); memset(&data, 0, sizeof(DBT)); memset(&rkey, 0, sizeof(DBT)); @@ -386,13 +411,19 @@ retry_db: goto err; } - if (dbtype == DB_RECNO || dbtype == DB_QUEUE) + if (dbtype == DB_RECNO || dbtype == DB_QUEUE) { if (keyflag != 1 && argtype != DB_RECNO && argtype != DB_QUEUE) { dbenv->errx(dbenv, DB_STR("5077", "improper database type conversion specified")); goto err; } + if (ldg->blob_threshold != 0) { + dbenv->errx(dbenv, DB_STR("5142", + "Queue and recno databases cannot support blobs")); + goto err; + } + } dbtype = argtype; } @@ -506,12 +537,14 @@ key_data: if ((readp->data = malloc(readp->ulen = 1024)) == NULL) { /* Get each key/data pair and add them to the database. */ for (recno = 1; !__db_util_interrupted(); ++recno) { + streaming = 0; if (!keyflag) { if (checkprint) { if (dbt_rprint(dbenv, &data)) goto err; } else { - if (dbt_rdump(dbenv, &data)) + if (dbt_rdump(dbenv, + &data, ldg->blob_threshold, &streaming)) goto err; } } else { @@ -529,10 +562,12 @@ key_data: if ((readp->data = malloc(readp->ulen = 1024)) == NULL) { if (dbt_rrecno(dbenv, readp, hexkeys)) goto err; } else - if (dbt_rdump(dbenv, readp)) + if (dbt_rdump( + dbenv, readp, 0, &streaming)) goto err; - if (!G(endodata) && dbt_rdump(dbenv, &data)) { + if (!G(endodata) && dbt_rdump(dbenv, + &data, ldg->blob_threshold, &streaming)) { odd_count: dbenv->errx(dbenv, DB_STR("5079", "odd number of key/data pairs")); goto err; @@ -545,9 +580,9 @@ retry: if (put_flags != 0 && txn != NULL) if ((ret = dbenv->txn_begin(dbenv, txn, &ctxn, 0)) != 0) goto err; - switch (ret = ((put_flags == 0) ? - dbc->put(dbc, writep, &data, DB_KEYLAST) : - dbp->put(dbp, ctxn, writep, &data, put_flags))) { + ret = putdata(dbp, + dbc, ctxn, writep, &data, put_flags, streaming); + switch (ret) { case 0: if (ctxn != NULL) { if ((ret = @@ -564,7 +599,7 @@ retry: !keyflag ? recno : recno * 2 - 1); (void)dbenv->prdbt(&key, - checkprint, 0, stderr, __db_pr_callback, 0, 0); + checkprint, 0, stderr, __db_pr_callback, 0, 0, 0); break; case DB_LOCK_DEADLOCK: /* If we have a child txn, retry--else it's fatal. */ @@ -630,6 +665,94 @@ err: rval = 1; } /* + * putdata -- + * Put data read from the load file into the database. + */ +int +putdata(dbp, dbc, txn, writep, data, put_flags, streaming) + DB *dbp; + DBC *dbc; + DB_TXN *txn; + DBT *writep; + DBT *data; + u_int32_t put_flags; + int streaming; +{ + DBC *local; + DBT partial; + DB_ENV *dbenv; + DB_STREAM *dbs; + db_off_t offset; + int ret, t_ret; + u_int32_t flags; + + local = NULL; + dbenv = dbp->dbenv; + dbs = NULL; + offset = data->size; + flags = 0; + ret = t_ret = 0; + memset(&partial, 0, sizeof(DBT)); + partial.flags |= DB_DBT_PARTIAL; + + if (streaming != 0) + data->flags |= DB_DBT_BLOB; + + ret = ((put_flags == 0) ? + dbc->put(dbc, writep, data, DB_KEYLAST | flags) : + dbp->put(dbp, txn, writep, data, put_flags | flags)); + + if (ret != 0 || streaming == 0) + return (ret); + + /* Stream the rest of the data into the data item. */ + F_CLR(data, DB_DBT_BLOB); + if (dbc != NULL) + local = dbc; + else { + if ((ret = dbp->cursor(dbp, txn, &local, 0)) != 0) + goto err; + if ((ret = local->get(local, writep, &partial, DB_SET)) != 0) + goto err; + } + + if (data->ulen < MEGABYTE) { + if ((data->data = realloc( + data->data, data->ulen = MEGABYTE)) == NULL) { + dbp->dbenv->err(dbp->dbenv, ENOMEM, NULL); + goto err; + } + } + + if ((ret = local->db_stream(local, &dbs, DB_STREAM_WRITE)) != 0) + goto err; + + /* Load the blob piecemeal into the database. */ + while (streaming != 0 && !G(endodata)) { + if (dbt_rdump(dbenv, data, data->ulen, &streaming)) { + ret = EIO; + goto err; + } + + if ((ret = dbs->write(dbs, data, offset, 0)) != 0) + goto err; + + offset += data->size; + } + +err: if (dbs != NULL){ + if ((t_ret = dbs->close(dbs, 0)) != 0 && ret == 0) + ret = t_ret; + } + if (local != NULL && local != dbc) { + if ((t_ret = local->close(local)) != 0 && ret == 0) + ret = t_ret; + } + + return (ret); +} + +/* * env_create -- * Create the environment and initialize it for error reporting. */ @@ -654,6 +777,20 @@ env_create(dbenvp, ldg) dbenv->err(dbenv, ret, "set_passwd"); return (ret); } + + /* Configure blobs. */ + if (ldg->blob_threshold != 0 && + (ret = dbenv->set_blob_threshold( + dbenv, ldg->blob_threshold, 0)) != 0) { + dbenv->err(dbenv, ret, "set_blob_threshold"); + return (ret); + } + if (ldg->blob_dir != NULL && + (ret = dbenv->set_blob_dir(dbenv, ldg->blob_dir)) != 0) { + dbenv->err(dbenv, ret, "set_blob_dir"); + return (ret); + } + if ((ret = db_init(dbenv, ldg->home, ldg->cache, &ldg->private)) != 0) return (ret); dbenv->app_private = ldg; @@ -681,7 +818,7 @@ db_init(dbenv, home, cache, is_private) DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN; if ((ret = dbenv->open(dbenv, home, flags, 0)) == 0) return (0); - if (ret == DB_VERSION_MISMATCH) + if (ret == DB_VERSION_MISMATCH || ret == DB_REP_LOCKOUT) goto err; /* @@ -881,7 +1018,7 @@ rheader(dbenv, dbp, dbtypep, subdbp, checkprintp, keysp, part_keyp) DBT *keys, *kp; size_t buflen, linelen, start; long val; - int ch, first, hdr, ret; + int ch, first, hdr, ret, streaming; char *buf, *name, *p, *value; u_int32_t heap_bytes, heap_gbytes, i, nparts; @@ -1057,13 +1194,14 @@ rheader(dbenv, dbp, dbtypep, subdbp, checkprintp, keysp, part_keyp) } nparts = (u_int32_t) val; if ((keys = - malloc(nparts * sizeof(DBT))) == NULL) { + calloc(nparts, sizeof(DBT))) == NULL) { dbenv->err(dbenv, ENOMEM, NULL); goto err; } keys[nparts - 1].data = NULL; kp = keys; for (i = 1; i < nparts; kp++, i++) { + streaming = 0; if ((kp->data = malloc(kp->ulen = 1024)) == NULL) { dbenv->err(dbenv, ENOMEM, NULL); @@ -1073,7 +1211,7 @@ rheader(dbenv, dbp, dbtypep, subdbp, checkprintp, keysp, part_keyp) if (dbt_rprint(dbenv, kp)) goto err; } else { - if (dbt_rdump(dbenv, kp)) + if (dbt_rdump(dbenv, kp, 0, &streaming)) goto err; } } @@ -1094,12 +1232,14 @@ rheader(dbenv, dbp, dbtypep, subdbp, checkprintp, keysp, part_keyp) NULL, value, 0, LONG_MAX, &val)) != 0) goto nameerr; heap_bytes = (u_int32_t)val; + continue; } if (strcmp(name, "heap_gbytes") == 0) { if ((ret = __db_getlong(dbenv, NULL, value, 0, LONG_MAX, &val)) != 0) goto nameerr; heap_gbytes = (u_int32_t)val; + continue; } dbp->errx(dbp, DB_STR_A("5086", @@ -1135,10 +1275,8 @@ err: ret = 1; } if (name != NULL) free(name); - if (ret != 0) { - *part_keyp = NULL; - free_keys(keys); - } + *part_keyp = NULL; + free_keys(keys); return (ret); } @@ -1321,9 +1459,11 @@ dbt_rprint(dbenv, dbtp) * Read a byte dump line into a DBT structure. */ int -dbt_rdump(dbenv, dbtp) +dbt_rdump(dbenv, dbtp, blob_threshold, streaming) DB_ENV *dbenv; DBT *dbtp; + u_int32_t blob_threshold; + int *streaming; { u_int32_t len; u_int8_t *p; @@ -1332,7 +1472,11 @@ dbt_rdump(dbenv, dbtp) ++G(lineno); - first = 1; + if (*streaming != 0) + first = 0; + else + first = 1; + *streaming = 0; for (p = dbtp->data, len = 0; (c1 = getchar()) != '\n';) { if (c1 == EOF) { if (len == 0) { @@ -1359,7 +1503,16 @@ dbt_rdump(dbenv, dbtp) if ((c2 = getchar()) == EOF) return (badend(dbenv)); if (len >= dbtp->ulen - 10) { - dbtp->ulen *= 2; + if (dbtp->ulen == UINT32_MAX) { + dbenv->errx(dbenv, DB_STR("5143", + "Encountered a data item too large to store in the database. " + "Enable blob_threshold to store it.")); + return (DB_BUFFER_SMALL); + } + if (dbtp->ulen > UINT32_MAX/2) + dbtp->ulen = UINT32_MAX; + else + dbtp->ulen *= 2; if ((dbtp->data = realloc(dbtp->data, dbtp->ulen)) == NULL) { dbenv->err(dbenv, ENOMEM, NULL); @@ -1369,6 +1522,10 @@ dbt_rdump(dbenv, dbtp) } ++len; DIGITIZE(*p++, c1, c2); + if (blob_threshold != 0 && len > BLOB_LOADING_SIZE) { + *streaming = 1; + break; + } } dbtp->size = len; @@ -1486,6 +1643,8 @@ usage() "[-h home] [-P password] [-t btree | hash | recno | queue] db_file"); (void)fprintf(stderr, "usage: %s %s\n", progname, "-r lsn | fileid [-h home] [-P password] db_file"); + (void)fprintf(stderr, "usage: %s %s\n", + progname, "-b [blob_dir] -o [blob_threshold] db_file"); return (EXIT_FAILURE); } |