summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/store/ms-sql/MessageRecordset.cpp
blob: b62a333df6a2723836da725fc4138a78bb820fe9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include <qpid/Exception.h>
#include <qpid/log/Statement.h>

#include "MessageRecordset.h"
#include "BlobAdapter.h"
#include "BlobEncoder.h"
#include "VariantHelper.h"

#include <boost/intrusive_ptr.hpp>

#include <vector>

// Forward declaration only; this file uses the type solely through
// boost::intrusive_ptr parameters. Declared inside its namespaces because a
// qualified name cannot be introduced by a stand-alone "class A::B::C;"
// declaration in standard C++.
namespace qpid { namespace broker { class PersistableMessage; } }

namespace qpid {
namespace store {
namespace ms_sql {

// Store a new message: encode it to a blob, write the blob into a new
// record, then copy that record's "persistenceId" field back onto the
// message.
void
MessageRecordset::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
{
    // Marshall the message's headers and content before touching the
    // recordset.
    BlobEncoder encoded (msg);

    // Create the record and fill in its blob column.
    rs->AddNew();
    rs->Fields->Item["fieldTableBlob"]->AppendChunk(encoded);
    rs->Update();

    // The id is read only after Update(); presumably the database assigns
    // it when the record is written - verify against the table definition.
    uint64_t newId = rs->Fields->GetItem("persistenceId")->Value;
    msg->setPersistenceId(newId);
}

// Append 'data' to the blob already stored for 'msg'.
//
// The recordset is filtered down to the record whose persistenceId matches
// the message; the filter is left in place on return.
//
// Throws Exception if no record matches the message's persistence id.
void
MessageRecordset::append(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
                         const std::string& data)
{
    // Look up the message by its Id. No std::ends here: on an ostringstream
    // it would embed a NUL character in the filter text rather than
    // terminate it.
    std::ostringstream filter;
    filter << "persistenceId = " << msg->getPersistenceId();
    rs->PutFilter (VariantHelper<std::string>(filter.str()));
    if (rs->RecordCount == 0) {
        throw Exception("Can't append to message not stored in database");
    }
    BlobEncoder blob (data);   // Marshall string data to a blob
    rs->Fields->GetItem("fieldTableBlob")->AppendChunk(blob);
    rs->Update();
}

// Remove the stored record for 'msg'; the work is done by the
// BlobRecordset implementation, keyed on the message's persistence id.
void
MessageRecordset::remove(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg)
{
    const uint64_t storedId = msg->getPersistenceId();
    BlobRecordset::remove(storedId);
}

// Load part of a stored message's content into 'data'.
//
//   msg    - message whose persistence id selects the record to read
//   data   - receives the requested content bytes
//   offset - position within the content (after the header) to start from
//   length - number of bytes wanted; reduced to what is actually stored
//
// If the requested range starts at or beyond the end of the stored content
// (or 'length' is zero), 'data' is returned empty.
//
// Throws Exception if no record matches the message's persistence id.
void
MessageRecordset::loadContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
                              std::string& data,
                              uint64_t offset,
                              uint32_t length)
{
    // Look up the message by its Id. No std::ends here: on an ostringstream
    // it would embed a NUL character in the filter text rather than
    // terminate it.
    std::ostringstream filter;
    filter << "persistenceId = " << msg->getPersistenceId();
    rs->PutFilter (VariantHelper<std::string>(filter.str()));
    if (rs->RecordCount == 0) {
        throw Exception("Can't load message not stored in database");
    }

    // NOTE! If this code needs to change, please verify the encoding
    // code in BlobEncoder.
    // The blob starts with a uint32_t holding the header length; read that
    // field first.
    long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
    uint32_t headerSize;
    const size_t headerFieldLength = sizeof(headerSize);
    BlobAdapter blob(headerFieldLength);
    blob =
        rs->Fields->Item["fieldTableBlob"]->GetChunk((long)headerFieldLength);
    headerSize = ((qpid::framing::Buffer&)blob).getLong();

    // Work out how much content is really available from 'offset' onward.
    // All arithmetic is done in uint64_t so that a request reaching past
    // the end of the blob can't wrap the unsigned 'length'.
    const uint64_t totalSize =
        blobSize > 0 ? static_cast<uint64_t>(blobSize) : 0;
    const uint64_t contentStart =
        static_cast<uint64_t>(headerFieldLength) + headerSize;
    if (length == 0 ||
        totalSize <= contentStart ||
        offset >= totalSize - contentStart) {
        data.clear();      // Nothing stored in the requested range
        return;
    }
    const uint64_t remaining = totalSize - contentStart - offset;
    if (length > remaining)
        length = static_cast<uint32_t>(remaining);

    // GetChunk always begins reading where the previous GetChunk left off,
    // so we can't just tell it to ignore the header and read the data.
    // So, read the header plus the offset, plus the desired data, then
    // copy the desired data to the supplied string.
    const uint64_t skip = static_cast<uint64_t>(headerSize) + offset;
    // skip + length can't exceed the blob size, which came from a long.
    long getSize = static_cast<long>(skip + length);
    BlobAdapter header_plus(getSize);
    header_plus = rs->Fields->Item["fieldTableBlob"]->GetChunk(getSize);

    // Consume the unwanted leading bytes into a scratch buffer that frees
    // itself, even if the buffer read throws.
    std::vector<uint8_t> throw_away(static_cast<size_t>(skip));
    if (!throw_away.empty()) {
        ((qpid::framing::Buffer&)header_plus).getRawData(&throw_away[0],
                                                         throw_away.size());
    }
    ((qpid::framing::Buffer&)header_plus).getRawData(data, length);
}

// Recover every message in the recordset.
//
//   recoverer  - used to rebuild each message from its stored header
//   messageMap - receives each recovered message, keyed by the record's
//                bound messageId
//
// Throws Exception if the record-binding interface can't be obtained or
// bound to the recordset.
void
MessageRecordset::recover(qpid::broker::RecoveryManager& recoverer,
                          std::map<uint64_t, broker::RecoverableMessage::shared_ptr>& messageMap)
{
    // Releases the record-binding interface on every exit path, including
    // when recovery of a record throws.
    struct BindingReleaser {
        IADORecordBinding *binding;
        explicit BindingReleaser(IADORecordBinding *p) : binding(p) {}
        ~BindingReleaser() { binding->Release(); }
    };

    if (rs->BOF && rs->EndOfFile)
        return;   // Nothing to do
    rs->MoveFirst();
    Binding b;
    IADORecordBinding *piAdoRecordBinding = 0;
    HRESULT hr = rs->QueryInterface(__uuidof(IADORecordBinding),
                                    (LPVOID *)&piAdoRecordBinding);
    if (FAILED(hr) || piAdoRecordBinding == 0)
        throw Exception("Can't get record binding interface to recover messages");
    BindingReleaser releaser(piAdoRecordBinding);
    // Without a successful bind, b.messageId would not follow the current
    // record, so don't go on.
    if (FAILED(piAdoRecordBinding->BindToRecordset(&b)))
        throw Exception("Can't bind message record fields to recover messages");

    while (!rs->EndOfFile) {
        // The blob was written as normal, but with the header length
        // prepended in a uint32_t. Due to message staging threshold
        // limits, the header may be all that's read in; get it first,
        // recover that message header, then see if the rest is needed.
        //
        // NOTE! If this code needs to change, please verify the encoding
        // code in BlobEncoder.
        long blobSize = rs->Fields->Item["fieldTableBlob"]->ActualSize;
        uint32_t headerSize;
        const size_t headerFieldLength = sizeof(headerSize);
        BlobAdapter blob(headerFieldLength);
        blob =
          rs->Fields->Item["fieldTableBlob"]->GetChunk((long)headerFieldLength);
        headerSize = ((qpid::framing::Buffer&)blob).getLong();
        BlobAdapter header(headerSize);
        header = rs->Fields->Item["fieldTableBlob"]->GetChunk(headerSize);
        broker::RecoverableMessage::shared_ptr msg;
        msg = recoverer.recoverMessage(header);
        msg->setPersistenceId(b.messageId);
        messageMap[b.messageId] = msg;

        // Now, do we need the rest of the content?
        long contentLength = blobSize - headerFieldLength - headerSize;
        if (msg->loadContent(contentLength)) {
            BlobAdapter content(contentLength);
            content =
                rs->Fields->Item["fieldTableBlob"]->GetChunk(contentLength);
            msg->decodeContent(content);
        }
        rs->MoveNext();
    }
    // The binding interface is released by 'releaser' on the way out.
}

void
MessageRecordset::dump()
{
    Recordset::dump();
    if (rs->EndOfFile && rs->BOF)    // No records
        return;
    rs->MoveFirst();

    Binding b;
    IADORecordBinding *piAdoRecordBinding;
    rs->QueryInterface(__uuidof(IADORecordBinding), 
                       (LPVOID *)&piAdoRecordBinding);
    piAdoRecordBinding->BindToRecordset(&b);
   
    while (VARIANT_FALSE == rs->EndOfFile) {
        QPID_LOG(notice, "Msg " << b.messageId);
        rs->MoveNext();
    }

    piAdoRecordBinding->Release();
}

}}}  // namespace qpid::store::ms_sql