import re
import graphbrain.constants as const
from graphbrain import hedge
from graphbrain.hyperedge import UniqueAtom
def _edge2text(edge, parse):
atoms = [UniqueAtom(atom) for atom in edge.all_atoms()]
tokens = [parse['atom2token'][atom] for atom in atoms if atom in parse['atom2token']]
if len(tokens) == 0:
return ''
tokens = sorted(tokens, key=lambda x: x.i)
prev_txt = tokens[0].text
txt_parts = [prev_txt]
sentence = str(parse['spacy_sentence'])
for token in tokens[1:]:
txt = token.text
res = re.search(r'{}(.*?){}'.format(re.escape(prev_txt), re.escape(txt)), sentence)
if res:
sep = res.group(1)
else:
sep = ' '
if any(letter.isalnum() for letter in sep):
sep = ' '
txt_parts.append(sep)
txt_parts.append(token.text)
prev_txt = txt
return ''.join(txt_parts)
def _set_edge_text(edge, hg, parse):
text = _edge2text(edge, parse)
hg.set_attribute(edge, 'text', text)
if edge.not_atom:
for subedge in edge:
_set_edge_text(subedge, hg, parse)
[docs]class Parser(object):
"""Defines the common interface for parser objects.
Parsers transofrm natural text into graphbrain hyperedges.
"""
def __init__(self, lemmas=True, corefs=True, debug=False):
self.lemmas = lemmas
self.corefs = corefs
self.debug = debug
# to be created by derived classes
self.lang = None
def debug_msg(self, msg):
if self.debug:
print(msg)
[docs] def parse(self, text):
"""Transforms the given text into hyperedges + aditional information.
Returns a dictionary with two fields:
-> parses: a sequence of dictionaries, with one dictionary for each
sentence found in the text.
-> inferred_edges: a sequence of edges inferred during by parsing
process (e.g. genders, 'X is Y' relationships)
Each sentence parse dictionary contains at least the following fields:
-> main_edge: the hyperedge corresponding to the sentence.
-> extra_edges: aditional edges, e.g. connecting atoms that appear
in the main_edge to their lemmas.
-> text: the string of natural language text corresponding to the
main_edge, i.e.: the sentence itself.
-> edges_text: a dictionary of all edges and subedges to their
corresponding text.
-> corefs: resolve coreferences.
"""
# replace newlines with spaces
clean_text = text.replace('\n', ' ').replace('\r', ' ')
# remove repeated spaces
clean_text = ' '.join(clean_text.split())
parse_results = self._parse(clean_text)
# coreference resolution
if self.corefs:
self._resolve_corefs(parse_results)
else:
for parse in parse_results['parses']:
parse['resolved_corefs'] = parse['main_edge']
return parse_results
def parse_and_add(self, text, hg, sequence=None, infsrcs=False, max_text=1500):
# split large blocks of text to avoid coreference resolution errors
if self.corefs and 0 < max_text < len(text):
for sentence in self.sentences(text):
self.parse_and_add(sentence, hg=hg, sequence=sequence, infsrcs=infsrcs, max_text=-1)
parse_results = self.parse(text)
edges = []
for parse in parse_results['parses']:
if parse['main_edge']:
edges.append(parse['main_edge'])
main_edge = parse['resolved_corefs']
if self.corefs:
unresolved_edge = parse['main_edge']
else:
unresolved_edge = None
# add main edge
if main_edge:
if sequence:
hg.add_to_sequence(sequence, main_edge)
else:
hg.add(main_edge)
# attach text to edge and subedges
_set_edge_text(main_edge, hg, parse)
# attach token list and token position structure to edge
self._set_edge_tokens(main_edge, hg, parse)
if self.corefs:
if unresolved_edge != main_edge:
_set_edge_text(unresolved_edge, hg, parse)
self._set_edge_tokens(unresolved_edge, hg, parse)
coref_res_edge = hedge((const.coref_res_connector, unresolved_edge, main_edge))
hg.add(coref_res_edge)
# add extra edges
for edge in parse['extra_edges']:
hg.add(edge)
for edge in parse_results['inferred_edges']:
hg.add(edge, count=True)
if infsrcs:
inference_srcs_edge = hedge([const.inference_srcs_connector, edge] + edges)
hg.add(inference_srcs_edge)
return parse_results
def sentences(self, text):
raise NotImplementedError()
def atom_gender(self, atom):
raise NotImplementedError()
def atom_number(self, atom):
raise NotImplementedError()
def atom_person(self, atom):
raise NotImplementedError()
def atom_animacy(self, atom):
raise NotImplementedError()
def _parse_token(self, token, atom_type):
raise NotImplementedError()
def _parse(self, text):
raise NotImplementedError()
def _set_edge_tokens(self, edge, hg, parse):
raise NotImplementedError()
def _resolve_corefs(self, parse_results):
# do nothing if not implemented in derived classes
for parse in parse_results['parses']:
parse['resolved_corefs'] = parse['main_edge']