summaryrefslogtreecommitdiff
path: root/morphlib/exts/docker.write
blob: c8e89bc196f9c0f6bffc71ea4b023201e37ea673 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
#!/usr/bin/python
# Copyright (C) 2014  Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.


'''A Morph deployment write extension for deploying to Docker hosts

See docker.write.help for more information.

'''


import docker
import paramiko

import cliapp
import logging
import os
import Queue
import socket
import SocketServer
import tarfile
import threading
import time
import urlparse

import morphlib.writeexts


# Seconds to wait before raising a connection error. This value is passed as
# the 'timeout' argument of every docker.Client constructed in this file.
CONNECTION_TIMEOUT = 10


class ChunkedTarfileAdapter(object):
    '''File-like object which allows batched writes.

    We need to send an entire system through a HTTP POST request. This might
    be big, so it must be streamed in chunks. This object buffers data written
    to it so that the 'requests' module can iterate through it and send each
    block as a HTTP chunk.

    The object is shared between two parties: a producer, which calls
    write() repeatedly and then either close() or abort(), and a consumer,
    which iterates over the object to receive the chunks in order. Either
    side may call abort() to make the other side stop with an exception.

    '''

    # Some rough profiling with a 256.52MB system over Gb ethernet.
    # Morph machine was x86_32 VM with reasonable CPU and RAM.
    #
    #  no compression  bufsize=100KB  256.52MB in 9.04 seconds (28.39 MB/sec)
    #  no compression  bufsize=1MB    256.52MB in 11.45 seconds (22.4 MB/sec)
    #  gzip -1         bufsize=100KB  117.99MB in 19.34 seconds (6.10 MB/sec)
    #  no compression  bufsize=10MB   256.52MB in 65.57 seconds (3.91 MB/sec)
    #  gzip -1         bufsize=10MB   124.39MB in 77 sec (1.61 MB/sec)
    #  gzip -5         bufsize=10MB   117.99MB in 84.27 seconds (1.40 MB/sec)
    #  no compression  bufsize=100MB  took pretty much forever
    #
    # Ideally the buffer size would adapt to the available IO speed & free
    # memory. For now 100KB is OK.

    EXPECTED_BUFFER_SIZE = 100 * 1024

    def __init__(self, status_interval=0, status_callback=None):
        '''Create an empty adapter.

        status_interval is the minimum number of seconds between two
        periodic status reports; 0 disables periodic reporting.
        status_callback is called with a single message string each time a
        status report is produced; None disables reporting altogether.

        '''
        # Stream headers can involve several small writes (6 for gzip headers,
        # for example). Therefore the queue buffers up to 10 blocks here.
        self.queue = Queue.Queue(maxsize=10)

        # Set by close(): no more data will be written.
        self.eof = False
        # Set by abort(): the exception to raise in reader and writer.
        self.exception = None

        # Time of the first non-empty write(), or None if none happened yet.
        self.start_time = None
        # Number of bytes handed to the consumer so far.
        self.bytes_sent = 0

        self.status_interval = status_interval
        self.status_callback = status_callback

        self.last_status_time = time.time()

    def __iter__(self):
        '''Generator for reading the queued data chunks.

        This should be used from the main thread of the program.

        Iteration ends once close() has been called and the queue has been
        drained. If abort() has been called, the exception passed to it is
        raised instead.

        '''
        while True:
            try:
                # Wake up regularly so that eof/abort and the periodic status
                # report are noticed even while no data is arriving.
                data_chunk = self.queue.get(block=True, timeout=0.1)
                yield data_chunk
                self.bytes_sent += len(data_chunk)
            except Queue.Empty:
                pass

            if self.queue.empty() and self.eof:
                logging.debug('All data queued for transfer!')
                break
            elif self.exception is not None:
                # We may have received an abort() from the writing thread,
                # if so propagate it to the main loop.
                raise self.exception
            else:
                self.maybe_show_status()

    def write(self, data_chunk):
        '''Write a data chunk, blocking when the chunk queue is full.

        This can be called from a thread. If abort() is called, the exception
        will be passed on and raised to the thread that is calling write().

        Empty chunks are ignored. The first non-empty chunk starts the
        clock used for the transfer-rate figure in show_status().

        '''
        if len(data_chunk) == 0:
            return
        if self.start_time is None:
            self.start_time = self.last_status_time = time.time()
        while True:
            if self.exception is not None:
                raise self.exception
            try:
                # Use a short timeout rather than blocking forever, so an
                # abort() from the other side is noticed promptly.
                self.queue.put(data_chunk, block=True, timeout=0.1)
            except Queue.Full:
                pass
            else:
                return

    def abort(self, exception=None):
        '''Mark the transfer as failed.

        The given exception (or a generic Exception if none is given) is
        raised from subsequent write() calls and from the iterator.

        '''
        exception = exception or Exception('Unknown exception')
        self.exception = exception

    def close(self):
        '''Mark the transfer as successfully completed.'''
        self.eof = True

    def maybe_show_status(self):
        '''Show status if the status_interval has elapsed.'''
        if self.status_interval > 0 and self.status_callback is not None:
            now = time.time()
            if self.last_status_time + self.status_interval < now:
                self.last_status_time = now
                self.show_status()

    def show_status(self):
        '''Summarise the status of the transfer.

        Passes a one-line message to status_callback, if one was given.
        Before the first write() the message just says that the transfer is
        starting; afterwards it reports the amount of data handed to the
        consumer and the average transfer rate.

        '''
        if self.status_callback is not None:
            if self.start_time is None:
                message = 'Starting transfer'
            else:
                duration = time.time() - self.start_time
                megabytes = 1024 * 1024
                megabytes_written = self.bytes_sent / float(megabytes)
                # time.time() is wall-clock time: on a coarse timer the
                # difference can be exactly zero, and if the clock is
                # stepped backwards it can be negative. Report a rate of
                # zero rather than crashing or showing a negative speed.
                if duration > 0:
                    rate = megabytes_written / duration
                else:
                    rate = 0.0
                message = '%0.2fMB transferred (%0.2f MB/sec)' % (
                    megabytes_written, rate)
            self.status_callback(message)


