new graphlib.TopologicalSorter class Topological Ordering

Lib/graphlib.py

  1 __all__ = ["TopologicalSorter", "CycleError"]  # names exported by "from graphlib import *"
  2
  3 _NODE_OUT = -1  # npredecessors sentinel: node has been returned by get_ready()
  4 _NODE_DONE = -2  # npredecessors sentinel: node has been marked processed by done()
  5
  6
  7 class _NodeInfo:
  8     __slots__ = "node", "npredecessors", "successors"
  9
 10     def __init__(self, node):
 11         # The node this class is augmenting.
 12         self.node = node
 13
 14         # Number of predecessors, generally >= 0. When this value falls to 0,
 15         # and is returned by get_ready(), this is set to _NODE_OUT and when the
 16         # node is marked done by a call to done(), set to _NODE_DONE.
 17         self.npredecessors = 0
 18
 19         # List of successor nodes. The list can contain duplicated elements as
 20         # long as they're all reflected in the successor's npredecessors attribute).
 21         self.successors = []
 22
 23
 24 class CycleError(ValueError):
 25     """Subclass of ValueError raised by TopologicalSorterif cycles exist in the graph
 26
 27     If multiple cycles exist, only one undefined choice among them will be reported
 28     and included in the exception. The detected cycle can be accessed via the second
 29     element in the *args* attribute of the exception instance and consists in a list
 30     of nodes, such that each node is, in the graph, an immediate predecessor of the
 31     next node in the list. In the reported list, the first and the last node will be
 32     the same, to make it clear that it is cyclic.
 33     """
 34
 35     pass
 36
 37
 38 class TopologicalSorter:
 39     """Provides functionality to topologically sort a graph of hashable nodes"""
 40
 41     def __init__(self, graph=None):
 42         self._node2info = {}
 43         self._ready_nodes = None
 44         self._npassedout = 0
 45         self._nfinished = 0
 46
 47         if graph is not None:
 48             for node, predecessors in graph.items():
 49                 self.add(node, *predecessors)
 50
 51     def _get_nodeinfo(self, node):
 52         if (result := self._node2info.get(node)) is None:
 53             self._node2info[node] = result = _NodeInfo(node)
 54         return result
 55
 56     def add(self, node, *predecessors):
 57         """Add a new node and its predecessors to the graph.
 58
 59         Both the *node* and all elements in *predecessors* must be hashable.
 60
 61         If called multiple times with the same node argument, the set of dependencies
 62         will be the union of all dependencies passed in.
 63
 64         It is possible to add a node with no dependencies (*predecessors* is not provided)
 65         as well as provide a dependency twice. If a node that has not been provided before
 66         is included among *predecessors* it will be automatically added to the graph with
 67         no predecessors of its own.
 68
 69         Raises ValueError if called after "prepare".
 70         """
 71         if self._ready_nodes is not None:
 72             raise ValueError("Nodes cannot be added after a call to prepare()")
 73
 74         # Create the node -> predecessor edges
 75         nodeinfo = self._get_nodeinfo(node)
 76         nodeinfo.npredecessors += len(predecessors)
 77
 78         # Create the predecessor -> node edges
 79         for pred in predecessors:
 80             pred_info = self._get_nodeinfo(pred)
 81             pred_info.successors.append(node)
 82
 83     def prepare(self):
 84         """Mark the graph as finished and check for cycles in the graph.
 85
 86         If any cycle is detected, "CycleError" will be raised, but "get_ready" can
 87         still be used to obtain as many nodes as possible until cycles block more
 88         progress. After a call to this function, the graph cannot be modified and
 89         therefore no more nodes can be added using "add".
 90         """
 91         if self._ready_nodes is not None:
 92             raise ValueError("cannot prepare() more than once")
 93
 94         self._ready_nodes = [
 95             i.node for i in self._node2info.values() if i.npredecessors == 0
 96         ]
 97         # ready_nodes is set before we look for cycles on purpose:
 98         # if the user wants to catch the CycleError, that's fine,
 99         # they can continue using the instance to grab as many
