summaryrefslogtreecommitdiff
path: root/redis/commands/graph/commands.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/commands/graph/commands.py')
-rw-r--r--redis/commands/graph/commands.py146
1 files changed, 123 insertions, 23 deletions
diff --git a/redis/commands/graph/commands.py b/redis/commands/graph/commands.py
index fe4224b..762ab42 100644
--- a/redis/commands/graph/commands.py
+++ b/redis/commands/graph/commands.py
@@ -3,7 +3,16 @@ from redis.exceptions import ResponseError
from .exceptions import VersionMismatchException
from .execution_plan import ExecutionPlan
-from .query_result import QueryResult
+from .query_result import AsyncQueryResult, QueryResult
+
+PROFILE_CMD = "GRAPH.PROFILE"
+RO_QUERY_CMD = "GRAPH.RO_QUERY"
+QUERY_CMD = "GRAPH.QUERY"
+DELETE_CMD = "GRAPH.DELETE"
+SLOWLOG_CMD = "GRAPH.SLOWLOG"
+CONFIG_CMD = "GRAPH.CONFIG"
+LIST_CMD = "GRAPH.LIST"
+EXPLAIN_CMD = "GRAPH.EXPLAIN"
class GraphCommands:
@@ -52,33 +61,28 @@ class GraphCommands:
query = q
# handle query parameters
- if params is not None:
- query = self._build_params_header(params) + query
+ query = self._build_params_header(params) + query
# construct query command
# ask for compact result-set format
# specify known graph version
if profile:
- cmd = "GRAPH.PROFILE"
+ cmd = PROFILE_CMD
else:
- cmd = "GRAPH.RO_QUERY" if read_only else "GRAPH.QUERY"
+ cmd = RO_QUERY_CMD if read_only else QUERY_CMD
command = [cmd, self.name, query, "--compact"]
# include timeout is specified
- if timeout:
- if not isinstance(timeout, int):
- raise Exception("Timeout argument must be a positive integer")
- command += ["timeout", timeout]
+ if isinstance(timeout, int):
+ command.extend(["timeout", timeout])
+ elif timeout is not None:
+ raise Exception("Timeout argument must be a positive integer")
# issue query
try:
response = self.execute_command(*command)
return QueryResult(self, response, profile)
except ResponseError as e:
- if "wrong number of arguments" in str(e):
- print(
- "Note: RedisGraph Python requires server version 2.2.8 or above"
- ) # noqa
if "unknown command" in str(e) and read_only:
# `GRAPH.RO_QUERY` is unavailable in older versions.
return self.query(q, params, timeout, read_only=False)
@@ -106,7 +110,7 @@ class GraphCommands:
For more information see `DELETE <https://redis.io/commands/graph.delete>`_. # noqa
"""
self._clear_schema()
- return self.execute_command("GRAPH.DELETE", self.name)
+ return self.execute_command(DELETE_CMD, self.name)
# declared here, to override the built in redis.db.flush()
def flush(self):
@@ -146,7 +150,7 @@ class GraphCommands:
3. The issued query.
4. The amount of time needed for its execution, in milliseconds.
"""
- return self.execute_command("GRAPH.SLOWLOG", self.name)
+ return self.execute_command(SLOWLOG_CMD, self.name)
def config(self, name, value=None, set=False):
"""
@@ -170,14 +174,14 @@ class GraphCommands:
raise DataError(
"``value`` can be provided only when ``set`` is True"
) # noqa
- return self.execute_command("GRAPH.CONFIG", *params)
+ return self.execute_command(CONFIG_CMD, *params)
def list_keys(self):
"""
Lists all graph keys in the keyspace.
For more information see `GRAPH.LIST <https://redis.io/commands/graph.list>`_. # noqa
"""
- return self.execute_command("GRAPH.LIST")
+ return self.execute_command(LIST_CMD)
def execution_plan(self, query, params=None):
"""
@@ -188,10 +192,9 @@ class GraphCommands:
query: the query that will be executed
params: query parameters
"""
- if params is not None:
- query = self._build_params_header(params) + query
+ query = self._build_params_header(params) + query
- plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
+ plan = self.execute_command(EXPLAIN_CMD, self.name, query)
if isinstance(plan[0], bytes):
plan = [b.decode() for b in plan]
return "\n".join(plan)
@@ -206,8 +209,105 @@ class GraphCommands:
query: the query that will be executed
params: query parameters
"""
- if params is not None:
- query = self._build_params_header(params) + query
+ query = self._build_params_header(params) + query
+
+ plan = self.execute_command(EXPLAIN_CMD, self.name, query)
+ return ExecutionPlan(plan)
+
+
+class AsyncGraphCommands(GraphCommands):
+ async def query(self, q, params=None, timeout=None, read_only=False, profile=False):
+ """
+ Executes a query against the graph.
+ For more information see `GRAPH.QUERY <https://oss.redis.com/redisgraph/master/commands/#graphquery>`_. # noqa
+
+ Args:
+
+ q : str
+ The query.
+ params : dict
+ Query parameters.
+ timeout : int
+ Maximum runtime for read queries in milliseconds.
+ read_only : bool
+ Executes a readonly query if set to True.
+ profile : bool
+ Return details on results produced by and time
+ spent in each operation.
+ """
+
+ # maintain original 'q'
+ query = q
+
+ # handle query parameters
+ query = self._build_params_header(params) + query
+
+ # construct query command
+ # ask for compact result-set format
+ # specify known graph version
+ if profile:
+ cmd = PROFILE_CMD
+ else:
+ cmd = RO_QUERY_CMD if read_only else QUERY_CMD
+ command = [cmd, self.name, query, "--compact"]
+
+ # include timeout is specified
+ if isinstance(timeout, int):
+ command.extend(["timeout", timeout])
+ elif timeout is not None:
+ raise Exception("Timeout argument must be a positive integer")
+
+ # issue query
+ try:
+ response = await self.execute_command(*command)
+ return await AsyncQueryResult().initialize(self, response, profile)
+ except ResponseError as e:
+ if "unknown command" in str(e) and read_only:
+ # `GRAPH.RO_QUERY` is unavailable in older versions.
+ return await self.query(q, params, timeout, read_only=False)
+ raise e
+ except VersionMismatchException as e:
+ # client view over the graph schema is out of sync
+ # set client version and refresh local schema
+ self.version = e.version
+ self._refresh_schema()
+ # re-issue query
+ return await self.query(q, params, timeout, read_only)
+
+ async def execution_plan(self, query, params=None):
+ """
+ Get the execution plan for given query,
+ GRAPH.EXPLAIN returns an array of operations.
+
+ Args:
+ query: the query that will be executed
+ params: query parameters
+ """
+ query = self._build_params_header(params) + query
- plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
+ plan = await self.execute_command(EXPLAIN_CMD, self.name, query)
+ if isinstance(plan[0], bytes):
+ plan = [b.decode() for b in plan]
+ return "\n".join(plan)
+
+ async def explain(self, query, params=None):
+ """
+ Get the execution plan for given query,
+ GRAPH.EXPLAIN returns ExecutionPlan object.
+
+ Args:
+ query: the query that will be executed
+ params: query parameters
+ """
+ query = self._build_params_header(params) + query
+
+ plan = await self.execute_command(EXPLAIN_CMD, self.name, query)
return ExecutionPlan(plan)
+
+ async def flush(self):
+ """
+ Commit the graph and reset the edges and the nodes to zero length.
+ """
+ await self.commit()
+ self.nodes = {}
+ self.edges = []