diff options
Diffstat (limited to 'src/lib/ecore/ecore_thread.c')
-rw-r--r-- | src/lib/ecore/ecore_thread.c | 639 |
1 files changed, 240 insertions, 399 deletions
diff --git a/src/lib/ecore/ecore_thread.c b/src/lib/ecore/ecore_thread.c index abef08f4ad..8c95dd96da 100644 --- a/src/lib/ecore/ecore_thread.c +++ b/src/lib/ecore/ecore_thread.c @@ -15,30 +15,30 @@ #include "Ecore.h" #include "ecore_private.h" -# define LK(x) Eina_Lock x -# define LKI(x) eina_lock_new(&(x)) -# define LKD(x) eina_lock_free(&(x)) -# define LKL(x) eina_lock_take(&(x)) -# define LKU(x) eina_lock_release(&(x)) - -# define SLK(x) Eina_Spinlock x -# define SLKI(x) eina_spinlock_new(&(x)) -# define SLKD(x) eina_spinlock_free(&(x)) -# define SLKL(x) eina_spinlock_take(&(x)) -# define SLKU(x) eina_spinlock_release(&(x)) - -# define CD(x) Eina_Condition x -# define CDI(x, m) eina_condition_new(&(x), &(m)) -# define CDD(x) eina_condition_free(&(x)) -# define CDB(x) eina_condition_broadcast(&(x)) -# define CDW(x, t) eina_condition_timedwait(&(x), t) - -# define LRWK(x) Eina_RWLock x -# define LRWKI(x) eina_rwlock_new(&(x)); -# define LRWKD(x) eina_rwlock_free(&(x)); -# define LRWKWL(x) eina_rwlock_take_write(&(x)); -# define LRWKRL(x) eina_rwlock_take_read(&(x)); -# define LRWKU(x) eina_rwlock_release(&(x)); +# define LK(x) Eina_Lock x +# define LKI(x) eina_lock_new(&(x)) +# define LKD(x) eina_lock_free(&(x)) +# define LKL(x) eina_lock_take(&(x)) +# define LKU(x) eina_lock_release(&(x)) + +# define SLK(x) Eina_Spinlock x +# define SLKI(x) eina_spinlock_new(&(x)) +# define SLKD(x) eina_spinlock_free(&(x)) +# define SLKL(x) eina_spinlock_take(&(x)) +# define SLKU(x) eina_spinlock_release(&(x)) + +# define CD(x) Eina_Condition x +# define CDI(x, m) eina_condition_new(&(x), &(m)) +# define CDD(x) eina_condition_free(&(x)) +# define CDB(x) eina_condition_broadcast(&(x)) +# define CDW(x, t) eina_condition_timedwait(&(x), t) + +# define LRWK(x) Eina_RWLock x +# define LRWKI(x) eina_rwlock_new(&(x)); +# define LRWKD(x) eina_rwlock_free(&(x)); +# define LRWKWL(x) eina_rwlock_take_write(&(x)); +# define LRWKRL(x) eina_rwlock_take_read(&(x)); +# define LRWKU(x) eina_rwlock_release(&(x)); # define PH(x) Eina_Thread x # define PHE(x, y) eina_thread_equal(x, y) @@ -49,13 +49,13 @@ typedef struct _Ecore_Pthread_Worker Ecore_Pthread_Worker; typedef struct _Ecore_Pthread Ecore_Pthread; typedef struct _Ecore_Thread_Data Ecore_Thread_Data; -typedef struct _Ecore_Thread_Waiter Ecore_Thread_Waiter; +typedef struct _Ecore_Thread_Waiter Ecore_Thread_Waiter; struct _Ecore_Thread_Waiter { Ecore_Thread_Cb func_cancel; Ecore_Thread_Cb func_end; - Eina_Bool waiting; + Eina_Bool waiting; }; struct _Ecore_Thread_Data @@ -66,7 +66,8 @@ struct _Ecore_Thread_Data struct _Ecore_Pthread_Worker { - union { + union + { struct { Ecore_Thread_Cb func_blocking; @@ -81,14 +82,16 @@ struct _Ecore_Pthread_Worker int send; int received; } feedback_run; - struct { - Ecore_Thread_Cb func_main; + struct + { + Ecore_Thread_Cb func_main; Ecore_Thread_Notify_Cb func_notify; Ecore_Pipe *send; Ecore_Pthread_Worker *direct_worker; - struct { + struct + { int send; int received; } from, to; @@ -96,49 +99,50 @@ struct _Ecore_Pthread_Worker } u; Ecore_Thread_Waiter *waiter; - Ecore_Thread_Cb func_cancel; - Ecore_Thread_Cb func_end; - PH(self); - Eina_Hash *hash; - CD(cond); - LK(mutex); + Ecore_Thread_Cb func_cancel; + Ecore_Thread_Cb func_end; + PH(self); + Eina_Hash *hash; + CD(cond); + LK(mutex); - const void *data; + const void *data; - int cancel; + int cancel; SLK(cancel_mutex); - Eina_Bool message_run : 1; - Eina_Bool feedback_run : 1; - Eina_Bool kill : 1; - Eina_Bool reschedule : 1; - Eina_Bool no_queue : 1; + Eina_Bool message_run : 1; + Eina_Bool feedback_run : 1; + Eina_Bool kill : 1; + Eina_Bool reschedule : 1; + Eina_Bool no_queue : 1; }; typedef struct _Ecore_Pthread_Notify Ecore_Pthread_Notify; struct _Ecore_Pthread_Notify { Ecore_Pthread_Worker *work; - const void *user_data; + const void *user_data; }; -typedef void *(*Ecore_Thread_Sync_Cb)(void* data, Ecore_Thread *thread); +typedef void *(*Ecore_Thread_Sync_Cb)(void *data, Ecore_Thread *thread); typedef struct _Ecore_Pthread_Message Ecore_Pthread_Message; struct _Ecore_Pthread_Message { - union { - Ecore_Thread_Cb async; + union + { + Ecore_Thread_Cb async; Ecore_Thread_Sync_Cb sync; } u; const void *data; - int code; + int code; - Eina_Bool callback : 1; - Eina_Bool sync : 1; + Eina_Bool callback : 1; + Eina_Bool sync : 1; }; static int _ecore_thread_count_max = 0; @@ -146,6 +150,7 @@ static int _ecore_thread_count_max = 0; static void _ecore_thread_handler(void *data); static int _ecore_thread_count = 0; +static int _ecore_thread_count_no_queue = 0; static Eina_List *_ecore_running_job = NULL; static Eina_List *_ecore_pending_job_threads = NULL; @@ -208,7 +213,7 @@ _ecore_thread_data_free(void *data) free(d); } -static void +void _ecore_thread_join(PH(thread)) { DBG("joining thread=%" PRIu64, (uint64_t)thread); @@ -261,6 +266,7 @@ static void _ecore_nothing_handler(void *data EINA_UNUSED, void *buffer EINA_UNUSED, unsigned int nbyte EINA_UNUSED) { } + #endif static void @@ -268,7 +274,7 @@ _ecore_notify_handler(void *data) { Ecore_Pthread_Notify *notify = data; Ecore_Pthread_Worker *work = notify->work; - void *user_data = (void*) notify->user_data; + void *user_data = (void *)notify->user_data; work->u.feedback_run.received++; @@ -289,7 +295,7 @@ _ecore_message_notify_handler(void *data) { Ecore_Pthread_Notify *notify = data; Ecore_Pthread_Worker *work = notify->work; - Ecore_Pthread_Message *user_data = (void *) notify->user_data; + Ecore_Pthread_Message *user_data = (void *)notify->user_data; Eina_Bool delete = EINA_TRUE; work->u.message_run.from.received++; @@ -297,13 +303,13 @@ _ecore_message_notify_handler(void *data) if (!user_data->callback) { if (work->u.message_run.func_notify) - work->u.message_run.func_notify((void *) work->data, (Ecore_Thread *) work, (void *) user_data->data); + work->u.message_run.func_notify((void *)work->data, (Ecore_Thread *)work, (void *)user_data->data); } else { if (user_data->sync) { - user_data->data = user_data->u.sync((void*) user_data->data, (Ecore_Thread *) work); + user_data->data = user_data->u.sync((void *)user_data->data, (Ecore_Thread *)work); user_data->callback = EINA_FALSE; user_data->code = INT_MAX; ecore_pipe_write(work->u.message_run.send, &user_data, sizeof (Ecore_Pthread_Message *)); @@ -312,7 +318,7 @@ _ecore_message_notify_handler(void *data) } else { - user_data->u.async((void*) user_data->data, (Ecore_Thread *) work); + user_data->u.async((void *)user_data->data, (Ecore_Thread *)work); } } @@ -376,7 +382,7 @@ _ecore_short_job(PH(thread)) SLKL(_ecore_running_job_mutex); _ecore_running_job = eina_list_append(_ecore_running_job, work); SLKU(_ecore_running_job_mutex); - + SLKL(work->cancel_mutex); cancel = work->cancel; SLKU(work->cancel_mutex); @@ -384,7 +390,7 @@ _ecore_short_job(PH(thread)) EINA_THREAD_CLEANUP_PUSH(_ecore_short_job_cleanup, work); if (!cancel) - work->u.short_run.func_blocking((void *) work->data, (Ecore_Thread*) work); + work->u.short_run.func_blocking((void *)work->data, (Ecore_Thread *)work); eina_thread_cancellable_set(EINA_FALSE, NULL); EINA_THREAD_CLEANUP_POP(EINA_TRUE); } @@ -419,15 +425,15 @@ _ecore_feedback_job(PH(thread)) { Ecore_Pthread_Worker *work; int cancel; - + SLKL(_ecore_pending_job_threads_mutex); - + if (!_ecore_pending_job_threads_feedback) { SLKU(_ecore_pending_job_threads_mutex); return; } - + work = eina_list_data_get(_ecore_pending_job_threads_feedback); _ecore_pending_job_threads_feedback = eina_list_remove_list(_ecore_pending_job_threads_feedback, _ecore_pending_job_threads_feedback); @@ -435,7 +441,7 @@ _ecore_feedback_job(PH(thread)) SLKL(_ecore_running_job_mutex); _ecore_running_job = eina_list_append(_ecore_running_job, work); SLKU(_ecore_running_job_mutex); - + SLKL(work->cancel_mutex); cancel = work->cancel; SLKU(work->cancel_mutex); @@ -443,7 +449,7 @@ _ecore_feedback_job(PH(thread)) EINA_THREAD_CLEANUP_PUSH(_ecore_feedback_job_cleanup, work); if (!cancel) - work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work); + work->u.feedback_run.func_heavy((void *)work->data, (Ecore_Thread *)work); eina_thread_cancellable_set(EINA_FALSE, NULL); EINA_THREAD_CLEANUP_POP(EINA_TRUE); } @@ -455,10 +461,13 @@ _ecore_direct_worker_cleanup(void *data) DBG("cleanup work=%p, thread=%" PRIu64 " (should join)", work, (uint64_t)work->self); + SLKL(_ecore_pending_job_threads_mutex); + _ecore_thread_count_no_queue--; ecore_main_loop_thread_safe_call_async(_ecore_thread_handler, work); - ecore_main_loop_thread_safe_call_async((Ecore_Cb) _ecore_thread_join, - (void*)(intptr_t)PHS()); + ecore_main_loop_thread_safe_call_async((Ecore_Cb)_ecore_thread_join, + (void *)(intptr_t)PHS()); + SLKU(_ecore_pending_job_threads_mutex); } static void * @@ -470,9 +479,9 @@ _ecore_direct_worker(Ecore_Pthread_Worker *work) EINA_THREAD_CLEANUP_PUSH(_ecore_direct_worker_cleanup, work); if (work->message_run) - work->u.message_run.func_main((void *) work->data, (Ecore_Thread *) work); + work->u.message_run.func_main((void *)work->data, (Ecore_Thread *)work); else - work->u.feedback_run.func_heavy((void *) work->data, (Ecore_Thread *) work); + work->u.feedback_run.func_heavy((void *)work->data, (Ecore_Thread *)work); eina_thread_cancellable_set(EINA_FALSE, NULL); EINA_THREAD_CLEANUP_POP(EINA_TRUE); @@ -485,9 +494,9 @@ _ecore_thread_worker_cleanup(void *data EINA_UNUSED) DBG("cleanup thread=%" PRIuPTR " (should join)", PHS()); SLKL(_ecore_pending_job_threads_mutex); _ecore_thread_count--; + ecore_main_loop_thread_safe_call_async((Ecore_Cb)_ecore_thread_join, + (void *)(intptr_t)PHS()); SLKU(_ecore_pending_job_threads_mutex); - ecore_main_loop_thread_safe_call_async((Ecore_Cb) _ecore_thread_join, - (void*)(intptr_t)PHS()); } static void * @@ -541,14 +550,14 @@ _ecore_thread_worker_new(void) result = eina_trash_pop(&_ecore_thread_worker_trash); - if (!result) + if (!result) { - result = calloc(1, sizeof(Ecore_Pthread_Worker)); - _ecore_thread_worker_count++; + result = calloc(1, sizeof(Ecore_Pthread_Worker)); + _ecore_thread_worker_count++; } else { - memset(result, 0, sizeof(Ecore_Pthread_Worker)); + memset(result, 0, sizeof(Ecore_Pthread_Worker)); } SLKI(result->cancel_mutex); @@ -576,78 +585,81 @@ void _ecore_thread_shutdown(void) { /* FIXME: If function are still running in the background, should we kill them ? */ - Ecore_Pthread_Worker *work; - Eina_List *l; - Eina_Bool test; - int iteration = 0; + Ecore_Pthread_Worker *work; + Eina_List *l; + Eina_Bool test; + int iteration = 0; - SLKL(_ecore_pending_job_threads_mutex); + SLKL(_ecore_pending_job_threads_mutex); - EINA_LIST_FREE(_ecore_pending_job_threads, work) - { - if (work->func_cancel) - work->func_cancel((void *)work->data, (Ecore_Thread *) work); - free(work); - } + EINA_LIST_FREE(_ecore_pending_job_threads, work) + { + if (work->func_cancel) + work->func_cancel((void *)work->data, (Ecore_Thread *)work); + free(work); + } - EINA_LIST_FREE(_ecore_pending_job_threads_feedback, work) - { - if (work->func_cancel) - work->func_cancel((void *)work->data, (Ecore_Thread *) work); - free(work); - } + EINA_LIST_FREE(_ecore_pending_job_threads_feedback, work) + { + if (work->func_cancel) + work->func_cancel((void *)work->data, (Ecore_Thread *)work); + free(work); + } - SLKU(_ecore_pending_job_threads_mutex); - SLKL(_ecore_running_job_mutex); + SLKU(_ecore_pending_job_threads_mutex); + SLKL(_ecore_running_job_mutex); - EINA_LIST_FOREACH(_ecore_running_job, l, work) - ecore_thread_cancel((Ecore_Thread*) work); + EINA_LIST_FOREACH(_ecore_running_job, l, work) + ecore_thread_cancel((Ecore_Thread *)work); - SLKU(_ecore_running_job_mutex); + SLKU(_ecore_running_job_mutex); - do - { - SLKL(_ecore_pending_job_threads_mutex); - if (_ecore_thread_count > 0) - { - test = EINA_TRUE; - } - else - { - test = EINA_FALSE; - } - SLKU(_ecore_pending_job_threads_mutex); - iteration++; - if (test) usleep(50000); - } - while (test == EINA_TRUE && iteration < 20); - - if (iteration == 20 && _ecore_thread_count > 0) - { - ERR("%i of the child thread are still running after 1s. This can lead to a segv. Sorry.", _ecore_thread_count); - } + do + { + SLKL(_ecore_pending_job_threads_mutex); + if (_ecore_thread_count + _ecore_thread_count_no_queue > 0) + { + test = EINA_TRUE; + } + else + { + test = EINA_FALSE; + } + SLKU(_ecore_pending_job_threads_mutex); + iteration++; + if (test) + { + _ecore_main_call_flush(); + usleep(1000); + } + } while (test == EINA_TRUE && iteration < 50); + + if (iteration == 20 && _ecore_thread_count > 0) + { + ERR("%i of the child thread are still running after 1s. This can lead to a segv. Sorry.", _ecore_thread_count); + } - if (_ecore_thread_global_hash) - eina_hash_free(_ecore_thread_global_hash); - have_main_loop_thread = 0; + if (_ecore_thread_global_hash) + eina_hash_free(_ecore_thread_global_hash); + have_main_loop_thread = 0; - while ((work = eina_trash_pop(&_ecore_thread_worker_trash))) - { - free(work); - } - - SLKD(_ecore_pending_job_threads_mutex); - LRWKD(_ecore_thread_global_hash_lock); - LKD(_ecore_thread_global_hash_mutex); - SLKD(_ecore_running_job_mutex); - CDD(_ecore_thread_global_hash_cond); + while ((work = eina_trash_pop(&_ecore_thread_worker_trash))) + { + free(work); + } + + SLKD(_ecore_pending_job_threads_mutex); + LRWKD(_ecore_thread_global_hash_lock); + LKD(_ecore_thread_global_hash_mutex); + SLKD(_ecore_running_job_mutex); + CDD(_ecore_thread_global_hash_cond); } EAPI Ecore_Thread * ecore_thread_run(Ecore_Thread_Cb func_blocking, Ecore_Thread_Cb func_end, Ecore_Thread_Cb func_cancel, - const void *data) + const void *data) { Ecore_Pthread_Worker *work; Eina_Bool tried = EINA_FALSE; @@ -695,18 +707,18 @@ ecore_thread_run(Ecore_Thread_Cb func_blocking, SLKL(_ecore_pending_job_threads_mutex); - retry: +retry: if (PHC(thread, _ecore_thread_worker, NULL)) { _ecore_thread_count++; - SLKU(_ecore_pending_job_threads_mutex); + SLKU(_ecore_pending_job_threads_mutex); return (Ecore_Thread *)work; } if (!tried) { - _ecore_main_call_flush(); - tried = EINA_TRUE; - goto retry; + _ecore_main_call_flush(); + tried = EINA_TRUE; + goto retry; } if (_ecore_thread_count == 0) @@ -714,9 +726,9 @@ ecore_thread_run(Ecore_Thread_Cb func_blocking, _ecore_pending_job_threads = eina_list_remove(_ecore_pending_job_threads, work); if (work->func_cancel) - work->func_cancel((void *) work->data, (Ecore_Thread *) work); + work->func_cancel((void *)work->data, (Ecore_Thread *)work); - _ecore_thread_worker_free(work); + _ecore_thread_worker_free(work); work = NULL; } SLKU(_ecore_pending_job_threads_mutex); @@ -793,7 +805,7 @@ ecore_thread_cancel(Ecore_Thread *thread) work = (Ecore_Pthread_Worker *)thread; /* Delay the destruction */ - on_exit: +on_exit: eina_thread_cancel(work->self); /* noop unless eina_thread_cancellable_set() was used by user */ SLKL(work->cancel_mutex); work->cancel = EINA_TRUE; @@ -802,7 +814,6 @@ ecore_thread_cancel(Ecore_Thread *thread) return EINA_FALSE; } - static void _ecore_thread_wait_reset(Ecore_Thread_Waiter *waiter, Ecore_Pthread_Worker *worker) @@ -819,7 +830,7 @@ _ecore_thread_wait_reset(Ecore_Thread_Waiter *waiter, static void _ecore_thread_wait_cancel(void *data EINA_UNUSED, Ecore_Thread *thread) { - Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker*) thread; + Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread; Ecore_Thread_Waiter *waiter = worker->waiter; if (waiter->func_cancel) waiter->func_cancel(data, thread); @@ -829,7 +840,7 @@ _ecore_thread_wait_cancel(void *data EINA_UNUSED, Ecore_Thread *thread) static void _ecore_thread_wait_end(void *data EINA_UNUSED, Ecore_Thread *thread) { - Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker*) thread; + Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread; Ecore_Thread_Waiter *waiter = worker->waiter; if (waiter->func_end) waiter->func_end(data, thread); @@ -839,7 +850,7 @@ _ecore_thread_wait_end(void *data EINA_UNUSED, Ecore_Thread *thread) EAPI Eina_Bool ecore_thread_wait(Ecore_Thread *thread, double wait) { - Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker*) thread; + Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread; Ecore_Thread_Waiter waiter; if (!thread) return EINA_TRUE; @@ -858,7 +869,8 @@ ecore_thread_wait(Ecore_Thread *thread, double wait) double start, end; start = ecore_time_get(); - ecore_main_loop_thread_safe_call_wait(wait); + _ecore_main_call_flush(); + ecore_main_loop_thread_safe_call_wait(0.0001); end = ecore_time_get(); wait -= end - start; @@ -880,7 +892,7 @@ ecore_thread_wait(Ecore_Thread *thread, double wait) EAPI Eina_Bool ecore_thread_check(Ecore_Thread *thread) { - Ecore_Pthread_Worker *volatile worker = (Ecore_Pthread_Worker *) thread; + Ecore_Pthread_Worker *volatile worker = (Ecore_Pthread_Worker *)thread; int cancel; if (!worker) return EINA_TRUE; @@ -888,21 +900,21 @@ ecore_thread_check(Ecore_Thread *thread) cancel = worker->cancel; /* FIXME: there is an insane bug driving me nuts here. I don't know if - it's a race condition, some cache issue or some alien attack on our software. - But ecore_thread_check will only work correctly with a printf, all the volatile, - lock and even usleep don't help here... */ + it's a race condition, some cache issue or some alien attack on our software. + But ecore_thread_check will only work correctly with a printf, all the volatile, + lock and even usleep don't help here... */ /* fprintf(stderr, "wc: %i\n", cancel); */ SLKU(worker->cancel_mutex); return cancel; } EAPI Ecore_Thread * -ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy, +ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy, Ecore_Thread_Notify_Cb func_notify, - Ecore_Thread_Cb func_end, - Ecore_Thread_Cb func_cancel, - const void *data, - Eina_Bool try_no_queue) + Ecore_Thread_Cb func_end, + Ecore_Thread_Cb func_cancel, + const void *data, + Eina_Bool try_no_queue) { Ecore_Pthread_Worker *worker; Eina_Bool tried = EINA_FALSE; @@ -942,15 +954,20 @@ ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy, eina_threads_init(); - retry_direct: +retry_direct: if (PHC(t, _ecore_direct_worker, worker)) - return (Ecore_Thread *)worker; - if (!tried) - { - _ecore_main_call_flush(); - tried = EINA_TRUE; - goto retry_direct; - } + { + SLKL(_ecore_pending_job_threads_mutex); + _ecore_thread_count_no_queue++; + SLKU(_ecore_pending_job_threads_mutex); + return (Ecore_Thread *)worker; + } + if (!tried) + { + _ecore_main_call_flush(); + tried = EINA_TRUE; + goto retry_direct; + } if (worker->u.feedback_run.direct_worker) { @@ -978,18 +995,18 @@ ecore_thread_feedback_run(Ecore_Thread_Cb func_heavy, eina_threads_init(); SLKL(_ecore_pending_job_threads_mutex); - retry: +retry: if (PHC(thread, _ecore_thread_worker, NULL)) { _ecore_thread_count++; - SLKU(_ecore_pending_job_threads_mutex); + SLKU(_ecore_pending_job_threads_mutex); return (Ecore_Thread *)worker; } if (!tried) { _ecore_main_call_flush(); - tried = EINA_TRUE; - goto retry; + tried = EINA_TRUE; + goto retry; } SLKU(_ecore_pending_job_threads_mutex); @@ -1019,7 +1036,7 @@ on_error: EAPI Eina_Bool ecore_thread_feedback(Ecore_Thread *thread, - const void *data) + const void *data) { Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread; @@ -1072,60 +1089,61 @@ ecore_thread_feedback(Ecore_Thread *thread, #if 0 EAPI Ecore_Thread * ecore_thread_message_run(Ecore_Thread_Cb func_main, - Ecore_Thread_Notify_Cb func_notify, - Ecore_Thread_Cb func_end, - Ecore_Thread_Cb func_cancel, - const void *data) + Ecore_Thread_Notify_Cb func_notify, + Ecore_Thread_Cb func_end, + Ecore_Thread_Cb func_cancel, + const void *data) { - Ecore_Pthread_Worker *worker; - PH(t); + Ecore_Pthread_Worker *worker; + PH(t); - if (!func_main) return NULL; + if (!func_main) return NULL; - worker = _ecore_thread_worker_new(); - if (!worker) return NULL; + worker = _ecore_thread_worker_new(); + if (!worker) return NULL; - worker->u.message_run.func_main = func_main; - worker->u.message_run.func_notify = func_notify; - worker->u.message_run.direct_worker = _ecore_thread_worker_new(); - worker->u.message_run.send = ecore_pipe_add(_ecore_nothing_handler, worker); - worker->u.message_run.from.send = 0; - worker->u.message_run.from.received = 0; - worker->u.message_run.to.send = 0; - worker->u.message_run.to.received = 0; + worker->u.message_run.func_main = func_main; + worker->u.message_run.func_notify = func_notify; + worker->u.message_run.direct_worker = _ecore_thread_worker_new(); + worker->u.message_run.send = ecore_pipe_add(_ecore_nothing_handler, worker); + worker->u.message_run.from.send = 0; + worker->u.message_run.from.received = 0; + worker->u.message_run.to.send = 0; + worker->u.message_run.to.received = 0; - ecore_pipe_freeze(worker->u.message_run.send); + ecore_pipe_freeze(worker->u.message_run.send); - worker->func_cancel = func_cancel; - worker->func_end = func_end; - worker->hash = NULL; - worker->data = data; + worker->func_cancel = func_cancel; + worker->func_end = func_end; + worker->hash = NULL; + worker->data = data; - worker->cancel = EINA_FALSE; - worker->message_run = EINA_TRUE; - worker->feedback_run = EINA_FALSE; - worker->kill = EINA_FALSE; - worker->reschedule = EINA_FALSE; - worker->no_queue = EINA_FALSE; - worker->self = 0; + worker->cancel = EINA_FALSE; + worker->message_run = EINA_TRUE; + worker->feedback_run = EINA_FALSE; + worker->kill = EINA_FALSE; + worker->reschedule = EINA_FALSE; + worker->no_queue = EINA_FALSE; + worker->self = 0; - eina_threads_init(); + eina_threads_init(); - if (PHC(t, _ecore_direct_worker, worker)) - return (Ecore_Thread*) worker; + if (PHC(t, _ecore_direct_worker, worker)) + return (Ecore_Thread *)worker; - eina_threads_shutdown(); + eina_threads_shutdown(); - if (worker->u.message_run.direct_worker) _ecore_thread_worker_free(worker->u.message_run.direct_worker); - if (worker->u.message_run.send) ecore_pipe_del(worker->u.message_run.send); + if (worker->u.message_run.direct_worker) _ecore_thread_worker_free(worker->u.message_run.direct_worker); + if (worker->u.message_run.send) ecore_pipe_del(worker->u.message_run.send); - CDD(worker->cond); - LKD(worker->mutex); + CDD(worker->cond); + LKD(worker->mutex); - func_cancel((void *) data, NULL); + func_cancel((void *)data, NULL); - return NULL; + return NULL; } + #endif EAPI Eina_Bool @@ -1222,10 +1240,10 @@ ecore_thread_available_get(void) EAPI Eina_Bool ecore_thread_local_data_add(Ecore_Thread *thread, - const char *key, - void *value, - Eina_Free_Cb cb, - Eina_Bool direct) + const char *key, + void *value, + Eina_Free_Cb cb, + Eina_Bool direct) { Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread; Ecore_Thread_Data *d; @@ -1260,9 +1278,9 @@ ecore_thread_local_data_add(Ecore_Thread *thread, EAPI void * ecore_thread_local_data_set(Ecore_Thread *thread, - const char *key, - void *value, - Eina_Free_Cb cb) + const char *key, + void *value, + Eina_Free_Cb cb) { Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread; Ecore_Thread_Data *d, *r; @@ -1271,7 +1289,6 @@ ecore_thread_local_data_set(Ecore_Thread *thread, if ((!thread) || (!key) || (!value)) return NULL; - LKL(worker->mutex); if (!worker->hash) worker->hash = eina_hash_string_small_new(_ecore_thread_data_free); @@ -1293,16 +1310,16 @@ ecore_thread_local_data_set(Ecore_Thread *thread, if (r) { - ret = r->data; - free(r); - return ret; + ret = r->data; + free(r); + return ret; } return NULL; } EAPI void * ecore_thread_local_data_find(Ecore_Thread *thread, - const char *key) + const char *key) { Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread; Ecore_Thread_Data *d; @@ -1323,7 +1340,7 @@ ecore_thread_local_data_find(Ecore_Thread *thread, EAPI Eina_Bool ecore_thread_local_data_del(Ecore_Thread *thread, - const char *key) + const char *key) { Ecore_Pthread_Worker *worker = (Ecore_Pthread_Worker *)thread; Eina_Bool r; @@ -1341,10 +1358,10 @@ ecore_thread_local_data_del(Ecore_Thread *thread, } EAPI Eina_Bool -ecore_thread_global_data_add(const char *key, - void *value, +ecore_thread_global_data_add(const char *key, + void *value, Eina_Free_Cb cb, - Eina_Bool direct) + Eina_Bool direct) { Ecore_Thread_Data *d; Eina_Bool ret; @@ -1380,8 +1397,8 @@ ecore_thread_global_data_add(const char *key, } EAPI void * -ecore_thread_global_data_set(const char *key, - void *value, +ecore_thread_global_data_set(const char *key, + void *value, Eina_Free_Cb cb) { Ecore_Thread_Data *d, *r; @@ -1455,7 +1472,7 @@ ecore_thread_global_data_del(const char *key) EAPI void * ecore_thread_global_data_wait(const char *key, - double seconds) + double seconds) { double tm = 0; Ecore_Thread_Data *ret = NULL; @@ -1484,179 +1501,3 @@ ecore_thread_global_data_wait(const char *key, return NULL; } -typedef struct _Ecore_Thread_Set Ecore_Thread_Set; -struct _Ecore_Thread_Set -{ - Eo *obj; - - union { - struct { - void *v; - Eina_Free_Cb free_cb; - } value; - Eina_Error error; - } u; - - Eina_Bool success; -}; - -static void -_ecore_thread_main_loop_set(void *data) -{ - Ecore_Thread_Set *dt = data; - - if (dt->success) - efl_promise_value_set(efl_super(dt->obj, EFL_OBJECT_OVERRIDE_CLASS), - dt->u.value.v, dt->u.value.free_cb); - else - efl_promise_failed_set(efl_super(dt->obj, EFL_OBJECT_OVERRIDE_CLASS), - dt->u.error); - - efl_unref(dt->obj); - free(dt); -} - -static Ecore_Thread_Set * -_ecore_thread_set_new(Eo *obj) -{ - Ecore_Thread_Set *dt; - - dt = calloc(1, sizeof (Ecore_Thread_Set)); - if (!dt) return NULL; - - dt->obj = efl_ref(obj); - - return dt; -} - -static void -_ecore_thread_value_set(Eo *obj, const void *data EINA_UNUSED, void *v, Eina_Free_Cb free_cb) -{ - Ecore_Thread_Set *dt; - - dt = _ecore_thread_set_new(obj); - if (!dt) return ; - - dt->success = EINA_TRUE; - dt->u.value.v = v; - dt->u.value.free_cb = free_cb; - - ecore_main_loop_thread_safe_call_async(_ecore_thread_main_loop_set, dt); -} - -static void -_ecore_thread_failed_set(Eo *obj, const void *data EINA_UNUSED, Eina_Error err) -{ - Ecore_Thread_Set *dt; - - dt = _ecore_thread_set_new(obj); - if (!dt) return ; - - dt->success = EINA_FALSE; - dt->u.error = err; - - ecore_main_loop_thread_safe_call_async(_ecore_thread_main_loop_set, dt); -} - -typedef struct _Ecore_Thread_Progress Ecore_Thread_Progress; -struct _Ecore_Thread_Progress -{ - Eo *obj; - const void *p; -}; - -static void * -_ecore_thread_progress_sync(void *data) -{ - Ecore_Thread_Progress *p = data; - - efl_promise_progress_set(efl_super(p->obj, EFL_OBJECT_OVERRIDE_CLASS), p->p); - - return NULL; -} - -static void -_ecore_thread_progress_set(Eo *obj, const void *data EINA_UNUSED, const void *p) -{ - Ecore_Thread_Progress ip = { efl_ref(obj), p }; - - ecore_main_loop_thread_safe_call_sync(_ecore_thread_progress_sync, &ip); - efl_unref(obj); -} - -static void -_ecore_thread_future_heavy(void *dp, Ecore_Thread *thread) -{ - Ecore_Thread_Future_Cb heavy; - const void *data; - Eo *p = dp; - - heavy = efl_key_data_get(p, "_ecore_thread.heavy"); - data = efl_key_data_get(p, "_ecore_thread.data"); - - heavy(data, p, thread); -} - -static void -_ecore_thread_future_end(void *dp, Ecore_Thread *thread EINA_UNUSED) -{ - Eina_Free_Cb free_cb; - void *data; - Eo *p = dp; - - free_cb = efl_key_data_get(p, "_ecore_thread.free_cb"); - data = efl_key_data_get(p, "_ecore_thread.data"); - - if (free_cb) free_cb(data); - efl_del(p); -} - -static void -_ecore_thread_future_none(void *data, const Efl_Event *ev EINA_UNUSED) -{ - Ecore_Thread *t = data; - - // Cancelling thread if there is nobody listening on the promise anymore - ecore_thread_cancel(t); -} - -EAPI Efl_Future * -ecore_thread_future_run(Ecore_Thread_Future_Cb heavy, const void *data, Eina_Free_Cb free_cb) -{ - Ecore_Thread *t; - Eo *p; - - if (!heavy) return NULL; - - EFL_OPS_DEFINE(thread_safe_call, - EFL_OBJECT_OP_FUNC(efl_promise_value_set, _ecore_thread_value_set), - EFL_OBJECT_OP_FUNC(efl_promise_failed_set, _ecore_thread_failed_set), - EFL_OBJECT_OP_FUNC(efl_promise_progress_set, _ecore_thread_progress_set)); - - efl_domain_current_push(EFL_ID_DOMAIN_SHARED); - - efl_wref_add(efl_add(EFL_PROMISE_CLASS, efl_main_loop_get()), &p); - if (!p) goto end; - - efl_object_override(p, &thread_safe_call); - - efl_key_data_set(p, "_ecore_thread.data", data); - efl_key_data_set(p, "_ecore_thread.free_cb", free_cb); - efl_key_data_set(p, "_ecore_thread.heavy", heavy); - - t = ecore_thread_run(_ecore_thread_future_heavy, - _ecore_thread_future_end, - _ecore_thread_future_end, - p); - - if (p) - { - efl_event_callback_add(p, EFL_PROMISE_EVENT_FUTURE_NONE, _ecore_thread_future_none, t); - efl_wref_del(p, &p); - } - - end: - efl_domain_current_pop(); - - return p ? efl_promise_future_get(p) : NULL; -} |