summaryrefslogtreecommitdiff
path: root/aiogreen/__init__.py
blob: c98ac4a40c9e8fe5e432e9663fc5df3a52aed736 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
from eventlet import hubs
from trollius import futures
from trollius import selectors
from trollius import tasks
from trollius.base_events import BaseEventLoop
import eventlet.hubs.hub
import sys
import trollius
try:
    # Python 2
    import Queue as queue
except ImportError:
    import queue

threading = eventlet.patcher.original('threading')

_READ = eventlet.hubs.hub.READ
_WRITE = eventlet.hubs.hub.WRITE


def _is_main_thread():
    return isinstance(threading.current_thread(), threading._MainThread)


class EventLoopPolicy(trollius.AbstractEventLoopPolicy):
    def __init__(self):
        self._loop = None

    def get_event_loop(self):
        if not _is_main_thread():
            raise NotImplementedError("currently aiogreen can only run in the main thread")
        self._loop = EventLoop()
        return self._loop

    def new_event_loop(self):
        if self._loop is not None:
            raise NotImplementedError("cannot run two event loops in the same thread")
        return self.get_event_loop()

    def set_event_loop(self, loop):
        if self._loop is not None:
            raise NotImplementedError("cannot run two event loops in the same thread")
        self._loop = loop


# FIXME: is there a more efficient way to exchange data between two threads?
class ThreadQueue:
    def __init__(self, loop):
        self._loop = loop
        self._queue = queue.Queue()
        # FIXME: only schedule the consumer at the first call
        # to consume?
        self._greenthread = eventlet.spawn(self._consume)

    def _consume(self):
        while True:
            try:
                # FIXME: don't use polling
                stop, handle = self._queue.get(timeout=0.01)
            except eventlet.queue.Empty:
                eventlet.sleep(0)
                continue

            if stop:
                break
            self._loop._call_soon_handle(handle)

    def put(self, item):
        self._queue.put((False, item))

    def stop(self):
        self._queue.put((True, None))
        self._greenthread.wait()


class TimerHandle(trollius.Handle):
    def __init__(self, callback, args, loop):
        super(TimerHandle, self).__init__(callback, args, loop)
        self._timer = None

    def _run(self):
        super(TimerHandle, self)._run()

    def cancel(self):
        super(TimerHandle, self).cancel()
        self._timer.cancel()


class EventLoop(BaseEventLoop):
    def __init__(self):
        super(EventLoop, self).__init__()
        self._pool = eventlet.GreenPool()
        # Queue used by call_soon_threadsafe()
        self._queue = ThreadQueue(self)
        self._run = None
        if self.get_debug():
            hub = hubs.get_hub()
            hub.debug_blocking = True

    def time(self):
        # FIXME: is it safe to store the hub in an attribute of the event loop?
        # If yes, get the hub when the event loop is created
        hub = hubs.get_hub()
        return hub.clock()

    def _call(self, handle):
        if handle._cancelled:
            return
        handle._run()

    def _call_soon_handle(self, handle):
        self._pool.spawn(self._call, handle)

    def call_soon(self, callback, *args):
        handle = trollius.Handle(callback, args, self)
        self._call_soon_handle(handle)
        return handle

    def call_soon_threadsafe(self, callback, *args):
        handle = trollius.Handle(callback, args, self)
        self._queue.put(handle)
        return handle

    def call_later(self, delay, callback, *args):
        if 0:
            handle = TimerHandle(callback, args, self)

            # inline spawn_after() to get the timer object, to be able
            # to cancel directly the timer
            hub = hubs.get_hub()
            greenthread = eventlet.greenthread.GreenThread(hub.greenlet)
            timer = hub.schedule_call_global(delay, greenthread.switch,
                                             handle._run)

            handle._timer = timer
            return handle
        else:
            handle = trollius.Handle(callback, args, self)
            greenthread = eventlet.spawn_after(delay, self._call, handle)
            return handle

    def call_at(self, when, callback, *args):
        delay = when - self.time()
        return self.call_later(delay, callback, *args)

    # FIXME: run_in_executor(): use eventlet.tpool as the default executor?
    # It avoids the dependency to concurrent.futures, but later it would be
    # better to use concurrent.futures. So... What is the best?

    def stop(self):
        if self._run is None:
            # not running or stop already scheduled
            return
        self._run.send("stop")
        self._run = None

    def run_forever(self):
        if self._run is not None:
            raise RuntimeError("reentrant call to run_forever()")

        try:
            self._run = eventlet.event.Event()
            # use a local copy because stop() clears the attribute
            run = self._run
            run.wait()
        finally:
            self._run = None

    def close(self):
        super(EventLoop, self).close()
        self._queue.stop()

    def run_until_complete(self, future):
        # FIXME: don't copy/paste Trollius code, but
        # fix Trollius to call self.stop?
        self._check_closed()

        new_task = not isinstance(future, futures._FUTURE_CLASSES)
        future = tasks.async(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        def stop(fut):
            self.stop()

        future.add_done_callback(stop)
        self.run_forever()
        future.remove_done_callback(stop)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()

    def _throwback(self):
        # FIXME: do something?
        pass

    def _add_fd(self, event_type, fd, callback, args):
        hub = hubs.get_hub()
        fd = selectors._fileobj_to_fd(fd)
        def func(fd):
            return callback(*args)
        hub.add(event_type, fd, func, self._throwback, None)

    def add_reader(self, fd, callback, *args):
        self._add_fd(_READ, fd, callback, args)

    def add_writer(self, fd, callback, *args):
        self._add_fd(_WRITE, fd, callback, args)

    def _remove_fd(self, event_type, fd):
        hub = hubs.get_hub()
        fd = selectors._fileobj_to_fd(fd)
        try:
            listener = hub.listeners[event_type][fd]
        except KeyError:
            return False
        hub.remove(listener)
        return True

    def remove_reader(self, fd):
        return self._remove_fd(_READ, fd)

    def remove_writer(self, fd):
        return self._remove_fd(_WRITE, fd)