# This file is part of allegedb, an object relational mapper for versioned graphs.
# Copyright (C) Zachary Spector. public@zacharyspector.com
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""The main interface to the allegedb ORM, and some supporting functions and classes"""
from contextlib import ContextDecorator
from weakref import WeakValueDictionary
from blinker import Signal
from allegedb.window import update_window, update_backward_window
from .graph import (
Graph,
DiGraph,
MultiGraph,
MultiDiGraph,
Node,
Edge
)
from .query import QueryEngine, TimeError
from .window import HistoryError
[docs]class GraphNameError(KeyError):
"""For errors involving graphs' names"""
[docs]class PlanningContext(ContextDecorator):
"""A context manager for 'hypothetical' edits.
Start a block of code like:
```
with orm.plan():
...
```
and any changes you make to the world state within that block will be
'plans,' meaning that they are used as defaults. The world will
obey your plan unless you make changes to the same entities outside
of the plan, in which case the world will obey those, and cancel any
future plan.
New branches cannot be started within plans.
"""
__slots__ = ['orm', 'time']
def __init__(self, orm):
self.orm = orm
def __enter__(self):
if self.orm._planning:
raise ValueError("Already planning")
self.orm._planning = True
self.time = self.orm.btt()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.orm._obranch, self.orm._oturn, self.orm._otick = self.time
self.orm._planning = False
[docs]class TimeSignal(Signal):
"""Acts like a list of ``[branch, turn]`` for the most part.
You can set these to new values, or even replace them with a whole new
``[branch, turn]`` if you wish. It's even possible to use the strings
``'branch'`` or ``'turn'`` in the place of indices, but at that point
you might prefer to set ``engine.branch`` or ``engine.turn`` directly.
This is a Signal, so pass a function to the connect(...) method and
it will be called whenever the time changes. Not when the tick
changes, though. If you really need something done whenever the
tick changes, override the _set_tick method of
:class:`allegedb.ORM`.
"""
def __init__(self, engine):
super().__init__()
self.engine = engine
def __iter__(self):
yield self.engine.branch
yield self.engine.turn
def __len__(self):
return 2
def __getitem__(self, i):
if i in ('branch', 0):
return self.engine.branch
if i in ('turn', 1):
return self.engine.turn
def __setitem__(self, i, v):
branch_then, turn_then, tick_then = self.engine.btt()
if i in ('branch', 0):
self.engine.branch = v
if i in ('turn', 1):
self.engine.turn = v
branch_now, turn_now, tick_now = self.engine.btt()
self.send(
self, branch_then=branch_then, turn_then=turn_then, tick_then=tick_then,
branch_now=branch_now, turn_now=turn_now, tick_now=tick_now
)
def __str__(self):
return str((self.engine.branch, self.engine.turn))
def __eq__(self, other):
return tuple(self) == other
def __ne__(self, other):
return tuple(self) != other
def __gt__(self, other):
return tuple(self) > other
def __ge__(self, other):
return tuple(self) >= other
def __lt__(self, other):
return tuple(self) < other
def __le__(self, other):
return tuple(self) <= other
[docs]class TimeSignalDescriptor:
__doc__ = TimeSignal.__doc__
signals = {}
def __get__(self, inst, cls):
if id(inst) not in self.signals:
self.signals[id(inst)] = TimeSignal(inst)
return self.signals[id(inst)]
def __set__(self, inst, val):
if id(inst) not in self.signals:
self.signals[id(inst)] = TimeSignal(inst)
real = self.signals[id(inst)]
branch_then, turn_then, tick_then = real.engine.btt()
branch_now, turn_now = val
if (branch_then, turn_then) == (branch_now, turn_now):
return
e = real.engine
# enforce the arrow of time, if it's in effect
if e._forward:
if branch_now != branch_then:
raise TimeError("Can't change branches in a forward context")
if turn_now < turn_then:
raise TimeError("Can't time travel backward in a forward context")
if turn_now > turn_then + 1:
raise TimeError("Can't skip turns in a forward context")
# make sure I'll end up within the revision range of the
# destination branch
branches = e._branches
if branch_now in branches:
tick_now = e._turn_end_plan.setdefault(
(branch_now, turn_now),
tick_then
)
parent, turn_start, tick_start, turn_end, tick_end = branches[branch_now]
if turn_now < turn_start:
raise ValueError(
"The turn number {} "
"occurs before the start of "
"the branch {}".format(turn_now, branch_now)
)
if turn_now == turn_start and tick_now < tick_start:
raise ValueError(
"The tick number {}"
"on turn {} "
"occurs before the start of "
"the branch {}".format(
tick_now, turn_now, branch_now
)
)
if not e._planning and (
turn_now > turn_end or (
turn_now == turn_end and tick_now > tick_end
)
):
branches[branch_now] = parent, turn_start, tick_start, turn_now, tick_now
else:
tick_now = tick_then
branches[branch_now] = (
branch_then, turn_now, tick_now, turn_now, tick_now
)
e.query.new_branch(branch_now, branch_then, turn_now, tick_now)
e._obranch, e._oturn = val
if not e._planning:
if tick_now > e._turn_end[val]:
e._turn_end[val] = tick_now
e._otick = e._turn_end_plan[val] = tick_now
real.send(
e,
branch_then=branch_then,
turn_then=turn_then,
tick_then=tick_then,
branch_now=branch_now,
turn_now=turn_now,
tick_now=tick_now
)
[docs]def setgraphval(delta, graph, key, val):
"""Change a delta to say that a graph stat was set to a certain value"""
delta.setdefault(graph, {})[key] = val
[docs]def setnode(delta, graph, node, exists):
"""Change a delta to say that a node was created or deleted"""
delta.setdefault(graph, {}).setdefault('nodes', {})[node] = bool(exists)
[docs]def setnodeval(delta, graph, node, key, value):
"""Change a delta to say that a node stat was set to a certain value"""
if (
graph in delta and 'nodes' in delta[graph] and
node in delta[graph]['nodes'] and not delta[graph]['nodes'][node]
):
return
delta.setdefault(graph, {}).setdefault('node_val', {}).setdefault(node, {})[key] = value
[docs]def setedge(delta, is_multigraph, graph, orig, dest, idx, exists):
"""Change a delta to say that an edge was created or deleted"""
if is_multigraph(graph):
delta.setdefault(graph, {}).setdefault('edges', {})\
.setdefault(orig, {}).setdefault(dest, {})[idx] = bool(exists)
else:
delta.setdefault(graph, {}).setdefault('edges', {})\
.setdefault(orig, {})[dest] = bool(exists)
[docs]def setedgeval(delta, is_multigraph, graph, orig, dest, idx, key, value):
"""Change a delta to say that an edge stat was set to a certain value"""
if is_multigraph(graph):
if (
graph in delta and 'edges' in delta[graph] and
orig in delta[graph]['edges'] and dest in delta[graph]['edges'][orig]
and idx in delta[graph]['edges'][orig][dest]
and not delta[graph]['edges'][orig][dest][idx]
):
return
delta.setdefault(graph, {}).setdefault('edge_val', {})\
.setdefault(orig, {}).setdefault(dest, {})\
.setdefault(idx, {})[key] = value
else:
if (
graph in delta and 'edges' in delta[graph] and
orig in delta[graph]['edges'] and dest in delta[graph]['edges'][orig]
and not delta[graph]['edges'][orig][dest]
):
return
delta.setdefault(graph, {}).setdefault('edge_val', {})\
.setdefault(orig, {}).setdefault(dest, {})[key] = value
[docs]class ORM(object):
"""Instantiate this with the same string argument you'd use for a
SQLAlchemy ``create_engine`` call. This will be your interface to
allegedb.
"""
node_cls = Node
edge_cls = Edge
query_engine_cls = QueryEngine
illegal_graph_names = ['global']
illegal_node_names = ['nodes', 'node_val', 'edges', 'edge_val']
time = TimeSignalDescriptor()
def _make_node(self, graph, node):
return self.node_cls(graph, node)
def _get_node(self, graph, node):
key = (graph.name, node)
if key in self._node_objs:
return self._node_objs[key]
if not self._node_exists(graph.name, node):
self._exist_node(graph.name, node)
ret = self._make_node(graph, node)
self._node_objs[key] = ret
return ret
def _make_edge(self, graph, orig, dest, idx):
return self.edge_cls(graph, orig, dest, idx)
def _get_edge(self, graph, orig, dest, idx=0):
key = (graph.name, orig, dest, idx)
if key in self._edge_objs:
return self._edge_objs[key]
if not self._edge_exists(graph.name, orig, dest, idx):
self._exist_edge(graph.name, orig, dest, idx)
ret = self._make_edge(graph, orig, dest, idx)
self._edge_objs[key] = ret
return ret
[docs] def plan(self):
return PlanningContext(self)
plan.__doc__ = PlanningContext.__doc__
from contextlib import contextmanager
[docs] @contextmanager
def advancing(self):
"""A context manager for when time is moving forward one turn at a time.
When used in LiSE, this means that the game is being simulated.
It changes how the caching works, making it more efficient.
"""
if self._forward:
raise ValueError("Already advancing")
self._forward = True
yield
self._forward = False
[docs] @contextmanager
def batch(self):
"""A context manager for when you're creating lots of state.
Reads will be much slower in a batch, but writes will be faster.
You *can* combine this with ``advancing`` but it isn't any faster.
"""
if self._no_kc:
raise ValueError("Already in a batch")
self._no_kc = True
yield
self._no_kc = False
[docs] def get_delta(self, branch, turn_from, tick_from, turn_to, tick_to):
"""Get a dictionary describing changes to all graphs.
The keys are graph names. Their values are dictionaries of the graphs'
attributes' new values, with ``None`` for deleted keys. Also in those graph
dictionaries are special keys 'node_val' and 'edge_val' describing changes
to node and edge attributes, and 'nodes' and 'edges' full of booleans
indicating whether a node or edge exists.
"""
from functools import partial
if turn_from == turn_to:
return self.get_turn_delta(branch, turn_from, tick_from, tick_to)
delta = {}
graph_objs = self._graph_objs
if turn_to < turn_from:
updater = partial(update_backward_window, turn_from, tick_from, turn_to, tick_to)
gvbranches = self._graph_val_cache.presettings
nbranches = self._nodes_cache.presettings
nvbranches = self._node_val_cache.presettings
ebranches = self._edges_cache.presettings
evbranches = self._edge_val_cache.presettings
else:
updater = partial(update_window, turn_from, tick_from, turn_to, tick_to)
gvbranches = self._graph_val_cache.settings
nbranches = self._nodes_cache.settings
nvbranches = self._node_val_cache.settings
ebranches = self._edges_cache.settings
evbranches = self._edge_val_cache.settings
if branch in gvbranches:
updater(partial(setgraphval, delta), gvbranches[branch])
if branch in nbranches:
updater(partial(setnode, delta), nbranches[branch])
if branch in nvbranches:
updater(partial(setnodeval, delta), nvbranches[branch])
if branch in ebranches:
updater(partial(setedge, delta, lambda g: graph_objs[g].is_multigraph()), ebranches[branch])
if branch in evbranches:
updater(partial(setedgeval, delta, lambda g: graph_objs[g].is_multigraph()), evbranches[branch])
return delta
[docs] def get_turn_delta(self, branch=None, turn=None, tick_from=0, tick_to=None):
"""Get a dictionary describing changes made on a given turn.
If ``tick_to`` is not supplied, report all changes after ``tick_from``
(default 0).
The keys are graph names. Their values are dictionaries of the graphs'
attributes' new values, with ``None`` for deleted keys. Also in those graph
dictionaries are special keys 'node_val' and 'edge_val' describing changes
to node and edge attributes, and 'nodes' and 'edges' full of booleans
indicating whether a node or edge exists.
"""
branch = branch or self.branch
turn = turn or self.turn
tick_to = tick_to or self.tick
delta = {}
if tick_from < tick_to:
gvbranches = self._graph_val_cache.settings
nbranches = self._nodes_cache.settings
nvbranches = self._node_val_cache.settings
ebranches = self._edges_cache.settings
evbranches = self._edge_val_cache.settings
else:
gvbranches = self._graph_val_cache.presettings
nbranches = self._nodes_cache.presettings
nvbranches = self._node_val_cache.presettings
ebranches = self._edges_cache.presettings
evbranches = self._edge_val_cache.presettings
if branch in gvbranches and turn in gvbranches[branch]:
for graph, key, value in gvbranches[branch][turn][tick_from:tick_to]:
if graph in delta:
delta[graph][key] = value
else:
delta[graph] = {key: value}
if branch in nbranches and turn in nbranches[branch]:
for graph, node, exists in nbranches[branch][turn][tick_from:tick_to]:
delta.setdefault(graph, {}).setdefault('nodes', {})[node] = bool(exists)
if branch in nvbranches and turn in nvbranches[branch]:
for graph, node, key, value in nvbranches[branch][turn][tick_from:tick_to]:
if (
graph in delta and 'nodes' in delta[graph] and
node in delta[graph]['nodes'] and not delta[graph]['nodes'][node]
):
continue
nodevd = delta.setdefault(graph, {}).setdefault('node_val', {})
if node in nodevd:
nodevd[node][key] = value
else:
nodevd[node] = {key: value}
graph_objs = self._graph_objs
if branch in ebranches and turn in ebranches[branch]:
for graph, orig, dest, idx, exists in ebranches[branch][turn][tick_from:tick_to]:
if graph_objs[graph].is_multigraph():
if (
graph in delta and 'edges' in delta[graph] and
orig in delta[graph]['edges'] and dest in delta[graph]['edges'][orig]
and idx in delta[graph]['edges'][orig][dest]
and not delta[graph]['edges'][orig][dest][idx]
):
continue
delta.setdefault(graph, {}).setdefault('edges', {})\
.setdefault(orig, {}).setdefault(dest, {})[idx] = bool(exists)
else:
if (
graph in delta and 'edges' in delta[graph] and
orig in delta[graph]['edges'] and dest in delta[graph]['edges'][orig]
and not delta[graph]['edges'][orig][dest]
):
continue
delta.setdefault(graph, {}).setdefault('edges', {})\
.setdefault(orig, {})[dest] = bool(exists)
if branch in evbranches and turn in evbranches[branch]:
for graph, orig, dest, idx, key, value in evbranches[branch][turn][tick_from:tick_to]:
edgevd = delta.setdefault(graph, {}).setdefault('edge_val', {})\
.setdefault(orig, {}).setdefault(dest, {})
if graph_objs[graph].is_multigraph():
if idx in edgevd:
edgevd[idx][key] = value
else:
edgevd[idx] = {key: value}
else:
edgevd[key] = value
return delta
def _init_caches(self):
from collections import defaultdict
from .cache import Cache, NodesCache, EdgesCache
self._global_cache = self.query._global_cache = {}
self._node_objs = WeakValueDictionary()
self._edge_objs = WeakValueDictionary()
for k, v in self.query.global_items():
if k == 'branch':
self._obranch = v
elif k == 'turn':
self._oturn = int(v)
elif k == 'tick':
self._otick = int(v)
else:
self._global_cache[k] = v
self._childbranch = defaultdict(set)
"""Immediate children of a branch"""
self._branches = {}
"""Start time, end time, and parent of each branch"""
self._branch_parents = defaultdict(set)
"""Parents of a branch at any remove"""
self._turn_end = defaultdict(lambda: 0)
"""Tick on which a (branch, turn) ends"""
self._turn_end_plan = defaultdict(lambda: 0)
"""Tick on which a (branch, turn) ends, even if it hasn't been simulated"""
self._graph_val_cache = Cache(self)
self._nodes_cache = NodesCache(self)
self._edges_cache = EdgesCache(self)
self._node_val_cache = Cache(self)
self._edge_val_cache = Cache(self)
self._graph_objs = {}
def _load_graphs(self):
for (graph, typ) in self.query.graphs_types():
self._graph_objs[graph] = {
'Graph': Graph,
'DiGraph': DiGraph,
'MultiGraph': MultiGraph,
'MultiDiGraph': MultiDiGraph
}[typ](self, graph)
def __init__(
self,
dbstring,
alchemy=True,
connect_args={},
validate=False
):
"""Make a SQLAlchemy engine if possible, else a sqlite3 connection. In
either case, begin a transaction.
"""
self._planning = False
self._forward = False
self._no_kc = False
if not hasattr(self, 'query'):
self.query = self.query_engine_cls(
dbstring, connect_args, alchemy,
getattr(self, 'pack', None), getattr(self, 'unpack', None)
)
self.query.initdb()
# in case this is the first startup
self._otick = self._oturn = 0
self._init_caches()
for (branch, parent, parent_turn, parent_tick, end_turn, end_tick) in self.query.all_branches():
self._branches[branch] = (parent, parent_turn, parent_tick, end_turn, end_tick)
self._upd_branch_parentage(parent, branch)
for (branch, turn, end_tick, plan_end_tick) in self.query.turns_dump():
self._turn_end[branch, turn] = end_tick
self._turn_end_plan[branch, turn] = plan_end_tick
if 'trunk' not in self._branches:
self._branches['trunk'] = None, 0, 0, 0, 0
self._load_graphs()
self._init_load(validate=validate)
def _upd_branch_parentage(self, parent, child):
self._childbranch[parent].add(child)
self._branch_parents[child].add(parent)
while parent in self._branches:
parent, _, _, _, _ = self._branches[parent]
self._branch_parents[child].add(parent)
def _init_load(self, validate=False):
if not hasattr(self, 'graph'):
self.graph = self._graph_objs
noderows = [
(graph, node, branch, turn, tick, ex if ex else None)
for (graph, node, branch, turn, tick, ex)
in self.query.nodes_dump()
]
self._nodes_cache.load(noderows, validate=validate)
edgerows = [
(graph, orig, dest, idx, branch, turn, tick, ex if ex else None)
for (graph, orig, dest, idx, branch, turn, tick, ex)
in self.query.edges_dump()
]
self._edges_cache.load(edgerows, validate=validate)
self._graph_val_cache.load(self.query.graph_val_dump(), validate=validate)
self._node_val_cache.load(self.query.node_val_dump(), validate=validate)
self._edge_val_cache.load(self.query.edge_val_dump(), validate=validate)
def __enter__(self):
"""Enable the use of the ``with`` keyword"""
return self
def __exit__(self, *args):
"""Alias for ``close``"""
self.close()
[docs] def is_parent_of(self, parent, child):
"""Return whether ``child`` is a branch descended from ``parent`` at
any remove.
"""
if parent == 'trunk':
return True
if child == 'trunk':
return False
if child not in self._branches:
raise ValueError(
"The branch {} seems not to have ever been created".format(
child
)
)
if self._branches[child][0] == parent:
return True
return self.is_parent_of(parent, self._branches[child][0])
def _get_branch(self):
return self._obranch
def _set_branch(self, v):
if self._planning:
raise ValueError("Don't change branches while planning")
curbranch, curturn, curtick = self.btt()
if curbranch == v:
self._otick = self._turn_end_plan[curbranch, curturn]
return
# make sure I'll end up within the revision range of the
# destination branch
if v != 'trunk' and v in self._branches:
parturn = self._branches[v][1]
if curturn < parturn:
raise ValueError(
"Tried to jump to branch {br}, "
"which starts at turn {rv}. "
"Go to turn {rv} or later to use this branch.".format(
br=v,
rv=parturn
)
)
if v not in self._branches:
# assumes the present turn in the parent branch has
# been finalized.
self.query.new_branch(v, curbranch, curturn, curtick)
self._branches[v] = curbranch, curturn, curtick, curturn, curtick
self._upd_branch_parentage(v, curbranch)
self._turn_end_plan[v, curturn] = self._turn_end[v, curturn] = curtick
self._obranch = v
self._otick = self._turn_end_plan[v, curturn]
# easier to override things this way
@property
def branch(self):
return self._get_branch()
@branch.setter
def branch(self, v):
self._set_branch(v)
def _get_turn(self):
return self._oturn
def _set_turn(self, v):
if v == self.turn:
self._otick = self._turn_end_plan[tuple(self.time)]
return
if not isinstance(v, int):
raise TypeError("turn must be an integer")
# enforce the arrow of time, if it's in effect
if self._forward and v < self._oturn:
raise ValueError("Can't time travel backward in a forward context")
# first make sure the cursor is not before the start of this branch
branch = self.branch
tick = self._turn_end_plan.setdefault((branch, v), 0)
parent, turn_start, tick_start, turn_end, tick_end = self._branches[branch]
if branch != 'trunk':
if v < turn_start:
raise ValueError(
"The turn number {} "
"occurs before the start of "
"the branch {}".format(v, branch)
)
if not self._planning and v > turn_end:
self._branches[branch] = parent, turn_start, tick_start, v, tick
self._otick = tick
self._oturn = v
# easier to override things this way
@property
def turn(self):
return self._get_turn()
@turn.setter
def turn(self, v):
self._set_turn(v)
def _get_tick(self):
return self._otick
def _set_tick(self, v):
if not isinstance(v, int):
raise TypeError("tick must be an integer")
time = branch, turn = self._obranch, self._oturn
# enforce the arrow of time, if it's in effect
if self._forward and v < self._otick:
raise ValueError("Can't time travel backward in a forward context")
if v > self._turn_end_plan[time]:
self._turn_end_plan[time] = v
if not self._planning:
if v > self._turn_end[time]:
self._turn_end[time] = v
parent, turn_start, tick_start, turn_end, tick_end = self._branches[branch]
if turn == turn_end and v > tick_end:
self._branches[branch] = parent, turn_start, tick_start, turn, v
self._otick = v
# easier to override things this way
@property
def tick(self):
return self._get_tick()
@tick.setter
def tick(self, v):
self._set_tick(v)
[docs] def btt(self):
"""Return the branch, turn, and tick."""
return self._obranch, self._oturn, self._otick
[docs] def nbtt(self):
"""Increment the tick and return branch, turn, tick
Unless we're viewing the past, in which case raise HistoryError.
Idea is you use this when you want to advance time, which you
can only do once per branch, turn, tick.
"""
from .cache import HistoryError
branch, turn, tick = self.btt()
tick += 1
if (branch, turn) in self._turn_end_plan:
if tick > self._turn_end_plan[branch, turn]:
self._turn_end_plan[branch, turn] = tick
else:
tick = self._turn_end_plan[branch, turn] + 1
self._turn_end_plan[branch, turn] = tick
if self._turn_end[branch, turn] > tick:
raise HistoryError(
"You're not at the end of turn {}. Go to tick {} to change things".format(
turn, self._turn_end[branch, turn]
)
)
parent, turn_start, tick_start, turn_end, tick_end = self._branches[branch]
if turn < turn_end or (
turn == turn_end and tick < tick_end
):
raise HistoryError(
"You're in the past. Go to turn {}, tick {} to change things".format(turn_end, tick_end)
)
if not self._planning:
if turn_end != turn:
raise HistoryError(
"When advancing time outside of a plan, you can't skip turns. Go to turn {}".format(turn_end)
)
self._branches[branch] = parent, turn_start, tick_start, turn_end, tick
self._turn_end[branch, turn] = tick
self._otick = tick
return branch, turn, tick
[docs] def commit(self):
"""Write the state of all graphs to the database and commit the transaction.
Also saves the current branch, turn, and tick.
"""
self.query.globl['branch'] = self._obranch
self.query.globl['turn'] = self._oturn
self.query.globl['tick'] = self._otick
set_branch = self.query.set_branch
for branch, (parent, turn_start, tick_start, turn_end, tick_end) in self._branches.items():
set_branch(branch, parent, turn_start, tick_start, turn_end, tick_end)
turn_end = self._turn_end
set_turn = self.query.set_turn
for (branch, turn), plan_end_tick in self._turn_end_plan.items():
set_turn(branch, turn, turn_end[branch], plan_end_tick)
self.query.commit()
[docs] def close(self):
"""Write changes to database and close the connection"""
self.commit()
self.query.close()
[docs] def initdb(self):
"""Alias of ``self.query.initdb``"""
self.query.initdb()
def _init_graph(self, name, type_s='Graph'):
if self.query.have_graph(name):
raise GraphNameError("Already have a graph by that name")
if name in self.illegal_graph_names:
raise GraphNameError("Illegal name")
self.query.new_graph(name, type_s)
[docs] def new_graph(self, name, data=None, **attr):
"""Return a new instance of type Graph, initialized with the given
data if provided.
"""
self._init_graph(name, 'Graph')
g = Graph(self, name, data, **attr)
self._graph_objs[name] = g
return g
[docs] def new_digraph(self, name, data=None, **attr):
"""Return a new instance of type DiGraph, initialized with the given
data if provided.
"""
self._init_graph(name, 'DiGraph')
dg = DiGraph(self, name, data, **attr)
self._graph_objs[name] = dg
return dg
[docs] def new_multigraph(self, name, data=None, **attr):
"""Return a new instance of type MultiGraph, initialized with the given
data if provided.
"""
self._init_graph(name, 'MultiGraph')
mg = MultiGraph(self, name, data, **attr)
self._graph_objs[name] = mg
return mg
[docs] def new_multidigraph(self, name, data=None, **attr):
"""Return a new instance of type MultiDiGraph, initialized with the given
data if provided.
"""
self._init_graph(name, 'MultiDiGraph')
mdg = MultiDiGraph(self, name, data, **attr)
self._graph_objs[name] = mdg
return mdg
[docs] def get_graph(self, name):
"""Return a graph previously created with ``new_graph``,
``new_digraph``, ``new_multigraph``, or
``new_multidigraph``
"""
if name in self._graph_objs:
return self._graph_objs[name]
graphtypes = {
'Graph': Graph,
'DiGraph': DiGraph,
'MultiGraph': MultiGraph,
'MultiDiGraph': MultiDiGraph
}
type_s = self.query.graph_type(name)
if type_s not in graphtypes:
raise GraphNameError(
"I don't know of a graph named {}".format(name)
)
g = graphtypes[type_s](self, name)
self._graph_objs[name] = g
return g
[docs] def del_graph(self, name):
"""Remove all traces of a graph's existence from the database"""
# make sure the graph exists before deleting anything
self.get_graph(name)
self.query.del_graph(name)
if name in self._graph_objs:
del self._graph_objs[name]
def _iter_parent_btt(self, branch=None, turn=None, tick=None, *, stoptime=None):
"""Private use. Iterate over (branch, turn, tick), where the branch is
a descendant of the previous (starting with whatever branch is
presently active and ending at 'trunk'), and the turn is the
latest revision in the branch that matters.
Keyword ``stoptime`` may be a branch, in which case iteration will stop
instead of proceeding into that branch's parent; or it may be a triple,
``(branch, turn, tick)``, in which case iteration will stop instead of
yielding any time before that. The tick may be ``None``, in which case
iteration will stop instead of yielding the turn.
"""
branch = branch or self.branch
trn = self.turn if turn is None else turn
tck = self.tick if tick is None else tick
yield branch, trn, tck
stopbranches = set()
if stoptime:
if type(stoptime) is tuple:
stopbranch = stoptime[0]
stopbranches.add(stopbranch)
stopbranches.update(self._branch_parents[stopbranch])
else:
stopbranch = stoptime
stopbranches = self._branch_parents[stopbranch]
_branches = self._branches
while branch in _branches:
# ``par`` is the parent branch;
# ``(trn, tck)`` is when ``branch`` forked off from ``par``
(branch, trn, tck, _, _) = _branches[branch]
if branch in stopbranches and (
trn < stoptime[1] or (
trn == stoptime[1] and (
stoptime[2] is None or tck <= stoptime[2]
)
)
):
return
yield branch, trn, tck
def _branch_descendants(self, branch=None):
"""Iterate over all branches immediately descended from the current
one (or the given one, if available).
"""
branch = branch or self.branch
for (parent, (child, _, _, _, _)) in self._branches.items():
if parent == branch:
yield child
def _node_exists(self, character, node):
return self._nodes_cache.contains_entity(character, node, *self.btt())
def _exist_node(self, character, node, exist=True):
branch, turn, tick = self.nbtt()
self.query.exist_node(
character,
node,
branch,
turn,
tick,
True
)
self._nodes_cache.store(character, node, branch, turn, tick, exist)
def _edge_exists(self, character, orig, dest, idx=0):
return self._edges_cache.contains_entity(
character, orig, dest, idx, *self.btt()
)
def _exist_edge(
self, character, orig, dest, idx=0, exist=True
):
branch, turn, tick = self.nbtt()
self.query.exist_edge(
character,
orig,
dest,
idx,
branch,
turn,
tick,
exist
)
self._edges_cache.store(
character, orig, dest, idx, branch, turn, tick, exist
)