summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/PollableQueue.h
blob: 0bba2ba790d6cf1efc421b3ccb16737f4dfb17c9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#ifndef QPID_CLUSTER_POLLABLEQUEUE_H
#define QPID_CLUSTER_POLLABLEQUEUE_H

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "qpid/cluster/PollableCondition.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Mutex.h"
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <deque>

namespace qpid {

namespace sys { class Poller; }

namespace cluster {

// FIXME aconway 2008-08-11: this could be of more general interest,
// move to common lib.

/**
 * Queue of T that can be watched by a sys::Poller.
 *
 * Producers on any thread add items with push().  When the poller
 * selects the queue, everything queued so far is handed to the
 * callback as one iterator range.
 */
template <class T>
class PollableQueue {
    typedef std::deque<T> Queue;

  public:
    typedef typename Queue::iterator iterator;

    /** Batch handler: receives the [begin, end) range of queued items. */
    typedef boost::function<void (const iterator&, const iterator&)> Callback;

    /** Create a queue whose batches are delivered to cb. */
    explicit PollableQueue(const Callback& cb);

    /**
     * Append a copy of t and set the pollable condition.
     * Holds the queue lock while doing so, hence callable from any thread.
     */
    void push(const T& t) {
        ScopedLock guard(lock);
        queue.push_back(t);
        condition.set();
    }

    /** Begin watching this queue on the given poller. */
    void start(const boost::shared_ptr<sys::Poller>& poller) {
        handle.startWatch(poller);
    }

    /** Stop watching this queue. */
    void stop() {
        handle.stopWatch();
    }

  private:
    typedef sys::Mutex::ScopedLock ScopedLock;
    typedef sys::Mutex::ScopedUnlock ScopedUnlock;

    /** Invoked through handle; passes the queued items to callback. */
    void dispatch(sys::DispatchHandle&);

    // NOTE: declaration order is initialization order.  condition must stay
    // ahead of handle, because handle is constructed from condition.
    sys::Mutex lock;              // Guards queue between push() and dispatch().
    Callback callback;            // User supplied batch handler.
    PollableCondition condition;  // Set by push(), cleared by dispatch().
    sys::DispatchHandle handle;   // Watches condition and calls dispatch().
    Queue queue;                  // Items pushed but not yet dispatched.
    Queue batch;                  // Items being handed to callback.
};

/**
 * Store the batch callback and wire handle so that dispatch() is invoked
 * on this object for condition.
 *
 * FIXME aconway 2008-08-12: (marker carried over; it had no description).
 * NOTE(review): the two trailing 0 arguments are presumably unused
 * callbacks of sys::DispatchHandle -- confirm against its constructor.
 */
template <class T>
PollableQueue<T>::PollableQueue(const Callback& cb)
    : callback(cb),
      handle(condition,
             boost::bind(&PollableQueue::dispatch, this, _1),
             0,
             0)
{
}

/**
 * Handler bound to the dispatch handle: pass everything queued so far to
 * the user callback as one batch, then re-arm the handle.
 *
 * The pending items are swapped into batch while the lock is held, so
 * push() can keep adding to queue while the callback works on the batch
 * outside the lock.
 *
 * The batch is emptied as soon as the callback returns.  Previously the
 * processed items stayed alive until the next dispatch, and were then
 * destroyed while holding the lock, blocking concurrent push() calls.
 *
 * If the callback throws, the exception propagates, rewatch() is not
 * reached, and the leftover batch is discarded by the next dispatch.
 *
 * @param h the dispatch handle to re-arm once the batch has been processed.
 */
template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
    ScopedLock l(lock);         // Lock for concurrent push()
    batch.clear();              // Normally empty; drops leftovers if the last callback threw.
    batch.swap(queue);          // Take everything queued so far, leaving queue empty.
    condition.clear();
    ScopedUnlock u(lock);       // Remainder of the function runs outside the lock.
    callback(batch.begin(), batch.end()); // Process the batch outside the lock.
    // Release the processed items now rather than at the next wakeup.
    // NOTE(review): assumes dispatch() is not re-entered before rewatch(),
    // as the unlocked use of batch by the callback already requires.
    batch.clear();
    h.rewatch();
}

}} // namespace qpid::cluster

#endif  /*!QPID_CLUSTER_POLLABLEQUEUE_H*/