#!/usr/bin/python -u # Copyright (c) 2010-2012 OpenStack, LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. import signal import socket import logging from errno import EEXIST, ENOENT from hashlib import md5 from optparse import OptionParser, SUPPRESS_HELP from os import environ, listdir, makedirs, utime, _exit as os_exit from os.path import basename, dirname, getmtime, getsize, isdir, join, \ sep as os_path_sep from random import shuffle from sys import argv, exit, stderr, stdout from time import sleep, time, gmtime, strftime from urllib import quote, unquote try: import simplejson as json except ImportError: import json from swiftclient import Connection, RequestException from swiftclient import command_helpers from swiftclient.utils import config_true_value, prt_bytes from swiftclient.multithreading import MultiThreadingManager from swiftclient.exceptions import ClientException from swiftclient import __version__ as client_version def get_conn(options): """ Return a connection building it from the options. 
""" return Connection(options.auth, options.user, options.key, options.retries, auth_version=options.auth_version, os_options=options.os_options, snet=options.snet, cacert=options.os_cacert, insecure=options.insecure, ssl_compression=options.ssl_compression) def mkdirs(path): try: makedirs(path) except OSError as err: if err.errno != EEXIST: raise def immediate_exit(signum, frame): stderr.write(" Aborted\n") os_exit(2) st_delete_options = '''[-all] [--leave-segments] [--object-threads ] [--container-threads ] [object] ''' st_delete_help = ''' Delete a container or objects within a container Positional arguments: Name of container to delete from [object] Name of object to delete. Specify multiple times for multiple objects Optional arguments: --all Delete all containers and objects --leave-segments Do not delete segments of manifest objects --object-threads Number of threads to use for deleting objects. Default is 10 --container-threads Number of threads to use for deleting containers. Default is 10 '''.strip("\n") def st_delete(parser, args, thread_manager): parser.add_option( '-a', '--all', action='store_true', dest='yes_all', default=False, help='Delete all containers and objects.') parser.add_option( '', '--leave-segments', action='store_true', dest='leave_segments', default=False, help='Do not delete segments of manifest objects.') parser.add_option( '', '--object-threads', type=int, default=10, help='Number of threads to use for deleting objects. ' 'Default is 10') parser.add_option('', '--container-threads', type=int, default=10, help='Number of threads to use for ' 'deleting containers. 
' 'Default is 10') (options, args) = parse_args(parser, args) args = args[1:] if (not args and not options.yes_all) or (args and options.yes_all): thread_manager.error('Usage: %s delete %s\n%s', basename(argv[0]), st_delete_options, st_delete_help) return def _delete_segment((container, obj), conn): conn.delete_object(container, obj) if options.verbose: if conn.attempts > 2: thread_manager.print_msg( '%s/%s [after %d attempts]', container, obj, conn.attempts) else: thread_manager.print_msg('%s/%s', container, obj) def _delete_object((container, obj), conn): try: old_manifest = None query_string = None if not options.leave_segments: try: headers = conn.head_object(container, obj) old_manifest = headers.get('x-object-manifest') if config_true_value( headers.get('x-static-large-object')): query_string = 'multipart-manifest=delete' except ClientException as err: if err.http_status != 404: raise conn.delete_object(container, obj, query_string=query_string) if old_manifest: segment_manager = thread_manager.queue_manager( _delete_segment, options.object_threads, connection_maker=create_connection) segment_queue = segment_manager.queue scontainer, sprefix = old_manifest.split('/', 1) scontainer = unquote(scontainer) sprefix = unquote(sprefix).rstrip('/') + '/' for delobj in conn.get_container(scontainer, prefix=sprefix)[1]: segment_queue.put((scontainer, delobj['name'])) if not segment_queue.empty(): with segment_manager: pass if options.verbose: path = options.yes_all and join(container, obj) or obj if path[:1] in ('/', '\\'): path = path[1:] if conn.attempts > 1: thread_manager.print_msg('%s [after %d attempts]', path, conn.attempts) else: thread_manager.print_msg(path) except ClientException as err: if err.http_status != 404: raise thread_manager.error("Object '%s/%s' not found", container, obj) def _delete_container(container, conn, object_queue): try: marker = '' while True: objects = [o['name'] for o in conn.get_container(container, marker=marker)[1]] if not objects: 
break for obj in objects: object_queue.put((container, obj)) marker = objects[-1] while not object_queue.empty(): sleep(0.05) attempts = 1 while True: try: conn.delete_container(container) break except ClientException as err: if err.http_status != 409: raise if attempts > 10: raise attempts += 1 sleep(1) except ClientException as err: if err.http_status != 404: raise thread_manager.error('Container %r not found', container) create_connection = lambda: get_conn(options) obj_manager = thread_manager.queue_manager( _delete_object, options.object_threads, connection_maker=create_connection) with obj_manager as object_queue: cont_manager = thread_manager.queue_manager( _delete_container, options.container_threads, object_queue, connection_maker=create_connection) with cont_manager as container_queue: if not args: conn = create_connection() try: marker = '' while True: containers = [ c['name'] for c in conn.get_account(marker=marker)[1]] if not containers: break for container in containers: container_queue.put(container) marker = containers[-1] except ClientException as err: if err.http_status != 404: raise thread_manager.error('Account not found') elif len(args) == 1: if '/' in args[0]: print >> stderr, 'WARNING: / in container name; you ' \ 'might have meant %r instead of %r.' % ( args[0].replace('/', ' ', 1), args[0]) container_queue.put(args[0]) else: for obj in args[1:]: object_queue.put((args[0], obj)) st_download_options = '''[--all] [--marker] [--prefix ] [--output ] [--object-threads ] [--container-threads ] [--no-download] [object] ''' st_download_help = ''' Download objects from containers Positional arguments: Name of container to download from. To download a whole account, omit this and specify --all. [object] Name of object to download. Specify multiple times for multiple objects. Omit this to download all objects from the container. 
Optional arguments: --all Indicates that you really want to download everything in the account --marker Marker to use when starting a container or account download --prefix Only download items beginning with --output For a single file download, stream the output to . Specifying "-" as will redirect to stdout --object-threads Number of threads to use for downloading objects. Default is 10 --container-threads Number of threads to use for downloading containers. Default is 10 --no-download Perform download(s), but don't actually write anything to disk --header Adds a customized request header to the query, like "Range" or "If-Match". This argument is repeatable. Example --header "content-type:text/plain" --skip-identical Skip downloading files that are identical on both sides '''.strip("\n") def st_download(parser, args, thread_manager): parser.add_option( '-a', '--all', action='store_true', dest='yes_all', default=False, help='Indicates that you really want to download ' 'everything in the account') parser.add_option( '-m', '--marker', dest='marker', default='', help='Marker to use when starting a container or ' 'account download') parser.add_option( '-p', '--prefix', dest='prefix', help='Only download items beginning with the .') parser.add_option( '-o', '--output', dest='out_file', help='For a single ' 'For a single file download, stream the output to . ' 'Specifying "-" as will redirect to stdout.') parser.add_option( '', '--object-threads', type=int, default=10, help='Number of threads to use for downloading objects') parser.add_option( '', '--container-threads', type=int, default=10, help='Number of threads to use for downloading containers') parser.add_option( '', '--no-download', action='store_true', default=False, help="Perform download(s), but don't actually write anything to disk") parser.add_option( '-H', '--header', action='append', dest='header', default=[], help='Adds a customized request header to the query, like "Range" or ' '"If-Match". 
This argument is repeatable. ' 'Example: --header "content-type:text/plain"') parser.add_option( '--skip-identical', action='store_true', dest='skip_identical', default=False, help='Skip downloading files that are identical on ' 'both sides') (options, args) = parse_args(parser, args) args = args[1:] if options.out_file == '-': options.verbose = 0 if options.out_file and len(args) != 2: exit('-o option only allowed for single file downloads') if (not args and not options.yes_all) or (args and options.yes_all): thread_manager.error('Usage: %s download %s\n%s', basename(argv[0]), st_download_options, st_download_help) return req_headers = split_headers(options.header, '', thread_manager) def _download_object(queue_arg, conn): if len(queue_arg) == 2: container, obj = queue_arg out_file = None elif len(queue_arg) == 3: container, obj, out_file = queue_arg else: raise Exception("Invalid queue_arg length of %s" % len(queue_arg)) path = options.yes_all and join(container, obj) or obj path = path.lstrip(os_path_sep) if options.skip_identical and out_file != '-': filename = out_file if out_file else path try: fp = open(filename, 'rb') except IOError: pass else: with fp: md5sum = md5() while True: data = fp.read(65536) if not data: break md5sum.update(data) req_headers['If-None-Match'] = md5sum.hexdigest() try: start_time = time() headers, body = \ conn.get_object(container, obj, resp_chunk_size=65536, headers=req_headers) headers_receipt = time() content_type = headers.get('content-type') if 'content-length' in headers: content_length = int(headers.get('content-length')) else: content_length = None etag = headers.get('etag') md5sum = None make_dir = not options.no_download and out_file != "-" if content_type.split(';', 1)[0] == 'text/directory': if make_dir and not isdir(path): mkdirs(path) read_length = 0 if 'x-object-manifest' not in headers and \ 'x-static-large-object' not in headers: md5sum = md5() for chunk in body: read_length += len(chunk) if md5sum: 
md5sum.update(chunk) else: dirpath = dirname(path) if make_dir and dirpath and not isdir(dirpath): mkdirs(dirpath) if not options.no_download: if out_file == "-": fp = stdout elif out_file: fp = open(out_file, 'wb') else: fp = open(path, 'wb') read_length = 0 if 'x-object-manifest' not in headers and \ 'x-static-large-object' not in headers: md5sum = md5() for chunk in body: if not options.no_download: fp.write(chunk) read_length += len(chunk) if md5sum: md5sum.update(chunk) if not options.no_download: fp.close() if md5sum and md5sum.hexdigest() != etag: thread_manager.error('%s: md5sum != etag, %s != %s', path, md5sum.hexdigest(), etag) if content_length is not None and read_length != content_length: thread_manager.error( '%s: read_length != content_length, %d != %d', path, read_length, content_length) if 'x-object-meta-mtime' in headers and not options.out_file \ and not options.no_download: mtime = float(headers['x-object-meta-mtime']) utime(path, (mtime, mtime)) if options.verbose: finish_time = time() auth_time = conn.auth_end_time - start_time headers_receipt = headers_receipt - start_time total_time = finish_time - start_time download_time = total_time - auth_time time_str = ('auth %.3fs, headers %.3fs, total %.3fs, ' '%.3f MB/s' % ( auth_time, headers_receipt, total_time, float(read_length) / download_time / 1000000)) if conn.attempts > 1: thread_manager.print_msg('%s [%s after %d attempts]', path, time_str, conn.attempts) else: thread_manager.print_msg('%s [%s]', path, time_str) except ClientException as err: if err.http_status == 304 and options.skip_identical: thread_manager.print_msg("Skipped identical file '%s'", path) return if err.http_status != 404: raise thread_manager.error("Object '%s/%s' not found", container, obj) def _download_container(queue_arg, conn): if len(queue_arg) == 2: container, object_queue = queue_arg prefix = None elif len(queue_arg) == 3: container, object_queue, prefix = queue_arg else: raise Exception("Invalid queue_arg length 
of %s" % len(queue_arg)) try: marker = options.marker while True: objects = [ o['name'] for o in conn.get_container(container, marker=marker, prefix=prefix)[1]] if not objects: break marker = objects[-1] shuffle(objects) for obj in objects: object_queue.put((container, obj)) except ClientException as err: if err.http_status != 404: raise thread_manager.error('Container %r not found', container) create_connection = lambda: get_conn(options) obj_manager = thread_manager.queue_manager( _download_object, options.object_threads, connection_maker=create_connection) with obj_manager as object_queue: cont_manager = thread_manager.queue_manager( _download_container, options.container_threads, connection_maker=create_connection) with cont_manager as container_queue: if not args: # --all case conn = create_connection() try: marker = options.marker while True: containers = [ c['name'] for c in conn.get_account( marker=marker, prefix=options.prefix)[1]] if not containers: break marker = containers[-1] shuffle(containers) for container in containers: container_queue.put((container, object_queue)) except ClientException as err: if err.http_status != 404: raise thread_manager.error('Account not found') elif len(args) == 1: if '/' in args[0]: print >> stderr, ( 'WARNING: / in container name; you might have meant ' '%r instead of %r.' 
% ( args[0].replace('/', ' ', 1), args[0])) container_queue.put((args[0], object_queue, options.prefix)) else: if len(args) == 2: obj = args[1] object_queue.put((args[0], obj, options.out_file)) else: for obj in args[1:]: object_queue.put((args[0], obj)) st_list_options = '''[--long] [--lh] [--totals] [--prefix ] [--delimiter ] ''' st_list_help = ''' Lists the containers for the account or the objects for a container Positional arguments: [container] Name of container to list object in Optional arguments: --long Long listing format, similar to ls -l --lh Report sizes in human readable format similar to ls -lh --totals Used with -l or --lh, only report totals --prefix Only list items beginning with the prefix --delimiter Roll up items with the given delimiter. For containers only. See OpenStack Swift API documentation for what this means. '''.strip('\n') def st_list(parser, args, thread_manager): parser.add_option( '-l', '--long', dest='long', action='store_true', default=False, help='Long listing format, similar to ls -l.') parser.add_option( '--lh', dest='human', action='store_true', default=False, help='Report sizes in human readable format ' "similar to ls -lh.") parser.add_option( '-t', '--totals', dest='totals', help='used with -l or --lh, ' 'only report totals', action='store_true', default=False) parser.add_option( '-p', '--prefix', dest='prefix', help='Only list items beginning with the prefix.') parser.add_option( '-d', '--delimiter', dest='delimiter', help='Roll up items with the given delimiter. ' 'For containers only. 
See OpenStack Swift API documentation for ' 'what this means)') (options, args) = parse_args(parser, args) args = args[1:] if options.delimiter and not args: exit('-d option only allowed for container listings') if len(args) > 1 or len(args) == 1 and args[0].find('/') >= 0: thread_manager.error('Usage: %s list %s\n%s', basename(argv[0]), st_list_options, st_list_help) return conn = get_conn(options) try: marker = '' total_count = total_bytes = 0 while True: if not args: items = \ conn.get_account(marker=marker, prefix=options.prefix)[1] else: items = conn.get_container( args[0], marker=marker, prefix=options.prefix, delimiter=options.delimiter)[1] if not items: break for item in items: item_name = item.get('name') if not options.long and not options.human: thread_manager.print_msg( item.get('name', item.get('subdir'))) else: item_bytes = item.get('bytes') total_bytes += item_bytes if len(args) == 0: # listing containers byte_str = prt_bytes(item_bytes, options.human) count = item.get('count') total_count += count try: meta = conn.head_container(item_name) utc = gmtime(float(meta.get('x-timestamp'))) datestamp = strftime('%Y-%m-%d %H:%M:%S', utc) except ClientException: datestamp = '????-??-?? ??:??:??' 
if not options.totals: thread_manager.print_msg("%5s %s %s %s", count, byte_str, datestamp, item_name) else: # list container contents subdir = item.get('subdir') if subdir is None: byte_str = prt_bytes(item_bytes, options.human) date, xtime = item.get('last_modified').split('T') xtime = xtime.split('.')[0] else: byte_str = prt_bytes(0, options.human) date = xtime = '' item_name = subdir if not options.totals: thread_manager.print_msg("%s %10s %8s %s", byte_str, date, xtime, item_name) marker = items[-1].get('name', items[-1].get('subdir')) # report totals if options.long or options.human: if len(args) == 0: thread_manager.print_msg( "%5s %s", prt_bytes(total_count, True), prt_bytes(total_bytes, options.human)) else: thread_manager.print_msg(prt_bytes(total_bytes, options.human)) except ClientException as err: if err.http_status != 404: raise if not args: thread_manager.error('Account not found') else: thread_manager.error('Container %r not found', args[0]) st_stat_options = '''[--lh] [container] [object] ''' st_stat_help = ''' Displays information for the account, container, or object Positional arguments: [container] Name of container to stat from [object] Name of object to stat. Specify multiple times for multiple objects Optional arguments: --lh Report sizes in human readable format similar to ls -lh '''.strip('\n') def st_stat(parser, args, thread_manager): parser.add_option( '--lh', dest='human', action='store_true', default=False, help='Report sizes in human readable format similar to ls -lh.') (options, args) = parse_args(parser, args) args = args[1:] conn = get_conn(options) if not args: try: command_helpers.stat_account(conn, options, thread_manager) except ClientException as err: if err.http_status != 404: raise thread_manager.error('Account not found') elif len(args) == 1: if '/' in args[0]: print >> stderr, 'WARNING: / in container name; you might have ' \ 'meant %r instead of %r.' 
% \ (args[0].replace('/', ' ', 1), args[0]) try: command_helpers.stat_container(conn, options, args, thread_manager) except ClientException as err: if err.http_status != 404: raise thread_manager.error('Container %r not found', args[0]) elif len(args) == 2: try: command_helpers.stat_object(conn, options, args, thread_manager) except ClientException as err: if err.http_status != 404: raise thread_manager.error("Object %s/%s not found", args[0], args[1]) else: thread_manager.error('Usage: %s stat %s\n%s', basename(argv[0]), st_stat_options, st_stat_help) st_post_options = '''[--read-acl ] [--write-acl ] [--sync-to] [--sync-key ] [--meta ] [--header
] [container] [object] ''' st_post_help = ''' Updates meta information for the account, container, or object. If the container is not found, it will be created automatically. Positional arguments: [container] Name of container to post to [object] Name of object to post. Specify multiple times for multiple objects Optional arguments: --read-acl Read ACL for containers. Quick summary of ACL syntax: .r:*, .r:-.example.com, .r:www.example.com, account1, account2:user2 --write-acl Write ACL for containers. Quick summary of ACL syntax: account1 account2:user2 --sync-to Sync To for containers, for multi-cluster replication --sync-key Sync Key for containers, for multi-cluster replication --meta Sets a meta data item. This option may be repeated. Example: -m Color:Blue -m Size:Large --header
Set request headers. This option may be repeated. Example -H "content-type:text/plain" '''.strip('\n') def st_post(parser, args, thread_manager): parser.add_option( '-r', '--read-acl', dest='read_acl', help='Read ACL for containers. ' 'Quick summary of ACL syntax: .r:*, .r:-.example.com, ' '.r:www.example.com, account1, account2:user2') parser.add_option( '-w', '--write-acl', dest='write_acl', help='Write ACL for ' 'containers. Quick summary of ACL syntax: account1, ' 'account2:user2') parser.add_option( '-t', '--sync-to', dest='sync_to', help='Sets the ' 'Sync To for containers, for multi-cluster replication.') parser.add_option( '-k', '--sync-key', dest='sync_key', help='Sets the ' 'Sync Key for containers, for multi-cluster replication.') parser.add_option( '-m', '--meta', action='append', dest='meta', default=[], help='Sets a meta data item. This option may be repeated. ' 'Example: -m Color:Blue -m Size:Large') parser.add_option( '-H', '--header', action='append', dest='header', default=[], help='Set request headers. This option may be repeated. ' 'Example: -H "content-type:text/plain" ' '-H "Content-Length: 4000"') (options, args) = parse_args(parser, args) args = args[1:] if (options.read_acl or options.write_acl or options.sync_to or options.sync_key) and not args: exit('-r, -w, -t, and -k options only allowed for containers') conn = get_conn(options) if not args: headers = split_headers( options.meta, 'X-Account-Meta-', thread_manager) headers.update(split_headers(options.header, '', thread_manager)) try: conn.post_account(headers=headers) except ClientException as err: if err.http_status != 404: raise thread_manager.error('Account not found') elif len(args) == 1: if '/' in args[0]: print >> stderr, 'WARNING: / in container name; you might have ' \ 'meant %r instead of %r.' 
% \ (args[0].replace('/', ' ', 1), args[0]) headers = split_headers(options.meta, 'X-Container-Meta-', thread_manager) headers.update(split_headers(options.header, '', thread_manager)) if options.read_acl is not None: headers['X-Container-Read'] = options.read_acl if options.write_acl is not None: headers['X-Container-Write'] = options.write_acl if options.sync_to is not None: headers['X-Container-Sync-To'] = options.sync_to if options.sync_key is not None: headers['X-Container-Sync-Key'] = options.sync_key try: conn.post_container(args[0], headers=headers) except ClientException as err: if err.http_status != 404: raise conn.put_container(args[0], headers=headers) elif len(args) == 2: headers = split_headers(options.meta, 'X-Object-Meta-', thread_manager) # add header options to the headers object for the request. headers.update(split_headers(options.header, '', thread_manager)) try: conn.post_object(args[0], args[1], headers=headers) except ClientException as err: if err.http_status != 404: raise thread_manager.error("Object '%s/%s' not found", args[0], args[1]) else: thread_manager.error('Usage: %s post %s\n%s', basename(argv[0]), st_post_options, st_post_help) st_upload_options = '''[--changed] [--skip-identical] [--segment-size ] [--segment-container ] [--leave-segments] [--object-threads ] [--segment-threads ] [--header
] [--use-slo] [--object-name ] ''' st_upload_help = ''' Uploads specified files and directories to the given container Positional arguments: Name of container to upload to Name of file or directory to upload. Specify multiple times for multiple uploads Optional arguments: --changed Only upload files that have changed since the last upload --skip-identical Skip uploading files that are identical on both sides --segment-size Upload files in segments no larger than and then create a "manifest" file that will download all the segments as if it were the original file --segment-container Upload the segments into the specified container. If not specified, the segments will be uploaded to a _segments container so as to not pollute the main listings. --leave-segments Indicates that you want the older segments of manifest objects left alone (in the case of overwrites) --object-threads Number of threads to use for uploading full objects. Default is 10. --segment-threads Number of threads to use for uploading object segments. Default is 10. --header
Set request headers with the syntax header:value. This option may be repeated. Example -H "content-type:text/plain". --use-slo When used in conjunction with --segment-size will create a Static Large Object instead of the default Dynamic Large Object. --object-name Upload file and name object to or upload dir and use as object prefix instead of folder name '''.strip('\n') def st_upload(parser, args, thread_manager): parser.add_option( '-c', '--changed', action='store_true', dest='changed', default=False, help='Only upload files that have changed since ' 'the last upload.') parser.add_option( '--skip-identical', action='store_true', dest='skip_identical', default=False, help='Skip uploading files that are identical on ' 'both sides') parser.add_option( '-S', '--segment-size', dest='segment_size', help='Upload files ' 'in segments no larger than and then create a "manifest" ' 'file that will download all the segments as if it were the original ' 'file.') parser.add_option( '-C', '--segment-container', dest='segment_container', help='Upload the segments into the specified container. ' 'If not specified, the segments will be uploaded to a ' '_segments container so as to not pollute the main ' ' listings.') parser.add_option( '', '--leave-segments', action='store_true', dest='leave_segments', default=False, help='Indicates that you want ' 'the older segments of manifest objects left alone (in the case of ' 'overwrites)') parser.add_option( '', '--object-threads', type=int, default=10, help='Number of threads to use for uploading full objects. ' 'Default is 10.') parser.add_option( '', '--segment-threads', type=int, default=10, help='Number of threads to use for uploading object segments. ' 'Default is 10.') parser.add_option( '-H', '--header', action='append', dest='header', default=[], help='Set request headers with the syntax header:value. ' ' This option may be repeated. 
Example -H "content-type:text/plain" ' '-H "Content-Length: 4000"') parser.add_option( '', '--use-slo', action='store_true', default=False, help='When used in conjunction with --segment-size will ' 'create a Static Large Object instead of the default ' 'Dynamic Large Object.') parser.add_option( '', '--object-name', dest='object_name', help='Upload file and name object to or upload dir and ' 'use as object prefix instead of folder name.') (options, args) = parse_args(parser, args) args = args[1:] if len(args) < 2: thread_manager.error( 'Usage: %s upload %s\n%s', basename(argv[0]), st_upload_options, st_upload_help) return def _segment_job(job, conn): if job.get('delete', False): conn.delete_object(job['container'], job['obj']) else: fp = open(job['path'], 'rb') fp.seek(job['segment_start']) seg_container = args[0] + '_segments' if options.segment_container: seg_container = options.segment_container etag = conn.put_object(job.get('container', seg_container), job['obj'], fp, content_length=job['segment_size']) job['segment_location'] = '/%s/%s' % (seg_container, job['obj']) job['segment_etag'] = etag if options.verbose and 'log_line' in job: if conn.attempts > 1: thread_manager.print_msg('%s [after %d attempts]', job['log_line'], conn.attempts) else: thread_manager.print_msg(job['log_line']) return job def _object_job(job, conn): path = job['path'] container = job.get('container', args[0]) dir_marker = job.get('dir_marker', False) object_name = job['object_name'] try: if object_name is not None: object_name.replace("\\", "/") obj = object_name else: obj = path if obj.startswith('./') or obj.startswith('.\\'): obj = obj[2:] if obj.startswith('/'): obj = obj[1:] put_headers = {'x-object-meta-mtime': "%f" % getmtime(path)} if dir_marker: if options.changed: try: headers = conn.head_object(container, obj) ct = headers.get('content-type') cl = int(headers.get('content-length')) et = headers.get('etag') mt = headers.get('x-object-meta-mtime') if ct.split(';', 1)[0] == 
'text/directory' and \ cl == 0 and \ et == 'd41d8cd98f00b204e9800998ecf8427e' and \ mt == put_headers['x-object-meta-mtime']: return except ClientException as err: if err.http_status != 404: raise conn.put_object(container, obj, '', content_length=0, content_type='text/directory', headers=put_headers) else: # We need to HEAD all objects now in case we're overwriting a # manifest object and need to delete the old segments # ourselves. old_manifest = None old_slo_manifest_paths = [] new_slo_manifest_paths = set() if options.changed or options.skip_identical \ or not options.leave_segments: if options.skip_identical: checksum = None try: fp = open(path, 'rb') except IOError: pass else: with fp: md5sum = md5() while True: data = fp.read(65536) if not data: break md5sum.update(data) checksum = md5sum.hexdigest() try: headers = conn.head_object(container, obj) cl = int(headers.get('content-length')) mt = headers.get('x-object-meta-mtime') if (options.skip_identical and checksum == headers.get('etag')): thread_manager.print_msg( "Skipped identical file '%s'", path) return if options.changed and cl == getsize(path) and \ mt == put_headers['x-object-meta-mtime']: return if not options.leave_segments: old_manifest = headers.get('x-object-manifest') if config_true_value( headers.get('x-static-large-object')): headers, manifest_data = conn.get_object( container, obj, query_string='multipart-manifest=get') for old_seg in json.loads(manifest_data): seg_path = old_seg['name'].lstrip('/') if isinstance(seg_path, unicode): seg_path = seg_path.encode('utf-8') old_slo_manifest_paths.append(seg_path) except ClientException as err: if err.http_status != 404: raise # Merge the command line header options to the put_headers put_headers.update(split_headers(options.header, '', thread_manager)) # Don't do segment job if object is not big enough if options.segment_size and \ getsize(path) > int(options.segment_size): seg_container = container + '_segments' if options.segment_container: 
seg_container = options.segment_container full_size = getsize(path) slo_segments = [] error_counter = [0] segment_manager = thread_manager.queue_manager( _segment_job, options.segment_threads, store_results=slo_segments, error_counter=error_counter, connection_maker=create_connection) with segment_manager as segment_queue: segment = 0 segment_start = 0 while segment_start < full_size: segment_size = int(options.segment_size) if segment_start + segment_size > full_size: segment_size = full_size - segment_start if options.use_slo: segment_name = '%s/slo/%s/%s/%s/%08d' % ( obj, put_headers['x-object-meta-mtime'], full_size, options.segment_size, segment) else: segment_name = '%s/%s/%s/%s/%08d' % ( obj, put_headers['x-object-meta-mtime'], full_size, options.segment_size, segment) segment_queue.put( {'path': path, 'obj': segment_name, 'segment_start': segment_start, 'segment_size': segment_size, 'segment_index': segment, 'log_line': '%s segment %s' % (obj, segment)}) segment += 1 segment_start += segment_size if error_counter[0]: raise ClientException( 'Aborting manifest creation ' 'because not all segments could be uploaded. 
%s/%s' % (container, obj)) if options.use_slo: slo_segments.sort(key=lambda d: d['segment_index']) for seg in slo_segments: seg_loc = seg['segment_location'].lstrip('/') if isinstance(seg_loc, unicode): seg_loc = seg_loc.encode('utf-8') new_slo_manifest_paths.add(seg_loc) manifest_data = json.dumps([ {'path': d['segment_location'], 'etag': d['segment_etag'], 'size_bytes': d['segment_size']} for d in slo_segments]) put_headers['x-static-large-object'] = 'true' conn.put_object(container, obj, manifest_data, headers=put_headers, query_string='multipart-manifest=put') else: new_object_manifest = '%s/%s/%s/%s/%s/' % ( quote(seg_container), quote(obj), put_headers['x-object-meta-mtime'], full_size, options.segment_size) if old_manifest and old_manifest.rstrip('/') == \ new_object_manifest.rstrip('/'): old_manifest = None put_headers['x-object-manifest'] = new_object_manifest conn.put_object(container, obj, '', content_length=0, headers=put_headers) else: conn.put_object( container, obj, open(path, 'rb'), content_length=getsize(path), headers=put_headers) if old_manifest or old_slo_manifest_paths: segment_manager = thread_manager.queue_manager( _segment_job, options.segment_threads, connection_maker=create_connection) segment_queue = segment_manager.queue if old_manifest: scontainer, sprefix = old_manifest.split('/', 1) scontainer = unquote(scontainer) sprefix = unquote(sprefix).rstrip('/') + '/' for delobj in conn.get_container(scontainer, prefix=sprefix)[1]: segment_queue.put( {'delete': True, 'container': scontainer, 'obj': delobj['name']}) if old_slo_manifest_paths: for seg_to_delete in old_slo_manifest_paths: if seg_to_delete in new_slo_manifest_paths: continue scont, sobj = \ seg_to_delete.split('/', 1) segment_queue.put( {'delete': True, 'container': scont, 'obj': sobj}) if not segment_queue.empty(): with segment_manager: pass if options.verbose: if conn.attempts > 1: thread_manager.print_msg('%s [after %d attempts]', obj, conn.attempts) else: 
def _upload_dir(path, object_queue, object_name):
    """
    Recursively queue one upload job per file or empty directory under path.

    An empty directory is queued as a single job flagged with
    ``dir_marker``. Otherwise every entry is either queued as a file job or,
    when it is a directory, walked recursively.

    :param path: Local directory to walk.
    :param object_queue: Object whose ``put`` method receives one job dict
                         per file or empty directory.
    :param object_name: Value recorded as ``object_name`` for ``path``. When
                        it is not None, each child gets this value joined
                        with its entry name; when it is None, children get
                        None as well.
    """
    # List the directory exactly once so that the emptiness check and the
    # walk below always work from the same snapshot of the directory.
    names = listdir(path)
    if not names:
        object_queue.put({'path': path, 'object_name': object_name,
                          'dir_marker': True})
        return
    for name in names:
        subpath = join(path, name)
        subobjname = None
        if object_name is not None:
            subobjname = join(object_name, name)
        if isdir(subpath):
            _upload_dir(subpath, object_queue, subobjname)
        else:
            object_queue.put({'path': subpath, 'object_name': subobjname})
st_capabilities_options = "[]"
st_capabilities_help = ''' Retrieve capability of the proxy Optional positional arguments: proxy URL of the cluster to retrieve capabilities '''


def st_capabilities(parser, args, thread_manager):
    """
    Print the capabilities reported by the cluster.

    The 'swift' entry is printed under the heading 'Core'; every other entry
    is printed under 'Additional middleware'.

    :param parser: Option parser handed to parse_args.
    :param args: Raw command-line arguments handed to parse_args. After
                 parsing, a second positional argument is passed to
                 get_capabilities as the URL; more than two positional
                 arguments is reported as a usage error.
    :param thread_manager: Used for all output (print_msg) and for
                           reporting the usage error (error).
    """
    def _print_compo_cap(name, capabilities):
        # One "<name>: <feature>" line per feature in key order, followed by
        # that feature's options (also in key order) when it has any.
        for feature, settings in sorted(capabilities.items(),
                                        key=lambda pair: pair[0]):
            thread_manager.print_msg("%s: %s" % (name, feature))
            if not settings:
                continue
            thread_manager.print_msg(" Options:")
            for key, value in sorted(settings.items(),
                                     key=lambda pair: pair[0]):
                thread_manager.print_msg(" %s: %s" % (key, value))

    (options, args) = parse_args(parser, args)
    if len(args) > 2:
        thread_manager.error('Usage: %s capabilities %s\n%s',
                             basename(argv[0]),
                             st_capabilities_options, st_capabilities_help)
        return

    url = args[1] if len(args) == 2 else None
    capabilities = get_conn(options).get_capabilities(url)

    # Take 'swift' out of the mapping so it is not listed a second time
    # among the middleware entries.
    core = capabilities.pop('swift')
    _print_compo_cap('Core', {'swift': core})
    _print_compo_cap('Additional middleware', capabilities)
def split_headers(options, prefix='', thread_manager=None):
    """
    Splits 'Key: Value' strings and returns them as a dictionary.

    Each item is split on its first ':'. The key is prefixed and
    title-cased; the value has surrounding whitespace removed, so
    'Key: Value' and 'Key:Value' both yield 'Value'.

    An item without a ':' is an error. With a thread_manager the error is
    reported through thread_manager.error and the remaining items are still
    processed; without one, exit() is called with the error message.

    :param options: An array of 'Key: Value' strings
    :param prefix: String to prepend to all of the keys in the dictionary.
    :param thread_manager: MultiThreadingManager for thread safe error
                           reporting.
    :returns: Dictionary mapping each prefixed, title-cased key to its
              stripped value.
    """
    headers = {}
    for item in options:
        key, separator, value = item.partition(':')
        if separator:
            # Strip the value so the space people naturally type after the
            # colon does not end up inside the header value.
            headers[(prefix + key).title()] = value.strip()
            continue
        error_string = "Metadata parameter %s must contain a ':'.\n%s" \
            % (item, st_post_help)
        if thread_manager:
            thread_manager.error(error_string)
        else:
            exit(error_string)
    return headers
def parse_args(parser, args, enforce_requires=True):
    """
    Parse the command line and normalise the authentication options.

    An empty argument list is parsed as ['-h']. After parsing:

    * auth_version is forced to '2.0' unless auth, user and key are all set;
    * auth, user and key that are unset are filled in from os_auth_url,
      os_username and os_password respectively;
    * options.os_options is populated from the remaining --os-* options.

    The credential check is skipped for a "capabilities" command that has at
    least one further argument, and when a storage URL and auth token are
    both given with auth version 2.0.

    :param parser: Option parser that defines the options read here.
    :param args: List of command-line arguments to parse.
    :param enforce_requires: When true, exit with an explanatory message if
                             auth, user and key are not all available.
    :returns: Tuple of (options, remaining positional arguments).
    """
    (options, args) = parser.parse_args(args or ['-h'])

    def _has_credentials():
        return options.auth and options.user and options.key

    if not _has_credentials():
        # At least one old-style credential is missing: assume 2.0 auth.
        options.auth_version = '2.0'

    # Fall back to the new-style value for each old-style one left unset.
    for old_name, new_name in (('auth', 'os_auth_url'),
                               ('user', 'os_username'),
                               ('key', 'os_password')):
        if not getattr(options, old_name) and getattr(options, new_name):
            setattr(options, old_name, getattr(options, new_name))

    # Specific OpenStack options
    options.os_options = dict(
        (key, getattr(options, attr)) for key, attr in (
            ('tenant_id', 'os_tenant_id'),
            ('tenant_name', 'os_tenant_name'),
            ('service_type', 'os_service_type'),
            ('endpoint_type', 'os_endpoint_type'),
            ('auth_token', 'os_auth_token'),
            ('object_storage_url', 'os_storage_url'),
            ('region_name', 'os_region_name'),
        ))

    capabilities_with_arg = len(args) > 1 and args[0] == "capabilities"
    token_and_url = (options.os_options.get('object_storage_url') and
                     options.os_options.get('auth_token') and
                     options.auth_version == '2.0')
    if capabilities_with_arg or token_and_url:
        return options, args

    if enforce_requires and not _has_credentials():
        # NOTE(review): whitespace inside this literal is kept exactly as
        # found; its original line layout is not recoverable from here.
        exit(''' Auth version 1.0 requires ST_AUTH, ST_USER, and ST_KEY environment variables to be set or overridden with -A, -U, or -K. Auth version 2.0 requires OS_AUTH_URL, OS_USERNAME, OS_PASSWORD, and OS_TENANT_NAME OS_TENANT_ID to be set or overridden with --os-auth-url, --os-username, --os-password, --os-tenant-name or os-tenant-id. Note: adding "-V 2" is necessary for this.'''.strip('\n'))
    return options, args
if __name__ == '__main__':
    # Script entry point: build the global option parser, pick the
    # sub-command, and dispatch to the matching st_<command> function.
    version = client_version
    # NOTE(review): whitespace inside the usage literal is kept exactly as
    # found; its original line layout is not recoverable from here.
    parser = OptionParser(version='%%prog %s' % version,
                          usage=''' usage: %%prog [--version] [--help] [--snet] [--verbose] [--debug] [--info] [--quiet] [--auth ] [--auth-version ] [--user ] [--key ] [--retries ] [--os-username ] [--os-password ] [--os-tenant-id ] [--os-tenant-name ] [--os-auth-url ] [--os-auth-token ] [--os-storage-url ] [--os-region-name ] [--os-service-type ] [--os-endpoint-type ] [--os-cacert ] [--insecure] [--no-ssl-compression] ... Command-line interface to the OpenStack Swift API. Positional arguments: delete Delete a container or objects within a container download Download objects from containers list Lists the containers for the account or the objects for a container post Updates meta information for the account, container, or object; creates containers if not present stat Displays information for the account, container, or object upload Uploads files or directories to the given container capabilities List cluster capabilities Examples: %%prog -A https://auth.api.rackspacecloud.com/v1.0 -U user -K api_key stat -v %%prog --os-auth-url https://api.example.com/v2.0 --os-tenant-name tenant \\ --os-username user --os-password password list %%prog --os-auth-token 6ee5eb33efad4e45ab46806eac010566 \\ --os-storage-url https://10.1.5.2:8080/v1/AUTH_ced809b6a4baea7aeab61a \\ list %%prog list --lh '''.strip('\n') % globals())

    # General behaviour and verbosity switches. -v and -q share the
    # 'verbose' destination: -v counts up from 1, -q sets it to 0.
    parser.add_option('-s', '--snet', action='store_true', dest='snet',
                      default=False, help='Use SERVICENET internal network')
    parser.add_option('-v', '--verbose', action='count', dest='verbose',
                      default=1, help='Print more info')
    parser.add_option('--debug', action='store_true', dest='debug',
                      default=False, help='Show the curl commands and results '
                      'of all http queries regardless of result status.')
    parser.add_option('--info', action='store_true', dest='info',
                      default=False, help='Show the curl commands and results '
                      ' of all http queries which return an error.')
    parser.add_option('-q', '--quiet', action='store_const', dest='verbose',
                      const=0, default=1, help='Suppress status output')

    # Old-style credentials; defaults come from the ST_* environment.
    parser.add_option('-A', '--auth', dest='auth',
                      default=environ.get('ST_AUTH'),
                      help='URL for obtaining an auth token')
    parser.add_option('-V', '--auth-version',
                      dest='auth_version',
                      default=environ.get('ST_AUTH_VERSION', '1.0'),
                      type=str,
                      help='Specify a version for authentication. '
                           'Defaults to 1.0.')
    parser.add_option('-U', '--user', dest='user',
                      default=environ.get('ST_USER'),
                      help='User name for obtaining an auth token.')
    parser.add_option('-K', '--key', dest='key',
                      default=environ.get('ST_KEY'),
                      help='Key for obtaining an auth token.')
    parser.add_option('-R', '--retries', type=int, default=5, dest='retries',
                      help='The number of times to retry a failed connection.')

    # New-style credentials; defaults come from the OS_* environment. Each
    # dashed option is followed by a hidden underscore spelling that maps to
    # the same destination.
    parser.add_option('--os-username',
                      metavar='',
                      default=environ.get('OS_USERNAME'),
                      help='OpenStack username. Defaults to env[OS_USERNAME].')
    parser.add_option('--os_username',
                      help=SUPPRESS_HELP)
    parser.add_option('--os-password',
                      metavar='',
                      default=environ.get('OS_PASSWORD'),
                      help='OpenStack password. Defaults to env[OS_PASSWORD].')
    parser.add_option('--os_password',
                      help=SUPPRESS_HELP)
    parser.add_option('--os-tenant-id',
                      metavar='',
                      default=environ.get('OS_TENANT_ID'),
                      help='OpenStack tenant ID. '
                      'Defaults to env[OS_TENANT_ID]')
    parser.add_option('--os_tenant_id',
                      help=SUPPRESS_HELP)
    parser.add_option('--os-tenant-name',
                      metavar='',
                      default=environ.get('OS_TENANT_NAME'),
                      help='OpenStack tenant name. '
                           'Defaults to env[OS_TENANT_NAME].')
    parser.add_option('--os_tenant_name',
                      help=SUPPRESS_HELP)
    parser.add_option('--os-auth-url',
                      metavar='',
                      default=environ.get('OS_AUTH_URL'),
                      help='OpenStack auth URL. Defaults to env[OS_AUTH_URL].')
    parser.add_option('--os_auth_url',
                      help=SUPPRESS_HELP)
    parser.add_option('--os-auth-token',
                      metavar='',
                      default=environ.get('OS_AUTH_TOKEN'),
                      help='OpenStack token. Defaults to env[OS_AUTH_TOKEN]. '
                           'Used with --os-storage-url to bypass the '
                           'usual username/password authentication.')
    parser.add_option('--os_auth_token',
                      help=SUPPRESS_HELP)
    parser.add_option('--os-storage-url',
                      metavar='',
                      default=environ.get('OS_STORAGE_URL'),
                      help='OpenStack storage URL. '
                           'Defaults to env[OS_STORAGE_URL]. '
                           'Overrides the storage url returned during auth. '
                           'Will bypass authentication when used with '
                           '--os-auth-token.')
    parser.add_option('--os_storage_url',
                      help=SUPPRESS_HELP)
    parser.add_option('--os-region-name',
                      metavar='',
                      default=environ.get('OS_REGION_NAME'),
                      help='OpenStack region name. '
                           'Defaults to env[OS_REGION_NAME]')
    parser.add_option('--os_region_name',
                      help=SUPPRESS_HELP)
    parser.add_option('--os-service-type',
                      metavar='',
                      default=environ.get('OS_SERVICE_TYPE'),
                      help='OpenStack Service type. '
                           'Defaults to env[OS_SERVICE_TYPE]')
    parser.add_option('--os_service_type',
                      help=SUPPRESS_HELP)
    parser.add_option('--os-endpoint-type',
                      metavar='',
                      default=environ.get('OS_ENDPOINT_TYPE'),
                      help='OpenStack Endpoint type. '
                           'Defaults to env[OS_ENDPOINT_TYPE]')
    # Hidden underscore spelling, matching the other --os-* options above.
    # Registered after the dashed form so the environment default set there
    # is kept for the shared destination.
    parser.add_option('--os_endpoint_type',
                      help=SUPPRESS_HELP)
    parser.add_option('--os-cacert',
                      metavar='',
                      default=environ.get('OS_CACERT'),
                      help='Specify a CA bundle file to use in verifying a '
                      'TLS (https) server certificate. '
                      'Defaults to env[OS_CACERT]')
    default_val = config_true_value(environ.get('SWIFTCLIENT_INSECURE'))
    parser.add_option('--insecure',
                      action="store_true", dest="insecure",
                      default=default_val,
                      help='Allow swiftclient to access servers without '
                           'having to verify the SSL certificate. '
                           'Defaults to env[SWIFTCLIENT_INSECURE] '
                           '(set to \'true\' to enable).')
    parser.add_option('--no-ssl-compression',
                      action='store_false', dest='ssl_compression',
                      default=True,
                      help='This option is deprecated and not used anymore. '
                           'SSL compression should be disabled by default '
                           'by the system SSL library')

    # First pass: stop option parsing at the first positional argument so
    # that the sub-command name lands in args[0]. Credentials are not
    # enforced in this pass.
    parser.disable_interspersed_args()
    (options, args) = parse_args(parser, argv[1:], enforce_requires=False)
    parser.enable_interspersed_args()

    commands = ('delete', 'download', 'list', 'post',
                'stat', 'upload', 'capabilities')
    if not args or args[0] not in commands:
        parser.print_usage()
        if args:
            exit('no such command: %s' % args[0])
        exit()

    signal.signal(signal.SIGINT, immediate_exit)

    # --debug takes precedence over --info when both are given.
    if options.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif options.info:
        logging.basicConfig(level=logging.INFO)

    had_error = False

    with MultiThreadingManager() as thread_manager:
        # Look up the help text and the handler for the chosen sub-command
        # by name; the handler receives the full original argument list.
        parser.usage = globals()['st_%s_help' % args[0]]
        try:
            globals()['st_%s' % args[0]](parser, argv[1:], thread_manager)
        except (ClientException, RequestException, socket.error) as err:
            thread_manager.error(str(err))

        had_error = thread_manager.error_count

    if had_error:
        exit(1)