summaryrefslogtreecommitdiff
path: root/libs/log/src/threadsafe_queue.cpp
blob: 9f86f75d9b9d6065aa57e3dd89ceff8f1f7c3d0c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/*
 *          Copyright Andrey Semashev 2007 - 2015.
 * Distributed under the Boost Software License, Version 1.0.
 *    (See accompanying file LICENSE_1_0.txt or copy at
 *          http://www.boost.org/LICENSE_1_0.txt)
 */
/*!
 * \file   threadsafe_queue.cpp
 * \author Andrey Semashev
 * \date   05.11.2010
 *
 * \brief  This file is part of the Boost.Log library implementation, see the library documentation
 *         at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html.
 *
 * The implementation is based on algorithms published in the "Simple, Fast,
 * and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" article
 * in PODC96 by Maged M. Michael and Michael L. Scott. Pseudocode is available here:
 * http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
 *
 * The lock-free version of the mentioned algorithms contains a race condition and therefore
 * was not included here.
 */

#include <boost/log/detail/threadsafe_queue.hpp>

#ifndef BOOST_LOG_NO_THREADS

#include <new>
#include <boost/assert.hpp>
#include <boost/throw_exception.hpp>
#include <boost/align/aligned_alloc.hpp>
#include <boost/type_traits/alignment_of.hpp>
#include <boost/log/detail/spin_mutex.hpp>
#include <boost/log/detail/locks.hpp>
#include <boost/log/detail/header.hpp>

namespace boost {

BOOST_LOG_OPEN_NAMESPACE

namespace aux {

//! Generic queue implementation with two locks
class threadsafe_queue_impl_generic :
    public threadsafe_queue_impl
{
private:
    //! Mutex type to be used
    typedef spin_mutex mutex_type;

    /*!
     * A structure that contains a pointer to the node and the associated mutex.
     * The alignment below allows to eliminate false sharing, it should not be less than CPU cache line size.
     */
    struct BOOST_ALIGNMENT(BOOST_LOG_CPU_CACHE_LINE_SIZE) pointer
    {
        //! Pointer to the either end of the queue
        node_base* node;
        //! Lock for access synchronization
        mutex_type mutex;
        //  128 bytes padding is chosen to mitigate false sharing for NetBurst CPUs, which load two cache lines in one go.
        unsigned char padding[128U - (sizeof(node_base*) + sizeof(mutex_type)) % 128U];
    };

private:
    //! Pointer to the beginning of the queue
    pointer m_Head;
    //! Pointer to the end of the queue
    pointer m_Tail;

public:
    explicit threadsafe_queue_impl_generic(node_base* first_node)
    {
        set_next(first_node, NULL);
        m_Head.node = m_Tail.node = first_node;
    }

    ~threadsafe_queue_impl_generic()
    {
    }

    node_base* reset_last_node()
    {
        BOOST_ASSERT(m_Head.node == m_Tail.node);
        node_base* p = m_Head.node;
        m_Head.node = m_Tail.node = NULL;
        return p;
    }

    bool unsafe_empty()
    {
        return m_Head.node == m_Tail.node;
    }

    void push(node_base* p)
    {
        set_next(p, NULL);
        exclusive_lock_guard< mutex_type > _(m_Tail.mutex);
        set_next(m_Tail.node, p);
        m_Tail.node = p;
    }

    bool try_pop(node_base*& node_to_free, node_base*& node_with_value)
    {
        exclusive_lock_guard< mutex_type > _(m_Head.mutex);
        node_base* next = get_next(m_Head.node);
        if (next)
        {
            // We have a node to pop
            node_to_free = m_Head.node;
            node_with_value = m_Head.node = next;
            return true;
        }
        else
            return false;
    }

private:
    // Copying and assignment are closed
    threadsafe_queue_impl_generic(threadsafe_queue_impl_generic const&);
    threadsafe_queue_impl_generic& operator= (threadsafe_queue_impl_generic const&);

    BOOST_FORCEINLINE static void set_next(node_base* p, node_base* next)
    {
        p->next.data[0] = next;
    }
    BOOST_FORCEINLINE static node_base* get_next(node_base* p)
    {
        return static_cast< node_base* >(p->next.data[0]);
    }
};

//! Creates the generic two-lock queue implementation, with \a first_node as its initial node
BOOST_LOG_API threadsafe_queue_impl* threadsafe_queue_impl::create(node_base* first_node)
{
    threadsafe_queue_impl_generic* const impl = new threadsafe_queue_impl_generic(first_node);
    return impl;
}

/*!
 * Allocates \a size bytes of storage aligned to \c BOOST_LOG_CPU_CACHE_LINE_SIZE.
 * Throws \c std::bad_alloc if the aligned allocation returns a null pointer.
 */
BOOST_LOG_API void* threadsafe_queue_impl::operator new (std::size_t size)
{
    void* const storage = alignment::aligned_alloc(BOOST_LOG_CPU_CACHE_LINE_SIZE, size);
    if (!storage)
    {
        BOOST_THROW_EXCEPTION(std::bad_alloc());
    }
    return storage;
}

//! Releases storage obtained from the class-specific <tt>operator new</tt>; the size argument is not used
BOOST_LOG_API void threadsafe_queue_impl::operator delete (void* p, std::size_t /*size*/)
{
    alignment::aligned_free(p);
}

} // namespace aux

BOOST_LOG_CLOSE_NAMESPACE // namespace log

} // namespace boost

#include <boost/log/detail/footer.hpp>

#endif // BOOST_LOG_NO_THREADS