# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Backup manager manages volume backups.

Volume backups are full copies of persistent volumes stored in a backup
store, e.g. an object store or any other backup store if and when support
is added. They are usable without the original object being available. A
volume backup can be restored to the original volume it was created from,
or to any other available volume that is at least the size of the original
volume. Volume backups can be created, restored, deleted and listed.

**Related Flags**

:backup_manager:  The module name of a class derived from
                  :class:`manager.Manager` (default:
                  :class:`cinder.backup.manager.Manager`).

"""

import contextlib
import os

from castellan import key_manager
from eventlet import tpool
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import loopingcall
from oslo_service import periodic_task
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import timeutils

from cinder.backup import rpcapi as backup_rpcapi
from cinder import context
from cinder import exception
from cinder.i18n import _
from cinder.keymgr import migration as key_migration
from cinder import manager
from cinder.message import api as message_api
from cinder.message import message_field
from cinder import objects
from cinder.objects import fields
from cinder import quota
from cinder import utils
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import volume_utils

LOG = logging.getLogger(__name__)

backup_manager_opts = [
    cfg.StrOpt('backup_driver',
               default='cinder.backup.drivers.swift.SwiftBackupDriver',
               help='Driver to use for backups.',),
    cfg.IntOpt('backup_driver_init_check_interval',
               default=60,
               min=5,
               help='Time in seconds between checks to see if the backup '
                    'driver has been successfully initialized, any time '
                    'the driver is restarted.'),
    cfg.IntOpt('backup_driver_stats_polling_interval',
               default=60,
               min=10,
               deprecated_name='backup_driver_status_check_interval',
               help='Time in seconds between checks of the backup driver '
                    'status. If the driver does not report as working, it '
                    'is restarted.'),
    cfg.BoolOpt('backup_service_inithost_offload',
                default=True,
                help='Offload pending backup delete during '
                     'backup service startup. If false, the backup service '
                     'will remain down until all pending backups are '
                     'deleted.',),
    cfg.IntOpt('backup_native_threads_pool_size',
               default=60,
               min=20,
               help='Size of the native threads pool for the backups. '
                    'Most backup drivers rely heavily on this; it can be '
                    'decreased for specific drivers that don\'t.'),
]
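
# Illustrative cinder.conf fragment exercising the options above (the
# values shown are simply the defaults registered here):
#
#     [DEFAULT]
#     backup_driver = cinder.backup.drivers.swift.SwiftBackupDriver
#     backup_driver_init_check_interval = 60
#     backup_driver_stats_polling_interval = 60
#     backup_service_inithost_offload = true
#     backup_native_threads_pool_size = 60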

CONF = cfg.CONF
CONF.register_opts(backup_manager_opts)
CONF.import_opt('use_multipath_for_image_xfer', 'cinder.volume.driver')
CONF.import_opt('num_volume_device_scan_tries', 'cinder.volume.driver')
QUOTAS = quota.QUOTAS
MAPPING = {
    # Module name "google" conflicts with google library namespace inside the
    # driver when it imports google.auth
    'cinder.backup.drivers.google.GoogleBackupDriver':
    'cinder.backup.drivers.gcs.GoogleBackupDriver',
}
SERVICE_PGRP = '' if os.name == 'nt' else os.getpgrp()
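# SERVICE_PGRP feeds the external lock name used by
# _cleanup_incomplete_backup_operations, so only the first of the service's
# forked worker processes performs the startup cleanup.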


# TODO(geguileo): Once Eventlet issue #432 gets fixed we can just tpool.execute
# the whole call to the driver's backup and restore methods instead of proxy
# wrapping the device_file and having the drivers also proxy wrap their
# writes/reads and the compression/decompression calls.
# (https://github.com/eventlet/eventlet/issues/432)
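#
# For reference, a minimal sketch of the proxy wrapping (device path is
# illustrative): tpool.Proxy delegates the wrapped object's blocking calls
# to eventlet's native thread pool so they don't block the hub.
#
#     from eventlet import tpool
#     with open('/dev/vdb', 'rb') as dev:
#         proxied = tpool.Proxy(dev)
#         chunk = proxied.read(4096)  # read() runs in a native thread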

class BackupManager(manager.SchedulerDependentManager):
    """Manages backup of block storage devices."""

    RPC_API_VERSION = backup_rpcapi.BackupAPI.RPC_API_VERSION

    target = messaging.Target(version=RPC_API_VERSION)

    def __init__(self, *args, **kwargs):
        self.az = CONF.storage_availability_zone
        self.backup_rpcapi = backup_rpcapi.BackupAPI()
        self.volume_rpcapi = volume_rpcapi.VolumeAPI()
        super(BackupManager, self).__init__(*args, **kwargs)
        self.is_initialized = False
        self._set_tpool_size(CONF.backup_native_threads_pool_size)
        self._process_number = kwargs.get('process_number', 1)
        self._semaphore = kwargs.get('semaphore', contextlib.suppress())
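        # contextlib.suppress() with no arguments is a no-op context
        # manager, used as the default when no concurrency semaphore is
        # supplied.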
        self.driver_name = CONF.backup_driver
        if self.driver_name in MAPPING:
            new_name = MAPPING[self.driver_name]
            LOG.warning('Backup driver path %s is deprecated, update your '
                        'configuration to the new path %s',
                        self.driver_name, new_name)
            self.driver_name = new_name
        self.service = importutils.import_class(self.driver_name)
        self.message_api = message_api.API()

    def init_host(self, **kwargs):
        """Run initialization needed for a standalone service."""
        ctxt = context.get_admin_context()
        self.setup_backup_backend(ctxt)

        try:
            self._cleanup_incomplete_backup_operations(ctxt)
        except Exception:
            # Don't block startup of the backup service.
            LOG.exception("Problem cleaning incomplete backup operations.")

        # Migrate any ConfKeyManager keys based on fixed_key to the currently
        # configured key manager.
        backups = objects.BackupList.get_all_by_host(ctxt, self.host)
        self._add_to_threadpool(key_migration.migrate_fixed_key,
                                backups=backups)
        self.publish_service_capabilities(ctxt)

    def _setup_backup_driver(self, ctxt):
        backup_service = self.service(context=ctxt)
        backup_service.check_for_setup_error()
        self.is_initialized = True
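        # Raising LoopingCallDone stops the FixedIntervalLoopingCall
        # started in setup_backup_backend once initialization succeeds.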
        raise loopingcall.LoopingCallDone()

    def setup_backup_backend(self, ctxt):
        try:
            init_loop = loopingcall.FixedIntervalLoopingCall(
                self._setup_backup_driver, ctxt)
            init_loop.start(interval=CONF.backup_driver_init_check_interval)
        except loopingcall.LoopingCallDone:
            LOG.info("Backup driver was successfully initialized.")
        except Exception:
            LOG.exception("Failed to initialize driver.",
                          resource={'type': 'driver',
                                    'id': self.__class__.__name__})

    def reset(self):
        super(BackupManager, self).reset()
        self.backup_rpcapi = backup_rpcapi.BackupAPI()
        self.volume_rpcapi = volume_rpcapi.VolumeAPI()

    @utils.synchronized('cleanup_incomplete_backups_%s' % SERVICE_PGRP,
                        external=True, delay=0.1)
    def _cleanup_incomplete_backup_operations(self, ctxt):
        # Only the first launched process should do the cleanup, the others
        # have waited on the lock for the first one to finish the cleanup and
        # can now continue with the start process.
        if self._process_number != 1:
            LOG.debug("Process #%s %sskips cleanup.",
                      self._process_number,
                      '(pgid=%s) ' % SERVICE_PGRP if SERVICE_PGRP else '')
            return

        LOG.info("Cleaning up incomplete backup operations.")

        # TODO(smulcahy) implement full resume of backup and restore
        # operations on restart (rather than simply resetting)
        backups = objects.BackupList.get_all_by_host(ctxt, self.host)
        for backup in backups:
            try:
                self._cleanup_one_backup(ctxt, backup)
            except Exception:
                LOG.exception("Problem cleaning up backup %(bkup)s.",
                              {'bkup': backup['id']})
            try:
                self._cleanup_temp_volumes_snapshots_for_one_backup(ctxt,
                                                                    backup)
            except Exception:
                LOG.exception("Problem cleaning temp volumes and "
                              "snapshots for backup %(bkup)s.",
                              {'bkup': backup['id']})

    def _cleanup_one_volume(self, ctxt, volume):
        if volume['status'] == 'backing-up':
            self._detach_all_attachments(ctxt, volume)
            LOG.info('Resetting volume %(vol_id)s to previous '
                     'status %(status)s (was backing-up).',
                     {'vol_id': volume['id'],
                      'status': volume['previous_status']})
            self.db.volume_update(ctxt, volume['id'],
                                  {'status': volume['previous_status']})
        elif volume['status'] == 'restoring-backup':
            self._detach_all_attachments(ctxt, volume)
            LOG.info('Setting volume %s to error_restoring '
                     '(was restoring-backup).', volume['id'])
            self.db.volume_update(ctxt, volume['id'],
                                  {'status': 'error_restoring'})

    def _cleanup_one_backup(self, ctxt, backup):
        if backup['status'] == fields.BackupStatus.CREATING:
            LOG.info('Resetting backup %s to error (was creating).',
                     backup['id'])

            volume = objects.Volume.get_by_id(ctxt, backup.volume_id)
            self._cleanup_one_volume(ctxt, volume)

            err = 'incomplete backup reset on manager restart'
            volume_utils.update_backup_error(backup, err)
        elif backup['status'] == fields.BackupStatus.RESTORING:
            LOG.info('Resetting backup %s to '
                     'available (was restoring).',
                     backup['id'])
            volume = objects.Volume.get_by_id(ctxt, backup.restore_volume_id)
            self._cleanup_one_volume(ctxt, volume)

            backup.status = fields.BackupStatus.AVAILABLE
            backup.save()
        elif backup['status'] == fields.BackupStatus.DELETING:
            # Don't resume deleting the backup of an encrypted volume. The
            # admin context won't be sufficient to delete the backup's copy
            # of the encryption key ID (a real user context is required).
            if backup.encryption_key_id is None:
                LOG.info('Resuming delete on backup: %s.', backup.id)
                if CONF.backup_service_inithost_offload:
                    # Offload all the pending backup delete operations to the
                    # threadpool to prevent the main backup service thread
                    # from being blocked.
                    self._add_to_threadpool(self.delete_backup, ctxt, backup)
                else:
                    # Delete backups sequentially
                    self.delete_backup(ctxt, backup)
            else:
                LOG.info('Unable to resume deleting backup of an encrypted '
                         'volume, resetting backup %s to error_deleting '
                         '(was deleting).',
                         backup.id)
                backup.status = fields.BackupStatus.ERROR_DELETING
                backup.save()

    def _detach_all_attachments(self, ctxt, volume):
        attachments = volume['volume_attachment'] or []
        for attachment in attachments:
            if (attachment['attached_host'] == self.host and
                    attachment['instance_uuid'] is None):
                try:
                    rpcapi = self.volume_rpcapi
                    rpcapi.detach_volume(ctxt, volume, attachment['id'])
                except Exception:
                    LOG.exception("Detach attachment %(attach_id)s failed.",
                                  {'attach_id': attachment['id']},
                                  resource=volume)

    def _delete_temp_volume(self, ctxt, backup):
        try:
            temp_volume = objects.Volume.get_by_id(
                ctxt, backup.temp_volume_id)
            self.volume_rpcapi.delete_volume(ctxt, temp_volume)
        except exception.VolumeNotFound:
            LOG.debug("Could not find temp volume %(vol)s to clean up "
                      "for backup %(backup)s.",
                      {'vol': backup.temp_volume_id,
                       'backup': backup.id})
        backup.temp_volume_id = None
        backup.save()

    def _delete_temp_snapshot(self, ctxt, backup):
        try:
            temp_snapshot = objects.Snapshot.get_by_id(
                ctxt, backup.temp_snapshot_id)
            # We may want to consider routing those calls through the
            # cinder API.
            temp_snapshot.status = fields.SnapshotStatus.DELETING
            temp_snapshot.save()
            self.volume_rpcapi.delete_snapshot(ctxt, temp_snapshot)
        except exception.SnapshotNotFound:
            LOG.debug("Could not find temp snapshot %(snap)s to clean "
                      "up for backup %(backup)s.",
                      {'snap': backup.temp_snapshot_id,
                       'backup': backup.id})
        backup.temp_snapshot_id = None
        backup.save()

    def _cleanup_temp_volumes_snapshots_for_one_backup(self, ctxt, backup):
        # NOTE(xyang): If the service crashes or gets restarted during the
        # backup operation, there could be temporary volumes or snapshots
        # that are not deleted. Make sure any temporary volumes or snapshots
        # created by the backup job are deleted when the service is started.
        if (backup.temp_volume_id
                and backup.status == fields.BackupStatus.ERROR):
            self._delete_temp_volume(ctxt, backup)

        if (backup.temp_snapshot_id
                and backup.status == fields.BackupStatus.ERROR):
            self._delete_temp_snapshot(ctxt, backup)

    def _cleanup_temp_volumes_snapshots_when_backup_created(
            self, ctxt, backup):
        # Delete temp volumes or snapshots when backup creation is completed.
        if backup.temp_volume_id:
            self._delete_temp_volume(ctxt, backup)

        if backup.temp_snapshot_id:
            self._delete_temp_snapshot(ctxt, backup)

    @utils.limit_operations
    def create_backup(self, context, backup):
        """Create volume backups using configured backup service."""
        volume_id = backup.volume_id
        snapshot_id = backup.snapshot_id
        volume = objects.Volume.get_by_id(context, volume_id)
        snapshot = objects.Snapshot.get_by_id(
            context, snapshot_id) if snapshot_id else None
        previous_status = volume.get('previous_status', None)
        context.message_resource_id = backup.id
        context.message_resource_type = message_field.Resource.VOLUME_BACKUP
        context.message_action = message_field.Action.BACKUP_CREATE
        if snapshot_id:
            log_message = ('Create backup started, backup: %(backup_id)s '
                           'volume: %(volume_id)s snapshot: %(snapshot_id)s.'
                           % {'backup_id': backup.id,
                              'volume_id': volume_id,
                              'snapshot_id': snapshot_id})
        else:
            log_message = ('Create backup started, backup: %(backup_id)s '
                           'volume: %(volume_id)s.'
                           % {'backup_id': backup.id,
                              'volume_id': volume_id})
        LOG.info(log_message)

        self._notify_about_backup_usage(context, backup, "create.start")

        expected_status = "backing-up"
        if snapshot_id:
            actual_status = snapshot['status']
            if actual_status != expected_status:
                err = _('Create backup aborted, expected snapshot status '
                        '%(expected_status)s but got %(actual_status)s.') % {
                    'expected_status': expected_status,
                    'actual_status': actual_status,
                }
                volume_utils.update_backup_error(backup, err)
                raise exception.InvalidSnapshot(reason=err)
        else:
            actual_status = volume['status']
            if actual_status != expected_status:
                err = _('Create backup aborted, expected volume status '
                        '%(expected_status)s but got %(actual_status)s.') % {
                    'expected_status': expected_status,
                    'actual_status': actual_status,
                }
                volume_utils.update_backup_error(backup, err)
                raise exception.InvalidVolume(reason=err)

        expected_status = fields.BackupStatus.CREATING
        actual_status = backup.status
        if actual_status != expected_status:
            err = _('Create backup aborted, expected backup status '
                    '%(expected_status)s but got %(actual_status)s.') % {
                'expected_status': expected_status,
                'actual_status': actual_status,
            }
            volume_utils.update_backup_error(backup, err)
            self.message_api.create_from_request_context(
                context,
                detail=message_field.Detail.BACKUP_INVALID_STATE)
            raise exception.InvalidBackup(reason=err)

        try:
            if not self.is_working():
                err = _('Create backup aborted because the backup '
                        'service is down.')
                volume_utils.update_backup_error(backup, err)
                self.message_api.create_from_request_context(
                    context,
                    detail=message_field.Detail.BACKUP_SERVICE_DOWN)
                raise exception.InvalidBackup(reason=err)

            backup.service = self.driver_name
            backup.save()

            # Start backup, then continue_backup, then finish_backup
            self._start_backup(context, backup, volume)
        except Exception as err:
            with excutils.save_and_reraise_exception():
                if snapshot_id:
                    snapshot.status = fields.SnapshotStatus.AVAILABLE
                    snapshot.save()
                else:
                    try:
                        self.db.volume_update(
                            context, volume_id,
                            {'status': previous_status,
                             'previous_status': 'error_backing-up'})
                    except exception.VolumeNotFound:
                        # If the volume was deleted we cannot update its
                        # status but we still want to set the backup to error.
                        pass
                volume_utils.update_backup_error(backup, str(err))

    def _start_backup(self, context, backup, volume):
        """This starts the backup process.

           First we have to get the backup device from the volume manager.
           This can take a long time to complete.  Once the volume manager
           is done creating/getting the backup device, then we get a callback
           to complete the process of backing up the volume.

        """
        # Save a copy of the encryption key ID in case the volume is deleted.
        if (volume.encryption_key_id is not None and
                backup.encryption_key_id is None):
            backup.encryption_key_id = volume_utils.clone_encryption_key(
                context,
                key_manager.API(CONF),
                volume.encryption_key_id)
            backup.save()

        # This is an async call to the volume manager.  We will get a
        # callback from the volume manager to continue once it's done.
        LOG.info("Call Volume Manager to get_backup_device for %s", backup)
        self.volume_rpcapi.get_backup_device(context, backup, volume)

    def continue_backup(self, context, backup, backup_device):
        """This is the callback from the volume manager to continue."""
        message_created = False
        volume_id = backup.volume_id
        volume = objects.Volume.get_by_id(context, volume_id)
        snapshot_id = backup.snapshot_id
        snapshot = objects.Snapshot.get_by_id(
            context, snapshot_id) if snapshot_id else None
        previous_status = volume.get('previous_status', None)

        backup_service = self.service(context)
        properties = volume_utils.brick_get_connector_properties(
            CONF.use_multipath_for_image_xfer, enforce_multipath=False)

        updates = {}
        try:
            try:
                attach_info = self._attach_device(context,
                                                  backup_device.device_obj,
                                                  properties,
                                                  backup_device.is_snapshot)
            except Exception:
                with excutils.save_and_reraise_exception():
                    if not message_created:
                        message_created = True
                        self.message_api.create_from_request_context(
                            context,
                            detail=message_field.Detail.ATTACH_ERROR)

            try:
                device_path = attach_info['device']['path']
                if (isinstance(device_path, str) and
                        not os.path.isdir(device_path)):
                    if backup_device.secure_enabled:
                        with open(device_path, 'rb') as device_file:
                            updates = backup_service.backup(
                                backup, tpool.Proxy(device_file))
                    else:
                        with utils.temporary_chown(device_path):
                            with open(device_path, 'rb') as device_file:
                                updates = backup_service.backup(
                                    backup, tpool.Proxy(device_file))
                # device_path is already file-like so no need to open it
                else:
                    updates = backup_service.backup(backup,
                                                    tpool.Proxy(device_path))
            except Exception:
                with excutils.save_and_reraise_exception():
                    if not message_created:
                        message_created = True
                        self.message_api.create_from_request_context(
                            context,
                            detail=
                            message_field.Detail.BACKUP_CREATE_DRIVER_ERROR)
            finally:
                try:
                    self._detach_device(context, attach_info,
                                        backup_device.device_obj, properties,
                                        backup_device.is_snapshot, force=True,
                                        ignore_errors=True)
                except Exception:
                    with excutils.save_and_reraise_exception():
                        if not message_created:
                            message_created = True
                            self.message_api.create_from_request_context(
                                context,
                                detail=
                                message_field.Detail.DETACH_ERROR)
        except Exception as err:
            with excutils.save_and_reraise_exception():
                if snapshot_id:
                    snapshot.status = fields.SnapshotStatus.AVAILABLE
                    snapshot.save()
                else:
                    self.db.volume_update(
                        context, volume_id,
                        {'status': previous_status,
                         'previous_status': 'error_backing-up'})
                volume_utils.update_backup_error(backup, str(err))
        finally:
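            # Refresh the backup record even if it was soft-deleted while
            # the operation was running; as_read_deleted makes the lookup
            # include deleted rows.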
            with backup.as_read_deleted():
                backup.refresh()
            try:
                self._cleanup_temp_volumes_snapshots_when_backup_created(
                    context, backup)
            except Exception:
                with excutils.save_and_reraise_exception():
                    if not message_created:
                        self.message_api.create_from_request_context(
                            context,
                            detail=
                            message_field.Detail.BACKUP_CREATE_CLEANUP_ERROR)

        self._finish_backup(context, backup, volume, updates)

    def _finish_backup(self, context, backup, volume, updates):
        volume_id = backup.volume_id
        snapshot_id = backup.snapshot_id
        previous_status = volume.get('previous_status', None)

        # Restore the original status.
        if snapshot_id:
            self.db.snapshot_update(
                context, snapshot_id,
                {'status': fields.SnapshotStatus.AVAILABLE})
        else:
            self.db.volume_update(context, volume_id,
                                  {'status': previous_status,
                                   'previous_status': 'backing-up'})

        # The continue_backup method above updated the backup's status, so
        # it reflects the latest status, even if the backup was deleted.
        completion_msg = 'finished'
        if backup.status in (fields.BackupStatus.DELETING,
                             fields.BackupStatus.DELETED):
            completion_msg = 'aborted'
        else:
            backup.status = fields.BackupStatus.AVAILABLE
            backup.size = volume['size']

            if updates:
                backup.update(updates)
            backup.save()

            # Handle the num_dependent_backups of parent backup when child
            # backup has created successfully.
            if backup.parent_id:
                parent_backup = objects.Backup.get_by_id(context,
                                                         backup.parent_id)
                parent_backup.num_dependent_backups += 1
                parent_backup.save()
        LOG.info('Create backup %s. backup: %s.', completion_msg, backup.id)
        self._notify_about_backup_usage(context, backup, "create.end")

    def _is_our_backup(self, backup):
        # Accept strings and Backup OVO
        if not isinstance(backup, str):
            backup = backup.service

        if not backup:
            return True

        # TODO(tommylikehu): We upgraded the 'driver_name' from module
        # to class name, so we use 'in' here to match two namings,
        # this can be replaced with equal sign during next
        # release (Rocky).
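        # e.g. a backup recorded under the old module naming
        # 'cinder.backup.drivers.swift' matches the configured class path
        # 'cinder.backup.drivers.swift.SwiftBackupDriver'.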
        if self.driver_name.startswith(backup):
            return True

        # We support renaming of drivers, so check old names as well
        for key, value in MAPPING.items():
            if key.startswith(backup) and self.driver_name.startswith(value):
                return True

        return False

    @utils.limit_operations
    def restore_backup(self, context, backup, volume_id, volume_is_new):
        """Restore volume backups from configured backup service.

        :param context: RequestContext for the restore operation
        :param backup: Backup that we're restoring
        :param volume_id: The ID of the volume into which we're restoring
        :param volume_is_new: The volume does not have stale data, so
                              sparse backups can be restored as such.
        """
        context.message_resource_id = backup.id
        context.message_resource_type = message_field.Resource.VOLUME_BACKUP
        context.message_action = message_field.Action.BACKUP_RESTORE
        LOG.info('Restore backup started, backup: %(backup_id)s '
                 'volume: %(volume_id)s.',
                 {'backup_id': backup.id, 'volume_id': volume_id})

        volume = objects.Volume.get_by_id(context, volume_id)
        self._notify_about_backup_usage(context, backup, "restore.start")

        expected_status = [fields.VolumeStatus.RESTORING_BACKUP,
                           fields.VolumeStatus.CREATING]
        volume_previous_status = volume['status']
        if volume_previous_status not in expected_status:
            err = (_('Restore backup aborted, expected volume status '
                     '%(expected_status)s but got %(actual_status)s.') %
                   {'expected_status': ','.join(expected_status),
                    'actual_status': volume_previous_status})
            backup.status = fields.BackupStatus.AVAILABLE
            backup.save()
            self.db.volume_update(
                context, volume_id,
                {'status':
                 (fields.VolumeStatus.ERROR if
                  volume_previous_status == fields.VolumeStatus.CREATING else
                  fields.VolumeStatus.ERROR_RESTORING)})
            self.message_api.create(
                context,
                action=message_field.Action.BACKUP_RESTORE,
                resource_type=message_field.Resource.VOLUME_BACKUP,
                resource_uuid=volume.id,
                detail=message_field.Detail.VOLUME_INVALID_STATE)
            raise exception.InvalidVolume(reason=err)

        expected_status = fields.BackupStatus.RESTORING
        actual_status = backup['status']
        if actual_status != expected_status:
            err = (_('Restore backup aborted: expected backup status '
                     '%(expected_status)s but got %(actual_status)s.') %
                   {'expected_status': expected_status,
                    'actual_status': actual_status})
            volume_utils.update_backup_error(backup, err)
            self.db.volume_update(context, volume_id,
                                  {'status': fields.VolumeStatus.ERROR})
            self.message_api.create_from_request_context(
                context,
                detail=message_field.Detail.BACKUP_INVALID_STATE)
            raise exception.InvalidBackup(reason=err)

        if volume['size'] > backup['size']:
            LOG.info('Volume: %(vol_id)s, size: %(vol_size)d is '
                     'larger than backup: %(backup_id)s, '
                     'size: %(backup_size)d, continuing with restore.',
                     {'vol_id': volume['id'],
                      'vol_size': volume['size'],
                      'backup_id': backup['id'],
                      'backup_size': backup['size']})

        if not self._is_our_backup(backup):
            err = _('Restore backup aborted, the backup service currently'
                    ' configured [%(configured_service)s] is not the'
                    ' backup service that was used to create this'
                    ' backup [%(backup_service)s].') % {
                'configured_service': self.driver_name,
                'backup_service': backup.service,
            }
            backup.status = fields.BackupStatus.AVAILABLE
            backup.save()
            self.db.volume_update(context, volume_id,
                                  {'status': fields.VolumeStatus.ERROR})
            raise exception.InvalidBackup(reason=err)

        canceled = False
        try:
            self._run_restore(context, backup, volume, volume_is_new)
        except exception.BackupRestoreCancel:
            canceled = True
        except Exception:
            with excutils.save_and_reraise_exception():
                self.db.volume_update(
                    context, volume_id,
                    {'status': (fields.VolumeStatus.ERROR if
                                actual_status == fields.VolumeStatus.CREATING
                                else fields.VolumeStatus.ERROR_RESTORING)})
                backup.status = fields.BackupStatus.AVAILABLE
                backup.save()

        if canceled:
            volume.status = fields.VolumeStatus.ERROR
        else:
            volume.status = fields.VolumeStatus.AVAILABLE
            # NOTE(tommylikehu): If the previous status is 'creating', this
            # is a newly created volume and we need to update the
            # 'launched_at' attribute as well.
            if volume_previous_status == fields.VolumeStatus.CREATING:
                volume['launched_at'] = timeutils.utcnow()
        old_src_backup_id = self.db.volume_metadata_get(
            context, volume_id).get("src_backup_id", None)
        if backup.volume_id != volume.id or (
                old_src_backup_id and old_src_backup_id != backup.id):
            self.db.volume_metadata_update(
                context,
                volume.id,
                {'src_backup_id': backup.id},
                False)

        volume.save()
        backup.status = fields.BackupStatus.AVAILABLE
        backup.save()
        LOG.info('%(result)s restoring backup %(backup_id)s to volume '
                 '%(volume_id)s.',
                 {'result': 'Canceled' if canceled else 'Finished',
                  'backup_id': backup.id,
                  'volume_id': volume_id})
        self._notify_about_backup_usage(context, backup, "restore.end")

    def _run_restore(self, context, backup, volume, volume_is_new):
        message_created = False
        orig_key_id = volume.encryption_key_id
        backup_service = self.service(context)

        properties = volume_utils.brick_get_connector_properties(
            CONF.use_multipath_for_image_xfer, enforce_multipath=False)
        secure_enabled = (
            self.volume_rpcapi.secure_file_operations_enabled(context,
                                                              volume))
        try:
            attach_info = self._attach_device(context, volume, properties)
        except Exception:
            self.message_api.create_from_request_context(
                context,
                detail=message_field.Detail.ATTACH_ERROR)
            raise

        # NOTE(geguileo): Not all I/O disk operations properly do greenthread
        # context switching and may end up blocking the greenthread, so we go
        # with native threads proxy-wrapping the device file object.
        try:
            device_path = attach_info['device']['path']
            open_mode = 'rb+' if os.name == 'nt' else 'wb'
            if (isinstance(device_path, str) and
                    not os.path.isdir(device_path)):
                if secure_enabled:
                    with open(device_path, open_mode) as device_file:
                        backup_service.restore(backup, volume.id,
                                               tpool.Proxy(device_file),
                                               volume_is_new)
                else:
                    with utils.temporary_chown(device_path):
                        with open(device_path, open_mode) as device_file:
                            backup_service.restore(backup, volume.id,
                                                   tpool.Proxy(device_file),
                                                   volume_is_new)
            # device_path is already file-like so no need to open it
            else:
                backup_service.restore(backup, volume.id,
                                       tpool.Proxy(device_path),
                                       volume_is_new)
        except exception.BackupRestoreCancel:
            raise
        except Exception:
            LOG.exception('Restoring backup %(backup_id)s to volume '
                          '%(volume_id)s failed.', {'backup_id': backup.id,
                                                    'volume_id': volume.id})
            # We set message_created to True before creating the
            # message because if the message create call fails
            # and is caught by the base/outer exception handler
            # then we will end up storing a wrong message.
            message_created = True
            self.message_api.create_from_request_context(
                context,
                detail=message_field.Detail.BACKUP_RESTORE_ERROR)
            raise
        finally:
            try:
                self._detach_device(context, attach_info, volume, properties,
                                    force=True)
            except Exception:
                if not message_created:
                    self.message_api.create_from_request_context(
                        context,
                        detail=message_field.Detail.DETACH_ERROR)
                raise

        # Regardless of whether the restore was successful, do some
        # housekeeping to ensure the restored volume's encryption key ID is
        # unique, and any previous key ID is deleted. Start by fetching fresh
        # info on the restored volume.
        restored_volume = objects.Volume.get_by_id(context, volume.id)
        restored_key_id = restored_volume.encryption_key_id
        if restored_key_id != orig_key_id:
            LOG.info('Updating encryption key ID for volume %(volume_id)s '
                     'from backup %(backup_id)s.',
                     {'volume_id': volume.id, 'backup_id': backup.id})

            key_mgr = key_manager.API(CONF)
            if orig_key_id is not None:
                LOG.debug('Deleting original volume encryption key ID.')
                volume_utils.delete_encryption_key(context,
                                                   key_mgr,
                                                   orig_key_id)

            if backup.encryption_key_id is None:
                # This backup predates the current code that stores the cloned
                # key ID in the backup database. Fortunately, the key ID
                # restored from the backup data _is_ a clone of the original
                # volume's key ID, so grab it.
                LOG.debug('Gleaning backup encryption key ID from metadata.')
                backup.encryption_key_id = restored_key_id
                backup.save()

            # Clone the key ID again to ensure every restored volume has
            # a unique key ID. The volume's key ID should not be the same
            # as the backup.encryption_key_id (the copy made when the backup
            # was first created).
            new_key_id = volume_utils.clone_encryption_key(
                context,
                key_mgr,
                backup.encryption_key_id)
            restored_volume.encryption_key_id = new_key_id
            restored_volume.save()
        else:
            LOG.debug('Encryption key ID for volume %(volume_id)s already '
                      'matches encryption key ID in backup %(backup_id)s.',
                      {'volume_id': volume.id, 'backup_id': backup.id})

    def delete_backup(self, context, backup):
        """Delete volume backup from configured backup service."""
        LOG.info('Delete backup started, backup: %s.', backup.id)

        self._notify_about_backup_usage(context, backup, "delete.start")

        context.message_resource_id = backup.id
        context.message_resource_type = message_field.Resource.VOLUME_BACKUP
        context.message_action = message_field.Action.BACKUP_DELETE
        expected_status = fields.BackupStatus.DELETING
        actual_status = backup.status
        if actual_status != expected_status:
            err = _('Delete_backup aborted, expected backup status '
                    '%(expected_status)s but got %(actual_status)s.') \
                % {'expected_status': expected_status,
                   'actual_status': actual_status}
            volume_utils.update_backup_error(backup, err)
            self.message_api.create_from_request_context(
                context,
                detail=message_field.Detail.BACKUP_INVALID_STATE)
            raise exception.InvalidBackup(reason=err)

        if backup.service and not self.is_working():
            err = _('Delete backup aborted because the backup service '
                    'is down.')
            status = fields.BackupStatus.ERROR_DELETING
            volume_utils.update_backup_error(backup, err, status)
            self.message_api.create_from_request_context(
                context,
                detail=message_field.Detail.BACKUP_SERVICE_DOWN)
            raise exception.InvalidBackup(reason=err)

        if not self._is_our_backup(backup):
            err = _('Delete backup aborted, the backup service currently'
                    ' configured [%(configured_service)s] is not the'
                    ' backup service that was used to create this'
                    ' backup [%(backup_service)s].')\
                % {'configured_service': self.driver_name,
                   'backup_service': backup.service}
            volume_utils.update_backup_error(backup, err)
            raise exception.InvalidBackup(reason=err)

        if backup.service:
            try:
                backup_service = self.service(context)
                backup_service.delete_backup(backup)
            except Exception as err:
                with excutils.save_and_reraise_exception():
                    volume_utils.update_backup_error(backup, str(err))
                    self.message_api.create_from_request_context(
                        context,
                        detail=message_field.Detail.BACKUP_DELETE_DRIVER_ERROR)

        # Get reservations
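        # (negative deltas release the quota usage when the reservation is
        # committed below)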
        try:
            reserve_opts = {
                'backups': -1,
                'backup_gigabytes': -backup.size,
            }
            reservations = QUOTAS.reserve(context,
                                          project_id=backup.project_id,
                                          **reserve_opts)
        except Exception:
            reservations = None
            LOG.exception("Failed to update usages deleting backup")

        if backup.encryption_key_id is not None:
            volume_utils.delete_encryption_key(context,
                                               key_manager.API(CONF),
                                               backup.encryption_key_id)
            backup.encryption_key_id = None
            backup.save()

        backup.destroy()
        # If this backup is incremental backup, handle the
        # num_dependent_backups of parent backup
        if backup.parent_id:
            parent_backup = objects.Backup.get_by_id(context,
                                                     backup.parent_id)
            if parent_backup.has_dependent_backups:
                parent_backup.num_dependent_backups -= 1
                parent_backup.save()
        # Commit the reservations
        if reservations:
            QUOTAS.commit(context, reservations,
                          project_id=backup.project_id)

        LOG.info('Delete backup finished, backup %s deleted.', backup.id)
        self._notify_about_backup_usage(context, backup, "delete.end")

    def _notify_about_backup_usage(self,
                                   context,
                                   backup,
                                   event_suffix,
                                   extra_usage_info=None):
        volume_utils.notify_about_backup_usage(
            context, backup, event_suffix,
            extra_usage_info=extra_usage_info,
            host=self.host)

    def export_record(self, context, backup):
        """Export all volume backup metadata details to allow clean import.

        Export backup metadata so it could be re-imported into the database
        without any prerequisite in the backup database.

        :param context: running context
        :param backup: backup object to export
        :returns: backup_record - a description of how to import the backup,
                  containing 'backup_url' (how to import the backup) and
                  'backup_service' (the needed driver).
        :raises InvalidBackup:
        """
        LOG.info('Export record started, backup: %s.', backup.id)

        expected_status = fields.BackupStatus.AVAILABLE
        actual_status = backup.status
        if actual_status != expected_status:
            err = (_('Export backup aborted, expected backup status '
                     '%(expected_status)s but got %(actual_status)s.') %
                   {'expected_status': expected_status,
                    'actual_status': actual_status})
            raise exception.InvalidBackup(reason=err)

        backup_record = {'backup_service': backup.service}
        if not self._is_our_backup(backup):
            err = (_('Export record aborted, the backup service currently '
                     'configured [%(configured_service)s] is not the '
                     'backup service that was used to create this '
                     'backup [%(backup_service)s].') %
                   {'configured_service': self.driver_name,
                    'backup_service': backup.service})
            raise exception.InvalidBackup(reason=err)

        # Call driver to create backup description string
        try:
            backup_service = self.service(context)
            driver_info = backup_service.export_record(backup)
            backup_url = backup.encode_record(driver_info=driver_info)
            backup_record['backup_url'] = backup_url
        except Exception as err:
            msg = str(err)
            raise exception.InvalidBackup(reason=msg)

        LOG.info('Export record finished, backup %s exported.', backup.id)
        return backup_record

    def import_record(self,
                      context,
                      backup,
                      backup_service,
                      backup_url,
                      backup_hosts):
        """Import all volume backup metadata details to the backup db.

        :param context: running context
        :param backup: The new backup object for the import
        :param backup_service: The needed backup driver for import
        :param backup_url: An identifier string to locate the backup
        :param backup_hosts: Potential hosts to execute the import
        :raises InvalidBackup:
        :raises ServiceNotFound:
        """
        LOG.info('Import record started, backup_url: %s.', backup_url)

        # Can we import this backup?
        if not self._is_our_backup(backup_service):
            # No, are there additional potential backup hosts in the list?
            if len(backup_hosts) > 0:
                # try the next host on the list, maybe it can import
                first_host = backup_hosts.pop()
                self.backup_rpcapi.import_record(context,
                                                 first_host,
                                                 backup,
                                                 backup_service,
                                                 backup_url,
                                                 backup_hosts)
            else:
                # empty list - we are the last host on the list, fail
                err = _('Import record failed, cannot find backup '
                        'service to perform the import. Request service '
                        '%(service)s.') % {'service': backup_service}
                volume_utils.update_backup_error(backup, err)
                raise exception.ServiceNotFound(service_id=backup_service)
        else:
            # Yes...
            try:
                # Deserialize backup record information
                backup_options = backup.decode_record(backup_url)

                # Extract driver specific info and pass it to the driver
                driver_options = backup_options.pop('driver_info', {})
                backup_service = self.service(context)
                backup_service.import_record(backup, driver_options)
            except Exception as err:
                msg = str(err)
                volume_utils.update_backup_error(backup, msg)
                raise exception.InvalidBackup(reason=msg)

            required_import_options = {
                'display_name',
                'display_description',
                'container',
                'size',
                'service_metadata',
                'object_count',
                'id'
            }

            # Check for missing fields in imported data
            missing_opts = required_import_options - set(backup_options)
            if missing_opts:
                msg = (_('Driver successfully decoded imported backup data, '
                         'but there are missing fields (%s).') %
                       ', '.join(missing_opts))
                volume_utils.update_backup_error(backup, msg)
                raise exception.InvalidBackup(reason=msg)

            # Confirm the ID from the record in the DB is the right one
            backup_id = backup_options['id']
            if backup_id != backup.id:
                msg = (_('Trying to import backup metadata from id %(meta_id)s'
                         ' into backup %(id)s.') %
                       {'meta_id': backup_id, 'id': backup.id})
                volume_utils.update_backup_error(backup, msg)
                raise exception.InvalidBackup(reason=msg)

            # Overwrite some fields
            backup_options['service'] = self.driver_name
            backup_options['availability_zone'] = self.az
            backup_options['host'] = self.host

            # Remove some values which are not actual fields and some that
            # were set by the API node
            for key in ('name', 'user_id', 'project_id', 'deleted_at',
                        'deleted', 'fail_reason', 'status'):
                backup_options.pop(key, None)

            # Update the database
            backup.update(backup_options)
            backup.save()

            # Update the backup's status
            backup.update({"status": fields.BackupStatus.AVAILABLE})
            backup.save()

            LOG.info('Import record id %s metadata from driver '
                     'finished.', backup.id)

    def reset_status(self, context, backup, status):
        """Reset volume backup status.

        :param context: running context
        :param backup: The backup object for reset status operation
        :param status: The status to be set
        :raises InvalidBackup:
        :raises AttributeError:
        """
        LOG.info('Reset backup status started, backup_id: '
                 '%(backup_id)s, status: %(status)s.',
                 {'backup_id': backup.id,
                  'status': status})

        LOG.info('Backup service: %s.', backup.service)
        if not self._is_our_backup(backup):
            err = _('Reset backup status aborted, the backup service'
                    ' currently configured [%(configured_service)s] '
                    'is not the backup service that was used to create'
                    ' this backup [%(backup_service)s].') % \
                {'configured_service': self.driver_name,
                 'backup_service': backup.service}
            raise exception.InvalidBackup(reason=err)

        if backup.service is not None:
            backup.status = status
            backup.save()

            # Needs to clean temporary volumes and snapshots.
            try:
                self._cleanup_temp_volumes_snapshots_for_one_backup(
                    context, backup)
            except Exception:
                LOG.exception("Problem cleaning temp volumes and "
                              "snapshots for backup %(bkup)s.",
                              {'bkup': backup.id})

            volume_utils.notify_about_backup_usage(context, backup,
                                                   'reset_status.end')

    def check_support_to_force_delete(self, context):
        """Check if the backup driver supports force delete operation.

        :param context: running context
        """
        backup_service = self.service(context)
        return backup_service.support_force_delete

    def _attach_device(self, ctxt, backup_device,
                       properties, is_snapshot=False):
        """Attach backup device."""
        if not is_snapshot:
            return self._attach_volume(ctxt, backup_device, properties)
        else:
            return self._attach_snapshot(ctxt, backup_device, properties)

    def _attach_volume(self, context, volume, properties):
        """Attach a volume."""

        try:
            conn = self.volume_rpcapi.initialize_connection(context,
                                                            volume,
                                                            properties)
            return self._connect_device(conn)
        except Exception:
            with excutils.save_and_reraise_exception():
                try:
                    self.volume_rpcapi.terminate_connection(context, volume,
                                                            properties,
                                                            force=True)
                except Exception:
                    LOG.warning("Failed to terminate the connection "
                                "of volume %(volume_id)s, but it is "
                                "acceptable.",
                                {'volume_id': volume.id})

    def _attach_snapshot(self, ctxt, snapshot, properties):
        """Attach a snapshot."""

        try:
            conn = self.volume_rpcapi.initialize_connection_snapshot(
                ctxt, snapshot, properties)
            return self._connect_device(conn)
        except Exception:
            with excutils.save_and_reraise_exception():
                try:
                    self.volume_rpcapi.terminate_connection_snapshot(
                        ctxt, snapshot, properties, force=True)
                except Exception:
                    LOG.warning("Failed to terminate the connection "
                                "of snapshot %(snapshot_id)s, but it is "
                                "acceptable.",
                                {'snapshot_id': snapshot.id})

    def _connect_device(self, conn):
        """Establish connection to device."""
        use_multipath = CONF.use_multipath_for_image_xfer
        device_scan_attempts = CONF.num_volume_device_scan_tries
        protocol = conn['driver_volume_type']
        connector = volume_utils.brick_get_connector(
            protocol,
            use_multipath=use_multipath,
            device_scan_attempts=device_scan_attempts,
            conn=conn,
            expect_raw_disk=True)
        vol_handle = connector.connect_volume(conn['data'])

        return {'conn': conn, 'device': vol_handle, 'connector': connector}

    def _detach_device(self, ctxt, attach_info, device,
                       properties, is_snapshot=False, force=False,
                       ignore_errors=False):
        """Disconnect the volume or snapshot from the host. """
        connector = attach_info['connector']
        connector.disconnect_volume(attach_info['conn']['data'],
                                    attach_info['device'],
                                    force=force, ignore_errors=ignore_errors)
        rpcapi = self.volume_rpcapi
        if not is_snapshot:
            rpcapi.terminate_connection(ctxt, device, properties,
                                        force=force)
            rpcapi.remove_export(ctxt, device, sync=True)
        else:
            rpcapi.terminate_connection_snapshot(ctxt, device,
                                                 properties, force=force)
            rpcapi.remove_export_snapshot(ctxt, device, sync=True)

    def is_working(self):
        return self.is_initialized

    @periodic_task.periodic_task(
        spacing=CONF.backup_driver_stats_polling_interval)
    def publish_service_capabilities(self, context):
        """Collect driver status and then publish."""
        self._report_driver_status(context)
        self._publish_service_capabilities(context)

    def _report_driver_status(self, context):
        backup_stats = {
            'backend_state': self.is_working(),
            'driver_name': self.driver_name,
            'availability_zone': self.az
        }
        self.update_service_capabilities(backup_stats)