diff --git a/smoothnlp/algorithm/phrase/ngram_utils.py b/smoothnlp/algorithm/phrase/ngram_utils.py index 0c33b32..fb361ce 100644 --- a/smoothnlp/algorithm/phrase/ngram_utils.py +++ b/smoothnlp/algorithm/phrase/ngram_utils.py @@ -1,6 +1,7 @@ import re import types -from multiprocessing import cpu_count,Pool +from multiprocessing import Process +import multiprocessing import math from collections.abc import Iterable from collections import Counter @@ -23,8 +24,8 @@ def union_word_freq(dic1,dic2): keys = (dic1.keys()) | (dic2.keys()) total = {} for key in keys: - total[key] = dic1.get(key, 0) + dic2.get(key, 0) - return total + dic2.update({key: dic1.get(key, 0) + dic2.get(key, 0)}) + return dic2 def sentence_split_by_punc(corpus:str): return re.split(r'[;;.。,,!\n!??]',corpus) @@ -50,7 +51,21 @@ def generate_ngram_str(text:str,n): for text in corpus: for ngram in generate_ngram_str(text,n): yield ngram - +#进程函数 ngram_freq_total是进程之间的共享变量 +def compute_freq(corpus_chunk,lock,ngram_freq_total,min_n,max_n,ngram_keys,min_freq): + # def _process_corpus_chunk(corpus_chunk): + ngram_freq = {} + for ni in [1]+list(range(min_n,max_n+2)): + ngram_generator = generate_ngram(corpus_chunk, ni) + nigram_freq = dict(Counter(ngram_generator)) + lock.acquire() + ngram_keys.update({ni:(ngram_keys.get(ni) | nigram_freq.keys())}) + lock.release() + ngram_freq = {**nigram_freq, **ngram_freq} + ngram_freq = {word: count for word, count in ngram_freq.items() if count >= min_freq} ## 每个chunk的ngram频率统计 + lock.acquire() + ngram_freq_total = union_word_freq(ngram_freq, ngram_freq_total) + lock.release() def get_ngram_freq_info(corpus, ## list or generator min_n:int=2, max_n:int=4, @@ -58,7 +73,6 @@ def get_ngram_freq_info(corpus, ## list or generator min_freq:int=2, ): """ - :param corpus: 接受list或者generator 如果corpus是generator, 默认该generator每次yield一段长度为chunk_size的corpus_chunk :param max_n: @@ -66,9 +80,20 @@ def get_ngram_freq_info(corpus, ## list or generator :param min_freq: :return: """ - 
ngram_freq_total = {} ## 记录词频 - ngram_keys = {i: set() for i in range(1, max_n + 2)} ## 用来存储N=时, 都有哪些词 - + #子进程队列 方便join主进程 等待ngram_freq_total和ngram_keys计算完成 + process_queue=[] + manager = multiprocessing.Manager() + #声明进程之间的共享变量 + ngram_freq_total = manager.dict() + # 声明进程之间的共享变量 + ngram_keys = manager.dict({i: set() for i in range(1, max_n + 2)}) + lock=multiprocessing.Lock() + #多进程并发计算ngram_freq_total和ngram_keys + def join_process(corpus, lock, ngram_freq_total, min_n, max_n, ngram_keys): + for corpus_chunk in corpus: + p = Process(target=compute_freq, args=(corpus_chunk, lock, ngram_freq_total, min_n, max_n, ngram_keys,min_freq)) + p.start() + process_queue.append(p) def _process_corpus_chunk(corpus_chunk): ngram_freq = {} for ni in [1]+list(range(min_n,max_n+2)): @@ -78,12 +103,10 @@ def _process_corpus_chunk(corpus_chunk): ngram_freq = {**nigram_freq, **ngram_freq} ngram_freq = {word: count for word, count in ngram_freq.items() if count >= min_freq} ## 每个chunk的ngram频率统计 return ngram_freq if isinstance(corpus,types.GeneratorType): - ## 注意: 如果corpus是generator, 该function对chunk_size无感知 - for corpus_chunk in corpus: - ngram_freq = _process_corpus_chunk(corpus_chunk) - ngram_freq_total = union_word_freq(ngram_freq, ngram_freq_total) + join_process(corpus, lock, ngram_freq_total, min_n, max_n, ngram_keys) + for per_pro in process_queue: + per_pro.join() elif isinstance(corpus,list): len_corpus = len(corpus) for i in range(0,len_corpus,chunk_size): @@ -91,9 +114,8 @@ def _process_corpus_chunk(corpus_chunk): ngram_freq = _process_corpus_chunk(corpus_chunk) ngram_freq_total = union_word_freq(ngram_freq,ngram_freq_total) for k in ngram_keys: - ngram_keys[k] = ngram_keys[k] & ngram_freq_total.keys() + ngram_keys[k] = ngram_keys[k] & set(ngram_freq_total.keys()) return ngram_freq_total,ngram_keys - def _ngram_entropy_scorer(parent_ngrams_freq): """ 根据一个candidate的neighbor的出现频率, 计算Entropy具体值 @@ -120,20 +142,16 @@ def _calc_ngram_entropy(ngram_freq, for ni in n: entropy
= {**entropy,**_calc_ngram_entropy(ngram_freq,ngram_keys,ni)} return entropy - ngram_entropy = {} target_ngrams = ngram_keys[n] parent_candidates = ngram_keys[n+1] - if CPU_COUNT == 1: ## 对 n+1 gram 进行建Trie处理 left_neighbors = Trie() right_neighbors = Trie() - for parent_candidate in parent_candidates: right_neighbors[parent_candidate] = ngram_freq[parent_candidate] left_neighbors[parent_candidate[1:]+parent_candidate[0]] = ngram_freq[parent_candidate] - ## 计算 for target_ngram in target_ngrams: try: ## 一定情况下, 一个candidate ngram 没有左右neighbor @@ -177,8 +195,6 @@ def _calc_ngram_pmi(ngram_freq,ngram_keys,n): ami = pmi/len(target_ngram) #average mutual information mi[target_ngram] = (pmi,ami) return mi - - def get_scores(corpus, min_n:int = 2, max_n: int = 4, @@ -196,7 +212,6 @@ def get_scores(corpus, chunk_size=chunk_size, min_freq=min_freq) - left_right_entropy = _calc_ngram_entropy(ngram_freq,ngram_keys,range(min_n,max_n+1)) mi = _calc_ngram_pmi(ngram_freq,ngram_keys,range(min_n,max_n+1)) joint_phrase = mi.keys() & left_right_entropy.keys() @@ -209,7 +224,6 @@ def get_scores(corpus, word_liberalization(left_right_entropy[word][0],left_right_entropy[word][1])+mi[word][1] #our score ) for word in joint_phrase} - ## DONE 对在candidate ngram中, 首字或者尾字出现次数特别多的进行筛选, 如"XX的,美丽的,漂亮的"剔出字典 target_ngrams = word_info_scores.keys() start_chars = Counter([n[0] for n in target_ngrams]) @@ -219,10 +233,7 @@ def get_scores(corpus, config.logger.info("~~~ Threshold used for removing start end char: {} ~~~~".format(threshold)) invalid_start_chars = set([char for char, count in start_chars.items() if count > threshold]) invalid_end_chars = set([char for char, count in end_chars.items() if count > threshold]) - invalid_target_ngrams = set([n for n in target_ngrams if (n[0] in invalid_start_chars or n[-1] in invalid_end_chars)]) - for n in invalid_target_ngrams: ## 按照不合适的字头字尾信息删除一些 word_info_scores.pop(n) - return word_info_scores