/*
    Copyright (C) 2012 Samsung Electronics

    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Library General Public
    License as published by the Free Software Foundation; either
    version 2 of the License, or (at your option) any later version.

    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    Library General Public License for more details.

    You should have received a copy of the GNU Library General Public License
    along with this library; see the file COPYING.LIB.  If not, write to
    the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
    Boston, MA 02110-1301, USA.
*/

#include "config.h"
#include "WorkQueue.h"

// NOTE(review): the angle-bracket header names were missing from the text
// under review. The list below is derived from what this file calls
// (strncmp, select, pipe/read/write/close, currentTime, createThread);
// confirm it against the build before landing.
#include <string.h>
#include <sys/select.h>
#include <unistd.h>
#include <wtf/CurrentTime.h>
#include <wtf/Threading.h>

// Sentinel meaning "no descriptor"; also used for an unset socket handler.
static const int invalidSocketDescriptor = -1;
// Every control message written to the wake-up pipe is exactly one byte.
static const int threadMessageSize = 1;
// Control message that makes the worker thread leave its loop.
static const char finishThreadMessage[] = "F";
// Control message that only wakes the worker thread out of select().
static const char wakupThreadMessage[] = "W";

// Creates a timer work item that should fire at the absolute time expireTime.
// Returns a null pointer when expireTime is negative.
// NOTE(review): the template arguments in this file (Function<void()>,
// PassOwnPtr<...>, OwnPtr<...>, Vector<...>) and the cast target types were
// restored from syntax; verify them against WorkQueue.h.
PassOwnPtr<WorkQueue::TimerWorkItem> WorkQueue::TimerWorkItem::create(Function<void()> function, double expireTime)
{
    if (expireTime < 0)
        return nullptr;

    return adoptPtr(new TimerWorkItem(function, expireTime));
}

// Stores the callback and the absolute expiration time.
WorkQueue::TimerWorkItem::TimerWorkItem(Function<void()> function, double expireTime)
    : m_function(function)
    , m_expireTime(expireTime)
{
}

// Creates the wake-up pipe, resets the descriptor bookkeeping and starts the
// worker thread (named after |name|) running workQueueThread() on this queue.
void WorkQueue::platformInitialize(const char* name)
{
    FD_ZERO(&m_fileDescriptorSet);
    m_socketDescriptor = invalidSocketDescriptor;

    int fds[2];
    if (pipe(fds)) {
        ASSERT_NOT_REACHED();
        // The assertion above is the original handling. This fallback covers
        // builds where it compiles to nothing: without it the code would go on
        // to use the uninitialised contents of fds. Leave the queue inert and
        // do not start the worker thread.
        LOG_ERROR("Failed to create WorkQueue pipe");
        m_readFromPipeDescriptor = invalidSocketDescriptor;
        m_writeToPipeDescriptor = invalidSocketDescriptor;
        m_maxFileDescriptor = invalidSocketDescriptor;
        m_threadLoop = false;
        return;
    }

    m_readFromPipeDescriptor = fds[0];
    m_writeToPipeDescriptor = fds[1];

    // The read end of the pipe is always part of the set handed to select().
    FD_SET(m_readFromPipeDescriptor, &m_fileDescriptorSet);
    m_maxFileDescriptor = m_readFromPipeDescriptor;

    m_threadLoop = true;
    createThread(reinterpret_cast<WTF::ThreadFunction>(&WorkQueue::workQueueThread), this, name);
}

// Asks the worker thread to stop by sending it the finish message.
void WorkQueue::platformInvalidate()
{
    sendMessageToThread(finishThreadMessage);
}

// Runs queued work items until the queue is observed empty. Items are swapped
// into a local vector under the lock and invoked after the lock is released.
void WorkQueue::performWork()
{
    while (true) {
        Vector<Function<void()> > workItemQueue;

        {
            MutexLocker locker(m_workItemQueueLock);
            if (m_workItemQueue.isEmpty())
                return;

            m_workItemQueue.swap(workItemQueue);
        }

        for (size_t i = 0; i < workItemQueue.size(); ++i) {
            workItemQueue[i]();
            // Balances the ref() taken in dispatch().
            deref();
        }
    }
}