100         # nodes as possible before cycles block more progress
101         cycle = self._find_cycle()
102         if cycle:
103             raise CycleError(f"nodes are in a cycle", cycle)
104
105     def get_ready(self):
106         """Return a tuple of all the nodes that are ready.
107
108         Initially it returns all nodes with no predecessors; once those are marked
109         as processed by calling "done", further calls will return all new nodes that
110         have all their predecessors already processed. Once no more progress can be made,
111         empty tuples are returned.
112
113         Raises ValueError if called without calling "prepare" previously.
114         """
115         if self._ready_nodes is None:
116             raise ValueError("prepare() must be called first")
117
118         # Get the nodes that are ready and mark them
119         result = tuple(self._ready_nodes)
120         n2i = self._node2info
121         for node in result:
122             n2i[node].npredecessors = _NODE_OUT
123
124         # Clean the list of nodes that are ready and update
125         # the counter of nodes that we have returned.
126         self._ready_nodes.clear()
127         self._npassedout += len(result)
128
129         return result
130
131     def is_active(self):
132         """Return True if more progress can be made and ``False`` otherwise.
133
134         Progress can be made if cycles do not block the resolution and either there
135         are still nodes ready that haven't yet been returned by "get_ready" or the
136         number of nodes marked "done" is less than the number that have been returned
137         by "get_ready".
138
139         Raises ValueError if called without calling "prepare" previously.
140         """
141         if self._ready_nodes is None:
142             raise ValueError("prepare() must be called first")
143         return self._nfinished < self._npassedout or bool(self._ready_nodes)
144
145     def __bool__(self):
146         return self.is_active()
147
148     def done(self, *nodes):
149         """Marks a set of nodes returned by "get_ready" as processed.
150
151         This method unblocks any successor of each node in *nodes* for being returned
152         in the future by a a call to "get_ready"
153
154         Raises :exec:`ValueError` if any node in *nodes* has already been marked as
155         processed by a previous call to this method, if a node was not added to the
156         graph by using "add" or if called without calling "prepare" previously or if
157         node has not yet been returned by "get_ready".
158         """
159
160         if self._ready_nodes is None:
161             raise ValueError("prepare() must be called first")
162
163         n2i = self._node2info
164
165         for node in nodes:
166
167             # Check if we know about this node (it was added previously using add()
168             if (nodeinfo := n2i.get(node)) is None:
169                 raise ValueError(f"node {node!r} was not added using add()")
170
171             # If the node has not being returned (marked as ready) previously, inform the user.
172             stat = nodeinfo.npredecessors
173             if stat != _NODE_OUT:
174                 if stat >= 0:
175                     raise ValueError(
176                         f"node {node!r} was not passed out (still not ready)"
177                     )
178                 elif stat == _NODE_DONE:
179                     raise ValueError(f"node {node!r} was already marked done")
180                 else:
181                     assert False, f"node {node!r}: unknown status {stat}"
182
183             # Mark the node as processed
184             nodeinfo.npredecessors = _NODE_DONE
185
186             # Go to all the successors and reduce the number of predecessors, collecting all the ones
187             # that are ready to be returned in the next get_ready() call.
188             for successor in nodeinfo.successors:
189                 successor_info = n2i[successor]
190                 successor_info.npredecessors -= 1
191                 if successor_info.npredecessors == 0:
192                     self._ready_nodes.append(successor)
193             self._nfinished += 1
194
195     def _find_cycle(self):
196         n2i = self._node2info
197         stack = []
198         itstack = []
199         seen = set()
200         node2stacki = {}
201
202         for node in n2i:
203             if node in seen:
204                 continue
205
206             while True:
207                 if node in seen:
208                     # If we have seen already the node and is in the
209                     # current stack we have found a cycle.
210                     if node in node2stacki:
211                         return stack[node2stacki[node] :] + [node]
212                     # else go on to get next successor
213                 else:
214                     seen.add(node)
215                     itstack.append(iter(n2i[node].successors).__next__)
216                     node2stacki[node] = len(stack)
217                     stack.append(node)
218
219                 # Backtrack to the topmost stack entry with
220                 # at least another successor.
221                 while stack:
222                     try:
223                         node = itstack[-1]()
224                         break
225                     except StopIteration:
226                         del node2stacki[stack.pop()]
227                         itstack.pop()
228                 else:
229                     break
230         return None
231
232     def static_order(self):
233         """Returns an iterable of nodes in a topological order.
234
235         The particular order that is returned may depend on the specific
236         order in which the items were inserted in the graph.
237
238         Using this method does not require to call "prepare" or "done". If any
239         cycle is detected, :exc:`CycleError` will be raised.
240         """
241         self.prepare()
242         while self.is_active():
243             node_group = self.get_ready()
244             yield from node_group
245             self.done(*node_group)

Python 3.9 graphlib review

I had a look at the TopologicalSorter class and it seems to mix the graph algorithm in with a use case in a less than ideal way.

  • If given a graph of dependencies it will give one ordering that can satisfy the dependencies via its static_order() method only.

  • It supports one way to add parallelism.

I had written a Topological Sort task on Rosetta Code, together with a Python implementation, and I have been both implementing and using T-sorts to compile Electronics Design Automation libraries for well over a decade (maybe 20+ years).

Below, I make slight alterations to the RC toposort2 function to create toposort3, then compare it to the new graphlib.TopologicalSorter class.

The Modules A, B, C, D example

They give the following input graph for T-sorting:

graph = {"D": {"B", "C"}, "C": {"A"}, "B": {"A"}}

Or more colloquially: D depends on, (or must come after), both B and C; C depends on A and B depends on A.

The static_order ordering given is:

('A', 'C', 'B', 'D')

Now those dependencies have only a “looser” ordering of the set of nodes B and C. Nodes B and C can be swapped and it will still satisfy the dependencies. B and C can be implemented in parallel in fact.

My routine, instead of returning an iterator over individual items, iterates over sets of items.

  • All items in each set must be completed/ordered after all those of any preceding set from the iterator.

  • The items in each set may be ordered/completed in any way or completed in parallel.

