# This file is part of pymarc. It is subject to the license terms in the
# LICENSE file found in the top-level directory of this distribution and at
# https://opensource.org/licenses/BSD-2-Clause. pymarc may be copied, modified,
# propagated, or distributed according to the terms contained in the LICENSE
# file.
"""From XML to MARC21 and back again."""
import unicodedata
from xml.sax import make_parser
from xml.sax.handler import ContentHandler, feature_namespaces
import xml.etree.ElementTree as ET
from pymarc import Field, MARC8ToUnicode, Record
XSI_NS = "http://www.w3.org/2001/XMLSchema-instance"
MARC_XML_NS = "http://www.loc.gov/MARC21/slim"
MARC_XML_SCHEMA = "http://www.loc.gov/MARC21/slim http://www.loc.gov/standards/marcxml/schema/MARC21slim.xsd"
[docs]
class XmlHandler(ContentHandler):
"""XML Handler.
You can subclass XmlHandler and add your own process_record
method that'll be passed a pymarc.Record as it becomes
available. This could be useful if you want to stream the
records elsewhere (like to a rdbms) without having to store
them all in memory.
"""
def __init__(self, strict=False, normalize_form=None):
"""Initialize XmlHandler.
* `strict` will ignore elements not matching MARC_XML_NS.
* see unicodedata.normalize for valid `normalize_form` values
"""
self.records = []
self._record = None
self._field = None
self._subfield_code = None
self._text = []
self._strict = strict
self.normalize_form = normalize_form
[docs]
def startElementNS(self, name, qname, attrs):
"""Start element NS."""
if self._strict and name[0] != MARC_XML_NS:
return
element = name[1]
self._text = []
if element == "record":
self._record = Record()
elif element == "controlfield":
tag = attrs.getValue((None, "tag"))
self._field = Field(tag)
elif element == "datafield":
tag = attrs.getValue((None, "tag"))
ind1 = attrs.get((None, "ind1"), " ")
ind2 = attrs.get((None, "ind2"), " ")
self._field = Field(tag, [ind1, ind2])
elif element == "subfield":
self._subfield_code = attrs[(None, "code")]
[docs]
def endElementNS(self, name, qname):
"""End element NS."""
if self._strict and name[0] != MARC_XML_NS:
return
element = name[1]
if self.normalize_form is not None:
text = unicodedata.normalize(self.normalize_form, "".join(self._text))
else:
text = "".join(self._text)
if element == "record":
self.process_record(self._record)
self._record = None
elif element == "leader":
self._record.leader = text
elif element == "controlfield":
self._field.data = text
self._record.add_field(self._field)
self._field = None
elif element == "datafield":
self._record.add_field(self._field)
self._field = None
elif element == "subfield":
self._field.add_subfield(self._subfield_code, text)
self._subfield_code = None
self._text = []
[docs]
def characters(self, chars):
"""Append `chars` to `_text`."""
self._text.append(chars)
[docs]
def process_record(self, record):
"""Append `record` to `records`."""
self.records.append(record)
[docs]
def parse_xml(xml_file, handler):
"""Parse a file with a given subclass of xml.sax.handler.ContentHandler."""
parser = make_parser()
parser.setContentHandler(handler)
parser.setFeature(feature_namespaces, 1)
parser.parse(xml_file)
[docs]
def map_xml(function, *files):
"""Map a function onto the file.
So that for each record that is parsed the function will get called with the
extracted record
.. code-block:: python
def do_it(r):
print(r)
map_xml(do_it, 'marc.xml')
"""
handler = XmlHandler()
handler.process_record = function
for xml_file in files:
parse_xml(xml_file, handler)
[docs]
def parse_xml_to_array(xml_file, strict=False, normalize_form=None):
"""Parse an XML file and return the records as an array.
Instead of passing in a file path you can also pass in an open file handle, or a file
like object like StringIO. If you would like the parser to explicitly check the
namespaces for the MARCSlim namespace use the strict=True option. Valid values for
normalize_form are 'NFC', 'NFKC', 'NFD', and 'NFKD'. See
unicodedata.normalize for more info on these.
"""
handler = XmlHandler(strict, normalize_form)
parse_xml(xml_file, handler)
return handler.records
[docs]
def record_to_xml(record, quiet=False, namespace=False):
"""From MARC to XML."""
node = record_to_xml_node(record, quiet, namespace)
return ET.tostring(node)
[docs]
def record_to_xml_node(record, quiet=False, namespace=False):
"""Converts a record object to a chunk of XML.
If you would like to include the marcxml namespace in the root tag set namespace to
True.
"""
# helper for converting non-unicode data to unicode
# TODO: maybe should set g0 and g1 appropriately using 066 $a and $b?
marc8 = MARC8ToUnicode(quiet=quiet)
def translate(data):
if type(data) is str:
return data
else:
return marc8.translate(data)
root = ET.Element("record")
if namespace:
root.set("xmlns", MARC_XML_NS)
root.set("xmlns:xsi", XSI_NS)
root.set("xsi:schemaLocation", MARC_XML_SCHEMA)
leader = ET.SubElement(root, "leader")
leader.text = str(record.leader)
for field in record:
if field.is_control_field():
control_field = ET.SubElement(root, "controlfield")
control_field.set("tag", field.tag)
control_field.text = translate(field.data)
else:
data_field = ET.SubElement(root, "datafield")
data_field.set("ind1", field.indicators[0])
data_field.set("ind2", field.indicators[1])
data_field.set("tag", field.tag)
for subfield in field:
data_subfield = ET.SubElement(data_field, "subfield")
data_subfield.set("code", subfield.code)
data_subfield.text = translate(subfield.value)
return root