// Waits in select() on the wake-up pipe and the optional socket descriptor
// until one is readable or the timeout from getNextTimeOut() elapses, then
// handles whichever descriptors are ready. A select() failure is ignored.
void WorkQueue::performFileDescriptorWork()
{
    // select() overwrites the set it is given, so work on a copy.
    fd_set readFileDescriptorSet = m_fileDescriptorSet;

    if (select(m_maxFileDescriptor + 1, &readFileDescriptorSet, 0, 0, getNextTimeOut()) < 0)
        return;

    if (FD_ISSET(m_readFromPipeDescriptor, &readFileDescriptorSet)) {
        char readBuf[threadMessageSize];
        const ssize_t bytesRead = read(m_readFromPipeDescriptor, readBuf, threadMessageSize);
        if (bytesRead == -1)
            LOG_ERROR("Failed to read from WorkQueueThread pipe");
        // Only look at the buffer when a whole message arrived; after a failed
        // or short read its contents are indeterminate.
        else if (bytesRead == threadMessageSize && !strncmp(readBuf, finishThreadMessage, threadMessageSize))
            m_threadLoop = false;
    }

    if (m_socketDescriptor != invalidSocketDescriptor && FD_ISSET(m_socketDescriptor, &readFileDescriptorSet))
        m_socketEventHandler();
}

// Returns the select() timeout for the earliest pending timer work item:
// a null pointer (block indefinitely) when there is none, a zero timeout when
// it is already due, otherwise the remaining time.
struct timeval* WorkQueue::getNextTimeOut()
{
    MutexLocker locker(m_timerWorkItemsLock);
    if (m_timerWorkItems.isEmpty())
        return 0;

    // NOTE(review): this storage is a function-local static, so every
    // WorkQueue instance shares it, and it is still in use by select() after
    // the lock above has been released. With more than one queue thread this
    // looks like a data race (select() may also write to its timeout
    // argument). Fixing it needs per-instance or per-thread storage, which
    // cannot be added from this file alone - confirm and follow up.
    static struct timeval timeValue;
    timeValue.tv_sec = 0;
    timeValue.tv_usec = 0;

    // m_timerWorkItems is kept ordered by expire time, so index 0 is the next one due.
    double timeOut = m_timerWorkItems[0]->expireTime() - currentTime();
    if (timeOut > 0) {
        timeValue.tv_sec = static_cast<long>(timeOut);
        timeValue.tv_usec = static_cast<long>((timeOut - timeValue.tv_sec) * 1000000);
    }

    return &timeValue;
}

// Inserts |item| into m_timerWorkItems, keeping the vector ordered by expire
// time. Items with equal expire times keep their insertion order. A null item
// is ignored.
void WorkQueue::insertTimerWorkItem(PassOwnPtr<TimerWorkItem> item)
{
    if (!item)
        return;

    size_t position = 0;

    MutexLocker locker(m_timerWorkItemsLock);
    // m_timerWorkItems should be ordered by expire time.
    for (; position < m_timerWorkItems.size(); ++position)
        if (item->expireTime() < m_timerWorkItems[position]->expireTime())
            break;

    m_timerWorkItems.insert(position, item);
}

