# This file is part of allegedb, a database abstraction for versioned graphs
# Copyright (c) Zachary Spector. public@zacharyspector.com
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Classes for in-memory storage and retrieval of historical graph data.
"""
from .window import WindowDict, HistoryError
from collections import OrderedDict, deque
class FuturistWindowDict(WindowDict):
    """A WindowDict that does not let you rewrite the past.

    Assignments are only accepted at or after the latest revision
    already recorded; writing before existing history raises
    ``HistoryError``.
    """
    __slots__ = ('_future', '_past')

    def __setitem__(self, rev, v):
        # Unwrap wrapper values before storing, unless they opt out
        # via a ``no_unwrap`` attribute.
        if hasattr(v, 'unwrap') and not hasattr(v, 'no_unwrap'):
            v = v.unwrap()
        if self._past is None:
            self._past = []
        # Fast path: the very first value ever stored.
        if not self._past and not self._future:
            self._past.append((rev, v))
            return
        self.seek(rev)
        if self._future:
            raise HistoryError(
                "Already have some history after {}".format(rev)
            )
        if not self._past or rev > self._past[-1][0]:
            self._past.append((rev, v))
        elif rev == self._past[-1][0]:
            # Same revision as the latest: overwrite in place.
            self._past[-1] = (rev, v)
        else:
            raise HistoryError(
                "Already have some history after {} "
                "(and my seek function is broken?)".format(rev)
            )
        # Promote long histories from list to deque, which is cheaper
        # to rotate between past and future.
        if type(self._past) is list and len(self._past) > self.DEQUE_THRESHOLD:
            self._past = deque(self._past)
        if type(self._future) is list and len(self._future) > self.DEQUE_THRESHOLD:
            self._future = deque(self._future)
class TurnDict(FuturistWindowDict):
    """A ``FuturistWindowDict`` of ``FuturistWindowDict``s.

    Maps turns to per-tick windows of values, creating an empty
    window on first access to a missing turn.
    """
    __slots__ = ('_future', '_past')
    cls = FuturistWindowDict

    def __getitem__(self, rev):
        # Create an empty per-tick window on demand.
        try:
            return super().__getitem__(rev)
        except KeyError:
            ret = self[rev] = FuturistWindowDict()
            return ret

    def __setitem__(self, turn, value):
        # Coerce plain mappings into FuturistWindowDict before storing.
        if type(value) is not FuturistWindowDict:
            value = FuturistWindowDict(value)
        super().__setitem__(turn, value)
class SettingsTurnDict(WindowDict):
    """A ``WindowDict`` of ``WindowDict``s, used for change journals.

    Unlike ``TurnDict``, this permits rewriting the past, since
    journals get truncated and refilled when history is rewound.
    """
    __slots__ = ('_future', '_past')
    cls = WindowDict

    def __getitem__(self, rev):
        # Create an empty per-tick window on demand.
        try:
            return super().__getitem__(rev)
        except KeyError:
            ret = self[rev] = WindowDict()
            return ret

    def __setitem__(self, turn, value):
        # Coerce plain mappings into WindowDict before storing.
        if type(value) is not WindowDict:
            value = WindowDict(value)
        super().__setitem__(turn, value)
def _default_args_munger(self, k):
return tuple()
def _default_kwargs_munger(self, k):
return {}
class PickyDefaultDict(dict):
    """A ``defaultdict`` alternative that requires values of a specific type.

    Pass some type object (such as a class) to the constructor to
    specify what type to use by default, which is the only type I will
    accept.

    Default values are constructed with no arguments by default;
    supply ``args_munger`` and/or ``kwargs_munger`` to override this.
    They take arguments ``self`` and the unused key being looked up.
    """
    __slots__ = ['type', 'args_munger', 'kwargs_munger', 'parent', 'key']

    def __init__(
            self, type=object,
            args_munger=_default_args_munger,
            kwargs_munger=_default_kwargs_munger
    ):
        self.type = type
        self.args_munger = args_munger
        self.kwargs_munger = kwargs_munger

    def __getitem__(self, k):
        """Return the value at ``k``, constructing a default if absent.

        Raises ``KeyError`` when a default can't be constructed.
        """
        if k in self:
            return super(PickyDefaultDict, self).__getitem__(k)
        try:
            ret = self[k] = self.type(
                *self.args_munger(self, k),
                **self.kwargs_munger(self, k)
            )
        except TypeError:
            # The default couldn't be constructed for this key. Report
            # the key itself (a bare KeyError carried no information)
            # and suppress the confusing TypeError context.
            raise KeyError(k) from None
        return ret

    def _create(self, v):
        # Coerce a raw value into my required type.
        return self.type(v)

    def __setitem__(self, k, v):
        """Store ``v`` at ``k``, coercing it to my type first if needed."""
        if type(v) is not self.type:
            v = self._create(v)
        super(PickyDefaultDict, self).__setitem__(k, v)
class StructuredDefaultDict(dict):
    """A ``defaultdict``-like class that expects values stored at a specific depth.

    Requires an integer to tell it how many layers deep to go.
    The innermost layer will be ``PickyDefaultDict``, which will take the
    ``type``, ``args_munger``, and ``kwargs_munger`` arguments supplied
    to my constructor.
    """
    __slots__ = ['layer', 'type', 'args_munger', 'kwargs_munger', 'parent', 'key']

    def __init__(
            self, layers, type=object,
            args_munger=_default_args_munger,
            kwargs_munger=_default_kwargs_munger
    ):
        if layers < 1:
            raise ValueError("Not enough layers")
        self.layer = layers
        self.type = type
        self.args_munger = args_munger
        self.kwargs_munger = kwargs_munger

    def __getitem__(self, k):
        """Return the child mapping at ``k``, creating it if absent."""
        if k in self:
            return super(StructuredDefaultDict, self).__getitem__(k)
        # One layer down: either another StructuredDefaultDict, or, at
        # the innermost level, a PickyDefaultDict of actual values.
        if self.layer < 2:
            ret = PickyDefaultDict(
                self.type, self.args_munger, self.kwargs_munger
            )
        else:
            ret = StructuredDefaultDict(
                self.layer-1, self.type,
                self.args_munger, self.kwargs_munger
            )
        ret.parent = self
        ret.key = k
        super(StructuredDefaultDict, self).__setitem__(k, ret)
        return ret

    def __setitem__(self, k, v):
        """Accept only child mappings configured exactly as I would make them."""
        if type(v) is StructuredDefaultDict:
            if (
                v.layer == self.layer - 1
                and v.type is self.type
                and v.args_munger is self.args_munger
                and v.kwargs_munger is self.kwargs_munger
            ):
                super().__setitem__(k, v)
                return
        elif type(v) is PickyDefaultDict:
            if (
                self.layer < 2
                and v.type is self.type
                and v.args_munger is self.args_munger
                and v.kwargs_munger is self.kwargs_munger
            ):
                super().__setitem__(k, v)
                return
        raise TypeError("Can't set layer {}".format(self.layer))
class Cache(object):
    """A data store that's useful for tracking graph revisions."""
    # Also bounds the ``shallowest`` hint dict and the keycache LRU.
    keycache_maxsize = 1024

    def __init__(self, db):
        self.db = db
        # Entity data keyed by the entities' parents.
        # An entity's parent is what it's contained in. When speaking of a
        # node, this is its graph. When speaking of an edge, the parent is
        # usually the graph and the origin in a pair, though for multigraphs
        # the destination might be part of the parent as well.
        # Deeper layers of this cache are keyed by branch and revision.
        self.parents = StructuredDefaultDict(3, TurnDict)
        # Cache of entity data keyed by the entities themselves.
        # That means the whole tuple identifying the entity is the
        # top-level key in this cache here. The second-to-top level
        # is the key within the entity.
        # Deeper layers of this cache are keyed by branch, turn, and tick.
        self.keys = StructuredDefaultDict(2, TurnDict)
        # Keys an entity has at a given turn and tick.
        self.keycache = PickyDefaultDict(SettingsTurnDict)
        # A less structured alternative to ``keys``.
        # For when you already know the entity and the key within it,
        # but still need to iterate through history to find the value.
        self.branches = StructuredDefaultDict(1, TurnDict)
        # A dictionary for plain, unstructured hinting.
        self.shallowest = OrderedDict()
        # All the ``entity[key] = value`` operations that were performed
        # on some turn.
        self.settings = PickyDefaultDict(SettingsTurnDict)
        # The values prior to ``entity[key] = value`` operations performed
        # on some turn.
        self.presettings = PickyDefaultDict(SettingsTurnDict)
        # LRU bookkeeping for ``keycache``.
        self._kc_lru = OrderedDict()

    def load(self, data, validate=False, cb=None):
        """Add a bunch of data. It doesn't need to be in chronological order.

        With ``validate=True``, raise ValueError if this results in an
        incoherent cache.

        If a callable ``cb`` is provided, it will be called with each row.
        It will also be passed my ``validate`` argument.
        """
        # ``deque`` is already imported at module level; only
        # ``defaultdict`` is needed here.
        from collections import defaultdict
        # Group the rows by branch, then by (turn, tick) within the branch.
        dd2 = defaultdict(lambda: defaultdict(list))
        for row in data:
            entity, key, branch, turn, tick, value = row[-6:]
            dd2[branch][turn, tick].append(row)
        # Make keycaches and valcaches. Must be done chronologically
        # to make forwarding work.
        childbranch = self.db._childbranch
        branch2do = deque(['trunk'])

        def store(*args):
            self._store(*args, planning=False)

        # Breadth-first over the branch tree, each branch in time order.
        while branch2do:
            branch = branch2do.popleft()
            dd2b = dd2[branch]
            for turn, tick in sorted(dd2b.keys()):
                rows = dd2b[turn, tick]
                for row in rows:
                    store(*row)
                    if cb:
                        cb(row, validate=validate)
            if branch in childbranch:
                branch2do.extend(childbranch[branch])

    def _valcache_lookup(self, cache, branch, turn, tick):
        """Return the value in ``cache`` as of the given time, if any.

        Falls back to ancestor branches when the requested branch has
        nothing at or before the requested time; returns ``None`` when
        nothing at all is found.
        """
        if branch in cache:
            branc = cache[branch]
            try:
                if turn in branc and branc[turn].rev_gettable(tick):
                    return branc[turn][tick]
                elif branc.rev_gettable(turn-1):
                    turnd = branc[turn-1]
                    return turnd[turnd.end]
            except HistoryError as ex:
                # probably shouldn't ever happen, empty branches shouldn't
                # be kept in the cache at all... but it's easy to handle
                if ex.deleted:
                    raise
        # Nothing in this branch; walk up through the parent branches.
        for b, r, t in self.db._iter_parent_btt(branch, turn, tick):
            if b in cache:
                if r in cache[b] and cache[b][r].rev_gettable(t):
                    try:
                        return cache[b][r][t]
                    except HistoryError as ex:
                        if ex.deleted:
                            raise
                elif cache[b].rev_gettable(r-1):
                    cbr = cache[b][r-1]
                    try:
                        return cbr[cbr.end]
                    except HistoryError as ex:
                        if ex.deleted:
                            raise

    def _get_keycachelike(self, keycache, keys, get_adds_dels, parentity, branch, turn, tick, *, forward):
        """Find or compute the set of keys ``parentity`` has at a given time.

        ``keycache`` maps ``parentity + (branch,)`` to turn -> tick ->
        frozenset-of-keys; ``keys`` is the underlying data cache, and
        ``get_adds_dels`` recomputes additions/deletions from it when no
        cached keyset can be reused.
        """
        keycache_key = parentity + (branch,)
        if keycache_key in keycache and turn in keycache[keycache_key] and tick in keycache[keycache_key][turn]:
            return keycache[keycache_key][turn][tick]
        if forward:
            # Take valid values from the past of a keycache and copy them
            # forward, into the present. Assumes that time is only moving
            # forward, never backward, never skipping any turns or ticks,
            # and any changes to the world state are happening through
            # allegedb proper, meaning they'll all get cached. In LiSE this
            # means every change to the world state should happen inside of
            # a call to ``Engine.next_turn`` in a rule.
            if keycache_key in keycache and keycache[keycache_key].rev_gettable(turn):
                kc = keycache[keycache_key]
                if turn not in kc:
                    # Project the last known keyset into this turn.
                    old_turn = kc.rev_before(turn)
                    old_turn_kc = kc[turn]
                    added, deleted = get_adds_dels(
                        keys[parentity], branch, turn, tick, stoptime=(
                            branch, old_turn, old_turn_kc.end
                        )
                    )
                    ret = old_turn_kc[old_turn_kc.end].union(added).difference(deleted)
                    # assert ret == get_adds_dels(keys[parentity], branch, turn, tick)[0]  # slow
                    new_turn_kc = WindowDict()
                    new_turn_kc[tick] = ret
                    kc[turn] = new_turn_kc
                    return ret
                kcturn = kc[turn]
                if tick not in kcturn:
                    if kcturn.rev_gettable(tick):
                        # Project from an earlier tick in the same turn.
                        added, deleted = get_adds_dels(
                            keys[parentity], branch, turn, tick, stoptime=(
                                branch, turn, kcturn.rev_before(tick)
                            )
                        )
                        ret = kcturn[tick].union(added).difference(deleted)
                        # assert ret == get_adds_dels(keys[parentity], branch, turn, tick)[0]  # slow
                        kcturn[tick] = ret
                        return ret
                    else:
                        # Nothing before this tick in this turn; project
                        # from the end of the latest turn before it.
                        turn_before = kc.rev_before(turn)
                        tick_before = kc[turn_before].end
                        keys_before = kc[turn_before][tick_before]
                        added, deleted = get_adds_dels(
                            keys[parentity], branch, turn, tick, stoptime=(
                                branch, turn_before, tick_before
                            )
                        )
                        ret = kcturn[tick] = keys_before.union(added).difference(deleted)
                        # assert ret == get_adds_dels(keys[parentity], branch, turn, tick)[0]  # slow
                        return ret
                # assert kcturn[tick] == get_adds_dels(keys[parentity], branch, turn, tick)[0]  # slow
                return kcturn[tick]
        else:
            # Not moving forward: find a cached keyset in an ancestor
            # branch and recompute what changed since then.
            for (parbranch, parturn, partick) in self.db._iter_parent_btt(branch, turn, tick):
                par_kc_key = parentity + (parbranch,)
                if par_kc_key in keycache:
                    kcpkc = keycache[par_kc_key]
                    if parturn in kcpkc and kcpkc[parturn].rev_gettable(partick):
                        parkeys = kcpkc[parturn][partick]
                        break
                    elif kcpkc.rev_gettable(parturn-1):
                        partkeys = kcpkc[parturn-1]
                        parkeys = partkeys[partkeys.end]
                        break
            else:
                parkeys = frozenset()
            kc = SettingsTurnDict()
            # NOTE(review): parbranch/parturn/partick would be unbound here
            # if _iter_parent_btt yielded nothing at all -- presumably it
            # always yields at least once; confirm against the ORM.
            added, deleted = get_adds_dels(
                keys[parentity], branch, turn, tick, stoptime=(
                    parbranch, parturn, partick
                )
            )
            ret = parkeys.union(added).difference(deleted)
            kc[turn] = {tick: ret}
            keycache[keycache_key] = kc
            # assert ret == get_adds_dels(keys[parentity], branch, turn, tick)[0]  # slow
            return ret
        # Forward requested, but no usable history in the keycache:
        # compute the keyset from scratch and cache it.
        ret = frozenset(get_adds_dels(keys[parentity], branch, turn, tick)[0])
        if keycache_key in keycache:
            if turn in keycache[keycache_key]:
                keycache[keycache_key][turn][tick] = ret
            else:
                keycache[keycache_key][turn] = {tick: ret}
        else:
            kcc = SettingsTurnDict()
            kcc[turn][tick] = ret
            keycache[keycache_key] = kcc
        return ret

    def _get_keycache(self, parentity, branch, turn, tick, *, forward):
        """Return the keyset of ``parentity``, tracking it in the LRU."""
        self._lru_append(self.keycache, self._kc_lru, (parentity+(branch,), turn, tick), self.keycache_maxsize)
        return self._get_keycachelike(
            self.keycache, self.keys, self._get_adds_dels,
            parentity, branch, turn, tick, forward=forward
        )

    def _update_keycache(self, *args, forward):
        """Add or remove the stored key in the entity's cached keyset."""
        entity, key, branch, turn, tick, value = args[-6:]
        parent = args[:-6]
        kc = self._get_keycache(parent + (entity,), branch, turn, tick, forward=forward)
        if value is None:
            # Storing None means deletion.
            kc = kc.difference((key,))
        else:
            kc = kc.union((key,))
        self.keycache[parent+(entity, branch)][turn][tick] = kc

    def _lru_append(self, kc, lru, kckey, maxsize):
        """Record use of ``kckey``, evicting the stalest entries of ``kc``."""
        if kckey in lru:
            return
        while len(lru) >= maxsize:
            # Evict the least-recently-inserted entry, cleaning up any
            # now-empty parent mappings.
            (peb, turn, tick), _ = lru.popitem(False)
            del kc[peb][turn][tick]
            if not kc[peb][turn]:
                del kc[peb][turn]
            if not kc[peb]:
                del kc[peb]
        lru[kckey] = True

    def _get_adds_dels(self, cache, branch, turn, tick, *, stoptime=None):
        """Return (added, deleted) key sets as of the given time.

        Walks each key's history up the branch ancestry (stopping at
        ``stoptime`` if given): a non-None value means the key is
        present; a deletion recorded via HistoryError means it's gone.
        """
        added = set()
        deleted = set()
        for key, branches in cache.items():
            for (branc, trn, tck) in self.db._iter_parent_btt(branch, turn, tick, stoptime=stoptime):
                if branc not in branches or not branches[branc].rev_gettable(trn):
                    continue
                turnd = branches[branc]
                if trn in turnd:
                    if turnd[trn].rev_gettable(tck):
                        try:
                            if turnd[trn][tck] is not None:
                                added.add(key)
                                break
                        except HistoryError as ex:
                            if ex.deleted:
                                deleted.add(key)
                                break
                    else:
                        # No value yet this turn; look at the previous one.
                        trn -= 1
                if not turnd.rev_gettable(trn):
                    break
                tickd = turnd[trn]
                try:
                    if tickd[tickd.end] is not None:
                        added.add(key)
                        break
                except HistoryError as ex:
                    if ex.deleted:
                        deleted.add(key)
                        break
        return added, deleted

    def store(self, *args, planning=None, forward=None):
        """Put a value in various dictionaries for later .retrieve(...).

        Needs at least five arguments, of which the -1th is the value
        to store, the -2th is the tick to store it at, the -3th
        is the turn to store it in, the -4th is the branch the
        revision is in, the -5th is the key the value is for,
        and the remaining arguments identify the entity that has
        the key, eg. a graph, node, or edge.

        With ``planning=True``, you will be permitted to alter
        "history" that takes place after the last non-planning
        moment of time, without much regard to consistency.
        Otherwise, contradictions will be handled by deleting
        everything after the present moment.
        """
        if planning is None:
            planning = self.db._planning
        if forward is None:
            forward = self.db._forward
        self._store(*args, planning=planning)
        if not self.db._no_kc:
            self._update_keycache(*args, forward=forward)

    def _store(self, *args, planning):
        """Record a value in the main caches; see ``store`` for the signature."""
        entity, key, branch, turn, tick, value = args[-6:]
        parent = args[:-6]
        # Locate (or create) the TurnDict for this entity/key/branch,
        # keeping parents, keys, and branches pointing at the same object.
        if parent:
            parentity = self.parents[parent][entity]
            if key in parentity:
                branches = parentity[key]
                assert branches is self.branches[parent+(entity, key)] is self.keys[parent+(entity,)][key]
                turns = branches[branch]
            else:
                branches = self.branches[parent+(entity, key)] \
                    = self.keys[parent+(entity,)][key] \
                    = parentity[key]
                turns = branches[branch]
        else:
            if (entity, key) in self.branches:
                branches = self.branches[entity, key]
                assert branches is self.keys[entity, ][key]
                turns = branches[branch]
            else:
                branches = self.branches[entity, key]
                self.keys[entity,][key] = branches
                turns = branches[branch]
        if planning:
            # Plans may extend the future but not rewrite recorded ticks.
            if turn in turns and tick < turns[turn].end:
                raise HistoryError(
                    "Already have some ticks after {} in turn {} of branch {}".format(
                        tick, turn, branch
                    )
                )
        else:
            # truncate settings: everything after this moment is now
            # contradicted and gets thrown away.
            for mapp in (self.settings[branch], self.presettings[branch], turns):
                if turn in mapp:
                    mapp[turn].truncate(tick)
                mapp.truncate(turn)
        self._store_journal(*args)
        # Keep a bounded flat lookup table for the fast path in retrieve().
        self.shallowest[parent+(entity, key, branch, turn, tick)] = value
        while len(self.shallowest) > self.keycache_maxsize:
            self.shallowest.popitem(False)
        if turn in turns:
            the_turn = turns[turn]
            the_turn.truncate(tick)
            the_turn[tick] = value
        else:
            new = FuturistWindowDict()
            new[tick] = value
            turns[turn] = new

    def _store_journal(self, *args):
        """Journal the new value and the value it replaced.

        Records into ``settings`` (new value) and ``presettings``
        (previous value) at the same branch/turn/tick.
        """
        # overridden in LiSE.cache.InitializedCache
        entity, key, branch, turn, tick, value = args[-6:]
        parent = args[:-6]
        settings_turns = self.settings[branch]
        presettings_turns = self.presettings[branch]
        try:
            prev = self.retrieve(*args[:-1])
        except KeyError:
            prev = None
        if turn in settings_turns or turn in settings_turns.future():
            # These assertions hold for most caches but not for the contents
            # caches, and are therefore commented out.
            # assert turn in presettings_turns or turn in presettings_turns.future()
            setticks = settings_turns[turn]
            # assert tick not in setticks
            presetticks = presettings_turns[turn]
            # assert tick not in presetticks
            presetticks[tick] = parent + (entity, key, prev)
            setticks[tick] = parent + (entity, key, value)
        else:
            presettings_turns[turn] = {tick: parent + (entity, key, prev)}
            settings_turns[turn] = {tick: parent + (entity, key, value)}

    def retrieve(self, *args):
        """Get a value previously .store(...)'d.

        Needs at least five arguments. The -1th is the tick
        within the turn you want,
        the -2th is that turn, the -3th is the branch,
        and the -4th is the key. All other arguments identify
        the entity that the key is in.

        Raises ``KeyError`` when there is no value to retrieve.
        """
        try:
            ret = self.shallowest[args]
            if ret is None:
                raise HistoryError("Set, then deleted", deleted=True)
            return ret
        except HistoryError:
            # Re-raised explicitly before the KeyError handler below --
            # presumably HistoryError subclasses KeyError; confirm in
            # the window module.
            raise
        except KeyError:
            pass
        entity = args[:-4]
        key, branch, turn, tick = args[-4:]
        if entity+(key,) not in self.branches:
            raise KeyError("Never stored any value for {}, {}".format(entity, key))
        # Try the branch asked for first.
        if (
            branch in self.branches[entity+(key,)]
            and self.branches[entity+(key,)][branch].rev_gettable(turn)
        ):
            brancs = self.branches[entity+(key,)][branch]
            if turn in brancs:
                if brancs[turn].rev_gettable(tick):
                    ret = brancs[turn][tick]
                    self.shallowest[args] = ret
                    return ret
                elif brancs.rev_gettable(turn-1):
                    b1 = brancs[turn-1]
                    ret = b1[b1.end]
                    self.shallowest[args] = ret
                    return ret
            else:
                ret = brancs[turn]
                ret = ret[ret.end]
                self.shallowest[args] = ret
                return ret
        # Fall back to ancestor branches.
        for (b, r, t) in self.db._iter_parent_btt(branch):
            if (
                b in self.branches[entity+(key,)]
                and self.branches[entity+(key,)][b].rev_gettable(r)
            ):
                brancs = self.branches[entity+(key,)][b]
                if r in brancs and brancs[r].rev_gettable(t):
                    ret = brancs[r][t]
                elif brancs.rev_gettable(r-1):
                    ret = brancs[r-1]
                    ret = ret[ret.end]
                else:
                    continue
                self.shallowest[args] = ret
                return ret
        else:
            raise KeyError(
                "No value known for {}, {} at {}, {}, {}".format(
                    entity, key, branch, turn, tick
                )
            )

    def iter_entities_or_keys(self, *args, forward=None):
        """Iterate over the keys an entity has, if you specify an entity.

        Otherwise iterate over the entities themselves, or at any rate the
        tuple specifying which entity.
        """
        if forward is None:
            forward = self.db._forward
        entity = args[:-3]
        branch, turn, tick = args[-3:]
        if self.db._no_kc:
            # Keycaches disabled: compute the keyset from scratch.
            yield from self._get_adds_dels(self.keys[entity], branch, turn, tick)[0]
            return
        yield from self._get_keycache(entity, branch, turn, tick, forward=forward)
    iter_entities = iter_keys = iter_entity_keys = iter_entities_or_keys

    def count_entities_or_keys(self, *args, forward=None):
        """Return the number of keys an entity has, if you specify an entity.

        Otherwise return the number of entities.
        """
        if forward is None:
            forward = self.db._forward
        entity = args[:-3]
        branch, turn, tick = args[-3:]
        if self.db._no_kc:
            return len(self._get_adds_dels(self.keys[entity], branch, turn, tick)[0])
        return len(self._get_keycache(entity, branch, turn, tick, forward=forward))
    count_entities = count_keys = count_entity_keys = count_entities_or_keys

    def contains_entity_or_key(self, *args):
        """Check if an entity has a key at the given time, if entity specified.

        Otherwise check if the entity exists.
        """
        try:
            return self.retrieve(*args) is not None
        except KeyError:
            return False
    contains_entity = contains_key = contains_entity_key \
        = contains_entity_or_key
