path: root/dev/run
diff options
Diffstat (limited to 'dev/run')
1 files changed, 0 insertions, 862 deletions
diff --git a/dev/run b/dev/run
deleted file mode 100755
index 05ed16abb..000000000
--- a/dev/run
+++ /dev/null
@@ -1,862 +0,0 @@
-#!/usr/bin/env python3
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not
-# use this file except in compliance with the License. You may obtain a copy of
-# the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations under
-# the License.
-import atexit
-import base64
-import contextlib
-import functools
-import glob
-import inspect
-import json
-import ntpath
-import optparse
-import os
-import posixpath
-import re
-import socket
-import subprocess as sp
-import sys
-import time
-import uuid
-import traceback
-from configparser import ConfigParser
-from pbkdf2 import pbkdf2_hex
-COMMON_SALT = uuid.uuid4().hex
- from urllib.request import urlopen
-except ImportError:
- from urllib.request import urlopen
- import http.client as httpclient
-except ImportError:
- import http.client as httpclient
-def toposixpath(path):
- if os.sep == ntpath.sep:
- return path.replace(ntpath.sep, posixpath.sep)
- else:
- return path
-def log(msg):
- def decorator(func):
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- def print_(chars):
- if log.verbose:
- sys.stdout.write(chars)
- sys.stdout.flush()
- argnames = list(inspect.signature(func).parameters.keys())
- callargs = dict(list(zip(argnames, args)))
- callargs.update(kwargs)
- print_("[ * ] " + msg.format(**callargs) + " ... ")
- try:
- res = func(*args, **kwargs)
- except KeyboardInterrupt:
- print_("ok\n")
- except Exception as err:
- print_("failed: %s\n" % err)
- raise
- else:
- print_("ok\n")
- return res
- return wrapper
- return decorator
-log.verbose = True
-def main():
- ctx = setup()
- startup(ctx)
- if ctx["cmd"]:
- run_command(ctx, ctx["cmd"])
- else:
- join(ctx, cluster_port(ctx, 1), *ctx["admin"])
-def setup():
- opts, args = setup_argparse()
- ctx = setup_context(opts, args)
- setup_logging(ctx)
- setup_dirs(ctx)
- check_beams(ctx)
- check_boot_script(ctx)
- setup_configs(ctx)
- return ctx
-def setup_logging(ctx):
- log.verbose = ctx["verbose"]
-def setup_argparse():
- parser = get_args_parser()
- return parser.parse_args()
-def get_args_parser():
- parser = optparse.OptionParser(description="Runs CouchDB 2.0 dev cluster")
- parser.add_option(
- "-a",
- "--admin",
- metavar="USER:PASS",
- default=None,
- help="Add an admin account to the development cluster",
- )
- parser.add_option(
- "-n",
- "--nodes",
- metavar="nodes",
- default=3,
- type=int,
- help="Number of development nodes to be spun up",
- )
- parser.add_option(
- "-q",
- "--quiet",
- action="store_false",
- dest="verbose",
- default=True,
- help="Don't print anything to STDOUT",
- )
- parser.add_option(
- "--with-admin-party-please",
- dest="with_admin_party",
- default=False,
- action="store_true",
- help="Runs a dev cluster with admin party mode on",
- )
- parser.add_option(
- "--enable-erlang-views",
- action="store_true",
- help="Enables the Erlang view server",
- )
- parser.add_option(
- "--no-join",
- dest="no_join",
- default=False,
- action="store_true",
- help="Do not join nodes on boot",
- )
- parser.add_option(
- "--with-haproxy",
- dest="with_haproxy",
- default=False,
- action="store_true",
- help="Use HAProxy",
- )
- parser.add_option(
- "--haproxy", dest="haproxy", default="haproxy", help="HAProxy executable path"
- )
- parser.add_option(
- "--haproxy-port", dest="haproxy_port", default="5984", help="HAProxy port"
- )
- parser.add_option(
- "--node-number",
- dest="node_number",
- type=int,
- default=1,
- help="The node number to seed them when creating the node(s)",
- )
- parser.add_option(
- "-c",
- "--config-overrides",
- action="append",
- default=[],
- help="Optional key=val config overrides. Can be repeated",
- )
- parser.add_option(
- "--erlang-config",
- dest="erlang_config",
- default="rel/files/sys.config",
- help="Specify an alternative Erlang application configuration",
- )
- parser.add_option(
- "--degrade-cluster",
- dest="degrade_cluster",
- type=int,
- default=0,
- help="The number of nodes that should be stopped after cluster config",
- )
- parser.add_option(
- "--no-eval",
- action="store_true",
- default=False,
- help="Do not eval subcommand output",
- )
- parser.add_option(
- "--auto-ports",
- dest="auto_ports",
- default=False,
- action="store_true",
- help="Select available ports for nodes automatically",
- )
- parser.add_option(
- "--extra_args",
- dest="extra_args",
- default=None,
- help="Extra arguments to pass to beam process",
- )
- parser.add_option(
- "-l",
- "--locald-config",
- dest="locald_configs",
- action="append",
- default=[],
- help="Path to config to place in 'local.d'. Can be repeated",
- )
- return parser
-def setup_context(opts, args):
- fpath = os.path.abspath(__file__)
- return {
- "N": opts.nodes,
- "no_join": opts.no_join,
- "with_admin_party": opts.with_admin_party,
- "enable_erlang_views": opts.enable_erlang_views,
- "admin": opts.admin.split(":", 1) if opts.admin else None,
- "nodes": ["node%d" % (i + opts.node_number) for i in range(opts.nodes)],
- "node_number": opts.node_number,
- "degrade_cluster": opts.degrade_cluster,
- "devdir": os.path.dirname(fpath),
- "rootdir": os.path.dirname(os.path.dirname(fpath)),
- "cmd": " ".join(args),
- "verbose": opts.verbose,
- "with_haproxy": opts.with_haproxy,
- "haproxy": opts.haproxy,
- "haproxy_port": opts.haproxy_port,
- "config_overrides": opts.config_overrides,
- "erlang_config": opts.erlang_config,
- "no_eval": opts.no_eval,
- "extra_args": opts.extra_args,
- "reset_logs": True,
- "procs": [],
- "auto_ports": opts.auto_ports,
- "locald_configs": opts.locald_configs,
- }
-@log("Setup environment")
-def setup_dirs(ctx):
- ensure_dir_exists(ctx["devdir"], "logs")
-def ensure_dir_exists(root, *segments):
- path = os.path.join(root, *segments)
- if not os.path.exists(path):
- os.makedirs(path)
- return path
-@log("Ensure CouchDB is built")
-def check_beams(ctx):
- for fname in glob.glob(os.path.join(ctx["devdir"], "*.erl")):
- sp.check_call(["erlc", "-o", ctx["devdir"] + os.sep, fname])
-@log("Ensure Erlang boot script exists")
-def check_boot_script(ctx):
- if not os.path.exists(os.path.join(ctx["devdir"], "devnode.boot")):
- env = os.environ.copy()
- env["ERL_LIBS"] = os.path.join(ctx["rootdir"], "src")
- sp.check_call(["escript", "make_boot_script"], env=env, cwd=ctx["devdir"])
-@log("Prepare configuration files")
-def setup_configs(ctx):
- for idx, node in enumerate(ctx["nodes"]):
- cluster_port, backend_port, prometheus_port = get_ports(
- ctx, idx + ctx["node_number"]
- )
- env = {
- "prefix": toposixpath(ctx["rootdir"]),
- "package_author_name": "The Apache Software Foundation",
- "data_dir": toposixpath(
- ensure_dir_exists(ctx["devdir"], "lib", node, "data")
- ),
- "view_index_dir": toposixpath(
- ensure_dir_exists(ctx["devdir"], "lib", node, "data")
- ),
- "state_dir": toposixpath(
- ensure_dir_exists(ctx["devdir"], "lib", node, "data")
- ),
- "node_name": "-name %s@" % node,
- "cluster_port": cluster_port,
- "backend_port": backend_port,
- "prometheus_port": prometheus_port,
- "uuid": "fake_uuid_for_dev",
- "_default": "",
- }
- write_config(ctx, node, env)
- write_locald_configs(ctx, node, env)
- generate_haproxy_config(ctx)
-def write_locald_configs(ctx, node, env):
- for locald_config in ctx["locald_configs"]:
- config_src = os.path.join(ctx["rootdir"], locald_config)
- if os.path.exists(config_src):
- config_filename = os.path.basename(config_src)
- config_tgt = os.path.join(
- ctx["devdir"], "lib", node, "etc", "local.d", config_filename
- )
- with open(config_src) as handle:
- content =
- with open(config_tgt, "w") as handle:
- handle.write(content)
-def generate_haproxy_config(ctx):
- haproxy_config = os.path.join(ctx["devdir"], "lib", "haproxy.cfg")
- template = os.path.join(ctx["rootdir"], "rel", "haproxy.cfg")
- with open(template) as handle:
- config = handle.readlines()
- out = []
- for line in config:
- match = re.match("(.*?)<<(.*?)>>(.*?)", line, re.S)
- if match:
- prefix, template, suffix = match.groups()
- for node in ctx["nodes"]:
- node_idx = int(node.replace("node", ""))
- text = template.format(
- **{"node_idx": node_idx, "port": cluster_port(ctx, node_idx)}
- )
- out.append(prefix + text + suffix)
- else:
- out.append(line)
- with open(haproxy_config, "w") as handle:
- handle.write("\n".join(out))
-def apply_config_overrides(ctx, content):
- for kv_str in ctx["config_overrides"]:
- key, val = kv_str.split("=")
- key, val = key.strip(), val.strip()
- match = "[;=]{0,2}%s.*" % key
- repl = "%s = %s" % (key, val)
- content = re.sub(match, repl, content)
- return content
-def get_ports(ctx, idnode):
- assert idnode
- if idnode <= 5 and not ctx["auto_ports"]:
- return (
- (10000 * idnode) + 5984,
- (10000 * idnode) + 5986,
- (10000 * idnode) + 7986,
- )
- else:
- return tuple(get_available_ports(2))
-def get_available_ports(num):
- ports = []
- while len(ports) < num + 1:
- with contextlib.closing(
- socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- ) as soc:
- soc.bind(("localhost", 0))
- _, port = soc.getsockname()
- if port not in ports:
- ports.append(port)
- return ports
-def get_node_config(ctx, node_idx):
- node = "node{}".format(node_idx)
- config_dir = os.path.join(ctx["devdir"], "lib", node, "etc")
- config = ConfigParser()
- [os.path.join(config_dir, "default.ini"), os.path.join(config_dir, "local.ini")]
- )
- return config
-def backend_port(ctx, n):
- return int(get_node_config(ctx, n).get("httpd", "port"))
-def cluster_port(ctx, n):
- return int(get_node_config(ctx, n).get("chttpd", "port"))
-def write_config(ctx, node, env):
- etc_src = os.path.join(ctx["rootdir"], "rel", "overlay", "etc")
- etc_tgt = ensure_dir_exists(ctx["devdir"], "lib", node, "etc")
- for fname in glob.glob(os.path.join(etc_src, "*")):
- base = os.path.basename(fname)
- tgt = os.path.join(etc_tgt, base)
- if os.path.isdir(fname):
- continue
- with open(fname) as handle:
- content =
- for key in env:
- content = re.sub("{{%s}}" % key, str(env[key]), content)
- if base == "default.ini":
- content = hack_default_ini(ctx, node, content)
- content = apply_config_overrides(ctx, content)
- elif base == "local.ini":
- content = hack_local_ini(ctx, content)
- with open(tgt, "w") as handle:
- handle.write(content)
- ensure_dir_exists(etc_tgt, "local.d")
-def boot_haproxy(ctx):
- if not ctx["with_haproxy"]:
- return
- config = os.path.join(ctx["devdir"], "lib", "haproxy.cfg")
- cmd = [ctx["haproxy"], "-f", config]
- logfname = os.path.join(ctx["devdir"], "logs", "haproxy.log")
- log = open(logfname, "w")
- env = os.environ.copy()
- if "HAPROXY_PORT" not in env:
- env["HAPROXY_PORT"] = ctx["haproxy_port"]
- return sp.Popen(
- " ".join(cmd), shell=True, stdin=sp.PIPE, stdout=log, stderr=sp.STDOUT, env=env
- )
-def hack_default_ini(ctx, node, contents):
- contents = re.sub(
- "^\[httpd\]$",
- "[httpd]\nenable = true",
- contents,
- flags=re.MULTILINE,
- )
- if ctx["enable_erlang_views"]:
- contents = re.sub(
- "^\[native_query_servers\]$",
- "[native_query_servers]\nerlang = {couch_native_process, start_link, []}",
- contents,
- flags=re.MULTILINE,
- )
- contents = re.sub("n=3", "n=%s" % ctx["N"], contents)
- return contents
-def hack_local_ini(ctx, contents):
- # make sure all three nodes have the same secret
- secret_line = "secret = %s\n" % COMMON_SALT
- previous_line = "; require_valid_user = false\n"
- contents = contents.replace(previous_line, previous_line + secret_line)
- if ctx["with_admin_party"]:
- ctx["admin"] = ("Admin Party!", "You do not need any password.")
- return contents
- # handle admin credentials passed from cli or generate own one
- if ctx["admin"] is None:
- ctx["admin"] = user, pswd = "root", gen_password()
- else:
- user, pswd = ctx["admin"]
- return contents + "\n%s = %s" % (user, hashify(pswd))
-def gen_password():
- # TODO: figure how to generate something more friendly here
- return base64.b64encode(os.urandom(6)).decode()
-def hashify(pwd, salt=COMMON_SALT, iterations=10, keylen=20):
- """
- Implements password hashing according to:
- -
- -
- This test uses 'candeira:candeira'
- >>> hashify(candeira)
- -pbkdf2-99eb34d97cdaa581e6ba7b5386e112c265c5c670,d1d2d4d8909c82c81b6c8184429a0739,10
- """
- derived_key = pbkdf2_hex(pwd, salt, iterations, keylen)
- return "-pbkdf2-%s,%s,%s" % (derived_key, salt, iterations)
-def startup(ctx):
- atexit.register(kill_processes, ctx)
- boot_nodes(ctx)
- ensure_all_nodes_alive(ctx)
- if ctx["no_join"]:
- return
- if ctx["with_admin_party"]:
- cluster_setup_with_admin_party(ctx)
- else:
- cluster_setup(ctx)
- if ctx["degrade_cluster"] > 0:
- degrade_cluster(ctx)
-def kill_processes(ctx):
- for proc in ctx["procs"]:
- if proc and proc.returncode is None:
- proc.kill()
-def degrade_cluster(ctx):
- if ctx["with_haproxy"]:
- haproxy_proc = ctx["procs"].pop()
- for i in range(0, ctx["degrade_cluster"]):
- proc = ctx["procs"].pop()
- if proc is not None:
- kill_process(proc)
- if ctx["with_haproxy"]:
- ctx["procs"].append(haproxy_proc)
-@log("Stoping proc {}")
-def kill_process(proc):
- if proc and proc.returncode is None:
- proc.kill()
-def boot_nodes(ctx):
- for node in ctx["nodes"]:
- ctx["procs"].append(boot_node(ctx, node))
- haproxy_proc = boot_haproxy(ctx)
- if haproxy_proc is not None:
- ctx["procs"].append(haproxy_proc)
-def ensure_all_nodes_alive(ctx):
- status = dict((num, False) for num in list(range(ctx["N"])))
- for _ in range(10):
- for num in range(ctx["N"]):
- if status[num]:
- continue
- local_port = cluster_port(ctx, num + 1)
- url = "{0}/".format(local_port)
- try:
- check_node_alive(url)
- except:
- pass
- else:
- status[num] = True
- if all(status.values()):
- return
- time.sleep(1)
- if not all(status.values()):
- print("Failed to start all the nodes." " Check the dev/logs/*.log for errors.")
- sys.exit(1)
-@log("Check node at {url}")
-def check_node_alive(url):
- error = None
- for _ in range(10):
- try:
- with contextlib.closing(urlopen(url)):
- pass
- except Exception as exc:
- error = exc
- time.sleep(1)
- else:
- error = None
- break
- if error is not None:
- raise error
-def set_boot_env(ctx):
- # fudge fauxton path
- if os.path.exists("src/fauxton/dist/release"):
- fauxton_root = "src/fauxton/dist/release"
- else:
- fauxton_root = "share/www"
- os.environ["COUCHDB_FAUXTON_DOCROOT"] = fauxton_root
- # fudge default query server paths
- couchjs = os.path.join(ctx["rootdir"], "src", "couch", "priv", "couchjs")
- mainjs = os.path.join(ctx["rootdir"], "share", "server", "main.js")
- coffeejs = os.path.join(ctx["rootdir"], "share", "server", "main-coffee.js")
- qs_javascript = toposixpath("%s %s" % (couchjs, mainjs))
- qs_coffescript = toposixpath("%s %s" % (couchjs, coffeejs))
- os.environ["COUCHDB_QUERY_SERVER_JAVASCRIPT"] = qs_javascript
- os.environ["COUCHDB_QUERY_SERVER_COFFEESCRIPT"] = qs_coffescript
-@log("Start node {node}")
-def boot_node(ctx, node):
- set_boot_env(ctx)
- env = os.environ.copy()
- env["ERL_LIBS"] = os.path.join(ctx["rootdir"], "src")
- node_etcdir = os.path.join(ctx["devdir"], "lib", node, "etc")
- reldir = os.path.join(ctx["rootdir"], "rel")
- cmd = [
- "erl",
- "-args_file",
- os.path.join(node_etcdir, "vm.args"),
- "-config",
- os.path.join(ctx["rootdir"], ctx["erlang_config"]),
- "-couch_ini",
- os.path.join(node_etcdir, "default.ini"),
- os.path.join(node_etcdir, "local.ini"),
- os.path.join(node_etcdir, "local.d"),
- "-reltool_config",
- os.path.join(reldir, "reltool.config"),
- "-parent_pid",
- str(os.getpid()),
- "-boot",
- os.path.join(ctx["devdir"], "devnode"),
- "-pa",
- ctx["devdir"],
- "-s monitor_parent",
- ]
- if ctx["reset_logs"]:
- mode = "wb"
- else:
- mode = "r+b"
- logfname = os.path.join(ctx["devdir"], "logs", "%s.log" % node)
- log = open(logfname, mode)
- if "extra_args" in ctx and ctx["extra_args"]:
- cmd += ctx["extra_args"].split(" ")
- cmd = [toposixpath(x) for x in cmd]
- return sp.Popen(cmd, stdin=sp.PIPE, stdout=log, stderr=sp.STDOUT, env=env)
-@log("Running cluster setup")
-def cluster_setup(ctx):
- lead_port = cluster_port(ctx, 1)
- if enable_cluster(ctx["N"], lead_port, *ctx["admin"]):
- for num in range(1, ctx["N"]):
- node_port = cluster_port(ctx, num + 1)
- node_name = ctx["nodes"][num]
- enable_cluster(ctx["N"], node_port, *ctx["admin"])
- add_node(lead_port, node_name, node_port, *ctx["admin"])
- finish_cluster(lead_port, *ctx["admin"])
- return lead_port
-def enable_cluster(node_count, port, user, pswd):
- conn = httpclient.HTTPConnection("", port)
- conn.request(
- "POST",
- "/_cluster_setup",
- json.dumps(
- {
- "action": "enable_cluster",
- "bind_address": "",
- "username": user,
- "password": pswd,
- "node_count": node_count,
- }
- ),
- {
- "Authorization": basic_auth_header(user, pswd),
- "Content-Type": "application/json",
- },
- )
- resp = conn.getresponse()
- if resp.status == 400:
- resp.close()
- return False
- assert resp.status == 201,
- resp.close()
- return True
-def add_node(lead_port, node_name, node_port, user, pswd):
- conn = httpclient.HTTPConnection("", lead_port)
- conn.request(
- "POST",
- "/_cluster_setup",
- json.dumps(
- {
- "action": "add_node",
- "host": "",
- "port": node_port,
- "name": node_name,
- "username": user,
- "password": pswd,
- }
- ),
- {
- "Authorization": basic_auth_header(user, pswd),
- "Content-Type": "application/json",
- },
- )
- resp = conn.getresponse()
- assert resp.status in (201, 409),
- resp.close()
-def set_cookie(port, user, pswd):
- conn = httpclient.HTTPConnection("", port)
- conn.request(
- "POST",
- "/_cluster_setup",
- json.dumps({"action": "receive_cookie", "cookie": generate_cookie()}),
- {
- "Authorization": basic_auth_header(user, pswd),
- "Content-Type": "application/json",
- },
- )
- resp = conn.getresponse()
- assert resp.status == 201,
- resp.close()
-def finish_cluster(port, user, pswd):
- conn = httpclient.HTTPConnection("", port)
- conn.request(
- "POST",
- "/_cluster_setup",
- json.dumps({"action": "finish_cluster"}),
- {
- "Authorization": basic_auth_header(user, pswd),
- "Content-Type": "application/json",
- },
- )
- resp = conn.getresponse()
- # 400 for already set up'ed cluster
- assert resp.status in (201, 400),
- resp.close()
-def basic_auth_header(user, pswd):
- return "Basic " + base64.b64encode((user + ":" + pswd).encode()).decode()
-def generate_cookie():
- return base64.b64encode(os.urandom(12)).decode()
-def cluster_setup_with_admin_party(ctx):
- connect_nodes(ctx)
- host, port = "", cluster_port(ctx, 1)
- create_system_databases(host, port)
-def connect_nodes(ctx):
- host, port = "", backend_port(ctx, 1)
- for node in ctx["nodes"]:
- path = "/_nodes/%s@" % node
- try_request(
- host,
- port,
- "PUT",
- path,
- (200, 201, 202, 409),
- body="{}",
- error="Failed to join %s into cluster:\n" % node,
- )
-def try_request(
- host, port, meth, path, success_codes, body=None, retries=10, retry_dt=1, error=""
- while True:
- conn = httpclient.HTTPConnection(host, port)
- conn.request(meth, path, body=body)
- resp = conn.getresponse()
- if resp.status in success_codes:
- return resp.status,
- elif retries <= 0:
- assert resp.status in success_codes, "%s%s" % (error,
- retries -= 1
- time.sleep(retry_dt)
-def create_system_databases(host, port):
- for dbname in ["_users", "_replicator", "_global_changes"]:
- conn = httpclient.HTTPConnection(host, port)
- conn.request("HEAD", "/" + dbname)
- resp = conn.getresponse()
- if resp.status == 404:
- try_request(
- host,
- port,
- "PUT",
- "/" + dbname,
- (201, 202, 412),
- error="Failed to create '%s' database:\n" % dbname,
- )
- "Developers cluster is set up at{lead_port}.\n"
- "Admin username: {user}\n"
- "Password: {password}\n"
- "Time to hack!"
-def join(ctx, lead_port, user, password):
- while True:
- for proc in ctx["procs"]:
- if proc is not None and proc.returncode is not None:
- exit(1)
- time.sleep(2)
-@log("Exec command {cmd}")
-def run_command(ctx, cmd):
- if ctx["no_eval"]:
- p = sp.Popen(cmd, shell=True)
- p.wait()
- exit(p.returncode)
- else:
- p = sp.Popen(cmd, shell=True, stdout=sp.PIPE, stderr=sys.stderr)
- while True:
- line = p.stdout.readline()
- if not line:
- break
- eval(line)
- p.wait()
- exit(p.returncode)
-@log("Restart all nodes")
-def reboot_nodes(ctx):
- ctx["reset_logs"] = False
- kill_processes(ctx)
- boot_nodes(ctx)
- ensure_all_nodes_alive(ctx)
-if __name__ == "__main__":
- try:
- main()
- except KeyboardInterrupt:
- pass