/home/writings/diary/archive/2010/02/22/instrumenting_the_ast

Instrumenting the AST

The following is a rough retelling of my presentation for the Testing in Python BoF at PyCon 2010, including removing some in-jokes relevant only for that session. On the other hand, I expanded it to include working code. This means you, yes you, could work on this. It's not for the faint of heart. Have fun! And let me know what you think.

The AST module

Code coverage is a good thing. I want to do branch coverage. Last year Ned Batchelder added branch coverage support to coverage.py, which works by analyzing the byte code. I want to see if there's a better solution through an entirely different approach.

Let me introduce you to Python's "ast" module.

>>> import ast
It's an interface to Python's internal parser, so a program can convert a string containing Python code into an abstract syntax tree (AST).
>>> ast.parse("for i in range(10): print i")
<_ast.Module object at 0x1004d06d0>
The ast module contains some code to display the contents of the AST as a string.
>>> ast.dump(ast.parse("for i in range(10): print i"))
"Module(body=[For(target=Name(id='i', ctx=Store()),
   iter=Call(func=Name(id='range', ctx=Load()),
   args=[Num(n=10)], keywords=[], starargs=None,
   kwargs=None), body=[Print(dest=None,
   values=[Name(id='i', ctx=Load())], nl=True)],
   orelse=[])])"
I've reformatted it to fit my slides as otherwise it's a long string. I can also ask it to display the position information, which is the second True in the following.
>>> ast.dump(ast.parse("for i in range(10): print i"), True, True)
"Module(body=[For(target=Name(id='i', ctx=Store(),
   lineno=1, col_offset=4), iter=Call(func=Name(
   id='range', ctx=Load(), lineno=1, col_offset=9),
   args=[Num(n=10, lineno=1, col_offset=15)],
   keywords=[], starargs=None, kwargs=None,
   lineno=1, col_offset=9), body=[Print(dest=None,
   values=[Name(id='i', ctx=Load(), lineno=1, col_offset=26)],
   nl=True, lineno=1, col_offset=20)], orelse=[], lineno=1,
   col_offset=0)])"
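The dump is hard to scan when all you want is the position information. Here is a minimal sketch of pulling out just the positions with ast.walk. It is written for Python 3 (the session above is Python 2.6), so print is an ordinary function call and the node names differ slightly. The helper name node_positions is made up for this example.

```python
import ast

def node_positions(source):
    """Return sorted (node type name, lineno, col_offset) triples.

    Only nodes which carry position information are listed; context
    nodes such as Load() and Store() have none and are skipped.
    """
    found = []
    for node in ast.walk(ast.parse(source)):
        if hasattr(node, "lineno"):
            found.append((type(node).__name__, node.lineno, node.col_offset))
    return sorted(found, key=lambda item: (item[1], item[2], item[0]))

positions = node_positions("for i in range(10): print(i)")
for name, lineno, col_offset in positions:
    print("%d:%d %s" % (lineno, col_offset, name))
```

As in the dump, lineno starts at 1 and col_offset starts at 0, so the For statement is reported at 1:0 and its loop variable at 1:4.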

Programmatically building an AST

You can use the ast module to build a tree directly, without parsing a string, then compile and execute that code.

>>> from ast import *
>>> tree = Module([Print(None, [Str("PyCon2010!")], True)])
>>> tree.lineno = 1
>>> tree.col_offset = 1
>>> fix_missing_locations(tree)
<_ast.Module object at 0x1004dff50>
>>> tree = fix_missing_locations(tree)
>>> compile(tree, "<TiP>", "exec")
<code object <module> at 0x1004d38a0, file "<TiP>", line 1>
>>> exec compile(tree, "<TiP>", "exec")
PyCon2010!
"ast.fix_missing_locations" is a helper function which fills in the position information the compiler needs when generating byte code. I end up using it a lot, along with "ast.copy_location", which copies the location information from one node to another.

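For readers on Python 3, here is a rough equivalent of the session above. It is a sketch, not a port of my slides: Python 3 has no Print or Str node, so the example builds a call to the print function with a Constant argument, and ast.Module needs a type_ignores list (Python 3.8 and later).

```python
import ast

# print("PyCon2010!") built by hand instead of parsed from a string.
call = ast.Call(func=ast.Name(id="print", ctx=ast.Load()),
                args=[ast.Constant(value="PyCon2010!")],
                keywords=[])
tree = ast.Module(body=[ast.Expr(value=call)], type_ignores=[])

# None of the hand-built nodes has a lineno or col_offset yet;
# fix_missing_locations fills them in so compile() accepts the tree.
tree = ast.fix_missing_locations(tree)

code = compile(tree, "<TiP>", "exec")
exec(code)
```

Without the fix_missing_locations call, compile() rejects the tree because the expression nodes lack position fields.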
The mystery of the wrong TypeError

What's the bug with the following?

try:
  raise TypeError("blah: %d" % "I said 'PyCon2010'!")
except TypeError:
  pass
The code correctly raises a TypeError, but it's the wrong TypeError. I've made this mistake a few times, which is why I try to remember to check the contents of the exception during my tests. Notice that unittest doesn't help here, since assertRaises only checks the exception type, and not the content.
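To make the point concrete, here is a small sketch (Python 3 syntax, with made-up helper names fail_with_message and exception_message) which captures the message as well as the type. A test which only checks the type passes; a test which also looks at the message notices that the intended "blah: ..." text never appears.

```python
def fail_with_message(value):
    # The intent is to raise TypeError("blah: ..."), but "%d" cannot
    # format a string, so the "%" operation raises its own TypeError first.
    raise TypeError("blah: %d" % value)

def exception_message(func, *args):
    """Call func(*args) and return (exception type name, message)."""
    try:
        func(*args)
    except Exception as err:
        return type(err).__name__, str(err)
    return None, None

kind, message = exception_message(fail_with_message, "I said 'PyCon2010'!")
print(kind)      # the type is what the test expected ...
print(message)   # ... but the message is about "%d", not "blah"
```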

It is possible to check all of these manually. You could defer the calculation to a "check_mod()" function

try:
  raise TypeError(
         check_mod("blah: %d", "I said 'PyCon2010'!"))
except TypeError:
  pass
A check_mod function might look like
def check_mod(left, right):
    try:
        return left % right
    except Exception, err:
        print "Could not interpolate: %s" % (err,)
        # Re-raise so the caller sees the same exception as before.
        raise

Rewriting the AST for fun (and profit?)

The ast module has some support code for creating a new parse tree based on transforming another parse tree. I can transform all "%" binary operations to call a function for the left and right sides. Here's a non-working but mostly complete version of how that might look.

from ast import *

class RewriteInterpolation(NodeTransformer):
    def visit_BinOp(self, node):
        if isinstance(node.op, Mod):
            new_node = Call(func=Name(id='check_string', ctx=Load()),
                            args=[node.left, node.right,
                                  Num(n=node.lineno),
                                  Num(n=node.col_offset)],
                            keywords = [], starargs=None, kwargs=None
                            )
            copy_location(new_node, node)
            return new_node
        return node

code = open(filename).read()
tree = parse(code, filename)
tree = RewriteInterpolation().visit(tree)
What's missing is the code to define or import check_string. I'll leave that for later. For now, just get the idea that you can parse Python code to an AST, rewrite it in order to instrument certain parts, then execute the result.
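For comparison, here is a self-contained sketch of the same idea which does run, written for Python 3 rather than the Python 2.6 used in this article. It is not the code from my talk: this check_string takes only the two operands, the failures list is a made-up way to record what went wrong, and check_string is handed to the rewritten code through the exec namespace instead of an import.

```python
import ast

failures = []   # messages from "%" operations which raised

def check_string(left, right):
    try:
        return left % right
    except Exception as err:
        failures.append(str(err))
        raise   # keep the program's behavior unchanged

class RewriteInterpolation(ast.NodeTransformer):
    def visit_BinOp(self, node):
        node = self.generic_visit(node)   # rewrite nested "%" first
        if isinstance(node.op, ast.Mod):
            new_node = ast.Call(
                func=ast.Name(id="check_string", ctx=ast.Load()),
                args=[node.left, node.right],
                keywords=[])
            return ast.copy_location(new_node, node)
        return node

source = (
    "try:\n"
    "    raise TypeError('blah: %d' % 'text')\n"
    "except TypeError:\n"
    "    pass\n"
)
tree = RewriteInterpolation().visit(ast.parse(source))
ast.fix_missing_locations(tree)   # the new Name node has no position yet
exec(compile(tree, "<example>", "exec"), {"check_string": check_string})
print(failures)
```

The rewritten program still swallows the TypeError, but the failed interpolation is now on record. Every "%" is rerouted, including integer modulo, which is harmless here because check_string simply performs the operation.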

I ran something similar to this against the Python standard library and tests, in the hope that I could find a bug. It took a lot of hands-on fiddling, since some essential Python modules cannot be instrumented because the full path wasn't fully defined. I got it to work, and found no bugs. The closest was this code from difflib.py

        try:
            linenum = '%d' % linenum
            id = ' id="%s%s"' % (self._prefix[side],linenum)
        except TypeError:
            # handle blank lines where linenum is '>' or ''
            id = ''
where the comment is needed because otherwise the reason isn't immediately obvious. While not a bug, it perhaps does show you that the test revealed something. (Oh, and it also showed the several hundred tests that the standard library does to test string interpolation failures.)

(Well-known) Limitations in coverage.py

Take a look at this program. I've used coverage.py to run the program and annotate the code to display the coverage and highlight the lines which weren't executed.

You can see there are a few other problems which coverage did not report. Line 9 never executes "x=9" and the "raise TypeError" in line 17 is never reached, because of the string interpolation error in the parameter list. I've hacked together something over the last 30 hours to show that something better is possible.

A different approach

I want to generate coverage for this statement:
x = 1
I'll do that by parsing the code into the AST then rewriting the AST so it's equivalent to:
from ast_report import register_module

ast_enter, ast_leave, ast_reached = \
    register_module("filename.py",   #  placeholder for the module's path
      {0: (1, 0)}, {} )   #  unique identifier -> (lineno, col_offset)
if 1:
  ast_enter[0] += 1
  x = 1
  ast_leave[0] += 1

The "register_module" function is something I'll write in a bit. It will take a filename and two location dictionaries. The first dictionary is for statements like assignment which are supposed to go to completion. That is, code before it and after it are supposed to execute. (Compare this to 'return', which will never allow code after it to run. That's what the second dictionary is used for.)

The key is a unique identifier associated with each statement with a coverage test, and the value is the lineno and col_offset pair which come from the AST.

The ast_enter and ast_leave dictionaries here are defaultdicts (though that's an implementation detail). The "0" is the same unique identifier used in the location dictionary, and can be used to say that the statement at line 1, column 1 (col_offset starts at 0) was reached and left.

At this point someone in the audience astutely asked why I used an "if 1:" in the above. That's a limitation of how I used ast.NodeTransformer. It lets derived classes transform a single term to a single other term, which means I need to transform a single statement (the assignment here) into a single other statement, and not three statements. (The ast documentation does say that a visitor for a statement node may return a list of nodes instead of a single node; I didn't use that here.) I chose the "if 1:" because it's easy to write, it can contain an arbitrary number of sub-statements, and because Python's byte compiler knows how to optimize away the "if 1:" test.

If you think about this approach, the run-time overhead is pretty low, but it's a lot more than a simple assignment. I don't know how it affects real-world code. Remember, it's been 30 hours since I started this, and I'm at a conference as well.
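Here is a minimal sketch of that rewrite for the single statement "x = 1", written for Python 3 rather than the Python 2.6 used elsewhere in this article. The class name WrapAssignments is made up, only assignments are wrapped, and the counters are passed in through the exec namespace instead of coming from register_module.

```python
import ast
from collections import defaultdict

ast_enter = defaultdict(int)
ast_leave = defaultdict(int)

class WrapAssignments(ast.NodeTransformer):
    def __init__(self):
        self.enter_linenos = {}   # unique identifier -> (lineno, col_offset)

    def visit_Assign(self, node):
        stmt_id = len(self.enter_linenos)
        self.enter_linenos[stmt_id] = (node.lineno, node.col_offset)
        # Let Python build the counter statements from source text.
        enter = ast.parse("ast_enter[%d] += 1" % stmt_id).body[0]
        leave = ast.parse("ast_leave[%d] += 1" % stmt_id).body[0]
        # One statement in, one statement out: "if 1:" holds all three.
        if_node = ast.If(test=ast.Constant(value=1),
                         body=[enter, node, leave], orelse=[])
        return ast.copy_location(if_node, node)

rewriter = WrapAssignments()
tree = rewriter.visit(ast.parse("x = 1"))
ast.fix_missing_locations(tree)   # the Constant test has no position yet

namespace = {"ast_enter": ast_enter, "ast_leave": ast_leave}
exec(compile(tree, "<example>", "exec"), namespace)
print(rewriter.enter_linenos, dict(ast_enter), dict(ast_leave))
```

After the run, identifier 0 maps to position (1, 0) and has been counted once in both ast_enter and ast_leave, which is what "entered and left" means in the report later on.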

Instrumenting the AST for code coverage

The next step is to automate all of this: convert a .py file into an AST, instrument the code to add these coverage checks, implement the reporting mechanism as an atexit hook, and for good measure, add the "%" TypeError check. To see if this is effective, convert the AST to byte code and save it to a .pyc file.

This calls for a horrible hack around a call to compileall.py. I've named the result "ast_compileall.py"

# ast_compileall.py
import __builtin__
from ast import *
import compileall
import sys
import traceback
import itertools

class RewriteInterpolation(NodeTransformer):
    def __init__(self, filename):
        self.filename = filename
        self.enter_linenos = {}  # id -> (lineno, col_offset)
        self.reach_linenos = {}  # id -> (lineno, col_offset)
        self.counter = itertools.count()
    def visit_Module(self, module_node):
        # Need to import and call ast_report.register_module().
        # These must occur after the "from __future__ import ..." statements.
        # Find where I can insert them.
        body_future = []
        body_rest = []
        for node in module_node.body:
            node = self.visit(node)
            if (not body_rest and isinstance(node, ImportFrom) and
                node.module == "__future__"):
                body_future.append(node)
            else:
                body_rest.append(node)

        # It's easier to let Python convert the code to an AST
        import_line = parse("from ast_report import register_module, check_string").body[0]
        print ("ast_enter, ast_leave, ast_reached = register_module(%r, %r, %r)" %
               (self.filename, self.enter_linenos, self.reach_linenos))
        register_line = parse(
            "ast_enter, ast_leave, ast_reached = register_module(%r, %r, %r)" %
            (self.filename, self.enter_linenos, self.reach_linenos)).body[0]

        # Assign a reasonable seeming line number.
        lineno = 1
        if body_future:
            lineno = body_future[0].lineno
        for new_node in (import_line, register_line):
            new_node.col_offset = 1
            new_node.lineno = lineno

        new_body = body_future + [import_line, register_line] + body_rest
        return Module(body=new_body)

    # These are statements which should have an enter and leave
    # (In retrospect, this isn't always true, eg, for 'if')
    def track_enter_leave_lineno(self, node):
        node = self.generic_visit(node)
        id = next(self.counter)
        enter = parse("ast_enter[%d] += 1" % id).body[0]
        leave = parse("ast_leave[%d] += 1" % id).body[0]
        self.enter_linenos[id] = (node.lineno, node.col_offset)
        for new_node in (enter, leave):
            copy_location(new_node, node)

        # This is the code for "if 1: ..."
        n = Num(n=1)
        copy_location(n, node)
        if_node = If(test=n, body=[enter, node, leave], orelse=[])
        copy_location(if_node, node)
        return if_node

    visit_FunctionDef = track_enter_leave_lineno
    visit_ClassDef = track_enter_leave_lineno
    visit_Assign = track_enter_leave_lineno
    visit_AugAssign = track_enter_leave_lineno
    visit_Delete = track_enter_leave_lineno
    visit_Print = track_enter_leave_lineno
    visit_For = track_enter_leave_lineno
    visit_While = track_enter_leave_lineno
    visit_If = track_enter_leave_lineno
    visit_With = track_enter_leave_lineno
    visit_TryExcept = track_enter_leave_lineno
    visit_TryFinally = track_enter_leave_lineno
    visit_Assert = track_enter_leave_lineno
    visit_Import = track_enter_leave_lineno
    visit_ImportFrom = track_enter_leave_lineno
    visit_Exec = track_enter_leave_lineno
    visit_Expr = track_enter_leave_lineno
    visit_Pass = track_enter_leave_lineno

    # These statements can be reached, but they change
    # control flow and are never exited.
    def track_reached_lineno(self, node):
        node = self.generic_visit(node)
        id = next(self.counter)
        reach = parse("ast_reached[%d] += 1" % id).body[0]
        self.reach_linenos[id] = (node.lineno, node.col_offset)
        copy_location(reach, node)

        n = Num(n=1)
        copy_location(n, node)
        if_node = If(test=n, body=[reach, node], orelse=[])
        copy_location(if_node, node)
        return if_node

    visit_Return = track_reached_lineno
    visit_Raise = track_reached_lineno
    visit_Break = track_reached_lineno
    visit_Continue = track_reached_lineno
    # Some code to instrument the run-time and check for '%' failures.
    def visit_BinOp(self, node):
        if isinstance(node.op, Mod):
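            new_node = Call(func=Name(id='check_string', ctx=Load()),
                            args=[node.left, node.right,
                                  Num(n=node.lineno),
                                  Num(n=node.col_offset)],
                            keywords = [], starargs=None, kwargs=None
                            )
            copy_location(new_node, node)
            # The new Name and Num nodes need position information too.
            fix_missing_locations(new_node)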
            return new_node
        return node

old_compile = __builtin__.compile

def compile(source, filename, mode, flags=0): # skipping a few parameters
    # My rewrite code uses ast.parse, which ends up calling this
    # function with this argument, so pass it back to the real compile.
    if flags == PyCF_ONLY_AST:
        return old_compile(source, filename, mode, flags)
    assert mode == "exec"
    code = open(filename).read()
    tree = parse(code, filename)
    tree = RewriteInterpolation(filename).visit(tree)
    code = old_compile(tree, filename, "exec")
    return code

# Ugly hack so I can force compileall to use my compile function.
__builtin__.compile = compile

exit_status = int(not compileall.main())
sys.exit(exit_status)
I placed this file in "spam/testing.py"
def main():

  def f(x):
    if x > 0:
      return x*x
    pass

  for i in range(4, 9):
    if f(i) < 0: x=9
    if i == 8:
       break
       print "Here"
    if i == 10:
       continue

  try:
      raise TypeError("Hi! %d" % "sdfa")
  except TypeError:
      pass

main()

I then compiled all of the .py files in the 'spam' directory with
python ast_compileall.py spam
and I made sure the following was on my PYTHONPATH as "ast_report.py"
# ast_report.py
from collections import defaultdict
import traceback
import atexit
import linecache

loaded_modules = []

class FileInfo(object):
    def __init__(self, filename, enter_linenos, reach_linenos):
        self.filename = filename
        self.enter_linenos = enter_linenos
        self.reach_linenos = reach_linenos
        self.ast_enter = defaultdict(int)
        self.ast_leave = defaultdict(int)
        self.ast_reach = defaultdict(int)

def register_module(filename, enter_linenos, reach_linenos):
    #print filename, enter_linenos, reach_linenos
    info = FileInfo(filename, enter_linenos, reach_linenos)
    # Remember the module so report_coverage() can find it later.
    loaded_modules.append(info)
    return info.ast_enter, info.ast_leave, info.ast_reach

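def check_string(left, right, lineno, col_offset):
    # Only string interpolation is of interest; pass numeric "%" through.
    if not isinstance(left, basestring):
        return left % right
    try:
        return left % right
    except Exception, err:
        print "Could not interpolate: %s" % (err,)
        traceback.print_stack()
        raise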

# Basic coverage report
def report_coverage():
    for fileinfo in loaded_modules:
        # This will contain a list of all results as a 3-ple of
        #   lineno, col_offset, "text message"
        report = []
        # These should have both 'enter' and 'leave' counts.
        for id, (lineno, col_offset) in fileinfo.enter_linenos.items():
            if id not in fileinfo.ast_enter:
                report.append( (lineno, col_offset, "not entered") )
            elif id not in fileinfo.ast_leave:
                report.append( (lineno, col_offset, "enter %d but never left" %
                                fileinfo.ast_enter[id]) )
            else:
                delta = fileinfo.ast_leave[id] - fileinfo.ast_enter[id]
                report.append( (lineno, col_offset, "enter %d leave %d (diff %d)" %
                                (fileinfo.ast_enter[id], fileinfo.ast_leave[id], delta)) )

        # These only need to be 'reach'ed
        for id, (lineno, col_offset) in fileinfo.reach_linenos.items():
            if id not in fileinfo.ast_reach:
                report.append( (lineno, col_offset, "not reached") )
            else:
                report.append( (lineno, col_offset, "reach %d" % (fileinfo.ast_reach[id],)) )

        # sort by line number, breaking ties by column offset
        report.sort()

        print "Coverage results for file", fileinfo.filename
        for lineno, col_offset, msg in report:
            print "%d:%d %s" % (lineno, col_offset+1, msg)
            print linecache.getline(fileinfo.filename, lineno).rstrip()

# Dump the coverage results when Python exits.
atexit.register(report_coverage)
(While I used an atexit hook here, I did that because it was the fastest way to get to a proof-of-concept solution. Really I think this should be more like how coverage.py works, with a command-line script which sets up the run environment and reports the results at the end.)

Try it out!

This coverage code will only work on modules which were imported, where the .pyc file is used instead of the .py file. (But perhaps an import hook would be useful or at least interesting here?) What I do is import the module via the command-line

% cd spam/
% python -c 'import testing'
Could not interpolate: %d format: a number is required, not str
  File "<string>", line 1, in <module>
  File "spam/testing.py", line 21, in <module>
    main()
  File "spam/testing.py", line 785, in main
  File "ast_report.py", line 30, in check_string
    traceback.print_stack()
Coverage results for file spam/testing.py
1:1 enter 1 leave 1 (diff 0)
def main():
3:3 enter 1 leave 1 (diff 0)
  def f(x):
4:5 enter 5 but never left
    if x > 0:
5:7 reach 5
      return x*x
6:5 not entered
    pass
8:3 enter 1 leave 1 (diff 0)
  for i in range(4, 9):
9:5 enter 5 leave 5 (diff 0)
    if f(i) < 0: x=9
9:18 not entered
    if f(i) < 0: x=9
10:5 enter 5 leave 4 (diff -1)
    if i == 8:
11:8 reach 1
       break
12:8 not entered
       print "Here"
13:5 enter 4 leave 4 (diff 0)
    if i == 10:
14:8 not reached
       continue
16:3 enter 1 leave 1 (diff 0)
  try:
17:7 reach 1
      raise TypeError("Hi! %d" % "sdfa")
19:7 enter 1 leave 1 (diff 0)
      pass
21:1 enter 1 leave 1 (diff 0)
main()

You can see that it reports the string interpolation failure without a problem, and if you look closely you'll see that it catches that the "if" on line 9 is executed while the "x=9" also on line 9 is never executed.

There are also some problems. Line 4 reports that the code was entered 5 times and never left, but that's a bit of a false positive since it left through a return statement. I think now, after additional thought, that the better solution is to put the "leave" test on the first line of each possible branch.
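To see where the "entered 5 times and never left" comes from, here is the function f from spam/testing.py instrumented by hand, in Python 3 syntax. This is only a sketch of what the rewritten code is equivalent to; the identifiers 0 and 1 are arbitrary, and the trailing "pass" from the listing is left out.

```python
from collections import defaultdict

ast_enter = defaultdict(int)
ast_leave = defaultdict(int)
ast_reached = defaultdict(int)

def f(x):
    if 1:
        ast_enter[0] += 1          # about to run "if x > 0:"
        if x > 0:
            if 1:
                ast_reached[1] += 1
                return x * x       # leaves f, skipping the line below
        ast_leave[0] += 1          # only runs when the "if" falls through

for i in range(4, 9):
    f(i)

print(dict(ast_enter), dict(ast_leave), dict(ast_reached))
```

Every call has x > 0, so the return is reached five times and the "leave" counter for the "if" is never touched.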

Pluses and minuses

There are some great advantages to this approach.

Some very complex things are possible. Some very evil things are also possible.

There are some difficult problems as well. Consider:

x = arg or default_arg or die(_("missing arg"))

Branch reporting should say that 'arg' tested both True and False, that default_arg tested True and False and ... that the result of die() tested both True and False?
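Collecting the raw data for that question is not the hard part. Here is a sketch, in Python 3 and with made-up names (record_truth, truth_seen, RecordBoolOps), which wraps each operand of an "and"/"or" in a call that records whether it tested true or false. Short-circuiting is preserved because each wrapper stays inside the original BoolOp. It says nothing about what the report should claim for die().

```python
import ast
from collections import defaultdict

truth_seen = defaultdict(set)   # operand id -> set of observed truth values

def record_truth(operand_id, value):
    truth_seen[operand_id].add(bool(value))
    return value                # pass the operand through unchanged

class RecordBoolOps(ast.NodeTransformer):
    def __init__(self):
        self.locations = {}     # operand id -> (lineno, col_offset)

    def visit_BoolOp(self, node):
        node = self.generic_visit(node)
        wrapped = []
        for operand in node.values:
            operand_id = len(self.locations)
            self.locations[operand_id] = (operand.lineno, operand.col_offset)
            call = ast.Call(
                func=ast.Name(id="record_truth", ctx=ast.Load()),
                args=[ast.Constant(value=operand_id), operand],
                keywords=[])
            wrapped.append(ast.copy_location(call, operand))
        node.values = wrapped
        return node

def die(message):
    raise SystemExit(message)

rewriter = RecordBoolOps()
tree = rewriter.visit(ast.parse("x = arg or default_arg or die('missing arg')"))
ast.fix_missing_locations(tree)
code = compile(tree, "<example>", "exec")

# Two runs: one where arg is set, one where only default_arg is set.
for arg, default_arg in [("a", None), (None, "d")]:
    exec(code, {"record_truth": record_truth, "die": die,
                "arg": arg, "default_arg": default_arg})

print(dict(truth_seen))
```

With these two runs, 'arg' has tested both true and false, default_arg has only tested true, and die() was never evaluated at all, so a report would have three different things to say about one line.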

And just how should someone visualize all this extra data?

"See also" and ruminations

I talked with Ned some after my presentation. He pointed out that the complex part of coverage.py, which he's worked on a lot during the last year, is to make the system configurable so it can be told which coverage to ignore. I know what he means. In the late 1990s I added the "#pragma: no cover" option to the early form of coverage.py, which exists (although not my actual code) to this day.

If coverage works on a more fine-grained level, how do you suppress the false warnings so the true issues aren't hidden in the noise?

Ned also pointed out Matthew J. Desmarais' work with Canopy

instrument python code to generate robust coverage information. the goal is to provide modified condition/decision coverage metrics.
I'm not the only one who has thought about instrumenting the AST, even in Python. (The Lisp community likely thought of this before I was born.) What I've hoped to do here is explain it well enough so that you can figure out how this approach works and come up with ways to extend it for the future.... or figure out why it fails.

If you are doing that, do bear in mind my python4ply package. It contains a full grammar definition for Python using PLY, with support for the decrepit AST from the compiler module. Potentially you could use it to have Python 3 generate an AST for Python 2, or even vice versa, with a lot more work.

Or, if you have both money and interest, perhaps you'll fund me? I am a consultant, after all. I mostly work in computational chemistry and my clients aren't interested in this sort of deep language analysis, so I only work on this during rare intervals. It's not only money, but access to people who want these sorts of capabilities and can give me feedback on what they want and how effective a solution is.

Or, if you want to work on it yourself - feel free! I hereby release all of this code to the public domain, and disavow any copyright interest in the code expressed in this article. You don't even have to mention my name. Just develop good testing tools.

I know there are a number of tools in the greater world of computing which can work on ASTs. I have no experience with them. Perhaps it's best to convert the Python AST to some other tree grammar where there is a tree manipulation language? When I'm feeling crazy I think "just convert the AST to XML then use XSLT to add the instrumentation, and convert the resulting XML back to an AST." How sane is that? And it would mean I would have to learn a lot more about XSLT. Or what about ANTLR's tree grammars? But then there's Manual Tree Walking Is Better Than Tree Grammars. It's a Brave New World.


I thank Armin Rigo, Brett Cannon, Grant Edwards, John Ehresman, Jeremy Hylton, Kurt Kaiser, Neal Norwitz, Neil Schemenauer, Nick Coghlan, Tim Peters, Martin von Löwis and everyone else who worked on the ast module. Without them this would be a much harder problem.


Andrew Dalke is an independent consultant focusing on software development for computational chemistry and biology. Need contract programming, help, or training? Contact me

Copyright © 2001-2013 Andrew Dalke Scientific AB