import hashlib
import json
import pprint
import bisect
from ast import literal_eval
from collections import Counter
from copy import copy
from itertools import chain
from .object import JsonSerializedObject
class Eventuality(JsonSerializedObject):
""" ASER Eventuality
"""
def __init__(self, pattern="unknown", dependencies=None, skeleton_dependencies=None, parsed_result=None, use_lemma=True):
"""
:param pattern: the corresponding pattern
:type pattern: str
:param dependencies: the corresponding dependencies (e.g., [(1, "nsubj", 0)])
:type dependencies: List[Tuple[int, str, int]]
:param skeleton_dependencies: the corresponding dependencies without optional edges (e.g., [(1, "nsubj", 0)])
:type skeleton_dependencies: List[Tuple[int, str, int]]
        :param parsed_result: the parsed result of a sentence
        :type parsed_result: Dict[str, object]
        :param use_lemma: whether to use lemmas (instead of raw tokens) as the eventuality's words
        :type use_lemma: bool (default = True)
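
        A minimal construction sketch (the ``parsed_result`` keys shown are the
        ones ``_construct`` reads; the sentence, indices, and pattern are
        illustrative):

        .. highlight:: python
        .. code-block:: python

            parsed_result = {
                "tokens": ["I", "sleep"],
                "lemmas": ["i", "sleep"],
                "pos_tags": ["PRP", "VBP"],
            }
            dependencies = [(1, "nsubj", 0)]
            event = Eventuality(
                pattern="s-v",
                dependencies=dependencies,
                skeleton_dependencies=list(dependencies),
                parsed_result=parsed_result,
            )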
"""
super().__init__()
self.eid = None
self.pattern = pattern
self._dependencies = None
self.words = None
self.pos_tags = None
self._ners = None
self._mentions = None
self._skeleton_dependency_indices = None
self._skeleton_indices = None
self._verb_indices = None
self.raw_sent_mapping = None
self._phrase_segment_indices = None
self.frequency = 1.0
if dependencies and skeleton_dependencies and parsed_result:
self._construct(dependencies, skeleton_dependencies, parsed_result, use_lemma)
    @staticmethod
def generate_eid(eventuality):
""" Generate the eid to an eventuality
:param eventuality: the given eventuality
:type eventuality: aser.eventuality.Eventuality
:return: the unique eid to the eventuality
:rtype: str
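
        A usage sketch (assuming ``event`` is an already-constructed eventuality):

        .. highlight:: python
        .. code-block:: python

            eid = Eventuality.generate_eid(event)
            assert eid == event.eid  # _construct stores the same digest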
"""
msg = json.dumps([eventuality.dependencies, eventuality.words, eventuality.pos_tags])
return hashlib.sha1(msg.encode('utf-8')).hexdigest()
    def update(self, x):
""" Update the eventuality ('s frequency)
:param x: the given frequency or eventuality
:type x: Union[float, aser.eventuality.Eventuality]
:return: the updated eventuality
:rtype: aser.eventuality.Eventuality
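
        A brief sketch of the two update modes (``e1`` and ``e2`` are assumed to
        be eventualities sharing the same eid):

        .. highlight:: python
        .. code-block:: python

            e1.update(2.0)   # e1.frequency += 2.0
            e1.update(e2)    # e1.frequency += e2.frequency; ners/mentions merged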
"""
if x is not None:
            if isinstance(x, (int, float)):  # accept int frequencies as well
self.frequency += x
elif isinstance(x, Eventuality):
if x.eid == self.eid:
if self._ners is not None and x._ners is not None:
for i, (ner, x_ner) in enumerate(zip(self._ners, x._ners)):
if isinstance(ner, str) and isinstance(x_ner, str) and ner == x_ner:
continue
if isinstance(ner, str):
self._ners[i] = Counter({ner: self.frequency})
if isinstance(x_ner, str):
x_ner = Counter({x_ner: x.frequency})
self._ners[i].update(x_ner)
if self._mentions is not None and x._mentions is not None:
for s_t, x_mention in x._mentions.items():
self._mentions[s_t] = x_mention
self.frequency += x.frequency
else:
raise ValueError("Error: the input of Eventuality.update is invalid.")
return self
def __len__(self):
return len(self.words)
def __str__(self):
repr_dict = {
"eid": self.eid,
"pattern": self.pattern,
"verbs": self.verbs,
"skeleton_words": self.skeleton_words,
"words": self.words,
"skeleton_dependencies": self.skeleton_dependencies,
"dependencies": self.dependencies,
"pos_tags": self.pos_tags,
}
if self._ners is not None:
repr_dict["ners"] = self._ners
if self._mentions is not None:
repr_dict["mentions"] = self._mentions
repr_dict["frequency"] = self.frequency
return pprint.pformat(repr_dict)
def __repr__(self):
return " ".join(self.words)
@property
def dependencies(self):
return self._render_dependencies(self._dependencies)
@property
def ners(self):
if self._ners is None:
return None
return [self._get_ner(idx) for idx in range(len(self._ners))]
@property
def mentions(self):
if self._ners is None:
return None
_mentions = list()
len_words = len(self.words)
        # group maximal runs of adjacent words sharing the same non-"O" ner tag
        i = 0
        while i < len_words:
ner = self._get_ner(i)
if ner == "O":
i += 1
continue
j = i + 1
while j < len_words and self._get_ner(j) == ner:
j += 1
            mention = (self._mentions or {}).get(
                (i, j), {
                    "start": i,
                    "end": j,
                    "text": " ".join(self.words[i:j]),  # keep text a string, as in _construct
                    "ner": ner,
                    "link": None,
                    "entity": None
                }
            )
_mentions.append(mention)
i = j
return _mentions
@property
def _raw_dependencies(self):
if self.raw_sent_mapping is None:
return self._dependencies
new_dependencies = list()
for governor, dep, dependent in self._dependencies:
new_dependencies.append((self.raw_sent_mapping[governor], dep, self.raw_sent_mapping[dependent]))
return new_dependencies
@property
def raw_dependencies(self):
if self.raw_sent_mapping is None:
return self.dependencies
tmp_dependencies = self.dependencies
new_dependencies = list()
for governor, dep, dependent in tmp_dependencies:
g_pos, g_word, g_tag = governor
d_pos, d_word, d_tag = dependent
new_dependencies.append(
((self.raw_sent_mapping[g_pos], g_word, g_tag), dep, (self.raw_sent_mapping[d_pos], d_word, d_tag))
)
return new_dependencies
@property
def skeleton_dependencies(self):
dependencies = [self._dependencies[i] for i in self._skeleton_dependency_indices]
return self._render_dependencies(dependencies)
@property
def skeleton_words(self):
return [self.words[idx] for idx in self._skeleton_indices]
@property
def skeleton_pos_tags(self):
return [self.pos_tags[idx] for idx in self._skeleton_indices]
@property
def skeleton_ners(self):
if self._ners is None:
return None
return [self._get_ner(idx) for idx in self._skeleton_indices]
@property
def verbs(self):
return [self.words[idx] for idx in self._verb_indices]
@property
def position(self):
"""
        :return: the average position of the eventuality's words in the raw sentence.
            This property only makes sense when the eventuality was constructed
            during extraction (so that ``raw_sent_mapping`` is available), not
            when it was recovered from a database.
        :rtype: float
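
        For example, with ``raw_sent_mapping`` {0: 6, 1: 7, 2: 8} and dependencies
        [(2, "nsubj", 0), (2, "cop", 1)], the raw positions are {6, 7, 8}, so the
        average position is 7.0.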
"""
positions = set()
for governor, _, dependent in self._dependencies:
positions.add(self.raw_sent_mapping[governor])
positions.add(self.raw_sent_mapping[dependent])
avg_position = sum(positions) / len(positions) if positions else 0.0
return avg_position
@property
def phrases(self):
_tmp_phrases = list()
for _range in self._phrase_segment_indices:
st = min(_range)
end = max(_range) + 1
_tmp_phrases.append(" ".join(self.words[st:end]))
return _tmp_phrases
@property
def phrases_ners(self):
_phrases_ners = list()
for _range in self._phrase_segment_indices:
_phrases_ners.append(self._get_ner(_range[0]))
return _phrases_ners
@property
def phrases_postags(self):
_phrases_postags = list()
for _range in self._phrase_segment_indices:
_phrases_postags.append(self.pos_tags[_range[0]])
return _phrases_postags
@property
def skeleton_phrases(self):
phrase_dict = dict()
for i, _range in enumerate(self._phrase_segment_indices):
for j in _range:
phrase_dict[j] = i
used_indices = set()
_skeleton_phrases = list()
        for idx in self._skeleton_indices:
            if idx in used_indices:
                continue
            _range = self._phrase_segment_indices[phrase_dict[idx]]
            # mark the whole phrase as used so that a phrase containing several
            # skeleton words is emitted only once
            used_indices.update(_range)
            st = min(_range)
            end = max(_range) + 1
            _skeleton_phrases.append(" ".join(self.words[st:end]))
return _skeleton_phrases
@property
def skeleton_phrases_ners(self):
if self._ners is None:
return None
phrase_dict = dict()
for i, _range in enumerate(self._phrase_segment_indices):
for j in _range:
phrase_dict[j] = i
used_indices = set()
_skeleton_phrases_ners = list()
        for idx in self._skeleton_indices:
            if idx in used_indices:
                continue
            _range = self._phrase_segment_indices[phrase_dict[idx]]
            used_indices.update(_range)  # one ner per phrase, even with several skeleton words
            _skeleton_phrases_ners.append(self._get_ner(_range[0]))
return _skeleton_phrases_ners
@property
def skeleton_phrases_postags(self):
phrase_dict = dict()
for i, _range in enumerate(self._phrase_segment_indices):
for j in _range:
phrase_dict[j] = i
used_indices = set()
_skeleton_phrases_postags = list()
        for idx in self._skeleton_indices:
            if idx in used_indices:
                continue
            _range = self._phrase_segment_indices[phrase_dict[idx]]
            used_indices.update(_range)  # one pos tag per phrase, even with several skeleton words
            _skeleton_phrases_postags.append(self.pos_tags[_range[0]])
return _skeleton_phrases_postags
def _construct(self, dependencies, skeleton_dependencies, parsed_result, use_lemma):
word_indices = Eventuality.extract_indices_from_dependencies(dependencies)
if parsed_result["pos_tags"][word_indices[0]] == "IN":
poped_idx = word_indices[0]
for i in range(len(dependencies) - 1, -1, -1):
if dependencies[i][0] == poped_idx or \
dependencies[i][2] == poped_idx:
dependencies.pop(i)
for i in range(len(skeleton_dependencies) - 1, -1, -1):
if skeleton_dependencies[i][0] == poped_idx or \
skeleton_dependencies[i][2] == poped_idx:
skeleton_dependencies.pop(i)
word_indices.pop(0)
len_words = len(word_indices)
if use_lemma:
self.words = [parsed_result["lemmas"][i].lower() for i in word_indices]
else:
self.words = [parsed_result["tokens"][i].lower() for i in word_indices]
self.pos_tags = [parsed_result["pos_tags"][i] for i in word_indices]
if "ners" in parsed_result:
self._ners = [parsed_result["ners"][i] for i in word_indices]
if "mentions" in parsed_result:
self._mentions = dict()
for mention in parsed_result["mentions"]:
start_idx = bisect.bisect_left(word_indices, mention["start"])
if not (start_idx < len_words and word_indices[start_idx] == mention["start"]):
continue
end_idx = bisect.bisect_left(word_indices, mention["end"] - 1)
if not (end_idx < len_words and word_indices[end_idx] == mention["end"] - 1):
continue
mention = copy(mention)
mention["start"] = start_idx
mention["end"] = end_idx + 1
mention["text"] = " ".join(self.words[mention["start"]:mention["end"]])
self._mentions[(mention["start"], mention["end"])] = mention
dependencies, raw2reset_idx, reset2raw_idx = Eventuality.sort_dependencies_position(
dependencies, reset_position=True
)
self._dependencies = dependencies
self.raw_sent_mapping = reset2raw_idx
skeleton_word_indices = Eventuality.extract_indices_from_dependencies(skeleton_dependencies)
self._skeleton_indices = [raw2reset_idx[idx] for idx in skeleton_word_indices]
_skeleton_dependencies, _, _ = Eventuality.sort_dependencies_position(
skeleton_dependencies, reset_position=False
)
        # align each position-reset skeleton dependency with its index in the
        # sorted full dependency list
        skeleton_dependency_indices = list()
        ptr = 0
for i, dep in enumerate(dependencies):
if ptr >= len(_skeleton_dependencies):
break
skeleton_dep = _skeleton_dependencies[ptr]
skeleton_dep = (raw2reset_idx[skeleton_dep[0]], skeleton_dep[1], raw2reset_idx[skeleton_dep[2]])
if dep == skeleton_dep:
skeleton_dependency_indices.append(i)
ptr += 1
self._skeleton_dependency_indices = skeleton_dependency_indices
self._verb_indices = [i for i, tag in enumerate(self.pos_tags) if tag.startswith('VB')]
self._phrase_segment_indices = self._phrase_segment()
self.eid = Eventuality.generate_eid(self)
    def to_dict(self, **kw):
""" Convert an eventuality to a dictionary
:param kw: other parameters
:type kw: Dict[str, object]
:return: the converted dictionary that contains necessary information
:rtype: Dict[str, object]
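
        A brief sketch (``event`` is assumed to be a constructed eventuality):

        .. highlight:: python
        .. code-block:: python

            full = event.to_dict()               # shallow copy of all attributes
            small = event.to_dict(minimum=True)  # only the necessary fields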
"""
minimum = kw.get("minimum", False)
if minimum:
d = {
"_dependencies": self._dependencies,
"words": self.words,
"pos_tags": self.pos_tags,
"_ners": self._ners,
"_mentions": {str(k): v
for k, v in self._mentions.items()}, # key cannot be tuple
"_verb_indices": self._verb_indices,
"_skeleton_indices": self._skeleton_indices,
"_skeleton_dependency_indices": self._skeleton_dependency_indices
}
else:
            d = dict(self.__dict__)  # shallow copy
            if d["_mentions"] is not None:
                d["_mentions"] = {str(k): v for k, v in d["_mentions"].items()}  # key cannot be tuple
return d
    def decode(self, msg, encoding="utf-8", **kw):
""" Decode the eventuality
:param msg: the encoded bytes
:type msg: bytes
:param encoding: the encoding format
:type encoding: str (default = "utf-8")
:param kw: other parameters
:type kw: Dict[str, object]
:return: the decoded bytes
:rtype: bytes
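
        A round-trip sketch (assuming the serialization is the JSON dump of
        ``to_dict``, matching what this method parses):

        .. highlight:: python
        .. code-block:: python

            msg = json.dumps(event.to_dict()).encode("utf-8")
            event2 = Eventuality().decode(msg)
            assert event2.eid == event.eid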
"""
if encoding == "utf-8":
decoded_dict = json.loads(msg.decode("utf-8"))
elif encoding == "ascii":
decoded_dict = json.loads(msg.decode("ascii"))
else:
decoded_dict = msg
self.from_dict(decoded_dict, **kw)
if self.raw_sent_mapping is not None:
keys = list(self.raw_sent_mapping.keys())
for key in keys:
if not isinstance(key, int):
self.raw_sent_mapping[int(key)] = self.raw_sent_mapping.pop(key)
if self._ners is not None:
for i, ner in enumerate(self._ners):
if not isinstance(ner, str):
self._ners[i] = Counter(ner)
if self._mentions is not None:
keys = list(self._mentions.keys())
for key in keys:
                self._mentions[literal_eval(key)] = self._mentions.pop(key)  # safer than eval for "(0, 2)"-style keys
self._phrase_segment_indices = self._phrase_segment()
return self
def _render_dependencies(self, dependencies):
edges = list()
for governor_idx, dep, dependent_idx in dependencies:
edge = (
(governor_idx, self.words[governor_idx], self.pos_tags[governor_idx]), dep,
(dependent_idx, self.words[dependent_idx], self.pos_tags[dependent_idx])
)
edges.append(edge)
return edges
    def _get_ner(self, index):
        # verbs never carry an ner tag; for merged eventualities the ner is a
        # Counter, so return the most frequent non-"O" tag
        ner = "O"
        if not self.pos_tags[index].startswith('VB'):
if isinstance(self._ners[index], str):
ner = self._ners[index]
else:
for x in self._ners[index].most_common():
if x[0] != "O":
ner = x[0]
break
return ner
    def _dep_compound_segment(self):
        # group words linked by "compound" dependencies into phrase spans and
        # return a list of index tuples that covers every word exactly once
        tmp_compound_tuples = list()
for governor_idx, dep, dependent_idx in self._dependencies:
if dep.startswith("compound"):
tmp_compound_tuples.append((governor_idx, dependent_idx))
tmp_compound_tuples = sorted(tmp_compound_tuples)
compound_tuples = list()
used_indices = set()
for i in range(len(tmp_compound_tuples)):
if i in used_indices:
continue
s1 = tmp_compound_tuples[i]
for j in range(i+1, len(tmp_compound_tuples)):
if j in used_indices:
continue
s2 = tmp_compound_tuples[j]
# s1[0] is the governor
if s2[0] in set(s1[1:]):
s1 = s1 + s2[1:]
used_indices.add(j)
# s2[0] is the governor
elif s1[0] in set(s2[1:]):
s1 = s2 + s1[1:]
used_indices.add(j)
# s1[0] and s2[0] are same
elif s1[0] == s2[0]:
s1 = s1 + s2[1:]
used_indices.add(j)
else:
break
used_indices.add(i)
            # keep only contiguous index spans; split a non-contiguous compound
            # group into its maximal contiguous runs
            sorted_s1 = sorted(s1)
            if sorted_s1[-1] - sorted_s1[0] == len(sorted_s1) - 1:
                compound_tuples.append(s1)
            else:
                s1s = []
                k1 = 0
                len_s1 = len(sorted_s1)
                for k2 in range(1, len_s1):
                    if sorted_s1[k2 - 1] + 1 != sorted_s1[k2]:
                        s1s.append(tuple(sorted_s1[k1:k2]))
                        k1 = k2
                s1s.append(tuple(sorted_s1[k1:len_s1]))
                compound_tuples.extend(s1s)
compound_tuples.sort()
used_indices = set(chain.from_iterable(compound_tuples))
segment_rst = list()
word_idx = 0
compound_idx = 0
num_words = len(self.words)
num_tuples = len(compound_tuples)
        while word_idx < num_words:
            if word_idx not in used_indices:
                segment_rst.append((word_idx,))
            elif compound_idx < num_tuples and word_idx == compound_tuples[compound_idx][0]:
                segment_rst.append(compound_tuples[compound_idx])
                compound_idx += 1
            word_idx += 1
return segment_rst
    def _ner_compound_segment(self):
        # group contiguous words that share an ner tag with a skeleton word into
        # phrase spans (currently unused: _phrase_segment relies on
        # _dep_compound_segment instead)
        if self._ners is None:
            return [(i,) for i in range(len(self.words))]
        compound_tuples = list()
        for idx in self._skeleton_indices:
            ner = self._get_ner(idx)
            if ner != "O":
                st = idx - 1
                while st >= 0 and self._get_ner(st) == ner:
                    st -= 1
                end = idx + 1
                while end < len(self.words) and self._get_ner(end) == ner:
                    end += 1
                compound_tuples.append((st + 1, end))  # st overshoots the span start by one
        compound_tuples = sorted(set(compound_tuples))  # dedupe spans that contain several skeleton words
        segment_rst = list()
        ptr = 0
        i = 0
        while i < len(self.words):
if ptr < len(compound_tuples) and i == compound_tuples[ptr][0]:
segment_rst.append(tuple(range(compound_tuples[ptr][0], compound_tuples[ptr][-1])))
i = compound_tuples[ptr][-1]
ptr += 1
else:
segment_rst.append((i,))
i += 1
return segment_rst
def _phrase_segment(self):
return self._dep_compound_segment()
    @staticmethod
def sort_dependencies_position(dependencies, reset_position=True):
""" Fix absolute positions into relevant positions and sort them
:param dependencies: the input dependencies
:type dependencies: List[Tuple[int, str, int]]
:param reset_position: whether to reset positions
:type reset_position: bool (default = True)
:return: the new dependencies, the position mapping, and the inversed mapping
        :rtype: Tuple[List[Tuple[int, str, int]], Union[Dict[int, int], None], Union[Dict[int, int], None]]
.. highlight:: python
.. code-block:: python
Input:
[(8, "cop", 7), (8, "nsubj", 6)]
Output:
[(2, 'nsubj', 0), (2, 'cop', 1)], {6: 0, 7: 1, 8: 2}, {0: 6, 1: 7, 2: 8}
"""
tmp_dependencies = set()
for triplet in dependencies:
tmp_dependencies.add(tuple(triplet))
new_dependencies = list()
if reset_position:
positions = set()
for governor, _, dependent in tmp_dependencies:
positions.add(governor)
positions.add(dependent)
positions = sorted(positions)
position_map = dict(zip(positions, range(len(positions))))
for governor, dep, dependent in tmp_dependencies:
new_dependencies.append((position_map[governor], dep, position_map[dependent]))
new_dependencies.sort(key=lambda x: (x[0], x[2]))
return new_dependencies, position_map, {val: key for key, val in position_map.items()}
else:
            new_dependencies = sorted(tmp_dependencies, key=lambda x: (x[0], x[2]))
return new_dependencies, None, None