We previously discussed how one can write an interpreter for Python. We hinted at the time that the machinery could be used for a variety of other applications, including extracting the call graph and the control flow graph. In this post, we will show how one can extract the control flow graph using such an interpreter.

A control flow graph is a directed graph data structure that encodes all paths that may be traversed through a program. That is, in some sense, it is an abstract view of the interpreter as a whole.
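
To make the definition concrete, here is a minimal sketch of a control flow graph stored as a plain adjacency dictionary, together with a helper that enumerates the paths through it. The graph, the node labels, and the `all_paths` helper are all hypothetical and exist only for illustration; the rest of this post builds the graph from the AST instead.

```python
# A hypothetical CFG for the program:
#     if c: a()
#     else: b()
#     d()
cfg = {
    'start': ['if c'],
    'if c': ['a()', 'b()'],   # the two branches of the conditional
    'a()': ['d()'],
    'b()': ['d()'],           # both branches join again at d()
    'd()': ['stop'],
    'stop': [],
}

def all_paths(graph, node, goal):
    """Enumerate every path from node to goal (acyclic graphs only)."""
    if node == goal:
        return [[node]]
    return [[node] + rest
            for succ in graph[node]
            for rest in all_paths(graph, succ, goal)]

paths = all_paths(cfg, 'start', 'stop')
for p in paths:
    print(' -> '.join(p))
```

Each printed line is one path that an execution may take. A graph with loops has infinitely many such paths, which is why this helper is restricted to acyclic graphs.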

This implementation is based on the fuzzingbook CFG appendix. However, the fuzzingbook implementation is focused on Python statements, as it is used primarily for visualization, while this one is based on basic blocks with the intention of using it for code generation.

Control flow graphs are useful for a variety of tasks. They are one of the most frequently used tools for visualization. But more importantly, they are the starting point for further analysis of the program, including code generation, optimizations, and other static analysis techniques.

Prerequisites

import ast
import re
import astunparse


We define a simple viewing function for visualization.

import graphviz

def get_color(p, c):
    # Walk up through unannotated (hidden) nodes to find the branch label.
    color = 'black'
    while not p.annotation():
        if p.label == 'if:True':
            return 'blue'
        elif p.label == 'if:False':
            return 'red'
        p = p.parents[0]
    return color

def get_peripheries(p):
    annot = p.annotation()
    if annot in {'<start>', '<stop>'}:
        return '2'
    if annot.startswith('<define>') or annot.startswith('<exit>'):
        return '2'
    return '1'

def get_shape(p):
    annot = p.annotation()
    if annot in {'<start>', '<stop>'}:
        return 'oval'
    if annot.startswith('<define>') or annot.startswith('<exit>'):
        return 'oval'

    if annot.startswith('if:'):
        return 'diamond'
    else:
        return 'rectangle'

def to_graph(registry, arcs=[], comment='', get_shape=lambda n: 'rectangle', get_peripheries=lambda n: '1', get_color=lambda p,c: 'black'):
    graph = graphviz.Digraph(comment=comment)
    for nid, cnode in registry.items():
        # Nodes with an empty annotation are hidden from the picture.
        if not cnode.annotation():
            continue
        sn = cnode.annotation()
        graph.node(cnode.name(), sn, shape=get_shape(cnode), peripheries=get_peripheries(cnode))
        for pn in cnode.parents:
            # Draw the edge from the nearest visible ancestor.
            gp = pn.get_gparent_id()
            color = get_color(pn, cnode)
            graph.edge(gp, str(cnode.rid), color=color)
    return graph
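
If the graphviz package is not available, the same picture can be described as DOT text and rendered with any DOT tool. The following is only a sketch under that assumption: `to_dot` is a hypothetical helper that takes plain `(source, destination)` pairs rather than the node registry used above.

```python
def to_dot(edges):
    """Render (src, dst) pairs as Graphviz DOT text."""
    lines = ['digraph {']
    for src, dst in edges:
        # Quote the identifiers so that arbitrary labels are accepted.
        lines.append('  "%s" -> "%s";' % (src, dst))
    lines.append('}')
    return '\n'.join(lines)

dot = to_dot([(0, 1), (1, 2)])
print(dot)
```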


The CFGNode

The control flow graph is a graph, and hence we need a data structure for the node. Each node needs to store its parents and its children, and to register itself in the registry.

class CFGNode:
    counter = 0
    registry = {}
    stack = []
    def __init__(self, parents=(), ast=None, label=None, annot=None):
        # Copy the parents so that no list is shared between nodes
        # (or with a mutable default argument).
        self.parents = list(parents)
        self.calls = []
        self.children = []
        self.ast_node = ast
        self.label = label
        self.annot = annot
        self.rid  = CFGNode.counter
        CFGNode.registry[self.rid] = self
        CFGNode.counter += 1


Given that it is a directed graph node, we need the ability to add parents and children.

class CFGNode(CFGNode):
    def add_child(self, c):
        if c not in self.children:
            self.children.append(c)

    def add_parent(self, p):
        if p not in self.parents:
            self.parents.append(p)

    def add_parents(self, ps):
        for p in ps:
            self.add_parent(p)

    def add_calls(self, func):
        mid = None
        if hasattr(func, 'id'): # ast.Name
            mid = func.id
        else: # ast.Attribute
            mid = func.value.id
        self.calls.append(mid)
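
Since the nodes are created with only their parents known, the children lists can be derived in a separate pass once the graph is complete. The sketch below shows one way to do that. `Node` is a hypothetical stand-in that keeps only the fields needed here, and `fill_children` is an illustrative helper, not part of the implementation in this post.

```python
class Node:
    """Hypothetical stand-in for a CFG node: an id plus parent links."""
    def __init__(self, rid, parents=()):
        self.rid = rid
        self.parents = list(parents)
        self.children = []

def fill_children(nodes):
    """Derive the children lists from the parent links."""
    for n in nodes:
        for p in n.parents:
            if n not in p.children:
                p.children.append(n)

# A diamond: a -> b, a -> c, b -> d, c -> d
a = Node(0)
b = Node(1, [a])
c = Node(2, [a])
d = Node(3, [b, c])
fill_children([a, b, c, d])
print([n.rid for n in a.children])
```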


A few convenience methods to make our life simpler.

class CFGNode(CFGNode):
    def __eq__(self, other):
        return self.rid == other.rid

    def __ne__(self, other):
        return self.rid != other.rid

    def lineno(self):
        return self.ast_node.lineno if hasattr(self.ast_node, 'lineno') else 0

    def name(self):
        return str(self.rid)

    def expr(self):
        return self.source()

    def __str__(self):
        return "id:%d line[%d] parents: %s : %s" % \
           (self.rid, self.lineno(), str([p.rid for p in self.parents]), self.source())

    def __repr__(self):
        return str(self)

    def source(self):
        return astunparse.unparse(self.ast_node).strip()

    def annotation(self):
        # An explicit annotation (even an empty one) overrides the source text.
        if self.annot is not None:
            return self.annot
        return self.source()

    def to_json(self):
        return {'id':self.rid, 'parents': [p.rid for p in self.parents],
               'children': [c.rid for c in self.children],
               'calls': self.calls, 'at':self.lineno() ,'ast':self.source()}

    def get_gparent_id(self):
        # Find the nearest ancestor (or self) that has a visible annotation.
        p = CFGNode.registry[self.rid]
        while not p.annotation():
            p = p.parents[0]
        return str(p.rid)


The usage is as below:

start = CFGNode(parents=[], ast=ast.parse('start').body)
g = to_graph(CFGNode.registry, get_color=get_color, get_peripheries=get_peripheries, get_shape=get_shape)
g.format = 'svg'
print(g.pipe().decode())


Extracting the control flow

The control flow graph extractor is essentially a source code walker, and shares its basic structure with our interpreter. It could indeed inherit from the interpreter, but given that we override every function in it, we chose not to inherit.

class PyCFGExtractor:
    def __init__(self):
        self.founder = CFGNode(parents=[], ast=ast.parse('start').body[0]) # sentinel
        self.founder.ast_node.lineno = 0
        self.functions = {}
        self.functions_node = {}


As before, we define walk() that walks a given AST node.

A major difference from MCI is in the functions that handle each node. Since it is a directed graph traversal, our walk() accepts a list of parent nodes that point to this node, and also invokes the various on_*() functions with the same list. These functions in turn return a list of nodes that exit them.

While expressions and single statements have only one node that comes out of them, control flow structures and function calls can have multiple nodes that come out of them going into the next node. For example, an If statement will have a node from both the if.body and if.orelse going into the next one.

class PyCFGExtractor(PyCFGExtractor):
    def walk(self, node, myparents):
        if node is None: return
        # Dispatch on the lower-cased AST class name, e.g. ast.Pass -> on_pass
        fname = "on_%s" % node.__class__.__name__.lower()
        if hasattr(self, fname):
            return getattr(self, fname)(node, myparents)
        raise SyntaxError('walk: Not Implemented in %s' % type(node))
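
To see which handler names this dispatch produces, one can apply the same naming rule to a few AST nodes. The sketch below uses a hypothetical `handler_name` helper that mirrors the expression in walk(). Note that on Python 3.8 and later, literals are parsed as ast.Constant, so the handler looked up for a number or a string is `on_constant`.

```python
import ast

def handler_name(node):
    """The same naming rule that walk() uses for dispatch."""
    return "on_%s" % node.__class__.__name__.lower()

tree = ast.parse("x = 10")
assign = tree.body[0]
names = [handler_name(n)
         for n in (tree, assign, assign.targets[0], assign.value)]
print(names)
```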


Pass

The pass statement is trivial. It simply adds one more node.

class PyCFGExtractor(PyCFGExtractor):
    def on_pass(self, node, myparents):
        p = [CFGNode(parents=myparents, ast=node)]
        return p


Here is the CFG from a single pass statement.

pass


Module(stmt* body)

We start by defining the Module. A Python module is composed of a sequence of statements, and the graph is a linear path through these statements. That is, each time a statement is executed, we make a link from it to the next statement.

class PyCFGExtractor(PyCFGExtractor):
    def on_module(self, node, myparents):
        p = myparents
        for n in node.body:
            # the exits of one statement are the parents of the next
            p = self.walk(n, p)
        return p


Here is the CFG from the following, which is wrapped in a module:

pass
pass


Primitives

How should we handle primitives? Since they are simply interpreted as is, they can be embedded right in.

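class PyCFGExtractor(PyCFGExtractor):
    def on_str(self, node, myparents):
        p = [CFGNode(parents=myparents, ast=node, annot='')]
        return p

    def on_num(self, node, myparents):
        p = [CFGNode(parents=myparents, ast=node, annot='')]
        return p

    # Python 3.8+ parses all literals as ast.Constant rather than ast.Str or ast.Num.
    def on_constant(self, node, myparents):
        p = [CFGNode(parents=myparents, ast=node, annot='')]
        return p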


When a primitive appears on its own as a statement, it is wrapped in a simple expression node.

class PyCFGExtractor(PyCFGExtractor):
    def on_expr(self, node, myparents):
        p = self.walk(node.value, myparents)
        p = [CFGNode(parents=p, ast=node)]
        return p


This generates the CFG for the following:

10
'a'


Arithmetic expressions

The following implements the arithmetic expressions. The unaryop() adds the current node to the chain and then walks its operand. The binop() has to walk the left argument, then walk the right argument, and finally insert the current node in the chain. compare() is again similar to binop(), except that only the first comparator is walked. expr() again has only one argument to walk, and one node out of it.

class PyCFGExtractor(PyCFGExtractor):
    def on_unaryop(self, node, myparents):
        p = [CFGNode(parents=myparents, ast=node, annot='')]
        return self.walk(node.operand, p)

    def on_binop(self, node, myparents):
        left = self.walk(node.left, myparents)
        right = self.walk(node.right, left)
        p = [CFGNode(parents=right, ast=node, annot='')]
        return p

    def on_compare(self, node, myparents):
        left = self.walk(node.left, myparents)
        right = self.walk(node.comparators[0], left)
        p = [CFGNode(parents=right, ast=node, annot='')]
        return p


CFG for this expression

10+1


Assign(expr* targets, expr value)

Unlike MCI, assignment is simple as it has only a single node coming out of it.

The following are not yet implemented:

• AugAssign(expr target, operator op, expr value)
• AnnAssign(expr target, expr annotation, expr? value, int simple)

Further, we do not yet implement parallel assignments.

class PyCFGExtractor(PyCFGExtractor):
    def on_assign(self, node, myparents):
        # NotImplemented is a constant, not an exception; raise NotImplementedError.
        if len(node.targets) > 1: raise NotImplementedError('Parallel assignments')
        p = [CFGNode(parents=myparents, ast=node)]
        p = self.walk(node.value, p)
        return p
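
It helps to see which assignments the `len(node.targets) > 1` check actually rejects. In the Python AST, a chained assignment such as `a = b = 1` has two targets, while a tuple unpacking such as `a, b = 1, 2` has a single target that happens to be a tuple. The `target_count` helper below is hypothetical and only inspects the parsed statement.

```python
import ast

def target_count(src):
    """Number of assignment targets in a one-statement source string."""
    return len(ast.parse(src).body[0].targets)

chained = target_count("a = b = 1")    # two targets: a and b
unpack = target_count("a, b = 1, 2")   # one target: the tuple (a, b)
print(chained, unpack)
```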


Example

a = 10+1


Name

class PyCFGExtractor(PyCFGExtractor):
    def on_name(self, node, myparents):
        p = [CFGNode(parents=myparents, ast=node, annot='')]
        return p


Control structures

If

We now come to the control structures. For the if statement, we have two parallel paths. We first evaluate the test expression, then add a new node corresponding to the if statement, and provide the paths through the if.body and if.orelse.

class PyCFGExtractor(PyCFGExtractor):
    def on_if(self, node, myparents):
        p = self.walk(node.test, myparents)
        test_node = [CFGNode(parents=p, ast=node, annot="if: %s" % astunparse.unparse(node.test).strip())]
        # the true branch
        g1 = test_node
        g_true = [CFGNode(parents=g1, ast=None, label="if:True", annot='')]
        g1 = g_true
        for n in node.body:
            g1 = self.walk(n, g1)
        # the false branch
        g2 = test_node
        g_false = [CFGNode(parents=g2, ast=None, label="if:False", annot='')]
        g2 = g_false
        for n in node.orelse:
            g2 = self.walk(n, g2)
        # the exits of both branches flow into the next statement
        return g1 + g2
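
The protocol of taking a list of parents in and returning a list of exits can be seen in isolation in the following sketch. It is a deliberately simplified stand-in for the extractor: nodes are just line numbers, only If and simple statements are handled, and the `link` function and the `edges` list are hypothetical names used for illustration.

```python
import ast

def link(stmts, parents, edges):
    """Walk stmts in order. parents are the line numbers flowing in;
    the return value is the list of line numbers flowing out."""
    for s in stmts:
        for p in parents:
            edges.append((p, s.lineno))
        if isinstance(s, ast.If):
            # Both branches start at the test; their exits are merged.
            # An empty orelse returns [s.lineno], i.e. the test itself.
            then_exits = link(s.body, [s.lineno], edges)
            else_exits = link(s.orelse, [s.lineno], edges)
            parents = then_exits + else_exits
        else:
            parents = [s.lineno]
    return parents

src = """\
a = 1
if a > 1:
    a = 1
else:
    a = 0
b = a
"""
edges = []
exits = link(ast.parse(src).body, [], edges)
print(edges)
print(exits)
```

The statement on line 6 receives an edge from both line 3 and line 5, which is exactly the effect of returning `g1 + g2` above.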


Example

a = 1
if a>1:
    a = 1
else:
    a = 0


While

The while statement is more complex than the if statement. For one, we need to provide a way to evaluate the condition at the beginning of each iteration.

Essentially, given something like this:

while x > 0:
    statement1
    if x:
        continue
    if y:
        break
    statement2


We need to expand this into:

lbl1: v = x > 0
lbl2: if not v: goto lbl3
      statement1
      if x: goto lbl1
      if y: goto lbl3
      statement2
      goto lbl1
lbl3: ...



The basic idea is that when we walk the node.body, if there is a break statement, it will start searching up the parent chain, until it finds a node with loop_entry label. Then it will attach itself to the exit_nodes as one of the exits.

class PyCFGExtractor(PyCFGExtractor):
    def on_while(self, node, myparents):
        loop_id = CFGNode.counter
        lbl1_node = CFGNode(parents=myparents, ast=node, label='loop_entry', annot='%s:while' % loop_id)
        p = self.walk(node.test, [lbl1_node])

        lbl2_node = CFGNode(parents=p, ast=node.test, label='while:test',
               annot='if: %s' % astunparse.unparse(node.test).strip())
        g_false = CFGNode(parents=[lbl2_node], ast=None, label="if:False", annot='')
        g_true = CFGNode(parents=[lbl2_node], ast=None, label="if:True", annot='')
        # the false branch of the test is the normal exit of the loop
        lbl1_node.exit_nodes = [g_false]

        p = [g_true]

        for n in node.body:
            p = self.walk(n, p)

        # the last node is the parent for the lbl1 node.
        lbl1_node.add_parents(p)

        return lbl1_node.exit_nodes


Example

x = 1
while x > 0:
    x = x -1
y = x


Break

As we explained before, when a break is encountered, it looks up the parent chain. Once it finds a parent that has the loop_entry label, it attaches itself to the exit_nodes of that parent. The statements following the break are not its immediate children. Hence, we return an empty list.

class PyCFGExtractor(PyCFGExtractor):
    def on_break(self, node, myparents):
        parent = myparents[0]
        while parent.label != 'loop_entry':
            parent = parent.parents[0]

        assert hasattr(parent, 'exit_nodes')
        p = CFGNode(parents=myparents, ast=node)

        # make the break one of the exits of the loop.
        parent.exit_nodes.append(p)

        # break doesn't have immediate children
        return []


Example

x = 1
while x > 0:
    if x > 1:
        break
    x = x -1
y = x


Continue

Continue is similar to break, except that it has to restart the loop. Hence, it adds itself as a parent to the node with the loop_entry label. As with break, execution does not proceed to the lexically next statement after continue. Hence, we return an empty list.

class PyCFGExtractor(PyCFGExtractor):
    def on_continue(self, node, myparents):
        parent = myparents[0]
        while parent.label != 'loop_entry':
            parent = parent.parents[0]

        p = CFGNode(parents=myparents, ast=node)

        # make the continue one of the parents of the loop entry.
        parent.add_parent(p)

        # continue doesn't have immediate children
        return []
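
The interplay of the loop back edge, break, and continue can be tried out with a small standalone sketch. It is a simplification, not the extractor itself: nodes are line numbers, the enclosing loop is passed down explicitly in a hypothetical `loop` dictionary instead of being found by searching the parent chain, and a while...else clause is ignored. It also assumes that break and continue only appear inside a while loop.

```python
import ast

def link(stmts, parents, edges, loop=None):
    """Return the exits of stmts, recording (from_line, to_line) edges."""
    for s in stmts:
        edges.extend((p, s.lineno) for p in parents)
        if isinstance(s, ast.While):
            # The test failing is the normal exit; breaks are added later.
            ctx = {'entry': s.lineno, 'exits': [s.lineno]}
            body_exits = link(s.body, [s.lineno], edges, ctx)
            # The back edge: the end of the body returns to the test.
            edges.extend((p, s.lineno) for p in body_exits)
            parents = ctx['exits']
        elif isinstance(s, ast.If):
            parents = (link(s.body, [s.lineno], edges, loop) +
                       link(s.orelse, [s.lineno], edges, loop))
        elif isinstance(s, ast.Break):
            loop['exits'].append(s.lineno)   # leaves the loop
            parents = []                     # nothing follows lexically
        elif isinstance(s, ast.Continue):
            edges.append((s.lineno, loop['entry']))  # restarts the loop
            parents = []
        else:
            parents = [s.lineno]
    return parents

src = """\
x = 1
while x > 0:
    if x > 1:
        break
    x = x - 1
y = x
"""
edges = []
exits = link(ast.parse(src).body, [], edges)
print(edges)
```

The statement on line 6 is reached both from the loop test on line 2 and from the break on line 4, while line 5 loops back to line 2.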


Example

x = 1
while x > 0:
    if x > 1:
        continue
    x = x -1
y = x


For

The For statement in Python is rather complex. Given a for loop as below:

for i in my_expr:
    statement1
    statement2


This has to be expanded into the following:

lbl1:
      __iv = iter(my_expr)
lbl2: if not (__iv.__length_hint__() > 0): goto lbl3
      i = next(__iv)
      statement1
      statement2
      goto lbl2
lbl3: ...
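
The expansion can be tried out directly in Python. The sketch below is only an illustration of the lowering under one assumption: that the iterator reports how many items remain. It uses `operator.length_hint` from the standard library, and `desugared_for` is a hypothetical name. The hint is exact for list iterators, but generators report 0, so the real interpreter relies on StopIteration rather than on this test.

```python
import operator

def desugared_for(my_expr):
    """Run `for i in my_expr: seen.append(i)` in its expanded form."""
    seen = []
    iv = iter(my_expr)                    # __iv = iter(my_expr)
    while operator.length_hint(iv) > 0:   # the loop test
        i = next(iv)                      # i = next(__iv)
        seen.append(i)                    # the loop body
    return seen

result = desugared_for([1, 2, 3])
print(result)
```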


We need on_call() for implementing on_for(). Essentially, we walk through the arguments, then add a node corresponding to the call to the parents.

class PyCFGExtractor(PyCFGExtractor):
    def on_call(self, node, myparents):
        p = myparents
        for a in node.args:
            p = self.walk(a, p)
        p = [CFGNode(parents=p, ast=node, label='call', annot='')]
        return p

class PyCFGExtractor(PyCFGExtractor):
    def on_for(self, node, myparents):
        #node.target in node.iter: node.body
        loop_id = CFGNode.counter

        for_pre = CFGNode(parents=myparents, ast=None, label='for_pre', annot='')

        # __iv_N = iter(<node.iter>)
        init_node = ast.parse('__iv_%d = iter(%s)' % (loop_id, astunparse.unparse(node.iter).strip())).body[0]
        p = self.walk(init_node, [for_pre])

        lbl1_node = CFGNode(parents=p, ast=node, label='loop_entry', annot='%s: for' % loop_id)
        # the iterator protocol method is spelled __length_hint__
        _test_node = ast.parse('__iv_%d.__length_hint__() > 0' % loop_id).body[0].value
        p = self.walk(_test_node, [lbl1_node])

        lbl2_node = CFGNode(parents=p, ast=_test_node, label='for:test', annot='for: %s' % astunparse.unparse(_test_node).strip())
        g_false = CFGNode(parents=[lbl2_node], ast=None, label="if:False", annot='')
        g_true = CFGNode(parents=[lbl2_node], ast=None, label="if:True", annot='')
        lbl1_node.exit_nodes = [g_false]

        p = [g_true]

        # now we evaluate the body, one at a time.
        for n in node.body:
            p = self.walk(n, p)

        # the test node is looped back at the end of processing.
        lbl1_node.add_parents(p)

        return lbl1_node.exit_nodes


Example

x = 1
for i in val:
    x = x -1
y = x

x = 1
for i in val:
    if x > 1:
        break
    x = x -1
y = x

x = 1
for i in val:
    if x > 1:
        continue
    x = x -1
y = x


FunctionDef

When defining a function, we should define the return_nodes for the return statement to hook into. Further, we should also register our functions.

Next, we have to decide: do we want the graph of the function definition to be attached to the previous statements? In Python, the function definition itself is independent of the previous statements. Hence, here, we choose not to have parents for the definition.

DEFS_HAVE_PARENTS = False

class PyCFGExtractor(PyCFGExtractor):
    def on_functiondef(self, node, myparents):
        # name, args, body, decorator_list, returns
        fname = node.name
        args = node.args
        returns = node.returns
        p = myparents if DEFS_HAVE_PARENTS else []
        enter_node = CFGNode(parents=p, ast=node, label='enter',
                annot='<define>: %s' % node.name)
        enter_node.return_nodes = [] # sentinel

        p = [enter_node]
        for n in node.body:
            p = self.walk(n, p)

        enter_node.return_nodes.extend(p)

        self.functions[fname] = [enter_node, enter_node.return_nodes]
        self.functions_node[enter_node.lineno()] = fname

        return myparents


Return

For return, we need to look up which function we have to return from.

class PyCFGExtractor(PyCFGExtractor):
    def on_return(self, node, myparents):
        parent = myparents[0]

        val_node = self.walk(node.value, myparents)
        # on return look back to the function definition.
        while not hasattr(parent, 'return_nodes'):
            parent = parent.parents[0]
        assert hasattr(parent, 'return_nodes')

        p = CFGNode(parents=val_node, ast=node)

        # make the return one of the exits of the function definition.
        parent.return_nodes.append(p)

        # return doesn't have immediate children
        return []
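
The lookup that return (and likewise break and continue) performs is a walk up the chain of first parents until a node carrying the right marker is found. Here is that idea in isolation. `Node` and `find_owner` are hypothetical names, and the three nodes are built by hand rather than by the extractor.

```python
class Node:
    """Hypothetical stand-in for a CFG node: a name plus parent links."""
    def __init__(self, name, parents=()):
        self.name = name
        self.parents = list(parents)

def find_owner(node, attr):
    """Follow the first-parent chain until a node carrying attr is found.
    Raises IndexError if the chain runs out before one is found."""
    while not hasattr(node, attr):
        node = node.parents[0]
    return node

define = Node('<define>: my_fn')
define.return_nodes = []            # marks the function entry
test = Node('if: v1 > v2', [define])
ret = Node('return v1', [test])

owner = find_owner(ret, 'return_nodes')
owner.return_nodes.append(ret)      # register the return as an exit
print(owner.name)
```

Following only `parents[0]` is sufficient here because every node created inside a function body has the function entry somewhere on its first-parent chain.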


Example

x = 1
def my_fn(v1, v2):
    if v1 > v2:
        return v1
    else:
        return v2
y = 2