We previously discussed how one can write an interpreter for Python. We hinted at that time that the machinery could be used for a variety of other applications, including extracting the call and control flow graphs. In this post, we will show how one can extract the control flow graph using such an interpreter.

A control flow graph is a directed graph data structure that encodes all paths that may be traversed through a program. That is, in some sense, it is an abstract view of the interpreter as a whole.
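
As a concrete picture, a control flow graph for a small if/else can be written down by hand as an adjacency mapping. The sketch below is only an illustration: the node names and the all_paths() helper are made up for this example and are not part of the implementation that follows.

```python
# A hand-written CFG for:
#     a = 1
#     if a > 1: a = 1
#     else:     a = 0
# Keys are nodes; values are the nodes control may flow to next.
cfg = {
    'start': ['a = 1'],
    'a = 1': ['if a > 1'],
    'if a > 1': ['then: a = 1', 'else: a = 0'],
    'then: a = 1': ['stop'],
    'else: a = 0': ['stop'],
    'stop': [],
}

def all_paths(graph, node, goal):
    """Enumerate every path from node to goal (assumes an acyclic graph)."""
    if node == goal:
        return [[node]]
    paths = []
    for succ in graph[node]:
        for rest in all_paths(graph, succ, goal):
            paths.append([node] + rest)
    return paths

paths = all_paths(cfg, 'start', 'stop')
for path in paths:
    print(' -> '.join(path))
```

The two printed paths correspond to the two branches of the if statement. Loops would introduce cycles, which this naive enumeration does not handle.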

This implementation is based on the fuzzingbook CFG appendix. However, the fuzzingbook implementation is focused on Python statements, as it is used primarily for visualization, while this one is based on basic blocks with the intention of using it for code generation.

Control flow graphs are useful for a variety of tasks. They are one of the most frequently used tools for visualization. More importantly, they are the starting point for further analysis of the program, including code generation, optimizations, and other static analysis techniques.

Prerequisites

As before, we start with the prerequisite imports.

import ast
import re
import astunparse

We define a simple viewing function for visualization.

from graphviz import Digraph

def get_color(p, c):
    color='black'
    while not p.annotation():
        if p.label == 'if:True':
            return 'blue'
        elif p.label == 'if:False':
            return 'red'
        p = p.parents[0]
    return color

def get_peripheries(p):
    annot = p.annotation()
    if annot  in {'<start>', '<stop>'}:
        return '2'
    if annot.startswith('<define>') or annot.startswith('<exit>'):
        return '2'
    return '1'

def get_shape(p):
    annot = p.annotation()
    if annot in {'<start>', '<stop>'}:
        return 'oval'
    if annot.startswith('<define>') or annot.startswith('<exit>'):
        return 'oval'

    if annot.startswith('if:'):
        return 'diamond'
    else:
        return 'rectangle'


def to_graph(registry, arcs=[], comment='', get_shape=lambda n: 'rectangle', get_peripheries=lambda n: '1', get_color=lambda p,c: 'black'):
    graph = Digraph(comment=comment)
    for nid, cnode in registry.items():
        if not cnode.annotation():
            continue
        sn = cnode.annotation()
        graph.node(cnode.name(), sn, shape=get_shape(cnode), peripheries=get_peripheries(cnode))
        for pn in cnode.parents:
            gp = pn.get_gparent_id()
            color = get_color(pn, cnode)
            graph.edge(gp, str(cnode.rid), color=color)
    return graph

The CFGNode

The control flow graph is a graph, and hence we need a data structure for its nodes. Each node needs to store its parents and its children, and register itself in the registry.

class CFGNode:
    counter = 0
    registry = {}
    stack = []
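    def __init__(self, parents=None, ast=None, label=None, annot=None):
        # copy the list so that a node never shares (or mutates) the caller's list
        self.parents = list(parents) if parents is not None else []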
        self.calls = []
        self.children = []
        self.ast_node = ast
        self.label = label
        self.annot = annot
        self.rid  = CFGNode.counter
        CFGNode.registry[self.rid] = self
        CFGNode.counter += 1

Given that it is a directed graph node, we need the ability to add parents and children.

class CFGNode(CFGNode):
    def add_child(self, c):
        if c not in self.children:
            self.children.append(c)

    def add_parent(self, p):
        if p not in self.parents:
            self.parents.append(p)

    def add_parents(self, ps):
        for p in ps:
            self.add_parent(p)

    def add_calls(self, func):
        mid = None
        if hasattr(func, 'id'): # ast.Name
            mid = func.id
        else: # ast.Attribute
            mid = func.value.id
        self.calls.append(mid)

A few convenience methods to make our life simpler.

class CFGNode(CFGNode):
    def __eq__(self, other):
        return self.rid == other.rid

    def __ne__(self, other):
        return self.rid != other.rid

    def lineno(self):
        return self.ast_node.lineno if hasattr(self.ast_node, 'lineno') else 0
        
    def name(self):
        return str(self.rid)
        
    def expr(self):
        return self.source()
        
    def __str__(self):
        return "id:%d line[%d] parents: %s : %s" % \
           (self.rid, self.lineno(), str([p.rid for p in self.parents]), self.source())

    def __repr__(self):
        return str(self)

    def source(self):
        return astunparse.unparse(self.ast_node).strip()

    def annotation(self):
        if self.annot is not None:
            return self.annot
        return self.source()

    def to_json(self):
        return {'id':self.rid, 'parents': [p.rid for p in self.parents],
               'children': [c.rid for c in self.children],
               'calls': self.calls, 'at':self.lineno() ,'ast':self.source()}
               
    def get_gparent_id(self):
        p = CFGNode.registry[self.rid]
        while not p.annotation():
            p = p.parents[0]
        return str(p.rid)

The usage is as below:

start = CFGNode(parents=[], ast=ast.parse('start').body)
g = to_graph(CFGNode.registry, get_color=get_color, get_peripheries=get_peripheries, get_shape=get_shape)
g.format = 'svg'
print(g.pipe().decode())
Rendered graph: a single node 0 [start].

Extracting the control flow

The control flow graph extractor is essentially a source code walker, and shares its basic structure with our interpreter. It could indeed inherit from the interpreter, but given that we override all functions in it, we chose not to inherit.

class PyCFGExtractor:
    def __init__(self):
        self.founder = CFGNode(parents=[], ast=ast.parse('start').body[0]) # sentinel
        self.founder.ast_node.lineno = 0
        self.functions = {}
        self.functions_node = {}

As before, we define walk() that walks a given AST node.

A major difference from the meta-circular interpreter (MCI) of the previous post is in the functions that handle each node. Since this is a directed graph traversal, our walk() accepts a list of parent nodes that point to this node, and invokes the various on_*() functions with the same list. These functions in turn return the list of nodes through which control exits them.

While expressions and single statements have only one node coming out of them, control flow structures and function calls can have multiple nodes coming out of them and going into the next node. For example, an If statement will have a node from both the if.body and the if.orelse going into the next one.

class PyCFGExtractor(PyCFGExtractor):
    def walk(self, node, myparents):
        if node is None: return myparents  # nothing to walk; control passes straight through
        fname = "on_%s" % node.__class__.__name__.lower()
        if hasattr(self, fname):
            return getattr(self, fname)(node, myparents)
        raise SyntaxError('walk: Not Implemented in %s' % type(node))
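
The name-based dispatch used by walk() can be tried in isolation with nothing but the standard ast module. The following is a minimal sketch, not the extractor itself: the class NodeCounter and its handlers are hypothetical, and it raises NotImplementedError rather than SyntaxError for unhandled nodes.

```python
import ast

class NodeCounter:
    """Hypothetical walker: dispatches on the lowercased AST class name."""
    def __init__(self):
        self.seen = []

    def walk(self, node):
        # ast.Module -> on_module, ast.Pass -> on_pass, and so on
        fname = 'on_%s' % node.__class__.__name__.lower()
        handler = getattr(self, fname, None)
        if handler is None:
            raise NotImplementedError('no handler for %s' % type(node).__name__)
        return handler(node)

    def on_module(self, node):
        for stmt in node.body:
            self.walk(stmt)

    def on_pass(self, node):
        self.seen.append('pass')

walker = NodeCounter()
walker.walk(ast.parse('pass\npass'))
print(walker.seen)

# A node type without a handler is reported instead of silently skipped.
try:
    walker.walk(ast.parse('x = 1'))
    error = None
except NotImplementedError as e:
    error = str(e)
print(error)
```

Failing loudly on unknown node types makes it obvious which parts of the language are not yet covered.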

Pass

The pass statement is trivial. It simply adds one more node.

class PyCFGExtractor(PyCFGExtractor):
    def on_pass(self, node, myparents):
        p = [CFGNode(parents=myparents, ast=node)]
        return p

Here is the CFG from a single pass statement.

pass
Rendered graph: nodes 0 [start], 1 [pass]; edges 0->1.

Module(stmt* body)

We start by defining the Module. A Python module is composed of a sequence of statements, and the graph is a linear path through these statements. That is, each time a statement is executed, we make a link from it to the next statement.

class PyCFGExtractor(PyCFGExtractor):
    def on_module(self, node, myparents):
        p = myparents
        for n in node.body:
            p = self.walk(n, p)
        return p

Here is the CFG for the following two statements, wrapped in a module.

pass
pass
Rendered graph: nodes 0 [start], 1 [pass], 2 [pass]; edges 0->1, 1->2.
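
The linear chaining performed by on_module() can be pictured without any of the CFG classes. The sketch below, which uses only the standard ast module and identifies statements by line number (an assumption made for brevity), links each top-level statement to the one that follows it.

```python
import ast

body = ast.parse('pass\npass\npass').body

# Each statement flows into the one that follows it.
edges = [(a.lineno, b.lineno) for a, b in zip(body, body[1:])]
print(edges)
```

In the extractor the same effect is obtained by feeding the exit nodes of one statement in as the parents of the next.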

Expressions

Primitives

How should we handle primitives? Since they are simply interpreted as is, they can be embedded right in.

class PyCFGExtractor(PyCFGExtractor):
    def on_str(self, node, myparents):
        p = [CFGNode(parents=myparents, ast=node, annot='')]
        return p
        
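    def on_num(self, node, myparents):
        p = [CFGNode(parents=myparents, ast=node, annot='')]
        return p

    # Python 3.8+ parses all literals as ast.Constant rather than ast.Str or ast.Num
    def on_constant(self, node, myparents):
        p = [CFGNode(parents=myparents, ast=node, annot='')]
        return p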

When they stand on a line of their own, however, they are wrapped in an expression statement (Expr), which we handle as follows.

class PyCFGExtractor(PyCFGExtractor):
    def on_expr(self, node, myparents):
        p = self.walk(node.value, myparents)
        p = [CFGNode(parents=p, ast=node)]
        return p

Generating the following CFG

10
'a'
Rendered graph: nodes 0 [start], 2 [10], 4 ['a']; edges 0->2, 2->4.

Arithmetic expressions

The following implements the arithmetic expressions. on_unaryop() adds the current node to the chain and then walks its operand. on_binop() walks the left argument, then the right argument, and finally inserts the current node in the chain. on_compare() is similar to on_binop(), though it only walks the first comparator. on_expr(), defined above, also has only one argument to walk and one node coming out of it.

class PyCFGExtractor(PyCFGExtractor):
    def on_unaryop(self, node, myparents):
        p = [CFGNode(parents=myparents, ast=node, annot='')]
        return self.walk(node.operand, p)

    def on_binop(self, node, myparents):
        left = self.walk(node.left, myparents)
        right = self.walk(node.right, left)
        p = [CFGNode(parents=right, ast=node, annot='')]
        return p

    def on_compare(self, node, myparents):
        left = self.walk(node.left, myparents)
        right = self.walk(node.comparators[0], left)
        p = [CFGNode(parents=right, ast=node, annot='')]
        return p

CFG for this expression

10+1
Rendered graph: nodes 0 [start], 4 [(10 + 1)]; edge 0->4.
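
To see which fields the handlers above walk, it can help to inspect the AST directly. This is a small sketch using only the standard ast module. It also shows that a chained comparison carries more than one comparator, so a handler that reads only comparators[0] covers just the first link of the chain.

```python
import ast

# A binary operation has a left operand, an operator, and a right operand.
binop = ast.parse('10+1', mode='eval').body
print(type(binop).__name__, type(binop.op).__name__)

# A comparison has a left operand and a list of comparators.
simple = ast.parse('a < b', mode='eval').body
chained = ast.parse('a < b < c', mode='eval').body
print(len(simple.comparators), len(chained.comparators))
```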

Assign(expr* targets, expr value)

Unlike MCI, assignment is simple as it has only a single node coming out of it.

The following are not yet implemented:

  • AugAssign(expr target, operator op, expr value)
  • AnnAssign(expr target, expr annotation, expr? value, int simple)

Further, we do not yet implement assignments with more than one target (for example a = b = 1), which the code below reports as parallel assignments.

class PyCFGExtractor(PyCFGExtractor):
    def on_assign(self, node, myparents):
        if len(node.targets) > 1: raise NotImplementedError('Parallel assignments')
        p = [CFGNode(parents=myparents, ast=node)]
        p = self.walk(node.value, p)
        return p

Example

a = 10+1
Rendered graph: nodes 0 [start], 1 [a = (10 + 1)]; edge 0->1.

Name

class PyCFGExtractor(PyCFGExtractor):
    def on_name(self, node, myparents):
        p = [CFGNode(parents=myparents, ast=node, annot='')]
        return p

Control structures

If

We now come to the control structures. For the if statement, we have two parallel paths. We first evaluate the test expression, then add a new node corresponding to the if statement, and provide the paths through the if.body and if.orelse.

class PyCFGExtractor(PyCFGExtractor):
    def on_if(self, node, myparents):
        p = self.walk(node.test, myparents)
        test_node = [CFGNode(parents=p, ast=node, annot="if: %s" % astunparse.unparse(node.test).strip())]
        g1 = test_node
        g_true = [CFGNode(parents=g1, ast=None, label="if:True", annot='')]
        g1 = g_true
        for n in node.body:
            g1 = self.walk(n, g1)
        g2 = test_node
        g_false = [CFGNode(parents=g2, ast=None, label="if:False", annot='')]
        g2 = g_false
        for n in node.orelse:
            g2 = self.walk(n, g2)
        return g1 + g2

Example

a = 1
if a>1:
    a = 1
else:
    a = 0
Rendered graph: nodes 0 [start], 1 [a = 1], 6 [if: (a > 1)], 8 [a = 1], 11 [a = 0]; edges 0->1, 1->6, 6->8, 6->11.
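
The idea that an if statement hands more than one exit to whatever follows can be sketched independently of the CFG classes. The helper exit_lines() below is hypothetical. It uses only the standard ast module and reports the line numbers of the statements through which control can leave a block, treating a missing else branch as an exit through the test itself.

```python
import ast

def exit_lines(stmts):
    """Line numbers of the statements through which control leaves stmts."""
    if not stmts:
        return []
    last = stmts[-1]
    if isinstance(last, ast.If):
        # Both branches may fall through; an empty else exits via the test.
        orelse = exit_lines(last.orelse) or [last.lineno]
        return exit_lines(last.body) + orelse
    return [last.lineno]

with_else = 'a = 1\nif a > 1:\n    a = 1\nelse:\n    a = 0\n'
without_else = 'if a:\n    b = 1\n'

print(exit_lines(ast.parse(with_else).body))
print(exit_lines(ast.parse(without_else).body))
```

This mirrors the final "return g1 + g2" in on_if(): the exits of the true branch and of the false branch are concatenated.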

While

The while statement is more complex than the if statement. For one, we need to provide a way to evaluate the condition at the beginning of each iteration.

Essentially, given something like this:

while x > 0:
    statement1
    if x:
       continue
    if y:
       break
    statement2

We need to expand this into:

lbl1: v = x > 0
lbl2: if not v: goto lbl3
      statement1
      if x: goto lbl1
      if y: goto lbl3
      statement2
      goto lbl1
lbl3: ...
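
One way to convince oneself that such a label-based expansion preserves behaviour is to simulate the gotos with a program counter. The sketch below is illustrative only: the concrete loop body (decrementing x, skipping odd values, stopping below 3) is an assumption chosen so that the loop terminates, and the pc variable stands in for goto, which Python lacks.

```python
def structured(n):
    trace, x = [], n
    while x > 0:
        x -= 1                 # statement1
        if x % 2 == 1:
            continue
        if x < 3:
            break
        trace.append(x)        # statement2
    return trace

def lowered(n):
    trace, x, pc = [], n, 'lbl1'
    while pc != 'lbl3':
        if pc == 'lbl1':              # lbl1: v = x > 0
            v = x > 0
            pc = 'lbl2'
        else:                         # lbl2: if not v: goto lbl3
            if not v:
                pc = 'lbl3'
            else:
                x -= 1                # statement1
                if x % 2 == 1:        # continue: goto lbl1
                    pc = 'lbl1'
                elif x < 3:           # break: goto lbl3
                    pc = 'lbl3'
                else:
                    trace.append(x)   # statement2
                    pc = 'lbl1'       # end of body: goto lbl1
    return trace

print(structured(8), lowered(8))
```

Both functions visit the same values, which is the property the CFG edges for continue (back to the loop entry) and break (to the loop exit) are meant to capture.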

The basic idea is that when we walk the node.body, if there is a break statement, it will start searching up the parent chain, until it finds a node with loop_entry label. Then it will attach itself to the exit_nodes as one of the exits.

class PyCFGExtractor(PyCFGExtractor):
    def on_while(self, node, myparents):
        loop_id = CFGNode.counter
        lbl1_node = CFGNode(parents=myparents, ast=node, label='loop_entry', annot='%s:while' % loop_id)
        p = self.walk(node.test, [lbl1_node])

        lbl2_node = CFGNode(parents=p, ast=node.test, label='while:test',
               annot='if: %s' % astunparse.unparse(node.test).strip())
        g_false = CFGNode(parents=[lbl2_node], ast=None, label="if:False", annot='')
        g_true = CFGNode(parents=[lbl2_node], ast=None, label="if:True", annot='')
        lbl1_node.exit_nodes = [g_false]

        p = [g_true]

        for n in node.body:
            p = self.walk(n, p)

        # the last node of the body is a parent of the lbl1 node.
        lbl1_node.add_parents(p)

        return lbl1_node.exit_nodes

Example

x = 1
while x > 0:
    x = x -1
y = x
Rendered graph: nodes 0 [start], 1 [x = 1], 3 [3: while], 7 [while: (x > 0)], 10 [x = (x - 1)], 14 [y = x]; edges 0->1, 1->3, 3->7, 7->10, 10->3, 7->14.

Break

As we explained before, when a break is encountered, it looks up the parent chain. Once it finds a parent that has the loop_entry label, it attaches itself to the exit_nodes of that parent. The statements following the break are not its immediate children. Hence, we return an empty list.

class PyCFGExtractor(PyCFGExtractor):
    def on_break(self, node, myparents):
        parent = myparents[0]
        while parent.label != 'loop_entry':
            parent = parent.parents[0]

        assert hasattr(parent, 'exit_nodes')
        p = CFGNode(parents=myparents, ast=node)

        # make the break one of the parents of label node.
        parent.exit_nodes.append(p)

        # break doesn't have immediate children
        return []

Example

x = 1
while x > 0:
    if x > 1:
        break
    x = x -1
y = x
Rendered graph: nodes 0 [start], 1 [x = 1], 3 [3: while], 7 [while: (x > 0)], 13 [if: (x > 1)], 15 [break], 17 [x = (x - 1)], 21 [y = x]; edges 0->1, 1->3, 3->7, 7->13, 7->21, 13->15, 13->17, 15->21, 17->3.
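
The search up the parent chain can be sketched on its own. In the following, Node is a hypothetical stand-in for CFGNode that carries only the fields the search needs, and enclosing_loop() is an illustrative helper. Unlike on_break() above, it checks for an empty parent list so that a break outside any loop is reported rather than failing with an IndexError.

```python
class Node:
    """Hypothetical stand-in for CFGNode with only the fields used here."""
    def __init__(self, name, parents, label=None):
        self.name, self.parents, self.label = name, parents, label

def enclosing_loop(node):
    """Follow the first parent link until a 'loop_entry' node is found."""
    while node.label != 'loop_entry':
        if not node.parents:
            raise SyntaxError("'break' outside loop")
        node = node.parents[0]
    return node

start = Node('start', [])
loop = Node('while', [start], label='loop_entry')
test = Node('x > 0', [loop])
cond = Node('if x > 1', [test])

found = enclosing_loop(cond)
print(found.name)

try:
    enclosing_loop(start)
    outside = False
except SyntaxError:
    outside = True
print(outside)
```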

Continue

Continue is similar to break, except that it has to restart the loop. Hence, it adds itself as a parent of the node with the loop_entry label. As with break, execution does not proceed to the lexically next statement after continue. Hence, we return an empty list.

class PyCFGExtractor(PyCFGExtractor):
    def on_continue(self, node, myparents):
        parent = myparents[0]
        while parent.label != 'loop_entry':
            parent = parent.parents[0]
            
        p = CFGNode(parents=myparents, ast=node)
        parent.add_parent(p)
        
        return []

Example

x = 1
while x > 0:
    if x > 1:
        continue
    x = x -1
y = x
Rendered graph: nodes 0 [start], 1 [x = 1], 3 [3: while], 7 [while: (x > 0)], 13 [if: (x > 1)], 15 [continue], 17 [x = (x - 1)], 21 [y = x]; edges 0->1, 1->3, 3->7, 7->13, 7->21, 13->15, 13->17, 15->3, 17->3.

For

The For statement in Python is rather complex. Given a for loop as below:

for i in my_expr:
    statement1
    statement2

This has to be expanded into the following:

lbl1: 
      __iv = iter(my_expr)
lbl2: if not (__iv.length_hint() > 0): goto lbl3
      i = next(__iv)
      statement1
      statement2
      goto lbl2
lbl3: ...
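
The expansion can be tried out in plain Python using operator.length_hint() from the standard library. This is a sketch under one loud assumption: the iterator reports an accurate length hint, which holds for list iterators but not for generators (they report 0). Real for loops terminate on StopIteration instead, so this models the shape of the control flow rather than the exact semantics.

```python
import operator

def with_for(items):
    out = []
    for i in items:
        out.append(i * 2)
    return out

def lowered(items):
    out = []
    iv = iter(items)                       # __iv = iter(my_expr)
    while operator.length_hint(iv) > 0:    # leave the loop when nothing is left
        i = next(iv)                       # i = next(__iv)
        out.append(i * 2)                  # loop body
    return out

print(with_for([1, 2, 3]), lowered([1, 2, 3]))
```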

We need on_call() for implementing on_for(). Essentially, we walk through the arguments, then add a node corresponding to the call to the parents.

class PyCFGExtractor(PyCFGExtractor):
    def on_call(self, node, myparents):
        p = myparents
        for a in node.args:
            p = self.walk(a, p)
        myparents[0].add_calls(node.func)
        p = [CFGNode(parents=p, ast=node, label='call', annot='')]
        return p

class PyCFGExtractor(PyCFGExtractor):
    def on_for(self, node, myparents):
        #node.target in node.iter: node.body
        loop_id = CFGNode.counter

        for_pre = CFGNode(parents=myparents, ast=None, label='for_pre', annot='')

        init_node = ast.parse('__iv_%d = iter(%s)' % (loop_id, astunparse.unparse(node.iter).strip())).body[0]
        p = self.walk(init_node, [for_pre])

        lbl1_node = CFGNode(parents=p, ast=node, label='loop_entry', annot='%s: for' % loop_id)
        _test_node = ast.parse('__iv_%d.__length__hint__() > 0' % loop_id).body[0].value
        p = self.walk(_test_node, [lbl1_node])

        lbl2_node = CFGNode(parents=p, ast=_test_node, label='for:test', annot='for: %s' % astunparse.unparse(_test_node).strip())
        g_false = CFGNode(parents=[lbl2_node], ast=None, label="if:False", annot='')
        g_true = CFGNode(parents=[lbl2_node], ast=None, label="if:True", annot='')
        lbl1_node.exit_nodes = [g_false]

        p = [g_true]

        # now we evaluate the body, one at a time.
        for n in node.body:
            p = self.walk(n, p)

        # the test node is looped back at the end of processing.
        lbl1_node.add_parents(p)

        return lbl1_node.exit_nodes

Example

x = 1
for i in val:
    x = x -1
y = x
Rendered graph: nodes 0 [start], 1 [x = 1], 4 [__iv_3 = iter(val)], 6 [3: for], 9 [for: (__iv_3.__length__hint__() > 0)], 12 [x = (x - 1)], 16 [y = x]; edges 0->1, 1->4, 4->6, 6->9, 9->12, 9->16, 12->6.

x = 1
for i in val:
    if x > 1:
        break
    x = x -1
y = x
Rendered graph: nodes 0 [start], 1 [x = 1], 4 [__iv_3 = iter(val)], 6 [3: for], 9 [for: (__iv_3.__length__hint__() > 0)], 15 [if: (x > 1)], 17 [break], 19 [x = (x - 1)], 23 [y = x]; edges 0->1, 1->4, 4->6, 6->9, 9->15, 9->23, 15->17, 15->19, 17->23, 19->6.

x = 1
for i in val:
    if x > 1:
        continue
    x = x -1
y = x
Rendered graph: nodes 0 [start], 1 [x = 1], 4 [__iv_3 = iter(val)], 6 [3: for], 9 [for: (__iv_3.__length__hint__() > 0)], 15 [if: (x > 1)], 17 [continue], 19 [x = (x - 1)], 23 [y = x]; edges 0->1, 1->4, 4->6, 6->9, 9->15, 9->23, 15->17, 15->19, 17->6, 19->6.

FunctionDef

When defining a function, we should define the return_nodes for the return statement to hook into. Further, we should also register our functions.

Next, we have to decide: do we want the control flow graph of the function definition to be attached to the previous statements? In Python, the function definition itself is independent of the previous statements. Hence, here, we choose not to have parents for the definition.

DEFS_HAVE_PARENTS = False

class PyCFGExtractor(PyCFGExtractor):  
    def on_functiondef(self, node, myparents):
        # name, args, body, decorator_list, returns
        fname = node.name
        args = node.args
        returns = node.returns
        p = myparents if DEFS_HAVE_PARENTS else []
        enter_node = CFGNode(parents=p, ast=node, label='enter',
                annot='<define>: %s' % node.name)
        enter_node.return_nodes = [] # sentinel

        p = [enter_node]
        for n in node.body:
            p = self.walk(n, p)

        enter_node.return_nodes.extend(p)

        self.functions[fname] = [enter_node, enter_node.return_nodes]
        self.functions_node[enter_node.lineno()] = fname

        return myparents
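
The reason on_functiondef() returns myparents unchanged can be checked with a small experiment: executing a def statement binds the name but does not run the body, so the statements inside the function are not on the path from the statement before the def to the statement after it. The names events and my_fn below are made up for this illustration.

```python
events = []

events.append('before def')
def my_fn():
    events.append('inside my_fn')
events.append('after def')

# The body has not run yet; only the surrounding statements did.
snapshot = list(events)
print(snapshot)

my_fn()
print(events)
```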

Return

For return, we need to look up which function we have to return from.

class PyCFGExtractor(PyCFGExtractor):  
    def on_return(self, node, myparents):
        parent = myparents[0]

        val_node = self.walk(node.value, myparents)
        # on return look back to the function definition.
        while not hasattr(parent, 'return_nodes'):
            parent = parent.parents[0]
        assert hasattr(parent, 'return_nodes')

        p = CFGNode(parents=val_node, ast=node)

        # register the return as one of the exits of the enclosing function.
        parent.return_nodes.append(p)

        # return doesn't have immediate children
        return []

Example

x = 1
def my_fn(v1, v2):
    if v1 > v2:
        return v1
    else:
        return v2
y = 2
Rendered graph: nodes 0 [start], 1 [x = 1], 15 [y = 2], 3 [<define>: my_fn], 8 [if: (v1 > v2)], 11 [return v1], 14 [return v2], 4 [<exit>: my_fn]; edges 0->1, 1->15, 3->8, 8->11, 8->14, 11->4, 14->4.
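
That a function can have several exit points is easy to confirm with the standard ast module alone. The sketch below simply collects the return statements of the example function. Note that ast.walk() would also descend into nested function definitions, which a real extractor would need to keep separate.

```python
import ast

src = '''
def my_fn(v1, v2):
    if v1 > v2:
        return v1
    else:
        return v2
'''

fn = ast.parse(src).body[0]
returns = [n for n in ast.walk(fn) if isinstance(n, ast.Return)]
lines = sorted(n.lineno for n in returns)
print(fn.name, lines)
```

Each of these return statements corresponds to one entry appended to return_nodes in on_return().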