// Dispatches every timer work item that has expired and puts the remaining
// ones back into m_timerWorkItems.
void WorkQueue::performTimerWork()
{
    Vector<OwnPtr<TimerWorkItem> > timerWorkItems;

    {
        // Protects m_timerWorkItems.
        MutexLocker locker(m_timerWorkItemsLock);
        if (m_timerWorkItems.isEmpty())
            return;

        // Moves all the timer work items in m_timerWorkItems to the local vector.
        m_timerWorkItems.swap(timerWorkItems);
    }

    double current = currentTime();

    for (size_t i = 0; i < timerWorkItems.size(); ++i) {
        if (!timerWorkItems[i]->expired(current)) {
            // If a timer work item has not expired, keep it in m_timerWorkItems.
            // m_timerWorkItems should be ordered by expire time.
            insertTimerWorkItem(timerWorkItems[i].release());
            continue;
        }

        // If a timer work item expired, dispatch the function of the work item.
        timerWorkItems[i]->dispatch();
        // Balances the ref() taken in dispatchAfterDelay().
        deref();
    }
}

// Writes one control message (threadMessageSize bytes of |message|) to the
// wake-up pipe. Writers are serialised by m_writeToPipeDescriptorLock.
void WorkQueue::sendMessageToThread(const char* message)
{
    MutexLocker locker(m_writeToPipeDescriptorLock);
    if (write(m_writeToPipeDescriptor, message, threadMessageSize) == -1)
        LOG_ERROR("Failed to wake up WorkQueue Thread");
}

// Worker thread entry point: alternates between queued work, timer work and
// waiting on descriptors until m_threadLoop is cleared, then closes both ends
// of the wake-up pipe.
void* WorkQueue::workQueueThread(WorkQueue* workQueue)
{
    while (workQueue->m_threadLoop) {
        workQueue->performWork();
        workQueue->performTimerWork();
        workQueue->performFileDescriptorWork();
    }

    close(workQueue->m_readFromPipeDescriptor);
    close(workQueue->m_writeToPipeDescriptor);

    return 0;
}

// Registers |function| to be called when |fileDescriptor| becomes readable.
// Only one socket descriptor is tracked; registering again replaces it.
void WorkQueue::registerSocketEventHandler(int fileDescriptor, const Function<void()>& function)
{
    if (m_socketDescriptor != invalidSocketDescriptor) {
        LOG_ERROR("%d is already registerd.", fileDescriptor);
        // Stop watching the descriptor being replaced. Left in the set it
        // would keep waking select() with no handler attached to it.
        FD_CLR(m_socketDescriptor, &m_fileDescriptorSet);
    }

    m_socketDescriptor = fileDescriptor;
    m_socketEventHandler = function;

    if (fileDescriptor > m_maxFileDescriptor)
        m_maxFileDescriptor = fileDescriptor;
    FD_SET(fileDescriptor, &m_fileDescriptorSet);
}

// Forgets the registered socket handler and removes |fileDescriptor| from the
// set passed to select().
void WorkQueue::unregisterSocketEventHandler(int fileDescriptor)
{
    m_socketDescriptor = invalidSocketDescriptor;

    // Only the wake-up pipe remains, so it becomes the highest descriptor again.
    if (fileDescriptor == m_maxFileDescriptor)
        m_maxFileDescriptor = m_readFromPipeDescriptor;
    FD_CLR(fileDescriptor, &m_fileDescriptorSet);
}

// Queues |function| and wakes the worker thread. The ref() taken here is
// released in performWork() once the item has run.
void WorkQueue::dispatch(const Function<void()>& function)
{
    ref();

    {
        MutexLocker locker(m_workItemQueueLock);
        m_workItemQueue.append(function);
    }

    sendMessageToThread(wakupThreadMessage);
}

// Schedules |function| to run once |delay| (added to currentTime()) has
// elapsed. A negative delay is ignored. The ref() taken here is released in
// performTimerWork() once the item has been dispatched.
void WorkQueue::dispatchAfterDelay(const Function<void()>& function, double delay)
{
    if (delay < 0)
        return;

    OwnPtr<TimerWorkItem> timerWorkItem = TimerWorkItem::create(function, currentTime() + delay);
    if (!timerWorkItem)
        return;

    ref();

    insertTimerWorkItem(timerWorkItem.release());
    // Wake the thread so select() picks up the new, possibly earlier, timeout.
    sendMessageToThread(wakupThreadMessage);
}