Source code for allegedb.graph

# This file is part of allegedb, an object relational mapper for versioned graphs.
# Copyright (C) Zachary Spector. public@zacharyspector.com
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""allegedb's special implementations of the NetworkX graph objects"""
import networkx
from networkx.exception import NetworkXError
from collections import defaultdict
from .wrap import MutableMappingUnwrapper

[docs]class EntityCollisionError(ValueError): """For when there's a discrepancy between the kind of entity you're creating and the one by the same name"""
[docs]def getatt(attribute_name): """An easy way to make an alias""" from operator import attrgetter return property(attrgetter(attribute_name))
[docs]def convert_to_networkx_graph(data, create_using=None, multigraph_input=False): """Convert an AllegedGraph to the corresponding NetworkX graph type.""" if isinstance(data, AllegedGraph): result = networkx.convert.from_dict_of_dicts( data.adj, create_using=create_using, multigraph_input=data.is_multigraph() ) result.graph = dict(data.graph) result.node = {k: dict(v) for k, v in data.node.items()} return result return networkx.convert.to_networkx_graph( data, create_using, multigraph_input )
_alleged_receivers = defaultdict(list)
[docs]class AllegedMapping(MutableMappingUnwrapper): """Common amenities for mappings""" __slots__ = ()
[docs] def connect(self, func): """Arrange to call this function whenever something changes here. The arguments will be this object, the key changed, and the value set. """ l = _alleged_receivers[id(self)] if func not in l: l.append(func)
[docs] def disconnect(self, func): """No longer call the function when something changes here.""" if id(self) not in _alleged_receivers: return l = _alleged_receivers[id(self)] try: l.remove(func) except ValueError: return if not l: del _alleged_receivers[id(self)]
[docs] def send(self, sender, **kwargs): """Internal. Call connected functions.""" if id(self) not in _alleged_receivers: return for func in _alleged_receivers[id(self)]: func(sender, **kwargs)
[docs] def clear(self): """Delete everything""" for k in list(self.keys()): del self[k]
[docs] def update(self, other, **kwargs): """Version of ``update`` that doesn't clobber the database so much""" from itertools import chain if hasattr(other, 'items'): other = other.items() for (k, v) in chain(other, kwargs.items()): if ( k not in self or self[k] != v ): self[k] = v
[docs]class AbstractEntityMapping(AllegedMapping): __slots__ = () def _get_cache(self, key, branch, turn, tick): raise NotImplementedError def _get_cache_now(self, key): return self._get_cache(key, *self.db.btt()) def _cache_contains(self, key, branch, turn, tick): raise NotImplementedError def _set_db(self, key, branch, turn, tick, value): """Set a value for a key in the database (not the cache).""" raise NotImplementedError def _set_cache(self, key, branch, turn, tick, value): raise NotImplementedError def _set_cache_now(self, key, value): branch, turn, tick = self.db.nbtt() self._set_cache(key, branch, turn, tick, value) def _del_db(self, key, branch, turn, tick): """Delete a key from the database (not the cache).""" self._set_db(key, branch, turn, tick, None) def _del_cache(self, key, branch, turn, tick): self._set_cache(key, branch, turn, tick, None) def __getitem__(self, key): """If key is 'graph', return myself as a dict, else get the present value of the key and return that """ def wrapval(v): from functools import partial from .wrap import DictWrapper, ListWrapper, SetWrapper if isinstance(v, list): return ListWrapper(partial(self._get_cache_now, key), partial(self._set_cache_now, key), self, key) elif isinstance(v, dict): return DictWrapper(partial(self._get_cache_now, key), partial(self._set_cache_now, key), self, key) elif isinstance(v, set): return SetWrapper(partial(self._get_cache_now, key), partial(self._set_cache_now, key), self, key) else: return v return wrapval(self._get_cache_now(key)) def __contains__(self, item): return self._cache_contains(item, *self.db.btt()) def __setitem__(self, key, value): """Set key=value at the present branch and revision""" if value is None: raise ValueError( "allegedb uses None to indicate that a key's been deleted" ) branch, turn, tick = self.db.nbtt() try: if self._get_cache(key, branch, turn, tick) != value: self._set_cache(key, branch, turn, tick, value) except KeyError: self._set_cache(key, branch, turn, tick, value) self._set_db(key, branch, turn, tick, value) self.send(self, key=key, value=value) def __delitem__(self, key): branch, turn, tick = self.db.nbtt() self._del_cache(key, branch, turn, tick) self._del_db(key, branch, turn, tick) self.send(self, key=key, value=None)
[docs]class GraphMapping(AbstractEntityMapping): """Mapping for graph attributes""" __slots__ = ('graph',) db = getatt('graph.db') def __init__(self, graph): super().__init__() self.graph = graph def __iter__(self): yield 'name' yield from self.db._graph_val_cache.iter_entity_keys( self.graph.name, *self.db.btt() ) def _cache_contains(self, key, branch, turn, tick): return self.db._graph_val_cache.contains_key( self.graph.name, key, branch, turn, tick ) def __len__(self): return 1 + self.db._graph_val_cache.count_entities( self.graph.name, *self.db.btt() ) def __getitem__(self, item): if item == 'name': return self.graph.name return super().__getitem__(item) def __setitem__(self, key, value): if key == 'name': raise KeyError("name cannot be changed after creation") super().__setitem__(key, value) def _get_cache(self, key, branch, turn, tick): return self.db._graph_val_cache.retrieve( self.graph.name, key, branch, turn, tick ) def _get(self, key): return self._get_cache(key, *self.db.btt()) def _set_db(self, key, branch, turn, tick, value): self.db.query.graph_val_set( self.graph.name, key, branch, turn, tick, value ) def _set_cache(self, key, branch, turn, tick, value): self.db._graph_val_cache.store( self.graph.name, key, branch, turn, tick, value ) def _del_db(self, key, branch, turn, tick): self.db.query.graph_val_del( self.graph.name, key, branch, turn, tick )
[docs] def clear(self): keys = list(self.keys()) keys.remove('name') for k in keys: del self[k]
[docs] def unwrap(self): return {k: v.unwrap() if hasattr(v, 'unwrap') else v for (k, v) in self.items()}
[docs]class Node(AbstractEntityMapping): """Mapping for node attributes""" __slots__ = ('graph', 'node', '__weakref__') db = getatt('graph.db') def __new__(cls, graph, node): if (graph.name, node) in graph.db._node_objs: ret = graph.db._node_objs[graph.name, node] if not isinstance(ret, cls): raise EntityCollisionError("Already have node {} in graph {}, but it's of class {}".format( node, graph.name, type(ret))) return ret return super(Node, cls).__new__(cls) def __init__(self, graph, node): """Store name and graph""" super().__init__() self.graph = graph self.node = node def __repr__(self): return "{}(graph={}, node={})".format(self.__class__.__name__, self.graph, self.node) def __str__(self): return "{}(graph={}, node={}, data={})".format(self.__class__.__name__, self.graph, self.node, repr(dict(self))) def __iter__(self): return self.db._node_val_cache.iter_entity_keys( self.graph.name, self.node, *self.db.btt() ) def _cache_contains(self, key, branch, turn, tick): return self.db._node_val_cache.contains_key( self.graph.name, self.node, key, branch, turn, tick ) def __len__(self): return self.db._node_val_cache.count_entity_keys( self.graph.name, self.node, *self.db.btt() ) def _get_cache(self, key, branch, turn, tick): return self.db._node_val_cache.retrieve( self.graph.name, self.node, key, branch, turn, tick ) def _set_db(self, key, branch, turn, tick, value): self.db.query.node_val_set( self.graph.name, self.node, key, branch, turn, tick, value ) def _set_cache(self, key, branch, turn, tick, value): self.db._node_val_cache.store( self.graph.name, self.node, key, branch, turn, tick, value )
[docs]class Edge(AbstractEntityMapping): """Mapping for edge attributes""" __slots__ = ('graph', 'orig', 'dest', 'idx', '__weakref__') db = getatt('graph.db') def __new__(cls, graph, orig, dest, idx=0): if (graph.name, orig, dest, idx) in graph.db._edge_objs: ret = graph.db._edge_objs[graph.name, orig, dest, idx] if not isinstance(ret, cls): raise EntityCollisionError( "Already have an edge {}->{}[{}] in graph {}, but of class {}".format( orig, dest, idx, graph.name, type(ret) ) ) return ret return super(Edge, cls).__new__(cls) def __init__(self, graph, orig, dest, idx=0): """Store the graph, the names of the nodes, and the index. For non-multigraphs the index is always 0. """ super().__init__() self.graph = graph self.orig = orig self.dest = dest self.idx = idx def __repr__(self): if self.idx == 0: return "{}(graph={}, orig={}, dest={})".format( self.__class__.__name__, self.graph, self.orig, self.dest ) return "{}(graph={}, orig={}, dest={}, idx={})".format( self.__class__.__name__, self.graph, self.orig, self.dest, self.idx ) def __str__(self): if self.idx == 0: return "{}(graph={}, orig={}, dest={}, data={})".format( self.__class__.__name__, self.graph, self.orig, self.dest, dict(self) ) else: return "{}(graph={}, orig={}, dest={}, idx={}, data={})".format( self.__class__.__name__, self.graph, self.orig, self.dest, self.idx, dict(self) ) def __iter__(self): return self.db._edge_val_cache.iter_entity_keys( self.graph.name, self.orig, self.dest, self.idx, *self.db.btt() ) def _cache_contains(self, key, branch, turn, tick): return self.db._edge_val_cache.contains_key( self.graph.name, self.orig, self.dest, self.idx, key, branch, turn, tick ) def __len__(self): return self.db._edge_val_cache.count_entity_keys( self.graph.name, self.orig, self.dest, self.idx, *self.db.btt() ) def _get_cache(self, key, branch, turn, tick): return self.db._edge_val_cache.retrieve( self.graph.name, self.orig, self.dest, self.idx, key, branch, turn, tick ) def _set_db(self, key, branch, turn, tick, value): self.db.query.edge_val_set( self.graph.name, self.orig, self.dest, self.idx, key, branch, turn, tick, value ) def _set_cache(self, key, branch, turn, tick, value): self.db._edge_val_cache.store( self.graph.name, self.orig, self.dest, self.idx, key, branch, turn, tick, value )
[docs]class GraphNodeMapping(AllegedMapping): """Mapping for nodes in a graph""" __slots__ = ('graph',) db = getatt('graph.db') def __init__(self, graph): super().__init__() self.graph = graph def __iter__(self): """Iterate over the names of the nodes""" return self.db._nodes_cache.iter_entities( self.graph.name, *self.db.btt() ) def __eq__(self, other): from collections import Mapping if not isinstance(other, Mapping): return NotImplemented if self.keys() != other.keys(): return False for k in self.keys(): me = self[k] you = other[k] if hasattr(me, 'unwrap'): me = me.unwrap() if hasattr(you, 'unwrap'): you = you.unwrap() if me != you: return False else: return True def __contains__(self, node): """Return whether the node exists presently""" return self.db._nodes_cache.contains_entity( self.graph.name, node, *self.db.btt() ) def __len__(self): """How many nodes exist right now?""" return self.db._nodes_cache.count_entities( self.graph.name, *self.db.btt() ) def __getitem__(self, node): """If the node exists at present, return it, else throw KeyError""" if node not in self: raise KeyError return self.db._get_node(self.graph, node) def __setitem__(self, node, dikt): """Only accept dict-like values for assignment. These are taken to be dicts of node attributes, and so, a new GraphNodeMapping.Node is made with them, perhaps clearing out the one already there. """ branch, turn, tick = self.db.nbtt() created = node not in self n = self.db._get_node(self.graph, node) n.clear() n.update(dikt) if created: self.db.query.exist_node( self.graph.name, node, branch, turn, tick, True ) self.send(self, node_name=node, exists=True) def __delitem__(self, node): """Indicate that the given node no longer exists""" if node not in self: raise KeyError("No such node") branch, turn, tick = self.db.nbtt() self.db.query.exist_node( self.graph.name, node, branch, turn, tick, False ) self.db._nodes_cache.store( self.graph.name, node, branch, turn, tick, False ) key = (self.graph.name, node) if node in self.db._node_objs: del self.db._node_objs[key] self.send(self, node_name=node, exists=False) def __eq__(self, other): """Compare values cast into dicts. As I serve the custom Node class, rather than dicts like networkx normally would, the normal comparison operation would not let you compare my nodes with regular networkx nodes-that-are-dicts. So I cast my nodes into dicts for this purpose, and cast the other argument's nodes the same way, in case it is a db graph. """ if not hasattr(other, 'keys'): return False if self.keys() != other.keys(): return False for k in self.keys(): if dict(self[k]) != dict(other[k]): return False return True
[docs]class GraphEdgeMapping(AllegedMapping): """Provides an adjacency mapping and possibly a predecessor mapping for a graph. """ __slots__ = ('graph',) _metacache = defaultdict(dict) @property def _cache(self): return self._metacache[id(self)] db = getatt('graph.db') def __init__(self, graph): super().__init__() self.graph = graph def __eq__(self, other): """Compare dictified versions of the edge mappings within me. As I serve custom Predecessor or Successor classes, which themselves serve the custom Edge class, I wouldn't normally be comparable to a networkx adjacency dictionary. Converting myself and the other argument to dicts allows the comparison to work anyway. """ if not hasattr(other, 'keys'): return False if self.keys() != other.keys(): return False for k in self.keys(): if dict(self[k]) != dict(other[k]): return False return True def __iter__(self): return iter(self.graph.node)
[docs]class AbstractSuccessors(GraphEdgeMapping): __slots__ = ('graph', 'container', 'orig') db = getatt('graph.db') _metacache = defaultdict(dict) @property def _cache(self): return self._metacache[id(self)] def __init__(self, container, orig): """Store container and node""" super().__init__(container.graph) self.container = container self.orig = orig def __iter__(self): """Iterate over node IDs that have an edge with my orig""" return self.db._edges_cache.iter_successors( self.graph.name, self.orig, *self.db.btt() ) def __contains__(self, dest): """Is there an edge leading to ``dest`` at the moment?""" return self.db._edges_cache.has_successor( self.graph.name, self.orig, dest, *self.db.btt() ) def __len__(self): """How many nodes touch an edge shared with my orig?""" return self.db._edges_cache.count_successors( self.graph.name, self.orig, *self.db.btt() ) def _make_edge(self, dest): return Edge(self.graph, self.orig, dest) def __getitem__(self, dest): """Get the edge between my orig and the given node""" if dest not in self: raise KeyError("No edge {}->{}".format(self.orig, dest)) return self.db._get_edge(self.graph, self.orig, dest, 0) def __setitem__(self, dest, value): """Set the edge between my orig and the given dest to the given value, a mapping. """ created = dest not in self branch, turn, tick = self.db.nbtt() self.db.query.exist_edge( self.graph.name, self.orig, dest, 0, branch, turn, tick, True ) self.db._edges_cache.store( self.graph.name, self.orig, dest, 0, branch, turn, tick, True ) e = self[dest] e.clear() e.update(value) if created: self.send(self, orig=self.orig, dest=dest, idx=0, exists=True) def __delitem__(self, dest): """Remove the edge between my orig and the given dest""" branch, turn, tick = self.db.nbtt() self.db.query.exist_edge( self.graph.name, self.orig, dest, 0, branch, turn, tick, False ) self.db._edges_cache.store( self.graph.name, self.orig, dest, 0, branch, turn, tick, None ) self.send(self, orig=self.orig, dest=dest, idx=0, exists=False)
[docs] def clear(self): """Delete every edge with origin at my orig""" for dest in list(self): del self[dest]
[docs]class GraphSuccessorsMapping(GraphEdgeMapping): """Mapping for Successors (itself a MutableMapping)""" __slots__ = ('graph',)
[docs] class Successors(AbstractSuccessors): __slots__ = ('graph', 'container', 'orig') def _order_nodes(self, dest): if dest < self.orig: return (dest, self.orig) else: return (self.orig, dest)
def __getitem__(self, orig): if orig not in self: raise KeyError("No edges from {}".format(orig)) if orig not in self._cache: self._cache[orig] = self.Successors(self, orig) return self._cache[orig] def __setitem__(self, key, val): """Wipe out any edges presently emanating from orig and replace them with those described by val """ if key in self: sucs = self[key] created = False else: sucs = self._cache[key] = self.Successors(self, key) created = True sucs.clear() sucs.update(val) if created: self.send(self, key=key, val=val) def __delitem__(self, key): """Wipe out edges emanating from orig""" self[key].clear() del self._cache[key] self.send(self, key=key, val=None) def __iter__(self): return iter(self.graph.node) def __len__(self): return len(self.graph.node) def __contains__(self, key): return key in self.graph.node
[docs]class DiGraphSuccessorsMapping(GraphSuccessorsMapping): __slots__ = ('graph',)
[docs] class Successors(AbstractSuccessors): __slots__ = ('graph', 'container', 'orig') def _order_nodes(self, dest): return (self.orig, dest)
[docs]class DiGraphPredecessorsMapping(GraphEdgeMapping): """Mapping for Predecessors instances, which map to Edges that end at the dest provided to this """ __slots__ = ('graph',) def __contains__(self, dest): return dest in self.graph.node def __getitem__(self, dest): """Return a Predecessors instance for edges ending at the given node """ if dest not in self: raise KeyError("No edges available") if dest not in self._cache: self._cache[dest] = self.Predecessors(self, dest) return self._cache[dest] def __setitem__(self, key, val): """Interpret ``val`` as a mapping of edges that end at ``dest``""" created = key not in self if key not in self._cache: self._cache[key] = self.Predecessors(self, key) preds = self._cache[key] preds.clear() preds.update(val) if created: self.send(self, key=key, val=val) def __delitem__(self, key): """Delete all edges ending at ``dest``""" if key in self._cache: self._cache[key].clear() del self._cache[key] self.send(self, key=key, val=None) def __iter__(self): return iter(self.graph.node) def __len__(self): return len(self.graph.node)
[docs] class Predecessors(GraphEdgeMapping): """Mapping of Edges that end at a particular node""" __slots__ = ('graph', 'container', 'dest') def __init__(self, container, dest): """Store container and node ID""" super().__init__(container.graph) self.container = container self.dest = dest def __iter__(self): """Iterate over the edges that exist at the present (branch, rev) """ return self.db._edges_cache.iter_predecessors( self.graph.name, self.dest, *self.db.btt() ) def __contains__(self, orig): """Is there an edge from ``orig`` at the moment?""" return self.db._edges_cache.has_predecessor( self.graph.name, self.dest, orig, *self.db.btt() ) def __len__(self): """How many edges exist at this rev of this branch?""" return self.db._edges_cache.count_predecessors( self.graph.name, self.dest, *self.db.btt() ) def _make_edge(self, orig): return Edge(self.graph, orig, self.dest) def __getitem__(self, orig): """Get the edge from the given node to mine""" return self.graph.adj[orig][self.dest] def __setitem__(self, orig, value): """Use ``value`` as a mapping of edge attributes, set an edge from the given node to mine. """ branch, turn, tick = self.db.nbtt() try: e = self[orig] e.clear() created = False except KeyError: self.db.query.exist_edge( self.graph.name, orig, self.dest, 0, branch, turn, tick, True ) e = self._make_edge(orig) e.update(value) self.db._edges_cache.store( self.graph.name, orig, self.dest, 0, branch, turn, tick, True ) self.send(self, key=orig, val=value) def __delitem__(self, orig): """Unset the existence of the edge from the given node to mine""" branch, turn, tick = self.db.nbtt() if 'Multi' in self.graph.__class__.__name__: for idx in self[orig]: self.db.query.exist_edge( self.graph.name, orig, self.dest, idx, branch, turn, tick, False ) self.db._edges_cache.store( self.graph.name, orig, self.dest, idx, branch, turn, tick, False ) self.send(self, key=orig, val=None) return else: raise KeyError("No edges from {}".format(orig)) self.db.query.exist_edge( self.graph.name, orig, self.dest, 0, branch, turn, tick, False ) self.db._edges_cache.store( self.graph.name, orig, self.dest, 0, branch, turn, tick, None ) self.send(self, key=orig, val=None)
[docs]class MultiEdges(GraphEdgeMapping): """Mapping of Edges between two nodes""" __slots__ = ('graph', 'orig', 'dest') db = getatt('graph.db') def __init__(self, graph, orig, dest): super().__init__(graph) self.orig = orig self.dest = dest def __iter__(self): return self.db._edges_cache.iter_keys( self.graph.name, self.orig, self.dest, *self.db.btt() ) def __len__(self): """How many edges currently connect my two nodes?""" n = 0 for idx in iter(self): n += 1 return n def __contains__(self, i): return self.db._edges_cache.contains_key( self.graph.name, self.orig, self.dest, i, *self.db.btt() ) def _getedge(self, idx): return self.db._get_edge(self.graph, self.orig, self.dest, idx) def __getitem__(self, idx): """Get an Edge with a particular index, if it exists at the present (branch, rev) """ if idx not in self: raise KeyError("No edge at that index") return self._getedge(idx) def __setitem__(self, idx, val): """Create an Edge at a given index from a mapping. Delete the existing Edge first, if necessary. """ branch, turn, tick = self.db.nbtt() created = idx not in self self.db.query.exist_edge( self.graph.name, self.orig, self.dest, idx, branch, turn, tick, True ) e = self._getedge(idx) e.clear() e.update(val) self.db._edges_cache.store( self.graph.name, self.orig, self.dest, idx, branch, turn, tick, True ) if created: self.send(self, orig=self.orig, dest=self.dest, idx=idx, exists=True) def __delitem__(self, idx): """Delete the edge at a particular index""" branch, turn, tick = self.db.btt() tick += 1 e = self._getedge(idx) if not e.exists: raise KeyError("No edge at that index") e.clear() del self._cache[idx] self.db._edges_cache.store( self.graph.name, self.orig, self.dest, idx, branch, turn, tick, None ) self.db.tick = tick self.send(self, orig=self.orig, dest=self.dest, idx=idx, exists=False)
[docs] def clear(self): """Delete all edges between these nodes""" for idx in self: del self[idx]
[docs]class MultiGraphSuccessorsMapping(GraphSuccessorsMapping): """Mapping of Successors that map to MultiEdges""" __slots__ = ('graph',) def __getitem__(self, orig): """If the node exists, return its Successors""" if orig not in self.graph.node: raise KeyError("No such node") return self._getsucc(orig) def _getsucc(self, orig): if orig not in self._cache: self._cache[orig] = self.Successors(self, orig) return self._cache[orig] def __setitem__(self, orig, val): """Interpret ``val`` as a mapping of successors, and turn it into a proper Successors object for storage """ r = self._getsucc(orig) r.clear() r.update(val) self.send(self, key=orig, val=val) def __delitem__(self, orig): """Disconnect this node from everything""" succs = self._getsucc(orig) succs.clear() del self._cache[orig] self.send(self, key=orig, val=None)
[docs] class Successors(AbstractSuccessors): """Edges succeeding a given node in a multigraph""" __slots__ = ('graph', 'container', 'orig', '_multedge') def __init__(self, container, orig): super().__init__(container, orig) self._multedge = {} def _order_nodes(self, dest): if dest < self.orig: return(dest, self.orig) else: return (self.orig, dest) def _get_multedge(self, dest): if dest not in self._multedge: self._multedge[dest] = MultiEdges( self.graph, *self._order_nodes(dest) ) return self._multedge[dest] def __getitem__(self, dest): """Return MultiEdges to ``dest`` if it exists""" if dest in self.graph.node: return self._get_multedge(dest) raise KeyError("No such node") def __setitem__(self, dest, val): """Interpret ``val`` as a dictionary of edge attributes for edges between my ``orig`` and the given ``dest`` """ self[dest].update(val) self.send(self, key=dest, val=val) def __delitem__(self, dest): """Delete all edges between my ``orig`` and the given ``dest``""" self[dest].clear() del self._multedge[dest] self.send(self, key=dest, val=None)
[docs]class MultiDiGraphPredecessorsMapping(DiGraphPredecessorsMapping): """Version of DiGraphPredecessorsMapping for multigraphs""" __slots__ = ('graph',)
[docs] class Predecessors(DiGraphPredecessorsMapping.Predecessors): """Predecessor edges from a given node""" __slots__ = ('graph', 'container', 'dest') def __getitem__(self, orig): """Get MultiEdges""" return MultiEdges(self.graph, orig, self.dest) def __setitem__(self, orig, val): self[orig].update(val) self.send(self, key=orig, val=val) def __delitem__(self, orig): self[orig].clear() self.send(self, key=orig, val=None)
[docs]class MultiDiGraphSuccessorsMapping(MultiGraphSuccessorsMapping): __slots__ = ('graph',)
[docs] class Successors(MultiGraphSuccessorsMapping.Successors): __slots__ = ('graph', 'container', 'orig', '_multedge') def _order_nodes(self, dest): return self.orig, dest
[docs]class AllegedGraph(object): """Class giving the graphs those methods they share in common. """ _succs = {} _statmaps = {} graph_map_cls = GraphMapping node_map_cls = GraphNodeMapping def __new__(cls, db, name, data=None, **attr): if name in db._graph_objs: ret = db._graph_objs[name] if not isinstance(ret, cls): raise EntityCollisionError("Already have a graph named {}, but it's of class {}".format( name, type(ret) )) return ret return super(AllegedGraph, cls).__new__(cls) def __init__(self, db, name, data=None, **attr): self._name = name self.db = db if name not in self.db._graph_objs: self.db._graph_objs[name] = self if data is not None: data = data.copy() if isinstance(data, dict) and 'name' in data: del data['name'] if hasattr(data, 'graph') and 'name' in data.graph: del data.graph['name'] convert_to_networkx_graph(data, create_using=self) self.graph.update(attr) @property def graph(self): if id(self) not in self._statmaps: self._statmaps[id(self)] = self.graph_map_cls(self) return self._statmaps[id(self)] @graph.setter def graph(self, v): self.graph.clear() self.graph.update(v) _nodemaps = {} @property def node(self): if id(self) not in self._nodemaps: self._nodemaps[id(self)] = self.node_map_cls(self) return self._nodemaps[id(self)] @node.setter def node(self, v): self.node.clear() self.node.update(v) _node = node _succmaps = {} @property def adj(self): if self._name not in self._succmaps: self._succmaps[id(self)] = self.adj_cls(self) return self._succmaps[id(self)] @adj.setter def adj(self, v): self.adj.clear() self.adj.update(v) edge = succ = _succ = _adj = adj _predmaps = {} @property def pred(self): if not hasattr(self, 'pred_cls'): raise TypeError("Undirected graph") if self._name not in self._predmaps: self._predmaps[id(self)] = self.pred_cls(self) return self._predmaps[id(self)] @pred.setter def pred(self, v): self.pred.clear() self.pred.update(v) _pred = pred @property def name(self): return self._name @name.setter def name(self, v): raise TypeError("graphs can't be renamed") def _and_previous(self): """Return a 4-tuple that will usually be (current branch, current revision - 1, current branch, current revision), unless current revision - 1 is before the start of the current branch, in which case the first element will be the parent branch. """ branch = self.db.branch rev = self.db.rev (parent, parent_rev) = self.db.sql('parparrev', branch).fetchone() before_branch = parent if parent_rev == rev else branch return (before_branch, rev-1, branch, rev)
[docs] def clear(self): """Remove all nodes and edges from the graph. Unlike the regular networkx implementation, this does *not* remove the graph's name. But all the other graph, node, and edge attributes go away. """ self.adj.clear() self.node.clear() self.graph.clear()
[docs]class Graph(AllegedGraph, networkx.Graph): """A version of the networkx.Graph class that stores its state in a database. """ adj_cls = GraphSuccessorsMapping
[docs]class DiGraph(AllegedGraph, networkx.DiGraph): """A version of the networkx.DiGraph class that stores its state in a database. """ adj_cls = DiGraphSuccessorsMapping pred_cls = DiGraphPredecessorsMapping
[docs] def remove_edge(self, u, v): """Version of remove_edge that's much like normal networkx but only deletes once, since the database doesn't keep separate adj and succ mappings """ try: del self.succ[u][v] except KeyError: raise NetworkXError( "The edge {}-{} is not in the graph.".format(u, v) )
[docs] def remove_edges_from(self, ebunch): """Version of remove_edges_from that's much like normal networkx but only deletes once, since the database doesn't keep separate adj and succ mappings """ for e in ebunch: (u, v) = e[:2] if u in self.succ and v in self.succ[u]: del self.succ[u][v]
[docs] def add_edge(self, u, v, attr_dict=None, **attr): """Version of add_edge that only writes to the database once""" if attr_dict is None: attr_dict = attr else: try: attr_dict.update(attr) except AttributeError: raise NetworkXError( "The attr_dict argument must be a dictionary." ) datadict = self.adj[u].get(v, {}) datadict.update(attr_dict) if u not in self.node: self.node[u] = {} if v not in self.node: self.node[v] = {} self.succ[u][v] = datadict assert u in self.succ, "Failed to add edge {u}->{v} ({u} not in successors)".format(u=u, v=v) assert v in self.succ[u], "Failed to add edge {u}->{v} ({v} not in succ[{u}])".format(u=u, v=v)
[docs] def add_edges_from(self, ebunch, attr_dict=None, **attr): """Version of add_edges_from that only writes to the database once""" if attr_dict is None: attr_dict = attr else: try: attr_dict.update(attr) except AttributeError: raise NetworkXError( "The attr_dict argument must be a dict." ) for e in ebunch: ne = len(e) if ne == 3: u, v, dd = e assert hasattr(dd, "update") elif ne == 2: u, v = e dd = {} else: raise NetworkXError( "Edge tupse {} must be a 2-tuple or 3-tuple.".format(e) ) if u not in self.node: self.node[u] = {} if v not in self.node: self.node[v] = {} datadict = self.adj.get(u, {}).get(v, {}) datadict.update(attr_dict) datadict.update(dd) self.succ[u][v] = datadict assert(u in self.succ) assert(v in self.succ[u])
[docs]class MultiGraph(AllegedGraph, networkx.MultiGraph): """A version of the networkx.MultiGraph class that stores its state in a database. """ adj_cls = MultiGraphSuccessorsMapping
[docs]class MultiDiGraph(AllegedGraph, networkx.MultiDiGraph): """A version of the networkx.MultiDiGraph class that stores its state in a database. """ adj_cls = MultiDiGraphSuccessorsMapping pred_cls = MultiDiGraphPredecessorsMapping
[docs] def remove_edge(self, u, v, key=None): """Version of remove_edge that's much like normal networkx but only deletes once, since the database doesn't keep separate adj and succ mappings """ try: d = self.adj[u][v] except KeyError: raise NetworkXError( "The edge {}-{} is not in the graph.".format(u, v) ) if key is None: d.popitem() else: try: del d[key] except KeyError: raise NetworkXError( "The edge {}-{} with key {} is not in the graph.".format (u, v, key) ) if len(d) == 0: del self.succ[u][v]
[docs] def remove_edges_from(self, ebunch): """Version of remove_edges_from that's much like normal networkx but only deletes once, since the database doesn't keep separate adj and succ mappings """ for e in ebunch: (u, v) = e[:2] if u in self.succ and v in self.succ[u]: del self.succ[u][v]
[docs] def add_edge(self, u, v, key=None, attr_dict=None, **attr): """Version of add_edge that only writes to the database once.""" if attr_dict is None: attr_dict = attr else: try: attr_dict.update(attr) except AttributeError: raise NetworkXError( "The attr_dict argument must be a dictionary." ) if u not in self.node: self.node[u] = {} if v not in self.node: self.node[v] = {} if v in self.succ[u]: keydict = self.adj[u][v] if key is None: key = len(keydict) while key in keydict: key += 1 datadict = keydict.get(key, {}) datadict.update(attr_dict) keydict[key] = datadict else: if key is None: key = 0 datadict = {} datadict.update(attr_dict) keydict = {key: datadict} self.succ[u][v] = keydict return key
pass