class DockerWriteExtension(morphlib.writeexts.WriteExtension):
    '''Write extension which imports a system tree into Docker as an image.

    The deployment location is a URL of one of these forms:

        docker:///IMAGE              local daemon, via its UNIX socket
        docker://HOST:PORT/IMAGE     daemon listening on a TCP socket
        docker+ssh://[USER@]HOST[:PORT]/IMAGE
                                     daemon reached through an SSH tunnel

    '''

    def process_args(self, args):
        '''Deploy the system tree to the Docker daemon named by the location.

        args must be the two-item list [temp_root, location]. Raises
        cliapp.AppException if the arguments or the location URL are
        unusable; any error raised while importing the image is propagated
        to the caller.

        '''
        if len(args) != 2:
            raise cliapp.AppException('Wrong number of command line args')

        temp_root, location_string = args

        location = urlparse.urlparse(location_string)
        if location.scheme == 'docker':
            if len(location.netloc) == 0:
                # The extension may need to allow the user to specify a
                # different path for the Docker daemon's socket. That could
                # be done through an environment variable, or via a config
                # file. It can't really be encoded in the 'location' URL.
                socket_url = 'http+unix://var/run/docker.sock'
                docker_client = self.open_local_unix_socket(socket_url)
            else:
                # Note that access to the Docker daemon's socket == root access
                # to the host. Thus, this socket should only ever be a
                # local-only TCP socket. It might be worth refusing to connect
                # if host != '127.0.0.1' to make this clear.
                docker_client = self.open_tcp_socket(location.netloc)
        elif location.scheme == 'docker+ssh':
            if len(location.netloc) == 0:
                raise cliapp.AppException("Missing host in URL '%s'" %
                        location_string)
            docker_client = self.open_tcp_socket_with_ssh_tunnel(
                    location.username, location.hostname, location.port)
        else:
            raise cliapp.AppException(
                "Sorry, currently this extension only supports docker:// "
                "and docker+ssh:// URIs. Got '%s://'." % location.scheme)
        # Everything after the leading '/' of the URL path is the image name.
        image_name = location.path[1:]

        docker_client.ping()

        # Let failures propagate: catching them here would make the success
        # message below appear even though no image was created.
        self.do_import(docker_client, temp_root, image_name)

        self.status(
            msg='Docker image %(image_name)s has been created',
            image_name=image_name)

    def open_local_unix_socket(self, socket):
        '''Return a docker.Client for the daemon at the given socket URL.'''
        self.status(msg='Connecting to local Docker service at %s' % socket)
        return docker.Client(base_url=socket, timeout=CONNECTION_TIMEOUT)

    def open_tcp_socket(self, netloc):
        '''Return a docker.Client for the daemon at tcp://<netloc>.'''
        self.status(msg='Connecting to Docker service at tcp://%s' % netloc)
        return docker.Client(base_url='tcp://%s' % netloc,
                             timeout=CONNECTION_TIMEOUT)

    def setup_ssh_tunnel(self, user, host, port):
        '''Open an SSH connection to host and request forwarding of port.

        Returns a (local_bind_port, tunnel_thread) tuple; tunnel_thread is
        currently always None and local_bind_port is always equal to port.

        '''
        # NOTE(review): this looks unfinished. As far as the paramiko
        # documentation goes, request_port_forward() asks the *server* to
        # listen and forward connections back over the session, which would
        # not create the local listener that the caller connects to at
        # 127.0.0.1:<port>. A 'direct-tcpip' channel served by a local
        # SocketServer is probably what is wanted here -- confirm.
        client = paramiko.SSHClient()
        client.load_system_host_keys()

        client.connect(host, username=user)
        transport = client.get_transport()
        transport.request_port_forward('127.0.0.1', port)

        local_bind_port = port
        return local_bind_port, None #tunnel_thread

    def open_tcp_socket_with_ssh_tunnel(self, user, host, port=2375):
        '''Return a docker.Client talking to host through an SSH tunnel.

        port is the port of the Docker service; None is treated the same as
        leaving the argument out. Raises cliapp.AppException if the tunnel
        cannot be set up.

        '''
        if port is None:
            # urlparse reports a missing port as None, which would override
            # the default in the signature and break the '%d' below.
            port = 2375

        self.status(
            msg='Connecting to remote Docker service at %s:%s' % (host, port))

        try:
            local_bind_port, _ = self.setup_ssh_tunnel(user, host, port)
        except (socket.error, paramiko.SSHException) as e:
            raise cliapp.AppException('Failed to create SSH tunnel: %s' % e)

        docker_client = docker.Client(
            base_url='http://127.0.0.1:%d' % local_bind_port,
            timeout=CONNECTION_TIMEOUT)

        return docker_client

    def do_import(self, docker_client, temp_root, image_name):
        '''Stream the tree at temp_root to Docker as image image_name.

        A separate thread produces the tar stream while this thread sends
        it. Raises cliapp.AppException if the daemon's response code is not
        200, and re-raises anything raised while sending.

        '''
        def display_transfer_status(message):
            self.status(msg=message)

        tar_stream = ChunkedTarfileAdapter(
            status_interval=1, status_callback=display_transfer_status)
        tar_thread = threading.Thread(
            target=self.stream_system_as_tar, args=[temp_root, tar_stream])
        tar_thread.start()

        try:
            response = docker_client.import_image_from_stream(
                src=tar_stream,
                repository=image_name
            )
        except BaseException as e:
            logging.debug('Received %r while sending image', e)
            # Make the tar thread's next write() fail so that it stops.
            tar_stream.abort(e)
            raise

        tar_stream.show_status()
        logging.debug('Transfer complete! Response %s', response)

        if response.status_code != 200:
            raise cliapp.AppException(
                'Request to Docker daemon failed: %s (code %i)' %
                (response.reason, response.status_code))

    def stream_system_as_tar(self, fs_root, chunked_stream):
        '''Write the tree at fs_root to chunked_stream as a tar archive.

        Member names are rewritten to be relative to fs_root. On success
        chunked_stream.close() is called; on any error the stream is aborted
        with a cliapp.AppException describing it. This method itself never
        raises, which suits its use as a thread target.

        '''
        def make_relative(tarinfo):
            # tarinfo.name has lost its leading '/', so restore it before
            # computing the path relative to the root of the tree.
            old_name = os.path.join('/', tarinfo.name)
            tarinfo.name = os.path.relpath(old_name, fs_root)
            if tarinfo.islnk():
                # Hard links name another archive member, so their target
                # needs the same treatment.
                old_linkname = os.path.join('/', tarinfo.linkname)
                tarinfo.linkname = os.path.relpath(old_linkname, fs_root)
            return tarinfo

        try:
            tar_stream = tarfile.TarFile.open(
                name='docker.write-temp',
                mode='w|',
                bufsize=chunked_stream.EXPECTED_BUFFER_SIZE,
                fileobj=chunked_stream)

            logging.debug("Creating tar of rootfs")
            tar_stream.add(fs_root, recursive=True, filter=make_relative)
            tar_stream.close()
            logging.debug('Tar complete')
        except BaseException as e:
            logging.debug('Tar thread: Received %r', e)
            chunked_stream.abort(
                cliapp.AppException('Error received in tar thread: %s' % e))
        else:
            chunked_stream.close()


# Script entry point: instantiate the extension and call run(), which is
# inherited from morphlib.writeexts.WriteExtension (defined outside this
# file). Presumably run() parses the command line and then invokes
# process_args() -- confirm against morphlib.writeexts.
DockerWriteExtension().run()