summary | refs | log | tree | commit | diff
path: root/rdflib/tools
diff options
context:
space:
mode:
author: Nicholas Car <nick@kurrawong.net> 2022-08-13 05:26:04 +1000
committer: GitHub <noreply@github.com> 2022-08-12 21:26:04 +0200
commit: a4b930553118bc835ba424f5ebf79e53053890b5 (patch)
tree: 7527d66c1c30f6b6aa63fac6e63e2c5ff5f504fb /rdflib/tools
parent: 131d9e66e8515aa81d776969d42f58c72bc68f86 (diff)
download: rdflib-a4b930553118bc835ba424f5ebf79e53053890b5.tar.gz
add chunk serializer & tests (#1968)
This file provides a single function `serialize_in_chunks()` which can serialize a Graph into a number of NT files with a maximum number of triples or maximum file size. There is an option to preserve any prefixes declared for the original graph in the first file, which will be a Turtle file. Co-authored-by: Iwan Aucamp <aucampia@gmail.com>
Diffstat (limited to 'rdflib/tools')
-rw-r--r-- rdflib/tools/chunk_serializer.py 132
1 files changed, 132 insertions, 0 deletions
diff --git a/rdflib/tools/chunk_serializer.py b/rdflib/tools/chunk_serializer.py
new file mode 100644
index 00000000..cb18d399
--- /dev/null
+++ b/rdflib/tools/chunk_serializer.py
@@ -0,0 +1,132 @@
+"""
+This file provides a single function `serialize_in_chunks()` which can serialize a
+Graph into a number of NT files with a maximum number of triples or maximum file size.
+
+There is an option to preserve any prefixes declared for the original graph in the first
+file, which will be a Turtle file.
+"""
+
+from contextlib import ExitStack, contextmanager
+from pathlib import Path
+from typing import TYPE_CHECKING, BinaryIO, Generator, Optional, Tuple
+
+from rdflib.graph import Graph
+from rdflib.plugins.serializers.nt import _nt_row
+
+# from rdflib.term import Literal
+
+# if TYPE_CHECKING:
+# from rdflib.graph import _TriplePatternType
+
+__all__ = ["serialize_in_chunks"]
+
+
+def serialize_in_chunks(
+ g: Graph,
+ max_triples: int = 10000,
+ max_file_size_kb: Optional[int] = None,
+ file_name_stem: str = "chunk",
+ output_dir: Optional[Path] = None,
+ write_prefixes: bool = False,
+) -> None:
+ """
+ Serializes a given Graph into a series of n-triples with a given length.
+
+ :param g:
+ The graph to serialize.
+
+ :param max_file_size_kb:
+ Maximum size per NT file in kB (1,000 bytes)
+ Equivalent to ~6,000 triples, depending on Literal sizes.
+
+ :param max_triples:
+ Maximum size per NT file in triples
+ Equivalent to lines in file.
+
+ If both this parameter and max_file_size_kb are set, max_file_size_kb will be used.
+
+ :param file_name_stem:
+ Prefix of each file name.
+ e.g. "chunk" = chunk_000001.nt, chunk_000002.nt...
+
+ :param output_dir:
+ The directory you want the files to be written to.
+
+ :param write_prefixes:
+ The first file created is a Turtle file containing original graph prefixes.
+
+
+ See ``../test/test_tools/test_chunk_serializer.py`` for examples of this in use.
+ """
+
+ if output_dir is None:
+ output_dir = Path.cwd()
+
+ if not output_dir.is_dir():
+ raise ValueError(
+ "If you specify an output_dir, it must actually be a directory!"
+ )
+
+ @contextmanager
+ def _start_new_file(file_no: int) -> Generator[Tuple[Path, BinaryIO], None, None]:
+ if TYPE_CHECKING:
+ # this is here because mypy gets a bit confused
+ assert output_dir is not None
+ fp = Path(output_dir) / f"{file_name_stem}_{str(file_no).zfill(6)}.nt"
+ with open(fp, "wb") as fh:
+ yield fp, fh
+
+ def _serialize_prefixes(g: Graph) -> str:
+ pres = []
+ for k, v in g.namespace_manager.namespaces():
+ pres.append(f"PREFIX {k}: <{v}>")
+
+ return "\n".join(sorted(pres)) + "\n"
+
+ if write_prefixes:
+ with open(
+ Path(output_dir) / f"{file_name_stem}_000000.ttl", "w", encoding="utf-8"
+ ) as fh:
+ fh.write(_serialize_prefixes(g))
+
+ bytes_written = 0
+ with ExitStack() as xstack:
+ if max_file_size_kb is not None:
+ max_file_size = max_file_size_kb * 1000
+ file_no = 1 if write_prefixes else 0
+ for i, t in enumerate(g.triples((None, None, None))):
+ row_bytes = _nt_row(t).encode("utf-8")
+ if len(row_bytes) > max_file_size:
+ raise ValueError(
+ f"cannot write triple {t!r} as it's serialized size of {row_bytes / 1000} exceeds max_file_size_kb = {max_file_size_kb}"
+ )
+ if i == 0:
+ fp, fhb = xstack.enter_context(_start_new_file(file_no))
+ bytes_written = 0
+ elif (bytes_written + len(row_bytes)) >= max_file_size:
+ file_no += 1
+ fp, fhb = xstack.enter_context(_start_new_file(file_no))
+ bytes_written = 0
+
+ bytes_written += fhb.write(row_bytes)
+
+ else:
+ # count the triples in the graph
+ graph_length = len(g)
+
+ if graph_length <= max_triples:
+ # the graph is less than max so just NT serialize the whole thing
+ g.serialize(
+ destination=Path(output_dir) / f"{file_name_stem}_all.nt",
+ format="nt",
+ )
+ else:
+ # graph_length is > max_lines, make enough files for all graph
+ # no_files = math.ceil(graph_length / max_triples)
+ file_no = 1 if write_prefixes else 0
+ for i, t in enumerate(g.triples((None, None, None))):
+ if i % max_triples == 0:
+ fp, fhb = xstack.enter_context(_start_new_file(file_no))
+ file_no += 1
+ fhb.write(_nt_row(t).encode("utf-8"))
+ return