import hashlib
import pprint
import time
import pickle
from tqdm import tqdm
from collections import Counter
from .object import JsonSerializedObject
[docs]class ASERConcept(JsonSerializedObject):
""" ASER Conceptualied Eventuality
"""
def __init__(self, words=None, instances=None):
"""
:param words: the word list of a concept
:type words: List[str]
:param instances: a list of (eid, pattern, score)
:type instances: List[Tuple[str, str, float]]
"""
super().__init__()
self.words = words if words else ""
self.instances = instances if instances else []
self.cid = ASERConcept.generate_cid(self.__str__())
[docs] @staticmethod
def generate_cid(concept_str):
""" Generate the cid to a concept
:param concept_str: concept representation (words connected by " ")
:type concept_str: List[str]
:return: the corresponding unique cid
:rtype: str
"""
return hashlib.sha1(concept_str.encode('utf-8')).hexdigest()
@property
def pattern(self):
if len(self.instances) > 0:
cnter = Counter([t[1] for t in self.instances])
return cnter.most_common(1)[0][0]
else:
return ""
def __str__(self):
return " ".join(self.words)
def __repr__(self):
return " ".join(self.words)
[docs] def instantiate(self, kg_conn=None):
""" Retrieve the instances that are associated with this concept
:param kg_conn: an KG connection to ASER
:type kg_conn: aser.database.kg_connection.ASERKGConnection
:return: a list of (eid, pattern, score)
:rtype: List[Tuple[str, str, float]]
"""
if kg_conn:
eventualities = kg_conn.get_exact_match_eventualities(
[t[0] for t in self.instances])
return eventualities
else:
return self.instances
[docs]class ASERConceptInstancePair(JsonSerializedObject):
def __init__(self, cid="", eid="", pattern="unknown", score=0.0):
"""
:param cid: the unique cid to the conceptualized eventuality
:type cid: str
:param eid: the unique eid to the eventuality
:type eid: str
:param pattern: the corresponding pattern
:type pattern: str
:param score: the conceptualization probability
:type score: float
"""
super().__init__()
self.cid = cid
self.eid = eid
self.pattern = pattern
self.score = score
self.pid = ASERConceptInstancePair.generate_pid(cid, eid)
[docs] @staticmethod
def generate_pid(cid, eid):
""" Generate the pid to a pair
:param cid: the unique cid to the conceptualized eventuality
:type cid: str
:param eid: the unique eid to the eventuality
:type eid: str
:return: the unique pid to the pair
:rtype: str
"""
key = cid + "$" + eid
return hashlib.sha1(key.encode('utf-8')).hexdigest()
def __str__(self):
repr_dict = {
"pid": self.pid,
"cid": self.cid,
"eid": self.eid,
"pattern": self.pattern,
"score": self.score
}
return pprint.pformat(repr_dict)
def __repr__(self):
return self.__str__()
[docs]class ProbaseConcept(object):
""" Copied from https://github.com/ScarletPan/probase-concept
"""
def __init__(self, data_concept_path=""):
"""
:param data_concept_path: Probase .txt file path
:type data_concept_path: str
"""
self.concept2idx = dict()
self.idx2concept = dict()
self.concept_inverted_list = dict()
self.instance2idx = dict()
self.idx2instance = dict()
self.instance_inverted_list = dict()
if data_concept_path:
self._load_raw_data(data_concept_path)
def _load_raw_data(self, data_concept_path):
st = time.time()
print("[probase-conceptualize] Loading Probase files...")
with open(data_concept_path) as f:
triplet_lines = [line.strip() for line in f]
print("[probase-conceptualize] Building index...")
for line in tqdm(triplet_lines):
concept, instance, freq = line.split('\t')
if concept not in self.concept2idx:
self.concept2idx[concept] = len(self.concept2idx)
concept_idx = self.concept2idx[concept]
if instance not in self.instance2idx:
self.instance2idx[instance] = len(self.instance2idx)
instance_idx = self.instance2idx[instance]
if concept_idx not in self.concept_inverted_list:
self.concept_inverted_list[concept_idx] = list()
self.concept_inverted_list[concept_idx].append((instance_idx, int(freq)))
if instance_idx not in self.instance_inverted_list:
self.instance_inverted_list[instance_idx] = list()
self.instance_inverted_list[instance_idx].append((concept_idx, int(freq)))
self.idx2concept = {val: key for key, val in self.concept2idx.items()}
self.idx2instance = {val: key for key, val in self.instance2idx.items()}
print("[probase-conceptualize] Loading data finished in {:.2f} s".format(time.time() - st))
[docs] def conceptualize(self, instance, score_method="likelihood"):
""" Conceptualize the given instance
:param instance: the given instance
:type instance: str
:param score_method: the method to compute sscores ("likelihood" or "pmi")
:type score_method: str
:return: a list of (concept, score)
:rtype: List[Tuple[aser.concept.ProbaseConcept, float]]
"""
if instance not in self.instance2idx:
return []
instance_idx = self.instance2idx[instance]
instance_freq = self.get_instance_freq(instance_idx)
concept_list = self.instance_inverted_list[instance_idx]
rst_list = list()
for concept_idx, co_occurrence in concept_list:
if score_method == "pmi":
score = co_occurrence / self.get_concept_freq(concept_idx) / instance_freq
elif score_method == "likelihood":
score = co_occurrence / instance_freq
else:
raise NotImplementedError
rst_list.append((self.idx2concept[concept_idx], score))
rst_list.sort(key=lambda x: x[1], reverse=True)
return rst_list
[docs] def instantiate(self, concept):
""" Retrieve all instances of a concept
:param concept: the given concept
:type concept: str
:return: a list of instances
:rtype: List[Tuple[str, float]]
"""
if concept not in self.concept2idx:
return []
concept_idx = self.concept2idx[concept]
rst_list = [(self.idx2instance[idx], freq) for idx, freq
in self.concept_inverted_list[concept_idx]]
rst_list.sort(key=lambda x: x[1], reverse=True)
return rst_list
[docs] def get_concept_chain(self, instance, max_chain_length=5):
""" Conceptualize the given instance in a chain
:param instance: the given instance
:type instance: str
:param max_chain_length: the maximum length of the chain
:type max_chain_length: int (default = 5)
:return: a chain that contains concepts
:rtype: List[str]
"""
if instance in self.concept2idx:
chain = [instance]
else:
chain = list()
tmp_instance = instance
while True:
concepts = self.conceptualize(tmp_instance, score_method="likelihood")
if concepts:
chain.append(concepts[0][0])
else:
break
if len(chain) >= max_chain_length:
break
tmp_instance = chain[-1]
if chain and chain[0] != instance:
return [instance] + chain
else:
return chain
[docs] def get_concept_freq(self, concept):
""" Get the frequency of a concept
:param concept: the given concept
:type concept: str
:return: the corresponding frequency
:rtype: float
"""
if isinstance(concept, str):
if concept not in self.concept2idx:
return 0
concept = self.concept2idx[concept]
elif isinstance(concept, int):
if concept not in self.idx2concept:
return 0
return sum([t[1] for t in self.concept_inverted_list[concept]])
[docs] def get_instance_freq(self, instance):
""" Get the frequency of an instance
:param instance: the given instance
:type instance: str
:return: the corresponding frequency
:rtype: float
"""
if isinstance(instance, str):
if instance not in self.instance2idx:
return 0
instance = self.instance2idx[instance]
elif isinstance(instance, int):
if instance not in self.idx2instance:
return 0
return sum([t[1] for t in self.instance_inverted_list[instance]])
[docs] def save(self, file_name):
"""
:param file_name: the file name to save the probase concepts
:type file_name: str
"""
with open(file_name, "wb") as f:
pickle.dump(self.__dict__, f)
[docs] def load(self, file_name):
"""
:param file_name: the file name to load the probase concepts
:type file_name: str
"""
with open(file_name, "rb") as f:
tmp_dict = pickle.load(f)
for key, val in tmp_dict.items():
self.__setattr__(key, val)
@property
def concept_size(self):
return len(self.concept2idx)
@property
def instance_size(self):
return len(self.instance2idx)