diff options
author | stbuehler <stbuehler@152afb58-edef-0310-8abb-c4023f1b3aa9> | 2009-10-25 17:32:27 +0000 |
---|---|---|
committer | stbuehler <stbuehler@152afb58-edef-0310-8abb-c4023f1b3aa9> | 2009-10-25 17:32:27 +0000 |
commit | 5462f9f54893ef54c6c992a90b36a49e045cbc41 (patch) | |
tree | bf00d2fe926456b797e1034be78c620f78730f4f | |
parent | 56e523b46ec3f38c247222544c34fa2db0bdd48c (diff) | |
download | lighttpd-5462f9f54893ef54c6c992a90b36a49e045cbc41.tar.gz |
Remove joblist thread, don't use timed pops for async queues
* Less spam in strace, should result in less overhead with joblist queue
git-svn-id: svn://svn.lighttpd.net/lighttpd/trunk@2673 152afb58-edef-0310-8abb-c4023f1b3aa9
-rw-r--r-- | NEWS | 1 | ||||
-rw-r--r-- | src/base.h | 1 | ||||
-rw-r--r-- | src/joblist.c | 21 | ||||
-rw-r--r-- | src/joblist.h | 8 | ||||
-rw-r--r-- | src/network_gthread_aio.c | 13 | ||||
-rw-r--r-- | src/network_gthread_freebsd_sendfile.c | 13 | ||||
-rw-r--r-- | src/network_gthread_sendfile.c | 13 | ||||
-rw-r--r-- | src/network_linux_aio.c | 10 | ||||
-rw-r--r-- | src/network_posix_aio.c | 12 | ||||
-rw-r--r-- | src/server.c | 153 | ||||
-rw-r--r-- | src/stat_cache.c | 15 |
11 files changed, 47 insertions, 213 deletions
@@ -148,6 +148,7 @@ NEWS * mod_accesslog: escape special characters (fixes #1551, thx icy) * Don't print ssl error if client didn't support TLS SNI * Reopen out stream in X-Rewrite (fixes #1678) + * Remove joblist thread, don't use timed pops for async queues - 1.5.0-r19.. - * -F option added for spawn-fcgi @@ -715,6 +715,7 @@ typedef struct server { GAsyncQueue *joblist_queue; GAsyncQueue *aio_write_queue; + int did_wakeup; int wakeup_pipe[2]; iosocket *wakeup_iosocket; #endif diff --git a/src/joblist.c b/src/joblist.c index f643938b..8f68f496 100644 --- a/src/joblist.c +++ b/src/joblist.c @@ -1,12 +1,13 @@ #include <stdlib.h> #include <string.h> +#include <unistd.h> #include "base.h" #include "joblist.h" #include "log.h" -int joblist_append(server *srv, connection *con) { - if (con->in_joblist) return 0; +void joblist_append(server *srv, connection *con) { + if (con->in_joblist) return; con->in_joblist = 1; if (srv->joblist->size == 0) { @@ -18,8 +19,6 @@ int joblist_append(server *srv, connection *con) { } srv->joblist->ptr[srv->joblist->used++] = con; - - return 0; } void joblist_free(server *srv, connections *joblist) { @@ -29,6 +28,20 @@ void joblist_free(server *srv, connections *joblist) { free(joblist); } +#ifdef USE_GTHREAD +void joblist_async_append(server *srv, connection *con) { + g_async_queue_push(srv->joblist_queue, con); + + server_wakeup(srv); +} + +void server_wakeup(server *srv) { + if (g_atomic_int_compare_and_exchange(&srv->did_wakeup, 0, 1)) { + write(srv->wakeup_pipe[1], " ", 1); + } +} +#endif + connection *fdwaitqueue_unshift(server *srv, connections *fdwaitqueue) { connection *con; UNUSED(srv); diff --git a/src/joblist.h b/src/joblist.h index 037f1815..2f6bb236 100644 --- a/src/joblist.h +++ b/src/joblist.h @@ -3,9 +3,15 @@ #include "base.h" -LI_API int joblist_append(server *srv, connection *con); +LI_API void joblist_append(server *srv, connection *con); LI_API void joblist_free(server *srv, connections *joblist); +#ifdef USE_GTHREAD +LI_API void joblist_async_append(server *srv, connection *con); + +LI_API void server_wakeup(server *srv); +#endif + LI_API int fdwaitqueue_append(server *srv, connection *con); LI_API void fdwaitqueue_free(server *srv, connections *fdwaitqueue); LI_API connection* fdwaitqueue_unshift(server *srv, connections *fdwaitqueue); diff --git a/src/network_gthread_aio.c b/src/network_gthread_aio.c index 92c5d6be..42a2f3bd 100644 --- a/src/network_gthread_aio.c +++ b/src/network_gthread_aio.c @@ -87,26 +87,18 @@ gpointer network_gthread_aio_read_thread(gpointer _srv) { server *srv = (server *)_srv; GAsyncQueue * inq; - GAsyncQueue * outq; int fadvise_is_enosys = 0; - g_async_queue_ref(srv->joblist_queue); g_async_queue_ref(srv->aio_write_queue); - outq = srv->joblist_queue; inq = srv->aio_write_queue; /* */ while (!srv->is_shutdown) { - GTimeVal ts; write_job *wj = NULL; - /* wait one second as the poll() */ - g_get_current_time(&ts); - g_time_val_add(&ts, 500 * 1000); - - if ((wj = g_async_queue_timed_pop(inq, &ts))) { + if ((wj = g_async_queue_pop(inq))) { /* let's see what we have to stat */ ssize_t r; off_t offset; @@ -219,7 +211,7 @@ gpointer network_gthread_aio_read_thread(gpointer _srv) { } timing_log(srv, con, TIME_SEND_ASYNC_READ_END_QUEUED); /* read async, write as usual */ - g_async_queue_push(outq, wj->con); + joblist_async_append(srv, wj->con); #if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED) /* read ahead */ @@ -240,7 +232,6 @@ gpointer network_gthread_aio_read_thread(gpointer _srv) { } g_async_queue_unref(srv->aio_write_queue); - g_async_queue_unref(srv->joblist_queue); return NULL; diff --git a/src/network_gthread_freebsd_sendfile.c b/src/network_gthread_freebsd_sendfile.c index 654e2bc7..639eda67 100644 --- a/src/network_gthread_freebsd_sendfile.c +++ b/src/network_gthread_freebsd_sendfile.c @@ -77,24 +77,16 @@ gpointer network_gthread_freebsd_sendfile_read_thread(gpointer _srv) { server *srv = (server *)_srv; GAsyncQueue * inq; - GAsyncQueue * outq; - g_async_queue_ref(srv->joblist_queue); g_async_queue_ref(srv->aio_write_queue); - outq = srv->joblist_queue; inq = srv->aio_write_queue; /* */ while (!srv->is_shutdown) { - GTimeVal ts; write_job *wj = NULL; - /* wait one second as the poll() */ - g_get_current_time(&ts); - g_time_val_add(&ts, 500 * 1000); - - if ((wj = g_async_queue_timed_pop(inq, &ts))) { + if ((wj = g_async_queue_pop(inq))) { /* let's see what we have to stat */ off_t r; off_t offset; @@ -142,14 +134,13 @@ gpointer network_gthread_freebsd_sendfile_read_thread(gpointer _srv) { timing_log(srv, con, TIME_SEND_ASYNC_READ_END_QUEUED); /* read async, write as usual */ - g_async_queue_push(outq, wj->con); + joblist_async_append(srv, wj->con); write_job_free(wj); } } g_async_queue_unref(srv->aio_write_queue); - g_async_queue_unref(srv->joblist_queue); return NULL; diff --git a/src/network_gthread_sendfile.c b/src/network_gthread_sendfile.c index 39a40087..b39a082d 100644 --- a/src/network_gthread_sendfile.c +++ b/src/network_gthread_sendfile.c @@ -77,24 +77,16 @@ gpointer network_gthread_sendfile_read_thread(gpointer _srv) { server *srv = (server *)_srv; GAsyncQueue * inq; - GAsyncQueue * outq; - g_async_queue_ref(srv->joblist_queue); g_async_queue_ref(srv->aio_write_queue); - outq = srv->joblist_queue; inq = srv->aio_write_queue; /* */ while (!srv->is_shutdown) { - GTimeVal ts; write_job *wj = NULL; - /* wait one second as the poll() */ - g_get_current_time(&ts); - g_time_val_add(&ts, 500 * 1000); - - if ((wj = g_async_queue_timed_pop(inq, &ts))) { + if ((wj = g_async_queue_pop(inq))) { /* let's see what we have to stat */ ssize_t r; off_t offset; @@ -147,14 +139,13 @@ gpointer network_gthread_sendfile_read_thread(gpointer _srv) { timing_log(srv, con, TIME_SEND_ASYNC_READ_END_QUEUED); /* read async, write as usual */ - g_async_queue_push(outq, wj->con); + joblist_async_append(srv, wj->con); write_job_free(wj); } } g_async_queue_unref(srv->aio_write_queue); - g_async_queue_unref(srv->joblist_queue); return NULL; diff --git a/src/network_linux_aio.c b/src/network_linux_aio.c index 81e0e1c9..23f64160 100644 --- a/src/network_linux_aio.c +++ b/src/network_linux_aio.c @@ -41,12 +41,6 @@ gpointer linux_aio_read_thread(gpointer _srv) { server *srv = (server *)_srv; - GAsyncQueue * outq; - - g_async_queue_ref(srv->joblist_queue); - - outq = srv->joblist_queue; - /* */ while (!srv->is_shutdown) { /* let's see what we have to stat */ @@ -70,14 +64,12 @@ gpointer linux_aio_read_thread(gpointer _srv) { /* free the iocb */ event[i].obj->data = NULL; - g_async_queue_push(outq, con); + joblist_async_append(srv, con); } } else if (res < 0) { TRACE("getevents - failed: %d: %s", res, strerror(-res)); } } - - g_async_queue_unref(srv->joblist_queue); return NULL; } diff --git a/src/network_posix_aio.c b/src/network_posix_aio.c index 86d1e6c2..b77abb22 100644 --- a/src/network_posix_aio.c +++ b/src/network_posix_aio.c @@ -78,17 +78,9 @@ static void posix_aio_completion_handler(union sigval foo) { chunk *c = wj->c; int res; - GAsyncQueue * outq; - - g_async_queue_ref(srv->joblist_queue); - - outq = srv->joblist_queue; - if (srv->is_shutdown) { write_job_free(wj); - g_async_queue_unref(outq); - return; } @@ -119,14 +111,12 @@ static void posix_aio_completion_handler(union sigval foo) { c->async.ret_val = NETWORK_STATUS_FATAL_ERROR; } - g_async_queue_push(outq, con); + joblist_async_append(srv, con); iocb->aio_nbytes = 0; /* mark the entry as unused */ } write_job_free(wj); - - g_async_queue_unref(outq); } NETWORK_BACKEND_WRITE(posixaio) { diff --git a/src/server.c b/src/server.c index c95dc2c9..05793336 100644 --- a/src/server.c +++ b/src/server.c @@ -532,71 +532,6 @@ static void show_help (void) { exit(-1); } } -#ifdef USE_GTHREAD -static GMutex *joblist_queue_mutex = NULL; -static GCond *joblist_queue_cond = NULL; -#endif - -/** - * a async handler - * - * The problem: - * - poll(..., 1000) is blocking - * - g_async_queue_timed_pop(..., 1000) is blocking too - * - * I havn't found a way to monitor g_async_queue_timed_pop with poll or friends - * - * So ... we run g_async_queue_timed_pop() and fdevents_poll() at the same time. As long - * as the poll() is running g_async_queue_timed_pop() running too. - * - * The one who finishes earlier writes to wakeup_pipe[1] to interrupt - * the other system call. We use mutexes to synchronize the two functions. - * - */ -#ifdef USE_GTHREAD -static void *joblist_queue_thread(void *_data) { - server *srv = _data; - - g_mutex_lock(joblist_queue_mutex); - - while (!srv_shutdown) { - GTimeVal ts; - connection *con; - - g_cond_wait(joblist_queue_cond, joblist_queue_mutex); - /* wait for getting signaled */ - - if (srv_shutdown) - break; - - /* wait one second as the poll() */ - g_get_current_time(&ts); - g_time_val_add(&ts, 1000 * 1000); - - /* we can't get interrupted :( - * if we don't get something into the queue we leave */ - if (NULL != (con = g_async_queue_timed_pop(srv->joblist_queue, &ts))) { - int killme = 0; - do { - if (con == (void *)1) { - /* ignore the wakeup-packet, it is only used to break out of the - * blocking nature of g_async_queue_timed_pop() */ - } else { - killme++; - joblist_append(srv, con); - } - } while ((con = g_async_queue_try_pop(srv->joblist_queue))); - - /* interrupt the poll() */ - if (killme) write(srv->wakeup_pipe[1], " ", 1); - } - } - - g_mutex_unlock(joblist_queue_mutex); - - return NULL; -} -#endif /** * call this function whenever you get a EMFILE or ENFILE as return-value @@ -931,27 +866,9 @@ static int lighty_mainloop(server *srv) { connection_state_machine(srv, con); } } -#ifdef USE_GTHREAD - /* open the joblist-queue handling */ - g_mutex_unlock(joblist_queue_mutex); - g_cond_signal(joblist_queue_cond); -#endif n = fdevent_poll(srv->ev, 1000); poll_errno = errno; -#ifdef USE_GTHREAD - if (FALSE == g_mutex_trylock(joblist_queue_mutex)) { - /** - * we couldn't get the lock, looks like the joblist-thread - * is still blocking on g_async_queue_timed_pop() - * - * let's send it a bogus job to jump out of the blocking mode - */ - g_async_queue_push(srv->joblist_queue, (void *)1); /* HACK to wakeup the g_async_queue_timed_pop() */ - - g_mutex_lock(joblist_queue_mutex); - } -#endif if (n > 0) { /* n is the number of events */ size_t i; @@ -1023,6 +940,14 @@ static int lighty_mainloop(server *srv) { * Note: Two joblist's are needed so a connection can be added back into the joblist * without getting stuck inside the for loop. */ +#ifdef USE_GTHREAD + { + connection *con; + while (NULL != (con = g_async_queue_try_pop(srv->joblist_queue))) { + joblist_append(srv, con); + } + } +#endif if(srv->joblist->used > 0) { connections *joblist = srv->joblist; /* switch joblist queues. */ @@ -1062,6 +987,7 @@ static handler_t wakeup_handle_fdevent(void *s, void *context, int revent) { UNUSED(con); UNUSED(revent); + g_atomic_int_set(&srv->did_wakeup, 0); (void) read(srv->wakeup_iosocket->fd, buf, sizeof(buf)); return HANDLER_GO_ON; } @@ -1077,13 +1003,11 @@ int main (int argc, char **argv, char **envp) { int pid_fd = -1, fd; size_t i; #ifdef USE_GTHREAD - int need_joblist_queue_thread = 0; GThread **stat_cache_threads; GThread **aio_write_threads = NULL; #ifdef USE_LINUX_AIO_SENDFILE GThread *linux_aio_read_thread_id = NULL; #endif - GThread * joblist_queue_thread_id = NULL; GError *gerr = NULL; #endif @@ -1675,6 +1599,7 @@ int main (int argc, char **argv, char **envp) { srv->wakeup_iosocket = iosocket_init(); srv->wakeup_iosocket->type = IOSOCKET_TYPE_PIPE; srv->wakeup_iosocket->fd = srv->wakeup_pipe[0]; + srv->did_wakeup = 0; fdevent_fcntl_set(srv->ev, srv->wakeup_iosocket); /* block on write */ #ifdef FD_CLOEXEC @@ -1706,15 +1631,10 @@ int main (int argc, char **argv, char **envp) { fdevent_event_add(srv->ev, srv->stat_cache->sock, FDEVENT_IN); } #endif - joblist_queue_mutex = g_mutex_new(); - joblist_queue_cond = g_cond_new(); - g_mutex_lock(joblist_queue_mutex); - stat_cache_threads = calloc(srv->srvconf.max_stat_threads, sizeof(*stat_cache_threads)); for (i = 0; i < srv->srvconf.max_stat_threads; i++) { stat_cache_threads[i] = g_thread_create(stat_cache_thread, srv, 1, &gerr); - need_joblist_queue_thread = 1; if (gerr) { ERROR("g_thread_create failed: %s", gerr->message); @@ -1734,7 +1654,6 @@ int main (int argc, char **argv, char **envp) { return -1; } } - need_joblist_queue_thread = 1; break; #ifdef USE_GTHREAD_SENDFILE case NETWORK_BACKEND_GTHREAD_SENDFILE: @@ -1747,7 +1666,6 @@ int main (int argc, char **argv, char **envp) { return -1; } } - need_joblist_queue_thread = 1; break; #endif #ifdef USE_GTHREAD_FREEBSD_SENDFILE @@ -1761,13 +1679,11 @@ int main (int argc, char **argv, char **envp) { return -1; } } - need_joblist_queue_thread = 1; break; #endif #ifdef USE_POSIX_AIO case NETWORK_BACKEND_POSIX_AIO: srv->posix_aio_iocbs = calloc(srv->srvconf.max_read_threads, sizeof(*srv->posix_aio_iocbs)); - need_joblist_queue_thread = 1; break; #endif #ifdef USE_LINUX_AIO_SENDFILE @@ -1785,8 +1701,6 @@ int main (int argc, char **argv, char **envp) { return -1; } - - need_joblist_queue_thread = 1; break; #endif default: @@ -1794,20 +1708,6 @@ int main (int argc, char **argv, char **envp) { } #endif /* ifndef _WIN32 */ - /* check if we really need this thread - * - * it simplifies debugging if there is no 'futex()' making noise in the strace()s - * - * - * */ - - - if (need_joblist_queue_thread) { - joblist_queue_thread_id = g_thread_create_full(joblist_queue_thread, srv, LI_THREAD_STACK_SIZE, 1, TRUE, G_THREAD_PRIORITY_NORMAL, &gerr); - if (gerr) { - return -1; - } - } #endif /* USE_GTHREAD */ for (i = 0; i < srv->srv_sockets.used; i++) { @@ -1864,39 +1764,6 @@ int main (int argc, char **argv, char **envp) { g_thread_join(stat_cache_threads[i]); } - /* in case the thread is still running, take it down now - * as it might be blocked in g_cond_wait() send a signal and - * let it shutdown */ - g_mutex_unlock(joblist_queue_mutex); - g_cond_signal(joblist_queue_cond); - - if (joblist_queue_thread_id) { - g_async_queue_push(srv->joblist_queue, (void *) 1); - g_thread_join(joblist_queue_thread_id); - } - -#if 0 - g_mutex_lock(joblist_queue_mutex); - - g_cond_free(joblist_queue_cond); - - g_mutex_unlock(joblist_queue_mutex); - - /* if I leave this enabled I get: - * - * GThread-ERROR **: file gthread-posix.c: line 160 (): error 'Device or resource busy' during 'pthread_mutex_destroy ((pthread_mutex_t *) mutex)' - * aborting... - * - * $ man pthread_mutex_destroy - * - * EBUSY The implementation has detected an attempt to destroy the object referenced by mutex while it is locked - * or referenced (for example, while being used in a pthread_cond_timedwait() or pthread_cond_wait()) - * by another thread. - * - * */ - g_mutex_free(joblist_queue_mutex); -#endif - /* the ref-count should be 0 now */ g_async_queue_unref(srv->stat_queue); g_async_queue_unref(srv->joblist_queue); diff --git a/src/stat_cache.c b/src/stat_cache.c index 6b605e82..55587ec2 100644 --- a/src/stat_cache.c +++ b/src/stat_cache.c @@ -20,6 +20,7 @@ #include "fdevent.h" #include "etag.h" #include "server.h" +#include "joblist.h" #ifdef HAVE_ATTR_ATTRIBUTES_H #include <attr/attributes.h> @@ -102,39 +103,29 @@ gpointer stat_cache_thread(gpointer _srv) { /* take the stat-job-queue */ GAsyncQueue * inq; - GAsyncQueue * outq; - g_async_queue_ref(srv->joblist_queue); g_async_queue_ref(srv->stat_queue); inq = srv->stat_queue; - outq = srv->joblist_queue; /* */ while (!srv->is_shutdown) { /* let's see what we have to stat */ struct stat st; - GTimeVal ts; - /* wait one second as the poll() */ - g_get_current_time(&ts); - g_time_val_add(&ts, 500 * 1000); - - if ((sj = g_async_queue_timed_pop(inq, &ts))) { + if ((sj = g_async_queue_pop(inq))) { if(sj == (stat_job *) 1) continue; /* just notifying us that srv->is_shutdown changed */ /* don't care about the return code for now */ stat(sj->name->ptr, &st); - g_async_queue_push(outq, sj->con); - + joblist_async_append(srv, sj->con); stat_job_free(sj); } } g_async_queue_unref(srv->stat_queue); - g_async_queue_unref(srv->joblist_queue); return NULL; } |