We previously discussed how to extract the control flow graph from the Python AST. However, the Python AST can change from version to version with the introduction of new control flow structures. The bytecode, in comparison, stays relatively stable. Hence, we can use the bytecode to recover the control flow graph too.

First, we need the following imports. The dis package gives us access to the Python disassembly, and pygraphviz lets us draw.

import sys
import dis
import pygraphviz

Next, we define the node in the graph.

class CFGNode:
    """A single vertex of the control flow graph.

    Attributes:
        i: the payload wrapped by this node (the graph builder passes the
            disassembled instruction here).
        bid: identifier of the node (the builder uses the bytecode offset).
        children: successor nodes, in the order they were added.
        props: free-form annotations attached to the node.
    """

    def __init__(self, i, bid):
        self.i, self.bid = i, bid
        self.children, self.props = [], {}

    def add_child(self, n):
        """Register *n* as a successor of this node."""
        self.children += [n]

Now, we come to the main class.

class CFG:
    """Control flow graph recovered from the bytecode of a function."""

    def __init__(self, myfn):
        # Keep hold of the object whose bytecode is disassembled later on.
        self.myfn = myfn

A small helper

class CFG(CFG):
    def lstadd(self, hmap, key, val):
        """Append *val* to the list stored under *key* in *hmap*.

        The list is created on first use, so *hmap* acts as a multi-map.
        """
        hmap.setdefault(key, []).append(val)

We define an entry point for the graph

class CFG(CFG):
    def entry(self):
        """Return a synthetic NOP node (id 0) that serves as the graph's entry."""
        nop_fields = dict(
            opname='NOP',
            opcode=dis.opmap['NOP'],
            arg=0,
            argval=0,
            argrepr=0,
            offset=0,
            starts_line=0,
            is_jump_target=False,
        )
        placeholder = dis.Instruction(**nop_fields)
        return CFGNode(placeholder, 0)

Next, the meat of generation. We handle the jump instructions specifically. Everything else is a simple block.

class CFG(CFG):
    # Opcodes that carry a jump target which must be linked by fix_jumps().
    _JUMP_OPS = frozenset([
        'POP_JUMP_IF_FALSE',
        'JUMP_FORWARD',
        'JUMP_ABSOLUTE',
    ])

    # Opcodes after which control never reaches the textually next
    # instruction: unconditional jumps and function return.
    _NO_FALLTHROUGH_OPS = frozenset([
        'JUMP_FORWARD',
        'JUMP_ABSOLUTE',
        'RETURN_VALUE',
    ])

    # Every opcode the builder knows how to place in the graph.
    _SUPPORTED_OPS = _JUMP_OPS | _NO_FALLTHROUGH_OPS | frozenset([
        'LOAD_CONST',
        'LOAD_FAST',
        'STORE_FAST',
        'COMPARE_OP',
        'INPLACE_ADD',
        'INPLACE_SUBTRACT',
        'BINARY_MODULO',
        'POP_BLOCK',
        'SETUP_LOOP',
    ])

    def gen(self):
        """Create one node per instruction and add the fall-through edges.

        Populates two attributes:

        * ``self.opcodes`` maps each instruction's bytecode offset to its
          node.
        * ``self.jump_to`` maps a target offset to the list of jump nodes
          that branch to it; ``fix_jumps`` turns these into edges.

        Jump nodes are tagged with ``props['jmp'] = True``.  Unconditional
        jumps and ``RETURN_VALUE`` do not fall through, so no edge is added
        from them to the instruction that follows.

        Raises:
            NotImplementedError: if the function contains an opcode that
                this builder does not handle.
        """
        # ``last`` is the node control falls through from, or None when the
        # previous instruction cannot fall through.
        last = self.entry()
        self.jump_to = {}
        self.opcodes = {}
        for ins in dis.get_instructions(self.myfn):
            name = ins.opname
            if name not in self._SUPPORTED_OPS:
                # An explicit error (rather than ``assert False``) survives
                # ``python -O`` and tells the user which opcode is missing.
                raise NotImplementedError(
                    "unsupported opcode %s at offset %d" % (name, ins.offset))

            # Use the offset reported by dis instead of assuming a fixed
            # instruction width.
            node = CFGNode(ins, ins.offset)
            self.opcodes[ins.offset] = node
            if last is not None:
                last.add_child(node)

            if name in self._JUMP_OPS:
                node.props['jmp'] = True
                # ``argval`` is the resolved target offset for both absolute
                # and relative jumps.
                self.lstadd(self.jump_to, ins.argval, node)

            last = None if name in self._NO_FALLTHROUGH_OPS else node

