diff options
Diffstat (limited to 'ext/mysqlnd/mysqlnd_net.c')
-rw-r--r-- | ext/mysqlnd/mysqlnd_net.c | 358 |
1 files changed, 349 insertions, 9 deletions
diff --git a/ext/mysqlnd/mysqlnd_net.c b/ext/mysqlnd/mysqlnd_net.c index e890f1c614..64768cf4f8 100644 --- a/ext/mysqlnd/mysqlnd_net.c +++ b/ext/mysqlnd/mysqlnd_net.c @@ -28,6 +28,9 @@ #include "ext/standard/sha1.h" #include "php_network.h" #include "zend_ini.h" +#ifdef MYSQLND_COMPRESSION_ENABLED +#include <zlib.h> +#endif #ifndef PHP_WIN32 #include <netinet/tcp.h> @@ -58,13 +61,13 @@ mysqlnd_set_sock_no_delay(php_stream * stream) /* }}} */ -/* {{{ mysqlnd_net::read_from_stream */ +/* {{{ mysqlnd_net::network_read */ static enum_func_status -MYSQLND_METHOD(mysqlnd_net, read_from_stream)(MYSQLND * conn, zend_uchar * buffer, size_t count TSRMLS_DC) +MYSQLND_METHOD(mysqlnd_net, network_read)(MYSQLND * conn, zend_uchar * buffer, size_t count TSRMLS_DC) { size_t to_read = count, ret; size_t old_chunk_size = conn->net->stream->chunk_size; - DBG_ENTER("mysqlnd_net::read_from_stream"); + DBG_ENTER("mysqlnd_net::network_read"); DBG_INF_FMT("count=%u", count); conn->net->stream->chunk_size = MIN(to_read, conn->net->options.net_read_buffer_size); while (to_read) { @@ -82,12 +85,12 @@ MYSQLND_METHOD(mysqlnd_net, read_from_stream)(MYSQLND * conn, zend_uchar * buffe /* }}} */ -/* {{{ mysqlnd_net::stream_write */ +/* {{{ mysqlnd_net::network_write */ static size_t -MYSQLND_METHOD(mysqlnd_net, stream_write)(MYSQLND * const conn, const zend_uchar * const buf, size_t count TSRMLS_DC) +MYSQLND_METHOD(mysqlnd_net, network_write)(MYSQLND * const conn, const zend_uchar * const buf, size_t count TSRMLS_DC) { size_t ret; - DBG_ENTER("mysqlnd_net::stream_write"); + DBG_ENTER("mysqlnd_net::network_write"); ret = php_stream_write(conn->net->stream, (char *)buf, count); DBG_RETURN(ret); } @@ -95,7 +98,6 @@ MYSQLND_METHOD(mysqlnd_net, stream_write)(MYSQLND * const conn, const zend_uchar - /* {{{ mysqlnd_net::connect */ static enum_func_status MYSQLND_METHOD(mysqlnd_net, connect)(MYSQLND_NET * net, const char * const scheme, size_t scheme_len, zend_bool persistent, char **errstr, int * errcode TSRMLS_DC) @@ -185,6 +187,333 @@ MYSQLND_METHOD(mysqlnd_net, connect)(MYSQLND_NET * net, const char * const schem /* }}} */ +/* We assume that MYSQLND_HEADER_SIZE is 4 bytes !! */ +#define STORE_HEADER_SIZE(safe_storage, buffer) int4store((safe_storage), (*(uint32_t *)(buffer))) +#define RESTORE_HEADER_SIZE(buffer, safe_storage) STORE_HEADER_SIZE((safe_storage), (buffer)) + +/* {{{ mysqlnd_net::send */ +/* + IMPORTANT : It's expected that buf has place in the beginning for MYSQLND_HEADER_SIZE !!!! + This is done for performance reasons in the caller of this function. + Otherwise we will have to do send two TCP packets, or do new alloc and memcpy. + Neither are quick, thus the clients of this function are obligated to do + what they are asked for. + + `count` is actually the length of the payload data. Thus : + count + MYSQLND_HEADER_SIZE = sizeof(buf) (not the pointer but the actual buffer) +*/ +size_t +MYSQLND_METHOD(mysqlnd_net, send)(MYSQLND * const conn, char * const buf, size_t count TSRMLS_DC) +{ + zend_uchar safe_buf[((MYSQLND_HEADER_SIZE) + (sizeof(zend_uchar)) - 1) / (sizeof(zend_uchar))]; + zend_uchar *safe_storage = safe_buf; + MYSQLND_NET *net = conn->net; + size_t old_chunk_size = net->stream->chunk_size; + size_t ret, packets_sent = 1; + size_t left = count; + zend_uchar *p = (zend_uchar *) buf; + zend_uchar * compress_buf = NULL; + size_t to_be_sent; + + DBG_ENTER("mysqlnd_net::send"); + DBG_INF_FMT("conn=%llu count=%lu compression=%d", conn->thread_id, count, net->compressed); + + net->stream->chunk_size = MYSQLND_MAX_PACKET_SIZE; + + if (net->compressed == TRUE) { + size_t comp_buf_size = MYSQLND_HEADER_SIZE + COMPRESSED_HEADER_SIZE + MYSQLND_HEADER_SIZE + MIN(left, MYSQLND_MAX_PACKET_SIZE); + DBG_INF_FMT("compress_buf_size=%d", comp_buf_size); + compress_buf = emalloc(comp_buf_size); + } + + do { + to_be_sent = MIN(left, MYSQLND_MAX_PACKET_SIZE); +#ifdef MYSQLND_COMPRESSION_ENABLED + if (net->compressed == TRUE) { + /* here we need to compress the data and then write it, first comes the compressed header */ + uLong tmp_complen = to_be_sent; + size_t payload_size; + zend_uchar * uncompressed_payload = p; /* should include the header */ + int res; + + STORE_HEADER_SIZE(safe_storage, uncompressed_payload); + int3store(uncompressed_payload, to_be_sent); + int1store(uncompressed_payload + 3, net->packet_no); + + DBG_INF_FMT("compress(%p, %p, %p, %d)", (compress_buf + COMPRESSED_HEADER_SIZE + MYSQLND_HEADER_SIZE), &tmp_complen, p, to_be_sent + MYSQLND_HEADER_SIZE); + res = compress((compress_buf + COMPRESSED_HEADER_SIZE + MYSQLND_HEADER_SIZE), &tmp_complen, uncompressed_payload, to_be_sent + MYSQLND_HEADER_SIZE); + if (res == Z_OK) { + DBG_INF_FMT("compression successful. compressed size=%d", tmp_complen); + int3store(compress_buf + MYSQLND_HEADER_SIZE, to_be_sent + MYSQLND_HEADER_SIZE); + payload_size = tmp_complen; + } else { + DBG_INF_FMT("compression NOT successful. error=%d Z_OK=%d Z_BUF_ERROR=%d Z_MEM_ERROR=%d", res, Z_OK, Z_BUF_ERROR, Z_MEM_ERROR); + int3store(compress_buf + MYSQLND_HEADER_SIZE, 0); + memcpy(compress_buf + MYSQLND_HEADER_SIZE + COMPRESSED_HEADER_SIZE, uncompressed_payload, to_be_sent + MYSQLND_HEADER_SIZE); + payload_size = to_be_sent + MYSQLND_HEADER_SIZE; + } + RESTORE_HEADER_SIZE(uncompressed_payload, safe_storage); + + int3store(compress_buf, payload_size); + int1store(compress_buf + 3, net->packet_no); + DBG_INF_FMT("writing %d bytes to the network", payload_size + MYSQLND_HEADER_SIZE + COMPRESSED_HEADER_SIZE); + ret = conn->net->m.network_write(conn, compress_buf, payload_size + MYSQLND_HEADER_SIZE + COMPRESSED_HEADER_SIZE TSRMLS_CC); + net->compressed_envelope_packet_no++; + #if WHEN_WE_NEED_TO_CHECK_WHETHER_COMPRESSION_WORKS_CORRECTLY + if (res == Z_OK) { + size_t decompressed_size = left + MYSQLND_HEADER_SIZE; + uLongf tmp_complen2 = decompressed_size; + zend_uchar * decompressed_data = malloc(decompressed_size); + int error = uncompress(decompressed_data, &tmp_complen2, compress_buf + MYSQLND_HEADER_SIZE + COMPRESSED_HEADER_SIZE, payload_size); + if (error == Z_OK) { + int i; + DBG_INF("success decompressing"); + for (i = 0 ; i < decompressed_size; i++) { + if (i && (i % 30 == 0)) { + printf("\n\t\t"); + } + printf("%.2X ", (int)*((char*)&(decompressed_data[i]))); + DBG_INF_FMT("%.2X ", (int)*((char*)&(decompressed_data[i]))); + } + } else { + DBG_INF("error decompressing"); + } + free(decompressed_data); + } + #endif /* WHEN_WE_NEED_TO_CHECK_WHETHER_COMPRESSION_WORKS_CORRECTLY */ + } else +#endif /* MYSQLND_COMPRESSION_ENABLED */ + { + DBG_INF("no compression"); + STORE_HEADER_SIZE(safe_storage, p); + int3store(p, to_be_sent); + int1store(p + 3, net->packet_no); + ret = conn->net->m.network_write(conn, p, to_be_sent + MYSQLND_HEADER_SIZE TSRMLS_CC); + RESTORE_HEADER_SIZE(p, safe_storage); + net->compressed_envelope_packet_no++; + } + net->packet_no++; + + p += to_be_sent; + left -= to_be_sent; + packets_sent++; + /* + if left is 0 then there is nothing more to send, but if the last packet was exactly + with the size MYSQLND_MAX_PACKET_SIZE we need to send additional packet, which has + empty payload. Thus if left == 0 we check for to_be_sent being the max size. If it is + indeed it then loop once more, then to_be_sent will become 0, left will stay 0. Empty + packet will be sent and this loop will end. + */ + } while (ret && (left > 0 || to_be_sent == MYSQLND_MAX_PACKET_SIZE)); + + DBG_INF_FMT("packet_size=%d packet_no=%d", left, net->packet_no); + /* Even for zero size payload we have to send a packet */ + if (!ret) { + DBG_ERR_FMT("Can't %u send bytes", count); + conn->state = CONN_QUIT_SENT; + SET_CLIENT_ERROR(conn->error_info, CR_SERVER_GONE_ERROR, UNKNOWN_SQLSTATE, mysqlnd_server_gone); + } + + MYSQLND_INC_CONN_STATISTIC_W_VALUE3(&conn->stats, + STAT_BYTES_SENT, count + packets_sent * MYSQLND_HEADER_SIZE, + STAT_PROTOCOL_OVERHEAD_OUT, packets_sent * MYSQLND_HEADER_SIZE, + STAT_PACKETS_SENT, packets_sent); + + net->stream->chunk_size = old_chunk_size; + if (compress_buf) { + efree(compress_buf); + } + DBG_RETURN(ret); +} +/* }}} */ + + +#ifdef MYSQLND_COMPRESSION_ENABLED +/* {{{ php_mysqlnd_read_buffer_is_empty */ +static zend_bool +php_mysqlnd_read_buffer_is_empty(MYSQLND_READ_BUFFER * buffer) +{ + return buffer->len? FALSE:TRUE; +} +/* }}} */ + + +/* {{{ php_mysqlnd_read_buffer_read */ +static void +php_mysqlnd_read_buffer_read(MYSQLND_READ_BUFFER * buffer, size_t count, zend_uchar * dest) +{ + if (buffer->len >= count) { + memcpy(dest, buffer->data + buffer->offset, count); + buffer->offset += count; + buffer->len -= count; + } +} +/* }}} */ + + +/* {{{ php_mysqlnd_read_buffer_bytes_left */ +static size_t +php_mysqlnd_read_buffer_bytes_left(MYSQLND_READ_BUFFER * buffer) +{ + return buffer->len; +} +/* }}} */ + + +/* {{{ php_mysqlnd_read_buffer_free */ +static void +php_mysqlnd_read_buffer_free(MYSQLND_READ_BUFFER ** buffer TSRMLS_DC) +{ + DBG_ENTER("php_mysqlnd_read_buffer_free"); + if (*buffer) { + mnd_efree((*buffer)->data); + mnd_efree(*buffer); + *buffer = NULL; + } + DBG_VOID_RETURN; +} +/* }}} */ + + +/* {{{ php_mysqlnd_create_read_buffer */ +static MYSQLND_READ_BUFFER * +mysqlnd_create_read_buffer(size_t count TSRMLS_DC) +{ + MYSQLND_READ_BUFFER * ret = mnd_emalloc(sizeof(MYSQLND_READ_BUFFER)); + DBG_ENTER("mysqlnd_create_read_buffer"); + ret->is_empty = php_mysqlnd_read_buffer_is_empty; + ret->read = php_mysqlnd_read_buffer_read; + ret->bytes_left = php_mysqlnd_read_buffer_bytes_left; + ret->free_buffer = php_mysqlnd_read_buffer_free; + ret->data = mnd_emalloc(count); + ret->size = ret->len = count; + ret->offset = 0; + DBG_RETURN(ret); +} +/* }}} */ + + +/* {{{ mysqlnd_read_compressed_packet_from_stream_and_fill_read_buffer */ +static enum_func_status +mysqlnd_read_compressed_packet_from_stream_and_fill_read_buffer(MYSQLND * conn, size_t net_payload_size TSRMLS_DC) +{ + MYSQLND_NET * net = conn->net; + size_t decompressed_size; + enum_func_status ret = PASS; + zend_uchar * compressed_data = NULL; + zend_uchar comp_header[COMPRESSED_HEADER_SIZE]; + DBG_ENTER("mysqlnd_read_compressed_packet_from_stream_and_fill_read_buffer"); + + /* Read the compressed header */ + if (FAIL == conn->net->m.network_read(conn, comp_header, COMPRESSED_HEADER_SIZE TSRMLS_CC)) { + DBG_RETURN(FAIL); + } + decompressed_size = uint3korr(comp_header); + + /* When decompressed_size is 0, then the data is not compressed, and we have wasted 3 bytes */ + /* we need to decompress the data */ + + if (decompressed_size) { + int error; + uLongf tmp_complen = decompressed_size; + compressed_data = emalloc(net_payload_size); + if (FAIL == conn->net->m.network_read(conn, compressed_data, net_payload_size TSRMLS_CC)) { + ret = FAIL; + goto end; + } + + net->uncompressed_data = mysqlnd_create_read_buffer(decompressed_size TSRMLS_CC); + error = uncompress(net->uncompressed_data->data, &tmp_complen, compressed_data, net_payload_size); + + DBG_INF_FMT("compressed data: decomp_len=%d compressed_size=%d", decompressed_size, net_payload_size); + if (error != Z_OK) { + DBG_ERR_FMT("Can't uncompress packet, error: %d", error); + ret = FAIL; + goto end; + } + } else { + DBG_INF_FMT("The server decided not to compress the data. Our job is easy. Copying %u bytes", net_payload_size); + net->uncompressed_data = mysqlnd_create_read_buffer(net_payload_size TSRMLS_CC); + if (FAIL == conn->net->m.network_read(conn, net->uncompressed_data->data, net_payload_size TSRMLS_CC)) { + ret = FAIL; + goto end; + } + } +end: + if (compressed_data) { + efree(compressed_data); + } + DBG_RETURN(ret); +} +#endif /* MYSQLND_COMPRESSION_ENABLED */ + + +/* {{{ mysqlnd_net::receive */ +static enum_func_status +MYSQLND_METHOD(mysqlnd_net, receive)(MYSQLND * conn, zend_uchar * buffer, size_t count TSRMLS_DC) +{ + size_t to_read = count; + zend_uchar * p = buffer; + MYSQLND_NET * net = conn->net; + + DBG_ENTER("mysqlnd_net::receive"); +#ifdef MYSQLND_COMPRESSION_ENABLED + if (net->compressed) { + if (net->uncompressed_data) { + size_t to_read_from_buffer = MIN(net->uncompressed_data->bytes_left(net->uncompressed_data), to_read); + DBG_INF_FMT("reading %u from uncompressed_data buffer", to_read_from_buffer); + if (to_read_from_buffer) { + net->uncompressed_data->read(net->uncompressed_data, to_read_from_buffer, (zend_uchar *) p); + p += to_read_from_buffer; + to_read -= to_read_from_buffer; + } + DBG_INF_FMT("left %u to read", to_read); + if (TRUE == net->uncompressed_data->is_empty(net->uncompressed_data)) { + /* Everything was consumed. This should never happen here, but for security */ + net->uncompressed_data->free_buffer(&net->uncompressed_data TSRMLS_CC); + } + } + if (to_read) { + zend_uchar net_header[MYSQLND_HEADER_SIZE]; + size_t net_payload_size; + zend_uchar packet_no; + + if (FAIL == net->m.network_read(conn, net_header, MYSQLND_HEADER_SIZE TSRMLS_CC)) { + DBG_RETURN(FAIL); + } + net_payload_size = uint3korr(net_header); + packet_no = uint1korr(net_header + 3); + if (net->compressed_envelope_packet_no != packet_no) { + DBG_ERR_FMT("Transport level: packets out of order. Expected %d received %d. Packet size=%d", + net->compressed_envelope_packet_no, packet_no, net_payload_size); + + php_error(E_WARNING, "Packets out of order. Expected %d received %d. Packet size="MYSQLND_SZ_T_SPEC, + net->compressed_envelope_packet_no, packet_no, net_payload_size); + DBG_RETURN(FAIL); + } + net->compressed_envelope_packet_no++; +#ifdef MYSQLND_DUMP_HEADER_N_BODY + DBG_INF_FMT("HEADER: hwd_packet_no=%d size=%3d", packet_no, net_payload_size); +#endif + /* Now let's read from the wire, decompress it and fill the read buffer */ + mysqlnd_read_compressed_packet_from_stream_and_fill_read_buffer(conn, net_payload_size TSRMLS_CC); + + /* + Now a bit of recursion - read from the read buffer, + if the data which we have just read from the wire + is not enough, then the recursive call will try to + satisfy it until it is satisfied. + */ + DBG_RETURN(net->m.receive(conn, p, to_read TSRMLS_CC)); + } + DBG_RETURN(PASS); + } +#endif /* MYSQLND_COMPRESSION_ENABLED */ + DBG_RETURN(net->m.network_read(conn, p, to_read TSRMLS_CC)); +} +/* }}} */ + + /* {{{ mysqlnd_net::set_client_option */ static enum_func_status MYSQLND_METHOD(mysqlnd_net, set_client_option)(MYSQLND_NET * const net, enum mysqlnd_option option, const char * const value TSRMLS_DC) @@ -264,9 +593,11 @@ mysqlnd_net_init(zend_bool persistent TSRMLS_DC) net->persistent = persistent; net->m.connect = MYSQLND_METHOD(mysqlnd_net, connect); - net->m.stream_read = MYSQLND_METHOD(mysqlnd_net, read_from_stream); - net->m.stream_write = MYSQLND_METHOD(mysqlnd_net, stream_write); + net->m.send = MYSQLND_METHOD(mysqlnd_net, send); + net->m.receive = MYSQLND_METHOD(mysqlnd_net, receive); net->m.set_client_option = MYSQLND_METHOD(mysqlnd_net, set_client_option); + net->m.network_read = MYSQLND_METHOD(mysqlnd_net, network_read); + net->m.network_write = MYSQLND_METHOD(mysqlnd_net, network_write); net->m.free_contents = MYSQLND_METHOD(mysqlnd_net, free_contents); { @@ -309,3 +640,12 @@ mysqlnd_net_free(MYSQLND_NET * net TSRMLS_DC) } /* }}} */ + +/* + * Local variables: + * tab-width: 4 + * c-basic-offset: 4 + * End: + * vim600: noet sw=4 ts=4 fdm=marker + * vim<600: noet sw=4 ts=4 + */ |