My routine's result iterator, if turned into a tuple, would yield:

({'A'}, {'B', 'C'}, {'D'})

If this were a calculated compilation order for Verilog and VHDL libraries then it clearly shows the opportunity for parallelism in compiling B and C and that a maximum of two libraries can be compiled in parallel.

RosettaCode task Example

The RosettaCode task graph shows even more scope for parallelism.

Here is the task data run through the new module and my toposort function:

 1 from functools import reduce
 2 from pprint import pprint as pp
 3 from collections import defaultdict
 4
 5 from graphlib import TopologicalSorter
 6
 7 # Maps each LIBRARY to the set of libraries it depends on (its DEPENDENCIES).
 8 data = {
 9     'des_system_lib':   set('std synopsys std_cell_lib des_system_lib dw02 dw01 ramlib ieee'.split()),
10     'dw01':             set('ieee dw01 dware gtech'.split()),
11     'dw02':             set('ieee dw02 dware'.split()),
12     'dw03':             set('std synopsys dware dw03 dw02 dw01 ieee gtech'.split()),
13     'dw04':             set('dw04 ieee dw01 dware gtech'.split()),
14     'dw05':             set('dw05 ieee dware'.split()),
15     'dw06':             set('dw06 ieee dware'.split()),
16     'dw07':             set('ieee dware'.split()),
17     'dware':            set('ieee dware'.split()),
18     'gtech':            set('ieee gtech'.split()),
19     'ramlib':           set('std ieee'.split()),
20     'std_cell_lib':     set('ieee std_cell_lib'.split()),
21     'synopsys':         set(),
22     }
23 for k, v in data.items():
24     v.discard(k)   # Drop self-dependencies: a library listing itself is ignored
25
26
27 # Inverse mapping: each dependency -> the set of libraries that depend on it.
28 data_invert = defaultdict(set)  # NOTE(review): not referenced again in this listing
29 for lib, deps in data.items():
30     for dep in deps:
31         data_invert[dep].add(lib)  # record lib as a dependent of dep
32
33
34 ts = TopologicalSorter(data)  # each value in data is passed as that key's predecessors
35 graphlib_order = tuple(ts.static_order())  # exhaust the generator into one flat ordering
36
def toposort3(data):
    """Yield successive sets of items whose dependencies are all satisfied.

    *data* maps each item to an iterable of the items it depends on. Items
    that appear only as dependencies are treated as having no dependencies
    of their own. Each yielded set contains every item that depends solely
    on items from previously yielded sets, so the members of one set may be
    processed in any order, or in parallel.

    The caller's mapping is not modified, and an empty mapping yields nothing.

    Raises AssertionError once the generator is exhausted if a cyclic
    dependency prevented some items from being ordered.
    """
    # Work on a private copy so the caller's mapping and sets stay untouched.
    remaining = {item: set(deps) for item, deps in data.items()}

    # set().union(*...) copes with an empty mapping, unlike reduce() without
    # an initial value.
    extra_items_in_deps = set().union(*remaining.values()) - set(remaining)
    for item in extra_items_in_deps:
        remaining[item] = set()

    while True:
        ordered = {item for item, deps in remaining.items() if not deps}
        if not ordered:
            break
        yield ordered
        remaining = {
            item: deps - ordered
            for item, deps in remaining.items()
            if item not in ordered
        }

    # Raised explicitly (rather than via an assert statement) so that cycle
    # detection is not removed when Python runs with -O.
    if remaining:
        raise AssertionError(
            "A cyclic dependency exists amongst %r" % (remaining,)
        )
48
49 paddys_order = tuple(toposort3(data))  # exhaust the generator into a tuple of sets
50
51 print('Python 3.9 graphlib gives a topological ordering:')
52 pp(graphlib_order)
53
54 print('\nMy topological ordering of sets that must be in tuple order, but each item in an individual set can be executed in any order for that set, or in *parallel*:')
55 pp(paddys_order)

Output:

Python 3.9 graphlib gives a topological ordering:
('des_system_lib',
 'dw03',
 'dw07',
 'dw06',
 'dw04',
 'dw05',
 'ramlib',
 'std_cell_lib',
 'synopsys',
 'dw02',
 'dw01',
 'std',
 'dware',
 'gtech',
 'ieee')

My topological ordering of sets that must be in tuple order, but each item in an individual set can be executed in any order for that set, or in *parallel*:
({'synopsys', 'std', 'ieee'},
 {'dware', 'ramlib', 'gtech', 'std_cell_lib'},
 {'dw07', 'dw02', 'dw06', 'dw01', 'dw05'},
 {'des_system_lib', 'dw04', 'dw03'})

From the output above, my routine doesn’t hide what can be run in parallel, and encompasses every possible ordering of items, whereas the graphlib module only shows one ordering. If the graphlib module’s parallelism support is used, it is more difficult to get an idea of the opportunities for parallelism, rather than just starting up another task.

I can see that I can use a maximum of five and a minimum of three tasks in parallel, and that those parallel tasks can be executed in a minimum of four sequential steps.

So, that’s why and how I think graphlib could be better.