summaryrefslogtreecommitdiff
path: root/test/integration/cleanup_gce.py
blob: c807ebb81a682d1cb35c68e7a4b79ba52474b870 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
'''
Find and delete GCE resources matching the provided --match string.  Unless
--yes|-y is provided, prompt for confirmation prior to deleting resources.
Please use caution, you can easily delete your *ENTIRE* GCE infrastructure.
'''

import os
import re
import sys
import optparse
import yaml

try:
    from libcloud.compute.types import Provider
    from libcloud.compute.providers import get_driver
    from libcloud.common.google import GoogleBaseError, QuotaExceededError, \
            ResourceExistsError, ResourceInUseError, ResourceNotFoundError
    _ = Provider.GCE
except ImportError:
    print("failed=True " + \
        "msg='libcloud with GCE support (0.13.3+) required for this module'")
    sys.exit(1)

import gce_credentials


def delete_gce_resources(get_func, attr, opts):
    '''
    Offer to delete every listed resource whose attribute matches opts.match_re.

    get_func: zero-argument callable returning an iterable of resources.
    attr: name of the attribute on each resource that is searched.
    opts: object providing match_re (the regular expression) and assumeyes
          (forwarded to prompt_and_delete).

    The search is case-insensitive.  Resources that do not match are skipped.
    '''
    for resource in get_func():
        candidate = getattr(resource, attr)
        if not re.search(opts.match_re, candidate, re.IGNORECASE):
            continue
        question = "Delete matching %s? [y/n]: " % (resource,)
        prompt_and_delete(resource, question, opts.assumeyes)

def prompt_and_delete(item, prompt, assumeyes):
    '''
    Destroy a single resource, asking for confirmation first unless told not to.

    item: object exposing a destroy() method.
    prompt: text shown to the user when confirmation is required.
    assumeyes: when true, skip the prompt and destroy immediately.

    Only an answer of 'y' (any case) confirms the deletion; anything else
    leaves the item untouched.

    Raises AssertionError if item has no 'destroy' attribute.
    '''
    # Validate before prompting so the user is never asked about an item that
    # cannot be destroyed.  An explicit raise (rather than an assert statement)
    # keeps the check active under "python -O" while preserving the exception
    # type.
    if not hasattr(item, 'destroy'):
        raise AssertionError(
            "Class <%s> has no destroy attribute" % item.__class__)

    if not assumeyes:
        # raw_input only exists on Python 2; Python 3 renamed it to input.
        try:
            read_line = raw_input
        except NameError:
            read_line = input
        assumeyes = read_line(prompt).lower() == 'y'

    if assumeyes:
        item.destroy()
        print("Deleted %s" % item)

def parse_args():
    '''
    Parse the command line for this script.

    Builds an optparse parser whose description is the module docstring,
    passes it to gce_credentials.add_credentials_options (presumably so that
    module can register its credential options -- confirm there), and adds:

      --yes / -y   stored as opts.assumeyes (default False)
      --match      stored as opts.match_re (default "^ansible-testing-")

    The parsed options are handed to gce_credentials.check_required before
    returning.

    Returns the (options, positional_args) tuple produced by optparse.
    '''
    usage_text = "%s [options]" % (sys.argv[0],)
    cli = optparse.OptionParser(usage=usage_text, description=__doc__)

    gce_credentials.add_credentials_options(cli)

    cli.add_option(
        "--yes", "-y",
        dest="assumeyes",
        action="store_true",
        default=False,
        help="Don't prompt for confirmation")
    cli.add_option(
        "--match",
        dest="match_re",
        action="store",
        default="^ansible-testing-",
        help="Regular expression used to find GCE resources (default: %default)")

    parsed = cli.parse_args()
    options = parsed[0]
    gce_credentials.check_required(options, cli)
    return parsed

if __name__ == '__main__':
    # Script entry point: parse options, connect to GCE, then offer to delete
    # matching instances, snapshots and disks, in that order.

    (opts, args) = parse_args()

    # Connect to GCE
    gce = gce_credentials.get_gce_driver(opts)

    def get_snapshots():
        '''Yield each snapshot of each volume returned by gce.list_volumes().'''
        for volume in gce.list_volumes():
            for snapshot in gce.list_volume_snapshots(volume):
                yield snapshot

    try:
        # Delete matching instances
        delete_gce_resources(gce.list_nodes, 'name', opts)
        # Delete matching snapshots
        delete_gce_resources(get_snapshots, 'name', opts)
        # Delete matching disks
        delete_gce_resources(gce.list_volumes, 'name', opts)
    except KeyboardInterrupt:
        # Ctrl-C at any point during deletion ends the run quietly.
        print("\nExiting on user command.")