From b05b17225e5de0b6d979c88b41a57edfade564dc Mon Sep 17 00:00:00 2001 From: Joan Touzet Date: Wed, 19 Apr 2017 17:26:56 -0400 Subject: New couchup 1.x -> 2.x database migration tool This commit adds a new Python-based database migration tool, couchup. It is intended to be used at the command-line on the server being upgraded, before bringing the node (or cluster) into service. couchup provides 4 subcommands to assist in the migration process: * list - lists all CouchDB 1.x databases * replicate - replicates one or more 1.x databases to CouchDB 2.x * rebuild - rebuilds one or more CouchDB 2.x views * delete - deletes one or more CouchDB 1.x databases A typical workflow for a single-node upgrade process would look like: ```sh $ couchdb list $ couchdb replicate -a $ couchdb rebuild -a $ couchdb delete -a ``` A clustered upgrade process would be the same, but must be preceded by setting up all the nodes in the cluster first. Various optional arguments provide for admin login/password, overriding ports, quiet mode and so on. Of special note is that `couchup rebuild` supports an optional flag, `-f`, to filter deleted documents during the replication process. I struggled some with the naming convention. For those in the know, a '1.x database' is a node-local database appearing only on port 5986, and a '2.x database' is a clustered database appearing on port 5984, and in raw, sharded form on port 5986. --- rel/overlay/bin/couchup | 480 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 480 insertions(+) create mode 100755 rel/overlay/bin/couchup diff --git a/rel/overlay/bin/couchup b/rel/overlay/bin/couchup new file mode 100755 index 000000000..858ccc836 --- /dev/null +++ b/rel/overlay/bin/couchup @@ -0,0 +1,480 @@ +#!/usr/bin/env python +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +import argparse +import base64 +import json +import textwrap +import threading +import time +import sys +try: + from urllib import quote +except ImportError: + from urllib.parse import quote +import requests +try: + import progressbar + HAVE_BAR = True +except ImportError: + HAVE_BAR = False + +def _tojson(req): + """Support requests v0.x as well as 1.x+""" + if requests.__version__[0] == '0': + return json.loads(req.content) + return req.json() + +def _args(args): + args = vars(args) + if args['password']: + args['creds'] = (args['login'], args['password']) + else: + args['creds'] = None + return args + +def _do_list(args): + port = str(args['local_port']) + req = requests.get('http://127.0.0.1:' + port + '/_all_dbs', + auth=args['creds']) + req.raise_for_status() + dbs = _tojson(req) + local_dbs = [x for x in dbs if "shards" not in x + and x not in ['_dbs', '_nodes']] + clustered_dbs = list(set( + [x.split('/')[2].split('.')[0] for x in dbs if "shards" in x] + )) + if not args['include_system_dbs']: + # list comprehension to eliminate dbs starting with underscore + local_dbs = [x for x in local_dbs if x[0] != '_'] + clustered_dbs = [x for x in clustered_dbs if x[0] != '_'] + local_dbs.sort() + clustered_dbs.sort() + if args.get('clustered'): + return clustered_dbs + return local_dbs + +def _list(args): + args = _args(args) + ret = _do_list(args) + print(", ".join(ret)) + +def _watch_replication(db, + local_port=5986, + clustered_port=5984, + creds=None, + hide_progress_bar=False, + quiet=False, + timeout=30): + """Watches replication, optionally with a progressbar.""" + time.sleep(1) + if not quiet: + print("Replication started.") + url = "http://127.0.0.1:{}/{}".format(local_port, db) + try: + req = requests.get(url, auth=creds) + req.raise_for_status() + req = _tojson(req) + # here, local means node-local, i.e. source (1.x) database + local_docs = req['doc_count'] + local_size = req['data_size'] + except requests.exceptions.HTTPError: + raise Exception('Cannot retrieve {} doc_count!'.format(db)) + if local_size == 0: + return + if HAVE_BAR and not hide_progress_bar and not quiet: + widgets = [ + db, + ' ', progressbar.Percentage(), + ' ', progressbar.Bar(marker=progressbar.RotatingMarker()), + ' ', progressbar.ETA(), + ' ', progressbar.FileTransferSpeed(), + ] + progbar = progressbar.ProgressBar(widgets=widgets, + maxval=local_size).start() + count = 0 + stall_count = 0 + url = "http://127.0.0.1:{}/{}".format(clustered_port, db) + while count < local_docs: + try: + req = requests.get(url, auth=creds) + req.raise_for_status() + req = _tojson(req) + # here, cluster means clustered port, i.e. port 5984 + clus_count = req['doc_count'] + clus_size = req['data_size'] + except requests.exceptions.HTTPError as exc: + if exc.response.status_code == 404: + clus_count = 0 + clus_size = 0 + else: + raise Exception('Cannot retrieve {} doc_count!'.format(db)) + if count == clus_count: + stall_count += 1 + else: + stall_count = 0 + if stall_count == timeout: + if not quiet: + print( + "Replication is stalled. Increase timeout or reduce load.") + exit(1) + if HAVE_BAR and not hide_progress_bar and not quiet: + if clus_size > local_size: + clus_size = local_size + progbar.update(clus_size) + count = clus_count + time.sleep(1) + if HAVE_BAR and not hide_progress_bar and not quiet: + progbar.finish() + return 0 + +def _put_filter(args, db=None): + """Adds _design/repl_filters tombstone replication filter to DB.""" + ddoc = { + '_id': '_design/repl_filters', + 'filters': { + 'no_deleted': 'function(doc,req){return !doc._deleted;};' + } + } + try: + req = requests.get( + 'http://127.0.0.1:{}/{}/_design/repl_filters'.format( + args['local_port'], db), + auth=args['creds']) + req.raise_for_status() + doc = _tojson(req) + del doc['_rev'] + if doc != ddoc: + if not args['quiet']: + print('Source replication filter does not match! Aborting.') + exit(1) + except requests.exceptions.HTTPError as exc: + if exc.response.status_code == 404: + if not args['quiet']: + print('Adding replication filter to source database...') + req = requests.put( + 'http://127.0.0.1:{}/{}/_design/repl_filters'.format( + args['local_port'], db), + data=json.dumps(ddoc), + auth=args['creds']) + req.raise_for_status() + elif not args['quiet']: + print(exc.response.text) + exit(1) + +def _replicate(args): + args = _args(args) + if args['all_dbs']: + dbs = _do_list(args) + else: + dbs = args['dbs'] + + for db in dbs: + if args['filter_deleted']: + _put_filter(args, db) + + if not args['quiet']: + print('Starting replication for ' + db + '...') + db = quote(db, safe='') + doc = { + 'continuous': False, + 'create_target': True, + 'source': { + 'url': 'http://127.0.0.1:{}/{}'.format( + args['local_port'], db) + }, + 'target': { + 'url': 'http://127.0.0.1:{}/{}'.format( + args['clustered_port'], db) + } + } + if args['filter_deleted']: + doc['filter'] = 'repl_filters/no_deleted' + if args['creds']: + auth = 'Basic ' + base64.b64encode(':'.join(args['creds'])) + headers = { + 'authorization': auth + } + doc['source']['headers'] = headers + doc['target']['headers'] = headers + watch_args = {y: args[y] for y in [ + 'local_port', 'clustered_port', 'creds', 'hide_progress_bar', + 'timeout', 'quiet']} + watch_args['db'] = db + watch = threading.Thread(target=_watch_replication, kwargs=watch_args) + watch.start() + try: + req = requests.post('http://127.0.0.1:{}/_replicate'.format( + args['clustered_port']), + auth=args['creds'], + data=json.dumps(doc), + headers={'Content-type': 'application/json'}) + req.raise_for_status() + req = _tojson(req) + except requests.exceptions.HTTPError as exc: + if not args['quiet']: + print(exc.response.text) + exit(1) + watch.join() + if req.get('no_changes'): + if not args['quiet']: + print("No changes, replication is caught up.") + if not args['quiet']: + print("Replication complete.") + +def _rebuild(args): + args = _args(args) + if args['all_dbs']: + if args['views']: + if not args['quiet']: + print("Cannot take list of views for more than 1 database.") + exit(1) + args['clustered'] = True + dbs = _do_list(args) + else: + dbs = [args['db']] + for db in dbs: + if args['views']: + views = args['views'] + else: + try: + req = requests.get('http://127.0.0.1:{}/{}/_all_docs'.format( + args['clustered_port'], db), + params={ + 'start_key': '"_design/"', + 'end_key': '"_design0"' + }, + auth=args['creds']) + req.raise_for_status() + req = _tojson(req) + except requests.exceptions.HTTPError as exc: + if not args['quiet']: + print(exc.response.text) + exit(1) + req = req['rows'] + ddocs = [x['id'].split('/')[1] for x in req] + for ddoc in ddocs: + try: + req = requests.get('http://127.0.0.1:{}/{}/_design/{}'.format( + args['clustered_port'], db, ddoc), + auth=args['creds']) + req.raise_for_status() + doc = _tojson(req) + except requests.exceptions.HTTPError as exc: + if not args['quiet']: + print(exc.response.text) + exit(1) + if 'views' not in doc: + if not args['quiet']: + print("Skipping {}/{}, no views found".format(db, ddoc)) + continue + # only need to refresh a single view per ddoc + if not args['quiet']: + print("Refreshing views in {}/{}...".format(db, ddoc)) + view = list(doc['views'].keys())[0] + try: + req = requests.get( + 'http://127.0.0.1:{}/{}/_design/{}/_view/{}'.format( + args['clustered_port'], db, ddoc, view), + params={'limit': 1}, + auth=args['creds'], + timeout=args['timeout']) + except requests.exceptions.Timeout: + if not args['quiet']: + print("Timeout, view is processing. Moving on.") + except requests.exceptions.HTTPError as exc: + if not args['quiet']: + print(exc.response.text) + exit(1) + +def _delete(args): + args = _args(args) + if args['all_dbs']: + args['include_system_dbs'] = False + dbs = _do_list(args) + else: + dbs = args['dbs'] + for db in dbs: + db = quote(db, safe='') + local_url = 'http://127.0.0.1:{}/{}'.format(args['local_port'], db) + clus_url = 'http://127.0.0.1:{}/{}'.format(args['clustered_port'], db) + try: + req = requests.get(local_url, auth=args['creds']) + req.raise_for_status() + req = _tojson(req) + local_docs = req['doc_count'] + req = requests.get(clus_url, auth=args['creds']) + req.raise_for_status() + req = _tojson(req) + clus_docs = req['doc_count'] + if clus_docs < local_docs and not args['force']: + if not args['quiet']: + print('Clustered DB has less docs than local version!' + + ' Skipping...') + continue + if not args['quiet']: + print('Deleting ' + db + '...') + req = requests.delete('http://127.0.0.1:{}/{}'.format( + args['local_port'], db), + auth=args['creds']) + req.raise_for_status() + except requests.exceptions.HTTPError as exc: + if not args['quiet']: + print(exc.response.text) + exit(1) + +def main(argv): + """Kindly do the needful.""" + parser = argparse.ArgumentParser(prog='couchup', + formatter_class=argparse.RawDescriptionHelpFormatter, + description=textwrap.dedent('''\ + Migrate CouchDB 1.x databases to CouchDB 2.x. + + Specify a subcommand and -h or --help for more help. + ''')) + + subparsers = parser.add_subparsers() + + parser_list = subparsers.add_parser('list', + help='lists all CouchDB 1.x databases', + formatter_class=argparse.RawTextHelpFormatter, + description=textwrap.dedent('''\ + Examples: + couchup list + couchup list -c -i -p mysecretpassword + ''')) + parser_list.add_argument('-c', '--clustered', action='store_true', + help='show clustered (2.x) databases instead') + parser_list.add_argument('-i', '--include-system-dbs', + action='store_true', + help='include system databases (_users, _replicator, etc.)') + parser_list.add_argument('-l', '--login', default='admin', + help='specify login (default admin)') + parser_list.add_argument('-p', '--password', + help='specify password') + parser_list.add_argument('--local-port', default=5986, + help='override local port (default 5986)') + parser_list.add_argument('--clustered-port', default=5984, + help='override clustered port (default 5984)') + parser_list.set_defaults(func=_list) + + parser_replicate = subparsers.add_parser('replicate', + help='replicates one or more 1.x databases to CouchDB 2.x', + formatter_class=argparse.RawTextHelpFormatter, + description=textwrap.dedent('''\ + Examples: + couchup replicate movies + couchup replicate -f lots_of_deleted_docs_db + couchup replicate -i -q -n _users + + Note: + The -f/--filter-deleted option adds a replication filter + to the source database, _design/repl_filters, that + is used during replication to filter out deleted + documents. This can greatly reduce the size of your + 2.x database if there are many deleted documents. + + It is IMPORTANT that no documents be deleted from the 1.x + database during this process, or those deletions may not + successfully replicate to the 2.x database. + ''')) + parser_replicate.add_argument('-a', '--all_dbs', action='store_true', + help='act on all databases available') + parser_replicate.add_argument('-i', '--include-system-dbs', + action='store_true', + help='include system databases (_users, _replicator, etc.)') + parser_replicate.add_argument('-q', '--quiet', action='store_true', + help='suppress all output') + parser_replicate.add_argument('-n', '--hide-progress-bar', + action='store_true', + help='suppress progress bar display') + parser_replicate.add_argument('-f', '--filter-deleted', + action='store_true', + help='filter deleted document tombstones during replication') + parser_replicate.add_argument('-t', '--timeout', default=30, + help='stalled replication timeout threshhold in s (def: 30)') + parser_replicate.add_argument('-l', '--login', default='admin', + help='specify login (default admin)') + parser_replicate.add_argument('-p', '--password', + help='specify password') + parser_replicate.add_argument('--local-port', default=5986, + help='override local port (default 5986)') + parser_replicate.add_argument('--clustered-port', default=5984, + help='override clustered port (default 5984)') + parser_replicate.add_argument('dbs', metavar='db', type=str, nargs="*", + help="database(s) to be processed") + parser_replicate.set_defaults(func=_replicate) + + parser_rebuild = subparsers.add_parser('rebuild', + help='rebuilds one or more CouchDB 2.x views', + formatter_class=argparse.RawTextHelpFormatter, + description=textwrap.dedent('''\ + Examples: + couchup rebuild movies + couchup rebuild movies by_name + couchup rebuild -a -q -p mysecretpassword + ''')) + parser_rebuild.add_argument('-a', '--all-dbs', action='store_true', + help='act on all databases available') + parser_rebuild.add_argument('-q', '--quiet', action='store_true', + help='suppress all output') + parser_rebuild.add_argument('-t', '--timeout', default=5, + help='timeout for waiting for view rebuild in s (default: 5)') + parser_rebuild.add_argument('-i', '--include-system-dbs', + action='store_true', + help='include system databases (_users, _replicator, etc.)') + parser_rebuild.add_argument('-l', '--login', default='admin', + help='specify login (default admin)') + parser_rebuild.add_argument('-p', '--password', + help='specify password') + parser_rebuild.add_argument('--local-port', default=5986, + help='override local port (default 5986)') + parser_rebuild.add_argument('--clustered-port', default=5984, + help='override clustered port (default 5984)') + parser_rebuild.add_argument('db', metavar='db', type=str, nargs="?", + help="database to be processed") + parser_rebuild.add_argument('views', metavar='view', type=str, nargs="*", + help="view(s) to be processed (all by default)") + parser_rebuild.set_defaults(func=_rebuild) + + parser_delete = subparsers.add_parser('delete', + help='deletes one or more CouchDB 1.x databases', + formatter_class=argparse.RawTextHelpFormatter, + description=textwrap.dedent('''\ + Examples: + couchup delete movies + couchup delete -q -p mysecretpassword movies + ''')) + parser_delete.add_argument('-a', '--all-dbs', action='store_true', + help='act on all databases available') + parser_delete.add_argument('-f', '--force', action='store_true', + help='force deletion even if 1.x and 2.x databases are not identical') + parser_delete.add_argument('-q', '--quiet', action='store_true', + help='suppress all output') + parser_delete.add_argument('-l', '--login', default='admin', + help='specify login (default admin)') + parser_delete.add_argument('-p', '--password', + help='specify password') + parser_delete.add_argument('--local-port', default=5986, + help='override local port (default 5986)') + parser_delete.add_argument('--clustered-port', default=5984, + help='override clustered port (default 5984)') + parser_delete.add_argument('dbs', metavar='db', type=str, nargs="*", + help="database(s) to be processed") + parser_delete.set_defaults(func=_delete) + + args = parser.parse_args(argv[1:]) + args.func(args) + +if __name__ == '__main__': + main(sys.argv) -- cgit v1.2.1