We need to fix the jumps now.

class CFG(CFG):
    def fix_jumps(self):
        """Add an edge from every recorded jump node to its target node.

        Walks the instructions in ``self.opcodes`` order and, for each offset
        that appears in ``self.jump_to``, links all jump nodes registered for
        that offset to the instruction located there.
        """
        for offset, target in self.opcodes.items():
            sources = self.jump_to.get(offset)
            if sources is None:
                continue
            # The disassembler must agree that this offset is a jump target.
            assert target.i.is_jump_target
            for jump_node in sources:
                jump_node.add_child(target)

Making the graph to be displayed.

class CFG(CFG):
    def to_graph(self):
        """Build the control flow graph and return it as a pygraphviz AGraph.

        Runs ``gen`` and ``fix_jumps`` first, then emits one graph node per
        instruction (labelled ``"<offset>: <opname>"``) and one directed edge
        per parent/child link.
        """
        self.gen()
        self.fix_jumps()
        graph = pygraphviz.AGraph(directed=True)
        for offset, src in self.opcodes.items():
            graph.add_node(src.bid)
            label = "%d: %s" % (offset, src.i.opname)
            graph.get_node(src.bid).attr['label'] = label
            for dst in src.children:
                graph.add_edge(src.bid, dst.bid)
        return graph

This finishes our implementation. We need a simple test program.

def gcd(a, b):
    # Greatest common divisor of a and b, computed with the remainder-based
    # Euclidean loop. This function is the sample input handed to CFG below,
    # so its statements are what the bytecode disassembly is taken from.

    # Swap through a temporary so that a >= b before the loop starts.
    if a<b:
        c = a
        a = b
        b = c

    # Replace (a, b) with (b, a % b) until the remainder reaches zero.
    while b != 0 :
        c = a
        a = b
        b = c % b
    return a

The CFG can be extracted as follows

from IPython.display import Image
# Build the control flow graph for gcd, then draw it as a PNG using the
# 'dot' layout program and wrap the image bytes for inline display.
v = CFG(gcd)
g = v.to_graph()
Image(g.draw(format='png', prog='dot'))
0 0: LOAD_FAST 2 2: LOAD_FAST 0->2 4 4: COMPARE_OP 2->4 6 6: POP_JUMP_IF_FALSE 4->6 8 8: LOAD_FAST 6->8 20 20: SETUP_LOOP 6->20 10 10: STORE_FAST 8->10 22 22: LOAD_FAST 20->22 12 12: LOAD_FAST 10->12 24 24: LOAD_CONST 22->24 14 14: STORE_FAST 12->14 16 16: LOAD_FAST 14->16 18 18: STORE_FAST 16->18 18->20 26 26: COMPARE_OP 24->26 28 28: POP_JUMP_IF_FALSE 26->28 30 30: LOAD_FAST 28->30 48 48: POP_BLOCK 28->48 32 32: STORE_FAST 30->32 50 50: LOAD_FAST 48->50 34 34: LOAD_FAST 32->34 52 52: RETURN_VALUE 50->52 36 36: STORE_FAST 34->36 38 38: LOAD_FAST 36->38 40 40: LOAD_FAST 38->40 42 42: BINARY_MODULO 40->42 44 44: STORE_FAST 42->44 46 46: JUMP_ABSOLUTE 44->46 46->22 46->48