Source code for grewpy.grs

import json
import os.path

from . import network
from .grew import JSON
from grewpy.graph import Graph
from .corpus import Corpus, CorpusDraft, GrewError

[docs]class RequestItem(): def __init__(self,sort : str,*L): """ sort in {"global", "pattern", "without", "with"} L is a list of - ";" separated clauses or - a list of items - they will be concatenated """ self.sort = sort self.items = tuple() for elt in L: if isinstance(elt,str): self.items += tuple(c.strip() for c in elt.split(";") if c.strip()) else: self.items += tuple(elt)
[docs] def json_data(self): return {self.sort : list(self.items)}
[docs] @classmethod def from_json(cls, json_data : JSON) : k = list(json_data)[0] v = json_data[k] return cls(k,v)
def __str__(self): its = ";".join([str(x) for x in self.items]) return f"{self.sort} {{{its}}}" def __repr__(self): return f"{self.sort} {{{ ';'.join([str(x) for x in self.items]) }}}"
[docs]class Request(): """ lists of ClauseList """ def __init__(self, *L): """ L is either a list of - RequestItem or - (pattern) string or a - Request (for copies) """ if len(L) == 2 and isinstance(L[0],int): self.index = L[0] self.string = L[1] else: elts = tuple() for e in L: if isinstance(e,str): elts += (RequestItem("pattern", e),) elif isinstance(e,RequestItem): elts += (e,) elif isinstance(e,Request): elts += e.items elif isinstance(e,tuple): #supposed to be a list of ClauseList elts += e else: try: #suppose it is a generator for x in e: elts += (x,) except: raise ValueError(f"{e} cannot be used to build a Request") self.items = elts
[docs] def without(self, *L): if hasattr(self, 'items'): self.items += tuple(RequestItem("without", e) for e in L) return self else: raise ValueError("Abstract request")
[docs] def with_(self, *L): if hasattr(self, 'items'): self.items += tuple(RequestItem("with", e) for e in L) return self else: raise ValueError("Abstract request")
[docs] def global_(self, *L): self.items += tuple(RequestItem("global", e) for e in L) return self
[docs] @classmethod def from_json(cls,json_data): if isinstance(json_data,str): return cls.parse(json_data) if len(json_data) > 0 and isinstance(json_data[0], str): return cls.parse("\n".join(json_data)) else: elts = [RequestItem.from_json(c) for c in json_data] return cls(*elts)
[docs] @classmethod def parse(cls, string_request): req = {"command": "request_parse", "request": string_request} reply = network.send_and_receive(req) return cls(reply["index"], string_request)
[docs] def json_data(self): if hasattr(self, 'items'): return [x.json_data() for x in self.items] else: return { "index": self.index }
def __str__(self): if hasattr(self, 'items'): return "\n".join([str(e) for e in self.items]) else: return self.string def __iter__(self): if hasattr(self, 'items'): return iter(self.items) else: raise ValueError("Abstract request are not iterable")
[docs] def pattern(self): """ return the pattern of self as a tuple """ if hasattr(self, 'items'): return Request(p for p in self.items if p.sort == "pattern") else: raise ValueError("Abstract request")
[docs] def append(self, *L): """ Append a new ClauseList to the Request L is given either as a pair (s,t) with "s \in {'pattern','without','meta'} and t : str or L[0] is a ClauseList """ if hasattr(self, 'items'): if len(L) == 2: self.items = self.items + (RequestItem(L[0], L[1]),) elif len(L) == 1: self.items = self.items + L else: raise ValueError(f"cannot build a clause list with {L}") else: raise ValueError("Abstract request")
[docs]class Command: def __init__(self,s): """ self.item = str representation of the command """ self.item = s
[docs] def json_data(self): return self.item
def __str__(self): return str(self.item)
[docs] def safe(self): """ return a clause list for a safe request """ raise NotImplementedError ("not yet implemented")
[docs]class Add_edge(Command): def __init__(self,X,e,Y): if isinstance(e, dict): s = ",".join(f"{k}={v}" for k,v in e.items()) else: s = str(e) super().__init__(f"add_edge {X}-[{s}]->{Y}") self.X, self.e, self.Y = X, e, Y
[docs] def safe(self): return RequestItem("without",self.item.replace("add_edge",""))
def __repr__(self): return str(self)
[docs]class Delete_edge(Command): def __init__(self, X, e, Y): super().__init__(f"del_edge {X}-[{e}]->{Y}") self.X, self.e, self.Y = X, e, Y
[docs] def safe(self): return RequestItem("pattern", self.item.replace("add_edge", ""))
[docs]class Delete_feature(Command): def __init__(self, X, f): super().__init__(f"del_feat {X}.{f}") self.X = X self.f = f
[docs]class Commands(list): def __init__(self, *L): super().__init__() for elt in L: if isinstance(elt,str): self += [t.strip() for t in elt.split(";") if t.strip()] elif isinstance(elt,list): self += elt elif isinstance(elt, Command): self.append(elt) def __str__(self): c = ";".join([str(x) for x in self]) return f"commands {{{c}}}"
[docs] @classmethod def from_json(cls, json_data): return cls(*json_data)
[docs] def json_data(self): return [x if isinstance(x,str) else x.json_data() for x in self]
[docs]class Rule(): def __init__(self, request : Request, cmd_list : Commands, lexicons =None): self.request = request self.commands = cmd_list self.lexicons = lexicons if lexicons else dict()
[docs] def json_data(self): p = self.request.json_data() c = self.commands.json_data() return {"request" : p, "commands" : c, "lexicons" : json.dumps(self.lexicons)}
def __str__(self): return f"{str(self.request)}\n{str(self.commands)}" def __repr__(self): return f"{str(self.request)}\n{str(self.commands)}"
[docs] @classmethod def from_json(cls,json_data): # print(json_data) reqs = Request.from_json(json_data["request"]) cmds = Commands.from_json(json_data["commands"]) return cls(reqs,cmds)
[docs]class Package(dict): """ dict mapping names to rule/package/strategies"""
[docs] @classmethod def from_json(cls, json_data): res = Package._from_json(json_data) return cls(res)
def _from_json(json_data): res = dict() for k,v in json_data.items(): if isinstance(v,str): res[k] = v elif "decls" in v: #it is a package res[k] = Package.from_json(v["decls"]) else: res[k] = Rule.from_json(v) return res
[docs] def json_data(self): elts = dict() for k,v in self.items(): elts[k] = v if isinstance(v,str) else v.json_data() return {"decls" : elts}
def __str__(self): res = [f"strat {k} {{{self[k]}}}" for k in self.strategies()] +\ [f"package {k} {{{str(self[k])}}}" for k in self.packages()] +\ [f"rule {k} {{{str(self[k])}}}" for k in self.rules()] return "\n".join(res)
[docs] def rules(self): return filter(lambda x: isinstance(self[x], Rule), self.__iter__())
[docs] def packages(self): return filter(lambda x: isinstance(self[x], Package), self.__iter__())
[docs] def strategies(self): return filter(lambda x: isinstance(self[x], str), self.__iter__())
[docs]class GRSDraft(Package): """ A GRSDraft is a structure that gives access to the internals of a Graph Rewriting System: packages, rules, patterns, strategies, etc It cannot be used to perform rewriting, for that, use a GRS """ def __init__(self,args=None): """Load a grs stored in a file :param data: either a file name or a Grew string representation of a grs :or kwargs contains explicitly the parts of the grs :return: an integer index for latter reference to the grs :raise an error if the file was not correctly loaded """ if isinstance(args,str): agrs = GRS(args) json_data = agrs.json() res = Package._from_json(json_data["decls"]) super().__init__(res) elif isinstance(args, dict): super().__init__( args ) elif args == None: super().__init__() def __str__(self): return super().__str__()
[docs] def rules(self): return (rule_name for rule_name in self if isinstance(self[rule_name],Rule))
[docs] def safe_rules(self): """ create a new grs with application of safe to each rule. self.rules() are supposed to contain only Commands of length 1 that support safe method """ grs = GRSDraft() for rule_name in self.rules(): rule = self[rule_name] cde = rule.commands[0] safe_request = Request(rule.request) safe_request.append(cde.safe()) safe_rule = Rule(safe_request, rule.commands, rule.lexicons) grs[rule_name] = safe_rule return grs
[docs] def onf(self, strat_name="main"): self[strat_name] = f'Onf(Alt({",".join(self.rules())}))' return self
[docs] def save(self, filename): with open(filename, "w") as f: f.write(str(self))
[docs]def constant_UD2bUD(cls): cls.UD2bUD = cls(""" package UD2bUD { rule enh { % remove enhanced relations pattern { e:N -[enhanced=yes]-> M } commands { del_edge e} } rule empty { % remove empty nodes pattern { N [wordform=__EMPTY__, textform=_] } commands { del_node N } } } strat main { Onf(UD2bUD) } """) return cls
[docs]@constant_UD2bUD class GRS: """ An abstract GRS. Offers the possibility to apply rewriting. The object is abstract and cannot be changed. For that, use a GRSDraft """ def __init__(self, args): """Load a grs stored in a file :param data: either a file name or a Grew string representation of a grs :or kwargs contains explicitly the parts of the grs :return: an integer index for latter reference to the grs :raise an error if the file was not correctly loaded """ if isinstance(args, str): if os.path.isfile(args): req = {"command": "load_grs", "file": args} else: req = {"command": "load_grs", "str": args} elif isinstance(args, GRSDraft): req = {"command": "load_grs", "json": args.json_data()} elif isinstance(args, dict): """ suppose it is a GRS style """ try: grs = GRSDraft(args) req = {"command": "load_grs", "json": grs.json_data()} except GrewError as e: raise ValueError(f"cannot build a grs with {args}\n {e.message}") else: raise ValueError(f"cannot build a grs with {args}") reply = network.send_and_receive(req) self.id = reply["index"]
[docs] def json(self): req = {"command": "json_grs", "grs_index": self.id} return network.send_and_receive(req)
def __str__(self): return f"GRS({self.id})"
[docs] def run(self, data, strat="main"): """ run a Grs on a graph :param data: a graph or an AbstractCorpus :param strat: the strategy (by default "main") :return: a dictionary mapping sid to the list of rewritten graphs """ if isinstance(data, Graph): req = { "command": "grs_run_graph", "graph": json.dumps(data.json_data()), "grs_index": self.id, "strat": strat } reply = network.send_and_receive(req) return [Graph.from_json(s) for s in reply] elif isinstance(data, Corpus): req = { "command": "grs_run_corpus", "corpus_index": data.get_id(), "grs_index": self.id, "strat": strat } reply = network.send_and_receive(req) return {sid: [Graph(s) for s in L] for sid, L in reply.items() } elif isinstance(data, CorpusDraft): return {sid: self.run(g) for sid,g in data.items() }
[docs] def apply(self, data, strat="main", abstract=True): """ run a Grs on a graph or corpus :param grs_data: a graph rewriting system or a Grew string representation of a grs :param G: the graph, either a str (in grew format) or a dict :param strat: the strategy (by default "main") :return: the rewritten graph and an error if there is not exaclty one output graph """ if isinstance(data, Graph): req = { "command": "grs_apply_graph", "graph": json.dumps(data.json_data()), "grs_index": self.id, "strat": strat } reply = network.send_and_receive(req) return Graph(reply) elif isinstance(data, Corpus): req = { "command": "grs_apply_corpus", "corpus_index": data.get_id(), "grs_index": self.id, "strat": strat } # return None because inplace network.send_and_receive(req) return data if abstract else CorpusDraft (data) elif isinstance(data, CorpusDraft): acorpus = Corpus(data) self.apply(acorpus, strat, abstract)