diff options
author | Andrey Hristov <andrey@php.net> | 2009-12-17 13:29:46 +0000 |
---|---|---|
committer | Andrey Hristov <andrey@php.net> | 2009-12-17 13:29:46 +0000 |
commit | 028bd4ba02deca8bcd30d7e45087c774ecfc33db (patch) | |
tree | 2db3ca7a4bf7af3af51cc870aa6b5b05b9db26e4 | |
parent | 22528b14a4c0055589ecc012bf6a6bd4e7bc1f57 (diff) | |
download | php-git-028bd4ba02deca8bcd30d7e45087c774ecfc33db.tar.gz |
refactoring : move more network related functions to
mysqlnd_net.c . Now communication is split on two levels:
- logical (functions send and receive)
- physical (functions network_read and network_write)
-rw-r--r-- | ext/mysqlnd/mysqlnd_loaddata.c | 8 | ||||
-rw-r--r-- | ext/mysqlnd/mysqlnd_net.c | 358 | ||||
-rw-r--r-- | ext/mysqlnd/mysqlnd_structs.h | 6 | ||||
-rw-r--r-- | ext/mysqlnd/mysqlnd_wireprotocol.c | 357 |
4 files changed, 365 insertions, 364 deletions
diff --git a/ext/mysqlnd/mysqlnd_loaddata.c b/ext/mysqlnd/mysqlnd_loaddata.c index 65db122ce3..77670ec1ff 100644 --- a/ext/mysqlnd/mysqlnd_loaddata.c +++ b/ext/mysqlnd/mysqlnd_loaddata.c @@ -182,7 +182,7 @@ mysqlnd_handle_local_infile(MYSQLND *conn, const char *filename, zend_bool *is_w if (!(conn->options.flags & CLIENT_LOCAL_FILES)) { php_error_docref(NULL TSRMLS_CC, E_WARNING, "LOAD DATA LOCAL INFILE forbidden"); /* write empty packet to server */ - ret = mysqlnd_stream_write_w_header(conn, empty_packet, 0 TSRMLS_CC); + ret = conn->net->m.send(conn, empty_packet, 0 TSRMLS_CC); *is_warning = TRUE; goto infile_error; } @@ -202,14 +202,14 @@ mysqlnd_handle_local_infile(MYSQLND *conn, const char *filename, zend_bool *is_w infile.local_infile_error(info, conn->error_info.error, sizeof(conn->error_info.error) TSRMLS_CC); /* write empty packet to server */ - ret = mysqlnd_stream_write_w_header(conn, empty_packet, 0 TSRMLS_CC); + ret = conn->net->m.send(conn, empty_packet, 0 TSRMLS_CC); goto infile_error; } /* read data */ while ((bufsize = infile.local_infile_read (info, buf + MYSQLND_HEADER_SIZE, buflen - MYSQLND_HEADER_SIZE TSRMLS_CC)) > 0) { - if ((ret = mysqlnd_stream_write_w_header(conn, buf, bufsize TSRMLS_CC)) < 0) { + if ((ret = conn->net->m.send(conn, buf, bufsize TSRMLS_CC)) < 0) { DBG_ERR_FMT("Error during read : %d %s %s", CR_SERVER_LOST, UNKNOWN_SQLSTATE, lost_conn); SET_CLIENT_ERROR(conn->error_info, CR_SERVER_LOST, UNKNOWN_SQLSTATE, lost_conn); goto infile_error; @@ -217,7 +217,7 @@ mysqlnd_handle_local_infile(MYSQLND *conn, const char *filename, zend_bool *is_w } /* send empty packet for eof */ - if ((ret = mysqlnd_stream_write_w_header(conn, empty_packet, 0 TSRMLS_CC)) < 0) { + if ((ret = conn->net->m.send(conn, empty_packet, 0 TSRMLS_CC)) < 0) { SET_CLIENT_ERROR(conn->error_info, CR_SERVER_LOST, UNKNOWN_SQLSTATE, lost_conn); goto infile_error; } diff --git a/ext/mysqlnd/mysqlnd_net.c b/ext/mysqlnd/mysqlnd_net.c index e890f1c614..64768cf4f8 100644 --- a/ext/mysqlnd/mysqlnd_net.c +++ b/ext/mysqlnd/mysqlnd_net.c @@ -28,6 +28,9 @@ #include "ext/standard/sha1.h" #include "php_network.h" #include "zend_ini.h" +#ifdef MYSQLND_COMPRESSION_ENABLED +#include <zlib.h> +#endif #ifndef PHP_WIN32 #include <netinet/tcp.h> @@ -58,13 +61,13 @@ mysqlnd_set_sock_no_delay(php_stream * stream) /* }}} */ -/* {{{ mysqlnd_net::read_from_stream */ +/* {{{ mysqlnd_net::network_read */ static enum_func_status -MYSQLND_METHOD(mysqlnd_net, read_from_stream)(MYSQLND * conn, zend_uchar * buffer, size_t count TSRMLS_DC) +MYSQLND_METHOD(mysqlnd_net, network_read)(MYSQLND * conn, zend_uchar * buffer, size_t count TSRMLS_DC) { size_t to_read = count, ret; size_t old_chunk_size = conn->net->stream->chunk_size; - DBG_ENTER("mysqlnd_net::read_from_stream"); + DBG_ENTER("mysqlnd_net::network_read"); DBG_INF_FMT("count=%u", count); conn->net->stream->chunk_size = MIN(to_read, conn->net->options.net_read_buffer_size); while (to_read) { @@ -82,12 +85,12 @@ MYSQLND_METHOD(mysqlnd_net, read_from_stream)(MYSQLND * conn, zend_uchar * buffe /* }}} */ -/* {{{ mysqlnd_net::stream_write */ +/* {{{ mysqlnd_net::network_write */ static size_t -MYSQLND_METHOD(mysqlnd_net, stream_write)(MYSQLND * const conn, const zend_uchar * const buf, size_t count TSRMLS_DC) +MYSQLND_METHOD(mysqlnd_net, network_write)(MYSQLND * const conn, const zend_uchar * const buf, size_t count TSRMLS_DC) { size_t ret; - DBG_ENTER("mysqlnd_net::stream_write"); + DBG_ENTER("mysqlnd_net::network_write"); ret = php_stream_write(conn->net->stream, (char *)buf, count); DBG_RETURN(ret); } @@ -95,7 +98,6 @@ MYSQLND_METHOD(mysqlnd_net, stream_write)(MYSQLND * const conn, const zend_uchar - /* {{{ mysqlnd_net::connect */ static enum_func_status MYSQLND_METHOD(mysqlnd_net, connect)(MYSQLND_NET * net, const char * const scheme, size_t scheme_len, zend_bool persistent, char **errstr, int * errcode TSRMLS_DC) @@ -185,6 +187,333 @@ MYSQLND_METHOD(mysqlnd_net, connect)(MYSQLND_NET * net, const char * const schem /* }}} */ +/* We assume that MYSQLND_HEADER_SIZE is 4 bytes !! */ +#define STORE_HEADER_SIZE(safe_storage, buffer) int4store((safe_storage), (*(uint32_t *)(buffer))) +#define RESTORE_HEADER_SIZE(buffer, safe_storage) STORE_HEADER_SIZE((safe_storage), (buffer)) + +/* {{{ mysqlnd_net::send */ +/* + IMPORTANT : It's expected that buf has place in the beginning for MYSQLND_HEADER_SIZE !!!! + This is done for performance reasons in the caller of this function. + Otherwise we will have to do send two TCP packets, or do new alloc and memcpy. + Neither are quick, thus the clients of this function are obligated to do + what they are asked for. + + `count` is actually the length of the payload data. Thus : + count + MYSQLND_HEADER_SIZE = sizeof(buf) (not the pointer but the actual buffer) +*/ +size_t +MYSQLND_METHOD(mysqlnd_net, send)(MYSQLND * const conn, char * const buf, size_t count TSRMLS_DC) +{ + zend_uchar safe_buf[((MYSQLND_HEADER_SIZE) + (sizeof(zend_uchar)) - 1) / (sizeof(zend_uchar))]; + zend_uchar *safe_storage = safe_buf; + MYSQLND_NET *net = conn->net; + size_t old_chunk_size = net->stream->chunk_size; + size_t ret, packets_sent = 1; + size_t left = count; + zend_uchar *p = (zend_uchar *) buf; + zend_uchar * compress_buf = NULL; + size_t to_be_sent; + + DBG_ENTER("mysqlnd_net::send"); + DBG_INF_FMT("conn=%llu count=%lu compression=%d", conn->thread_id, count, net->compressed); + + net->stream->chunk_size = MYSQLND_MAX_PACKET_SIZE; + + if (net->compressed == TRUE) { + size_t comp_buf_size = MYSQLND_HEADER_SIZE + COMPRESSED_HEADER_SIZE + MYSQLND_HEADER_SIZE + MIN(left, MYSQLND_MAX_PACKET_SIZE); + DBG_INF_FMT("compress_buf_size=%d", comp_buf_size); + compress_buf = emalloc(comp_buf_size); + } + + do { + to_be_sent = MIN(left, MYSQLND_MAX_PACKET_SIZE); +#ifdef MYSQLND_COMPRESSION_ENABLED + if (net->compressed == TRUE) { + /* here we need to compress the data and then write it, first comes the compressed header */ + uLong tmp_complen = to_be_sent; + size_t payload_size; + zend_uchar * uncompressed_payload = p; /* should include the header */ + int res; + + STORE_HEADER_SIZE(safe_storage, uncompressed_payload); + int3store(uncompressed_payload, to_be_sent); + int1store(uncompressed_payload + 3, net->packet_no); + + DBG_INF_FMT("compress(%p, %p, %p, %d)", (compress_buf + COMPRESSED_HEADER_SIZE + MYSQLND_HEADER_SIZE), &tmp_complen, p, to_be_sent + MYSQLND_HEADER_SIZE); + res = compress((compress_buf + COMPRESSED_HEADER_SIZE + MYSQLND_HEADER_SIZE), &tmp_complen, uncompressed_payload, to_be_sent + MYSQLND_HEADER_SIZE); + if (res == Z_OK) { + DBG_INF_FMT("compression successful. compressed size=%d", tmp_complen); + int3store(compress_buf + MYSQLND_HEADER_SIZE, to_be_sent + MYSQLND_HEADER_SIZE); + payload_size = tmp_complen; + } else { + DBG_INF_FMT("compression NOT successful. error=%d Z_OK=%d Z_BUF_ERROR=%d Z_MEM_ERROR=%d", res, Z_OK, Z_BUF_ERROR, Z_MEM_ERROR); + int3store(compress_buf + MYSQLND_HEADER_SIZE, 0); + memcpy(compress_buf + MYSQLND_HEADER_SIZE + COMPRESSED_HEADER_SIZE, uncompressed_payload, to_be_sent + MYSQLND_HEADER_SIZE); + payload_size = to_be_sent + MYSQLND_HEADER_SIZE; + } + RESTORE_HEADER_SIZE(uncompressed_payload, safe_storage); + + int3store(compress_buf, payload_size); + int1store(compress_buf + 3, net->packet_no); + DBG_INF_FMT("writing %d bytes to the network", payload_size + MYSQLND_HEADER_SIZE + COMPRESSED_HEADER_SIZE); + ret = conn->net->m.network_write(conn, compress_buf, payload_size + MYSQLND_HEADER_SIZE + COMPRESSED_HEADER_SIZE TSRMLS_CC); + net->compressed_envelope_packet_no++; + #if WHEN_WE_NEED_TO_CHECK_WHETHER_COMPRESSION_WORKS_CORRECTLY + if (res == Z_OK) { + size_t decompressed_size = left + MYSQLND_HEADER_SIZE; + uLongf tmp_complen2 = decompressed_size; + zend_uchar * decompressed_data = malloc(decompressed_size); + int error = uncompress(decompressed_data, &tmp_complen2, compress_buf + MYSQLND_HEADER_SIZE + COMPRESSED_HEADER_SIZE, payload_size); + if (error == Z_OK) { + int i; + DBG_INF("success decompressing"); + for (i = 0 ; i < decompressed_size; i++) { + if (i && (i % 30 == 0)) { + printf("\n\t\t"); + } + printf("%.2X ", (int)*((char*)&(decompressed_data[i]))); + DBG_INF_FMT("%.2X ", (int)*((char*)&(decompressed_data[i]))); + } + } else { + DBG_INF("error decompressing"); + } + free(decompressed_data); + } + #endif /* WHEN_WE_NEED_TO_CHECK_WHETHER_COMPRESSION_WORKS_CORRECTLY */ + } else +#endif /* MYSQLND_COMPRESSION_ENABLED */ + { + DBG_INF("no compression"); + STORE_HEADER_SIZE(safe_storage, p); + int3store(p, to_be_sent); + int1store(p + 3, net->packet_no); + ret = conn->net->m.network_write(conn, p, to_be_sent + MYSQLND_HEADER_SIZE TSRMLS_CC); + RESTORE_HEADER_SIZE(p, safe_storage); + net->compressed_envelope_packet_no++; + } + net->packet_no++; + + p += to_be_sent; + left -= to_be_sent; + packets_sent++; + /* + if left is 0 then there is nothing more to send, but if the last packet was exactly + with the size MYSQLND_MAX_PACKET_SIZE we need to send additional packet, which has + empty payload. Thus if left == 0 we check for to_be_sent being the max size. If it is + indeed it then loop once more, then to_be_sent will become 0, left will stay 0. Empty + packet will be sent and this loop will end. + */ + } while (ret && (left > 0 || to_be_sent == MYSQLND_MAX_PACKET_SIZE)); + + DBG_INF_FMT("packet_size=%d packet_no=%d", left, net->packet_no); + /* Even for zero size payload we have to send a packet */ + if (!ret) { + DBG_ERR_FMT("Can't %u send bytes", count); + conn->state = CONN_QUIT_SENT; + SET_CLIENT_ERROR(conn->error_info, CR_SERVER_GONE_ERROR, UNKNOWN_SQLSTATE, mysqlnd_server_gone); + } + + MYSQLND_INC_CONN_STATISTIC_W_VALUE3(&conn->stats, + STAT_BYTES_SENT, count + packets_sent * MYSQLND_HEADER_SIZE, + STAT_PROTOCOL_OVERHEAD_OUT, packets_sent * MYSQLND_HEADER_SIZE, + STAT_PACKETS_SENT, packets_sent); + + net->stream->chunk_size = old_chunk_size; + if (compress_buf) { + efree(compress_buf); + } + DBG_RETURN(ret); +} +/* }}} */ + + +#ifdef MYSQLND_COMPRESSION_ENABLED +/* {{{ php_mysqlnd_read_buffer_is_empty */ +static zend_bool +php_mysqlnd_read_buffer_is_empty(MYSQLND_READ_BUFFER * buffer) +{ + return buffer->len? FALSE:TRUE; +} +/* }}} */ + + +/* {{{ php_mysqlnd_read_buffer_read */ +static void +php_mysqlnd_read_buffer_read(MYSQLND_READ_BUFFER * buffer, size_t count, zend_uchar * dest) +{ + if (buffer->len >= count) { + memcpy(dest, buffer->data + buffer->offset, count); + buffer->offset += count; + buffer->len -= count; + } +} +/* }}} */ + + +/* {{{ php_mysqlnd_read_buffer_bytes_left */ +static size_t +php_mysqlnd_read_buffer_bytes_left(MYSQLND_READ_BUFFER * buffer) +{ + return buffer->len; +} +/* }}} */ + + +/* {{{ php_mysqlnd_read_buffer_free */ +static void +php_mysqlnd_read_buffer_free(MYSQLND_READ_BUFFER ** buffer TSRMLS_DC) +{ + DBG_ENTER("php_mysqlnd_read_buffer_free"); + if (*buffer) { + mnd_efree((*buffer)->data); + mnd_efree(*buffer); + *buffer = NULL; + } + DBG_VOID_RETURN; +} +/* }}} */ + + +/* {{{ php_mysqlnd_create_read_buffer */ +static MYSQLND_READ_BUFFER * +mysqlnd_create_read_buffer(size_t count TSRMLS_DC) +{ + MYSQLND_READ_BUFFER * ret = mnd_emalloc(sizeof(MYSQLND_READ_BUFFER)); + DBG_ENTER("mysqlnd_create_read_buffer"); + ret->is_empty = php_mysqlnd_read_buffer_is_empty; + ret->read = php_mysqlnd_read_buffer_read; + ret->bytes_left = php_mysqlnd_read_buffer_bytes_left; + ret->free_buffer = php_mysqlnd_read_buffer_free; + ret->data = mnd_emalloc(count); + ret->size = ret->len = count; + ret->offset = 0; + DBG_RETURN(ret); +} +/* }}} */ + + +/* {{{ mysqlnd_read_compressed_packet_from_stream_and_fill_read_buffer */ +static enum_func_status +mysqlnd_read_compressed_packet_from_stream_and_fill_read_buffer(MYSQLND * conn, size_t net_payload_size TSRMLS_DC) +{ + MYSQLND_NET * net = conn->net; + size_t decompressed_size; + enum_func_status ret = PASS; + zend_uchar * compressed_data = NULL; + zend_uchar comp_header[COMPRESSED_HEADER_SIZE]; + DBG_ENTER("mysqlnd_read_compressed_packet_from_stream_and_fill_read_buffer"); + + /* Read the compressed header */ + if (FAIL == conn->net->m.network_read(conn, comp_header, COMPRESSED_HEADER_SIZE TSRMLS_CC)) { + DBG_RETURN(FAIL); + } + decompressed_size = uint3korr(comp_header); + + /* When decompressed_size is 0, then the data is not compressed, and we have wasted 3 bytes */ + /* we need to decompress the data */ + + if (decompressed_size) { + int error; + uLongf tmp_complen = decompressed_size; + compressed_data = emalloc(net_payload_size); + if (FAIL == conn->net->m.network_read(conn, compressed_data, net_payload_size TSRMLS_CC)) { + ret = FAIL; + goto end; + } + + net->uncompressed_data = mysqlnd_create_read_buffer(decompressed_size TSRMLS_CC); + error = uncompress(net->uncompressed_data->data, &tmp_complen, compressed_data, net_payload_size); + + DBG_INF_FMT("compressed data: decomp_len=%d compressed_size=%d", decompressed_size, net_payload_size); + if (error != Z_OK) { + DBG_ERR_FMT("Can't uncompress packet, error: %d", error); + ret = FAIL; + goto end; + } + } else { + DBG_INF_FMT("The server decided not to compress the data. Our job is easy. Copying %u bytes", net_payload_size); + net->uncompressed_data = mysqlnd_create_read_buffer(net_payload_size TSRMLS_CC); + if (FAIL == conn->net->m.network_read(conn, net->uncompressed_data->data, net_payload_size TSRMLS_CC)) { + ret = FAIL; + goto end; + } + } +end: + if (compressed_data) { + efree(compressed_data); + } + DBG_RETURN(ret); +} +#endif /* MYSQLND_COMPRESSION_ENABLED */ + + +/* {{{ mysqlnd_net::receive */ +static enum_func_status +MYSQLND_METHOD(mysqlnd_net, receive)(MYSQLND * conn, zend_uchar * buffer, size_t count TSRMLS_DC) +{ + size_t to_read = count; + zend_uchar * p = buffer; + MYSQLND_NET * net = conn->net; + + DBG_ENTER("mysqlnd_net::receive"); +#ifdef MYSQLND_COMPRESSION_ENABLED + if (net->compressed) { + if (net->uncompressed_data) { + size_t to_read_from_buffer = MIN(net->uncompressed_data->bytes_left(net->uncompressed_data), to_read); + DBG_INF_FMT("reading %u from uncompressed_data buffer", to_read_from_buffer); + if (to_read_from_buffer) { + net->uncompressed_data->read(net->uncompressed_data, to_read_from_buffer, (zend_uchar *) p); + p += to_read_from_buffer; + to_read -= to_read_from_buffer; + } + DBG_INF_FMT("left %u to read", to_read); + if (TRUE == net->uncompressed_data->is_empty(net->uncompressed_data)) { + /* Everything was consumed. This should never happen here, but for security */ + net->uncompressed_data->free_buffer(&net->uncompressed_data TSRMLS_CC); + } + } + if (to_read) { + zend_uchar net_header[MYSQLND_HEADER_SIZE]; + size_t net_payload_size; + zend_uchar packet_no; + + if (FAIL == net->m.network_read(conn, net_header, MYSQLND_HEADER_SIZE TSRMLS_CC)) { + DBG_RETURN(FAIL); + } + net_payload_size = uint3korr(net_header); + packet_no = uint1korr(net_header + 3); + if (net->compressed_envelope_packet_no != packet_no) { + DBG_ERR_FMT("Transport level: packets out of order. Expected %d received %d. Packet size=%d", + net->compressed_envelope_packet_no, packet_no, net_payload_size); + + php_error(E_WARNING, "Packets out of order. Expected %d received %d. Packet size="MYSQLND_SZ_T_SPEC, + net->compressed_envelope_packet_no, packet_no, net_payload_size); + DBG_RETURN(FAIL); + } + net->compressed_envelope_packet_no++; +#ifdef MYSQLND_DUMP_HEADER_N_BODY + DBG_INF_FMT("HEADER: hwd_packet_no=%d size=%3d", packet_no, net_payload_size); +#endif + /* Now let's read from the wire, decompress it and fill the read buffer */ + mysqlnd_read_compressed_packet_from_stream_and_fill_read_buffer(conn, net_payload_size TSRMLS_CC); + + /* + Now a bit of recursion - read from the read buffer, + if the data which we have just read from the wire + is not enough, then the recursive call will try to + satisfy it until it is satisfied. + */ + DBG_RETURN(net->m.receive(conn, p, to_read TSRMLS_CC)); + } + DBG_RETURN(PASS); + } +#endif /* MYSQLND_COMPRESSION_ENABLED */ + DBG_RETURN(net->m.network_read(conn, p, to_read TSRMLS_CC)); +} +/* }}} */ + + /* {{{ mysqlnd_net::set_client_option */ static enum_func_status MYSQLND_METHOD(mysqlnd_net, set_client_option)(MYSQLND_NET * const net, enum mysqlnd_option option, const char * const value TSRMLS_DC) @@ -264,9 +593,11 @@ mysqlnd_net_init(zend_bool persistent TSRMLS_DC) net->persistent = persistent; net->m.connect = MYSQLND_METHOD(mysqlnd_net, connect); - net->m.stream_read = MYSQLND_METHOD(mysqlnd_net, read_from_stream); - net->m.stream_write = MYSQLND_METHOD(mysqlnd_net, stream_write); + net->m.send = MYSQLND_METHOD(mysqlnd_net, send); + net->m.receive = MYSQLND_METHOD(mysqlnd_net, receive); net->m.set_client_option = MYSQLND_METHOD(mysqlnd_net, set_client_option); + net->m.network_read = MYSQLND_METHOD(mysqlnd_net, network_read); + net->m.network_write = MYSQLND_METHOD(mysqlnd_net, network_write); net->m.free_contents = MYSQLND_METHOD(mysqlnd_net, free_contents); { @@ -309,3 +640,12 @@ mysqlnd_net_free(MYSQLND_NET * net TSRMLS_DC) } /* }}} */ + +/* + * Local variables: + * tab-width: 4 + * c-basic-offset: 4 + * End: + * vim600: noet sw=4 ts=4 fdm=marker + * vim<600: noet sw=4 ts=4 + */ diff --git a/ext/mysqlnd/mysqlnd_structs.h b/ext/mysqlnd/mysqlnd_structs.h index a791e7dc95..68776d12b6 100644 --- a/ext/mysqlnd/mysqlnd_structs.h +++ b/ext/mysqlnd/mysqlnd_structs.h @@ -231,9 +231,11 @@ typedef struct st_mysqlnd_read_buffer { struct st_mysqlnd_net_methods { enum_func_status (*connect)(MYSQLND_NET * net, const char * const scheme, size_t scheme_len, zend_bool persistent, char **errstr, int * errcode TSRMLS_DC); - enum_func_status (*stream_read)(MYSQLND * conn, zend_uchar * buffer, size_t count TSRMLS_DC); - size_t (*stream_write)(MYSQLND * const conn, const zend_uchar * const buf, size_t count TSRMLS_DC); + size_t (*send)(MYSQLND * const conn, char * const buf, size_t count TSRMLS_DC); + size_t (*receive)(MYSQLND * conn, zend_uchar * buffer, size_t count TSRMLS_DC); enum_func_status (*set_client_option)(MYSQLND_NET * const net, enum_mysqlnd_option option, const char * const value TSRMLS_DC); + enum_func_status (*network_read)(MYSQLND * conn, zend_uchar * buffer, size_t count TSRMLS_DC); + size_t (*network_write)(MYSQLND * const conn, const zend_uchar * const buf, size_t count TSRMLS_DC); void (*free_contents)(MYSQLND_NET * net TSRMLS_DC); }; diff --git a/ext/mysqlnd/mysqlnd_wireprotocol.c b/ext/mysqlnd/mysqlnd_wireprotocol.c index cb870be025..de8143a04b 100644 --- a/ext/mysqlnd/mysqlnd_wireprotocol.c +++ b/ext/mysqlnd/mysqlnd_wireprotocol.c @@ -27,15 +27,11 @@ #include "mysqlnd_block_alloc.h" #include "ext/standard/sha1.h" #include "zend_ini.h" -#ifdef MYSQLND_COMPRESSION_ENABLED -#include <zlib.h> -#endif #define MYSQLND_SILENT 1 #define MYSQLND_DUMP_HEADER_N_BODY - #define PACKET_READ_HEADER_AND_BODY(packet, conn, buf, buf_size, packet_type_as_text, packet_type) \ { \ DBG_INF_FMT("buf=%p size=%u", (buf), (buf_size)); \ @@ -51,7 +47,7 @@ (buf_size), (packet)->header.size, (packet)->header.size - (buf_size)); \ DBG_RETURN(FAIL); \ }\ - if (FAIL == mysqlnd_read_body((conn), &((packet)->header), (buf) TSRMLS_CC)) { \ + if (FAIL == conn->net->m.receive((conn), (buf), (packet)->header.size TSRMLS_CC)) { \ CONN_SET_STATE(conn, CONN_QUIT_SENT); \ SET_CLIENT_ERROR(conn->error_info, CR_SERVER_GONE_ERROR, UNKNOWN_SQLSTATE, mysqlnd_server_gone);\ php_error_docref(NULL TSRMLS_CC, E_WARNING, "%s", mysqlnd_server_gone); \ @@ -205,72 +201,6 @@ zend_uchar *php_mysqlnd_net_store_length(zend_uchar *packet, uint64_t length) /* }}} */ -#ifdef MYSQLND_COMPRESSION_ENABLED -/* {{{ php_mysqlnd_read_buffer_is_empty */ -static zend_bool -php_mysqlnd_read_buffer_is_empty(MYSQLND_READ_BUFFER * buffer) -{ - return buffer->len? FALSE:TRUE; -} -/* }}} */ - - -/* {{{ php_mysqlnd_read_buffer_read */ -static void -php_mysqlnd_read_buffer_read(MYSQLND_READ_BUFFER * buffer, size_t count, zend_uchar * dest) -{ - if (buffer->len >= count) { - memcpy(dest, buffer->data + buffer->offset, count); - buffer->offset += count; - buffer->len -= count; - } -} -/* }}} */ - - -/* {{{ php_mysqlnd_read_buffer_bytes_left */ -static size_t -php_mysqlnd_read_buffer_bytes_left(MYSQLND_READ_BUFFER * buffer) -{ - return buffer->len; -} -/* }}} */ - - -/* {{{ php_mysqlnd_read_buffer_free */ -static void -php_mysqlnd_read_buffer_free(MYSQLND_READ_BUFFER ** buffer TSRMLS_DC) -{ - DBG_ENTER("php_mysqlnd_read_buffer_free"); - if (*buffer) { - mnd_efree((*buffer)->data); - mnd_efree(*buffer); - *buffer = NULL; - } - DBG_VOID_RETURN; -} -/* }}} */ - - -/* {{{ php_mysqlnd_create_read_buffer */ -static MYSQLND_READ_BUFFER * -mysqlnd_create_read_buffer(size_t count TSRMLS_DC) -{ - MYSQLND_READ_BUFFER * ret = mnd_emalloc(sizeof(MYSQLND_READ_BUFFER)); - DBG_ENTER("mysqlnd_create_read_buffer"); - ret->is_empty = php_mysqlnd_read_buffer_is_empty; - ret->read = php_mysqlnd_read_buffer_read; - ret->bytes_left = php_mysqlnd_read_buffer_bytes_left; - ret->free_buffer = php_mysqlnd_read_buffer_free; - ret->data = mnd_emalloc(count); - ret->size = ret->len = count; - ret->offset = 0; - DBG_RETURN(ret); -} -/* }}} */ -#endif - - /* {{{ php_mysqlnd_consume_uneaten_data */ #ifdef MYSQLND_DO_WIRE_CHECK_BEFORE_COMMAND size_t php_mysqlnd_consume_uneaten_data(MYSQLND * const conn, enum php_mysqlnd_server_command cmd TSRMLS_DC) @@ -353,268 +283,6 @@ php_mysqlnd_read_error_from_line(zend_uchar *buf, size_t buf_len, /* }}} */ -/* We assume that MYSQLND_HEADER_SIZE is 4 bytes !! */ -#define STORE_HEADER_SIZE(safe_storage, buffer) int4store((safe_storage), (*(uint32_t *)(buffer))) -#define RESTORE_HEADER_SIZE(buffer, safe_storage) STORE_HEADER_SIZE((safe_storage), (buffer)) - -/* {{{ mysqlnd_stream_write_w_header */ -/* - IMPORTANT : It's expected that buf has place in the beginning for MYSQLND_HEADER_SIZE !!!! - This is done for performance reasons in the caller of this function. - Otherwise we will have to do send two TCP packets, or do new alloc and memcpy. - Neither are quick, thus the clients of this function are obligated to do - what they are asked for. - - `count` is actually the length of the payload data. Thus : - count + MYSQLND_HEADER_SIZE = sizeof(buf) (not the pointer but the actual buffer) -*/ -size_t mysqlnd_stream_write_w_header(MYSQLND * const conn, char * const buf, size_t count TSRMLS_DC) -{ - zend_uchar safe_buf[((MYSQLND_HEADER_SIZE) + (sizeof(zend_uchar)) - 1) / (sizeof(zend_uchar))]; - zend_uchar *safe_storage = safe_buf; - MYSQLND_NET *net = conn->net; - size_t old_chunk_size = net->stream->chunk_size; - size_t ret, packets_sent = 1; - size_t left = count; - zend_uchar *p = (zend_uchar *) buf; - zend_uchar * compress_buf = NULL; - size_t to_be_sent; - - DBG_ENTER("mysqlnd_stream_write_w_header"); - DBG_INF_FMT("conn=%llu count=%lu compression=%d", conn->thread_id, count, net->compressed); - - net->stream->chunk_size = MYSQLND_MAX_PACKET_SIZE; - - if (net->compressed == TRUE) { - size_t comp_buf_size = MYSQLND_HEADER_SIZE + COMPRESSED_HEADER_SIZE + MYSQLND_HEADER_SIZE + MIN(left, MYSQLND_MAX_PACKET_SIZE); - DBG_INF_FMT("compress_buf_size=%d", comp_buf_size); - compress_buf = emalloc(comp_buf_size); - } - - do { - to_be_sent = MIN(left, MYSQLND_MAX_PACKET_SIZE); -#ifdef MYSQLND_COMPRESSION_ENABLED - if (net->compressed == TRUE) { - /* here we need to compress the data and then write it, first comes the compressed header */ - uLong tmp_complen = to_be_sent; - size_t payload_size; - zend_uchar * uncompressed_payload = p; /* should include the header */ - int res; - - STORE_HEADER_SIZE(safe_storage, uncompressed_payload); - int3store(uncompressed_payload, to_be_sent); - int1store(uncompressed_payload + 3, net->packet_no); - - DBG_INF_FMT("compress(%p, %p, %p, %d)", (compress_buf + COMPRESSED_HEADER_SIZE + MYSQLND_HEADER_SIZE), &tmp_complen, p, to_be_sent + MYSQLND_HEADER_SIZE); - res = compress((compress_buf + COMPRESSED_HEADER_SIZE + MYSQLND_HEADER_SIZE), &tmp_complen, uncompressed_payload, to_be_sent + MYSQLND_HEADER_SIZE); - if (res == Z_OK) { - DBG_INF_FMT("compression successful. compressed size=%d", tmp_complen); - int3store(compress_buf + MYSQLND_HEADER_SIZE, to_be_sent + MYSQLND_HEADER_SIZE); - payload_size = tmp_complen; - } else { - DBG_INF_FMT("compression NOT successful. error=%d Z_OK=%d Z_BUF_ERROR=%d Z_MEM_ERROR=%d", res, Z_OK, Z_BUF_ERROR, Z_MEM_ERROR); - int3store(compress_buf + MYSQLND_HEADER_SIZE, 0); - memcpy(compress_buf + MYSQLND_HEADER_SIZE + COMPRESSED_HEADER_SIZE, uncompressed_payload, to_be_sent + MYSQLND_HEADER_SIZE); - payload_size = to_be_sent + MYSQLND_HEADER_SIZE; - } - RESTORE_HEADER_SIZE(uncompressed_payload, safe_storage); - - int3store(compress_buf, payload_size); - int1store(compress_buf + 3, net->packet_no); - DBG_INF_FMT("writing %d bytes to the network", payload_size + MYSQLND_HEADER_SIZE + COMPRESSED_HEADER_SIZE); - ret = conn->net->m.stream_write(conn, compress_buf, payload_size + MYSQLND_HEADER_SIZE + COMPRESSED_HEADER_SIZE TSRMLS_CC); - net->compressed_envelope_packet_no++; - #if WHEN_WE_NEED_TO_CHECK_WHETHER_COMPRESSION_WORKS_CORRECTLY - if (res == Z_OK) { - size_t decompressed_size = left + MYSQLND_HEADER_SIZE; - uLongf tmp_complen2 = decompressed_size; - zend_uchar * decompressed_data = malloc(decompressed_size); - int error = uncompress(decompressed_data, &tmp_complen2, compress_buf + MYSQLND_HEADER_SIZE + COMPRESSED_HEADER_SIZE, payload_size); - if (error == Z_OK) { - int i; - DBG_INF("success decompressing"); - for (i = 0 ; i < decompressed_size; i++) { - if (i && (i % 30 == 0)) { - printf("\n\t\t"); - } - printf("%.2X ", (int)*((char*)&(decompressed_data[i]))); - DBG_INF_FMT("%.2X ", (int)*((char*)&(decompressed_data[i]))); - } - } else { - DBG_INF("error decompressing"); - } - free(decompressed_data); - } - #endif /* WHEN_WE_NEED_TO_CHECK_WHETHER_COMPRESSION_WORKS_CORRECTLY */ - } else -#endif /* MYSQLND_COMPRESSION_ENABLED */ - { - DBG_INF("no compression"); - STORE_HEADER_SIZE(safe_storage, p); - int3store(p, to_be_sent); - int1store(p + 3, net->packet_no); - ret = conn->net->m.stream_write(conn, p, to_be_sent + MYSQLND_HEADER_SIZE TSRMLS_CC); - RESTORE_HEADER_SIZE(p, safe_storage); - net->compressed_envelope_packet_no++; - } - net->packet_no++; - - p += to_be_sent; - left -= to_be_sent; - packets_sent++; - /* - if left is 0 then there is nothing more to send, but if the last packet was exactly - with the size MYSQLND_MAX_PACKET_SIZE we need to send additional packet, which has - empty payload. Thus if left == 0 we check for to_be_sent being the max size. If it is - indeed it then loop once more, then to_be_sent will become 0, left will stay 0. Empty - packet will be sent and this loop will end. - */ - } while (ret && (left > 0 || to_be_sent == MYSQLND_MAX_PACKET_SIZE)); - - DBG_INF_FMT("packet_size=%d packet_no=%d", left, net->packet_no); - /* Even for zero size payload we have to send a packet */ - if (!ret) { - DBG_ERR_FMT("Can't %u send bytes", count); - conn->state = CONN_QUIT_SENT; - SET_CLIENT_ERROR(conn->error_info, CR_SERVER_GONE_ERROR, UNKNOWN_SQLSTATE, mysqlnd_server_gone); - } - - MYSQLND_INC_CONN_STATISTIC_W_VALUE3(&conn->stats, - STAT_BYTES_SENT, count + packets_sent * MYSQLND_HEADER_SIZE, - STAT_PROTOCOL_OVERHEAD_OUT, packets_sent * MYSQLND_HEADER_SIZE, - STAT_PACKETS_SENT, packets_sent); - - net->stream->chunk_size = old_chunk_size; - if (compress_buf) { - efree(compress_buf); - } - DBG_RETURN(ret); -} -/* }}} */ - - -#ifdef MYSQLND_COMPRESSION_ENABLED -/* {{{ mysqlnd_read_compressed_packet_from_stream_and_fill_read_buffer */ -static enum_func_status -mysqlnd_read_compressed_packet_from_stream_and_fill_read_buffer(MYSQLND * conn, size_t net_payload_size TSRMLS_DC) -{ - MYSQLND_NET * net = conn->net; - size_t decompressed_size; - enum_func_status ret = PASS; - zend_uchar * compressed_data = NULL; - zend_uchar comp_header[COMPRESSED_HEADER_SIZE]; - DBG_ENTER("mysqlnd_read_compressed_packet_from_stream_and_fill_read_buffer"); - - /* Read the compressed header */ - if (FAIL == conn->net->m.stream_read(conn, comp_header, COMPRESSED_HEADER_SIZE TSRMLS_CC)) { - DBG_RETURN(FAIL); - } - decompressed_size = uint3korr(comp_header); - - /* When decompressed_size is 0, then the data is not compressed, and we have wasted 3 bytes */ - /* we need to decompress the data */ - - if (decompressed_size) { - int error; - uLongf tmp_complen = decompressed_size; - compressed_data = emalloc(net_payload_size); - if (FAIL == conn->net->m.stream_read(conn, compressed_data, net_payload_size TSRMLS_CC)) { - ret = FAIL; - goto end; - } - - net->uncompressed_data = mysqlnd_create_read_buffer(decompressed_size TSRMLS_CC); - error = uncompress(net->uncompressed_data->data, &tmp_complen, compressed_data, net_payload_size); - - DBG_INF_FMT("compressed data: decomp_len=%d compressed_size=%d", decompressed_size, net_payload_size); - if (error != Z_OK) { - DBG_ERR_FMT("Can't uncompress packet, error: %d", error); - ret = FAIL; - goto end; - } - } else { - DBG_INF_FMT("The server decided not to compress the data. Our job is easy. Copying %u bytes", net_payload_size); - net->uncompressed_data = mysqlnd_create_read_buffer(net_payload_size TSRMLS_CC); - if (FAIL == conn->net->m.stream_read(conn, net->uncompressed_data->data, net_payload_size TSRMLS_CC)) { - ret = FAIL; - goto end; - } - } -end: - if (compressed_data) { - efree(compressed_data); - } - DBG_RETURN(ret); -} -#endif /* MYSQLND_COMPRESSION_ENABLED */ - - -/* {{{ mysqlnd_real_read */ -static enum_func_status -mysqlnd_real_read(MYSQLND * conn, zend_uchar * buffer, size_t count TSRMLS_DC) -{ - size_t to_read = count; - zend_uchar * p = buffer; - MYSQLND_NET * net = conn->net; - - DBG_ENTER("mysqlnd_real_read"); -#ifdef MYSQLND_COMPRESSION_ENABLED - if (net->compressed) { - if (net->uncompressed_data) { - size_t to_read_from_buffer = MIN(net->uncompressed_data->bytes_left(net->uncompressed_data), to_read); - DBG_INF_FMT("reading %u from uncompressed_data buffer", to_read_from_buffer); - if (to_read_from_buffer) { - net->uncompressed_data->read(net->uncompressed_data, to_read_from_buffer, (zend_uchar *) p); - p += to_read_from_buffer; - to_read -= to_read_from_buffer; - } - DBG_INF_FMT("left %u to read", to_read); - if (TRUE == net->uncompressed_data->is_empty(net->uncompressed_data)) { - /* Everything was consumed. This should never happen here, but for security */ - net->uncompressed_data->free_buffer(&net->uncompressed_data TSRMLS_CC); - } - } - if (to_read) { - zend_uchar net_header[MYSQLND_HEADER_SIZE]; - size_t net_payload_size; - zend_uchar packet_no; - - if (FAIL == net->m.stream_read(conn, net_header, MYSQLND_HEADER_SIZE TSRMLS_CC)) { - DBG_RETURN(FAIL); - } - net_payload_size = uint3korr(net_header); - packet_no = uint1korr(net_header + 3); - if (net->compressed_envelope_packet_no != packet_no) { - DBG_ERR_FMT("Transport level: packets out of order. Expected %d received %d. Packet size=%d", - net->compressed_envelope_packet_no, packet_no, net_payload_size); - - php_error(E_WARNING, "Packets out of order. Expected %d received %d. Packet size="MYSQLND_SZ_T_SPEC, - net->compressed_envelope_packet_no, packet_no, net_payload_size); - DBG_RETURN(FAIL); - } - net->compressed_envelope_packet_no++; -#ifdef MYSQLND_DUMP_HEADER_N_BODY - DBG_INF_FMT("HEADER: hwd_packet_no=%d size=%3d", packet_no, net_payload_size); -#endif - /* Now let's read from the wire, decompress it and fill the read buffer */ - mysqlnd_read_compressed_packet_from_stream_and_fill_read_buffer(conn, net_payload_size TSRMLS_CC); - - /* - Now a bit of recursion - read from the read buffer, - if the data which we have just read from the wire - is not enough, then the recursive call will try to - satisfy it until it is satisfied. - */ - DBG_RETURN(mysqlnd_real_read(conn, p, to_read TSRMLS_CC)); - } - DBG_RETURN(PASS); - } -#endif /* MYSQLND_COMPRESSION_ENABLED */ - DBG_RETURN(net->m.stream_read(conn, p, to_read TSRMLS_CC)); -} -/* }}} */ - - /* {{{ mysqlnd_read_header */ static enum_func_status mysqlnd_read_header(MYSQLND * conn, mysqlnd_packet_header * header TSRMLS_DC) @@ -624,7 +292,7 @@ mysqlnd_read_header(MYSQLND * conn, mysqlnd_packet_header * header TSRMLS_DC) DBG_ENTER("mysqlnd_read_header_name"); DBG_INF_FMT("compressed=%d conn_id=%u", net->compressed, conn->thread_id); - if (FAIL == mysqlnd_real_read(conn, buffer, MYSQLND_HEADER_SIZE TSRMLS_CC)) { + if (FAIL == net->m.receive(conn, buffer, MYSQLND_HEADER_SIZE TSRMLS_CC)) { DBG_RETURN(FAIL); } @@ -658,16 +326,6 @@ mysqlnd_read_header(MYSQLND * conn, mysqlnd_packet_header * header TSRMLS_DC) /* }}} */ -/* {{{ mysqlnd_read_body */ -static enum_func_status -mysqlnd_read_body(MYSQLND *conn, mysqlnd_packet_header * header, zend_uchar * store_buf TSRMLS_DC) -{ - DBG_ENTER(mysqlnd_read_body_name); - DBG_RETURN(mysqlnd_real_read(conn, store_buf, header->size TSRMLS_CC)); -} -/* }}} */ - - /* {{{ php_mysqlnd_greet_read */ static enum_func_status php_mysqlnd_greet_read(void *_packet, MYSQLND *conn TSRMLS_DC) @@ -872,7 +530,7 @@ size_t php_mysqlnd_auth_write(void *_packet, MYSQLND *conn TSRMLS_DC) /* Handle CLIENT_CONNECT_WITH_DB */ /* no \0 for no DB */ - DBG_RETURN(mysqlnd_stream_write_w_header(conn, buffer, p - buffer - MYSQLND_HEADER_SIZE TSRMLS_CC)); + DBG_RETURN(conn->net->m.send(conn, buffer, p - buffer - MYSQLND_HEADER_SIZE TSRMLS_CC)); } /* }}} */ @@ -1069,7 +727,7 @@ size_t php_mysqlnd_cmd_write(void *_packet, MYSQLND *conn TSRMLS_DC) char buffer[MYSQLND_HEADER_SIZE + 1]; int1store(buffer + MYSQLND_HEADER_SIZE, packet->command); - written = mysqlnd_stream_write_w_header(conn, buffer, 1 TSRMLS_CC); + written = conn->net->m.send(conn, buffer, 1 TSRMLS_CC); } else { size_t tmp_len = packet->arg_len + 1 + MYSQLND_HEADER_SIZE, ret; zend_uchar *tmp, *p; @@ -1081,7 +739,7 @@ size_t php_mysqlnd_cmd_write(void *_packet, MYSQLND *conn TSRMLS_DC) memcpy(p, packet->argument, packet->arg_len); - ret = mysqlnd_stream_write_w_header(conn, (char *)tmp, tmp_len - MYSQLND_HEADER_SIZE TSRMLS_CC); + ret = conn->net->m.send(conn, (char *)tmp, tmp_len - MYSQLND_HEADER_SIZE TSRMLS_CC); if (tmp != net->cmd_buffer.buffer) { MYSQLND_INC_CONN_STATISTIC(&conn->stats, STAT_CMD_BUFFER_TOO_SMALL); mnd_efree(tmp); @@ -1404,6 +1062,7 @@ void php_mysqlnd_rset_field_free_mem(void *_packet, zend_bool alloca TSRMLS_DC) /* }}} */ +/* {{{ php_mysqlnd_read_row_ex */ static enum_func_status php_mysqlnd_read_row_ex(MYSQLND *conn, MYSQLND_MEMORY_POOL * result_set_memory_pool, MYSQLND_MEMORY_POOL_CHUNK **buffer, @@ -1459,7 +1118,7 @@ php_mysqlnd_read_row_ex(MYSQLND *conn, MYSQLND_MEMORY_POOL * result_set_memory_p p = (*buffer)->ptr + (*data_size - header.size); } - if ((ret = mysqlnd_read_body(conn, &header, p TSRMLS_CC))) { + if ((ret = conn->net->m.receive(conn, p, header.size TSRMLS_CC))) { DBG_ERR("Empty row packet body"); php_error(E_WARNING, "Empty row packet body"); break; @@ -1476,6 +1135,7 @@ php_mysqlnd_read_row_ex(MYSQLND *conn, MYSQLND_MEMORY_POOL * result_set_memory_p *data_size -= prealloc_more_bytes; DBG_RETURN(ret); } +/* }}} */ /* {{{ php_mysqlnd_rowp_read_binary_protocol */ @@ -1915,7 +1575,6 @@ void php_mysqlnd_rowp_free_mem(void *_packet, zend_bool alloca TSRMLS_DC) /* }}} */ - /* {{{ php_mysqlnd_stats_read */ static enum_func_status php_mysqlnd_stats_read(void *_packet, MYSQLND *conn TSRMLS_DC) |