from collections import Counter
from graphbrain.utils.corefs import main_coref
from graphbrain.processor import Processor
from graphbrain.utils.concepts import has_proper_concept, strip_concept
from graphbrain.utils.lemmas import deep_lemma
[docs]def is_actor(hg, edge):
"""Checks if the edge is a coreference to an actor."""
if edge.mtype() == 'C':
return hg.exists(('actor/P/.', main_coref(hg, edge)))
else:
return False
[docs]def find_actors(hg, edge):
"""Returns set of all coreferences to actors found in the edge."""
_actors = set()
if is_actor(hg, edge):
_actors.add(main_coref(hg, edge))
if edge.not_atom:
for item in edge:
_actors |= find_actors(hg, item)
return _actors
[docs]def actors(hg):
""""Returns an iterator over all actors."""
return [edge[1] for edge in hg.search('(actor/P/. *)', strict=True)]
ACTOR_PRED_LEMMAS = {'say', 'claim', 'warn', 'kill', 'accuse', 'condemn',
'slam', 'arrest', 'clash', 'blame', 'want', 'call',
'tell'}
class Actors(Processor):
def __init__(self, hg, sequence=None):
super().__init__(hg=hg, sequence=sequence)
self.actor_counter = Counter()
def process_edge(self, edge):
if edge.not_atom:
ct = edge.connector_type()
if ct[0] == 'P':
subjects = edge.edges_with_argrole('s')
if len(subjects) == 1:
subject = strip_concept(subjects[0])
if subject and has_proper_concept(subject):
pred = edge[0]
dlemma = deep_lemma(self.hg, pred).root()
if dlemma in ACTOR_PRED_LEMMAS:
try:
actor = main_coref(self.hg, subject)
self.actor_counter[actor] += 1
except Exception as e:
print(str(e))
def on_end(self):
for actor in self.actor_counter:
self.hg.add(('actor/P/.', actor))
def report(self):
return 'actors found: {}'.format(len(self.actor_counter))