summaryrefslogtreecommitdiff
path: root/storage/ibmdb2i/db2i_ioBuffers.h
blob: 350d854f0550f0f4f51f9ca12ccf20b5e58558f1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
/*
Licensed Materials - Property of IBM
DB2 Storage Engine Enablement
Copyright IBM Corporation 2007,2008
All rights reserved

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met: 
 (a) Redistributions of source code must retain this list of conditions, the
     copyright notice in section {d} below, and the disclaimer following this
     list of conditions. 
 (b) Redistributions in binary form must reproduce this list of conditions, the
     copyright notice in section (d) below, and the disclaimer following this
     list of conditions, in the documentation and/or other materials provided
     with the distribution. 
 (c) The name of IBM may not be used to endorse or promote products derived from
     this software without specific prior written permission. 
 (d) The text of the required copyright notice is: 
       Licensed Materials - Property of IBM
       DB2 Storage Engine Enablement 
       Copyright IBM Corporation 2007,2008 
       All rights reserved

THIS SOFTWARE IS PROVIDED BY IBM CORPORATION "AS IS" AND ANY EXPRESS OR IMPLIED
WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
SHALL IBM CORPORATION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
OF SUCH DAMAGE.
*/


/**
  @file db2i_ioBuffers.h
  
  @brief Buffer classes used for interacting with QMYSE read/write buffers.
  
*/


#include "db2i_validatedPointer.h"
#include "mysql_priv.h"
#include <sys/stat.h>
#include <fcntl.h>
#include <as400_types.h>

// Needed for compilers which do not include fstatx in standard headers.
extern "C" int fstatx(int, struct stat *, int, int);

/**
  Basic row buffer
  
  Provides the basic structure and methods needed for communicating
  with QMYSE I/O APIs.
  
  @details All QMYSE I/O apis use a buffer that is structured as two integer
  row counts (max and used) and storage for some number of rows. The row counts
  are both input and output for the API, and their usage depends on the 
  particular API invoked. This class encapsulates that buffer definition.
*/
class IORowBuffer
{
  public:
    IORowBuffer() : allocSize(0), rowLength(0) {;} 
    ~IORowBuffer() { freeBuf(); }
    ValidatedPointer<char>& ptr() { return data; }
    
    /**
      Sets up the buffer to hold the size indicated.
      
      @param rowLen  length of the rows that will be stored in this buffer
      @param nullMapOffset  position of null map within each row
      @param size    buffer size requested
    */
    void allocBuf(uint32 rowLen, uint16 nullMapOffset, uint32 size)
    {
      nullOffset = nullMapOffset;
      uint32 newSize = size + sizeof(BufferHdr_t);
        // If the internal structure of the row is changing, we need to
        // remember this and notify the subclasses via initAfterAllocate();
      bool formatChanged = ((size/rowLen) != rowCapacity);
      
      if (newSize > allocSize)
      {
        this->freeBuf();
        data.alloc(newSize);
        if (likely((void*)data))
          allocSize = newSize;        
      }
      
      if (likely((void*)data))
      {
        DBUG_ASSERT((uint64)(void*)data % 16 == 0);
        rowLength = rowLen;
        rowCapacity = size / rowLength;
        initAfterAllocate(formatChanged);
      }
      else
      {
        allocSize = 0;
        rowCapacity = 0;
      }
      
      DBUG_PRINT("db2i_ioBuffers::allocBuf",("rowCapacity = %d", rowCapacity));
    }
   
    void zeroBuf()
    {
      memset(data, 0, allocSize);
    }

    void freeBuf()
    {
      if (likely(allocSize))
      {
        prepForFree();
        DBUG_PRINT("IORowBuffer::freeBuf",("Freeing 0x%p", (char*)data));
        data.dealloc();
      }
    }

    char* getRowN(uint32 n)
    {
      if (unlikely(n >= getRowCapacity()))
        return NULL;
      return (char*)data + sizeof(BufferHdr_t) + (rowLength * n);
    };