class NodesCache(Cache):
    """A cache for remembering whether nodes exist at a given time."""

    def _store(self, graph, node, branch, turn, tick, ex, *, planning):
        # Normalize every falsy existence flag to None, so deletion is
        # always recorded the same way.
        if not ex:
            ex = None
        return super()._store(graph, node, branch, turn, tick, ex, planning=planning)
class EdgesCache(Cache):
    """A cache for remembering whether edges exist at a given time."""

    @property
    def successors(self):
        # An edge's parent is its (graph, origin) pair, so the inherited
        # ``parents`` cache doubles as the successors cache.
        return self.parents

    def __init__(self, db):
        Cache.__init__(self, db)
        # Keycache of destination nodes per (graph, orig, branch).
        self.destcache = PickyDefaultDict(SettingsTurnDict)
        # Keycache of origin nodes per (graph, dest, branch).
        self.origcache = PickyDefaultDict(SettingsTurnDict)
        # Edge data keyed by (graph, dest), mirroring ``successors``.
        self.predecessors = StructuredDefaultDict(3, TurnDict)
        self._origcache_lru = OrderedDict()
        self._destcache_lru = OrderedDict()

    def _adds_dels_sucpred(self, cache, branch, turn, tick, *, stoptime=None):
        """Return (added, deleted) node sets for a successors/predecessors cache."""
        added = set()
        deleted = set()
        for node, nodes in cache.items():
            addidx, delidx = self._get_adds_dels(nodes, branch, turn, tick, stoptime=stoptime)
            if addidx:
                assert not delidx
                added.add(node)
            elif delidx:
                assert delidx and not addidx
                deleted.add(node)
        return added, deleted

    def _get_destcache(self, graph, orig, branch, turn, tick, *, forward):
        """Return the set of destinations of edges leaving ``orig`` in ``graph``."""
        self._lru_append(self.destcache, self._destcache_lru, ((graph, orig, branch), turn, tick), self.keycache_maxsize)
        return self._get_keycachelike(
            self.destcache, self.successors, self._adds_dels_sucpred, (graph, orig),
            branch, turn, tick, forward=forward
        )

    def _get_origcache(self, graph, dest, branch, turn, tick, *, forward):
        """Return the set of origins of edges arriving at ``dest`` in ``graph``."""
        self._lru_append(self.origcache, self._origcache_lru, ((graph, dest, branch), turn, tick), self.keycache_maxsize)
        return self._get_keycachelike(
            self.origcache, self.predecessors, self._adds_dels_sucpred, (graph, dest),
            branch, turn, tick, forward=forward
        )

    def iter_successors(self, graph, orig, branch, turn, tick, *, forward=None):
        """Iterate over successors of a given origin node at a given time."""
        if self.db._no_kc:
            yield from self._adds_dels_sucpred(self.successors[graph, orig], branch, turn, tick)[0]
            return
        if forward is None:
            forward = self.db._forward
        yield from self._get_destcache(graph, orig, branch, turn, tick, forward=forward)

    def iter_predecessors(self, graph, dest, branch, turn, tick, *, forward=None):
        """Iterate over predecessors to a given destination node at a given time."""
        if self.db._no_kc:
            yield from self._adds_dels_sucpred(self.predecessors[graph, dest], branch, turn, tick)[0]
            return
        if forward is None:
            forward = self.db._forward
        yield from self._get_origcache(graph, dest, branch, turn, tick, forward=forward)

    def count_successors(self, graph, orig, branch, turn, tick, *, forward=None):
        """Return the number of successors to a given origin node at a given time."""
        if self.db._no_kc:
            return len(self._adds_dels_sucpred(self.successors[graph, orig], branch, turn, tick)[0])
        if forward is None:
            forward = self.db._forward
        return len(self._get_destcache(graph, orig, branch, turn, tick, forward=forward))

    def count_predecessors(self, graph, dest, branch, turn, tick, *, forward=None):
        """Return the number of predecessors from a given destination node at a given time."""
        if self.db._no_kc:
            return len(self._adds_dels_sucpred(self.predecessors[graph, dest], branch, turn, tick)[0])
        if forward is None:
            forward = self.db._forward
        return len(self._get_origcache(graph, dest, branch, turn, tick, forward=forward))

    def has_successor(self, graph, orig, dest, branch, turn, tick):
        """Return whether an edge connects the origin to the destination at the given time."""
        try:
            return self.retrieve(graph, orig, dest, 0, branch, turn, tick) is not None
        except KeyError:
            return False

    def has_predecessor(self, graph, dest, orig, branch, turn, tick):
        """Return whether an edge connects the destination to the origin at the given time."""
        try:
            return self.retrieve(graph, orig, dest, 0, branch, turn, tick) is not None
        except KeyError:
            return False

    def _store(self, graph, orig, dest, idx, branch, turn, tick, ex, *, planning=None):
        """Record an edge's existence (or deletion, when ``ex`` is falsy)."""
        if not ex:
            ex = None
        if planning is None:
            # BUGFIX: was ``self.db.planning``; every other call site reads
            # the private ``_planning`` flag (see ``Cache.store``).
            planning = self.db._planning
        Cache._store(self, graph, orig, dest, idx, branch, turn, tick, ex, planning=planning)
        # Mirror the record into the predecessors cache so both directions
        # share the same TurnDict object.
        self.predecessors[(graph, dest)][orig][idx][branch][turn] \
            = self.successors[graph, orig][dest][idx][branch][turn]
        if ex:
            assert self.has_successor(graph, orig, dest, branch, turn, tick)
            assert self.has_predecessor(graph, dest, orig, branch, turn, tick)