author     Joan Touzet <joant@atypical.net>  2017-04-19 17:26:56 -0400
committer  Joan Touzet <joant@atypical.net>  2017-04-23 15:33:59 -0400
commit     b05b17225e5de0b6d979c88b41a57edfade564dc
tree       40a25671ca3cb0c071904c8a6f8105fa4a068ea9
parent     c3ff4086b26067a253f5f2c2ecc7cbee23d62f95
New couchup 1.x -> 2.x database migration tool
This commit adds a new Python-based database migration tool, couchup. It is
intended to be used at the command line on the server being upgraded, before
bringing the node (or cluster) into service.

couchup provides four subcommands to assist in the migration process:

* list      - lists all CouchDB 1.x databases
* replicate - replicates one or more 1.x databases to CouchDB 2.x
* rebuild   - rebuilds one or more CouchDB 2.x views
* delete    - deletes one or more CouchDB 1.x databases

A typical workflow for a single-node upgrade process would look like:

```sh
$ couchup list
$ couchup replicate -a
$ couchup rebuild -a
$ couchup delete -a
```

A clustered upgrade process would be the same, but must be preceded by
setting up all the nodes in the cluster first. Various optional arguments
provide for admin login/password, overriding ports, quiet mode and so on.
Of special note is that `couchup replicate` supports an optional flag, `-f`,
to filter deleted documents during the replication process.

I struggled some with the naming convention. For those in the know, a
'1.x database' is a node-local database appearing only on port 5986, and a
'2.x database' is a clustered database appearing on port 5984, and in raw,
sharded form on port 5986.
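For a node with an admin account configured, the same workflow might look
like the sketch below (`admin` and `mysecretpassword` are placeholder
credentials; the flags are the ones defined by the tool's argument parser):

```sh
$ couchup list -l admin -p mysecretpassword
$ couchup replicate -a -l admin -p mysecretpassword
$ couchup rebuild -a -l admin -p mysecretpassword
$ couchup delete -a -l admin -p mysecretpassword
```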
-rwxr-xr-x  rel/overlay/bin/couchup  480
1 file changed, 480 insertions, 0 deletions
diff --git a/rel/overlay/bin/couchup b/rel/overlay/bin/couchup
new file mode 100755
index 000000000..858ccc836
--- /dev/null
+++ b/rel/overlay/bin/couchup
@@ -0,0 +1,480 @@
+#!/usr/bin/env python
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+import argparse
+import base64
+import json
+import textwrap
+import threading
+import time
+import sys
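+# Python 2/3 compatibility: quote moved to urllib.parse in Python 3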
+try:
+ from urllib import quote
+except ImportError:
+ from urllib.parse import quote
+import requests
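+# progressbar is optional; without it, replication progress is not displayed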
+try:
+ import progressbar
+ HAVE_BAR = True
+except ImportError:
+ HAVE_BAR = False
+
+def _tojson(req):
+ """Support requests v0.x as well as 1.x+"""
+ if requests.__version__[0] == '0':
+ return json.loads(req.content)
+ return req.json()
+
+def _args(args):
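+    """Converts the argparse Namespace to a dict and adds a 'creds' auth tuple."""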
+ args = vars(args)
+ if args['password']:
+ args['creds'] = (args['login'], args['password'])
+ else:
+ args['creds'] = None
+ return args
+
+def _do_list(args):
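+    """Returns a sorted list of node-local (1.x) or clustered (2.x) database names."""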
+ port = str(args['local_port'])
+ req = requests.get('http://127.0.0.1:' + port + '/_all_dbs',
+ auth=args['creds'])
+ req.raise_for_status()
+ dbs = _tojson(req)
+ local_dbs = [x for x in dbs if "shards" not in x
+ and x not in ['_dbs', '_nodes']]
+ clustered_dbs = list(set(
+ [x.split('/')[2].split('.')[0] for x in dbs if "shards" in x]
+ ))
+ if not args['include_system_dbs']:
+ # list comprehension to eliminate dbs starting with underscore
+ local_dbs = [x for x in local_dbs if x[0] != '_']
+ clustered_dbs = [x for x in clustered_dbs if x[0] != '_']
+ local_dbs.sort()
+ clustered_dbs.sort()
+ if args.get('clustered'):
+ return clustered_dbs
+ return local_dbs
+
+def _list(args):
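+    """Handles the 'list' subcommand, printing databases as a comma-separated list."""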
+ args = _args(args)
+ ret = _do_list(args)
+ print(", ".join(ret))
+
+def _watch_replication(db,
+ local_port=5986,
+ clustered_port=5984,
+ creds=None,
+ hide_progress_bar=False,
+ quiet=False,
+ timeout=30):
+ """Watches replication, optionally with a progressbar."""
+ time.sleep(1)
+ if not quiet:
+ print("Replication started.")
+ url = "http://127.0.0.1:{}/{}".format(local_port, db)
+ try:
+ req = requests.get(url, auth=creds)
+ req.raise_for_status()
+ req = _tojson(req)
+ # here, local means node-local, i.e. source (1.x) database
+ local_docs = req['doc_count']
+ local_size = req['data_size']
+ except requests.exceptions.HTTPError:
+ raise Exception('Cannot retrieve {} doc_count!'.format(db))
+ if local_size == 0:
+ return
+ if HAVE_BAR and not hide_progress_bar and not quiet:
+ widgets = [
+ db,
+ ' ', progressbar.Percentage(),
+ ' ', progressbar.Bar(marker=progressbar.RotatingMarker()),
+ ' ', progressbar.ETA(),
+ ' ', progressbar.FileTransferSpeed(),
+ ]
+ progbar = progressbar.ProgressBar(widgets=widgets,
+ maxval=local_size).start()
+ count = 0
+ stall_count = 0
+ url = "http://127.0.0.1:{}/{}".format(clustered_port, db)
+ while count < local_docs:
+ try:
+ req = requests.get(url, auth=creds)
+ req.raise_for_status()
+ req = _tojson(req)
+ # here, cluster means clustered port, i.e. port 5984
+ clus_count = req['doc_count']
+ clus_size = req['data_size']
+ except requests.exceptions.HTTPError as exc:
+ if exc.response.status_code == 404:
+ clus_count = 0
+ clus_size = 0
+ else:
+ raise Exception('Cannot retrieve {} doc_count!'.format(db))
+ if count == clus_count:
+ stall_count += 1
+ else:
+ stall_count = 0
+ if stall_count == timeout:
+ if not quiet:
+ print(
+ "Replication is stalled. Increase timeout or reduce load.")
+ exit(1)
+ if HAVE_BAR and not hide_progress_bar and not quiet:
+ if clus_size > local_size:
+ clus_size = local_size
+ progbar.update(clus_size)
+ count = clus_count
+ time.sleep(1)
+ if HAVE_BAR and not hide_progress_bar and not quiet:
+ progbar.finish()
+ return 0
+
+def _put_filter(args, db=None):
+ """Adds _design/repl_filters tombstone replication filter to DB."""
+ ddoc = {
+ '_id': '_design/repl_filters',
+ 'filters': {
+ 'no_deleted': 'function(doc,req){return !doc._deleted;};'
+ }
+ }
+ try:
+ req = requests.get(
+ 'http://127.0.0.1:{}/{}/_design/repl_filters'.format(
+ args['local_port'], db),
+ auth=args['creds'])
+ req.raise_for_status()
+ doc = _tojson(req)
+ del doc['_rev']
+ if doc != ddoc:
+ if not args['quiet']:
+ print('Source replication filter does not match! Aborting.')
+ exit(1)
+ except requests.exceptions.HTTPError as exc:
+ if exc.response.status_code == 404:
+ if not args['quiet']:
+ print('Adding replication filter to source database...')
+ req = requests.put(
+ 'http://127.0.0.1:{}/{}/_design/repl_filters'.format(
+ args['local_port'], db),
+ data=json.dumps(ddoc),
+ auth=args['creds'])
+ req.raise_for_status()
+        else:
+            if not args['quiet']:
+                print(exc.response.text)
+            exit(1)
+
+def _replicate(args):
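+    """Handles the 'replicate' subcommand, replicating 1.x databases to 2.x."""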
+ args = _args(args)
+ if args['all_dbs']:
+ dbs = _do_list(args)
+ else:
+ dbs = args['dbs']
+
+ for db in dbs:
+ if args['filter_deleted']:
+ _put_filter(args, db)
+
+ if not args['quiet']:
+ print('Starting replication for ' + db + '...')
+ db = quote(db, safe='')
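+        # one-shot (non-continuous) replication document for POST /_replicate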
+ doc = {
+ 'continuous': False,
+ 'create_target': True,
+ 'source': {
+ 'url': 'http://127.0.0.1:{}/{}'.format(
+ args['local_port'], db)
+ },
+ 'target': {
+ 'url': 'http://127.0.0.1:{}/{}'.format(
+ args['clustered_port'], db)
+ }
+ }
+ if args['filter_deleted']:
+ doc['filter'] = 'repl_filters/no_deleted'
+ if args['creds']:
+            auth = 'Basic ' + base64.b64encode(
+                ':'.join(args['creds']).encode('utf-8')).decode('ascii')
+ headers = {
+ 'authorization': auth
+ }
+ doc['source']['headers'] = headers
+ doc['target']['headers'] = headers
+ watch_args = {y: args[y] for y in [
+ 'local_port', 'clustered_port', 'creds', 'hide_progress_bar',
+ 'timeout', 'quiet']}
+ watch_args['db'] = db
+ watch = threading.Thread(target=_watch_replication, kwargs=watch_args)
+ watch.start()
+ try:
+ req = requests.post('http://127.0.0.1:{}/_replicate'.format(
+ args['clustered_port']),
+ auth=args['creds'],
+ data=json.dumps(doc),
+ headers={'Content-type': 'application/json'})
+ req.raise_for_status()
+ req = _tojson(req)
+ except requests.exceptions.HTTPError as exc:
+ if not args['quiet']:
+ print(exc.response.text)
+ exit(1)
+ watch.join()
+ if req.get('no_changes'):
+ if not args['quiet']:
+ print("No changes, replication is caught up.")
+ if not args['quiet']:
+ print("Replication complete.")
+
+def _rebuild(args):
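+    """Handles the 'rebuild' subcommand, triggering a view rebuild per design doc."""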
+ args = _args(args)
+ if args['all_dbs']:
+ if args['views']:
+ if not args['quiet']:
+ print("Cannot take list of views for more than 1 database.")
+ exit(1)
+ args['clustered'] = True
+ dbs = _do_list(args)
+ else:
+ dbs = [args['db']]
+ for db in dbs:
+ if args['views']:
+ views = args['views']
+ else:
+ try:
+ req = requests.get('http://127.0.0.1:{}/{}/_all_docs'.format(
+ args['clustered_port'], db),
+ params={
+ 'start_key': '"_design/"',
+ 'end_key': '"_design0"'
+ },
+ auth=args['creds'])
+ req.raise_for_status()
+ req = _tojson(req)
+ except requests.exceptions.HTTPError as exc:
+ if not args['quiet']:
+ print(exc.response.text)
+ exit(1)
+ req = req['rows']
+ ddocs = [x['id'].split('/')[1] for x in req]
+ for ddoc in ddocs:
+ try:
+ req = requests.get('http://127.0.0.1:{}/{}/_design/{}'.format(
+ args['clustered_port'], db, ddoc),
+ auth=args['creds'])
+ req.raise_for_status()
+ doc = _tojson(req)
+ except requests.exceptions.HTTPError as exc:
+ if not args['quiet']:
+ print(exc.response.text)
+ exit(1)
+ if 'views' not in doc:
+ if not args['quiet']:
+ print("Skipping {}/{}, no views found".format(db, ddoc))
+ continue
+ # only need to refresh a single view per ddoc
+ if not args['quiet']:
+ print("Refreshing views in {}/{}...".format(db, ddoc))
+ view = list(doc['views'].keys())[0]
+ try:
+ req = requests.get(
+ 'http://127.0.0.1:{}/{}/_design/{}/_view/{}'.format(
+ args['clustered_port'], db, ddoc, view),
+ params={'limit': 1},
+ auth=args['creds'],
+ timeout=args['timeout'])
+ except requests.exceptions.Timeout:
+ if not args['quiet']:
+ print("Timeout, view is processing. Moving on.")
+ except requests.exceptions.HTTPError as exc:
+ if not args['quiet']:
+ print(exc.response.text)
+ exit(1)
+
+def _delete(args):
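+    """Handles the 'delete' subcommand, removing 1.x databases once replicated."""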
+ args = _args(args)
+ if args['all_dbs']:
+ args['include_system_dbs'] = False
+ dbs = _do_list(args)
+ else:
+ dbs = args['dbs']
+ for db in dbs:
+ db = quote(db, safe='')
+ local_url = 'http://127.0.0.1:{}/{}'.format(args['local_port'], db)
+ clus_url = 'http://127.0.0.1:{}/{}'.format(args['clustered_port'], db)
+ try:
+ req = requests.get(local_url, auth=args['creds'])
+ req.raise_for_status()
+ req = _tojson(req)
+ local_docs = req['doc_count']
+ req = requests.get(clus_url, auth=args['creds'])
+ req.raise_for_status()
+ req = _tojson(req)
+ clus_docs = req['doc_count']
+ if clus_docs < local_docs and not args['force']:
+ if not args['quiet']:
+                    print('Clustered DB has fewer docs than local version!'
+                          ' Skipping...')
+ continue
+ if not args['quiet']:
+ print('Deleting ' + db + '...')
+ req = requests.delete('http://127.0.0.1:{}/{}'.format(
+ args['local_port'], db),
+ auth=args['creds'])
+ req.raise_for_status()
+ except requests.exceptions.HTTPError as exc:
+ if not args['quiet']:
+ print(exc.response.text)
+ exit(1)
+
+def main(argv):
+    """Parses command-line arguments and dispatches to the chosen subcommand."""
+ parser = argparse.ArgumentParser(prog='couchup',
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ description=textwrap.dedent('''\
+ Migrate CouchDB 1.x databases to CouchDB 2.x.
+
+ Specify a subcommand and -h or --help for more help.
+ '''))
+
+ subparsers = parser.add_subparsers()
+
+ parser_list = subparsers.add_parser('list',
+ help='lists all CouchDB 1.x databases',
+ formatter_class=argparse.RawTextHelpFormatter,
+ description=textwrap.dedent('''\
+ Examples:
+ couchup list
+ couchup list -c -i -p mysecretpassword
+ '''))
+ parser_list.add_argument('-c', '--clustered', action='store_true',
+ help='show clustered (2.x) databases instead')
+ parser_list.add_argument('-i', '--include-system-dbs',
+ action='store_true',
+ help='include system databases (_users, _replicator, etc.)')
+ parser_list.add_argument('-l', '--login', default='admin',
+ help='specify login (default admin)')
+ parser_list.add_argument('-p', '--password',
+ help='specify password')
+ parser_list.add_argument('--local-port', default=5986,
+ help='override local port (default 5986)')
+ parser_list.add_argument('--clustered-port', default=5984,
+ help='override clustered port (default 5984)')
+ parser_list.set_defaults(func=_list)
+
+ parser_replicate = subparsers.add_parser('replicate',
+ help='replicates one or more 1.x databases to CouchDB 2.x',
+ formatter_class=argparse.RawTextHelpFormatter,
+ description=textwrap.dedent('''\
+ Examples:
+ couchup replicate movies
+ couchup replicate -f lots_of_deleted_docs_db
+ couchup replicate -i -q -n _users
+
+ Note:
+ The -f/--filter-deleted option adds a replication filter
+ to the source database, _design/repl_filters, that
+ is used during replication to filter out deleted
+ documents. This can greatly reduce the size of your
+ 2.x database if there are many deleted documents.
+
+ It is IMPORTANT that no documents be deleted from the 1.x
+ database during this process, or those deletions may not
+ successfully replicate to the 2.x database.
+ '''))
+    parser_replicate.add_argument('-a', '--all-dbs', action='store_true',
+ help='act on all databases available')
+ parser_replicate.add_argument('-i', '--include-system-dbs',
+ action='store_true',
+ help='include system databases (_users, _replicator, etc.)')
+ parser_replicate.add_argument('-q', '--quiet', action='store_true',
+ help='suppress all output')
+ parser_replicate.add_argument('-n', '--hide-progress-bar',
+ action='store_true',
+ help='suppress progress bar display')
+ parser_replicate.add_argument('-f', '--filter-deleted',
+ action='store_true',
+ help='filter deleted document tombstones during replication')
+    parser_replicate.add_argument('-t', '--timeout', default=30, type=int,
+        help='stalled replication timeout threshold in s (default: 30)')
+ parser_replicate.add_argument('-l', '--login', default='admin',
+ help='specify login (default admin)')
+ parser_replicate.add_argument('-p', '--password',
+ help='specify password')
+ parser_replicate.add_argument('--local-port', default=5986,
+ help='override local port (default 5986)')
+ parser_replicate.add_argument('--clustered-port', default=5984,
+ help='override clustered port (default 5984)')
+ parser_replicate.add_argument('dbs', metavar='db', type=str, nargs="*",
+ help="database(s) to be processed")
+ parser_replicate.set_defaults(func=_replicate)
+
+ parser_rebuild = subparsers.add_parser('rebuild',
+ help='rebuilds one or more CouchDB 2.x views',
+ formatter_class=argparse.RawTextHelpFormatter,
+ description=textwrap.dedent('''\
+ Examples:
+ couchup rebuild movies
+ couchup rebuild movies by_name
+ couchup rebuild -a -q -p mysecretpassword
+ '''))
+ parser_rebuild.add_argument('-a', '--all-dbs', action='store_true',
+ help='act on all databases available')
+ parser_rebuild.add_argument('-q', '--quiet', action='store_true',
+ help='suppress all output')
+    parser_rebuild.add_argument('-t', '--timeout', default=5, type=int,
+        help='timeout for waiting for view rebuild in s (default: 5)')
+ parser_rebuild.add_argument('-i', '--include-system-dbs',
+ action='store_true',
+ help='include system databases (_users, _replicator, etc.)')
+ parser_rebuild.add_argument('-l', '--login', default='admin',
+ help='specify login (default admin)')
+ parser_rebuild.add_argument('-p', '--password',
+ help='specify password')
+ parser_rebuild.add_argument('--local-port', default=5986,
+ help='override local port (default 5986)')
+ parser_rebuild.add_argument('--clustered-port', default=5984,
+ help='override clustered port (default 5984)')
+ parser_rebuild.add_argument('db', metavar='db', type=str, nargs="?",
+ help="database to be processed")
+ parser_rebuild.add_argument('views', metavar='view', type=str, nargs="*",
+ help="view(s) to be processed (all by default)")
+ parser_rebuild.set_defaults(func=_rebuild)
+
+ parser_delete = subparsers.add_parser('delete',
+ help='deletes one or more CouchDB 1.x databases',
+ formatter_class=argparse.RawTextHelpFormatter,
+ description=textwrap.dedent('''\
+ Examples:
+ couchup delete movies
+ couchup delete -q -p mysecretpassword movies
+ '''))
+ parser_delete.add_argument('-a', '--all-dbs', action='store_true',
+ help='act on all databases available')
+ parser_delete.add_argument('-f', '--force', action='store_true',
+ help='force deletion even if 1.x and 2.x databases are not identical')
+ parser_delete.add_argument('-q', '--quiet', action='store_true',
+ help='suppress all output')
+ parser_delete.add_argument('-l', '--login', default='admin',
+ help='specify login (default admin)')
+ parser_delete.add_argument('-p', '--password',
+ help='specify password')
+ parser_delete.add_argument('--local-port', default=5986,
+ help='override local port (default 5986)')
+ parser_delete.add_argument('--clustered-port', default=5984,
+ help='override clustered port (default 5984)')
+ parser_delete.add_argument('dbs', metavar='db', type=str, nargs="*",
+ help="database(s) to be processed")
+ parser_delete.set_defaults(func=_delete)
+
+ args = parser.parse_args(argv[1:])
+ args.func(args)
+
+if __name__ == '__main__':
+ main(sys.argv)