summaryrefslogtreecommitdiff
path: root/cpp/src/tests/storePerfTools/asyncPerf/MockPersistableQueue.h
blob: a77f41743c72a76669814f46ecb2a62d23f6529e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/**
 * \file MockPersistableQueue.h
 */

#ifndef tests_storePerfTools_asyncPerf_MockPersistableQueue_h_
#define tests_storePerfTools_asyncPerf_MockPersistableQueue_h_

#include "qpid/asyncStore/AsyncOperation.h"
#include "qpid/broker/AsyncStore.h" // qpid::broker::DataSource
#include "qpid/broker/BrokerContext.h"
#include "qpid/broker/PersistableQueue.h"
#include "qpid/broker/QueueHandle.h"
#include "qpid/sys/Condition.h"
#include "qpid/sys/Mutex.h"

#include <boost/shared_ptr.hpp>
#include <deque>

namespace qpid {
namespace asyncStore {
class AsyncStoreImpl;
}
namespace framing {
class FieldTable;
}}

namespace tests {
namespace storePerftools {
namespace asyncPerf {

class QueuedMessage;
class TestOptions;

typedef boost::shared_ptr<qpid::asyncStore::AsyncStoreImpl> AsyncStoreImplPtr;
typedef boost::shared_ptr<QueuedMessage> QueuedMessagePtr;

class MockPersistableQueue : public qpid::broker::PersistableQueue, qpid::broker::DataSource
{
public:
    class QueueContext : public qpid::broker::BrokerContext
    {
    public:
        QueueContext(MockPersistableQueue* q,
                     const qpid::asyncStore::AsyncOperation::opCode op);
        virtual ~QueueContext();
        const char* getOp() const;
        void destroy();
        MockPersistableQueue* m_q;
        const qpid::asyncStore::AsyncOperation::opCode m_op;
    };

    MockPersistableQueue(const std::string& name,
                         const qpid::framing::FieldTable& args,
                         AsyncStoreImplPtr store,
                         const TestOptions& perfTestParams,
                         const char* msgData);
    virtual ~MockPersistableQueue();

    // --- Async functionality ---
    static void handleAsyncResult(const qpid::broker::AsyncResult* res,
                                  qpid::broker::BrokerContext* bc);
    qpid::broker::QueueHandle& getHandle();

    // --- Performance test thread entry points ---
    void* runEnqueues();
    void* runDequeues();
    static void* startEnqueues(void* ptr);
    static void* startDequeues(void* ptr);

    // --- Interface qpid::broker::Persistable ---
    virtual void encode(qpid::framing::Buffer& buffer) const;
    virtual uint32_t encodedSize() const;
    virtual uint64_t getPersistenceId() const;
    virtual void setPersistenceId(uint64_t persistenceId) const;

    // --- Interface qpid::broker::PersistableQueue ---
    virtual void flush();
    virtual const std::string& getName() const;
    virtual void setExternalQueueStore(qpid::broker::ExternalQueueStore* inst);

    // --- Interface DataStore ---
    virtual uint64_t getSize();
    virtual void write(char* target);

protected:
    const std::string m_name;
    AsyncStoreImplPtr m_store;
    mutable uint64_t m_persistenceId;
    std::string m_persistableData;
    qpid::broker::QueueHandle m_queueHandle;

    // Test params
    const TestOptions& m_perfTestOpts;
    const char* m_msgData;

    typedef std::deque<QueuedMessagePtr> MsgEnqList;
    typedef MsgEnqList::iterator MsgEnqListItr;
    MsgEnqList m_enqueuedMsgs;
    qpid::sys::Mutex m_enqueuedMsgsMutex;
    qpid::sys::Condition m_dequeueCondition;

    // --- Ascnc op completions (called through handleAsyncResult) ---
    void createComplete(const QueueContext* qc);
    void flushComplete(const QueueContext* qc);
    void destroyComplete(const QueueContext* qc);

    // --- Queue functionality ---
    void push(QueuedMessagePtr& msg);
    void pop(QueuedMessagePtr& msg);
};

}}} // namespace tests::storePerftools::asyncPerf

#endif // tests_storePerfTools_asyncPerf_MockPersistableQueue_h_