# Source code for pymarc.reader

# This file is part of pymarc. It is subject to the license terms in the
# LICENSE file found in the top-level directory of this distribution and at
# https://opensource.org/licenses/BSD-2-Clause. pymarc may be copied, modified,
# propagated, or distributed according to the terms contained in the LICENSE
# file.

"""Pymarc Reader."""
import os
import sys
import json

from io import IOBase, BytesIO, StringIO
from typing import Callable, BinaryIO, IO, Iterator, Union

from pymarc.constants import END_OF_RECORD
from pymarc import Record, Field, Subfield
from pymarc import exceptions


class Reader:
    """A base class for all iterating readers in the pymarc package.

    A reader is its own iterator: ``__iter__`` returns the reader and
    subclasses provide ``__next__`` to produce one record per call.
    """

    def __iter__(self):
        """Return the reader itself so it can be used directly in a loop."""
        return self

    def __next__(self):
        """Return the next record; concrete readers must override this.

        Defining the method here completes the iterator protocol, so that
        ``iter(reader)`` succeeds and a subclass that forgot to implement
        ``__next__`` fails with an explicit message.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement __next__()"
        )
class MARCReader(Reader):
    """An iterator class for reading a file of MARC21 records.

    Simple usage:

    .. code-block:: python

        from pymarc import MARCReader

        ## pass in a file object
        reader = MARCReader(open('file.dat', 'rb'))
        for record in reader:
            ...

        ## pass in marc in transmission format
        reader = MARCReader(rawmarc)
        for record in reader:
            ...

    If you would like to have your Record object contain unicode strings use
    the to_unicode parameter:

    .. code-block:: python

        reader = MARCReader(open('file.dat', 'rb'), to_unicode=True)

    This will decode from MARC-8 or utf-8 depending on the value in the MARC
    leader at position 9. Upon serialization of the Record object to MARC21,
    the resulting output will be utf-8 encoded and the value in the MARC
    leader at position 9 will be set appropriately to indicate the change of
    character encoding.

    If you find yourself in the unfortunate position of having data that is
    utf-8 encoded without the leader set appropriately you can use the
    force_utf8 parameter:

    .. code-block:: python

        reader = MARCReader(open('file.dat', 'rb'), to_unicode=True,
            force_utf8=True)

    If you find yourself in the unfortunate position of having data that is
    mostly utf-8 encoded but with a few non-utf-8 characters, you can also use
    the utf8_handling parameter, which takes the same values ('strict',
    'replace', and 'ignore') as the Python Unicode codecs (see
    http://docs.python.org/library/codecs.html for more info).

    It is not legal in MARC-21 to use anything but MARC-8 or UTF-8, but if you
    have a file in an incorrect encoding and you know what it is, you can
    pass that encoding in the "file_encoding" parameter.

    MARCReader parses data in a permissive way and gives the user full control
    over what to do when a wrong record is encountered. Whenever any error is
    found the reader returns ``None`` instead of a regular record object. The
    exception information and corresponding data are available through the
    reader.current_exception and reader.current_chunk properties:

    .. code-block:: python

        reader = MARCReader(open('file.dat', 'rb'))
        for record in reader:
            if record is None:
                print(
                    "Current chunk: ",
                    reader.current_chunk,
                    " was ignored because the following exception raised: ",
                    reader.current_exception
                )
            else:
                # do something with record
    """

    # Raw bytes of the record most recently read (or attempted).
    _current_chunk = None
    # Exception describing why the most recent read yielded None, if it did.
    _current_exception = None

    file_handle: IO

    @property
    def current_chunk(self):
        """Current chunk."""
        return self._current_chunk

    @property
    def current_exception(self):
        """Current exception."""
        return self._current_exception

    def __init__(
        self,
        marc_target: Union[BinaryIO, bytes],
        to_unicode: bool = True,
        force_utf8: bool = False,
        hide_utf8_warnings: bool = False,
        utf8_handling: str = "strict",
        file_encoding: str = "iso8859-1",
        permissive: bool = False,
    ) -> None:
        """The constructor to which you can pass either raw marc or a file-like object.

        Basically the argument you pass in should be raw MARC in transmission
        format or an object that responds to read().

        Args:
            marc_target: raw MARC bytes (wrapped in a ``BytesIO``) or an
                object with a ``read()`` method, used as-is.
            to_unicode: forwarded to ``Record`` for every record parsed.
            force_utf8: forwarded to ``Record``.
            hide_utf8_warnings: forwarded to ``Record``.
            utf8_handling: forwarded to ``Record``.
            file_encoding: forwarded to ``Record``.
            permissive: stored as ``self.permissive``; the code in this class
                does not consult it.
        """
        super().__init__()
        self.to_unicode = to_unicode
        self.force_utf8 = force_utf8
        self.hide_utf8_warnings = hide_utf8_warnings
        self.utf8_handling = utf8_handling
        self.file_encoding = file_encoding
        self.permissive = permissive
        if isinstance(marc_target, bytes):
            self.file_handle = BytesIO(marc_target)
        else:
            self.file_handle = marc_target

    def close(self) -> None:
        """Close the handle."""
        self.file_handle.close()

    def __next__(self):
        """Read and parse the next record.

        Returns:
            The parsed ``Record``, or ``None`` when the chunk could not be
            read or parsed; in that case ``current_exception`` holds the
            reason and ``current_chunk`` the bytes that were consumed.

        Raises:
            StopIteration: at end of input, or when the previous call ended
                with a ``FatalReaderError``.
        """
        # After a fatal error the iteration is ended rather than attempting
        # to read again from the same handle.
        if isinstance(self._current_exception, exceptions.FatalReaderError):
            raise StopIteration

        self._current_chunk = None
        self._current_exception = None

        # The first five bytes of a record hold its total length as digits.
        self._current_chunk = first5 = self.file_handle.read(5)
        if not first5:
            raise StopIteration

        if len(first5) < 5:
            self._current_exception = exceptions.TruncatedRecord()
            return None

        try:
            length = int(first5)
        except ValueError:
            self._current_exception = exceptions.RecordLengthInvalid()
            return None

        # A length below 5 cannot even cover the length field itself, and it
        # would turn the read below into read(<negative>), which consumes the
        # whole remaining stream instead of a single record.
        if length < 5:
            self._current_exception = exceptions.RecordLengthInvalid()
            return None

        chunk = first5 + self.file_handle.read(length - 5)
        self._current_chunk = chunk

        if len(chunk) < length:
            self._current_exception = exceptions.TruncatedRecord()
            return None

        if chunk[-1] != ord(END_OF_RECORD):
            self._current_exception = exceptions.EndOfRecordNotFound()
            return None

        try:
            return Record(
                chunk,
                to_unicode=self.to_unicode,
                force_utf8=self.force_utf8,
                hide_utf8_warnings=self.hide_utf8_warnings,
                utf8_handling=self.utf8_handling,
                file_encoding=self.file_encoding,
            )
        except Exception as ex:
            # Deliberately broad: any parse failure is reported through
            # current_exception and the caller receives None for this record.
            self._current_exception = ex
            return None
def map_records(f: Callable, *files: Union[BinaryIO, bytes]) -> None:
    """Applies a given function to each record in a batch.

    You can pass in multiple batches.

    .. code-block:: python

        def print_title(r):
            print(r['245'])

        map_records(print_title, open('marc.dat', 'rb'))

    Args:
        f: callable invoked once per item yielded by ``MARCReader``. The
            reader yields ``None`` for records it could not parse, so ``f``
            receives ``None`` for those as well.
        files: any number of batches, each something ``MARCReader`` accepts.

    The return values of ``f`` are discarded, and the batches are not closed
    here.
    """
    for marc_file in files:
        # A plain loop rather than list(map(...)): only the side effects of
        # f matter, so there is no reason to hold every result in memory.
        for record in MARCReader(marc_file):
            f(record)
class JSONReader(Reader):
    """JSON Reader.

    Loads a complete JSON document into memory and yields one ``Record`` per
    JSON object. A top-level JSON object is treated as a single record; any
    other iterable top-level value is treated as a sequence of records.
    """

    file_handle: IO

    def __init__(
        self,
        marc_target: Union[bytes, str, IO],
        encoding: str = "utf-8",
        stream: bool = False,
    ) -> None:
        """The constructor to which you can pass either JSON text, a path or a file-like object.

        Args:
            marc_target: a file-like object (used as-is), the path of an
                existing file, or the JSON document itself as ``str`` or
                ``bytes``.
            encoding: used to open ``marc_target`` when it is a path and to
                decode it when it is ``bytes``.
            stream: streaming is not implemented; when true a notice is
                written to stderr and the data is still loaded into memory.
        """
        self.encoding = encoding
        opened_here = False
        if isinstance(marc_target, IOBase):
            self.file_handle = marc_target
        elif isinstance(marc_target, str) and os.path.exists(marc_target):
            # Honour the requested encoding instead of the platform default.
            self.file_handle = open(marc_target, "r", encoding=encoding)
            opened_here = True
        else:
            if isinstance(marc_target, bytes):
                # StringIO only accepts text.
                marc_target = marc_target.decode(encoding)
            self.file_handle = StringIO(marc_target)
        if stream:
            sys.stderr.write(
                "Streaming not yet implemented, your data will be loaded into memory\n"
            )
        try:
            self.records = json.load(self.file_handle, strict=False)
        finally:
            # Everything is in memory now; release a file opened by this
            # constructor. A handle supplied by the caller stays open.
            if opened_here:
                self.file_handle.close()
        # Prepared up front so next(reader) works without a prior iter().
        self.iter = self._record_iterator()

    def close(self) -> None:
        """Close the handle."""
        self.file_handle.close()

    def _record_iterator(self) -> Iterator:
        """Return a fresh iterator over the loaded JSON record objects."""
        if hasattr(self.records, "__iter__") and not isinstance(self.records, dict):
            return iter(self.records)
        return iter([self.records])

    def __iter__(self) -> Iterator:
        """Restart iteration from the first loaded record and return the reader."""
        self.iter = self._record_iterator()
        return self

    def __next__(self) -> Record:
        """Build and return a ``Record`` from the next loaded JSON object.

        Raises:
            StopIteration: when all loaded records have been returned.
        """
        jobj = next(self.iter)
        rec = Record()
        rec.leader = jobj["leader"]
        for field in jobj["fields"]:
            # Only the first key/value pair of each field object is used.
            tag, content = list(field.items())[0]
            if "subfields" in content and hasattr(content, "update"):
                # flatten m-i-j dict to list in pymarc
                subfields: list = [
                    Subfield(code=code, value=value)
                    for sub in content["subfields"]
                    for code, value in sub.items()
                ]
                fld = Field(
                    tag=tag,
                    subfields=subfields,
                    indicators=[content["ind1"], content["ind2"]],
                )
            else:
                fld = Field(tag=tag, data=content)
            rec.add_field(fld)
        return rec