summaryrefslogtreecommitdiff
path: root/Doc/includes/mp_synchronize.py
diff options
context:
space:
mode:
Diffstat (limited to 'Doc/includes/mp_synchronize.py')
-rw-r--r--Doc/includes/mp_synchronize.py278
1 files changed, 0 insertions, 278 deletions
diff --git a/Doc/includes/mp_synchronize.py b/Doc/includes/mp_synchronize.py
deleted file mode 100644
index 81dbc387fc..0000000000
--- a/Doc/includes/mp_synchronize.py
+++ /dev/null
@@ -1,278 +0,0 @@
-#
-# A test file for the `multiprocessing` package
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-import time
-import sys
-import random
-from queue import Empty
-
-import multiprocessing # may get overwritten
-
-
-#### TEST_VALUE
-
-def value_func(running, mutex):
- random.seed()
- time.sleep(random.random()*4)
-
- mutex.acquire()
- print('\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished')
- running.value -= 1
- mutex.release()
-
-def test_value():
- TASKS = 10
- running = multiprocessing.Value('i', TASKS)
- mutex = multiprocessing.Lock()
-
- for i in range(TASKS):
- p = multiprocessing.Process(target=value_func, args=(running, mutex))
- p.start()
-
- while running.value > 0:
- time.sleep(0.08)
- mutex.acquire()
- print(running.value, end=' ')
- sys.stdout.flush()
- mutex.release()
-
- print()
- print('No more running processes')
-
-
-#### TEST_QUEUE
-
-def queue_func(queue):
- for i in range(30):
- time.sleep(0.5 * random.random())
- queue.put(i*i)
- queue.put('STOP')
-
-def test_queue():
- q = multiprocessing.Queue()
-
- p = multiprocessing.Process(target=queue_func, args=(q,))
- p.start()
-
- o = None
- while o != 'STOP':
- try:
- o = q.get(timeout=0.3)
- print(o, end=' ')
- sys.stdout.flush()
- except Empty:
- print('TIMEOUT')
-
- print()
-
-
-#### TEST_CONDITION
-
-def condition_func(cond):
- cond.acquire()
- print('\t' + str(cond))
- time.sleep(2)
- print('\tchild is notifying')
- print('\t' + str(cond))
- cond.notify()
- cond.release()
-
-def test_condition():
- cond = multiprocessing.Condition()
-
- p = multiprocessing.Process(target=condition_func, args=(cond,))
- print(cond)
-
- cond.acquire()
- print(cond)
- cond.acquire()
- print(cond)
-
- p.start()
-
- print('main is waiting')
- cond.wait()
- print('main has woken up')
-
- print(cond)
- cond.release()
- print(cond)
- cond.release()
-
- p.join()
- print(cond)
-
-
-#### TEST_SEMAPHORE
-
-def semaphore_func(sema, mutex, running):
- sema.acquire()
-
- mutex.acquire()
- running.value += 1
- print(running.value, 'tasks are running')
- mutex.release()
-
- random.seed()
- time.sleep(random.random()*2)
-
- mutex.acquire()
- running.value -= 1
- print('%s has finished' % multiprocessing.current_process())
- mutex.release()
-
- sema.release()
-
-def test_semaphore():
- sema = multiprocessing.Semaphore(3)
- mutex = multiprocessing.RLock()
- running = multiprocessing.Value('i', 0)
-
- processes = [
- multiprocessing.Process(target=semaphore_func,
- args=(sema, mutex, running))
- for i in range(10)
- ]
-
- for p in processes:
- p.start()
-
- for p in processes:
- p.join()
-
-
-#### TEST_JOIN_TIMEOUT
-
-def join_timeout_func():
- print('\tchild sleeping')
- time.sleep(5.5)
- print('\n\tchild terminating')
-
-def test_join_timeout():
- p = multiprocessing.Process(target=join_timeout_func)
- p.start()
-
- print('waiting for process to finish')
-
- while 1:
- p.join(timeout=1)
- if not p.is_alive():
- break
- print('.', end=' ')
- sys.stdout.flush()
-
-
-#### TEST_EVENT
-
-def event_func(event):
- print('\t%r is waiting' % multiprocessing.current_process())
- event.wait()
- print('\t%r has woken up' % multiprocessing.current_process())
-
-def test_event():
- event = multiprocessing.Event()
-
- processes = [multiprocessing.Process(target=event_func, args=(event,))
- for i in range(5)]
-
- for p in processes:
- p.start()
-
- print('main is sleeping')
- time.sleep(2)
-
- print('main is setting event')
- event.set()
-
- for p in processes:
- p.join()
-
-
-#### TEST_SHAREDVALUES
-
-def sharedvalues_func(values, arrays, shared_values, shared_arrays):
- for i in range(len(values)):
- v = values[i][1]
- sv = shared_values[i].value
- assert v == sv
-
- for i in range(len(values)):
- a = arrays[i][1]
- sa = list(shared_arrays[i][:])
- assert a == sa
-
- print('Tests passed')
-
-def test_sharedvalues():
- values = [
- ('i', 10),
- ('h', -2),
- ('d', 1.25)
- ]
- arrays = [
- ('i', list(range(100))),
- ('d', [0.25 * i for i in range(100)]),
- ('H', list(range(1000)))
- ]
-
- shared_values = [multiprocessing.Value(id, v) for id, v in values]
- shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
-
- p = multiprocessing.Process(
- target=sharedvalues_func,
- args=(values, arrays, shared_values, shared_arrays)
- )
- p.start()
- p.join()
-
- assert p.exitcode == 0
-
-
-####
-
-def test(namespace=multiprocessing):
- global multiprocessing
-
- multiprocessing = namespace
-
- for func in [test_value, test_queue, test_condition,
- test_semaphore, test_join_timeout, test_event,
- test_sharedvalues]:
-
- print('\n\t######## %s\n' % func.__name__)
- func()
-
- ignore = multiprocessing.active_children() # cleanup any old processes
- if hasattr(multiprocessing, '_debug_info'):
- info = multiprocessing._debug_info()
- if info:
- print(info)
- raise ValueError('there should be no positive refcounts left')
-
-
-if __name__ == '__main__':
- multiprocessing.freeze_support()
-
- assert len(sys.argv) in (1, 2)
-
- if len(sys.argv) == 1 or sys.argv[1] == 'processes':
- print(' Using processes '.center(79, '-'))
- namespace = multiprocessing
- elif sys.argv[1] == 'manager':
- print(' Using processes and a manager '.center(79, '-'))
- namespace = multiprocessing.Manager()
- namespace.Process = multiprocessing.Process
- namespace.current_process = multiprocessing.current_process
- namespace.active_children = multiprocessing.active_children
- elif sys.argv[1] == 'threads':
- print(' Using threads '.center(79, '-'))
- import multiprocessing.dummy as namespace
- else:
- print('Usage:\n\t%s [processes | manager | threads]' % sys.argv[0])
- raise SystemExit(2)
-
- test(namespace)