/*-------------------------------------------------------------------------
 *
 * hashovfl.c
 *    Overflow page management code for the Postgres hash access method
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.20 1999/02/13 23:14:19 momjian Exp $
 *
 * NOTES
 *    Overflow pages look like ordinary relation pages.
 *
 *-------------------------------------------------------------------------
 */
#include <postgres.h>

#include <access/hash.h>
#include <storage/bufmgr.h>
#include <utils/memutils.h>

#ifndef HAVE_MEMMOVE
#include <regex/utils.h>
#else
#include <string.h>
#endif

static OverflowPageAddress _hash_getovfladdr(Relation rel, Buffer *metabufp);
static uint32 _hash_firstfreebit(uint32 map);

/*
 *  _hash_addovflpage
 *
 *  Add an overflow page to the page currently pointed to by the buffer
 *  argument 'buf'.
 *
 *  *Metabufp has a read lock upon entering the function; buf has a
 *  write lock.
 */
Buffer
_hash_addovflpage(Relation rel, Buffer *metabufp, Buffer buf)
{
    OverflowPageAddress oaddr;
    BlockNumber ovflblkno;
    Buffer      ovflbuf;
    HashMetaPage metap;
    HashPageOpaque ovflopaque;
    HashPageOpaque pageopaque;
    Page        page;
    Page        ovflpage;

    /* this had better be the last page in a bucket chain */
    page = BufferGetPage(buf);
    _hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
    pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
    Assert(!BlockNumberIsValid(pageopaque->hasho_nextblkno));

    metap = (HashMetaPage) BufferGetPage(*metabufp);
    _hash_checkpage((Page) metap, LH_META_PAGE);

    /* allocate an empty overflow page */
    oaddr = _hash_getovfladdr(rel, metabufp);
    if (oaddr == InvalidOvflAddress)
        elog(ERROR, "_hash_addovflpage: problem with _hash_getovfladdr.");
    ovflblkno = OADDR_TO_BLKNO(OADDR_OF(SPLITNUM(oaddr), OPAGENUM(oaddr)));
    Assert(BlockNumberIsValid(ovflblkno));
    ovflbuf = _hash_getbuf(rel, ovflblkno, HASH_WRITE);
    Assert(BufferIsValid(ovflbuf));
    ovflpage = BufferGetPage(ovflbuf);

    /* initialize the new overflow page */
    _hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));
    ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
    ovflopaque->hasho_prevblkno = BufferGetBlockNumber(buf);
    ovflopaque->hasho_nextblkno = InvalidBlockNumber;
    ovflopaque->hasho_flag = LH_OVERFLOW_PAGE;
    ovflopaque->hasho_oaddr = oaddr;
    ovflopaque->hasho_bucket = pageopaque->hasho_bucket;
    _hash_wrtnorelbuf(rel, ovflbuf);

    /* logically chain overflow page to previous page */
    pageopaque->hasho_nextblkno = ovflblkno;
    _hash_wrtnorelbuf(rel, buf);
    return ovflbuf;
}
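/*
 * Illustrative sketch: how the OverflowPageAddress used above packs a
 * split number and a page offset into a single integer.  It relies only
 * on the OADDR_OF/SPLITNUM/OPAGENUM macros this file already uses; the
 * helper itself is hypothetical and compiled out, since it exists only
 * to document the encoding.
 */
#ifdef NOT_USED
static void
_hash_oaddr_encoding_sketch(void)
{
    SplitNumber splitnum = 2;   /* high bits: which split cycle */
    uint32      pagenum = 5;    /* low bits: page within that cycle */
    OverflowPageAddress oaddr = OADDR_OF(splitnum, pagenum);

    /* the two components can be recovered with the companion macros */
    Assert(SPLITNUM(oaddr) == splitnum);
    Assert(OPAGENUM(oaddr) == pagenum);
}
#endif   /* NOT_USED */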
/*
 *  _hash_getovfladdr()
 *
 *  Find an available overflow page and return its address.
 *
 *  When we enter this function, we have a read lock on *metabufp which
 *  we change to a write lock immediately.  Before exiting, the write lock
 *  is exchanged for a read lock.
 */
static OverflowPageAddress
_hash_getovfladdr(Relation rel, Buffer *metabufp)
{
    HashMetaPage metap;
    Buffer      mapbuf = 0;
    BlockNumber blkno;
    PageOffset  offset;
    OverflowPageAddress oaddr;
    SplitNumber splitnum;
    uint32     *freep = NULL;
    uint32      max_free;
    uint32      bit;
    uint32      first_page;
    uint32      free_bit;
    uint32      free_page;
    uint32      in_use_bits;
    uint32      i,
                j;

    metap = (HashMetaPage) _hash_chgbufaccess(rel, metabufp, HASH_READ, HASH_WRITE);

    splitnum = metap->OVFL_POINT;
    max_free = metap->SPARES[splitnum];

    free_page = (max_free - 1) >> (metap->hashm_bshift + BYTE_TO_BIT);
    free_bit = (max_free - 1) & (BMPGSZ_BIT(metap) - 1);

    /* Look through all the free maps to find the first free block */
    first_page = metap->LAST_FREED >> (metap->hashm_bshift + BYTE_TO_BIT);
    for (i = first_page; i <= free_page; i++)
    {
        Page        mappage;

        blkno = metap->hashm_mapp[i];
        mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE);
        mappage = BufferGetPage(mapbuf);
        _hash_checkpage(mappage, LH_BITMAP_PAGE);
        freep = HashPageGetBitmap(mappage);
        Assert(freep);

        if (i == free_page)
            in_use_bits = free_bit;
        else
            in_use_bits = BMPGSZ_BIT(metap) - 1;

        if (i == first_page)
        {
            bit = metap->LAST_FREED & (BMPGSZ_BIT(metap) - 1);
            j = bit / BITS_PER_MAP;
            bit = bit & ~(BITS_PER_MAP - 1);
        }
        else
        {
            bit = 0;
            j = 0;
        }
        for (; bit <= in_use_bits; j++, bit += BITS_PER_MAP)
            if (freep[j] != ALL_SET)
                goto found;
    }

    /* No free page found - have to allocate a new page */
    metap->LAST_FREED = metap->SPARES[splitnum];
    metap->SPARES[splitnum]++;
    offset = metap->SPARES[splitnum] -
        (splitnum ? metap->SPARES[splitnum - 1] : 0);

#define OVMSG   "HASH: Out of overflow pages.  Out of luck.\n"

    if (offset > SPLITMASK)
    {
        if (++splitnum >= NCACHED)
            elog(ERROR, OVMSG);
        metap->OVFL_POINT = splitnum;
        metap->SPARES[splitnum] = metap->SPARES[splitnum - 1];
        metap->SPARES[splitnum - 1]--;
        offset = 0;
    }

    /* Check if we need to allocate a new bitmap page */
    if (free_bit == BMPGSZ_BIT(metap) - 1)
    {
        /* won't be needing old map page */
        _hash_relbuf(rel, mapbuf, HASH_WRITE);

        free_page++;
        if (free_page >= NCACHED)
            elog(ERROR, OVMSG);

        /*
         * This is tricky.  The 1 indicates that you want the new page
         * allocated with 1 clear bit.  Actually, you are going to
         * allocate 2 pages from this map.  The first is going to be the
         * map page, the second is the overflow page we were looking for.
         * The init_bitmap routine automatically sets the first bit of
         * itself to indicate that the bitmap itself is in use.  We would
         * explicitly set the second bit, but don't have to if we tell
         * init_bitmap not to leave it clear in the first place.
         */
        if (_hash_initbitmap(rel, metap, OADDR_OF(splitnum, offset),
                             1, free_page))
            elog(ERROR, "overflow_page: problem with _hash_initbitmap.");
        metap->SPARES[splitnum]++;
        offset++;
        if (offset > SPLITMASK)
        {
            if (++splitnum >= NCACHED)
                elog(ERROR, OVMSG);
            metap->OVFL_POINT = splitnum;
            metap->SPARES[splitnum] = metap->SPARES[splitnum - 1];
            metap->SPARES[splitnum - 1]--;
            offset = 0;
        }
    }
    else
    {
        /*
         * Free_bit addresses the last used bit.  Bump it to address the
         * first available bit.
         */
        free_bit++;
        SETBIT(freep, free_bit);
        _hash_wrtbuf(rel, mapbuf);
    }

    /* Calculate address of the new overflow page */
    oaddr = OADDR_OF(splitnum, offset);
    _hash_chgbufaccess(rel, metabufp, HASH_WRITE, HASH_READ);
    return oaddr;

found:
    bit = bit + _hash_firstfreebit(freep[j]);
    SETBIT(freep, bit);
    _hash_wrtbuf(rel, mapbuf);

    /*
     * Bits are addressed starting with 0, but overflow pages are
     * addressed beginning at 1.  'bit' is a bit address, so we need to
     * increment it to convert it to a page number.
     */
    bit = 1 + bit + (i * BMPGSZ_BIT(metap));
    if (bit >= metap->LAST_FREED)
        metap->LAST_FREED = bit - 1;

    /* Calculate the split number for this page */
    for (i = 0; (i < splitnum) && (bit > metap->SPARES[i]); i++)
        ;
    offset = (i ? bit - metap->SPARES[i - 1] : bit);
    if (offset >= SPLITMASK)
        elog(ERROR, OVMSG);

    /* initialize this page */
    oaddr = OADDR_OF(i, offset);
    _hash_chgbufaccess(rel, metabufp, HASH_WRITE, HASH_READ);
    return oaddr;
}
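/*
 * Illustrative sketch: the allocation loop above scans the bitmap one
 * uint32 word at a time, skipping words that are ALL_SET, and only
 * examines individual bits (via _hash_firstfreebit) once a word with a
 * clear bit turns up.  A minimal standalone version of that search over
 * a caller-supplied word array; the helper is hypothetical and compiled
 * out.
 */
#ifdef NOT_USED
static int32
_hash_scanbitmap_sketch(uint32 *map, uint32 nbits)
{
    uint32      bit;
    uint32      j;

    for (bit = 0, j = 0; bit < nbits; j++, bit += BITS_PER_MAP)
    {
        if (map[j] != ALL_SET)
            return (int32) (bit + _hash_firstfreebit(map[j]));
    }
    return -1;              /* every bit is in use */
}
#endif   /* NOT_USED */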
/*
 *  _hash_firstfreebit()
 *
 *  Return the first bit that is not set in the argument 'map'.  This
 *  function is used to find an available overflow page within a
 *  splitnumber.
 */
static uint32
_hash_firstfreebit(uint32 map)
{
    uint32      i,
                mask;

    mask = 0x1;
    for (i = 0; i < BITS_PER_MAP; i++)
    {
        if (!(mask & map))
            return i;
        mask = mask << 1;
    }
    return i;
}

/*
 *  _hash_freeovflpage() -
 *
 *  Mark this overflow page as free and return a buffer with
 *  the page that follows it (which may be defined as
 *  InvalidBuffer).
 */
Buffer
_hash_freeovflpage(Relation rel, Buffer ovflbuf)
{
    HashMetaPage metap;
    Buffer      metabuf;
    Buffer      mapbuf;
    BlockNumber prevblkno;
    BlockNumber blkno;
    BlockNumber nextblkno;
    HashPageOpaque ovflopaque;
    Page        ovflpage;
    Page        mappage;
    OverflowPageAddress addr;
    SplitNumber splitnum;
    uint32     *freep;
    uint32      ovflpgno;
    int32       bitmappage,
                bitmapbit;
    Bucket      bucket;

    metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
    metap = (HashMetaPage) BufferGetPage(metabuf);
    _hash_checkpage((Page) metap, LH_META_PAGE);

    ovflpage = BufferGetPage(ovflbuf);
    _hash_checkpage(ovflpage, LH_OVERFLOW_PAGE);
    ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
    addr = ovflopaque->hasho_oaddr;
    nextblkno = ovflopaque->hasho_nextblkno;
    prevblkno = ovflopaque->hasho_prevblkno;
    bucket = ovflopaque->hasho_bucket;
    MemSet(ovflpage, 0, BufferGetPageSize(ovflbuf));
    _hash_wrtbuf(rel, ovflbuf);

    /*
     * fix up the bucket chain.  this is a doubly-linked list, so we must
     * fix up the bucket chain members behind and ahead of the overflow
     * page being deleted.
     *
     * XXX this should look like:
     *      - lock prev/next
     *      - modify/write prev/next (how to do write ordering with a
     *        doubly-linked list?)
     *      - unlock prev/next
     */
    if (BlockNumberIsValid(prevblkno))
    {
        Buffer      prevbuf = _hash_getbuf(rel, prevblkno, HASH_WRITE);
        Page        prevpage = BufferGetPage(prevbuf);
        HashPageOpaque prevopaque =
            (HashPageOpaque) PageGetSpecialPointer(prevpage);

        _hash_checkpage(prevpage, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
        Assert(prevopaque->hasho_bucket == bucket);
        prevopaque->hasho_nextblkno = nextblkno;
        _hash_wrtbuf(rel, prevbuf);
    }
    if (BlockNumberIsValid(nextblkno))
    {
        Buffer      nextbuf = _hash_getbuf(rel, nextblkno, HASH_WRITE);
        Page        nextpage = BufferGetPage(nextbuf);
        HashPageOpaque nextopaque =
            (HashPageOpaque) PageGetSpecialPointer(nextpage);

        _hash_checkpage(nextpage, LH_OVERFLOW_PAGE);
        Assert(nextopaque->hasho_bucket == bucket);
        nextopaque->hasho_prevblkno = prevblkno;
        _hash_wrtbuf(rel, nextbuf);
    }

    /*
     * Fix up the overflow page bitmap that tracks this particular
     * overflow page.  The bitmap can be found in the MetaPageData array
     * element hashm_mapp[bitmappage].
     */
    splitnum = (addr >> SPLITSHIFT);
    ovflpgno = (splitnum ? metap->SPARES[splitnum - 1] : 0) +
        (addr & SPLITMASK) - 1;

    if (ovflpgno < metap->LAST_FREED)
        metap->LAST_FREED = ovflpgno;

    bitmappage = (ovflpgno >> (metap->hashm_bshift + BYTE_TO_BIT));
    bitmapbit = ovflpgno & (BMPGSZ_BIT(metap) - 1);

    blkno = metap->hashm_mapp[bitmappage];
    mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE);
    mappage = BufferGetPage(mapbuf);
    _hash_checkpage(mappage, LH_BITMAP_PAGE);
    freep = HashPageGetBitmap(mappage);
    CLRBIT(freep, bitmapbit);
    _hash_wrtbuf(rel, mapbuf);

    _hash_relbuf(rel, metabuf, HASH_WRITE);

    /*
     * now instantiate the page that replaced this one, if it exists, and
     * return that buffer with a write lock.
     */
    if (BlockNumberIsValid(nextblkno))
        return _hash_getbuf(rel, nextblkno, HASH_WRITE);
    else
        return InvalidBuffer;
}
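/*
 * Illustrative sketch: how _hash_freeovflpage above converts a 0-based
 * overflow page number into bitmap coordinates.  Each bitmap page holds
 * BMPGSZ_BIT(metap) bits, so the high-order bits of the page number
 * select a bitmap page in hashm_mapp[] and the low-order bits select a
 * bit within it.  The helper is hypothetical and compiled out.
 */
#ifdef NOT_USED
static void
_hash_ovflcoords_sketch(HashMetaPage metap, uint32 ovflpgno,
                        int32 *bitmappage, int32 *bitmapbit)
{
    /* which bitmap page describes this overflow page... */
    *bitmappage = (int32) (ovflpgno >> (metap->hashm_bshift + BYTE_TO_BIT));
    /* ...and which bit on that page */
    *bitmapbit = (int32) (ovflpgno & (BMPGSZ_BIT(metap) - 1));
}
#endif   /* NOT_USED */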
/*
 *  _hash_initbitmap()
 *
 *  Initialize a new bitmap page.  The metapage has a write-lock upon
 *  entering the function.
 *
 *  'pnum' is the OverflowPageAddress of the new bitmap page.
 *  'nbits' is how many bits to clear (i.e., make available) in the new
 *  bitmap page.  the remainder of the bits (as well as the first bit,
 *  representing the bitmap page itself) will be set.
 *  'ndx' is the 0-based offset of the new bitmap page within the
 *  metapage's array of bitmap page OverflowPageAddresses.
 */

#define INT_MASK        ((1 << INT_TO_BIT) - 1)

int32
_hash_initbitmap(Relation rel, HashMetaPage metap, int32 pnum,
                 int32 nbits, int32 ndx)
{
    Buffer      buf;
    BlockNumber blkno;
    Page        pg;
    HashPageOpaque op;
    uint32     *freep;
    int         clearbytes,
                clearints;

    blkno = OADDR_TO_BLKNO(pnum);
    buf = _hash_getbuf(rel, blkno, HASH_WRITE);
    pg = BufferGetPage(buf);
    _hash_pageinit(pg, BufferGetPageSize(buf));
    op = (HashPageOpaque) PageGetSpecialPointer(pg);
    op->hasho_oaddr = InvalidOvflAddress;
    op->hasho_prevblkno = InvalidBlockNumber;
    op->hasho_nextblkno = InvalidBlockNumber;
    op->hasho_flag = LH_BITMAP_PAGE;
    op->hasho_bucket = -1;

    freep = HashPageGetBitmap(pg);

    /* set all of the bits above 'nbits' to 1 */
    clearints = ((nbits - 1) >> INT_TO_BIT) + 1;
    clearbytes = clearints << INT_TO_BYTE;
    MemSet((char *) freep, 0, clearbytes);
    MemSet(((char *) freep) + clearbytes, 0xFF,
           BMPGSZ_BYTE(metap) - clearbytes);
    freep[clearints - 1] = ALL_SET << (nbits & INT_MASK);

    /* bit 0 represents the new bitmap page */
    SETBIT(freep, 0);

    /* metapage already has a write lock */
    metap->hashm_nmaps++;
    metap->hashm_mapp[ndx] = blkno;

    /* write out the new bitmap page (releasing its locks) */
    _hash_wrtbuf(rel, buf);

    return 0;
}
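/*
 * Worked example for the bit arithmetic in _hash_initbitmap, under the
 * assumption of 32-bit map words (INT_TO_BIT == 5, INT_TO_BYTE == 2):
 * with nbits == 1, as passed by _hash_getovfladdr, clearints is 1 and
 * clearbytes is 4, so one word is zeroed and the rest of the map is set
 * to 0xFF.  "ALL_SET << 1" then re-sets bits 1..31 of that word, and
 * SETBIT(freep, 0) fills in bit 0, leaving no bit clear -- exactly the
 * "don't leave it clear in the first place" behavior the caller's
 * comment describes.  The check below is hypothetical and compiled out.
 */
#ifdef NOT_USED
static void
_hash_initbitmap_arith_sketch(void)
{
    int32       nbits = 1;
    int         clearints = ((nbits - 1) >> INT_TO_BIT) + 1;
    int         clearbytes = clearints << INT_TO_BYTE;

    Assert(clearints == 1);
    Assert(clearbytes == 4);
    Assert((ALL_SET << (nbits & INT_MASK)) == 0xFFFFFFFE);
}
#endif   /* NOT_USED */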
/*
 *  _hash_squeezebucket(rel, bucket)
 *
 *  Try to squeeze the tuples onto pages occurring earlier in the
 *  bucket chain in an attempt to free overflow pages.  When we start
 *  the "squeezing", the page from which we start taking tuples (the
 *  "read" page) is the last page in the bucket chain and the page
 *  onto which we start squeezing tuples (the "write" page) is the
 *  first page in the bucket chain.  The read page works backward and
 *  the write page works forward; the procedure terminates when the
 *  read page and write page are the same page.
 */
void
_hash_squeezebucket(Relation rel, HashMetaPage metap, Bucket bucket)
{
    Buffer      wbuf;
    Buffer      rbuf = 0;
    BlockNumber wblkno;
    BlockNumber rblkno;
    Page        wpage;
    Page        rpage;
    HashPageOpaque wopaque;
    HashPageOpaque ropaque;
    OffsetNumber woffnum;
    OffsetNumber roffnum;
    HashItem    hitem;
    int         itemsz;

/*  elog(DEBUG, "_hash_squeezebucket: squeezing bucket %d", bucket); */

    /*
     * start squeezing into the base bucket page.
     */
    wblkno = BUCKET_TO_BLKNO(bucket);
    wbuf = _hash_getbuf(rel, wblkno, HASH_WRITE);
    wpage = BufferGetPage(wbuf);
    _hash_checkpage(wpage, LH_BUCKET_PAGE);
    wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);

    /*
     * if there aren't any overflow pages, there's nothing to squeeze.
     */
    if (!BlockNumberIsValid(wopaque->hasho_nextblkno))
    {
        _hash_relbuf(rel, wbuf, HASH_WRITE);
        return;
    }

    /*
     * find the last page in the bucket chain by starting at the base
     * bucket page and working forward.
     *
     * XXX if chains tend to be long, we should probably move forward
     * using HASH_READ and then _hash_chgbufaccess to HASH_WRITE when we
     * reach the end.  if they are short we probably don't care very
     * much.  if the hash function is working at all, they had better be
     * short..
     */
    ropaque = wopaque;
    do
    {
        rblkno = ropaque->hasho_nextblkno;
        if (ropaque != wopaque)
            _hash_relbuf(rel, rbuf, HASH_WRITE);
        rbuf = _hash_getbuf(rel, rblkno, HASH_WRITE);
        rpage = BufferGetPage(rbuf);
        _hash_checkpage(rpage, LH_OVERFLOW_PAGE);
        Assert(!PageIsEmpty(rpage));
        ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
        Assert(ropaque->hasho_bucket == bucket);
    } while (BlockNumberIsValid(ropaque->hasho_nextblkno));

    /*
     * squeeze the tuples.
     */
    roffnum = FirstOffsetNumber;
    for (;;)
    {
        hitem = (HashItem) PageGetItem(rpage, PageGetItemId(rpage, roffnum));
        itemsz = IndexTupleDSize(hitem->hash_itup)
            + (sizeof(HashItemData) - sizeof(IndexTupleData));
        itemsz = DOUBLEALIGN(itemsz);

        /*
         * walk up the bucket chain, looking for a page big enough for
         * this item.
         */
        while (PageGetFreeSpace(wpage) < itemsz)
        {
            wblkno = wopaque->hasho_nextblkno;

            _hash_wrtbuf(rel, wbuf);

            if (!BlockNumberIsValid(wblkno) || (rblkno == wblkno))
            {
                _hash_wrtbuf(rel, rbuf);
                /* wbuf is already released */
                return;
            }

            wbuf = _hash_getbuf(rel, wblkno, HASH_WRITE);
            wpage = BufferGetPage(wbuf);
            _hash_checkpage(wpage, LH_OVERFLOW_PAGE);
            Assert(!PageIsEmpty(wpage));
            wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
            Assert(wopaque->hasho_bucket == bucket);
        }

        /*
         * if we're here, we have found room so insert on the "write"
         * page.
         */
        woffnum = OffsetNumberNext(PageGetMaxOffsetNumber(wpage));
        PageAddItem(wpage, (Item) hitem, itemsz, woffnum, LP_USED);

        /*
         * delete the tuple from the "read" page.  PageIndexTupleDelete
         * repacks the ItemId array, so 'roffnum' will be "advanced" to
         * the "next" ItemId.
         */
        PageIndexTupleDelete(rpage, roffnum);
        _hash_wrtnorelbuf(rel, rbuf);

        /*
         * if the "read" page is now empty because of the deletion, free
         * it.
         */
        if (PageIsEmpty(rpage) && (ropaque->hasho_flag & LH_OVERFLOW_PAGE))
        {
            rblkno = ropaque->hasho_prevblkno;
            Assert(BlockNumberIsValid(rblkno));

            /*
             * free this overflow page.  the extra _hash_relbuf is
             * because _hash_freeovflpage gratuitously returns the next
             * page (we want the previous page and will get it ourselves
             * later).
             */
            rbuf = _hash_freeovflpage(rel, rbuf);
            if (BufferIsValid(rbuf))
                _hash_relbuf(rel, rbuf, HASH_WRITE);

            if (rblkno == wblkno)
            {
                /* rbuf is already released */
                _hash_wrtbuf(rel, wbuf);
                return;
            }

            rbuf = _hash_getbuf(rel, rblkno, HASH_WRITE);
            rpage = BufferGetPage(rbuf);
            _hash_checkpage(rpage, LH_OVERFLOW_PAGE);
            Assert(!PageIsEmpty(rpage));
            ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
            Assert(ropaque->hasho_bucket == bucket);
            roffnum = FirstOffsetNumber;
        }
    }
}
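/*
 * Illustrative sketch: the on-page size computation from the squeeze
 * loop above, factored out for clarity.  A HashItem needs the index
 * tuple's data size plus the HashItemData header overhead, rounded up
 * to a double-word boundary before it is handed to PageAddItem.  The
 * helper is hypothetical and compiled out.
 */
#ifdef NOT_USED
static int
_hash_itemsize_sketch(HashItem hitem)
{
    int         itemsz;

    itemsz = IndexTupleDSize(hitem->hash_itup)
        + (sizeof(HashItemData) - sizeof(IndexTupleData));
    return DOUBLEALIGN(itemsz);
}
#endif   /* NOT_USED */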