summaryrefslogtreecommitdiff
path: root/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
blob: cf815b14c1a4231462ed4f84b65a6df4d919a1fe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#define DBTUX_CMP_CPP
#include "Dbtux.hpp"

/*
 * Search key vs node prefix or entry.
 *
 * The comparison starts at given attribute position.  The position is
 * updated by number of equal initial attributes found.  The entry data
 * may be partial in which case CmpUnknown may be returned.
 *
 * The attributes are normalized and have variable size given in words.
 */
int
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, ConstData entryData, unsigned maxlen)
{
  const unsigned numAttrs = frag.m_numAttrs;
  const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
  // skip to right position in search key only
  for (unsigned i = 0; i < start; i++) {
    jam();
    searchKey += AttributeHeaderSize + searchKey.ah().getDataSize();
  }
  // number of words of entry data left
  unsigned len2 = maxlen;
  int ret = 0;
  while (start < numAttrs) {
    if (len2 <= AttributeHeaderSize) {
      jam();
      ret = NdbSqlUtil::CmpUnknown;
      break;
    }
    len2 -= AttributeHeaderSize;
    if (! searchKey.ah().isNULL()) {
      if (! entryData.ah().isNULL()) {
        jam();
        // verify attribute id
        const DescAttr& descAttr = descEnt.m_descAttr[start];
        ndbrequire(searchKey.ah().getAttributeId() == descAttr.m_primaryAttrId);
        ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
        // sizes
        const unsigned size1 = searchKey.ah().getDataSize();
        const unsigned size2 = min(entryData.ah().getDataSize(), len2);
        len2 -= size2;
        // compare
        NdbSqlUtil::Cmp* const cmp = c_sqlCmp[start];
        const Uint32* const p1 = &searchKey[AttributeHeaderSize];
        const Uint32* const p2 = &entryData[AttributeHeaderSize];
        const bool full = (maxlen == MaxAttrDataSize);
        ret = (*cmp)(0, p1, size1 << 2, p2, size2 << 2, full);
        if (ret != 0) {
          jam();
          break;
        }
      } else {
        jam();
        // not NULL > NULL
        ret = +1;
        break;
      }
    } else {
      if (! entryData.ah().isNULL()) {
        jam();
        // NULL < not NULL
        ret = -1;
        break;
      }
    }
    searchKey += AttributeHeaderSize + searchKey.ah().getDataSize();
    entryData += AttributeHeaderSize + entryData.ah().getDataSize();
    start++;
  }
  return ret;
}

/*
 * Scan bound vs node prefix or entry.
 *
 * Compare lower or upper bound and index entry data.  The entry data
 * may be partial in which case CmpUnknown may be returned.  Otherwise
 * returns -1 if the bound is to the left of the entry and +1 if the
 * bound is to the right of the entry.
 *
 * The routine is similar to cmpSearchKey, but 0 is never returned.
 * Suppose all attributes compare equal.  Recall that all bounds except
 * possibly the last one are non-strict.  Use the given bound direction
 * (0-lower 1-upper) and strictness of last bound to return -1 or +1.
 *
 * Following example illustrates this.  We are at (a=2, b=3).
 *
 * idir bounds                  strict          return
 * 0    a >= 2 and b >= 3       no              -1
 * 0    a >= 2 and b >  3       yes             +1
 * 1    a <= 2 and b <= 3       no              +1
 * 1    a <= 2 and b <  3       yes             -1
 *
 * The attributes are normalized and have variable size given in words.
 */
int
Dbtux::cmpScanBound(const Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen)
{
  const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
  // direction 0-lower 1-upper
  ndbrequire(idir <= 1);
  // number of words of data left
  unsigned len2 = maxlen;
  // in case of no bounds, init last type to something non-strict
  unsigned type = 4;
  while (boundCount != 0) {
    if (len2 <= AttributeHeaderSize) {
      jam();
      return NdbSqlUtil::CmpUnknown;
    }
    len2 -= AttributeHeaderSize;
    // get and skip bound type (it is used after the loop)
    type = boundInfo[0];
    boundInfo += 1;
    if (! boundInfo.ah().isNULL()) {
      if (! entryData.ah().isNULL()) {
        jam();
        // verify attribute id
        const Uint32 index = boundInfo.ah().getAttributeId();
        ndbrequire(index < frag.m_numAttrs);
        const DescAttr& descAttr = descEnt.m_descAttr[index];
        ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
        // sizes
        const unsigned size1 = boundInfo.ah().getDataSize();
        const unsigned size2 = min(entryData.ah().getDataSize(), len2);
        len2 -= size2;
        // compare
        NdbSqlUtil::Cmp* const cmp = c_sqlCmp[index];
        const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
        const Uint32* const p2 = &entryData[AttributeHeaderSize];
        const bool full = (maxlen == MaxAttrDataSize);
        int ret = (*cmp)(0, p1, size1 << 2, p2, size2 << 2, full);
        if (ret != 0) {
          jam();
          return ret;
        }
      } else {
        jam();
        // not NULL > NULL
        return +1;
      }
    } else {
      jam();
      if (! entryData.ah().isNULL()) {
        jam();
        // NULL < not NULL
        return -1;
      }
    }
    boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
    entryData += AttributeHeaderSize + entryData.ah().getDataSize();
    boundCount -= 1;
  }
  // all attributes were equal
  const int strict = (type & 0x1);
  return (idir == 0 ? (strict == 0 ? -1 : +1) : (strict == 0 ? +1 : -1));
}