summaryrefslogtreecommitdiff
path: root/src/key-value-store/database/kissdb.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/key-value-store/database/kissdb.c')
-rw-r--r--src/key-value-store/database/kissdb.c1511
1 files changed, 1511 insertions, 0 deletions
diff --git a/src/key-value-store/database/kissdb.c b/src/key-value-store/database/kissdb.c
new file mode 100644
index 0000000..4c8e7b6
--- /dev/null
+++ b/src/key-value-store/database/kissdb.c
@@ -0,0 +1,1511 @@
+ /******************************************************************************
+ * Project Persistency
+ * (c) copyright 2014
+ * Company XS Embedded GmbH
+ *****************************************************************************/
+/* (Keep It) Simple Stupid Database
+*
+* Written by Adam Ierymenko <adam.ierymenko@zerotier.com>
+* Modified by Simon Disch <simon.disch@xse.de>
+*
+* KISSDB is in the public domain and is distributed with NO WARRANTY.
+*
+* http://creativecommons.org/publicdomain/zero/1.0/ */
+
+/* Compile with KISSDB_TEST to build as a test program. */
+
+/* Note: big-endian systems will need changes to implement byte swapping
+* on hash table file I/O. Or you could just use it as-is if you don't care
+* that your database files will be unreadable on little-endian systems. */
+
+#define _FILE_OFFSET_BITS 64
+#define TMP_BUFFER_LENGTH 128
+#define KISSDB_HEADER_SIZE sizeof(Header_s)
+#define __useBackups
+//#define __useFileMapping
+//#define __writeThrough
+#define __checkerror
+
+#include "./kissdb.h"
+#include "../crc32.h"
+#include <string.h>
+#include <stdlib.h>
+#include <inttypes.h>
+#include <stdint.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <sys/mman.h>
+#include <unistd.h>
+#include <ctype.h>
+#include <sys/time.h>
+
+#include "dlt.h"
+
+DLT_DECLARE_CONTEXT(persComLldbDLTCtx);
+
+#ifdef __showTimeMeasurements
+inline long long getNsDuration(struct timespec* start, struct timespec* end)
+{
+ return ((end->tv_sec * SECONDS2NANO) + end->tv_nsec) - ((start->tv_sec * SECONDS2NANO) + start->tv_nsec);
+}
+#endif
+
+/* djb2 hash function */
+static uint64_t KISSDB_hash(const void *b, unsigned long len)
+{
+ unsigned long i;
+ uint64_t hash = 5381;
+ for (i = 0; i < len; ++i)
+ hash = ((hash << 5) + hash) + (uint64_t) (((const uint8_t *) b)[i]);
+ return hash;
+}
+
+//returns a name for shared memory objects beginning with a slash followed by "path" (non alphanumeric chars are replaced with '_') appended with "tailing"
+char * kdbGetShmName(const char *tailing, const char * path)
+{
+ char * result = (char *) malloc(1 + strlen(path) + strlen(tailing) + 1); //free happens at lifecycle shutdown
+ int i =0;
+ int x = 1;
+
+ if (result != NULL)
+ {
+ result[0] = '/';
+ for (i = 0; i < strlen(path); i++)
+ {
+ if (!isalnum(path[i]))
+ result[i + 1] = '_';
+ else
+ result[i + 1] = path[i];
+ }
+ for (x = 0; x < strlen(tailing); x++)
+ {
+ result[i + x + 1] = tailing[x];
+ }
+ result[i + x + 1] = '\0';
+ }
+ return result;
+}
+
+//returns -1 on error and positive value for success
+int kdbShmemOpen(const char * name, size_t length, Kdb_bool* shmCreator)
+{
+ int result;
+ result = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
+ if (result < 0)
+ {
+ if (errno == EEXIST)
+ {
+ *shmCreator = Kdb_false;
+ result = shm_open(name, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
+ if (result < 0)
+ return -1;
+ }
+ }
+ else
+ {
+ *shmCreator = Kdb_true;
+ if (ftruncate(result, length) < 0)
+ return -1;
+ }
+ return result;
+}
+
+void Kdb_wrlock(pthread_rwlock_t * wrlock)
+{
+ pthread_rwlock_wrlock(wrlock);
+}
+
+void Kdb_rdlock(pthread_rwlock_t * rdlock)
+{
+ pthread_rwlock_rdlock(rdlock);
+}
+
+void Kdb_unlock(pthread_rwlock_t * lock)
+{
+ pthread_rwlock_unlock(lock);
+}
+
+Kdb_bool kdbShmemClose(int shmem, const char * shmName)
+{
+ if( close(shmem) == -1)
+ return Kdb_false;
+ if( shm_unlink(shmName) < 0)
+ return Kdb_false;
+ return Kdb_true;
+}
+
+void * getKdbShmemPtr(int shmem, size_t length)
+{
+ void* result = mmap(NULL, length, PROT_READ | PROT_WRITE, MAP_SHARED, shmem, 0);
+ if (result == MAP_FAILED)
+ return ((void *) -1);
+ return result;
+}
+
+Kdb_bool freeKdbShmemPtr(void * shmem_ptr, size_t length)
+{
+ if(munmap(shmem_ptr, length) == 0)
+ return Kdb_true;
+ else
+ return Kdb_false;
+}
+
+Kdb_bool resizeKdbShmem(int shmem, Hashtable_slot_s** shmem_ptr, size_t oldLength, size_t newLength)
+{
+ //unmap shm with old size
+ if( freeKdbShmemPtr(*shmem_ptr, oldLength) == Kdb_false)
+ return Kdb_false;
+
+ if (ftruncate(shmem, newLength) < 0)
+ return Kdb_false;
+
+ //get pointer to resized shm with new Length
+ *shmem_ptr = getKdbShmemPtr(shmem, newLength);
+ if(*shmem_ptr == ((void *) -1))
+ return Kdb_false;
+ return Kdb_true;
+}
+
+#ifdef __writeThrough
+Kdb_bool remapKdbShmem(int shmem, uint64_t** shmem_ptr, size_t oldLength, size_t newLength)
+{
+ //unmap shm with old size
+ if( freeKdbShmemPtr(*shmem_ptr, oldLength) == Kdb_false )
+ return Kdb_false;
+ //get pointer to resized shm with new Length
+ *shmem_ptr = getKdbShmemPtr(shmem, newLength);
+ if(*shmem_ptr == ((void *) -1))
+ return Kdb_false;
+ return Kdb_true;
+}
+#endif
+
+
+int KISSDB_open(KISSDB *db, const char *path, int mode, uint16_t hash_table_size, uint64_t key_size,
+ uint64_t value_size)
+{
+ Hashtable_slot_s *httmp;
+ Kdb_bool tmp_creator;
+ int ret = 0;
+
+ //TODO check if usage of O_SYNC O_DIRECT flags is needed. If O_SYNC and O_DIrect is specified, no additional fsync calls are needed after fflush
+ if(mode == KISSDB_OPEN_MODE_RWCREAT)
+ db->fd = open(path, O_CREAT | O_RDWR , S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH ); //gets closed when db->f is closed
+ else
+ db->fd = open(path, O_RDWR , S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH ); //gets closed when db->f is closed
+
+ if(db->fd == -1)
+ return KISSDB_ERROR_IO;
+
+ if (lseek(db->fd, 0, SEEK_END) == -1)
+ {
+ close(db->fd);
+ return KISSDB_ERROR_IO;
+ }
+ if (lseek(db->fd, 0, SEEK_CUR) < KISSDB_HEADER_SIZE)
+ {
+ /* write header if not already present */
+ if ((hash_table_size) && (key_size) && (value_size))
+ {
+ ret = writeHeader(db, &hash_table_size, &key_size, &value_size);
+ if(0 != ret)
+ {
+ close(db->fd);
+ return ret;
+ }
+ //Seek behind header
+ if (lseek(db->fd, KISSDB_HEADER_SIZE, SEEK_SET) == -1)
+ {
+ close(db->fd);
+ return KISSDB_ERROR_IO;
+ }
+ }
+ else
+ {
+ close(db->fd);
+ return KISSDB_ERROR_INVALID_PARAMETERS;
+ }
+ }
+ else
+ {
+ //read existing header
+ ret = readHeader(db, &hash_table_size, &key_size, &value_size);
+ if( 0 != ret)
+ return ret;
+
+ if (lseek(db->fd, KISSDB_HEADER_SIZE, SEEK_SET) == -1)
+ {
+ close(db->fd);
+ return KISSDB_ERROR_IO;
+ } //Seek behind header
+ }
+ //store non shared db information
+ db->hash_table_size = hash_table_size;
+ db->key_size = key_size;
+ db->value_size = value_size;
+ db->hash_table_size_bytes = sizeof(Hashtable_slot_s) * (hash_table_size + 1); /* [hash_table_size] == next table */
+
+ //DLT_LOG(persComLldbDLTCtx, DLT_LOG_INFO, DLT_STRING("Hashtable size in bytes: "), DLT_UINT64(db->hash_table_size_bytes));
+
+ if (db->already_open == Kdb_false) //check if this instance has already opened the db before
+ {
+ db->shmem_cached_name = kdbGetShmName("-cache", path);
+ if(db->shmem_cached_name == NULL)
+ return KISSDB_ERROR_MALLOC;
+ db->shmem_info_name = kdbGetShmName("-shm-info", path);
+ if(db->shmem_info_name == NULL)
+ return KISSDB_ERROR_MALLOC;
+ db->shmem_info_fd = kdbShmemOpen(db->shmem_info_name, sizeof(Shared_Data_s), &db->shmem_creator);
+ if(db->shmem_info_fd < 0)
+ return KISSDB_ERROR_OPEN_SHM;
+ db->shmem_info = (Shared_Data_s *) getKdbShmemPtr(db->shmem_info_fd, sizeof(Shared_Data_s));
+ if(db->shmem_info == ((void *) -1))
+ return KISSDB_ERROR_MAP_SHM;
+
+ size_t first_mapping;
+ if(db->shmem_info->shmem_size > db->hash_table_size_bytes )
+ first_mapping = db->shmem_info->shmem_size;
+ else
+ first_mapping = db->hash_table_size_bytes;
+
+ //open / create shared memory for first hashtable
+ db->shmem_ht_name = kdbGetShmName("-ht", path);
+ if(db->shmem_ht_name == NULL)
+ return KISSDB_ERROR_MALLOC;
+ db->shmem_ht_fd = kdbShmemOpen(db->shmem_ht_name, first_mapping, &tmp_creator);
+ if(db->shmem_ht_fd < 0)
+ return KISSDB_ERROR_OPEN_SHM;
+ db->hash_tables = (Hashtable_slot_s *) getKdbShmemPtr(db->shmem_ht_fd, first_mapping);
+ if(db->hash_tables == ((void *) -1))
+ return KISSDB_ERROR_MAP_SHM;
+ db->old_mapped_size = first_mapping; //local information
+
+ //if shared memory for rwlock was opened (created) with this call to KISSDB_open for the first time -> init rwlock
+ if (db->shmem_creator == Kdb_true)
+ {
+ //[Initialize rwlock attributes]
+ pthread_rwlockattr_t rwlattr, cache_rwlattr;
+ pthread_rwlockattr_init(&rwlattr);
+ pthread_rwlockattr_init(&cache_rwlattr);
+ pthread_rwlockattr_setpshared(&rwlattr, PTHREAD_PROCESS_SHARED);
+ pthread_rwlockattr_setpshared(&cache_rwlattr, PTHREAD_PROCESS_SHARED);
+ pthread_rwlock_init(&db->shmem_info->rwlock, &rwlattr);
+ pthread_rwlock_init(&db->shmem_info->cache_rwlock, &cache_rwlattr);
+ Kdb_wrlock(&db->shmem_info->rwlock);
+
+#ifdef __checkerror
+ //CHECK POWERLOSS FLAGS
+ ret = checkErrorFlags(db);
+ if (0 != ret)
+ {
+ close(db->fd);
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return ret;
+ }
+#endif
+ db->shmem_info->num_hash_tables = 0;
+ }
+ else // already initialized
+ Kdb_wrlock(&db->shmem_info->rwlock);
+
+ db->already_open = Kdb_true;
+ }
+ else
+ Kdb_wrlock(&db->shmem_info->rwlock);
+
+ //only read header from file into memory for first caller of KISSDB_open
+ if (db->shmem_creator == Kdb_true)
+ {
+ httmp = (Hashtable_slot_s*) malloc(db->hash_table_size_bytes); //read hashtable from file
+ if (!httmp)
+ {
+ close(db->fd);
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_MALLOC;
+ }
+ while (read(db->fd, httmp, db->hash_table_size_bytes) == db->hash_table_size_bytes)
+ {
+ Kdb_bool result = Kdb_false;
+ //if new size would exceed old shared memory size-> allocate additional memory page to shared memory
+ if (db->hash_table_size_bytes * (db->shmem_info->num_hash_tables + 1) > db->old_mapped_size)
+ {
+ Kdb_bool temp;
+ if (db->shmem_ht_fd <= 0)
+ {
+ db->shmem_ht_fd = kdbShmemOpen(db->shmem_ht_name, db->old_mapped_size, &temp);
+ if(db->shmem_ht_fd < 0)
+ {
+ free(httmp);
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_OPEN_SHM;
+ }
+ }
+ result = resizeKdbShmem(db->shmem_ht_fd, &db->hash_tables, db->old_mapped_size, db->old_mapped_size + db->hash_table_size_bytes);
+ if (result == Kdb_false)
+ {
+ free(httmp);
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_RESIZE_SHM;
+ }
+ else
+ {
+ db->shmem_info->shmem_size = db->old_mapped_size + db->hash_table_size_bytes;
+ db->old_mapped_size = db->old_mapped_size + db->hash_table_size_bytes;
+ }
+ }
+ // copy the current hashtable read from file to (htadress + (htsize * htcount)) in memory
+ memcpy(((uint8_t *) db->hash_tables) + (db->hash_table_size_bytes * db->shmem_info->num_hash_tables), httmp, db->hash_table_size_bytes);
+ ++db->shmem_info->num_hash_tables;
+
+ //read until all hash tables have been read
+ if (httmp[db->hash_table_size].offsetA) //if httable[hash_table_size] contains a offset to a further hashtable
+ {
+ //ONE MORE HASHTABLE FOUND
+ if (lseek(db->fd, httmp[db->hash_table_size].offsetA, SEEK_SET) == -1)
+ { //move the filepointer to the next hashtable in the file
+ KISSDB_close(db);
+ free(httmp);
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ }
+ else
+ break; // no further hashtables exist
+ }
+ free(httmp);
+ }
+
+ //printSharedHashtable(db);
+
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return 0;
+}
+
+
+
+
+
+int KISSDB_close(KISSDB *db)
+{
+ Kdb_wrlock(&db->shmem_info->rwlock);
+
+ uint64_t crc = 0;
+ Header_s* ptr = 0;
+#ifdef __showTimeMeasurements
+ long long KdbDuration = 0;
+ struct timespec mmapStart, mmapEnd;
+ KdbDuration = 0;
+#endif
+
+ //printSharedHashtable(db);
+ if (db->shmem_creator == Kdb_true)
+ {
+ //free shared hashtable
+ if( freeKdbShmemPtr(db->hash_tables, db->old_mapped_size) == Kdb_false)
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_UNMAP_SHM;
+ }
+ if( kdbShmemClose(db->shmem_ht_fd, db->shmem_ht_name) == Kdb_false)
+ return KISSDB_ERROR_CLOSE_SHM;
+
+ free(db->shmem_ht_name);
+ Kdb_unlock(&db->shmem_info->rwlock);
+ pthread_rwlock_destroy(&db->shmem_info->rwlock);
+ pthread_rwlock_destroy(&db->shmem_info->cache_rwlock);
+
+ // free shared information
+ if (freeKdbShmemPtr(db->shmem_info, sizeof(Kdb_bool)) == Kdb_false)
+ return KISSDB_ERROR_UNMAP_SHM;
+ if (kdbShmemClose(db->shmem_info_fd, db->shmem_info_name) == Kdb_false)
+ return KISSDB_ERROR_CLOSE_SHM;
+ free(db->shmem_info_name);
+
+#ifdef __showTimeMeasurements
+ clock_gettime(CLOCK_ID, &mmapStart);
+#endif
+
+ //update header (checksum and flags)
+ int mapFlag = PROT_WRITE | PROT_READ;
+ ptr = (Header_s*) mmap(NULL,KISSDB_HEADER_SIZE, mapFlag, MAP_SHARED, db->fd, 0);
+ if (ptr == MAP_FAILED)
+ {
+ close(db->fd);
+ return KISSDB_ERROR_IO;
+ }
+#ifdef __checkerror
+ // generate checksum over database file (beginning at file offset [sizeof(ptr->KdbV) + sizeof(ptr->checksum)] up to EOF)
+ if( db->fd )
+ {
+ crc = 0;
+ crc = (uint64_t) pcoCalcCrc32Csum(db->fd, sizeof(Header_s) );
+ ptr->checksum = crc;
+ //printf("CLOSING ------ DB: %s, WITH CHECKSUM CALCULATED: %" PRIu64 " \n", db->shmem_ht_name, ptr->checksum);
+ }
+#endif
+ ptr->closeFailed = 0x00; //remove closeFailed flag
+ ptr->closeOk = 0x01; //set closeOk flag
+
+ //sync changes with file
+ if( 0 != msync(ptr, KISSDB_HEADER_SIZE, MS_SYNC | MS_INVALIDATE))
+ {
+ close(db->fd);
+ return KISSDB_ERROR_IO;
+ }
+ //unmap memory
+ if( 0 != munmap(ptr, KISSDB_HEADER_SIZE))
+ {
+ close(db->fd);
+ return KISSDB_ERROR_IO;
+ }
+#ifdef __showTimeMeasurements
+ clock_gettime(CLOCK_ID, &mmapEnd);
+ KdbDuration += getNsDuration(&mmapStart, &mmapEnd);
+ printf("mmap duration for => %f ms\n", (double)((double)KdbDuration/NANO2MIL));
+#endif
+ fsync(db->fd);
+
+ if( db->fd)
+ close(db->fd);
+
+ db->already_open = Kdb_false;
+ //memset(db, 0, sizeof(KISSDB)); //todo check if necessary
+ }
+ else
+ //if caller is not the creator of the lock
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return 0;
+}
+
+
+int KISSDB_get(KISSDB *db, const void *key, void *vbuf)
+{
+ Kdb_rdlock(&db->shmem_info->rwlock);
+
+ uint8_t tmp[TMP_BUFFER_LENGTH];
+ uint64_t current;
+ const uint8_t *kptr;
+ unsigned long klen, i;
+ long n = 0;
+ uint64_t checksum, backupChecksum, crc;
+ uint64_t hash = KISSDB_hash(key, db->key_size) % (uint64_t) db->hash_table_size;
+ int64_t offset, backupOffset, htoffset, checksumOffset, flagOffset; //lasthtoffset
+ Hashtable_slot_s *cur_hash_table;
+
+#ifdef __writeThrough
+ //if new one or more hashtables were appended, remap shared memory block to adress space
+ if (db->old_mapped_size < db->shmem_info->shmem_size)
+ {
+ Kdb_bool temp;
+ db->shmem_ht_fd = kdbShmemOpen(db->shmem_ht_name, db->old_mapped_size, &temp);
+ if(db->shmem_ht_fd < 0)
+ return KISSDB_ERROR_OPEN_SHM;
+ res = remapKdbShmem(db->shmem_ht_fd, &db->hash_tables, db->old_mapped_size, db->shmem_info->shmem_size);
+ if (res == Kdb_false)
+ return KISSDB_ERROR_REMAP_SHM;
+ db->old_mapped_size = db->shmem_info->shmem_size;
+ }
+#endif
+
+ htoffset = KISSDB_HEADER_SIZE; //lasthtoffset
+ cur_hash_table = db->hash_tables;//pointer to current hashtable in memory
+ for (i = 0; i < db->shmem_info->num_hash_tables; ++i)
+ {
+ offset = cur_hash_table[hash].offsetA;//get fileoffset where the data can be found in the file
+#ifdef __useBackups
+ //get information about current valid offset to latest written data
+ if(cur_hash_table[hash].current == 0x00) //valid is offsetA
+ {
+ offset = cur_hash_table[hash].offsetA;
+ checksum = cur_hash_table[hash].checksumA;
+ }
+ else
+ {
+ offset = cur_hash_table[hash].offsetB;
+ checksum = cur_hash_table[hash].checksumB;
+ }
+#endif
+
+ if (offset >= KISSDB_HEADER_SIZE) //if a valid offset is available in the slot
+ {
+ if (lseek(db->fd, offset, SEEK_SET) == -1) //move filepointer to this offset
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ kptr = (const uint8_t *) key;
+ klen = db->key_size;
+ while (klen)
+ {
+ n = (long) read(db->fd, tmp, (klen > sizeof(tmp)) ? sizeof(tmp) : klen);
+ if (n > 0)
+ {
+ if (memcmp(kptr, tmp, n))//if key does not match -> search in next hashtable
+ goto get_no_match_next_hash_table;
+ kptr += n;
+ klen -= (unsigned long) n;
+ }
+ else
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return 1; /* not found */
+ }
+ }
+ if (read(db->fd, vbuf, db->value_size) == db->value_size) //if key matches at the fileoffset -> read the value
+ {
+ //crc check for file content
+#ifdef __useBackups
+ //only validate checksums at read if checksum of file is invalid
+ if (db->shmem_info->crc_invalid == Kdb_true)
+ {
+ //verify checksum of current key/value pair
+ crc = 0;
+ crc = (uint64_t) pcoCrc32(crc, (unsigned char*) vbuf, db->value_size);
+ if (checksum != crc)
+ {
+ //printf("KISSDB_get: WARNING: checksum invalid -> try to read from valid data block \n");
+ //try to read valid data from backup
+ Hashtable_slot_s slot = cur_hash_table[hash];
+ if (cur_hash_table[hash].current == 0x00) //current is offsetA, but Data there is corrupt--> so use offsetB as backupOffset
+ {
+ backupOffset = cur_hash_table[hash].offsetB;
+ backupChecksum = cur_hash_table[hash].checksumB;
+ checksumOffset = htoffset + (sizeof(Hashtable_slot_s) * hash + sizeof(slot.offsetA)); //offset that points to checksumA
+ current = 0x01; //current is offsetB
+ }
+ else
+ {
+ backupOffset = cur_hash_table[hash].offsetA;
+ backupChecksum = cur_hash_table[hash].checksumA;
+ checksumOffset = htoffset
+ + (sizeof(Hashtable_slot_s) * hash + sizeof(slot.offsetA) + sizeof(slot.checksumA)
+ + sizeof(slot.offsetB)); //offset that points to checksumB
+ current = 0x00;
+ }
+ flagOffset = htoffset
+ + (sizeof(Hashtable_slot_s) * hash + (sizeof(Hashtable_slot_s) - sizeof(slot.current))); //offset that points to currentflag
+
+ //seek to backup data
+ if (lseek(db->fd, backupOffset + db->key_size, SEEK_SET) == -1) //move filepointer to data of key-value pair //TODO make checksum over key AND data ??
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+
+ //verify checksum of backup key/value pair
+ //read from backup data
+ if (read(db->fd, vbuf, db->value_size) == db->value_size) //read value of backup Data block
+ {
+ //generate checksum of backup
+ crc = 0;
+ crc = (uint64_t) pcoCrc32(crc, (unsigned char*) vbuf, db->value_size);
+ if (crc == backupChecksum) //if checksum ok
+ {
+ //printf("KISSDB_get: WARNING: OVERWRITING CORRUPT DATA \n");
+ //seek to corrupt data
+ if (lseek(db->fd, offset + db->key_size, SEEK_SET) == -1) //move filepointer to data of corrupt key-value pair
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ //overwrite corrupt data
+ if (write( db->fd, vbuf, db->value_size) != db->value_size ) //write value
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ //seek to header slot and update checksum of corrupt data (do not modify offsets)
+ if (lseek(db->fd, checksumOffset, SEEK_SET) == -1) //move to checksumX in file
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ if (write( db->fd, &crc, sizeof(uint64_t)) != sizeof(uint64_t) ) //write checksumX to hashtbale slot
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ //update checksumX in memory
+ if (cur_hash_table[hash].current == 0x00) //current is offsetA, but Data there is corrupt--> so update checksumA with new checksum
+ cur_hash_table[hash].checksumA = crc;
+ else
+ cur_hash_table[hash].checksumB = crc;
+ //switch current valid to backup
+
+ if (lseek(db->fd, flagOffset, SEEK_SET) == -1) //move to current flag in file
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ if (write( db->fd, &current, sizeof(uint64_t)) != sizeof(uint64_t) ) //write current hashtable slot in file
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ //update current valid in memory
+ cur_hash_table[hash].current = current;
+ //fsync(db->fd)
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return 0; /* success */
+ }
+ else //if checksum not valid, return NOT FOUND
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return 1; /* not found */
+ }
+ }
+ else
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ }
+ }
+#endif
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return 0; /* success */
+ }
+ else
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ }
+ else
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return 1; /* not found */
+ }
+ //get_no_match_next_hash_table: cur_hash_table += db->hash_table_size + 1;
+ get_no_match_next_hash_table: //update lastht offset //lasthtoffset = htoffset
+ htoffset = cur_hash_table[db->hash_table_size].offsetA; // fileoffset to the next file-hashtable
+ cur_hash_table += (db->hash_table_size + 1); //pointer to the next memory-hashtable
+ }
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return 1; /* not found */
+}
+
+
+//TODO check current valid data to be deleted ?
+int KISSDB_delete(KISSDB *db, const void *key)
+{
+ Kdb_wrlock(&db->shmem_info->rwlock);
+
+ uint8_t tmp[TMP_BUFFER_LENGTH];
+ //uint64_t current = 0x00;
+ const uint8_t *kptr;
+ long n;
+ unsigned long klen, i;
+ //uint64_t crc = 0x00;
+ uint64_t hash = KISSDB_hash(key, db->key_size) % (uint64_t) db->hash_table_size;
+ //int64_t empty_offset = 0;
+ int64_t empty_offsetB = 0;
+ int64_t offset = 0;
+ int64_t htoffset = 0;
+ Hashtable_slot_s *cur_hash_table;
+
+#ifdef __writeThrough
+ //if new hashtable was appended, remap shared memory block to adress space
+ if (db->old_mapped_size < db->shmem_info->shmem_size)
+ {
+ Kdb_bool temp;
+ db->shmem_ht_fd = kdbShmemOpen(db->shmem_ht_name, db->old_mapped_size, &temp);
+ if(db->shmem_ht_fd < 0)
+ return KISSDB_ERROR_OPEN_SHM;
+ result = remapKdbShmem(db->shmem_ht_fd, &db->hash_tables, db->old_mapped_size, db->shmem_info->shmem_size);
+ if (result == Kdb_false)
+ return KISSDB_ERROR_REMAP_SHM;
+ db->old_mapped_size = db->shmem_info->shmem_size;
+ }
+#endif
+
+ htoffset = KISSDB_HEADER_SIZE;
+ cur_hash_table = db->hash_tables; //pointer to current hashtable in memory
+
+ for (i = 0; i < db->shmem_info->num_hash_tables; ++i)
+ {
+ offset = cur_hash_table[hash].offsetA; //get fileoffset where the data can be found in the file
+ if (offset >= KISSDB_HEADER_SIZE)
+ {
+ if (lseek(db->fd, offset, SEEK_SET) == -1)
+ {
+ //set filepointer to Key value offset in file
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ kptr = (const uint8_t *) key;
+ klen = db->key_size;
+ while (klen)
+ {
+ n = (long) read(db->fd, tmp, (klen > sizeof(tmp)) ? sizeof(tmp) : klen);
+ if (n > 0)
+ {
+ if (memcmp(kptr, tmp, n))//if key does not match, search in next hashtable
+ goto get_no_match_next_hash_table;
+ kptr += n;
+ klen -= (unsigned long) n;
+ }
+ else
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return 1; /* not found */
+ }
+ }
+ //TODO: mmap Hashtable slot structure to avoid seeking -> align hashtables at a multiple of a pagesize
+#ifdef __useFileMapping
+ empty_offsetB = -(offset + (db->key_size + db->value_size)); //todo check if offset is rewritten in put function !
+ cur_hash_table[hash].offsetB = empty_offsetB;
+ cur_hash_table[hash].checksumA = 0x00;
+ cur_hash_table[hash].checksumB = 0x00;
+ cur_hash_table[hash].current = 0x00;
+ int testoffset= lseek(fd, 0, SEEK_CUR); //filepointer position
+ int myoffset = htoffset + (sizeof(Hashtable_slot_s) * hash);
+
+ printf("Endoffset in file: %d , Offset for mmap: %d , size for mmap: %d \n", testoffset, myoffset, sizeof(Hashtable_slot_s));
+
+ //mmap the current hashtable slot
+ int mapFlag = PROT_WRITE | PROT_READ;
+ printf("In Delete: filedes: %d\n", db->fd);
+ htSlot = (Hashtable_slot_s*) mmap(NULL, sizeof(Hashtable_slot_s), mapFlag, MAP_SHARED, db->fd, htoffset + (sizeof(Hashtable_slot_s) * hash) ); //TODO offset must be a multiple of pagesize
+ if (htSlot == MAP_FAILED)
+ {
+ printf("MMAP ERROR !\n");
+ close(db->fd);
+ return KISSDB_ERROR_IO;
+ }
+ //do changes to slot in file
+ htSlot->offsetA = empty_offset;
+ htSlot->checksumA = 0x00;
+ htSlot->offsetB = empty_offsetB;
+ htSlot->checksumB = 0x00;
+ htSlot->current = 0x00;
+
+ //sync changes with file
+ if (0 != msync(htSlot, sizeof(Hashtable_slot_s), MS_SYNC | MS_INVALIDATE))
+ {
+ close(db->fd);
+ return KISSDB_ERROR_IO;
+ }
+ //unmap memory
+ if (0 != munmap(htSlot, sizeof(Hashtable_slot_s)))
+ {
+ close(db->fd);
+ return KISSDB_ERROR_IO;
+ }
+#endif
+ if (lseek(db->fd, htoffset + (sizeof(Hashtable_slot_s) * hash), SEEK_SET) == -1) //move Filepointer to used slot in file-hashtable.
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+
+#ifndef __useBackups
+ cur_hash_table[hash].offsetA = -offset; //negate offset in hashtable that points to the data
+ empty_offset = -offset;
+ //update hashtable slot in file header (delete existing offset information)
+ if (write( db->fd, &empty_offset, sizeof(int64_t)) != sizeof(int64_t) ) //mark slot in file-hashtable as deleted
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+#endif
+
+#ifdef __useBackups
+ //negate offsetB, delete checksums and current flag in memory
+ cur_hash_table[hash].offsetA = -offset; //negate offset in hashtable that points to the data
+ empty_offsetB = -(offset + (db->key_size + db->value_size));
+ cur_hash_table[hash].checksumA = 0x00;
+ cur_hash_table[hash].offsetB = empty_offsetB;
+ cur_hash_table[hash].checksumB = 0x00;
+ cur_hash_table[hash].current = 0x00;
+ if (write( db->fd, &cur_hash_table[hash], sizeof(Hashtable_slot_s)) != sizeof(Hashtable_slot_s) ) //write updated data in the file-hashtable slot
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+#endif
+ //TODO currently, no synchronus Filedescriptor is used!!!! fsync after fflush is needed to do synchronus writes
+ //fsync(db->fd) // associating a file stream with a synchronous file descriptor means that an fsync() call is not needed on the file descriptor after the fflush()
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return 0; /* success */
+ }
+ else
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return 1; /* not found */ //if no offset is found at hashed position in ht
+ }
+ get_no_match_next_hash_table: htoffset = cur_hash_table[db->hash_table_size].offsetA; // fileoffset to next ht in file
+ cur_hash_table += (db->hash_table_size + 1); //pointer to next hashtable in memory
+ }
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return 1; /* not found */
+}
+
+int KISSDB_put(KISSDB *db, const void *key, const void *value)
+{
+ Kdb_wrlock(&db->shmem_info->rwlock);
+
+ uint8_t tmp[TMP_BUFFER_LENGTH];
+ uint64_t current = 0x00;
+ const uint8_t *kptr;
+ unsigned long klen, i;
+ uint64_t hash = KISSDB_hash(key, db->key_size) % (uint64_t) db->hash_table_size;
+ int64_t offset, endoffset, htoffset, lasthtoffset;
+ Hashtable_slot_s *cur_hash_table;
+ Kdb_bool result = Kdb_false;
+ Kdb_bool temp = Kdb_false;
+ uint64_t crc = 0x00;
+ long n;
+ char delimiter[8] = "||||||||";
+
+#ifdef __writeThrough
+ //if new hashtable was appended, remap shared memory block to adress space
+ if(db->old_mapped_size < db->shmem_info->shmem_size)
+ {
+ db->shmem_ht_fd = kdbShmemOpen(db->shmem_ht_name, db->old_mapped_size, &temp);
+ if(db->shmem_ht_fd < 0)
+ return KISSDB_ERROR_OPEN_SHM;
+ res = remapKdbShmem(db->shmem_ht_fd, &db->hash_tables, db->old_mapped_size,db->shmem_info->shmem_size);
+ if (res == Kdb_false)
+ return KISSDB_ERROR_REMAP_SHM;
+ db->old_mapped_size = db->shmem_info->shmem_size;
+ }
+#endif
+ lasthtoffset = htoffset = KISSDB_HEADER_SIZE;
+ cur_hash_table = db->hash_tables; //pointer to current hashtable in memory
+
+ for (i = 0; i < db->shmem_info->num_hash_tables; ++i)
+ {
+ offset = cur_hash_table[hash].offsetA; //fileoffset to data in file
+ if (offset >= KISSDB_HEADER_SIZE || offset < 0) //if a key with same hash is already in this slot or the same key must be overwritten
+ {
+ // if slot is marked as deleted, use this slot and negate the offset in order to reuse the existing data block
+ if(offset < 0)
+ {
+ offset = -offset; //get original offset where data was deleted
+ //printf("Overwriting slot for key: [%s] which was deleted before, offsetA: %d \n",key, offset);
+ if (lseek(db->fd, offset, SEEK_SET) == -1) //move filepointer to fileoffset where the key can be found
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ if (write( db->fd, key, db->key_size) != db->key_size ) //write key
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ if (write( db->fd, value, db->value_size) != db->value_size ) //write value
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+
+ // write same key and value again here because slot was deleted an can be reused like an initial write
+#ifdef __useBackups
+ if (write( db->fd, key, db->key_size) != db->key_size ) //write key
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ if (write( db->fd, value, db->value_size) != db->value_size ) //write value
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+#endif
+ //seek back to hashtbale slot
+ if (lseek(db->fd, htoffset + (sizeof(Hashtable_slot_s) * hash), SEEK_SET) == -1) //move to beginning of hashtable slot in file
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+
+#ifndef __useBackups
+ cur_hash_table[hash].offsetA = offset; //write the offset to the data in the memory-hashtable slot
+ if (write( db->fd, &offset, sizeof(int64_t)) != sizeof(int64_t) ) //write the offsetA to the data in the file-hashtable slot
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+#endif
+
+#ifdef __useBackups
+ crc = 0x00;
+ crc = (uint32_t) pcoCrc32(crc, (unsigned char*)value, db->value_size);
+ cur_hash_table[hash].offsetA = offset; //write the offset to the data in the memory-hashtable slot
+ cur_hash_table[hash].checksumA = crc;
+ offset += (db->key_size + db->value_size);
+ cur_hash_table[hash].offsetB = offset; //write the offset to the data in the memory-hashtable slot
+ cur_hash_table[hash].checksumB = crc;
+ cur_hash_table[hash].current = 0x00;
+
+ if (write( db->fd, &cur_hash_table[hash], sizeof(Hashtable_slot_s)) != sizeof(Hashtable_slot_s) ) //write updated data in the file-hashtable slot
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+#endif
+ //fsync(db->fd) //associating a file stream with a synchronous file descriptor means that an fsync() call is not needed on the file descriptor after the fflush()
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return 0; /* success */
+ }
+
+ //overwrite existing if key matches
+ // if cur_hash_table[hash].current == 0x00 -> offsetA is latest so write to offsetB else offsetB is latest and write to offsetA
+#ifdef __useBackups
+ if( cur_hash_table[hash].current == 0x00 )
+ offset = cur_hash_table[hash].offsetA; //0x00 -> offsetA is latest
+ else
+ offset = cur_hash_table[hash].offsetB; //else offsetB is latest
+#endif
+ if (lseek(db->fd, offset, SEEK_SET) == -1) //move filepointer to fileoffset where valid data can be found
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+
+ kptr = (const uint8_t *) key; //pointer to search key
+ klen = db->key_size;
+ while (klen)
+ {
+ n = (long) read(db->fd, tmp, (klen > sizeof(tmp)) ? sizeof(tmp) : klen);
+ if (n > 0)
+ {
+ if (memcmp(kptr, tmp, n)) //if search key does not match with key in file
+ goto put_no_match_next_hash_table;
+ kptr += n;
+ klen -= (unsigned long) n;
+ }
+ }
+
+ //if key matches -> seek to currently non valid data block for this key
+#ifdef __useBackups
+ if( cur_hash_table[hash].current == 0x00 )
+ offset = cur_hash_table[hash].offsetB; // 0x00 -> offsetA is latest so write new data to offsetB which holds old data
+ else
+ offset = cur_hash_table[hash].offsetA; // offsetB is latest so write new data to offsetA which holds old data
+
+ if (lseek(db->fd, offset, SEEK_SET) == -1)//move filepointer to fileoffset where backup data can be found
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ if (write( db->fd, key, db->key_size) != db->key_size ) //write key
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+#endif
+ if (write( db->fd, value, db->value_size) != db->value_size )
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ // seek back to slot in header for update of checksum and flag
+ if (lseek(db->fd, htoffset + (sizeof(Hashtable_slot_s) * hash), SEEK_SET) == -1) //move to beginning of hashtable slot in file
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+
+ //generate crc for value
+ crc = 0x00;
+ crc = (uint64_t) pcoCrc32(crc, (unsigned char*)value, db->value_size);
+ current = 0x00;
+ Hashtable_slot_s slot = cur_hash_table[hash];
+
+ // check current flag and decide what parts of hashtable slot in file must be updated
+ if( cur_hash_table[hash].current == 0x00 ) //offsetA is latest -> modify settings of B
+ {
+ int seek = sizeof(slot.offsetA) + sizeof(slot.checksumA) + sizeof(slot.offsetB);
+ lseek(db->fd, seek , SEEK_CUR); //move to checksumB in file
+ if( write( db->fd, &crc, sizeof(uint64_t)) != sizeof(uint64_t)) //write checksumB to file
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ current = 0x01;
+ if( write( db->fd, &current, sizeof(uint64_t)) != sizeof(uint64_t)) //write current to hashtbale slot
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ cur_hash_table[hash].checksumB = crc;
+ cur_hash_table[hash].current = current;
+ }
+ else //offsetB is latest -> modify settings of A
+ {
+
+ int seek = sizeof(slot.offsetA);
+ lseek(db->fd, seek , SEEK_CUR); //move to checksumA in file
+ if( write( db->fd, &crc, sizeof(uint64_t)) != sizeof(uint64_t)) //write checksumA to file
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ seek = sizeof(slot.offsetB) + sizeof(slot.checksumB);;
+ lseek(db->fd, seek , SEEK_CUR); //move to checksumA in file
+ current = 0x00;
+ if( write( db->fd, &current, sizeof(uint64_t)) != sizeof(uint64_t))//write current to hashtbale slot
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ cur_hash_table[hash].checksumA = crc;
+ cur_hash_table[hash].current = current;
+ }
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return 0; //success
+ }
+ else //if key is not already inserted
+ {
+ /* add new data if an empty hash table slot is discovered */
+ if (lseek(db->fd, 0, SEEK_END) == -1) //filepointer to the end of the file
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ endoffset = lseek(db->fd, 0, SEEK_CUR); //filepointer position
+ if (write( db->fd, key, db->key_size) != db->key_size )
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ if (write( db->fd, value, db->value_size) != db->value_size )
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+
+ // write same key and value again here --> initial write
+#ifdef __useBackups
+ if (write( db->fd, key, db->key_size) != db->key_size )
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ if (write( db->fd, value, db->value_size) != db->value_size )
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+#endif
+ if (lseek(db->fd, htoffset + (sizeof(Hashtable_slot_s) * hash), SEEK_SET) == -1) //move filepointer to file-hashtable slot in file (offsetA)
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+#ifndef __useBackups
+ if (write( db->fd, &endoffset, sizeof(int64_t)) != sizeof(int64_t) ) //write the offsetA to the data in the file-hashtable slot
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ cur_hash_table[hash].offsetA = endoffset; //write the offsetA to the data in the memory-hashtable slot
+#endif
+
+#ifdef __useBackups
+ crc = 0x00;
+ crc = (uint64_t) pcoCrc32(crc, (unsigned char*) value, db->value_size);
+ offset = endoffset + (db->key_size + db->value_size);
+ cur_hash_table[hash].offsetA = endoffset; //write the offsetA to the data in the memory-hashtable slot
+ cur_hash_table[hash].checksumA = crc;
+ cur_hash_table[hash].offsetB = offset; //write the offset to the data in the memory-hashtable slot
+ cur_hash_table[hash].checksumB = crc;
+ cur_hash_table[hash].current = 0x00;
+ current = 0x00; //current
+
+ if (write( db->fd, &cur_hash_table[hash], sizeof(Hashtable_slot_s)) != sizeof(Hashtable_slot_s) ) //write updated data in the file-hashtable slot
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+#endif
+ //fsync(db->fd) // associating a file stream with a synchronous file descriptor means that an fsync() call is not needed on the file descriptor after the fflush()
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return 0; /* success */
+ }
+ put_no_match_next_hash_table: lasthtoffset = htoffset;
+ htoffset = cur_hash_table[db->hash_table_size].offsetA; // fileoffset to the next file-hashtable
+ cur_hash_table += (db->hash_table_size + 1); //pointer to the next memory-hashtable
+ }
+
+ /* if no existing slots, add a new page of hash table entries */
+ if (lseek(db->fd, 0, SEEK_END) == -1) //Filepointer to the end of file
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ if(db->shmem_info->num_hash_tables > 0) //only write delimiter if first hashtable has been written (first delimiter is written by open call)
+ {
+ if (write( db->fd, &delimiter, sizeof(delimiter)) != sizeof(delimiter) ) //write delimiter
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ }
+ endoffset = lseek(db->fd, 0, SEEK_CUR);
+
+ //if new size would exceed old shared memory size-> allocate additional memory to shared memory (+ db->hash_table_size_bytes)
+ if( (db->hash_table_size_bytes * (db->shmem_info->num_hash_tables + 1)) > db->shmem_info->shmem_size)
+ {
+ if (db->shmem_ht_fd <= 0)
+ {
+ db->shmem_ht_fd = kdbShmemOpen(db->shmem_ht_name, db->old_mapped_size, &temp);
+ if(db->shmem_ht_fd < 0)
+ return KISSDB_ERROR_OPEN_SHM;
+ }
+ result = resizeKdbShmem(db->shmem_ht_fd, &db->hash_tables, db->old_mapped_size, db->old_mapped_size + db->hash_table_size_bytes);
+ if (result == Kdb_false)
+ {
+ return KISSDB_ERROR_RESIZE_SHM;
+ }
+ else
+ {
+ db->shmem_info->shmem_size = db->old_mapped_size + db->hash_table_size_bytes;
+ db->old_mapped_size = db->shmem_info->shmem_size;
+ }
+ }
+
+ //if( currentHtOffset <= db->old_mapped_size / sizeof(Hashtable_slot_s) )
+ cur_hash_table = &(db->hash_tables[(db->hash_table_size + 1) * db->shmem_info->num_hash_tables]);
+ //else
+ // return KISSDB_ERROR_ACCESS_VIOLATION;
+ memset(cur_hash_table, 0, db->hash_table_size_bytes); //hashtable init
+ cur_hash_table[hash].offsetA = endoffset + db->hash_table_size_bytes; /* where new entry will go (behind the new Ht that gets written)*/
+
+#ifdef __useBackups
+ crc = 0x00;
+ crc = (uint64_t) pcoCrc32(crc, (unsigned char*)value, db->value_size);
+ cur_hash_table[hash].checksumA = crc;
+ cur_hash_table[hash].checksumB = crc;
+ cur_hash_table[hash].offsetB = cur_hash_table[hash].offsetA + (db->key_size + db->value_size);//write the offset to the data in the memory-hashtable slot
+ cur_hash_table[hash].current = 0x00;
+#endif
+
+ // write new hashtable at the end of the file
+ if (write( db->fd, cur_hash_table, db->hash_table_size_bytes) != db->hash_table_size_bytes )
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ // write key behind new hashtable
+ if (write( db->fd, key, db->key_size) != db->key_size )
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ // write value behind key
+ if (write( db->fd, value, db->value_size) != db->value_size )
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ // write same key and value again here --> initial write
+#ifdef __useBackups
+ if (write( db->fd, key, db->key_size) != db->key_size )
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ if (write( db->fd, value, db->value_size) != db->value_size )
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+#endif
+
+ //if a hashtable exists, update link to new hashtable
+ if (db->shmem_info->num_hash_tables)
+ {
+ if (lseek(db->fd, lasthtoffset + (sizeof(Hashtable_slot_s) * db->hash_table_size), SEEK_SET) == -1)
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ if (write( db->fd, &endoffset, sizeof(int64_t)) != sizeof(int64_t) )
+ {
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return KISSDB_ERROR_IO;
+ }
+ db->hash_tables[((db->hash_table_size + 1) * (db->shmem_info->num_hash_tables - 1)) + db->hash_table_size].offsetA = endoffset; //update link to new hashtable in old hashtable
+ }
+ ++db->shmem_info->num_hash_tables;
+ //fsync(db->fd)
+ Kdb_unlock(&db->shmem_info->rwlock);
+ return 0; /* success */
+}
+
+
+
+#if 0
+/*
+ * prints the offsets stored in the shared Hashtable
+ */
+void printSharedHashtable(KISSDB *db)
+{
+ Hashtable_slot_s *cur_hash_table;
+ cur_hash_table = db->hash_tables;
+ unsigned long k;
+ unsigned long x = (db->hash_table_size * db->shmem_info->num_hash_tables);
+ //printf("Address of SHARED HT_NUMBER: %p \n", &db->shmem_info->num_hash_tables);
+ printf("Address of SHARED HEADER: %p \n", &cur_hash_table);
+ Header_s* ptr;
+ printf("HT Struct sizes: %d, %d, %d, %d,%d, %d, %d, %d\n", sizeof(ptr->KdbV), sizeof(ptr->checksum), sizeof(ptr->closeFailed), sizeof(ptr->closeOk), sizeof(ptr->hash_table_size),sizeof(ptr->key_size),sizeof(ptr->value_size),sizeof(ptr->delimiter));
+ printf("HEADER SIZE: %d \n", sizeof(Header_s));
+ printf("Hashtable_slot_s SIZE: %d \n", sizeof(Hashtable_slot_s));
+ for (k = 0; k < x; k++)
+ {
+ if (db->hash_tables[k].offsetA != 0)
+ {
+ printf("offsetA [%lu]: %" PRId64 " \n", k, db->hash_tables[k].offsetA);
+ printf("checksumA[%lu]: %" PRIu64 " \n", k, db->hash_tables[k].checksumA);
+ printf("offsetB [%lu]: %" PRId64 " \n", k, db->hash_tables[k].offsetB);
+ printf("checksumB[%lu]: %" PRIu64 " \n", k, db->hash_tables[k].checksumB);
+ printf("current [%lu]: %" PRIu64 " \n", k, db->hash_tables[k].current);
+ }
+ }
+}
+#endif
+
+
+void KISSDB_Iterator_init(KISSDB *db, KISSDB_Iterator *dbi)
+{
+ dbi->db = db;
+ dbi->h_no = 0; // number of read hashtables
+ dbi->h_idx = 0; // index in current hashtable
+}
+
+
+int KISSDB_Iterator_next(KISSDB_Iterator *dbi, void *kbuf, void *vbuf)
+{
+ int64_t offset;
+ Kdb_rdlock(&dbi->db->shmem_info->rwlock);
+
+ if ((dbi->h_no < (dbi->db->shmem_info->num_hash_tables)) && (dbi->h_idx < dbi->db->hash_table_size))
+ {
+ //TODO check for currently valid data block flag and use this offset instead of offsetA
+ while (!(offset = dbi->db->hash_tables[((dbi->db->hash_table_size + 1) * dbi->h_no) + dbi->h_idx].offsetA))
+ {
+ if (++dbi->h_idx >= dbi->db->hash_table_size)
+ {
+ dbi->h_idx = 0;
+ if (++dbi->h_no >= (dbi->db->shmem_info->num_hash_tables))
+ {
+ Kdb_unlock(&dbi->db->shmem_info->rwlock);
+ return 0;
+ }
+ }
+ }
+
+ if (lseek(dbi->db->fd, offset, SEEK_SET) == -1)
+ return KISSDB_ERROR_IO;
+ if (read(dbi->db->fd, kbuf, dbi->db->key_size) != dbi->db->key_size)
+ return KISSDB_ERROR_IO;
+ if (vbuf != NULL)
+ {
+ if (read(dbi->db->fd, vbuf, dbi->db->value_size) != dbi->db->value_size)
+ return KISSDB_ERROR_IO;
+ }
+ else
+ {
+ if (lseek(dbi->db->fd, dbi->db->value_size, SEEK_CUR) == -1)
+ return KISSDB_ERROR_IO;
+ }
+
+ if (++dbi->h_idx >= dbi->db->hash_table_size)
+ {
+ dbi->h_idx = 0;
+ ++dbi->h_no;
+ }
+ Kdb_unlock(&dbi->db->shmem_info->rwlock);
+ return 1;
+ }
+ Kdb_unlock(&dbi->db->shmem_info->rwlock);
+ return 0;
+}
+
+
+
+int readHeader(KISSDB* db, uint16_t* hash_table_size, uint64_t* key_size, uint64_t* value_size)
+{
+ //set Filepointer to the beginning of the file
+ if (lseek(db->fd, 0, SEEK_SET) == -1)
+ return KISSDB_ERROR_IO;
+ //mmap header from beginning of file
+ int mapFlag = PROT_WRITE | PROT_READ;
+ Header_s* ptr = 0;
+ ptr = (Header_s*) mmap(NULL, KISSDB_HEADER_SIZE, mapFlag, MAP_SHARED, db->fd, 0);
+ if (ptr == MAP_FAILED)
+ return KISSDB_ERROR_IO;
+
+ if ((ptr->KdbV[0] != 'K') || (ptr->KdbV[1] != 'd') || (ptr->KdbV[2] != 'B') || (ptr->KdbV[3] != KISSDB_VERSION))
+ return KISSDB_ERROR_CORRUPT_DBFILE;
+
+ if (!ptr->hash_table_size)
+ return KISSDB_ERROR_CORRUPT_DBFILE;
+ (*hash_table_size) = (uint16_t) ptr->hash_table_size;
+
+ if (!ptr->key_size)
+ return KISSDB_ERROR_CORRUPT_DBFILE;
+ (*key_size) = (uint64_t) ptr->key_size;
+
+ if (!ptr->value_size)
+ return KISSDB_ERROR_CORRUPT_DBFILE;
+ (*value_size) = (uint64_t) ptr->value_size;
+
+ //sync changes with file
+ if (0 != msync(ptr, KISSDB_HEADER_SIZE, MS_SYNC | MS_INVALIDATE))
+ return KISSDB_ERROR_IO;
+
+ //unmap memory
+ if (0 != munmap(ptr, KISSDB_HEADER_SIZE))
+ return KISSDB_ERROR_IO;
+ return 0;
+}
+
+
+
+
+int writeHeader(KISSDB* db, uint16_t* hash_table_size, uint64_t* key_size, uint64_t* value_size)
+{
+ Header_s* ptr = 0;
+ int ret= 0;
+
+ //Seek to beginning of file
+ if (lseek(db->fd, 0, SEEK_SET) == -1)
+ return KISSDB_ERROR_IO;
+
+ //ftruncate file to needed size for header
+ ret = ftruncate(db->fd, KISSDB_HEADER_SIZE);
+ if (ret < 0)
+ return KISSDB_ERROR_IO;
+
+ //mmap header from beginning of file
+ int mapFlag = PROT_WRITE | PROT_READ;
+ ptr = (Header_s*) mmap(NULL, KISSDB_HEADER_SIZE, mapFlag, MAP_SHARED, db->fd, 0);
+ if (ptr == MAP_FAILED)
+ return KISSDB_ERROR_IO;
+
+ ptr->KdbV[0] = 'K';
+ ptr->KdbV[1] = 'd';
+ ptr->KdbV[2] = 'B';
+ ptr->KdbV[3] = KISSDB_VERSION;
+ ptr->KdbV[4] = '-';
+ ptr->KdbV[5] = '-';
+ ptr->KdbV[6] = '-';
+ ptr->KdbV[7] = '-';
+ ptr->checksum = 0x00;
+ ptr->closeFailed = 0x00; //remove closeFailed flag
+ ptr->closeOk = 0x01; //set closeOk flag
+ ptr->hash_table_size = (uint64_t)(*hash_table_size);
+ ptr->key_size = (uint64_t)(*key_size);
+ ptr->value_size = (uint64_t)(*value_size);
+ memcpy(ptr->delimiter,"||||||||", 8);
+
+ //sync changes with file
+ if (0 != msync(ptr, KISSDB_HEADER_SIZE, MS_SYNC | MS_INVALIDATE))
+ return KISSDB_ERROR_IO;
+
+ //unmap memory
+ if (0 != munmap(ptr, KISSDB_HEADER_SIZE))
+ return KISSDB_ERROR_IO;
+ return 0;
+}
+
+
+int checkErrorFlags(KISSDB* db)
+{
+ //mmap header from beginning of file
+ int mapFlag = PROT_WRITE | PROT_READ;
+ Header_s* ptr = 0;
+ ptr = (Header_s*) mmap(NULL, KISSDB_HEADER_SIZE, mapFlag, MAP_SHARED, db->fd, 0);
+ if (ptr == MAP_FAILED)
+ return KISSDB_ERROR_IO;
+ //uint64_t crc = 0;
+
+#ifdef __checkerror
+ //check if closeFailed flag is set
+ if(ptr->closeFailed == 0x01)
+ {
+ //TODO implement verifyHashtableCS
+
+ //if closeFailed flag is set, something went wrong at last close -> so check crc
+ db->shmem_info->crc_invalid = Kdb_true; //check crc for further reads
+
+#if 0
+ DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING("OPENING DB -> closeFailed flag is set: "), DLT_UINT64(ptr->closeFailed));
+ crc = (uint64_t) pcoCalcCrc32Csum(db->fd, sizeof(Header_s));
+ if(ptr->checksum != 0) //do not check if database is currently in creation
+ {
+ if (crc != ptr->checksum)
+ {
+ DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING("OPENING DB: "), DLT_STRING(db->shmem_ht_name), DLT_STRING(" CHECKSUM IN HEADER : "), DLT_UINT64(ptr->checksum), DLT_STRING(" != CHECKSUM CALCULATED: "), DLT_UINT64(crc));
+ //db->shmem_info->crc_invalid = Kdb_true; //check datablocks at further reads
+ //return KISSDB_ERROR_CORRUPT_DBFILE; //previous close failed and checksum invalid -> error state -> return error
+ }
+ else
+ {
+ DLT_LOG(persComLldbDLTCtx, DLT_LOG_INFO, DLT_STRING("OPENING DB: "), DLT_STRING(db->shmem_ht_name), DLT_STRING(" CECHKSUM IN HEADER: "), DLT_UINT64(ptr->checksum), DLT_STRING(" == CHECKSUM CALCULATED: "), DLT_UINT64(crc));
+ //db->shmem_info->crc_invalid = Kdb_false; //do not check datablocks at further reads
+ }
+ }
+ else
+ DLT_LOG(persComLldbDLTCtx, DLT_LOG_INFO, DLT_STRING("Do not check checksum, database in creation: "), DLT_STRING(db->shmem_ht_name));
+#endif
+ }
+ else
+ {
+ //DLT_LOG(persComLldbDLTCtx, DLT_LOG_INFO, DLT_STRING("OPENING DB: closeFailed flag is not set: "), DLT_UINT64(ptr->closeFailed));
+ ptr->closeFailed = 0x01; //NO: create close failed flag
+ }
+
+
+ //check if closeOk flag is set
+ if(ptr->closeOk == 0x01)
+ {
+ //DLT_LOG(persComLldbDLTCtx, DLT_LOG_INFO, DLT_STRING("OPENING DB -> closeOk flag is set: "), DLT_UINT64(ptr->closeOk));
+ ptr->closeOk = 0x00;
+ }
+ else
+ {
+ //if closeOK is not set , something went wrong at last close
+ db->shmem_info->crc_invalid = Kdb_true; //do crc check at read
+
+#if 0
+ crc = (uint64_t) pcoCalcCrc32Csum(db->fd, sizeof(Header_s));
+ if(ptr->checksum != 0) //do not check if database is currently in creation
+ {
+ if (crc != ptr->checksum)
+ {
+ DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING("OPENING DB: "), DLT_STRING(db->shmem_ht_name), DLT_STRING(" CHECKSUM IN HEADER : "), DLT_UINT64(ptr->checksum), DLT_STRING(" != CHECKSUM CALCULATED: "), DLT_UINT64(crc));
+ //db->shmem_info->crc_invalid = Kdb_true;
+ //return KISSDB_ERROR_CORRUPT_DBFILE; //previous close failed and checksum invalid -> error state -> return error
+ }
+ else
+ {
+ DLT_LOG(persComLldbDLTCtx, DLT_LOG_INFO, DLT_STRING("OPENING DB: "), DLT_STRING(db->shmem_ht_name), DLT_STRING(" CECHKSUM IN HEADER: "), DLT_UINT64(ptr->checksum), DLT_STRING(" == CHECKSUM CALCULATED: "), DLT_UINT64(crc));
+ //db->shmem_info->crc_invalid = Kdb_false;
+ }
+ }
+ else
+ {
+ DLT_LOG(persComLldbDLTCtx, DLT_LOG_INFO, DLT_STRING("Do not check checksum, database in creation: "), DLT_STRING(db->shmem_ht_name));
+ }
+
+ DLT_LOG(persComLldbDLTCtx, DLT_LOG_WARN, DLT_STRING("OPENING DB -> closeOk flag is not set: "), DLT_UINT64(ptr->closeOk));
+#endif
+
+
+ }
+#endif
+ //sync changes with file
+ if (0 != msync(ptr, KISSDB_HEADER_SIZE, MS_SYNC | MS_INVALIDATE))
+ return KISSDB_ERROR_IO;
+
+ //unmap memory
+ if (0 != munmap(ptr, KISSDB_HEADER_SIZE))
+ return KISSDB_ERROR_IO;
+
+ return 0;
+}