    uint32 getRowCapacity() const {return rowCapacity;}
    uint32 getRowNullOffset() const {return nullOffset;}
    uint32 getRowLength() const {return rowLength;}
        
  protected: 
    /**
      Called prior to freeing buffer storage so that subclasses can do
      any required cleanup      
    */
    virtual void prepForFree()
    { 
      allocSize = 0;
      rowCapacity = 0;
    }
    
    /**
      Called after buffer storage so that subclasses can do any required setup.
    */
    virtual void initAfterAllocate(bool sizeChanged) { return;}

    ValidatedPointer<char> data;
    uint32 allocSize;
    uint32 rowCapacity;
    uint32 rowLength;
    uint16 nullOffset;
    uint32& usedRows() const { return ((BufferHdr_t*)(char*)data)->UsedRowCnt; }
    uint32& maxRows() const {return ((BufferHdr_t*)(char*)data)->MaxRowCnt; }
};


/**
  Write buffer
  
  Implements methods for inserting data into a row buffer for use with the
  QMY_WRITE and QMY_UPDATE APIs.
  
  @details The max row count defines how many rows are in the buffer. The used 
  row count is updated by QMYSE to indicate how many rows have been 
  successfully written.
*/
class IOWriteBuffer : public IORowBuffer
{
  public: 
    bool endOfBuffer() const {return (maxRows() == getRowCapacity());}
  
    char* addRow()
    {
      return getRowN(maxRows()++);
    }
    
    void resetAfterWrite()
    {
      maxRows() = 0;
    }
    
    void deleteRow()
    {
      --maxRows();
    }
    
    uint32 rowCount() const {return maxRows();}
    
    uint32 rowsWritten() const {return usedRows()-1;}
    
  private: 
    void initAfterAllocate(bool sizeChanged) {maxRows() = 0; usedRows() = 0;}
};


/**
  Read buffer
  
  Implements methods for reading data from and managing a row buffer for use 
  with the QMY_READ APIs. This is primarily for use with metainformation queries.
*/
class IOReadBuffer : public IORowBuffer
{
  public: 
        
    IOReadBuffer() {;}
    IOReadBuffer(uint32 rows, uint32 rowLength)
    {
      allocBuf(rows, 0, rows * rowLength);
      maxRows() = rows;
    }
    
    uint32 rowCount() {return usedRows();}
    void setRowsToProcess(uint32 rows) { maxRows() = rows; }
};


/**
  Read buffer
  
  Implements methods for reading data from and managing a row buffer for use 
  with the QMY_READ APIs.
  
  @details This class supports both sync and async read modes.  The max row
  count defines the number of rows that are requested to be read. The used row
  count defines how many rows have been read. Sync mode is  reasonably
  straightforward, but async mode has a complex system of communicating with
  QMYSE that is optimized for low latency. In async mode, the used row count is
  updated continuously by QMYSE as rows are read. At the same time, messages are
  sent to the associated pipe indicating that a row has been read. As long as
  the internal read cursor lags behind the used row count,  the pipe is never
  consulted. But if the internal read cursor "catches up to" the used row count,
  then we block on the pipe until we find a message indicating that  a new row
  has been read or that an error has occurred.
*/
class IOAsyncReadBuffer : public IOReadBuffer
{
  public: 
    IOAsyncReadBuffer() : 
      file(0), readIsAsync(false), msgPipe(QMY_REUSE), bridge(NULL)
    {
    }
      
    ~IOAsyncReadBuffer() 
    {
      interruptRead();
      rrnList.dealloc();
    }

    
    /**
      Signal read operation complete
    
      Indicates that the storage engine requires no more data from the table.
      Must be called between calls to newReadRequest().      
    */
    void endRead()
    {
#ifndef DBUG_OFF
      if (readCursor < rowCount())
        DBUG_PRINT("PERF:",("Wasting %d buffered rows!\n", rowCount() - readCursor));
#endif
      interruptRead();
      
      file = 0;
      bridge = NULL;
    }
    
