Source code for graphbrain.meaning.parser

import sys
import logging
import spacy
from graphbrain import *
from graphbrain.meaning.nlpvis import print_tree


# Send log records of level WARNING and above to standard error.
logging.basicConfig(level=logging.WARNING, stream=sys.stderr)


# Maps the dependency label of a predicate's argument to the one-letter
# argument role that arg_type() reports for it. Labels that are missing
# from this table have no role.
deps_arg_types = {
    # subjects and agent
    'nsubj': 's',
    'nsubjpass': 'p',  # subject of a passive
    'agent': 'a',
    # subject complements
    'acomp': 'c',
    'attr': 'c',
    # objects
    'dobj': 'o',
    'prt': 'o',        # particle, given the same role as a direct object
    'dative': 'i',     # indirect object
    # specifiers
    'advcl': 'x',
    'prep': 'x',
    'npadvmod': 'x',
    # parataxis and interjection
    'parataxis': 't',
    'intj': 'j',
    # clausal complements
    'xcomp': 'r',
    'ccomp': 'r',
}


def token_head_type(token):
    """Return the type of the token's head.

    The empty string is returned when the token has no head, or when it is
    its own head (as is the case for the root of a dependency tree).
    """
    governor = token.head
    has_distinct_head = bool(governor) and governor != token
    return token_type(governor) if has_distinct_head else ''


def is_noun(token):
    """Tell whether the token's fine-grained tag starts with 'NN'."""
    tag = token.tag_
    return tag.startswith('NN')


def is_verb(token):
    """Tell whether the token's fine-grained tag starts with 'V'.

    A token with an empty tag is never considered a verb.
    """
    return token.tag_[:1] == 'V'


def is_infinitive(token):
    """Tell whether the token's fine-grained tag is exactly 'VB'."""
    tag = token.tag_
    return tag == 'VB'


def is_compound(token):
    """Tell whether the token's dependency label is 'compound'."""
    label = token.dep_
    return label == 'compound'


def token_type(token, head=False):
    """Return the type code of a token, derived from its dependency label.

    Nouns are always typed 'c'. For every other token the dependency label
    selects the type; a few labels also look at the token's tag, at its
    number of children, or at the type of its head.

    Args:
        token: token exposing ``tag_``, ``dep_``, ``pos_``, ``head``,
            ``n_lefts`` and ``n_rights``.
        head: unused; kept so that existing callers keep working.

    Returns:
        The type code as a string, the empty string for tokens that carry no
        type (interjections and punctuation), or None when the dependency
        label is not recognised.
    """
    if is_noun(token):
        return 'c'

    dep = token.dep_

    # The head may carry a dependency label that this function does not
    # recognise, in which case its type is None. Treat that like "no head
    # type" instead of failing on len(None) / None[0].
    head_type = token_head_type(token) or ''
    head_subtype = head_type[1:2]
    head_type = head_type[:1]

    if dep == 'ROOT':
        # TODO: generalize!
        return 'p' if token.pos_ == 'VERB' else 'c'
    elif dep in {'appos', 'attr', 'compound', 'dative', 'dep', 'dobj',
                 'nsubj', 'nsubjpass', 'oprd', 'pobj', 'meta'}:
        return 'c'
    elif dep in {'advcl', 'ccomp', 'csubj', 'csubjpass', 'parataxis'}:
        return 'p'
    elif dep == 'relcl':
        return 'pr' if is_verb(token) else 'c'
    elif dep in {'acl', 'pcomp', 'xcomp'}:
        return 'a' if token.tag_ == 'IN' else 'pc'
    elif dep in {'amod', 'det', 'npadvmod', 'nummod', 'nmod', 'preconj',
                 'predet'}:
        return 'm'
    elif dep in {'aux', 'auxpass', 'expl', 'prt', 'quantmod'}:
        # a token without children of its own gets 'a', otherwise 'x'
        return 'a' if token.n_lefts + token.n_rights == 0 else 'x'
    elif dep == 'cc':
        return 'pm' if head_type == 'p' else 'b'
    elif dep == 'case':
        return 'bp' if token.head.dep_ == 'poss' else 'b'
    elif dep == 'neg':
        return 'an'
    elif dep == 'agent':
        return 'x'
    elif dep in {'intj', 'punct'}:
        return ''
    elif dep == 'advmod':
        if token.head.dep_ == 'advcl':
            return 't'
        elif head_type == 'p':
            return 'a'
        elif head_type in {'m', 'x', 't', 'b'}:
            return 'w'
        else:
            return 'm'
    elif dep == 'poss':
        # Nouns were already returned as 'c' at the top of the function, so
        # only non-noun possessives can reach this point.
        return 'mp'
    elif dep == 'prep':
        return 't' if head_type == 'p' else 'b'
    elif dep == 'conj':
        return 'p' if head_type == 'p' and is_verb(token) else 'c'
    elif dep == 'mark':
        return 'x' if head_type == 'p' and head_subtype != 'c' else 'b'
    elif dep == 'acomp':
        return 'x' if is_verb(token) else 'c'

    # Unrecognised dependency label: there is no type for this token.
    logging.debug('no type for dependency label: %s', dep)
    return None


