diff options
author | Nicholas Car <nick@kurrawong.net> | 2022-08-13 05:26:04 +1000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-08-12 21:26:04 +0200 |
commit | a4b930553118bc835ba424f5ebf79e53053890b5 (patch) | |
tree | 7527d66c1c30f6b6aa63fac6e63e2c5ff5f504fb /rdflib/tools | |
parent | 131d9e66e8515aa81d776969d42f58c72bc68f86 (diff) | |
download | rdflib-a4b930553118bc835ba424f5ebf79e53053890b5.tar.gz |
add chunk serializer & tests (#1968)
This file provides a single function `serialize_in_chunks()` which can serialize a
Graph into a number of NT files with a maximum number of triples or maximum file size.
There is an option to preserve any prefixes declared for the original graph in the first
file, which will be a Turtle file.
Co-authored-by: Iwan Aucamp <aucampia@gmail.com>
Diffstat (limited to 'rdflib/tools')
-rw-r--r-- | rdflib/tools/chunk_serializer.py | 132 |
1 file changed, 132 insertions, 0 deletions
"""
This file provides a single function `serialize_in_chunks()` which can serialize a
Graph into a number of NT files with a maximum number of triples or maximum file size.

There is an option to preserve any prefixes declared for the original graph in the first
file, which will be a Turtle file.
"""

from contextlib import ExitStack, contextmanager
from pathlib import Path
from typing import TYPE_CHECKING, BinaryIO, Generator, Optional, Tuple

from rdflib.graph import Graph
from rdflib.plugins.serializers.nt import _nt_row

__all__ = ["serialize_in_chunks"]


def serialize_in_chunks(
    g: Graph,
    max_triples: int = 10000,
    max_file_size_kb: Optional[int] = None,
    file_name_stem: str = "chunk",
    output_dir: Optional[Path] = None,
    write_prefixes: bool = False,
) -> None:
    """
    Serializes a given Graph into a series of n-triples with a given length.

    :param g:
        The graph to serialize.

    :param max_file_size_kb:
        Maximum size per NT file in kB (1,000 bytes)
        Equivalent to ~6,000 triples, depending on Literal sizes.

    :param max_triples:
        Maximum size per NT file in triples
        Equivalent to lines in file.

        If both this parameter and max_file_size_kb are set, max_file_size_kb will be used.

    :param file_name_stem:
        Prefix of each file name.
        e.g. "chunk" = chunk_000001.nt, chunk_000002.nt...

    :param output_dir:
        The directory you want the files to be written to.

    :param write_prefixes:
        The first file created is a Turtle file containing original graph prefixes.

    :raises ValueError: if ``output_dir`` is not a directory, or if a single
        serialized triple is larger than ``max_file_size_kb``.

    See ``../test/test_tools/test_chunk_serializer.py`` for examples of this in use.
    """

    if output_dir is None:
        output_dir = Path.cwd()

    if not output_dir.is_dir():
        raise ValueError(
            "If you specify an output_dir, it must actually be a directory!"
        )

    @contextmanager
    def _start_new_file(file_no: int) -> Generator[Tuple[Path, BinaryIO], None, None]:
        """Open chunk file *file_no* for binary writing; kept open by the
        caller's ExitStack until all chunks have been written."""
        if TYPE_CHECKING:
            # this is here because mypy gets a bit confused
            assert output_dir is not None
        fp = Path(output_dir) / f"{file_name_stem}_{str(file_no).zfill(6)}.nt"
        with open(fp, "wb") as fh:
            yield fp, fh

    def _serialize_prefixes(g: Graph) -> str:
        """Render the graph's namespace bindings as sorted SPARQL PREFIX lines."""
        pres = [f"PREFIX {k}: <{v}>" for k, v in g.namespace_manager.namespaces()]
        return "\n".join(sorted(pres)) + "\n"

    if write_prefixes:
        # Prefix file is number 000000 so data chunks start at 000001 below.
        with open(
            Path(output_dir) / f"{file_name_stem}_000000.ttl", "w", encoding="utf-8"
        ) as fh:
            fh.write(_serialize_prefixes(g))

    bytes_written = 0
    with ExitStack() as xstack:
        if max_file_size_kb is not None:
            # Size-limited chunking: start a new file whenever adding the next
            # row would reach or exceed the byte budget.
            max_file_size = max_file_size_kb * 1000
            file_no = 1 if write_prefixes else 0
            for i, t in enumerate(g.triples((None, None, None))):
                row_bytes = _nt_row(t).encode("utf-8")
                if len(row_bytes) > max_file_size:
                    # BUG FIX: the original interpolated `row_bytes / 1000`
                    # (bytes / int), which raises TypeError instead of this
                    # intended ValueError; also fixed "it's" -> "its".
                    raise ValueError(
                        f"cannot write triple {t!r} as its serialized size of "
                        f"{len(row_bytes) / 1000} exceeds "
                        f"max_file_size_kb = {max_file_size_kb}"
                    )
                if i == 0 or (bytes_written + len(row_bytes)) >= max_file_size:
                    if i != 0:
                        file_no += 1
                    _, fhb = xstack.enter_context(_start_new_file(file_no))
                    bytes_written = 0

                bytes_written += fhb.write(row_bytes)

        else:
            # count the triples in the graph
            graph_length = len(g)

            if graph_length <= max_triples:
                # the graph is less than max so just NT serialize the whole thing
                g.serialize(
                    destination=Path(output_dir) / f"{file_name_stem}_all.nt",
                    format="nt",
                )
            else:
                # graph_length is > max_triples: open a fresh file every
                # max_triples rows.
                file_no = 1 if write_prefixes else 0
                for i, t in enumerate(g.triples((None, None, None))):
                    if i % max_triples == 0:
                        _, fhb = xstack.enter_context(_start_new_file(file_no))
                        file_no += 1
                    fhb.write(_nt_row(t).encode("utf-8"))
    return