    /**
      Update data that may change on each read operation
    */
    void update(char newAccessIntent, 
              bool* newReleaseRowNeeded,
              char commitLvl)
    {
      accessIntent = newAccessIntent;
      releaseRowNeeded = newReleaseRowNeeded;
      commitLevel = commitLvl;
    }
    
    /**
      Read the next row in the table.
      
      Return a pointer to the next row in the table, where "next" is defined
      by the orientation.
      
      @param orientaton
      @param[out] rrn The relative record number of the row returned. Not reliable
                      if NULL is returned by this function.
      
      @return Pointer to the row. Null if no more rows are available or an error
              occurred.
    */
    char* readNextRow(char orientation, uint32& rrn)
    {
      DBUG_PRINT("db2i_ioBuffers::readNextRow", ("readCursor: %d, filledRows: %d, rc: %d", readCursor, rowCount(), rc));
      
      while (readCursor >= rowCount() && !rc)
      {
        if (!readIsAsync)
          loadNewRows(orientation);
        else
          pollNextRow(orientation);
      }
      
      if (readCursor >= rowCount())
        return NULL;
      
      rrn = rrnList[readCursor];      
      return getRowN(readCursor++);
    }
    
    /**
      Retrieve the return code generated by the last operation.
                  
      @return The return code, translated to the appropriate HA_ERR_*
              value if possible.
    */
    int32 lastrc()
    {
      return db2i_ileBridge::translateErrorCode(rc);
    }
        
    void rewind()
    {
      readCursor = 0;
      rc = 0;
      usedRows() = 0;
    }
    
    bool reachedEOD() { return EOD; }
    
    void newReadRequest(FILE_HANDLE infile,
                         char orientation,
                         uint32 rowsToBuffer,
                         bool useAsync,
                         ILEMemHandle key,
                         int keyLength,
                         int keyParts);
  
  private:       
    
    /**
      End any running async read operation.
    */
    void interruptRead()
    {
      closePipe();
      if (file && readIsAsync && (rc == 0) && (rowCount() < getRowCapacity()))
      {
        DBUG_PRINT("IOReadBuffer::interruptRead", ("PERF: Interrupting %d", (uint32)file));
        getBridge()->readInterrupt(file);
      }
    }
    
    void closePipe()
    {
      if (msgPipe != QMY_REUSE)
      {
        DBUG_PRINT("db2i_ioBuffers::closePipe", ("Closing pipe %d", msgPipe));
        close(msgPipe);
        msgPipe = QMY_REUSE;
      }
    }
    
    /**
      Get a pointer to the active ILE bridge.
      
      Getting the bridge pointer is (relatively) expensive, so we cache
      it off for each operation.
    */
    db2i_ileBridge* getBridge()
    {
      if (unlikely(bridge == NULL))
      {
        bridge = db2i_ileBridge::getBridgeForThread();
      }
      return bridge;
    }
    
    void drainPipe();
    void pollNextRow(char orientation);
    void prepForFree();
    void initAfterAllocate(bool sizeChanged);
    void loadNewRows(char orientation);

    
    uint32 readCursor;                          // Read position within buffer
    int32 rc;                                   // Last return code received
    ValidatedPointer<uint32> rrnList;           // Receiver for list of rrns
    char accessIntent;                          // The access intent for this read
    char commitLevel;                           // What isolation level should be used
    char EOD;                                   // Whether end-of-data was hit
    char readIsAsync;                           // Are reads to be done asynchronously?
    bool* releaseRowNeeded;                     
        /* Does the caller need to release the current row when finished reading */
    FILE_HANDLE file;                           // The file to be read
    int msgPipe;                                
        /* The read descriptor of the pipe used to pass messages during async reads */
    db2i_ileBridge* bridge;                     // Cached pointer to bridge
    uint32 rowsToBlock;                         // Number of rows to request
    enum
    {
      ConsumedFullBufferMsg,
      PendingFullBufferMsg,
      Untouched
    } pipeState;
        /* The state of the async read message pipe */
};