def is_relative_concept(token):
    """Tell whether the token's dependency label is 'appos'."""
    label = token.dep_
    return label == 'appos'


def arg_type(token):
    """Return the argument role for the token's dependency label.

    The role is looked up in ``deps_arg_types``; '?' is returned for labels
    that are not in that table.
    """
    label = token.dep_
    if label in deps_arg_types:
        return deps_arg_types[label]
    return '?'


def insert_after_predicate(targ, orig):
    """Insert ``orig`` into ``targ`` right after the predicate of ``targ``.

    * If the type of ``targ`` starts with 'p', the pair ``(targ, orig)`` is
      returned.
    * If the type is 'rm', the insertion is applied recursively to the
      second element of ``targ`` and the other elements are kept as they are.
    * For any other type starting with 'r', the work is delegated to
      ``insert_first_argument``.
    * Otherwise an error line is printed and ``targ`` is returned unchanged.
    """
    kind = entity_type(targ)
    leading = kind[0]

    if leading == 'p':
        return (targ, orig)

    if leading != 'r':
        # TODO: error / warning
        print('ERROR %s %s' % (targ, orig))
        return targ

    if kind != 'rm':
        return insert_first_argument(targ, orig)

    nested = insert_after_predicate(targ[1], orig)
    return (targ[0], nested) + tuple(targ[2:])


def nest_predicate(inner, outer, before):
    """Wrap the predicate of ``inner`` with ``outer``.

    * If ``inner`` has type 'rm', the nesting is applied recursively to its
      second element and the other elements are kept as they are.
    * If ``inner`` is an atom, or its type starts with 'p', the pair
      ``(outer, inner)`` is returned.
    * Otherwise only the first element of ``inner`` is wrapped.

    ``before`` is only forwarded to the recursive call.
    """
    kind = entity_type(inner)

    if kind == 'rm':
        nested = nest_predicate(inner[1], outer, before)
        return (inner[0], nested) + tuple(inner[2:])

    if is_atom(inner) or kind[0] == 'p':
        return outer, inner

    wrapped_head = (outer, inner[0])
    return (wrapped_head,) + inner[1:]


def post_process(entity):
    """Recursively normalise an entity produced by the parser.

    Atoms are returned untouched. Every other entity is first rebuilt from
    its post-processed elements and then rewritten according to the type of
    its connector:

    * type starting with 'c': the elements are connected to '+/b/.';
    * type starting with 'b', atomic connector, exactly two elements: the
      second part of the connector atom is replaced by 'm' followed by the
      rest of the connector type;
    * type starting with 'w', atomic connector, exactly two elements, the
      second one being an edge whose connector type starts with 'm': the
      first element is paired with the connector of that edge.

    Anything else is returned as rebuilt.
    """
    if is_atom(entity):
        return entity

    processed = tuple(post_process(item) for item in entity)
    ct = connector_type(processed)
    leading = ct[0]

    if leading == 'c':
        return connect('+/b/.', processed)

    if leading == 'b' and is_atom(processed[0]) and len(processed) == 2:
        parts = atom_parts(processed[0])
        parts[1] = 'm' + ct[1:]
        return ('/'.join(parts),) + processed[1:]

    if leading == 'w' and is_atom(processed[0]) and len(processed) == 2:
        modifier, target = processed
        if is_edge(target) and connector_type(target)[0] == 'm':
            return ((modifier, target[0]),) + target[1:]

    return processed


