summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
blob: a811a86492a665423653ec2b733d1a7d9287c4c5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "qpid/broker/LegacyLVQ.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/QueuedMessage.h"

namespace qpid {
namespace broker {

// Constructs a legacy last-value queue container.
//   k  - forwarded unchanged to the MessageMap constructor
//   b  - initial value of the noBrowse flag
//   br - broker pointer kept for later queries; the methods in this file
//        that use it check it for null first
LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br)
    : MessageMap(k),
      noBrowse(b),
      broker(br)
{
}

void LegacyLVQ::setNoBrowse(bool b)
{ 
    noBrowse = b;
}

// Removes the entry stored at `position`, but only when that entry carries
// the same payload as `message`.
// On success `message` is overwritten with the stored entry, the entry is
// erased and true is returned. Otherwise nothing is modified and false is
// returned.
bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& message)
{
    Ordering::iterator found = messages.find(position);

    //nothing stored at that position
    if (found == messages.end()) return false;

    //something is stored there, but it is not the message the caller means
    if (!(found->second.payload == message.payload)) return false;

    message = found->second;
    erase(found);
    return true;
}

// Delegates to MessageMap::next() and passes its result back to the caller.
// When a message was found and noBrowse is not set, the key of that message
// is additionally erased from the index.
bool LegacyLVQ::next(const framing::SequenceNumber& position, QueuedMessage& message)
{
    if (!MessageMap::next(position, message)) return false;

    if (!noBrowse) {
        index.erase(getKey(message));
    }
    return true;
}

// Adds a message to the queue.
// Normally this is just MessageMap::push(). As a hack, LVQ behaviour is
// disabled while the broker reports that it is a cluster updatee: the
// message is then stored directly at its own position, `removed` is left
// untouched and false is returned.
bool LegacyLVQ::push(const QueuedMessage& added, QueuedMessage& removed)
{
    const bool receivingClusterUpdate = broker && broker->isClusterUpdatee();
    if (!receivingClusterUpdate) {
        return MessageMap::push(added, removed);
    }

    //bypass the key handling entirely, just record the message
    messages[added.position] = added;
    return false;
}

// Stores `update` in the position previously occupied by `original` and
// returns a reference to the stored entry. The stored copy has its position
// field set to original.position.
//
// Unlike a bare find() followed by a dereference, this stays well defined
// if no entry exists at original.position: operator[] creates the slot, so
// the update still ends up in the original position instead of writing
// through an end() iterator.
const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update)
{
    //take a copy of the position first: if 'original' happens to refer to
    //the stored entry itself, assigning 'update' over it would otherwise
    //clobber the position we want to keep
    const framing::SequenceNumber position = original.position;

    //add the new message into the original position of the replaced message
    QueuedMessage& slot = messages[position];
    slot = update;
    slot.position = position;
    return slot;
}

// Forwards to MessageMap::removeIf(), except that the call is skipped
// entirely when the broker appears to be clustered.
void LegacyLVQ::removeIf(Predicate p)
{
    //Background: at present this method is invoked periodically from the
    //timer thread in order to expire messages. On a clustered broker
    //that means purging happens outside the cluster event dispatch
    //thread, so it is not totally ordered with respect to other events
    //(message publication included). The cluster does guarantee that
    //the expiration of a message itself (as opposed to the removal of
    //an already expired message from the queue) *is* consistently
    //ordered with respect to cluster events, so in general delivery
    //stays consistent across the cluster in spite of the
    //non-deterministic timing of a purge. A legacy last value queue is
    //the exception: the order of a purge relative to publications can
    //affect the order in which messages appear in the queue, so
    //purging could make cluster members diverge. Periodic purging of
    //an LVQ is therefore not enabled on a clustered broker; expired
    //messages are instead removed on delivery and consolidated by key
    //as part of normal LVQ operation.

    //TODO: Is there a neater way to check whether broker is
    //clustered? The assumption made here is that the broker is not
    //clustered when its cluster timer and its regular timer are the
    //very same object:
    if (broker && &(broker->getClusterTimer()) != &(broker->getTimer())) {
        return;
    }
    MessageMap::removeIf(p);
}

std::auto_ptr<Messages> LegacyLVQ::updateOrReplace(std::auto_ptr<Messages> current, 
                                                   const std::string& key, bool noBrowse, Broker* broker)
{
    LegacyLVQ* lvq = dynamic_cast<LegacyLVQ*>(current.get());
    if (lvq) { 
        lvq->setNoBrowse(noBrowse);
        return current;
    } else {
        return std::auto_ptr<Messages>(new LegacyLVQ(key, noBrowse, broker));
    }
}

}} // namespace qpid::broker