summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/Serializer.cpp
blob: faf94a0f937eb20f2a50f7db5c3fda62ef4b30ad (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "qpid/sys/Serializer.h"
#include "qpid/log/Statement.h"

#include <boost/bind.hpp>

#include <assert.h>

namespace qpid {
namespace sys {

Serializer::Serializer(bool allowImmediate, Task notifyDispatchFn)
    : state(IDLE), immediate(allowImmediate), notifyDispatch(notifyDispatchFn)
{
    if (notifyDispatch.empty())
        notifyDispatch = boost::bind(&Serializer::notifyWorker, this);
}

Serializer::~Serializer() {
    {
        Mutex::ScopedLock l(lock);
        state = SHUTDOWN;
        lock.notify();
    }
    if (worker.id() != 0)
        worker.join();
}

void Serializer::dispatch(Task& task) {
    Mutex::ScopedUnlock u(lock);
    // Preconditions: lock is held, state is EXECUTING or DISPATCHING
    assert(state != IDLE);
    assert(state != SHUTDOWN);
    assert(state == EXECUTING || state == DISPATCHING);
    try {
        task();
    } catch (const std::exception& e) {
        QPID_LOG(critical, "Unexpected exception in Serializer::dispatch"
                 << e.what());
        assert(0);              // Should not happen.
    } catch (...) {
        QPID_LOG(critical, "Unexpected exception in Serializer::dispatch.");
        assert(0);              // Should not happen.
    }
}

void Serializer::execute(Task& task) {
    bool needNotify = false;
    {
        Mutex::ScopedLock l(lock);
        assert(state != SHUTDOWN);
        if (immediate && state == IDLE) {
            state = EXECUTING;
            dispatch(task);
            if (state != SHUTDOWN) {
                assert(state == EXECUTING);
                state = IDLE;
            }
        }
        else 
            queue.push_back(task);

        if (!queue.empty() && state == IDLE) {
            state = DISPATCHING;
            needNotify = true;
        }
    }
    if (needNotify)
        notifyDispatch();       // Not my function, call outside lock.
}

void Serializer::dispatch() {
    Mutex::ScopedLock l(lock);
    // TODO aconway 2007-07-16: This loop could be unbounded
    // if other threads add work while we're in dispatch(Task&).
    // If we need to bound it we could dispatch just the elements
    // that were enqueued when dispatch() was first called - save
    // begin() iterator and pop only up to that.
    while (!queue.empty() && state != SHUTDOWN) {
        assert(state == DISPATCHING);
        dispatch(queue.front());
        queue.pop_front();
    }
    if (state != SHUTDOWN) {
        assert(state == DISPATCHING);
        state = IDLE;
    }
}

void Serializer::notifyWorker() {
    if (!worker.id())
        worker = Thread(*this);
    else
        lock.notify();
}

void Serializer::run() {
    Mutex::ScopedLock l(lock);
    while (state != SHUTDOWN) {
        dispatch();
        lock.wait();
    }
}

}} // namespace qpid::sys