class Parser(object):
    """Turn natural-language text into edges, one main edge per sentence.

    Each sentence is analysed with a spaCy pipeline and its dependency tree
    is converted, token by token, into a nested tuple of atoms.

    Args:
        lang: language code; 'en' and 'fr' are supported.
        pos: when true, the lower-cased fine-grained tag of each token,
            prefixed with the language code, is passed to ``build_atom``.
        lemmas: when true, a ``('lemma/p/.', atom, lemma_atom)`` edge is
            produced for every atom that differs from its lemma atom.

    Raises:
        RuntimeError: if ``lang`` is not a supported language.
    """

    def __init__(self, lang, pos=False, lemmas=False):
        self.lang = lang
        self.pos = pos
        self.lemmas = lemmas

        if lang == 'en':
            self.nlp = spacy.load('en_core_web_lg')
        elif lang == 'fr':
            self.nlp = spacy.load('fr_core_news_md')
        else:
            raise RuntimeError('unknown language: %s' % lang)

    @staticmethod
    def _ends_with_question_mark(child_tokens):
        """Tell whether the last child token is a question mark.

        ``child_tokens`` is a sequence of ``(token, is_left)`` pairs. An
        empty sequence (a token without children) is not a question.
        """
        if not child_tokens:
            return False
        last_token = child_tokens[-1][0]
        return (last_token.tag_ == '.' and
                last_token.dep_ == 'punct' and
                last_token.lemma_.strip() == '?')

    def parse_token(self, token):
        """Convert a token and its whole subtree into an entity.

        Returns:
            A pair ``(entity, extra_edges)`` where ``extra_edges`` is the set
            of lemma edges collected in the subtree, or ``(None, None)`` when
            the token has no type (see ``token_type``).
        """
        extra_edges = set()
        positions = {}  # child entity -> True if it comes from a left child
        tokens = {}     # child entity -> token it was built from
        children = []
        entities = []

        child_tokens = tuple((t, True) for t in token.lefts)
        child_tokens += tuple((t, False) for t in token.rights)

        for child_token, is_left in child_tokens:
            child, child_extra_edges = self.parse_token(child_token)
            if child:
                extra_edges |= child_extra_edges
                positions[child] = is_left
                tokens[child] = child_token
                child_type = entity_type(child)
                if child_type:
                    children.append(child)
                    if child_type[0] in {'c', 'r', 'd', 's'}:
                        entities.append(child)

        # children are attached to the parent starting from the last one
        children.reverse()

        parent_type = token_type(token)
        if not parent_type:
            return None, None

        # build atom
        text = token.text.lower()
        et = parent_type

        if self.pos:
            atom_pos = '{}.{}'.format(self.lang, token.tag_.lower())
        else:
            atom_pos = None

        if parent_type[0] == 'p' and parent_type != 'pm':
            args = [arg_type(tokens[entity]) for entity in entities]
            args_string = ''.join(arg for arg in args if arg != '?')

            # assign predicate subtype
            # (declarative, imperative, interrogative, ...)
            if len(parent_type) == 1:
                # interrogative cases
                if self._ends_with_question_mark(child_tokens):
                    parent_type = 'p?'
                # imperative cases
                elif (is_infinitive(token) and
                        's' not in args_string and
                        all(t.tag_ != 'TO' for t, _ in child_tokens)):
                    parent_type = 'p!'
                # declarative (by default)
                else:
                    parent_type = 'pd'

            et = '{}.{}'.format(parent_type, args_string)

        parent_atom = build_atom(text, et, atom_pos)
        parent = parent_atom

        # lemma
        if self.lemmas:
            text = token.lemma_.lower()
            lemma = build_atom(text, et[0], atom_pos)
            if parent != lemma:
                lemma_edge = ('lemma/p/.', parent, lemma)
                extra_edges.add(lemma_edge)

        relative_to_concept = []

        # process children
        for child in children:
            child_type = entity_type(child)

            logging.debug('TARGET <-: [%s] %s', parent_type, parent)
            logging.debug('<- ORIG: [%s] %s', child_type, child)

            if child_type[0] in {'c', 'r', 'd', 's'}:
                if parent_type[0] == 'c':
                    if (connector_type(child) in {'pc', 'pr'} or
                            is_relative_concept(tokens[child])):
                        logging.debug('CHOICE #1')
                        relative_to_concept.append(child)
                    elif connector_type(child)[0] == 'b':
                        if connector_type(parent)[0] == 'c':
                            logging.debug('CHOICE #2')
                            parent = nest(parent, child, positions[child])
                        else:
                            logging.debug('CHOICE #3')
                            parent = apply_fun_to_atom(
                                lambda target:
                                nest(target, child, positions[child]),
                                parent_atom, parent)
                    elif connector_type(child)[0] in {'x', 't'}:
                        logging.debug('CHOICE #4')
                        parent = nest(parent, child, positions[child])
                    else:
                        if ((entity_type(parent_atom)[0] == 'c' and
                                connector_type(child)[0] == 'c') or
                                is_compound(tokens[child])):
                            if connector_type(parent)[0] == 'c':
                                if connector_type(child)[0] == 'c':
                                    logging.debug('CHOICE #5a')
                                    parent = sequence(parent, child,
                                                      positions[child])
                                else:
                                    logging.debug('CHOICE #5b')
                                    parent = sequence(parent, child,
                                                      positions[child],
                                                      flat=False)
                            else:
                                logging.debug('CHOICE #6')
                                parent = apply_fun_to_atom(
                                    lambda target:
                                    sequence(target, child, positions[child]),
                                    parent_atom, parent)
                        else:
                            logging.debug('CHOICE #7')
                            parent = apply_fun_to_atom(
                                lambda target: connect(target, (child,)),
                                parent_atom, parent)
                elif parent_type[0] in {'p', 'r', 'd', 's'}:
                    logging.debug('CHOICE #8')
                    parent = insert_after_predicate(parent, child)
                else:
                    logging.debug('CHOICE #9')
                    parent = insert_first_argument(parent, child)
            elif child_type[0] == 'b':
                if connector_type(parent) == 'c':
                    logging.debug('CHOICE #10')
                    parent = connect(child, parent)
                else:
                    logging.debug('CHOICE #11')
                    parent = nest(parent, child, positions[child])
            elif child_type[0] == 'p':
                # TODO: Pathological case
                # e.g. "Some subspecies of mosquito might be 1s..."
                if child_type == 'pm':
                    logging.debug('CHOICE #12')
                    parent = (child,) + parens(parent)
                else:
                    logging.debug('CHOICE #13')
                    parent = connect(parent, (child,))
            elif child_type[0] == 'm':
                logging.debug('CHOICE #14')
                parent = (child, parent)
            elif child_type[0] in {'x', 't'}:
                logging.debug('CHOICE #15')
                parent = (child, parent)
            elif child_type[0] == 'a':
                logging.debug('CHOICE #16')
                parent = nest_predicate(parent, child, positions[child])
            elif child_type == 'w':
                if parent_type[0] in {'d', 's'}:
                    logging.debug('CHOICE #17')
                    parent = nest_predicate(parent, child, positions[child])
                else:
                    logging.debug('CHOICE #18')
                    parent = nest(parent, child, positions[child])
            else:
                # TODO: warning ?
                logging.debug('CHOICE #19')

            # the parent grew, so its type may have changed
            parent_type = entity_type(parent)
            logging.debug('=== [%s] %s', parent_type, parent)

        if relative_to_concept:
            relative_to_concept.reverse()
            parent = (':/b/.', parent) + tuple(relative_to_concept)

        return parent, extra_edges

    def parse_sentence(self, sent):
        """Parse one spaCy sentence.

        Returns:
            A dict with the post-processed main edge ('main_edge'), the set
            of extra edges ('extra_edges'), the sentence text ('text') and
            the spaCy sentence itself ('spacy_sentence').
        """
        main_edge, extra_edges = self.parse_token(sent.root)
        main_edge = post_process(main_edge)

        return {'main_edge': main_edge,
                'extra_edges': extra_edges,
                'text': str(sent),
                'spacy_sentence': sent}

    def parse(self, text):
        """Parse a text and return a tuple with one result dict per sentence.

        The text is stripped of leading and trailing whitespace first.
        """
        doc = self.nlp(text.strip())
        return tuple(self.parse_sentence(sent) for sent in doc.sents)
if __name__ == '__main__':
    # Demo: parse one sentence, then print its dependency tree, its main
    # edge and the extra edges that were produced alongside it.
    text = """ There’s also a link to the Turing Test that we finished up with last week. """

    parser = Parser(lang='en', pos=True, lemmas=True)
    parse = parser.parse(text)[0]
    print_tree(parse['spacy_sentence'].root)
    print(ent2str(parse['main_edge']))
    print('EXTRA EDGES:')
    for edge in parse['extra_edges']:
        print(ent2str(edge))