summaryrefslogtreecommitdiff
path: root/lang/python/wiredtiger.i
diff options
context:
space:
mode:
authorMichael Cahill <michael.cahill@wiredtiger.com>2014-05-08 14:33:51 +1000
committerMichael Cahill <michael.cahill@wiredtiger.com>2014-05-08 14:33:51 +1000
commit151f14dace93d64db7276568401718596d0813e7 (patch)
tree5d89eaaa4ba61cf73f2d8f768e47857421140b67 /lang/python/wiredtiger.i
parent30edbac897e908c13a2793655ebc343a02316d3e (diff)
parentf00d326480d33d617e5e7df3ee5d795a676af1bc (diff)
downloadmongo-151f14dace93d64db7276568401718596d0813e7.tar.gz
Merge branch 'develop' into json-dump
Conflicts: src/cursor/cur_std.c src/include/cursor.h src/include/packing.i
Diffstat (limited to 'lang/python/wiredtiger.i')
-rw-r--r--lang/python/wiredtiger.i315
1 files changed, 297 insertions, 18 deletions
diff --git a/lang/python/wiredtiger.i b/lang/python/wiredtiger.i
index d63b440986f..cfa9b51ea25 100644
--- a/lang/python/wiredtiger.i
+++ b/lang/python/wiredtiger.i
@@ -52,10 +52,18 @@ from packing import pack, unpack
%typemap(in, numinputs=0) WT_SESSION ** (WT_SESSION *temp = NULL) {
$1 = &temp;
}
+%typemap(in, numinputs=0) WT_ASYNC_OP ** (WT_ASYNC_OP *temp = NULL) {
+ $1 = &temp;
+}
%typemap(in, numinputs=0) WT_CURSOR ** (WT_CURSOR *temp = NULL) {
$1 = &temp;
}
+%typemap(in) WT_ASYNC_CALLBACK * (PyObject *callback_obj = NULL) %{
+ callback_obj = $input;
+ $1 = &pyApiAsyncCallback;
+%}
+
%typemap(in, numinputs=0) WT_EVENT_HANDLER * %{
$1 = &pyApiEventHandler;
%}
@@ -80,6 +88,32 @@ from packing import pack, unpack
}
}
}
+%typemap(argout) WT_ASYNC_OP ** {
+ $result = SWIG_NewPointerObj(SWIG_as_voidptr(*$1),
+ SWIGTYPE_p___wt_async_op, 0);
+ if (*$1 != NULL) {
+ PY_CALLBACK *pcb;
+
+ (*$1)->c.flags |= WT_CURSTD_RAW;
+ PyObject_SetAttrString($result, "is_column",
+ PyBool_FromLong(strcmp((*$1)->key_format, "r") == 0));
+ PyObject_SetAttrString($result, "key_format",
+ PyString_InternFromString((*$1)->key_format));
+ PyObject_SetAttrString($result, "value_format",
+ PyString_InternFromString((*$1)->value_format));
+
+ if (__wt_calloc_def((WT_ASYNC_OP_IMPL *)(*$1), 1, &pcb) != 0)
+ SWIG_exception_fail(SWIG_MemoryError, "WT calloc failed");
+ else {
+ pcb->pyobj = $result;
+ Py_XINCREF(pcb->pyobj);
+ /* XXX Is there a way to avoid SWIG's numbering? */
+ pcb->pyasynccb = callback_obj5;
+ Py_XINCREF(pcb->pyasynccb);
+ (*$1)->c.lang_private = pcb;
+ }
+ }
+}
%typemap(argout) WT_CURSOR ** {
$result = SWIG_NewPointerObj(SWIG_as_voidptr(*$1),
@@ -175,7 +209,8 @@ DESTRUCTOR(__wt_session, close)
* asttribute to None, and free the PY_CALLBACK.
*/
typedef struct {
- PyObject *pyobj; /* the python Session/Cursor object */
+ PyObject *pyobj; /* the python Session/Cursor/AsyncOp object */
+ PyObject *pyasynccb; /* the callback to use for AsyncOp */
} PY_CALLBACK;
static PyObject *wtError;
@@ -214,6 +249,15 @@ class IterableCursor:
raise StopIteration
return self.cursor.get_keys() + self.cursor.get_values()
## @endcond
+
+# An abstract class, which must be subclassed with notify() overridden.
+class AsyncCallback:
+ def __init__(self):
+ raise NotImplementedError
+
+ def notify(self, op, op_ret, flags):
+ raise NotImplementedError
+
%}
/* Bail out if arg or arg.this is None, else set res to the C pointer. */
@@ -261,6 +305,7 @@ class IterableCursor:
%enddef
SELFHELPER(struct __wt_connection, connection)
+SELFHELPER(struct __wt_async_op, op)
SELFHELPER(struct __wt_session, session)
SELFHELPER(struct __wt_cursor, cursor)
@@ -285,6 +330,27 @@ do {
SWIG_ERROR_IF_NOT_SET(result);
}
+/* Async operations can return ENOMEM when no ops are available. */
+%define ENOMEM_OK(m)
+%exception m {
+retry:
+ $action
+ if (result != 0 && result != ENOMEM)
+ SWIG_ERROR_IF_NOT_SET(result);
+ else if (result == ENOMEM) {
+ __wt_sleep(0, 10000);
+ goto retry;
+ }
+}
+%enddef
+
+/* Any API that returns an enum type uses this. */
+%define ENUM_OK(m)
+%exception m {
+ $action
+}
+%enddef
+
/* Cursor positioning methods can also return WT_NOTFOUND. */
%define NOTFOUND_OK(m)
%exception m {
@@ -303,6 +369,8 @@ do {
}
%enddef
+ENOMEM_OK(__wt_connection::async_new_op)
+ENUM_OK(__wt_async_op::get_type)
NOTFOUND_OK(__wt_cursor::next)
NOTFOUND_OK(__wt_cursor::prev)
NOTFOUND_OK(__wt_cursor::remove)
@@ -316,11 +384,21 @@ COMPARE_OK(__wt_cursor::search_near)
%exception __wt_connection::search_near;
%exception __wt_connection::get_home;
%exception __wt_connection::is_new;
+%exception __wt_async_op::_set_key;
+%exception __wt_async_op::_set_value;
%exception __wt_cursor::_set_key;
%exception __wt_cursor::_set_value;
%exception wiredtiger_strerror;
%exception wiredtiger_version;
+/* WT_ASYNC_OP customization. */
+/* First, replace the varargs get / set methods with Python equivalents. */
+%ignore __wt_async_op::get_key;
+%ignore __wt_async_op::get_value;
+%ignore __wt_async_op::set_key;
+%ignore __wt_async_op::set_value;
+%immutable __wt_async_op::connection;
+
/* WT_CURSOR customization. */
/* First, replace the varargs get / set methods with Python equivalents. */
%ignore __wt_cursor::get_key;
@@ -354,6 +432,141 @@ typedef int int_void;
typedef int int_void;
%typemap(out) int_void { $result = VOID_Object; }
+%extend __wt_async_op {
+ /* Get / set keys and values */
+ void _set_key(char *data, int size) {
+ WT_ITEM k;
+ k.data = data;
+ k.size = (uint32_t)size;
+ $self->set_key($self, &k);
+ }
+
+ int_void _set_recno(uint64_t recno) {
+ WT_ITEM k;
+ uint8_t recno_buf[20];
+ size_t size;
+ int ret;
+ if ((ret = wiredtiger_struct_size(NULL,
+ &size, "r", recno)) != 0 ||
+ (ret = wiredtiger_struct_pack(NULL,
+ recno_buf, sizeof (recno_buf), "r", recno)) != 0)
+ return (ret);
+
+ k.data = recno_buf;
+ k.size = (uint32_t)size;
+ $self->set_key($self, &k);
+ return (ret);
+ }
+
+ void _set_value(char *data, int size) {
+ WT_ITEM v;
+ v.data = data;
+ v.size = (uint32_t)size;
+ $self->set_value($self, &v);
+ }
+
+ /* Don't return values, just throw exceptions on failure. */
+ int_void _get_key(char **datap, int *sizep) {
+ WT_ITEM k;
+ int ret = $self->get_key($self, &k);
+ if (ret == 0) {
+ *datap = (char *)k.data;
+ *sizep = (int)k.size;
+ }
+ return (ret);
+ }
+
+ int_void _get_recno(uint64_t *recnop) {
+ WT_ITEM k;
+ int ret = $self->get_key($self, &k);
+ if (ret == 0)
+ ret = wiredtiger_struct_unpack(NULL,
+ k.data, k.size, "q", recnop);
+ return (ret);
+ }
+
+ int_void _get_value(char **datap, int *sizep) {
+ WT_ITEM v;
+ int ret = $self->get_value($self, &v);
+ if (ret == 0) {
+ *datap = (char *)v.data;
+ *sizep = (int)v.size;
+ }
+ return (ret);
+ }
+
+ int _freecb() {
+ return (cursorFreeHandler($self));
+ }
+
+%pythoncode %{
+ def get_key(self):
+ '''get_key(self) -> object
+
+ @copydoc WT_ASYNC_OP::get_key
+ Returns only the first column.'''
+ k = self.get_keys()
+ if len(k) == 1:
+ return k[0]
+ return k
+
+ def get_keys(self):
+ '''get_keys(self) -> (object, ...)
+
+ @copydoc WT_ASYNC_OP::get_key'''
+ if self.is_column:
+ return [self._get_recno(),]
+ else:
+ return unpack(self.key_format, self._get_key())
+
+ def get_value(self):
+ '''get_value(self) -> object
+
+ @copydoc WT_ASYNC_OP::get_value
+ Returns only the first column.'''
+ v = self.get_values()
+ if len(v) == 1:
+ return v[0]
+ return v
+
+ def get_values(self):
+ '''get_values(self) -> (object, ...)
+
+ @copydoc WT_ASYNC_OP::get_value'''
+ return unpack(self.value_format, self._get_value())
+
+ def set_key(self, *args):
+ '''set_key(self) -> None
+
+ @copydoc WT_ASYNC_OP::set_key'''
+ if len(args) == 1 and type(args[0]) == tuple:
+ args = args[0]
+ if self.is_column:
+ self._set_recno(long(args[0]))
+ else:
+ # Keep the Python string pinned
+ self._key = pack(self.key_format, *args)
+ self._set_key(self._key)
+
+ def set_value(self, *args):
+ '''set_value(self) -> None
+
+ @copydoc WT_ASYNC_OP::set_value'''
+ if len(args) == 1 and type(args[0]) == tuple:
+ args = args[0]
+ # Keep the Python string pinned
+ self._value = pack(self.value_format, *args)
+ self._set_value(self._value)
+
+ def __getitem__(self, key):
+ '''Python convenience for searching'''
+ self.set_key(key)
+ if self.search() != 0:
+ raise KeyError
+ return self.get_value()
+%}
+};
+
%extend __wt_cursor {
/* Get / set keys and values */
void _set_key(char *data, int size) {
@@ -578,7 +791,13 @@ typedef int int_void;
%ignore __wt_cursor::key_format;
%ignore __wt_cursor::value_format;
%immutable __wt_session::connection;
+%immutable __wt_async_op::connection;
+%immutable __wt_async_op::uri;
+%immutable __wt_async_op::config;
+%ignore __wt_async_op::key_format;
+%ignore __wt_async_op::value_format;
+%ignore __wt_async_callback;
%ignore __wt_collator;
%ignore __wt_compressor;
%ignore __wt_config_item;
@@ -604,6 +823,7 @@ typedef int int_void;
/* Convert 'int *' to output args for wiredtiger_version */
%apply int *OUTPUT { int * };
+%rename(AsyncOp) __wt_async_op;
%rename(Cursor) __wt_cursor;
%rename(Session) __wt_session;
%rename(Connection) __wt_connection;
@@ -646,30 +866,24 @@ writeToPythonStream(const char *streamname, const char *message)
goto err;
if ((arglist = Py_BuildValue("(s)", msg)) == NULL)
goto err;
- if ((arglist2 = Py_BuildValue("()", msg)) == NULL)
+ if ((arglist2 = Py_BuildValue("()")) == NULL)
goto err;
written = PyObject_CallObject(write_method, arglist);
(void)PyObject_CallObject(flush_method, arglist2);
ret = 0;
-err: /* Release python Global Interpreter Lock */
+err: Py_XDECREF(arglist2);
+ Py_XDECREF(arglist);
+ Py_XDECREF(flush_method);
+ Py_XDECREF(write_method);
+ Py_XDECREF(se);
+ Py_XDECREF(sys);
+ Py_XDECREF(written);
+
+ /* Release python Global Interpreter Lock */
SWIG_PYTHON_THREAD_END_BLOCK;
- if (arglist2)
- Py_XDECREF(arglist2);
- if (arglist)
- Py_XDECREF(arglist);
- if (flush_method)
- Py_XDECREF(flush_method);
- if (write_method)
- Py_XDECREF(write_method);
- if (se)
- Py_XDECREF(se);
- if (sys)
- Py_XDECREF(sys);
- if (written)
- Py_XDECREF(written);
if (msg)
free(msg);
return (ret);
@@ -709,6 +923,7 @@ pythonClose(PY_CALLBACK *pcb)
ret = EINVAL; /* any non-zero value will do. */
}
Py_XDECREF(pcb->pyobj);
+ Py_XDECREF(pcb->pyasynccb);
SWIG_PYTHON_THREAD_END_BLOCK;
@@ -792,11 +1007,75 @@ pythonCloseCallback(WT_EVENT_HANDLER *handler, WT_SESSION *session,
return (ret);
}
-WT_EVENT_HANDLER pyApiEventHandler = {
+static WT_EVENT_HANDLER pyApiEventHandler = {
pythonErrorCallback, pythonMessageCallback, NULL, pythonCloseCallback
};
%}
+/* Add async callback support. */
+%{
+
+static int
+pythonAsyncCallback(WT_ASYNC_CALLBACK *cb, WT_ASYNC_OP *asyncop, int opret,
+ uint32_t flags)
+{
+ int ret, t_ret;
+ PY_CALLBACK *pcb;
+ PyObject *arglist, *notify_method, *pyresult;
+ WT_ASYNC_OP_IMPL *op;
+ WT_SESSION_IMPL *session;
+
+ /*
+ * Ensure the global interpreter lock is held since we'll be
+ * making Python calls now.
+ */
+ SWIG_PYTHON_THREAD_BEGIN_BLOCK;
+
+ op = (WT_ASYNC_OP_IMPL *)asyncop;
+ session = O2S(op);
+ pcb = (PY_CALLBACK *)asyncop->c.lang_private;
+ asyncop->c.lang_private = NULL;
+ ret = 0;
+
+ if (pcb->pyasynccb == NULL)
+ goto err;
+ if ((arglist = Py_BuildValue("(Oii)", pcb->pyobj,
+ opret, flags)) == NULL)
+ goto err;
+ if ((notify_method = PyObject_GetAttrString(pcb->pyasynccb,
+ "notify")) == NULL)
+ goto err;
+
+ pyresult = PyEval_CallObject(notify_method, arglist);
+ if (pyresult == NULL || !PyArg_Parse(pyresult, "i", &ret))
+ goto err;
+
+ if (0) {
+ if (ret == 0)
+ ret = EINVAL;
+err: __wt_err(session, ret, "python async callback error");
+ }
+ Py_XDECREF(pyresult);
+ Py_XDECREF(notify_method);
+ Py_XDECREF(arglist);
+
+ SWIG_PYTHON_THREAD_END_BLOCK;
+
+ if (pcb != NULL) {
+ if ((t_ret = pythonClose(pcb) != 0) && ret == 0)
+ ret = t_ret;
+ }
+ __wt_free(session, pcb);
+
+ if (ret == 0 && (opret == 0 || opret == WT_NOTFOUND))
+ return (0);
+ else
+ return (1);
+}
+
+static WT_ASYNC_CALLBACK pyApiAsyncCallback = { pythonAsyncCallback };
+%}
+
%pythoncode %{
class stat:
'''keys for statistics cursors'''