summaryrefslogtreecommitdiff
path: root/sql/ha_ndbcluster.cc
diff options
context:
space:
mode:
authorunknown <pekka@mysql.com>2004-10-14 10:38:47 +0200
committerunknown <pekka@mysql.com>2004-10-14 10:38:47 +0200
commit28de5d36073bfb55433f04c49112bcf7706409d0 (patch)
tree99052bd7fcbaae2cdd87a2aec9d34b77cb971497 /sql/ha_ndbcluster.cc
parent68e39ecaadfa9f9c0cf632b0666d4bdcfcb9d36e (diff)
parent1b31325e870798863a4298c717e829d567502ffb (diff)
downloadmariadb-git-28de5d36073bfb55433f04c49112bcf7706409d0.tar.gz
Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb
into mysql.com:/space/pekka/ndb/version/my41-tux ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: Auto merged sql/ha_ndbcluster.cc: Auto merged
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r--sql/ha_ndbcluster.cc257
1 files changed, 142 insertions, 115 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 2234c128346..f3da9393996 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -1227,114 +1227,158 @@ inline int ha_ndbcluster::next_result(byte *buf)
DBUG_RETURN(HA_ERR_END_OF_FILE);
}
-
/*
- Set bounds for a ordered index scan, use key_range
+ Set bounds for ordered index scan.
*/
int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
- const key_range *key,
- int bound)
+ const key_range *keys[2])
{
- uint key_len, key_store_len, tot_len, key_tot_len;
- byte *key_ptr;
- KEY* key_info= table->key_info + active_index;
- KEY_PART_INFO* key_part= key_info->key_part;
- KEY_PART_INFO* end= key_part+key_info->key_parts;
- Field* field;
- bool key_nullable, key_null;
+ const KEY *const key_info= table->key_info + active_index;
+ const uint key_parts= key_info->key_parts;
+ uint key_tot_len[2];
+ uint tot_len;
+ int i, j;
DBUG_ENTER("set_bounds");
- DBUG_PRINT("enter", ("bound: %d", bound));
- DBUG_PRINT("enter", ("key_parts: %d", key_info->key_parts));
- DBUG_PRINT("enter", ("key->length: %d", key->length));
- DBUG_PRINT("enter", ("key->flag: %d", key->flag));
+ DBUG_PRINT("info", ("key_parts=%d", key_parts));
- // Set bounds using key data
- tot_len= 0;
- key_ptr= (byte *) key->key;
- key_tot_len= key->length;
- for (; key_part != end; key_part++)
+ for (j= 0; j <= 1; j++)
{
- field= key_part->field;
- key_len= key_part->length;
- key_store_len= key_part->store_length;
- key_nullable= (bool) key_part->null_bit;
- key_null= (field->maybe_null() && *key_ptr);
- tot_len+= key_store_len;
-
- const char* bounds[]= {"LE", "LT", "GE", "GT", "EQ"};
- DBUG_ASSERT(bound >= 0 && bound <= 4);
- DBUG_PRINT("info", ("Set Bound%s on %s %s %s",
- bounds[bound],
- field->field_name,
- key_nullable ? "NULLABLE" : "",
- key_null ? "NULL":""));
- DBUG_PRINT("info", ("Total length %d", tot_len));
-
- DBUG_DUMP("key", (char*) key_ptr, key_store_len);
-
- if (op->setBound(field->field_name,
- bound,
- key_null ? 0 : (key_nullable ? key_ptr + 1 : key_ptr),
- key_null ? 0 : key_len) != 0)
- ERR_RETURN(op->getNdbError());
-
- key_ptr+= key_store_len;
-
- if (tot_len >= key_tot_len)
- break;
-
- /*
- Only one bound which is not EQ can be set
- so if this bound was not EQ, bail out and make
- a best effort attempt
- */
- if (bound != NdbIndexScanOperation::BoundEQ)
- break;
+ const key_range *key= keys[j];
+ if (key != NULL)
+ {
+ // for key->flag see ha_rkey_function
+ DBUG_PRINT("info", ("key %d length=%d flag=%d",
+ j, key->length, key->flag));
+ key_tot_len[j]= key->length;
+ }
+ else
+ {
+ DBUG_PRINT("info", ("key %d not present", j));
+ key_tot_len[j]= 0;
+ }
}
+ tot_len= 0;
- DBUG_RETURN(0);
-}
+ for (i= 0; i < key_parts; i++)
+ {
+ KEY_PART_INFO *key_part= &key_info->key_part[i];
+ Field *field= key_part->field;
+ uint part_len= key_part->length;
+ uint part_store_len= key_part->store_length;
+ bool part_nullable= (bool) key_part->null_bit;
+ // Info about each key part
+ struct part_st {
+ bool part_last;
+ const key_range *key;
+ const byte *part_ptr;
+ bool part_null;
+ int bound_type;
+ const char* bound_ptr;
+ };
+ struct part_st part[2];
+
+ for (j= 0; j <= 1; j++)
+ {
+ struct part_st &p = part[j];
+ p.key= NULL;
+ p.bound_type= -1;
+ if (tot_len < key_tot_len[j])
+ {
+ p.part_last= (tot_len + part_store_len >= key_tot_len[j]);
+ p.key= keys[j];
+ p.part_ptr= &p.key->key[tot_len];
+ p.part_null= (field->maybe_null() && *p.part_ptr);
+ p.bound_ptr= (const char *)
+ p.part_null ? 0 : part_nullable ? p.part_ptr + 1 : p.part_ptr;
+
+ if (j == 0)
+ {
+ switch (p.key->flag)
+ {
+ case HA_READ_KEY_EXACT:
+ p.bound_type= NdbIndexScanOperation::BoundEQ;
+ break;
+ case HA_READ_KEY_OR_NEXT:
+ p.bound_type= NdbIndexScanOperation::BoundLE;
+ break;
+ case HA_READ_AFTER_KEY:
+ if (! p.part_last)
+ p.bound_type= NdbIndexScanOperation::BoundLE;
+ else
+ p.bound_type= NdbIndexScanOperation::BoundLT;
+ break;
+ default:
+ break;
+ }
+ }
+ if (j == 1) {
+ switch (p.key->flag)
+ {
+ case HA_READ_BEFORE_KEY:
+ if (! p.part_last)
+ p.bound_type= NdbIndexScanOperation::BoundGE;
+ else
+ p.bound_type= NdbIndexScanOperation::BoundGT;
+ break;
+ case HA_READ_AFTER_KEY: // weird
+ p.bound_type= NdbIndexScanOperation::BoundGE;
+ break;
+ default:
+ break;
+ }
+ }
-#ifndef DBUG_OFF
+ if (p.bound_type == -1)
+ {
+ DBUG_PRINT("error", ("key %d unknown flag %d", j, p.key->flag));
+ DBUG_ASSERT(false);
+ // Stop setting bounds but continue with what we have
+ DBUG_RETURN(0);
+ }
+ }
+ }
-const char* key_flag_strs[] =
-{ "HA_READ_KEY_EXACT",
- "HA_READ_KEY_OR_NEXT",
- "HA_READ_KEY_OR_PREV",
- "HA_READ_AFTER_KEY",
- "HA_READ_BEFORE_KEY",
- "HA_READ_PREFIX",
- "HA_READ_PREFIX_LAST",
- "HA_READ_PREFIX_LAST_OR_PREV",
- "HA_READ_MBR_CONTAIN",
- "HA_READ_MBR_INTERSECT",
- "HA_READ_MBR_WITHIN",
- "HA_READ_MBR_DISJOINT",
- "HA_READ_MBR_EQUAL"
-};
+ // Seen with e.g. b = 1 and c > 1
+ if (part[0].bound_type == NdbIndexScanOperation::BoundLE &&
+ part[1].bound_type == NdbIndexScanOperation::BoundGE &&
+ memcmp(part[0].part_ptr, part[1].part_ptr, part_store_len) == 0)
+ {
+ DBUG_PRINT("info", ("replace LE/GE pair by EQ"));
+ part[0].bound_type= NdbIndexScanOperation::BoundEQ;
+ part[1].bound_type= -1;
+ }
+ // Not seen but was in previous version
+ if (part[0].bound_type == NdbIndexScanOperation::BoundEQ &&
+ part[1].bound_type == NdbIndexScanOperation::BoundGE &&
+ memcmp(part[0].part_ptr, part[1].part_ptr, part_store_len) == 0)
+ {
+ DBUG_PRINT("info", ("remove GE from EQ/GE pair"));
+ part[1].bound_type= -1;
+ }
-const int no_of_key_flags = sizeof(key_flag_strs)/sizeof(char*);
+ for (j= 0; j <= 1; j++)
+ {
+ struct part_st &p = part[j];
+ // Set bound if not done with this key
+ if (p.key != NULL)
+ {
+ DBUG_PRINT("info", ("key %d:%d offset=%d length=%d last=%d bound=%d",
+ j, i, tot_len, part_len, p.part_last, p.bound_type));
+ DBUG_DUMP("info", (const char*)p.part_ptr, part_store_len);
+
+ // Set bound if not cancelled via type -1
+ if (p.bound_type != -1)
+ if (op->setBound(field->field_name, p.bound_type, p.bound_ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+ }
-void print_key(const key_range* key, const char* info)
-{
- if (key)
- {
- const char* str= key->flag < no_of_key_flags ?
- key_flag_strs[key->flag] : "Unknown flag";
-
- DBUG_LOCK_FILE;
- fprintf(DBUG_FILE,"%s: %s, length=%d, key=", info, str, key->length);
- uint i;
- for (i=0; i<key->length-1; i++)
- fprintf(DBUG_FILE,"%0d ", key->key[i]);
- fprintf(DBUG_FILE, "\n");
- DBUG_UNLOCK_FILE;
+ tot_len+= part_store_len;
}
- return;
+ DBUG_RETURN(0);
}
-#endif
/*
Start ordered index scan in NDB
@@ -1353,13 +1397,10 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted));
DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
- DBUG_EXECUTE("enter", print_key(start_key, "start_key"););
- DBUG_EXECUTE("enter", print_key(end_key, "end_key"););
-
// Check that sorted seems to be initialised
DBUG_ASSERT(sorted == 0 || sorted == 1);
- if(m_active_cursor == 0)
+ if (m_active_cursor == 0)
{
restart= false;
NdbOperation::LockMode lm=
@@ -1380,29 +1421,15 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
if(op->reset_bounds())
DBUG_RETURN(ndb_err(m_active_trans));
}
-
- if (start_key &&
- set_bounds(op, start_key,
- (start_key->flag == HA_READ_KEY_EXACT) ?
- NdbIndexScanOperation::BoundEQ :
- (start_key->flag == HA_READ_AFTER_KEY) ?
- NdbIndexScanOperation::BoundLT :
- NdbIndexScanOperation::BoundLE))
- DBUG_RETURN(1);
- if (end_key)
{
- if (start_key && start_key->flag == HA_READ_KEY_EXACT)
- {
- DBUG_PRINT("info", ("start_key is HA_READ_KEY_EXACT ignoring end_key"));
- }
- else if (set_bounds(op, end_key,
- (end_key->flag == HA_READ_AFTER_KEY) ?
- NdbIndexScanOperation::BoundGE :
- NdbIndexScanOperation::BoundGT))
- DBUG_RETURN(1);
+ const key_range *keys[2]= { start_key, end_key };
+ int ret= set_bounds(op, keys);
+ if (ret)
+ DBUG_RETURN(ret);
}
- if(!restart)
+
+ if (!restart)
{
DBUG_RETURN(define_read_attrs(buf, op));
}