diff options
author | Ronny Berndt <ronny@apache.org> | 2022-11-29 15:37:04 +0100 |
---|---|---|
committer | Ronny Berndt <ronny@apache.org> | 2023-02-28 09:42:39 +0100 |
commit | 177219f68daf5926c46cb8c75ddfa757beaa6afa (patch) | |
tree | fb1fc852b2a336d540870741594f4da03638992a | |
parent | 54879f9a5d093b8000d64070e7de323e155f2a2a (diff) | |
download | couchdb-177219f68daf5926c46cb8c75ddfa757beaa6afa.tar.gz |
Deno Test
Simple test branch with work from Jan.
-rwxr-xr-x | dev/run | 4 | ||||
-rw-r--r-- | share/server/main-deno.js | 475 | ||||
-rw-r--r-- | src/couch_index/src/couch_index_updater.erl | 10 |
3 files changed, 485 insertions, 4 deletions
@@ -604,14 +604,18 @@ def set_boot_env(ctx): # fudge default query server paths couchjs = os.path.join(ctx["rootdir"], "src", "couch", "priv", "couchjs") mainjs = os.path.join(ctx["rootdir"], "share", "server", "main.js") + denojs = os.path.join(ctx["rootdir"], "share", "server", "main-deno.js") coffeejs = os.path.join(ctx["rootdir"], "share", "server", "main-coffee.js") qs_javascript = toposixpath("%s %s" % (couchjs, mainjs)) qs_coffescript = toposixpath("%s %s" % (couchjs, coffeejs)) + qs_deno = toposixpath("%s %s" % ("deno run --allow-write", denojs)) + os.environ["COUCHDB_QUERY_SERVER_JAVASCRIPT"] = qs_javascript os.environ["COUCHDB_QUERY_SERVER_COFFEESCRIPT"] = qs_coffescript + os.environ["COUCHDB_QUERY_SERVER_DENO"] = qs_deno @log("Start node {node}") def boot_node(ctx, node): diff --git a/share/server/main-deno.js b/share/server/main-deno.js new file mode 100644 index 000000000..d06f24b54 --- /dev/null +++ b/share/server/main-deno.js @@ -0,0 +1,475 @@ +const MAX_LINE = 10240 + +class IO { + constructor () { + // this.encoder = new TextEncoder() + } + + async readline () { + let buffer = new Uint8Array(MAX_LINE) + let bytes + while (bytes = await Deno.read(Deno.stdin.rid, buffer)) { + let text = new TextDecoder().decode(buffer).substr(0, bytes - 1) // strip newline + return text + } + } + + async print (msg) { + this.encoder = new TextEncoder() + await Deno.write(Deno.stdout.rid, this.encoder.encode(msg + '\n')) + // console.log(msg) + } + + writeline (obj) { + try { + this.print(JSON.stringify(obj)) + } catch (e) { + this.log("Error converting object to JSON: " + e.toString()) + this.log("error on obj: " + (obj.toSource ? obj.toSource() : obj.toString())) + } + } + + log (message) { + if (typeof message == "xml") { + message = message.toXMLString(); + } else if (typeof message != "string") { + message = JSON.stringify(message); + } + this.writeline(["log", String(message)]); + } +} + +class State { + constructor (couch, io) { + this.couch = couch + this.io = io + this.funs = [] + this.lib = null + this.query_config = {} + } + + reset (config) { + // clear the globals and run gc + this.funs = [] + this.lib = null + this.query_config = config || {} + this.io.print("true"); // indicates success + } + + addFun (newFun) { + // Compile to a function and add it to funs array + this.funs.push(this.couch.compileFunction(newFun, {views : {lib : this.lib}})) + Debug.debug(`this.funs`) + Debug.debug(this.funs) + this.io.print("true") + } + + addLib (lib) { // TODO: take out + this.lib = lib + this.io.print("true") + } +} + +class Filter { + constructor (couch, io) { + this.couch = couch + this.io = io + this.view_emit = false; + } + + emit (key, value) { + view_emit = true; + } + + createFilterSandbox () { + var sandbox = this.couch.createSandbox(); + sandbox.emit = this.emit; + return sandbox; + } + + filter (fun, ddoc, args) { + var results = []; + var docs = args[0]; + var req = args[1]; + for (var i=0; i < docs.length; i++) { + results.push((fun.apply(ddoc, [docs[i], req]) && true) || false); + }; + this.io.writeline([true, results]); + } + + filter_view (fun, ddoc, args) { + // recompile + var sandbox = this.createFlterSandbox(); + var source = fun.toSource ? fun.toSource() : '(' + fun.toString() + ')'; + + const emit = sandbox.emit; + const log = sandbox.log; + const JSON = sandbox.JSON; + + fun = eval(source); + + var results = []; + var docs = args[0]; + for (var i=0; i < docs.length; i++) { + view_emit = false; + fun(docs[i]); + results.push((view_emit && true) || false); + }; + this.io.writeline([true, results]); + } +} + + +class DesignDoc { + constructor (couch, io) { + this.couch = couch + this.io = io + this.validate = new Validate(io) + this.filter = new Filter(couch, io) + this.ddoc_dispatch = { + "filters": [this.filter, this.filter.filter], + "views": [this.filter, this.filter.filter_view], + "validate_doc_update": [this.validate, this.validate.validate] + } + this.ddocs = {} + } + + ddoc () { + Debug.debug('> ddoc()') + var args = [] + for (var i=0; i < arguments.length; i++) { + args.push(arguments[i]) + } + Debug.debug('ddoc args' + JSON.stringify(args)) + var ddocId = args.shift() + if (ddocId == "new") { + // get the real ddocId. + ddocId = args.shift() + // store the ddoc, functions are lazily compiled. + this.ddocs[ddocId] = args.shift() + Debug.debug('ddocs: ' + JSON.stringify(this.ddocs)) + this.io.print("true") + } else { + // Couch makes sure we know this ddoc already. + Debug.debug('ddocs: known') + var ddoc = this.ddocs[ddocId] + if (!ddoc) throw(["fatal", "query_protocol_error", "uncached design doc: "+ddocId]) + var funPath = args.shift() + var cmd = funPath[0] + // the first member of the fun path determines the type of operation + var funArgs = args.shift() + if (this.ddoc_dispatch[cmd]) { + // get the function, call the command with it + var point = ddoc + for (var i=0; i < funPath.length; i++) { + if (i+1 == funPath.length) { + var fun = point[funPath[i]] + if (!fun) { + throw(["error","not_found", + "missing " + funPath[0] + " function " + funPath[i] + + " on design doc " + ddocId]) + } + if (typeof fun != "function") { + fun = this.couch.compileFunction(fun, ddoc, funPath.join('.')) + // cache the compiled fun on the ddoc + point[funPath[i]] = fun + } + } else { + point = point[funPath[i]] + } + } + + // run the correct responder with the cmd body + const [object, method] = this.ddoc_dispatch[cmd] + method.apply(object, [fun, ddoc, funArgs]) + } else { + // unknown command, quit and hope the restarted version is better + throw(["fatal", "unknown_command", "unknown ddoc command '" + cmd + "'"]) + } + } + } +} + +class Views { + constructor (couch, io, state) { + this.couch = couch + this.io = io + this.map_results = [] // holds temporary emitted values during doc map + this.state = state + } + + runReduce (reduceFuns, keys, values, rereduce) { + var code_size = 0 + for (var i in reduceFuns) { + var fun_body = reduceFuns[i] + code_size += fun_body.length + reduceFuns[i] = this.couch.compileFunction(fun_body) + } + var reductions = new Array(reduceFuns.length) + for(var i = 0; i < reduceFuns.length; i++) { + try { + reductions[i] = reduceFuns[i](keys, values, rereduce) + } catch (err) { + Debug.debug(`reduce err`) + Debug.debug(JSON.stringify(doc)) + handleViewError(err) + // if the error is not fatal, ignore the results and continue + reductions[i] = null + } + } + var reduce_line = JSON.stringify(reductions) + var reduce_length = reduce_line.length + var input_length = this.state.line_length - code_size + // TODO make reduce_limit config into a number + if (this.state.query_config && this.state.query_config.reduce_limit && + reduce_length > 4096 && ((reduce_length * 2) > input_length)) { + var log_message = [ + "Reduce output must shrink more rapidly:", + "input size:", input_length, + "output size:", reduce_length + ].join(" ") + if (this.state.query_config.reduce_limit === "log") { + this.io.log("reduce_overflow_error: " + log_message) + this.io.writeline("[true," + reduce_line + "]") + } else { + throw(["error", "reduce_overflow_error", log_message]) + } + } else { + this.io.print("[true," + reduce_line + "]") + } + } + + handleViewError(err, doc) { + if (err == "fatal_error") { + // Only if it's a "fatal_error" do we exit. What's a fatal error? + // That's for the query to decide. + // + // This will make it possible for queries to completely error out, + // by catching their own local exception and rethrowing a + // fatal_error. But by default if they don't do error handling we + // just eat the exception and carry on. + // + // In this case we abort map processing but don't destroy the + // JavaScript process. If you need to destroy the JavaScript + // process, throw the error form matched by the block below. + throw(["error", "map_runtime_error", "function raised 'fatal_error'"]) + } else if (err[0] == "fatal") { + // Throwing errors of the form ["fatal","error_key","reason"] + // will kill the OS process. This is not normally what you want. + throw(err) + } + var message = "function raised exception " + + (err.toSource ? err.toSource() : err.stack) + if (doc) message += " with doc._id " + doc._id + this.io.log(message) + } + + // view helper function + emit (key, value) { + Debug.debug(`emit called`) + Debug.debug(key) + Debug.debug(value) + this.map_results.push([key, value]) + } + + sum (values) { + var rv = 0 + for (var i in values) { + rv += values[i] + } + return rv + } + + reduce (reduceFuns, kvs) { + var keys = new Array(kvs.length) + var values = new Array(kvs.length) + for(var i = 0; i < kvs.length; i++) { + keys[i] = kvs[i][0] + values[i] = kvs[i][1] + } + this.runReduce(reduceFuns, keys, values, false) + } + + rereduce (reduceFuns, values) { + this.runReduce(reduceFuns, null, values, true) + } + + mapDoc (doc) { + // Compute all the map functions against the document. + // + // Each function can output multiple key/value pairs for each document. + // + // Example output of map_doc after three functions set by add_fun cmds: + // [ + // [["Key","Value"]], <- fun 1 returned 1 key value + // [], <- fun 2 returned 0 key values + // [["Key1","Value1"],["Key2","Value2"]] <- fun 3 returned 2 key values + // ] + // + + // Couch.recursivelySeal(doc) + + var buf = [] + for (var fun in this.state.funs) { + this.map_results = [] + try { + this.state.funs[fun](doc) + buf.push(this.map_results) + } catch (err) { + Debug.debug(`map_doc err ${err}`) + this.handleViewError(err, doc) + // If the error is not fatal, we treat the doc as if it + // did not emit anything, by buffering an empty array. + buf.push([]) + } + } + + this.io.writeline(buf) + } +} + +class Validate { + constructor (io) { + this.io = io + } + + validate (fun, ddoc, args) { + try { + fun.apply(ddoc, args); + this.io.writeline(1); + } catch (error) { + if (error.name && error.stack) { + throw error; + } + this.io.writeline(error); + } + } +}; + + +class Debug { + static async debug(message) { + // await Deno.writeTextFileSync('/tmp/deno-qs.log', message.toString() + '\n') + } +} + +class Couch { + constructor (qs) { + this.qs = qs + } + + compileFunction (source, ddoc, name) { + if (!source) throw (["error", "not_found", "missing function"]); + + var functionObject = null; + var sandbox = this.qs.createSandbox(); + + try { + var rewrittenFun = `(${source})` //rewriteFunInt(source); + const emit = sandbox.emit; + const sum = sandbox.sum; + const log = sandbox.log; + const JSON = sandbox.JSON; + /* const index = sandbox.index */; + var newRew = rewrittenFun + // console.log(newRew) + Debug.debug('nnnL: ' + newRew) + functionObject = eval(newRew); + } catch (err) { + throw ([ + "error", + "compilation_error", + (err.toSource ? err.toSource() : err.stack) + " (" + source + ")" + ]); + }; + if (typeof(functionObject) == "function") { + Debug.debug(`compiled fun ${functionObject}`) + + return functionObject; + } else { + throw (["error", "compilation_error", + "Expression does not eval to a function. (" + source.toString() + ")" + ]); + }; + } +} + +class QueryServer { + constructor () { + this.io = new IO() + this.couch = new Couch(this) + this.state = new State(this.couch, this.io) + this.ddoc = new DesignDoc(this.couch, this.io) + this.views = new Views(this.couch, this.io, this.state) + + this.dispatch = { + "ddoc": [this.ddoc, this.ddoc.ddoc], + "reset": [this.state, this.state.reset], + "add_fun": [this.state, this.state.addFun], + "map_doc": [this.views, this.views.mapDoc], + // "index_doc": Dreyfus.indexDoc, + "reduce": [this.views, this.views.reduce], + "rereduce": [this.views, this.views.rereduce] + } + } + + createSandbox () { + // if possible, use evalcx (not always available) + var sandbox = {}; //eval(''); + sandbox.emit = (k, v) => this.views.emit(k, v); + sandbox.sum = (k, v, r) => this.views.sum(k, v); + // sandbox.log = log; + sandbox.JSON = JSON; + // sandbox.index = Dreyfus.index; + return sandbox; + } + + handleError (e) { + Debug.debug(e) + Debug.debug(e.stack) + var type = e[0] + if (type == "fatal") { + e[0] = "error"; // we tell the client it was a fatal error by dying + this.io.writeline(e) + Deno.exit(-1) + } else if (type == "error") { + this.io.writeline(e) + } else if (e.error && e.reason) { + // compatibility with old error format + this.io.writeline(["error", e.error, e.reason]) + } else if (e.name) { + this.io.writeline(["error", e.name, e]) + } else { + this.io.writeline(["error", "unnamed_error", e.toSource ? e.toSource() : e.stack]) + } + } + + async loop () { + let cmd + let cmdkey + let line + while (line = await this.io.readline()) { + cmd = JSON.parse(line) + Debug.debug(`line: ` + JSON.stringify(cmd) + "\n") + this.state.line_length = line.length + try { + cmdkey = cmd.shift() + Debug.debug(`cmdkey: ` + JSON.stringify(cmdkey) + "\n") + Debug.debug(`cmd: ` + JSON.stringify(cmd) + "\n") + if (this.dispatch[cmdkey]) { + // run the correct responder with the cmd body + const [object, method] = this.dispatch[cmdkey] + method.apply(object, cmd) + } else { + // unknown command, quit and hope the restarted version is better + throw (["fatal", "unknown_command", "unknown command '" + cmdkey + "'"]) + } + } catch (e) { + this.handleError(e) + } + } + } +} + +const query_server = new QueryServer() +query_server.loop() diff --git a/src/couch_index/src/couch_index_updater.erl b/src/couch_index/src/couch_index_updater.erl index fe2150505..e77d7bfed 100644 --- a/src/couch_index/src/couch_index_updater.erl +++ b/src/couch_index/src/couch_index_updater.erl @@ -28,7 +28,8 @@ -record(st, { idx, mod, - pid = nil + pid = nil, + start_ts = 0 }). start_link(Index, Module) -> @@ -60,7 +61,7 @@ handle_call({update, IdxState}, _From, #st{idx = Idx, mod = Mod} = State) -> Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)], couch_log:info("Starting index update for db: ~s idx: ~s", Args), Pid = spawn_link(?MODULE, update, [Idx, Mod, IdxState]), - {reply, ok, State#st{pid = Pid}}; + {reply, ok, State#st{pid = Pid, start_ts = erlang:system_time(seconds)}}; handle_call({restart, IdxState}, _From, #st{idx = Idx, mod = Mod} = State) -> Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)], couch_log:info("Restarting index update for db: ~s idx: ~s", Args), @@ -88,8 +89,9 @@ handle_cast(_Mesg, State) -> handle_info({'EXIT', _, {updated, Pid, IdxState}}, #st{pid = Pid} = State) -> Mod = State#st.mod, - Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)], - couch_log:info("Index update finished for db: ~s idx: ~s", Args), + Index_time = erlang:system_time(seconds) - State#st.start_ts, + Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState), Index_time], + couch_log:info("Index update finished for db: ~s idx: ~s time: ~p s", Args), ok = gen_server:cast(State#st.idx, {updated, IdxState}), {noreply, State#st{pid = undefined}}; handle_info({'EXIT', _, {reset, Pid}}, #st{idx = Idx, pid = Pid} = State) -> |