# This file is part of pymarc. It is subject to the license terms in the
# LICENSE file found in the top-level directory of this distribution and at
# https://opensource.org/licenses/BSD-2-Clause. pymarc may be copied, modified,
# propagated, or distributed according to the terms contained in the LICENSE
# file.
"""Pymarc Record."""
import json
import logging
import re
import unicodedata
import warnings
from typing import List, Optional, Dict, Tuple, Any, Pattern
from pymarc.constants import DIRECTORY_ENTRY_LEN, END_OF_RECORD, LEADER_LEN
from pymarc.exceptions import (
BadSubfieldCodeWarning,
BaseAddressInvalid,
BaseAddressNotFound,
FieldNotFound,
MissingLinkedFields,
NoFieldsFound,
RecordDirectoryInvalid,
RecordLeaderInvalid,
TruncatedRecord,
)
from pymarc.field import (
END_OF_FIELD,
SUBFIELD_INDICATOR,
Field,
RawField,
Subfield,
map_marc8_field,
)
from pymarc.leader import Leader
from pymarc.marc8 import marc8_to_unicode
isbn_regex: Pattern = re.compile(r"([0-9\-xX]+)")
[docs]
class Record:
"""A class for representing a MARC record.
Each Record object is made up of multiple Field objects. You'll probably want to look
at the docs for :class:`Field <pymarc.record.Field>` to see how to fully use a Record
object.
Basic usage:
.. code-block:: python
field = Field(
tag = '245',
indicators = ['0','1'],
subfields = [
Subfield(code='a', value='The pragmatic programmer : '),
Subfield(code='b', value='from journeyman to master /'),
Subfield(code='c', value='Andrew Hunt, David Thomas.'),
])
record.add_field(field)
Or creating a record from a chunk of MARC in transmission format:
.. code-block:: python
record = Record(data=chunk)
Or getting a record as serialized MARC21.
.. code-block:: python
raw = record.as_marc()
You'll normally want to use a MARCReader object to iterate through
MARC records in a file.
"""
__slots__ = ("leader", "fields", "pos", "force_utf8", "to_unicode", "__pos")
def __init__(
self,
data: str = "",
to_unicode: bool = True,
force_utf8: bool = False,
hide_utf8_warnings: bool = False,
utf8_handling: str = "strict",
leader: str = " " * LEADER_LEN,
file_encoding: str = "iso8859-1",
) -> None:
"""Initialize a Record."""
self.leader: Any = Leader(leader[0:10] + "22" + leader[12:20] + "4500")
self.fields: List = list()
self.pos: int = 0
self.force_utf8: bool = force_utf8
self.to_unicode: bool = to_unicode
if len(data) > 0:
self.decode_marc(
data,
to_unicode=to_unicode,
force_utf8=force_utf8,
hide_utf8_warnings=hide_utf8_warnings,
utf8_handling=utf8_handling,
encoding=file_encoding,
)
elif force_utf8:
self.leader = self.leader[0:9] + "a" + self.leader[10:]
def __str__(self) -> str:
"""Will return a prettified version of the record in MARCMaker format.
See :func:`Field.__str__() <pymarc.record.Field.__str__>` for more information.
"""
# join is significantly faster than concatenation
text_list: List = [f"=LDR {self.leader}"]
text_list.extend([str(field) for field in self.fields])
text: str = "\n".join(text_list) + "\n"
return text
[docs]
def get(self, tag: str, default: Optional[Field] = None) -> Optional[Field]:
"""Implements a dict-like get with a default value.
If `tag` is not found, then the default value will be returned.
The default value should be a Field instance.
.. code-block:: python
# returns None if 999 not in record.
record.get('999')
# returns the default if 999 not in record.
record.get('999', Field(tag="999", indicators=[" ", " "]))
"""
try:
return self[tag]
except KeyError:
return default
def __getitem__(self, tag: str) -> Field:
"""Allows a shorthand lookup by tag.
Follows Python behavior and raises KeyError if `tag` is not in the record.
.. code-block:: python
record['245']
"""
if tag not in self:
raise KeyError
fields: List[Field] = self.get_fields(tag)
if len(fields) == 0:
raise KeyError
return fields[0]
def __contains__(self, tag: str) -> bool:
"""Allows a shorthand test of tag membership.
.. code-block:: python
'245' in record
"""
for f in self.fields:
if f.tag == tag:
return True
return False
def __iter__(self):
self.__pos = 0
return self
def __next__(self) -> Field:
if self.__pos >= len(self.fields):
raise StopIteration
self.__pos += 1
return self.fields[self.__pos - 1]
[docs]
def add_field(self, *fields):
"""Add pymarc.Field objects to a Record object.
Optionally you can pass in multiple fields.
"""
self.fields.extend(fields)
[docs]
def add_grouped_field(self, *fields) -> None:
"""Add pymarc.Field objects to a Record object and sort them "grouped".
Which means, attempting to maintain a loose numeric order per the MARC standard
for "Organization of the record" (http://www.loc.gov/marc/96principl.html).
Optionally you can pass in multiple fields.
"""
for f in fields:
if len(self.fields) == 0 or not f.tag.isdigit():
self.fields.append(f)
continue
self._sort_fields(f, "grouped")
[docs]
def add_ordered_field(self, *fields) -> None:
"""Add pymarc.Field objects to a Record object and sort them "ordered".
Which means, attempting to maintain a strict numeric order.
Optionally you can pass in multiple fields.
"""
for f in fields:
if len(self.fields) == 0 or not f.tag.isdigit():
self.fields.append(f)
continue
self._sort_fields(f, "ordered")
def _sort_fields(self, field: Field, mode: str) -> None:
"""Sort fields by `mode`."""
if mode == "grouped":
tag = int(field.tag[0])
else:
tag = int(field.tag)
i, last_tag = 0, 0
for selff in self.fields:
i += 1
if not selff.tag.isdigit():
self.fields.insert(i - 1, field)
break
if mode == "grouped":
last_tag = int(selff.tag[0])
else:
last_tag = int(selff.tag)
if last_tag > tag:
self.fields.insert(i - 1, field)
break
if len(self.fields) == i:
self.fields.append(field)
break
[docs]
def remove_field(self, *fields) -> None:
"""Remove one or more pymarc.Field objects from a Record object."""
for f in fields:
try:
self.fields.remove(f)
except ValueError:
raise FieldNotFound
[docs]
def remove_fields(self, *tags) -> None:
"""Remove all the fields with the tags passed to the function.
.. code-block:: python
# remove all the fields marked with tags '200' or '899'.
self.remove_fields('200', '899')
"""
self.fields[:] = (field for field in self.fields if field.tag not in tags)
[docs]
def get_fields(self, *args) -> List[Field]:
"""Return a list of all the fields in a record tags matching `args`.
.. code-block:: python
title = record.get_fields('245')
If no fields with the specified tag are found then an empty list is returned.
If you are interested in more than one tag you can pass it as multiple arguments.
.. code-block:: python
subjects = record.get_fields('600', '610', '650')
If no tag is passed in to get_fields() a list of all the fields will be
returned.
"""
if len(args) == 0:
return self.fields
return [f for f in self.fields if f.tag in args]
[docs]
def get_linked_fields(self, field: Field) -> List[Field]:
"""Given a field that is not an 880, retrieve a list of any linked 880 fields."""
num = field.linkage_occurrence_num()
fields = self.get_fields("880")
linked_fields = list(
filter(lambda f: f.linkage_occurrence_num() == num, fields)
)
if num is not None and not linked_fields:
raise MissingLinkedFields(field)
return linked_fields
[docs]
def decode_marc(
self,
marc,
to_unicode: bool = True,
force_utf8: bool = False,
hide_utf8_warnings: bool = False,
utf8_handling: str = "strict",
encoding: str = "iso8859-1",
) -> None:
"""Populate the object based on the `marc`` record in transmission format.
The Record constructor actually uses decode_marc() behind the scenes when you
pass in a chunk of MARC data to it.
"""
# extract record leader
self.leader = marc[0:LEADER_LEN].decode("ascii")
if len(self.leader) != LEADER_LEN:
raise RecordLeaderInvalid
if self.leader[9] == "a" or self.force_utf8:
encoding = "utf-8"
# extract the byte offset where the record data starts
base_address = int(marc[12:17])
if base_address <= 0:
raise BaseAddressNotFound
if base_address >= len(marc):
raise BaseAddressInvalid
if len(marc) < int(self.leader[:5]):
raise TruncatedRecord
# extract directory, base_address-1 is used since the
# director ends with an END_OF_FIELD byte
directory = marc[LEADER_LEN : base_address - 1].decode("ascii")
# determine the number of fields in record
if len(directory) % DIRECTORY_ENTRY_LEN != 0:
raise RecordDirectoryInvalid
field_total: int = len(directory) // DIRECTORY_ENTRY_LEN
# add fields to our record using directory offsets
field_count: int = 0
while field_count < field_total:
entry_start = field_count * DIRECTORY_ENTRY_LEN
entry_end = entry_start + DIRECTORY_ENTRY_LEN
entry = directory[entry_start:entry_end]
entry_tag = entry[0:3]
entry_length = int(entry[3:7])
entry_offset = int(entry[7:12])
entry_data = marc[
base_address
+ entry_offset : base_address
+ entry_offset
+ entry_length
- 1
]
# assume controlfields are numeric; replicates ruby-marc behavior
if entry_tag < "010" and entry_tag.isdigit():
if to_unicode:
field = Field(tag=entry_tag, data=entry_data.decode(encoding))
else:
field = RawField(tag=entry_tag, data=entry_data)
else:
subfields = list()
subs = entry_data.split(SUBFIELD_INDICATOR.encode("ascii"))
# The MARC spec requires there to be two indicators in a
# field. However experience in the wild has shown that
# indicators are sometimes missing, and sometimes there
# are too many. Rather than throwing an exception because
# we can't find what we want and rejecting the field, or
# barfing on the whole record we'll try to use what we can
# find. This means missing indicators will be recorded as
# blank spaces, and any more than 2 are dropped on the floor.
first_indicator = second_indicator = " "
subs[0] = subs[0].decode("ascii")
if len(subs[0]) == 0:
logging.warning("missing indicators: %s", entry_data)
first_indicator = second_indicator = " "
elif len(subs[0]) == 1:
logging.warning("only 1 indicator found: %s", entry_data)
first_indicator = subs[0][0]
second_indicator = " "
elif len(subs[0]) > 2:
logging.warning("more than 2 indicators found: %s", entry_data)
first_indicator = subs[0][0]
second_indicator = subs[0][1]
else:
first_indicator = subs[0][0]
second_indicator = subs[0][1]
for subfield in subs[1:]:
skip_bytes = 1
if len(subfield) == 0:
continue
try:
code = subfield[0:1].decode("ascii")
except UnicodeDecodeError:
warnings.warn(BadSubfieldCodeWarning())
code, skip_bytes = normalize_subfield_code(subfield)
data = subfield[skip_bytes:]
if to_unicode:
if self.leader[9] == "a" or force_utf8:
data = data.decode("utf-8", utf8_handling)
elif encoding == "iso8859-1":
data = marc8_to_unicode(data, hide_utf8_warnings)
else:
data = data.decode(encoding)
coded = Subfield(code=code, value=data)
subfields.append(coded)
if to_unicode:
field = Field(
tag=entry_tag,
indicators=[first_indicator, second_indicator],
subfields=subfields,
)
else:
field = RawField(
tag=entry_tag,
indicators=[first_indicator, second_indicator],
subfields=subfields,
)
self.add_field(field)
field_count += 1
if field_count == 0:
raise NoFieldsFound
[docs]
def as_marc(self) -> bytes:
"""Returns the record serialized as MARC21."""
fields = b""
directory = b""
offset = 0
if self.to_unicode:
if isinstance(self.leader, Leader):
self.leader.coding_scheme = "a"
else:
self.leader = self.leader[0:9] + "a" + self.leader[10:]
# build the directory
# each element of the directory includes the tag, the byte length of
# the field and the offset from the base address where the field data
# can be found
if self.leader[9] == "a" or self.force_utf8:
encoding = "utf-8"
else:
encoding = "iso8859-1"
for field in self.fields:
if isinstance(field, RawField):
field_data = field.as_marc()
else:
field_data = field.as_marc(encoding=encoding)
fields += field_data
if field.tag.isdigit():
directory += ("%03d" % int(field.tag)).encode(encoding)
else:
directory += ("%03s" % field.tag).encode(encoding)
directory += ("%04d%05d" % (len(field_data), offset)).encode(encoding)
offset += len(field_data)
# directory ends with an end of field
directory += END_OF_FIELD.encode(encoding)
# field data ends with an end of record
fields += END_OF_RECORD.encode(encoding)
# the base address where the directory ends and the field data begins
base_address = LEADER_LEN + len(directory)
# figure out the length of the record
record_length = base_address + len(fields)
# update the leader with the current record length and base address
# the lengths are fixed width and zero padded
strleader = f"{record_length:0>5}{self.leader[5:12]}{base_address:0>5}{self.leader[17:]}"
leader = strleader.encode(encoding)
return leader + directory + fields
# alias for backwards compatibility
as_marc21 = as_marc
[docs]
def as_dict(self) -> Dict[str, str]:
"""Turn a MARC record into a dictionary, which is used for ``as_json``."""
record: Dict = {"leader": str(self.leader), "fields": []}
for field in self:
if field.is_control_field():
record["fields"].append({field.tag: field.data})
else:
record["fields"].append(
{
field.tag: {
"ind1": field.indicator1,
"ind2": field.indicator2,
"subfields": [{s.code: s.value} for s in field.subfields],
}
}
)
return record # as dict
[docs]
def as_json(self, **kwargs) -> str:
"""Serialize a record as JSON.
See:
https://web.archive.org/web/20151112001548/http://dilettantes.code4lib.org/blog/2010/09/a-proposal-to-serialize-marc-in-json
"""
return json.dumps(self.as_dict(), **kwargs)
@property
def title(self) -> Optional[str]:
"""Returns the title of the record (245 $a and $b)."""
title_field: Optional[Field] = self.get("245")
if not title_field:
return None
title: Optional[str] = title_field.get("a")
if title:
subtitle = title_field.get("b")
if subtitle:
title += f" {subtitle}"
return title
@property
def issn_title(self) -> Optional[str]:
"""Returns the key title of the record (222 $a and $b)."""
title_field: Optional[Field] = self.get("222")
if not title_field:
return None
title: Optional[str] = title_field.get("a")
if title:
subtitle = title_field.get("b")
if subtitle:
title += f" {subtitle}"
return title
@property
def isbn(self) -> Optional[str]:
"""Returns the first ISBN in the record or None if one is not present.
The returned ISBN will be all numeric, except for an
x/X which may occur in the checksum position. Dashes and
extraneous information will be automatically removed. If you need
this information you'll want to look directly at the 020 field,
e.g. record['020']['a']. Values that do not match the regex will not
be returned.
"""
isbn_field: Optional[Field] = self.get("020")
if not isbn_field:
return None
isbn_number: Optional[str] = isbn_field.get("a")
if not isbn_number:
return None
match = isbn_regex.search(isbn_number) # type: ignore
if match:
return match.group(1).replace("-", "")
return None
@property
def issn(self) -> Optional[str]:
"""Returns the ISSN number [022]['a'] in the record or None."""
field = self.get("022")
return field.get("a") if (field and "a" in field) else None
@property
def issnl(self) -> Optional[str]:
"""Returns the ISSN-L number [022]['l'] of the record or None."""
field = self.get("022")
return field["l"] if (field and "l" in field) else None
@property
def sudoc(self) -> Optional[str]:
"""Returns a Superintendent of Documents (SuDoc) classification number.
Note: More information can be found at the following URL:
https://www.fdlp.gov/classification-guidelines/introduction-to-the-classification-guidelines
"""
field = self.get("086")
return field.format_field() if field else None
@property
def author(self) -> Optional[str]:
"""Returns the author from field 100, 110 or 111."""
field = self.get("100") or self.get("110") or self.get("111")
return field.format_field() if field else None
@property
def uniformtitle(self) -> Optional[str]:
"""Returns the uniform title from field 130 or 240."""
field = self.get("130") or self.get("240")
return field.format_field() if field else None
@property
def series(self) -> List[Field]:
"""Returns series fields.
Note: 490 supersedes the 440 series statement which was both
series statement and added entry. 8XX fields are added entries.
"""
return self.get_fields("440", "490", "800", "810", "811", "830")
@property
def subjects(self) -> List[Field]:
"""Returns subjects fields.
Note: Fields 690-699 are considered "local" added entry fields but
occur with some frequency in OCLC and RLIN records.
"""
# fmt: off
return self.get_fields(
"600", "610", "611", "630", "648", "650", "651", "653", "654", "655",
"656", "657", "658", "662", "690", "691", "696", "697", "698", "699",
)
# fmt: on
@property
def addedentries(self) -> List[Field]:
"""Returns Added entries fields.
Note: Fields 790-799 are considered "local" added entry fields but
occur with some frequency in OCLC and RLIN records.
"""
# fmt: off
return self.get_fields(
"700", "710", "711", "720", "730", "740", "752", "753", "754", "790",
"791", "792", "793", "796", "797", "798", "799",
)
# fmt: on
@property
def location(self) -> List[Field]:
"""Returns location field (852)."""
return self.get_fields("852")
@property
def notes(self) -> List[Field]:
"""Return notes fields (all 5xx fields)."""
# fmt: off
return self.get_fields(
"500", "501", "502", "504", "505", "506", "507", "508", "510", "511",
"513", "514", "515", "516", "518", "520", "521", "522", "524", "525",
"526", "530", "533", "534", "535", "536", "538", "540", "541", "544",
"545", "546", "547", "550", "552", "555", "556", "561", "562", "563",
"565", "567", "580", "581", "583", "584", "585", "586", "590", "591",
"592", "593", "594", "595", "596", "597", "598", "599",
)
# fmt: on
@property
def physicaldescription(self) -> List[Field]:
"""Return physical description fields (300)."""
return self.get_fields("300")
@property
def publisher(self) -> Optional[str]:
"""Return publisher from 260 or 264.
Note: 264 field with second indicator '1' indicates publisher.
"""
for f in self.get_fields("260", "264"):
if f.tag == "260":
return f.get("b")
if f.tag == "264" and f.indicator2 == "1":
return f.get("b")
return None
@property
def pubyear(self) -> Optional[str]:
"""Returns publication year from 260 or 264."""
for f in self.get_fields("260", "264"):
if f.tag == "260":
return f.get("c") # type: ignore
if f.tag == "264" and f.indicator2 == "1":
return f.get("c") # type: ignore
return None
[docs]
def map_marc8_record(record: Record) -> Record:
"""Map MARC-8 record."""
record.fields = [map_marc8_field(field) for field in record.fields]
leader: List[str] = list(record.leader)
leader[9] = "a" # see http://www.loc.gov/marc/specifications/speccharucs.html
record.leader = "".join(leader)
return record
[docs]
def normalize_subfield_code(subfield) -> Tuple[Any, int]:
"""Normalize subfield code."""
skip_bytes: int = 1
try:
text_subfield = subfield.decode("utf-8")
skip_bytes = len(text_subfield[0].encode("utf-8"))
except UnicodeDecodeError:
text_subfield = subfield.decode("latin-1")
decomposed = unicodedata.normalize("NFKD", text_subfield)
without_diacritics = decomposed.encode("ascii", "ignore").decode("ascii")
return without_diacritics[0], skip_bytes