diff options
author | Daniel Lowrey <rdlowrey@php.net> | 2014-03-13 12:55:33 -0600 |
---|---|---|
committer | Daniel Lowrey <rdlowrey@php.net> | 2014-03-17 06:31:15 -0600 |
commit | 2ee4c987e68078d6d767c58cb0065bc34ff39ead (patch) | |
tree | 9ad1a2d362b1650419ff1acb1fa3444c858c4ff3 | |
parent | 8cde7473362ab6bb4efa1678221dd69281e6d9c5 (diff) | |
download | php-git-2ee4c987e68078d6d767c58cb0065bc34ff39ead.tar.gz |
Support async pgsql connections and non-blocking queries
- New functions (each accepts a pgsql $connection resource):
. pg_connect_poll
. pg_socket
. pg_consume_input
. pg_flush
- Modified functions
The following functions now additionally return zero if the
underlying socket is set to non-blocking mode and the send
operation does not complete immediately. Previously these
functions returned only boolean TRUE/FALSE and blocked
execution while polling until all data was sent:
. pg_send_execute
. pg_send_prepare
. pg_send_query
. pg_send_query_params
- New constants
Used with pg_connect() to initiate an asynchronous connection
attempt:
. PGSQL_CONNECT_ASYNC
Used with pg_connection_status() to determine the current state
of an async connection attempt:
. PGSQL_CONNECTION_STARTED
. PGSQL_CONNECTION_MADE
. PGSQL_CONNECTION_AWAITING_RESPONSE
. PGSQL_CONNECTION_AUTH_OK
. PGSQL_CONNECTION_SSL_STARTUP
. PGSQL_CONNECTION_SETENV
Used with pg_connect_poll() to determine the result of an
async connection attempt:
. PGSQL_POLLING_FAILED
. PGSQL_POLLING_READING
. PGSQL_POLLING_WRITING
. PGSQL_POLLING_OK
. PGSQL_POLLING_ACTIVE
- Polling via returned pg_socket() stream
pg_socket() returns a read-only socket stream that may be
cast to a file descriptor for select (and similar) polling
operations. Blocking behavior of the pgsql connection socket
can be controlled by calling stream_set_blocking() on the
stream returned by pg_socket().
-rw-r--r-- | ext/pgsql/pgsql.c | 479 | ||||
-rw-r--r-- | ext/pgsql/php_pgsql.h | 24 | ||||
-rwxr-xr-x | ext/pgsql/tests/29nb_async_connect.phpt | 44 | ||||
-rwxr-xr-x | ext/pgsql/tests/30nb_async_query_params.phpt | 78 | ||||
-rwxr-xr-x | ext/pgsql/tests/31nb_async_query_prepared.phpt | 112 | ||||
-rw-r--r-- | ext/pgsql/tests/32nb_async_query.phpt | 84 | ||||
-rw-r--r-- | ext/pgsql/tests/nonblocking.inc | 38 |
7 files changed, 761 insertions, 98 deletions
diff --git a/ext/pgsql/pgsql.c b/ext/pgsql/pgsql.c index bd9a3f6f11..f5e142a775 100644 --- a/ext/pgsql/pgsql.c +++ b/ext/pgsql/pgsql.c @@ -115,6 +115,10 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_pconnect, 0, 0, 1) ZEND_ARG_INFO(0, database) ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_connect_poll, 0, 0, 0) + ZEND_ARG_INFO(0, connection) +ZEND_END_ARG_INFO() + #if HAVE_PQPARAMETERSTATUS ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_parameter_status, 0, 0, 1) ZEND_ARG_INFO(0, connection) @@ -526,6 +530,18 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_get_pid, 0, 0, 0) ZEND_ARG_INFO(0, connection) ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_socket, 0, 0, 1) + ZEND_ARG_INFO(0, connection) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_consume_input, 0, 0, 1) + ZEND_ARG_INFO(0, connection) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_flush, 0, 0, 1) + ZEND_ARG_INFO(0, connection) +ZEND_END_ARG_INFO() + ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_meta_data, 0, 0, 2) ZEND_ARG_INFO(0, db) ZEND_ARG_INFO(0, table) @@ -574,6 +590,7 @@ const zend_function_entry pgsql_functions[] = { /* connection functions */ PHP_FE(pg_connect, arginfo_pg_connect) PHP_FE(pg_pconnect, arginfo_pg_pconnect) + PHP_FE(pg_connect_poll, arginfo_pg_connect_poll) PHP_FE(pg_close, arginfo_pg_close) PHP_FE(pg_connection_status, arginfo_pg_connection_status) PHP_FE(pg_connection_busy, arginfo_pg_connection_busy) @@ -643,6 +660,9 @@ const zend_function_entry pgsql_functions[] = { #endif /* async message function */ PHP_FE(pg_get_notify, arginfo_pg_get_notify) + PHP_FE(pg_socket, arginfo_pg_socket) + PHP_FE(pg_consume_input,arginfo_pg_consume_input) + PHP_FE(pg_flush, arginfo_pg_flush) PHP_FE(pg_get_pid, arginfo_pg_get_pid) /* error message functions */ PHP_FE(pg_result_error, arginfo_pg_result_error) @@ -1095,6 +1115,7 @@ PHP_MINIT_FUNCTION(pgsql) #endif /* For connection option */ REGISTER_LONG_CONSTANT("PGSQL_CONNECT_FORCE_NEW", PGSQL_CONNECT_FORCE_NEW, CONST_CS | CONST_PERSISTENT); + REGISTER_LONG_CONSTANT("PGSQL_CONNECT_ASYNC", PGSQL_CONNECT_ASYNC, CONST_CS | CONST_PERSISTENT); /* For pg_fetch_array() */ REGISTER_LONG_CONSTANT("PGSQL_ASSOC", PGSQL_ASSOC, CONST_CS | CONST_PERSISTENT); REGISTER_LONG_CONSTANT("PGSQL_NUM", PGSQL_NUM, CONST_CS | CONST_PERSISTENT); @@ -1102,6 +1123,18 @@ PHP_MINIT_FUNCTION(pgsql) /* For pg_connection_status() */ REGISTER_LONG_CONSTANT("PGSQL_CONNECTION_BAD", CONNECTION_BAD, CONST_CS | CONST_PERSISTENT); REGISTER_LONG_CONSTANT("PGSQL_CONNECTION_OK", CONNECTION_OK, CONST_CS | CONST_PERSISTENT); + REGISTER_LONG_CONSTANT("PGSQL_CONNECTION_STARTED", CONNECTION_STARTED, CONST_CS | CONST_PERSISTENT); + REGISTER_LONG_CONSTANT("PGSQL_CONNECTION_MADE", CONNECTION_MADE, CONST_CS | CONST_PERSISTENT); + REGISTER_LONG_CONSTANT("PGSQL_CONNECTION_AWAITING_RESPONSE", CONNECTION_AWAITING_RESPONSE, CONST_CS | CONST_PERSISTENT); + REGISTER_LONG_CONSTANT("PGSQL_CONNECTION_AUTH_OK", CONNECTION_AUTH_OK, CONST_CS | CONST_PERSISTENT); + REGISTER_LONG_CONSTANT("PGSQL_CONNECTION_SSL_STARTUP", CONNECTION_SSL_STARTUP, CONST_CS | CONST_PERSISTENT); + REGISTER_LONG_CONSTANT("PGSQL_CONNECTION_SETENV", CONNECTION_SETENV, CONST_CS | CONST_PERSISTENT); + /* For pg_connect_poll() */ + REGISTER_LONG_CONSTANT("PGSQL_POLLING_FAILED", PGRES_POLLING_FAILED, CONST_CS | CONST_PERSISTENT); + REGISTER_LONG_CONSTANT("PGSQL_POLLING_READING", PGRES_POLLING_READING, CONST_CS | CONST_PERSISTENT); + REGISTER_LONG_CONSTANT("PGSQL_POLLING_WRITING", PGRES_POLLING_WRITING, CONST_CS | CONST_PERSISTENT); + REGISTER_LONG_CONSTANT("PGSQL_POLLING_OK", PGRES_POLLING_OK, CONST_CS | CONST_PERSISTENT); + REGISTER_LONG_CONSTANT("PGSQL_POLLING_ACTIVE", PGRES_POLLING_ACTIVE, CONST_CS | CONST_PERSISTENT); #if HAVE_PGTRANSACTIONSTATUS /* For pg_transaction_status() */ REGISTER_LONG_CONSTANT("PGSQL_TRANSACTION_IDLE", PQTRANS_IDLE, CONST_CS | CONST_PERSISTENT); @@ -1401,17 +1434,35 @@ static void php_pgsql_do_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent) php_error_docref(NULL TSRMLS_CC, E_WARNING, "Cannot create new link. Too many open links (%ld)", PGG(num_links)); goto err; } - if (connstring) { - pgsql = PQconnectdb(connstring); + + /* Non-blocking connect */ + if (connect_type & PGSQL_CONNECT_ASYNC) { + if (connstring) { + pgsql = PQconnectStart(connstring); + if (pgsql==NULL || PQstatus(pgsql)==CONNECTION_BAD) { + PHP_PQ_ERROR("Unable to connect to PostgreSQL server: %s", pgsql); + if (pgsql) { + PQfinish(pgsql); + } + goto err; + } + } else { + php_error_docref(NULL TSRMLS_CC, E_WARNING, "Connection string required for async connections"); + goto err; + } } else { - pgsql = PQsetdb(host,port,options,tty,dbname); - } - if (pgsql==NULL || PQstatus(pgsql)==CONNECTION_BAD) { - PHP_PQ_ERROR("Unable to connect to PostgreSQL server: %s", pgsql); - if (pgsql) { - PQfinish(pgsql); + if (connstring) { + pgsql = PQconnectdb(connstring); + } else { + pgsql = PQsetdb(host,port,options,tty,dbname); + } + if (pgsql==NULL || PQstatus(pgsql)==CONNECTION_BAD) { + PHP_PQ_ERROR("Unable to connect to PostgreSQL server: %s", pgsql); + if (pgsql) { + PQfinish(pgsql); + } + goto err; } - goto err; } /* add it to the list */ @@ -1463,6 +1514,31 @@ PHP_FUNCTION(pg_connect) } /* }}} */ +/* {{{ proto resource pg_connect_poll(resource connection) + Poll the status of an in-progress async PostgreSQL connection attempt*/ +PHP_FUNCTION(pg_connect_poll) +{ + zval *pgsql_link; + int id = -1; + PGconn *pgsql; + int ret; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "r", &pgsql_link) == FAILURE) { + return; + } + + if (pgsql_link == NULL && id == -1) { + RETURN_FALSE; + } + + ZEND_FETCH_RESOURCE2(pgsql, PGconn *, &pgsql_link, id, "PostgreSQL link", le_link, le_plink); + + ret = PQconnectPoll(pgsql); + + RETURN_LONG(ret); +} +/* }}} */ + /* {{{ proto resource pg_pconnect(string connection_string | [string host, string port [, string options [, string tty,]]] string database) Open a persistent PostgreSQL connection */ PHP_FUNCTION(pg_pconnect) @@ -4724,6 +4800,16 @@ PHP_FUNCTION(pg_connection_busy) } /* }}} */ +static int _php_pgsql_link_has_results(PGconn *pgsql) +{ + PGresult *result; + while ((result = PQgetResult(pgsql))) { + PQclear(result); + return 1; + } + return 0; +} + /* {{{ proto bool pg_send_query(resource connection, string query) Send asynchronous query */ PHP_FUNCTION(pg_send_query) @@ -4733,48 +4819,63 @@ PHP_FUNCTION(pg_send_query) int len; int id = -1; PGconn *pgsql; - PGresult *res; - int leftover = 0; + int is_non_blocking; int ret; - if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "rs", - &pgsql_link, &query, &len) == FAILURE) { + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "rs", &pgsql_link, &query, &len) == FAILURE) { return; } ZEND_FETCH_RESOURCE2(pgsql, PGconn *, &pgsql_link, id, "PostgreSQL link", le_link, le_plink); - if (PQ_SETNONBLOCKING(pgsql, 1)) { + is_non_blocking = PQisnonblocking(pgsql); + + if (is_non_blocking == 0 && PQ_SETNONBLOCKING(pgsql, 1) == -1) { php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Cannot set connection to nonblocking mode"); RETURN_FALSE; } - while ((res = PQgetResult(pgsql))) { - PQclear(res); - leftover = 1; - } - if (leftover) { - php_error_docref(NULL TSRMLS_CC, E_NOTICE, "There are results on this connection. Call pg_get_result() until it returns FALSE"); + + if (_php_pgsql_link_has_results(pgsql)) { + php_error_docref(NULL TSRMLS_CC, E_NOTICE, + "There are results on this connection. Call pg_get_result() until it returns FALSE"); } - if (!PQsendQuery(pgsql, query)) { - if ((PGG(auto_reset_persistent) & 2) && PQstatus(pgsql) != CONNECTION_OK) { - PQreset(pgsql); - } + + if (is_non_blocking) { if (!PQsendQuery(pgsql, query)) { RETURN_FALSE; } - } - /* Wait to finish sending buffer */ - while ((ret = PQflush(pgsql))) { - if (ret == -1) { - php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Could not empty PostgreSQL send buffer"); - break; + ret = PQflush(pgsql); + } else { + if (!PQsendQuery(pgsql, query)) { + if ((PGG(auto_reset_persistent) & 2) && PQstatus(pgsql) != CONNECTION_OK) { + PQreset(pgsql); + } + if (!PQsendQuery(pgsql, query)) { + RETURN_FALSE; + } + } + + /* Wait to finish sending buffer */ + while ((ret = PQflush(pgsql))) { + if (ret == -1) { + php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Could not empty PostgreSQL send buffer"); + break; + } + usleep(10000); + } + + if (PQ_SETNONBLOCKING(pgsql, 0)) { + php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Cannot set connection to blocking mode"); } - usleep(10000); } - if (PQ_SETNONBLOCKING(pgsql, 0)) { - php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Cannot set connection to blocking mode"); + + if (ret == 0) { + RETURN_TRUE; + } else if (ret == -1) { + RETURN_FALSE; + } else { + RETURN_LONG(0); } - RETURN_TRUE; } /* }}} */ @@ -4789,8 +4890,7 @@ PHP_FUNCTION(pg_send_query_params) char *query; int query_len, id = -1; PGconn *pgsql; - PGresult *res; - int leftover = 0; + int is_non_blocking; int ret; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "rsa/", &pgsql_link, &query, &query_len, &pv_param_arr) == FAILURE) { @@ -4803,16 +4903,16 @@ PHP_FUNCTION(pg_send_query_params) ZEND_FETCH_RESOURCE2(pgsql, PGconn *, &pgsql_link, id, "PostgreSQL link", le_link, le_plink); - if (PQ_SETNONBLOCKING(pgsql, 1)) { + is_non_blocking = PQisnonblocking(pgsql); + + if (is_non_blocking == 0 && PQ_SETNONBLOCKING(pgsql, 1) == -1) { php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Cannot set connection to nonblocking mode"); RETURN_FALSE; } - while ((res = PQgetResult(pgsql))) { - PQclear(res); - leftover = 1; - } - if (leftover) { - php_error_docref(NULL TSRMLS_CC, E_NOTICE, "There are results on this connection. Call pg_get_result() until it returns FALSE"); + + if (_php_pgsql_link_has_results(pgsql)) { + php_error_docref(NULL TSRMLS_CC, E_NOTICE, + "There are results on this connection. Call pg_get_result() until it returns FALSE"); } zend_hash_internal_pointer_reset(Z_ARRVAL_P(pv_param_arr)); @@ -4848,7 +4948,12 @@ PHP_FUNCTION(pg_send_query_params) } } - if (!PQsendQueryParams(pgsql, query, num_params, NULL, (const char * const *)params, NULL, NULL, 0)) { + if (PQsendQueryParams(pgsql, query, num_params, NULL, (const char * const *)params, NULL, NULL, 0)) { + _php_pgsql_free_params(params, num_params); + } else if (is_non_blocking) { + _php_pgsql_free_params(params, num_params); + RETURN_FALSE; + } else { if ((PGG(auto_reset_persistent) & 2) && PQstatus(pgsql) != CONNECTION_OK) { PQreset(pgsql); } @@ -4857,19 +4962,31 @@ PHP_FUNCTION(pg_send_query_params) RETURN_FALSE; } } - _php_pgsql_free_params(params, num_params); - /* Wait to finish sending buffer */ - while ((ret = PQflush(pgsql))) { - if (ret == -1) { - php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Could not empty PostgreSQL send buffer"); - break; + + if (is_non_blocking) { + ret = PQflush(pgsql); + } else { + /* Wait to finish sending buffer */ + while ((ret = PQflush(pgsql))) { + if (ret == -1) { + php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Could not empty PostgreSQL send buffer"); + break; + } + usleep(10000); + } + + if (PQ_SETNONBLOCKING(pgsql, 0) != 0) { + php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Cannot set connection to blocking mode"); } - usleep(10000); } - if (PQ_SETNONBLOCKING(pgsql, 0)) { - php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Cannot set connection to blocking mode"); + + if (ret == 0) { + RETURN_TRUE; + } else if (ret == -1) { + RETURN_FALSE; + } else { + RETURN_LONG(0); } - RETURN_TRUE; } /* }}} */ #endif @@ -4883,8 +5000,7 @@ PHP_FUNCTION(pg_send_prepare) char *query, *stmtname; int stmtname_len, query_len, id = -1; PGconn *pgsql; - PGresult *res; - int leftover = 0; + int is_non_blocking; int ret; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "rss", &pgsql_link, &stmtname, &stmtname_len, &query, &query_len) == FAILURE) { @@ -4897,37 +5013,54 @@ PHP_FUNCTION(pg_send_prepare) ZEND_FETCH_RESOURCE2(pgsql, PGconn *, &pgsql_link, id, "PostgreSQL link", le_link, le_plink); - if (PQ_SETNONBLOCKING(pgsql, 1)) { + is_non_blocking = PQisnonblocking(pgsql); + + if (is_non_blocking == 0 && PQ_SETNONBLOCKING(pgsql, 1) == -1) { php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Cannot set connection to nonblocking mode"); RETURN_FALSE; } - while ((res = PQgetResult(pgsql))) { - PQclear(res); - leftover = 1; - } - if (leftover) { - php_error_docref(NULL TSRMLS_CC, E_NOTICE, "There are results on this connection. Call pg_get_result() until it returns FALSE"); + + if (_php_pgsql_link_has_results(pgsql)) { + php_error_docref(NULL TSRMLS_CC, E_NOTICE, + "There are results on this connection. Call pg_get_result() until it returns FALSE"); } + if (!PQsendPrepare(pgsql, stmtname, query, 0, NULL)) { - if ((PGG(auto_reset_persistent) & 2) && PQstatus(pgsql) != CONNECTION_OK) { - PQreset(pgsql); - } - if (!PQsendPrepare(pgsql, stmtname, query, 0, NULL)) { + if (is_non_blocking) { RETURN_FALSE; + } else { + if ((PGG(auto_reset_persistent) & 2) && PQstatus(pgsql) != CONNECTION_OK) { + PQreset(pgsql); + } + if (!PQsendPrepare(pgsql, stmtname, query, 0, NULL)) { + RETURN_FALSE; + } } } - /* Wait to finish sending buffer */ - while ((ret = PQflush(pgsql))) { - if (ret == -1) { - php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Could not empty postgres send buffer"); - break; + + if (is_non_blocking) { + ret = PQflush(pgsql); + } else { + /* Wait to finish sending buffer */ + while ((ret = PQflush(pgsql))) { + if (ret == -1) { + php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Could not empty PostgreSQL send buffer"); + break; + } + usleep(10000); + } + if (PQ_SETNONBLOCKING(pgsql, 0) != 0) { + php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Cannot set connection to blocking mode"); } - usleep(10000); } - if (PQ_SETNONBLOCKING(pgsql, 0)) { - php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Cannot set connection to blocking mode"); + + if (ret == 0) { + RETURN_TRUE; + } else if (ret == -1) { + RETURN_FALSE; + } else { + RETURN_LONG(0); } - RETURN_TRUE; } /* }}} */ #endif @@ -4944,8 +5077,7 @@ PHP_FUNCTION(pg_send_execute) char *stmtname; int stmtname_len, id = -1; PGconn *pgsql; - PGresult *res; - int leftover = 0; + int is_non_blocking; int ret; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "rsa", &pgsql_link, &stmtname, &stmtname_len, &pv_param_arr) == FAILURE) { @@ -4958,16 +5090,16 @@ PHP_FUNCTION(pg_send_execute) ZEND_FETCH_RESOURCE2(pgsql, PGconn *, &pgsql_link, id, "PostgreSQL link", le_link, le_plink); - if (PQ_SETNONBLOCKING(pgsql, 1)) { + is_non_blocking = PQisnonblocking(pgsql); + + if (is_non_blocking == 0 && PQ_SETNONBLOCKING(pgsql, 1) == -1) { php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Cannot set connection to nonblocking mode"); RETURN_FALSE; } - while ((res = PQgetResult(pgsql))) { - PQclear(res); - leftover = 1; - } - if (leftover) { - php_error_docref(NULL TSRMLS_CC, E_NOTICE, "There are results on this connection. Call pg_get_result() until it returns FALSE"); + + if (_php_pgsql_link_has_results(pgsql)) { + php_error_docref(NULL TSRMLS_CC, E_NOTICE, + "There are results on this connection. Call pg_get_result() until it returns FALSE"); } zend_hash_internal_pointer_reset(Z_ARRVAL_P(pv_param_arr)); @@ -4975,8 +5107,8 @@ PHP_FUNCTION(pg_send_execute) if (num_params > 0) { int i = 0; params = (char **)safe_emalloc(sizeof(char *), num_params, 0); - - for(i = 0; i < num_params; i++) { + + for (i = 0; i < num_params; i++) { if (zend_hash_get_current_data(Z_ARRVAL_P(pv_param_arr), (void **) &tmp) == FAILURE) { php_error_docref(NULL TSRMLS_CC, E_WARNING,"Error getting parameter"); _php_pgsql_free_params(params, num_params); @@ -5003,7 +5135,12 @@ PHP_FUNCTION(pg_send_execute) } } - if (!PQsendQueryPrepared(pgsql, stmtname, num_params, (const char * const *)params, NULL, NULL, 0)) { + if (PQsendQueryPrepared(pgsql, stmtname, num_params, (const char * const *)params, NULL, NULL, 0)) { + _php_pgsql_free_params(params, num_params); + } else if (is_non_blocking) { + _php_pgsql_free_params(params, num_params); + RETURN_FALSE; + } else { if ((PGG(auto_reset_persistent) & 2) && PQstatus(pgsql) != CONNECTION_OK) { PQreset(pgsql); } @@ -5012,19 +5149,30 @@ PHP_FUNCTION(pg_send_execute) RETURN_FALSE; } } - _php_pgsql_free_params(params, num_params); - /* Wait to finish sending buffer */ - while ((ret = PQflush(pgsql))) { - if (ret == -1) { - php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Could not empty postgres send buffer"); - break; + + if (is_non_blocking) { + ret = PQflush(pgsql); + } else { + /* Wait to finish sending buffer */ + while ((ret = PQflush(pgsql))) { + if (ret == -1) { + php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Could not empty PostgreSQL send buffer"); + break; + } + usleep(10000); + } + if (PQ_SETNONBLOCKING(pgsql, 0) != 0) { + php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Cannot set connection to blocking mode"); } - usleep(10000); } - if (PQ_SETNONBLOCKING(pgsql, 0)) { - php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Cannot set connection to blocking mode"); + + if (ret == 0) { + RETURN_TRUE; + } else if (ret == -1) { + RETURN_FALSE; + } else { + RETURN_LONG(0); } - RETURN_TRUE; } /* }}} */ #endif @@ -5169,6 +5317,141 @@ PHP_FUNCTION(pg_get_pid) } /* }}} */ +static size_t php_pgsql_fd_write(php_stream *stream, const char *buf, size_t count TSRMLS_DC) +{ + return 0; +} + +static size_t php_pgsql_fd_read(php_stream *stream, char *buf, size_t count TSRMLS_DC) +{ + return 0; +} + +static int php_pgsql_fd_close(php_stream *stream, int close_handle TSRMLS_DC) +{ + return EOF; +} + +static int php_pgsql_fd_flush(php_stream *stream TSRMLS_DC) +{ + return FAILURE; +} + +static int php_pgsql_fd_set_option(php_stream *stream, int option, int value, void *ptrparam TSRMLS_DC) +{ + PGconn *pgsql = (PGconn *) stream->abstract; + switch (option) { + case PHP_STREAM_OPTION_BLOCKING: + return PQ_SETNONBLOCKING(pgsql, value); + default: + return FAILURE; + } +} + +static int php_pgsql_fd_cast(php_stream *stream, int cast_as, void **ret TSRMLS_DC) +{ + PGconn *pgsql = (PGconn *) stream->abstract; + int fd_number; + + switch (cast_as) { + case PHP_STREAM_AS_FD_FOR_SELECT: + case PHP_STREAM_AS_FD: + case PHP_STREAM_AS_SOCKETD: + if (ret) { + fd_number = PQsocket(pgsql); + if (fd_number == -1) { + return FAILURE; + } + + *(php_socket_t *)ret = fd_number; + return SUCCESS; + } + default: + return FAILURE; + } +} + +/* {{{ proto resource pg_socket(resource) + Get a read-only handle to the socket underlying the pgsql connection */ +PHP_FUNCTION(pg_socket) +{ + zval *pgsql_link; + php_stream *stream; + PGconn *pgsql; + int id = -1; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "r", &pgsql_link) == FAILURE) { + return; + } + + ZEND_FETCH_RESOURCE2(pgsql, PGconn *, &pgsql_link, id, "PostgreSQL link", le_link, le_plink); + + stream = php_stream_alloc(&php_stream_pgsql_fd_ops, pgsql, NULL, "r"); + + if (stream) { + php_stream_to_zval(stream, return_value); + return; + } + + RETURN_FALSE; +} +/* }}} */ + +/* {{{ proto bool pg_consume_input(resource) + Reads input on the connection */ +PHP_FUNCTION(pg_consume_input) +{ + zval *pgsql_link; + int id = -1; + PGconn *pgsql; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "r", &pgsql_link) == FAILURE) { + return; + } + + ZEND_FETCH_RESOURCE2(pgsql, PGconn *, &pgsql_link, id, "PostgreSQL link", le_link, le_plink); + + RETURN_BOOL(PQconsumeInput(pgsql)); +} +/* }}} */ + +/* {{{ proto mixed pg_flush(resource) + Flush outbound query data on the connection */ +PHP_FUNCTION(pg_flush) +{ + zval *pgsql_link; + int id = -1; + PGconn *pgsql; + int ret; + int is_non_blocking; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "r", &pgsql_link) == FAILURE) { + return; + } + + ZEND_FETCH_RESOURCE2(pgsql, PGconn *, &pgsql_link, id, "PostgreSQL link", le_link, le_plink); + + is_non_blocking = PQisnonblocking(pgsql); + + if (is_non_blocking == 0 && PQ_SETNONBLOCKING(pgsql, 1) == -1) { + php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Cannot set connection to nonblocking mode"); + RETURN_FALSE; + } + + ret = PQflush(pgsql); + + if (is_non_blocking == 0 && PQ_SETNONBLOCKING(pgsql, 0) == -1) { + php_error_docref(NULL TSRMLS_CC, E_NOTICE, "Failed resetting connection to blocking mode"); + } + + switch (ret) { + case 0: RETURN_TRUE; break; + case 1: RETURN_LONG(0); break; + default: RETURN_FALSE; + } +} +/* }}} */ + /* {{{ php_pgsql_meta_data * TODO: Add meta_data cache for better performance */ diff --git a/ext/pgsql/php_pgsql.h b/ext/pgsql/php_pgsql.h index b6caf9d201..62f20dad57 100644 --- a/ext/pgsql/php_pgsql.h +++ b/ext/pgsql/php_pgsql.h @@ -67,6 +67,7 @@ PHP_MINFO_FUNCTION(pgsql); /* connection functions */ PHP_FUNCTION(pg_connect); PHP_FUNCTION(pg_pconnect); +PHP_FUNCTION(pg_connect_poll); PHP_FUNCTION(pg_close); PHP_FUNCTION(pg_connection_reset); PHP_FUNCTION(pg_connection_status); @@ -134,6 +135,9 @@ PHP_FUNCTION(pg_field_is_null); PHP_FUNCTION(pg_field_table); /* async message functions */ PHP_FUNCTION(pg_get_notify); +PHP_FUNCTION(pg_socket); +PHP_FUNCTION(pg_consume_input); +PHP_FUNCTION(pg_flush); PHP_FUNCTION(pg_get_pid); /* error message functions */ PHP_FUNCTION(pg_result_error); @@ -191,6 +195,7 @@ PHP_FUNCTION(pg_select); /* connection options - ToDo: Add async connection option */ #define PGSQL_CONNECT_FORCE_NEW (1<<1) +#define PGSQL_CONNECT_ASYNC (1<<2) /* php_pgsql_convert options */ #define PGSQL_CONV_IGNORE_DEFAULT (1<<1) /* Do not use DEAFULT value by removing field from returned array */ #define PGSQL_CONV_FORCE_NULL (1<<2) /* Convert to NULL if string is null string */ @@ -222,6 +227,13 @@ static void php_pgsql_get_field_info(INTERNAL_FUNCTION_PARAMETERS, int entry_typ static void php_pgsql_data_info(INTERNAL_FUNCTION_PARAMETERS, int entry_type); static void php_pgsql_do_async(INTERNAL_FUNCTION_PARAMETERS,int entry_type); +static size_t php_pgsql_fd_write(php_stream *stream, const char *buf, size_t count TSRMLS_DC); +static size_t php_pgsql_fd_read(php_stream *stream, char *buf, size_t count TSRMLS_DC); +static int php_pgsql_fd_close(php_stream *stream, int close_handle TSRMLS_DC); +static int php_pgsql_fd_flush(php_stream *stream TSRMLS_DC); +static int php_pgsql_fd_set_option(php_stream *stream, int option, int value, void *ptrparam TSRMLS_DC); +static int php_pgsql_fd_cast(php_stream *stream, int cast_as, void **ret TSRMLS_DC); + typedef enum _php_pgsql_data_type { /* boolean */ PG_BOOL, @@ -284,6 +296,18 @@ typedef struct _php_pgsql_notice { size_t len; } php_pgsql_notice; +static php_stream_ops php_stream_pgsql_fd_ops = { + php_pgsql_fd_write, + php_pgsql_fd_read, + php_pgsql_fd_close, + php_pgsql_fd_flush, + "PostgreSQL link", + NULL, /* seek */ + php_pgsql_fd_cast, /* cast */ + NULL, /* stat */ + php_pgsql_fd_set_option +}; + ZEND_BEGIN_MODULE_GLOBALS(pgsql) long default_link; /* default link when connection is omitted */ long num_links,num_persistent; diff --git a/ext/pgsql/tests/29nb_async_connect.phpt b/ext/pgsql/tests/29nb_async_connect.phpt new file mode 100755 index 0000000000..fc3868a26d --- /dev/null +++ b/ext/pgsql/tests/29nb_async_connect.phpt @@ -0,0 +1,44 @@ +--TEST-- +PostgreSQL non-blocking async connect +--SKIPIF-- +<?php +include("skipif.inc"); +?> +--FILE-- +<?php + +include('config.inc'); +include('nonblocking.inc'); + +if (!$db = pg_connect($conn_str, PGSQL_CONNECT_ASYNC)) { + die("pg_connect() error"); +} elseif (pg_connection_status($db) === PGSQL_CONNECTION_BAD) { + die("pg_connect() error"); +} elseif ($db_socket = pg_socket($db)) { + stream_set_blocking($db_socket, FALSE); +} else { + die("pg_socket() error"); +} + +while (TRUE) { + switch ($status = pg_connect_poll($db)) { + case PGSQL_POLLING_READING: + if (nb_is_readable($db_socket)) { break 2; } + break; + case PGSQL_POLLING_WRITING: + if (nb_is_writable($db_socket)) { break 2; } + break; + case PGSQL_POLLING_FAILED: + die("async connection failed"); + case PGSQL_POLLING_OK: + break 2; + } +} +assert(pg_connection_status($db) === PGSQL_CONNECTION_MADE); +echo "OK"; + +pg_close($db); + +?> +--EXPECT-- +OK diff --git a/ext/pgsql/tests/30nb_async_query_params.phpt b/ext/pgsql/tests/30nb_async_query_params.phpt new file mode 100755 index 0000000000..a88769b038 --- /dev/null +++ b/ext/pgsql/tests/30nb_async_query_params.phpt @@ -0,0 +1,78 @@ +--TEST--
+PostgreSQL non-blocking async query params
+--SKIPIF--
+<?php
+include("skipif.inc");
+if (!function_exists('pg_send_query_params')) die('skip function pg_send_query_params() does not exist');
+?>
+--FILE--
+<?php
+
+include('config.inc');
+include('nonblocking.inc');
+
+$db = pg_connect($conn_str);
+
+$version = pg_version($db);
+if ($version['protocol'] < 3) {
+ echo "OK";
+ exit(0);
+}
+
+$db_socket = pg_socket($db);
+stream_set_blocking($db_socket, false);
+
+$sent = pg_send_query_params($db, "SELECT * FROM ".$table_name." WHERE num > \$1;", array(100));
+if ($sent === FALSE) {
+ echo "pg_send_query_params() error\n";
+} elseif ($sent === 0) {
+ nb_flush($db, $db_socket);
+}
+
+nb_consume($db, $db_socket);
+
+if (!($result = pg_get_result($db))) {
+ echo "pg_get_result() error\n";
+}
+if (!($rows = pg_num_rows($result))) {
+ echo "pg_num_rows() error\n";
+}
+for ($i=0; $i < $rows; $i++) {
+ pg_fetch_array($result, $i, PGSQL_NUM);
+}
+for ($i=0; $i < $rows; $i++) {
+ pg_fetch_object($result);
+}
+for ($i=0; $i < $rows; $i++) {
+ pg_fetch_row($result, $i);
+}
+for ($i=0; $i < $rows; $i++) {
+ pg_fetch_result($result, $i, 0);
+}
+
+pg_num_rows(pg_query_params($db, "SELECT * FROM ".$table_name." WHERE num > \$1;", array(100)));
+pg_num_fields(pg_query_params($db, "SELECT * FROM ".$table_name." WHERE num > \$1;", array(100)));
+pg_field_name($result, 0);
+pg_field_num($result, $field_name);
+pg_field_size($result, 0);
+pg_field_type($result, 0);
+pg_field_prtlen($result, 0);
+pg_field_is_null($result, 0);
+
+$sent = pg_send_query_params($db, "INSERT INTO ".$table_name." VALUES (\$1, \$2);", array(9999, "A'BC"));
+
+if ($sent === FALSE) {
+ echo "pg_send_query_params() error\n";
+} elseif ($sent === 0) {
+ nb_flush($db, $db_socket);
+}
+
+pg_last_oid($result);
+pg_free_result($result);
+
+pg_close($db);
+
+echo "OK";
+?>
+--EXPECT--
+OK
diff --git a/ext/pgsql/tests/31nb_async_query_prepared.phpt b/ext/pgsql/tests/31nb_async_query_prepared.phpt new file mode 100755 index 0000000000..d82ae798e8 --- /dev/null +++ b/ext/pgsql/tests/31nb_async_query_prepared.phpt @@ -0,0 +1,112 @@ +--TEST-- +PostgreSQL non-blocking async prepared queries +--SKIPIF-- +<?php +include("skipif.inc"); +if (!function_exists('pg_send_prepare')) die('skip function pg_send_prepare() does not exist'); +?> +--FILE-- +<?php + +include('config.inc'); +include('nonblocking.inc'); + +$db = pg_connect($conn_str); + +$version = pg_version($db); +if ($version['protocol'] < 3) { + echo "OK"; + exit(0); +} + +$db_socket = pg_socket($db); +stream_set_blocking($db_socket, false); + +$nb_send = pg_send_prepare($db, 'php_test', "SELECT * FROM ".$table_name." WHERE num > \$1;"); +if ($nb_send === FALSE) { + echo "pg_send_prepare() error\n"; +} elseif ($nb_send === 0) { + nb_flush($db, $db_socket); +} + +nb_consume($db, $db_socket); + +if (!($result = pg_get_result($db))) { + echo "pg_get_result() error\n"; +} +pg_free_result($result); + +$nb_send = pg_send_execute($db, 'php_test', array(100)); +if ($nb_send === FALSE) { + echo "pg_send_execute() error\n"; +} elseif ($nb_send === 0) { + nb_flush($db, $db_socket); +} + +nb_consume($db, $db_socket); + +if (!($result = pg_get_result($db))) { + echo "pg_get_result() error\n"; +} + +if (!($rows = pg_num_rows($result))) { + echo "pg_num_rows() error\n"; +} +for ($i=0; $i < $rows; $i++) { + pg_fetch_array($result, $i, PGSQL_NUM); +} +for ($i=0; $i < $rows; $i++) { + pg_fetch_object($result); +} +for ($i=0; $i < $rows; $i++) { + pg_fetch_row($result, $i); +} +for ($i=0; $i < $rows; $i++) { + pg_fetch_result($result, $i, 0); +} + +pg_num_rows(pg_query_params($db, "SELECT * FROM ".$table_name." WHERE num > \$1;", array(100))); +pg_num_fields(pg_query_params($db, "SELECT * FROM ".$table_name." WHERE num > \$1;", array(100))); +pg_field_name($result, 0); +pg_field_num($result, $field_name); +pg_field_size($result, 0); +pg_field_type($result, 0); +pg_field_prtlen($result, 0); +pg_field_is_null($result, 0); + +$nb_send = pg_send_prepare($db, "php_test2", "INSERT INTO ".$table_name." VALUES (\$1, \$2);"); +if ($nb_send === FALSE) { + echo "pg_send_prepare() error\n"; +} elseif ($nb_send === 0) { + nb_flush($db, $db_socket); +} + +nb_consume($db, $db_socket); + +if (!($result = pg_get_result($db))) { + echo "pg_get_result() error\n"; +} +pg_free_result($result); + +$nb_send = pg_send_execute($db, "php_test2", array(9999, "A'BC")); +if ($nb_send === FALSE) { + echo "pg_send_execute() error\n"; +} elseif ($nb_send === 0) { + nb_flush($db, $db_socket); +} + +nb_consume($db, $db_socket); + +if (!($result = pg_get_result($db))) { + echo "pg_get_result() error\n"; +} + +pg_last_oid($result); +pg_free_result($result); +pg_close($db); + +echo "OK"; +?> +--EXPECT-- +OK + diff --git a/ext/pgsql/tests/32nb_async_query.phpt b/ext/pgsql/tests/32nb_async_query.phpt new file mode 100644 index 0000000000..7858a60eea --- /dev/null +++ b/ext/pgsql/tests/32nb_async_query.phpt @@ -0,0 +1,84 @@ +--TEST-- +PostgreSQL non-blocking async queries +--SKIPIF-- +<?php +include("skipif.inc"); +if (!function_exists('pg_send_prepare')) die('skip function pg_send_prepare() does not exist'); +?> +--FILE-- +<?php + +include('config.inc'); +include('nonblocking.inc'); + +$db = pg_connect($conn_str); + +$version = pg_version($db); +if ($version['protocol'] < 3) { + echo "OK"; + exit(0); +} + +$db_socket = pg_socket($db); +stream_set_blocking($db_socket, false); + +$nb_send = pg_send_query($db, "SELECT * FROM ".$table_name.";"); +if ($nb_send === FALSE) { + echo "pg_send_query() error\n"; +} elseif ($nb_send === 0) { + nb_flush($db, $db_socket); +} + +nb_consume($db, $db_socket); + +if (!($result = pg_get_result($db))) { + echo "pg_get_result() error\n"; +} + +if (!($rows = pg_num_rows($result))) { + echo "pg_num_rows() error\n"; +} +for ($i=0; $i < $rows; $i++) { + pg_fetch_array($result, $i, PGSQL_NUM); +} +for ($i=0; $i < $rows; $i++) { + pg_fetch_object($result); +} +for ($i=0; $i < $rows; $i++) { + pg_fetch_row($result, $i); +} +for ($i=0; $i < $rows; $i++) { + pg_fetch_result($result, $i, 0); +} + +pg_num_rows(pg_query($db, "SELECT * FROM ".$table_name.";")); +pg_num_fields(pg_query($db, "SELECT * FROM ".$table_name.";")); +pg_field_name($result, 0); +pg_field_num($result, $field_name); +pg_field_size($result, 0); +pg_field_type($result, 0); +pg_field_prtlen($result, 0); +pg_field_is_null($result, 0); + +$nb_send = pg_send_query($db, "INSERT INTO ".$table_name." VALUES (8888, 'GGG');"); +if ($nb_send === FALSE) { + echo "pg_send_query() error\n"; +} elseif ($nb_send === 0) { + nb_flush($db, $db_socket); +} + +nb_consume($db, $db_socket); + +if (!($result = pg_get_result($db))) { + echo "pg_get_result() error\n"; +} + +pg_last_oid($result); +pg_free_result($result); +pg_close($db); + +echo "OK"; +?> +--EXPECT-- +OK + diff --git a/ext/pgsql/tests/nonblocking.inc b/ext/pgsql/tests/nonblocking.inc new file mode 100644 index 0000000000..4cf7c09a51 --- /dev/null +++ b/ext/pgsql/tests/nonblocking.inc @@ -0,0 +1,38 @@ +<?php + +function nb_is_readable($stream, $timeout = 1) { + $r = [$stream]; $w = []; $e = []; + return (bool) stream_select($r, $w, $e, $timeout, 0); +}; +function nb_is_writable($stream, $timeout = 1) { + $r = []; $w = [$stream]; $e = []; + return (bool) stream_select($r, $w, $e, $timeout, 0); +}; +function nb_flush($db, $db_socket) { + while (TRUE) { + if (! nb_is_writable($db_socket)) { + continue; + } + $flush = pg_flush($db); + if ($flush === TRUE) { + break; // All data flushed + } elseif ($flush === FALSE) { + echo "pg_flush() error\n"; + break; + } + } +}; +function nb_consume($db, $db_socket) { + while (TRUE) { + if (!nb_is_readable($db_socket)) { + continue; + } elseif (!pg_consume_input($db)) { + echo "pg_consume_input() error\n"; + break; + } elseif (!pg_connection_busy($db)) { + break; // All data consumed + } + + } +}; + |