"""An experimental XPath-based streaming filter for ElementTree's iterparse
For details see:
http://dalkescientific.com/writings/diary/archive/2006/11/06/iterparse_filter.html
"""
# I have got to rearrange my site to use shorter URLs.
__version__ = "0.9-experimental"
import re
import cElementTree as ET
# define "letter" as "any character except /:[]()@={}* or in \s"
# (XXX make it match the XML spec)
# A URI is:
# letter+
# letter+ ':' letter+ --- a namespace prefixed term, like xml:space
# '{' [^}]* '}' letter+ --- a Clark namespace term, like {http://a}b
# Can also use a '*' in place of a URI or in the tag part of a namespaced field
#
# URIs are separated only be '/' and '//'.
# These may not occur together, eg, '///' is not allowed.
# Basing this tokenization method in part on elementtree.ElementPath
xpath_tokenizer = re.compile( r"""
(// | / ) # separators
| (?: # namespaced term
([^\/\:\[\]\(\)\@\=\{\}\*\s]+) : # namespace
([^\/\:\[\]\(\)\@\=\{\}\*\s]+|\*) # tag
)
| (?:
\{([^}]*)\} # namespace in Clark notation
([^\/\:\[\]\(\)\@\=\{\}\*\s]+|\*) # tag
)
| ([^\/\:\[\]\(\)\@\=\{\}\*\s]+|\*) # tag with no namespace
| (.) # everything else; used to identify errors
""", re.X).findall
# """"" # fix emacs cruft; having too many special characters fools it
def tokenize(s):
pos = 0
for token in xpath_tokenizer(s):
op = token[0]
if op in ("/", "//"):
yield (op, None, pos)
elif token[1]:
yield ("namespace", (token[1], token[2]), pos)
elif token[3]:
yield ("clark", (token[3], token[4]), pos)
elif token[5]:
yield ("default", token[5], pos)
elif token[6]:
raise SyntaxError("Unknown symbol %r at position %d" %
(token[6], pos))
else:
raise AssertionError("Unknown token: %r" % (token,))
def _make_original_tag(op, args):
if op == "namespace":
return "%s:%s" % (args[0], args[1])
if op == "clark":
return "{%s}:%s" % (args[0], args[1])
if op == "default":
return args
raise AssertionError("Strange: %r %r" % (op, args))
def _verify_ordering(tokens):
if not tokens:
raise SyntaxError(
"empty xpath not supported (don't know how to handle that case)")
pos = 0
prev = None
SEP = 1
URI = 2
# Check that the path alternates between separator and uri
for op, args, pos in tokens:
if op in ("/", "//"):
if prev == SEP:
raise SyntaxError(
"separator %r may not follow separator at position %d" %
(op, pos))
prev = SEP
elif op in ("namespace", "clark", "default"):
if prev == URI:
errmsg = _make_original_tag(op, args)
raise SyntaxError(
"%r may not follow a separator at position %d" %
(errormsg, pos))
prev = URI
else:
raise AssertionError("Unknown op: %r, %r, %r" % (op, args, pos))
if tokens[-1][0] == "//":
raise AssertionError("xpath may not end with '//'")
# There are further optimizations. For example, if this
# returned a match function instead of the regex then it
# could special case terms like /blah//* to mean "startswith('/blah/')"
# The small performance advantages for most cases doesn't
# currently warrant the extra work.
def to_regexp(s, namespaces={}, default_namespace=None):
tokens = list(tokenize(s))
_verify_ordering(tokens)
### Process the tokens
re_terms = []
if tokens[0][0] == "/":
re_terms.append("^")
tokens.pop(0)
for op, args, pos in tokens:
if op == "/":
pass
elif op == "//":
re_terms.append("(/[^/]+)*")
elif op in ("namespace", "clark", "default"):
# Break each apart to get the correct namespace and tag
if op == "namespace":
namespace, tag = args
try:
full_namespace = namespaces[namespace]
except KeyError:
raise SyntaxError("Unknown namespace %r at position %d" %
(namespace, pos))
elif op == "clark":
full_namespace, tag = args
elif op == "default":
full_namespace = default_namespace
tag = args
# Figure out which pattern to use for the combination
# of (namespace, namespace==None) x (tag, tag=='*')
if full_namespace is None:
# No namespace specified
if tag == "*":
# Select everything between the /s
re_terms.append("/[^/]+")
else:
# Select exactly the tag, no namespace
re_terms.append("/%s" % (re.escape(tag),))
else:
# namespace specified
if tag == "*":
# Select only fields in the given namespace
re_terms.append("/" +
re.escape("{%s}" % (full_namespace,)) +
"[^/]+")
else:
# Must match namespace and tag, exactly
re_terms.append("/" +
re.escape("{%s}%s" % (full_namespace, tag)))
else:
raise AssertionError("Unknown op %r" % (op,))
# Must be a complete match
re_terms.append("/$")
return "".join(re_terms)
class IterParseFilter(object):
def __init__(self, namespaces=None, default_namespace=None):
if namespaces is None:
namespaces = {}
self.namespaces = namespaces
self.default_namespace = default_namespace
self._start_document_handlers = []
self._end_document_handlers = []
self._start_filters = []
self._end_filters = []
self._default_start_filters = []
self._default_end_filters = []
self._iter_start_filters = []
self._iter_end_filters = []
self._start_ns_handlers = []
self._end_ns_handlers = []
self._iter_start_ns = False
self._iter_end_ns = False
def on_start_document(self, handler):
self._start_document_handlers.append(handler)
def on_end_document(self, handler):
self._end_document_handlers.append(handler)
def _add_handler(self, filters, path, handler):
path_re = to_regexp(path,
namespaces = self.namespaces,
default_namespace = self.default_namespace)
filters.append( (path, re.compile(path_re).search, handler) )
def on_start(self, path, handler):
self._add_handler(self._start_filters, path, handler)
def on_end(self, path, handler):
self._add_handler(self._end_filters, path, handler)
def on_start_default(self, path, handler):
self._add_handler(self._default_start_filters, path, handler)
def on_end_default(self, path, handler):
self._add_handler(self._default_end_filters, path, handler)
def _add_yielder(self, yielders, path):
path_re = to_regexp(path,
namespaces = self.namespaces,
default_namespace = self.default_namespace)
yielders.append( (path, re.compile(path_re).search) )
def iter_start(self, path):
self._add_yielder(self._iter_start_filters, path)
def iter_end(self, path):
self._add_yielder(self._iter_end_filters, path)
def on_start_ns(self, handler):
self._start_ns_handlers.append(handler)
def on_end_ns(self, handler):
self._end_ns_handlers.append(handler)
def iter_start_ns(self):
self._iter_start_ns = True
def iter_end_ns(self):
self._iter_end_ns = True
def _get_filter_info(self, category):
for (_, _, pat, handler) in self.filters[category]:
yield (pat, handler)
def create_fa(self):
# Make copies of everything to emphasize that they must
# not be changed during processing.
return FilterAutomata(
start_document_handlers = self._start_document_handlers,
end_document_handlers = self._end_document_handlers[::-1], # reverse!
start_filters = self._start_filters[:],
end_filters = self._end_filters[::-1], # reversing here!
default_start_filters = self._default_start_filters[:],
default_end_filters = self._default_end_filters[::-1], # reversing!
iter_start_filters = self._iter_start_filters[:],
iter_end_filters = self._iter_end_filters[:],
start_ns_handlers = self._start_ns_handlers[:],
end_ns_handlers = self._end_ns_handlers[::-1], # reversing here!
iter_start_ns = self._iter_start_ns,
iter_end_ns = self._iter_end_ns)
# These forward to the underlying automata; make a new one each time.
def parse(self, file, state=None):
return self.create_fa().parse(file, state)
# Experimental
def iterparse(self, file):
return self.create_fa().iterparse(file)
# I need a better name
def handler_parse(self, file, state=None):
return self.create_fa().handler_parse(file, state)
class FilterAutomata(object):
def __init__(self,
start_document_handlers,
end_document_handlers,
start_filters,
end_filters,
default_start_filters,
default_end_filters,
iter_start_filters,
iter_end_filters,
start_ns_handlers,
end_ns_handlers,
iter_start_ns,
iter_end_ns):
self.start_document_handlers = start_document_handlers
self.end_document_handlers = end_document_handlers
self.start_filters = start_filters
self.end_filters = end_filters
self.default_start_filters = default_start_filters
self.default_end_filters = default_end_filters
self.iter_start_filters = iter_start_filters
self.iter_end_filters = iter_end_filters
self.start_ns_handlers = start_ns_handlers
self.end_ns_handlers = end_ns_handlers
self.iter_start_ns = iter_start_ns
self.iter_end_ns = iter_end_ns
# Can cache results over multiple invocations
# NOTE: not thread-safe. Though given the GIL
# this shouldn't be a problem.
self.dfa = {}
def _new_node(self, stack_as_path):
start_handlers = []
for (path, matcher, handler) in self.start_filters:
if matcher(stack_as_path):
start_handlers.append(handler)
if not start_handlers:
# Any defaults?
for (path, matcher, handler) in self.default_start_filters:
if matcher(stack_as_path):
start_handlers.append(handler)
end_handlers = []
for (path, matcher, handler) in self.end_filters:
if matcher(stack_as_path):
end_handlers.append(handler)
if not end_handlers:
# Any defaults?
for (path, matcher, handler) in self.default_end_filters:
if matcher(stack_as_path):
end_handlers.append(handler)
# Have all the handlers, now check for yields
iter_start = False
for (path, matcher) in self.iter_start_filters:
if matcher(stack_as_path):
iter_start = True
break
iter_end = False
for (path, matcher) in self.iter_end_filters:
if matcher(stack_as_path):
iter_end = True
break
new_node = ({}, start_handlers, end_handlers, iter_start, iter_end)
return new_node
def _needed_actions(self, iter=False, handler=False):
if (not handler) and (not cb):
raise AssertionError("must specify one")
actions = ("start", "end")
if ( (handler and self.start_ns_handlers) or
(iter and self.iter_start_ns) ):
actions = actions + ("start-ns",)
if ( (handler and self.end_ns_handlers) or
(iter and self.iter_end_ns) ):
actions = actions + ("end-ns",)
return actions
# I plan to implement 'handler_parse' as a near copy of 'parse'
# but without any yield statements.
def handler_parse(self, file, state=None):
for x in self.parse(file, state):
pass
# I plan to implement 'iterparse' as a near copy of 'parse'
# but without any references to callbacks
def iterparse(self, file):
return self.parse(file, None)
def parse(self, file, state=None):
node_stack = []
node_stack_append = node_stack.append
tag_stack = []
tag_stack_append = tag_stack.append
# children, start handlers, end handlers, iter start, iter end
node = (self.dfa, [], [], False, False)
# synthesize start-document events
for handler in self.start_document_handlers:
handler("start-document", None, state)
# figure out if I also need start-ns and/or end-ns events
needed_actions = self._needed_actions(True, True)
for (event, ele) in ET.iterparse(file, needed_actions):
if event == "start":
tag = ele.tag
# Descend into node; track where I am
tag_stack_append(tag)
node_stack_append(node)
try:
node = node[0][tag]
except KeyError:
# No child exists; figure out what to do
stack_as_path = "/" + ("/".join(tag_stack)) + "/"
new_node = self._new_node(stack_as_path)
node[0][tag] = new_node
node = new_node
# call the start handlers then yield the element
for start_handler in node[1]:
start_handler(event, ele, state)
if node[3]:
yield (event, ele)
elif event == "end":
# call the end handlers then yield the element
for end_handler in node[2]:
end_handler(event, ele, state)
if node[4]:
yield (event, ele)
del tag_stack[-1]
node = node_stack.pop()
elif event == "start-ns":
for handler in self.start_ns_handlers:
handler(event, ele, state)
if self.iter_start_ns:
yield (event, ele)
elif event == "end-ns":
for handler in self.end_ns_handlers:
handler(event, ele, state)
if self.iter_start_ns:
yield (event, ele)
for handler in self.end_document_handlers:
handler("end-document", None, state)
#### An incomplete test suite ####
def test_path(path, args):
#print "**** test_path", repr(path), repr(args)
pattern = to_regexp(path)
pat = re.compile(pattern)
s = "/" + ("/".join(args)) + "/"
#print pattern, s
return bool(pat.search(s))
def test_ns_path(path, args):
#print "**** test_path", repr(path), repr(args)
pattern = to_regexp(path,
namespaces = {
"xml": "http://www.w3.org/XML/1998/namespace",
"das2": "http://biodas.org/documents/das2"},
# the empty namespace is not the same as no namespace!
default_namespace = "")
pat = re.compile(pattern)
s = "/" + ("/".join(args)) + "/"
#print pattern, s
return bool(pat.search(s))
def test_syntax():
for (xpath, tag_list, expect) in (
("A", ["A"], 1),
("A", ["AA"], 0),
("A", ["B", "A"], 1),
("/A", ["B", "A"], 0),
("/B", ["B", "A"], 0),
("//A", ["B", "A"], 1),
("A//B", ["A", "B"], 1),
("A//B", ["C", "A", "B"], 1),
("/A//B", ["C", "A", "B"], 0),
("/B/*", ["B", "A"], 1),
# Test back-tracking; both greedy and non-greedy cases
("A//B//C//D", ["A", "B", "C", "B", "D"], 1),
("A//B/D", ["A", "B", "C", "B", "D"], 1),
# Clark namespace tests
("{http://x.com}A", ["{http://x.com}A"], 1),
("{http://x.org}A", ["{http://x.com}A"], 0),
("{http://x.org}A", ["{http://x.com}B", "{http://x.org}A"], 1),
("*", ["{http://x.com}A"], 1),
("{http://x.com}*", ["{http://x.com}A"], 1),
("{http://x.com}*", ["{http://x.org}A"], 0),
):
got = test_path(xpath, tag_list)
if got != expect:
raise AssertionError("xpath %r against %r got %r, expected %r" %
(xpath, tag_list, got, bool(expect)))
for (xpath, tag_list, expect) in (
# various namespace checks
("xml:A", ["{http://www.w3.org/XML/1998/namespace}A"], 1),
("xml:A", ["{http://www.w3.org/XML/1998/namespace2}A"], 0),
("xml:A", ["{http://www.w3.org/XML/1998/namespace}AA"], 0),
("xml:A", ["{http://www.w3.org/XML/1998/namespace}B",
"{http://www.w3.org/XML/1998/namespace}A"], 1),
("xml:B", ["{http://www.w3.org/XML/1998/namespace}B",
"{http://www.w3.org/XML/1998/namespace}A"], 0),
("A", ["{}A"], 1),
("A", ["A"], 0),
("*", ["A"], 0),
("*", ["{}A"], 1),
("das2:*", ["{http://biodas.org/documents/das2}AAA"], 1),
("das2:*", ["{}AAA"], 0),
("xml:*/das2:*", ["{http://www.w3.org/XML/1998/namespace}ABC",
"{http://biodas.org/documents/das2}ABC"], 1),
("das2:*/xml:*", ["{http://www.w3.org/XML/1998/namespace}ABC",
"{http://biodas.org/documents/das2}ABC"], 0),
):
got = test_ns_path(xpath, tag_list)
if got != expect:
raise AssertionError("xpath %r against %r got %r, expected %r" %
(xpath, tag_list, got, bool(expect)))
def test_filtering():
import cStringIO as StringIO
f = StringIO.StringIO("""\
eggs
foobarbaz
""")
special = object()
class Capture(object):
def __init__(self):
self.history = []
def __call__(self, event, ele, state):
if state is not special:
raise AssertionError("Did not get expected state")
self.history.append( (event, ele) )
filter = IterParseFilter()
capture_all = Capture()
filter.on_start_document(capture_all)
filter.on_start("*", capture_all)
filter.on_end("*", capture_all)
filter.on_end_document(capture_all)
filter.on_start_ns(capture_all)
filter.on_end_ns(capture_all)
for x in filter.parse(f, state=special):
raise AssertionError("should not yield %r" % (x,))
expect_history = (
("start-document", None),
("start", "A"),
("start", "AA"),
("start-ns", ("", "http://z/")),
("start", "{http://z/}B"),
("start", "{http://z/}C"),
("end", "{http://z/}C"),
("start-ns", ("spam", "http://spam/")),
("start", "{http://spam/}D"),
("end", "{http://spam/}D"),
("end-ns", None),
("end", "{http://z/}B"),
("end-ns", None),
("start", "B"),
("start", "B"),
("end", "B"),
("end", "B"),
("end", "AA"),
("end","A"),
("end-document", None),
)
for (got, expect) in zip(capture_all.history, expect_history):
event, ele = got
tag = getattr(ele, "tag", ele)
if (event, tag) != expect:
raise AssertionError("Expected %r Got %r" % (expect, (event, tag)))
if len(capture_all.history) != len(expect_history):
raise AssertionError("Length mismatch")
f.seek(0)
filter = IterParseFilter()
def must_match_B(event, ele, state):
if ele.tag != "B":
raise AssertionError("%r is not B" % (ele.tag,))
def must_match_B_y7(event, ele, state):
if ele.tag != "B":
raise AssertionError("%r is not B" % (ele.tag,))
if ele.attrib["y"] != "7":
raise AssertionError("%r is not the correct B" % (ele.tag,))
filter.on_start("B", must_match_B)
filter.on_start("B/B", must_match_B_y7)
f.seek
def test_parse():
import os
filename = "/Users/dalke/Music/iTunes/iTunes Music Library.xml"
if not os.path.exists(filename):
print "Cannot find %r: skipping test" % (filename,)
return
# Work through callbacks
ef = IterParseFilter()
def print_info(event, ele, state):
d = {}
children = iter(ele)
for child in children:
key = child.text
value = children.next().text
d[key] = value
print "%r is by %r" % (d["Name"], d.get("Artist", ""))
ele.clear()
ef.on_end("/plist/dict/dict/dict", print_info)
ef.handler_parse(open(filename))
# Work through iterators
ef = IterParseFilter()
ef.iter_end("/plist/dict/dict/dict")
for (event, ele) in ef.iterparse(open(filename)):
d = {}
children = iter(ele)
for child in children:
key = child.text
value = children.next().text
d[key] = value
print "%r is a %r song" % (d["Name"], d.get("Genre", ""))
ele.clear()
def test():
test_syntax()
test_filtering()
test_parse()
if __name__ == "__main__":
test()
print "All tests passed."