summaryrefslogtreecommitdiff
path: root/storage/csv/ha_tina.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/csv/ha_tina.cc')
-rw-r--r--storage/csv/ha_tina.cc231
1 files changed, 146 insertions, 85 deletions
diff --git a/storage/csv/ha_tina.cc b/storage/csv/ha_tina.cc
index 07a4ffc65c5..9a7781e017d 100644
--- a/storage/csv/ha_tina.cc
+++ b/storage/csv/ha_tina.cc
@@ -46,10 +46,9 @@ TODO:
#endif
#include "mysql_priv.h"
-
+#include <mysql/plugin.h>
#include "ha_tina.h"
-#include <mysql/plugin.h>
/*
uchar + uchar + ulonglong + ulonglong + ulonglong + ulonglong + uchar
@@ -69,6 +68,10 @@ static int free_share(TINA_SHARE *share);
static int read_meta_file(File meta_file, ha_rows *rows);
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
+extern "C" void tina_get_status(void* param, int concurrent_insert);
+extern "C" void tina_update_status(void* param);
+extern "C" my_bool tina_check_status(void* param);
+
/* Stuff for shares */
pthread_mutex_t tina_mutex;
static HASH tina_open_tables;
@@ -93,11 +96,11 @@ int sort_set (tina_set *a, tina_set *b)
return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
}
-static byte* tina_get_key(TINA_SHARE *share,uint *length,
+static uchar* tina_get_key(TINA_SHARE *share, size_t *length,
my_bool not_used __attribute__((unused)))
{
*length=share->table_name_length;
- return (byte*) share->table_name;
+ return (uchar*) share->table_name;
}
static int tina_init_func(void *p)
@@ -144,7 +147,7 @@ static TINA_SHARE *get_share(const char *table_name, TABLE *table)
initialize its members.
*/
if (!(share=(TINA_SHARE*) hash_search(&tina_open_tables,
- (byte*) table_name,
+ (uchar*) table_name,
length)))
{
if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
@@ -164,6 +167,7 @@ static TINA_SHARE *get_share(const char *table_name, TABLE *table)
share->rows_recorded= 0;
share->update_file_opened= FALSE;
share->tina_write_opened= FALSE;
+ share->data_file_version= 0;
strmov(share->table_name, table_name);
fn_format(share->data_file_name, table_name, "", CSV_EXT,
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
@@ -174,7 +178,7 @@ static TINA_SHARE *get_share(const char *table_name, TABLE *table)
goto error;
share->saved_data_file_length= file_stat.st_size;
- if (my_hash_insert(&tina_open_tables, (byte*) share))
+ if (my_hash_insert(&tina_open_tables, (uchar*) share))
goto error;
thr_lock_init(&share->lock);
pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
@@ -203,7 +207,7 @@ static TINA_SHARE *get_share(const char *table_name, TABLE *table)
error:
pthread_mutex_unlock(&tina_mutex);
- my_free((gptr) share, MYF(0));
+ my_free((uchar*) share, MYF(0));
return NULL;
}
@@ -236,7 +240,7 @@ static int read_meta_file(File meta_file, ha_rows *rows)
DBUG_ENTER("ha_tina::read_meta_file");
VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
- if (my_read(meta_file, (byte*)meta_buffer, META_BUFFER_SIZE, 0)
+ if (my_read(meta_file, (uchar*)meta_buffer, META_BUFFER_SIZE, 0)
!= META_BUFFER_SIZE)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
@@ -305,7 +309,7 @@ static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
*ptr= (uchar)dirty;
VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
- if (my_write(meta_file, (byte *)meta_buffer, META_BUFFER_SIZE, 0)
+ if (my_write(meta_file, (uchar *)meta_buffer, META_BUFFER_SIZE, 0)
!= META_BUFFER_SIZE)
DBUG_RETURN(-1);
@@ -376,10 +380,10 @@ static int free_share(TINA_SHARE *share)
share->tina_write_opened= FALSE;
}
- hash_delete(&tina_open_tables, (byte*) share);
+ hash_delete(&tina_open_tables, (uchar*) share);
thr_lock_delete(&share->lock);
pthread_mutex_destroy(&share->mutex);
- my_free((gptr) share, MYF(0));
+ my_free((uchar*) share, MYF(0));
}
pthread_mutex_unlock(&tina_mutex);
@@ -441,10 +445,10 @@ ha_tina::ha_tina(handlerton *hton, TABLE_SHARE *table_arg)
*/
current_position(0), next_position(0), local_saved_data_file_length(0),
file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
- records_is_known(0)
+ local_data_file_version(0), records_is_known(0)
{
/* Set our original buffers from pre-allocated memory */
- buffer.set((char*)byte_buffer, IO_SIZE, system_charset_info);
+ buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
chain= chain_buffer;
file_buff= new Transparent_file();
}
@@ -454,7 +458,7 @@ ha_tina::ha_tina(handlerton *hton, TABLE_SHARE *table_arg)
Encode a buffer into the quoted format.
*/
-int ha_tina::encode_quote(byte *buf)
+int ha_tina::encode_quote(uchar *buf)
{
char attribute_buffer[1024];
String attribute(attribute_buffer, sizeof(attribute_buffer),
@@ -558,7 +562,7 @@ int ha_tina::chain_append()
if (chain_alloced)
{
/* Must cast since my_malloc unlike malloc doesn't have a void ptr */
- if ((chain= (tina_set *) my_realloc((gptr)chain,
+ if ((chain= (tina_set *) my_realloc((uchar*)chain,
chain_size, MYF(MY_WME))) == NULL)
return -1;
}
@@ -584,12 +588,13 @@ int ha_tina::chain_append()
/*
Scans for a row.
*/
-int ha_tina::find_current_row(byte *buf)
+int ha_tina::find_current_row(uchar *buf)
{
off_t end_offset, curr_offset= current_position;
int eoln_len;
my_bitmap_map *org_bitmap;
int error;
+ bool read_all;
DBUG_ENTER("ha_tina::find_current_row");
/*
@@ -601,6 +606,8 @@ int ha_tina::find_current_row(byte *buf)
local_saved_data_file_length, &eoln_len)) == 0)
DBUG_RETURN(HA_ERR_END_OF_FILE);
+ /* We must read all columns in case a table is opened for update */
+ read_all= !bitmap_is_clear_all(table->write_set);
/* Avoid asserts in ::store() for columns that are not going to be updated */
org_bitmap= dbug_tmp_use_all_columns(table, table->write_set);
error= HA_ERR_CRASHED_ON_USAGE;
@@ -609,37 +616,41 @@ int ha_tina::find_current_row(byte *buf)
for (Field **field=table->field ; *field ; field++)
{
+ char curr_char;
+
buffer.length(0);
- if (curr_offset < end_offset &&
- file_buff->get_value(curr_offset) == '"')
+ if (curr_offset >= end_offset)
+ goto err;
+ curr_char= file_buff->get_value(curr_offset);
+ if (curr_char == '"')
{
curr_offset++; // Incrementpast the first quote
- for(;curr_offset < end_offset; curr_offset++)
+ for(; curr_offset < end_offset; curr_offset++)
{
+ curr_char= file_buff->get_value(curr_offset);
// Need to convert line feeds!
- if (file_buff->get_value(curr_offset) == '"' &&
- ((file_buff->get_value(curr_offset + 1) == ',') ||
- (curr_offset == end_offset -1 )))
+ if (curr_char == '"' &&
+ (curr_offset == end_offset - 1 ||
+ file_buff->get_value(curr_offset + 1) == ','))
{
curr_offset+= 2; // Move past the , and the "
break;
}
- if (file_buff->get_value(curr_offset) == '\\' &&
- curr_offset != (end_offset - 1))
+ if (curr_char == '\\' && curr_offset != (end_offset - 1))
{
curr_offset++;
- if (file_buff->get_value(curr_offset) == 'r')
+ curr_char= file_buff->get_value(curr_offset);
+ if (curr_char == 'r')
buffer.append('\r');
- else if (file_buff->get_value(curr_offset) == 'n' )
+ else if (curr_char == 'n' )
buffer.append('\n');
- else if ((file_buff->get_value(curr_offset) == '\\') ||
- (file_buff->get_value(curr_offset) == '"'))
- buffer.append(file_buff->get_value(curr_offset));
+ else if (curr_char == '\\' || curr_char == '"')
+ buffer.append(curr_char);
else /* This could only happed with an externally created file */
{
buffer.append('\\');
- buffer.append(file_buff->get_value(curr_offset));
+ buffer.append(curr_char);
}
}
else // ordinary symbol
@@ -650,36 +661,30 @@ int ha_tina::find_current_row(byte *buf)
*/
if (curr_offset == end_offset - 1)
goto err;
- buffer.append(file_buff->get_value(curr_offset));
+ buffer.append(curr_char);
}
}
}
- else if (my_isdigit(system_charset_info,
- file_buff->get_value(curr_offset)))
+ else
{
- for(;curr_offset < end_offset; curr_offset++)
+ for(; curr_offset < end_offset; curr_offset++)
{
- if (file_buff->get_value(curr_offset) == ',')
+ curr_char= file_buff->get_value(curr_offset);
+ if (curr_char == ',')
{
- curr_offset+= 1; // Move past the ,
+ curr_offset++; // Skip the ,
break;
}
-
- if (my_isdigit(system_charset_info, file_buff->get_value(curr_offset)))
- buffer.append(file_buff->get_value(curr_offset));
- else if (file_buff->get_value(curr_offset) == '.')
- buffer.append(file_buff->get_value(curr_offset));
- else
- goto err;
+ buffer.append(curr_char);
}
}
- else
+
+ if (read_all || bitmap_is_set(table->read_set, (*field)->field_index))
{
- goto err;
+ if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
+ CHECK_FIELD_WARN))
+ goto err;
}
-
- if (bitmap_is_set(table->read_set, (*field)->field_index))
- (*field)->store(buffer.ptr(), buffer.length(), system_charset_info);
}
next_position= end_offset + eoln_len;
error= 0;
@@ -785,18 +790,6 @@ void ha_tina::update_status()
}
-bool ha_tina::check_if_locking_is_allowed(uint sql_command,
- ulong type, TABLE *table,
- uint count, uint current,
- uint *system_count,
- bool called_by_privileged_thread)
-{
- if (!called_by_privileged_thread)
- return check_if_log_table_locking_is_allowed(sql_command, type, table);
-
- return TRUE;
-}
-
/*
Open a database file. Keep in mind that tables are caches, so
this will not be called for every request. Any sort of positions
@@ -815,6 +808,7 @@ int ha_tina::open(const char *name, int mode, uint open_options)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
}
+ local_data_file_version= share->data_file_version;
if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
DBUG_RETURN(0);
@@ -851,7 +845,7 @@ int ha_tina::close(void)
of the file and appends the data. In an error case it really should
just truncate to the original position (this is not done yet).
*/
-int ha_tina::write_row(byte * buf)
+int ha_tina::write_row(uchar * buf)
{
int size;
DBUG_ENTER("ha_tina::write_row");
@@ -871,7 +865,7 @@ int ha_tina::write_row(byte * buf)
DBUG_RETURN(-1);
/* use pwrite, as concurrent reader could have changed the position */
- if (my_write(share->tina_write_filedes, (byte*)buffer.ptr(), size,
+ if (my_write(share->tina_write_filedes, (uchar*)buffer.ptr(), size,
MYF(MY_WME | MY_NABP)))
DBUG_RETURN(-1);
@@ -904,6 +898,7 @@ int ha_tina::open_update_temp_file_if_needed()
0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
return 1;
share->update_file_opened= TRUE;
+ temp_file_length= 0;
}
return 0;
}
@@ -916,7 +911,7 @@ int ha_tina::open_update_temp_file_if_needed()
This will be called in a table scan right before the previous ::rnd_next()
call.
*/
-int ha_tina::update_row(const byte * old_data, byte * new_data)
+int ha_tina::update_row(const uchar * old_data, uchar * new_data)
{
int size;
DBUG_ENTER("ha_tina::update_row");
@@ -928,15 +923,23 @@ int ha_tina::update_row(const byte * old_data, byte * new_data)
size= encode_quote(new_data);
+ /*
+ During update we mark each updating record as deleted
+ (see the chain_append()) then write new one to the temporary data file.
+ At the end of the sequence in the rnd_end() we append all non-marked
+ records from the data file to the temporary data file then rename it.
+ The temp_file_length is used to calculate new data file length.
+ */
if (chain_append())
DBUG_RETURN(-1);
if (open_update_temp_file_if_needed())
DBUG_RETURN(-1);
- if (my_write(update_temp_file, (byte*)buffer.ptr(), size,
+ if (my_write(update_temp_file, (uchar*)buffer.ptr(), size,
MYF(MY_WME | MY_NABP)))
DBUG_RETURN(-1);
+ temp_file_length+= size;
/* UPDATE should never happen on the log tables */
DBUG_ASSERT(!share->is_log_table);
@@ -954,7 +957,7 @@ int ha_tina::update_row(const byte * old_data, byte * new_data)
The table will then be deleted/positioned based on the ORDER (so RANDOM,
DESC, ASC).
*/
-int ha_tina::delete_row(const byte * buf)
+int ha_tina::delete_row(const uchar * buf)
{
DBUG_ENTER("ha_tina::delete_row");
ha_statistic_increment(&SSV::ha_delete_count);
@@ -963,6 +966,11 @@ int ha_tina::delete_row(const byte * buf)
DBUG_RETURN(-1);
stats.records--;
+ /* Update shared info */
+ DBUG_ASSERT(share->rows_recorded);
+ pthread_mutex_lock(&share->mutex);
+ share->rows_recorded--;
+ pthread_mutex_unlock(&share->mutex);
/* DELETE should never happen on the log table */
DBUG_ASSERT(!share->is_log_table);
@@ -971,6 +979,33 @@ int ha_tina::delete_row(const byte * buf)
}
+/**
+ @brief Initialize the data file.
+
+ @details Compare the local version of the data file with the shared one.
+ If they differ, there are some changes behind and we have to reopen
+ the data file to make the changes visible.
+ Call @c file_buff->init_buff() at the end to read the beginning of the
+ data file into buffer.
+
+ @retval 0 OK.
+ @retval 1 There was an error.
+*/
+
+int ha_tina::init_data_file()
+{
+ if (local_data_file_version != share->data_file_version)
+ {
+ local_data_file_version= share->data_file_version;
+ if (my_close(data_file, MYF(0)) ||
+ (data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
+ return 1;
+ }
+ file_buff->init_buff(data_file);
+ return 0;
+}
+
+
/*
All table scans call this first.
The order of a table scan is:
@@ -1007,9 +1042,8 @@ int ha_tina::rnd_init(bool scan)
DBUG_ENTER("ha_tina::rnd_init");
/* set buffer to the beginning of the file */
- file_buff->init_buff(data_file);
- if (share->crashed)
- DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
+ if (share->crashed || init_data_file())
+ DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
current_position= next_position= 0;
stats.records= 0;
@@ -1033,7 +1067,7 @@ int ha_tina::rnd_init(bool scan)
NULL and "". This is ok since this table handler is for spreadsheets and
they don't know about them either :)
*/
-int ha_tina::rnd_next(byte *buf)
+int ha_tina::rnd_next(uchar *buf)
{
int rc;
DBUG_ENTER("ha_tina::rnd_next");
@@ -1065,7 +1099,7 @@ int ha_tina::rnd_next(byte *buf)
its just a position. Look at the bdb code if you want to see a case
where something other then a number is stored.
*/
-void ha_tina::position(const byte *record)
+void ha_tina::position(const uchar *record)
{
DBUG_ENTER("ha_tina::position");
my_store_ptr(ref, ref_length, current_position);
@@ -1078,7 +1112,7 @@ void ha_tina::position(const byte *record)
my_get_ptr() retrieves the data for you.
*/
-int ha_tina::rnd_pos(byte * buf, byte *pos)
+int ha_tina::rnd_pos(uchar * buf, uchar *pos)
{
DBUG_ENTER("ha_tina::rnd_pos");
ha_statistic_increment(&SSV::ha_read_rnd_next_count);
@@ -1174,15 +1208,18 @@ int ha_tina::rnd_end()
while ((file_buffer_start != -1)) // while not end of file
{
bool in_hole= get_write_pos(&write_end, ptr);
+ off_t write_length= write_end - write_begin;
/* if there is something to write, write it */
- if ((write_end - write_begin) &&
- (my_write(update_temp_file,
- (byte*)(file_buff->ptr() +
- (write_begin - file_buff->start())),
- write_end - write_begin, MYF_RW)))
- goto error;
-
+ if (write_length)
+ {
+ if (my_write(update_temp_file,
+ (uchar*) (file_buff->ptr() +
+ (write_begin - file_buff->start())),
+ write_length, MYF_RW))
+ goto error;
+ temp_file_length+= write_length;
+ }
if (in_hole)
{
/* skip hole */
@@ -1230,11 +1267,26 @@ int ha_tina::rnd_end()
if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1))
DBUG_RETURN(-1);
/*
+ As we reopened the data file, increase share->data_file_version
+ in order to force other threads waiting on a table lock and
+ have already opened the table to reopen the data file.
+ That makes the latest changes become visible to them.
+ Update local_data_file_version as no need to reopen it in the
+ current thread.
+ */
+ share->data_file_version++;
+ local_data_file_version= share->data_file_version;
+ /*
The datafile is consistent at this point and the write filedes is
closed, so nothing worrying will happen to it in case of a crash.
Here we record this fact to the meta-file.
*/
(void)write_meta_file(share->meta_file, share->rows_recorded, FALSE);
+ /*
+ Update local_saved_data_file_length with the real length of the
+ data file.
+ */
+ local_saved_data_file_length= temp_file_length;
}
DBUG_RETURN(0);
@@ -1266,7 +1318,7 @@ error:
int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt)
{
char repaired_fname[FN_REFLEN];
- byte *buf;
+ uchar *buf;
File repair_file;
int rc;
ha_rows rows_repaired= 0;
@@ -1282,11 +1334,12 @@ int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt)
/* Don't assert in field::val() functions */
table->use_all_columns();
- if (!(buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME))))
+ if (!(buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
/* position buffer to the start of the file */
- file_buff->init_buff(data_file);
+ if (init_data_file())
+ DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);
/*
Local_saved_data_file_length is initialized during the lock phase.
@@ -1300,6 +1353,7 @@ int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt)
/* Read the file row-by-row. If everything is ok, repair is not needed. */
while (!(rc= find_current_row(buf)))
{
+ thd_inc_row_count(thd);
rows_repaired++;
current_position= next_position;
}
@@ -1338,7 +1392,7 @@ int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt)
{
write_end= min(file_buff->end(), current_position);
if ((write_end - write_begin) &&
- (my_write(repair_file, (byte*)file_buff->ptr(),
+ (my_write(repair_file, (uchar*)file_buff->ptr(),
write_end - write_begin, MYF_RW)))
DBUG_RETURN(-1);
@@ -1392,6 +1446,11 @@ int ha_tina::delete_all_rows()
rc= my_chsize(share->tina_write_filedes, 0, 0, MYF(MY_WME));
stats.records=0;
+ /* Update shared info */
+ pthread_mutex_lock(&share->mutex);
+ share->rows_recorded= 0;
+ pthread_mutex_unlock(&share->mutex);
+ local_saved_data_file_length= 0;
DBUG_RETURN(rc);
}
@@ -1442,17 +1501,18 @@ int ha_tina::create(const char *name, TABLE *table_arg,
int ha_tina::check(THD* thd, HA_CHECK_OPT* check_opt)
{
int rc= 0;
- byte *buf;
+ uchar *buf;
const char *old_proc_info;
ha_rows count= share->rows_recorded;
DBUG_ENTER("ha_tina::check");
old_proc_info= thd_proc_info(thd, "Checking table");
- if (!(buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME))))
+ if (!(buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
/* position buffer to the start of the file */
- file_buff->init_buff(data_file);
+ if (init_data_file())
+ DBUG_RETURN(HA_ERR_CRASHED);
/*
Local_saved_data_file_length is initialized during the lock phase.
@@ -1465,6 +1525,7 @@ int ha_tina::check(THD* thd, HA_CHECK_OPT* check_opt)
/* Read the file row-by-row. If everything is ok, repair is not needed. */
while (!(rc= find_current_row(buf)))
{
+ thd_inc_row_count(thd);
count--;
current_position= next_position;
}