|
- #!/usr/bin/env python
- # encoding: utf-8
- import os
- import pickle
- from math import log
- from collections import defaultdict
-
-
def add_curr_dir(name):
    """Return *name* resolved relative to this module's directory."""
    base_dir = os.path.dirname(__file__)
    return os.path.join(base_dir, name)
-
-
class Trie(object):
    """Prefix tree over dictionary words.

    Nodes are nested plain dicts keyed by single characters; a node that
    completes a word stores that word under the payload key named by
    ``self.value``.
    """

    def __init__(self):
        self.root = {}  # nested dict: char -> child node
        self.value = "value"  # payload key marking a completed word
        self.trie_file_path = os.path.join(os.path.dirname(__file__), "data/Trie.pkl")

    def get_matches(self, word):
        """Return every dictionary word that is a prefix of *word*.

        :param word: text to scan from position 0
        :return: list of matched words, shortest first (empty if none)
        """
        ret = []
        node = self.root
        for c in word:
            if c not in node:
                break
            node = node[c]
            # Fix: use the configured payload key instead of a hard-coded
            # "value" literal, so self.value is honored consistently.
            if self.value in node:
                ret.append(node[self.value])
        return ret

    def load(self):
        """Replace ``self.root`` with the pickled trie from disk.

        NOTE(review): ``pickle.load`` executes arbitrary code when fed
        untrusted data — this file must come from a trusted source.
        """
        with open(self.trie_file_path, "rb") as f:
            self.root = pickle.load(f)
-
-
class Chunk:
    """One candidate segmentation of up to three words, with the statistics
    used by the MMSeg disambiguation rules: total length, mean word length,
    variance of word lengths, and summed log-frequency (entropy) of the words.
    """

    def __init__(self, words_list, chrs):
        self.best_word = words_list[0]
        self.words_num = len(words_list)
        word_lengths = [len(word) for word in words_list]
        self.length = sum(word_lengths)
        # Unknown words fall back to frequency 1, i.e. log(1) == 0.
        self.entropy = sum(log(chrs.get(word, 1)) for word in words_list)
        self.mean = self.length / self.words_num
        mean_of_squares = sum(n * n for n in word_lengths) / self.words_num
        self.var = mean_of_squares - (self.length / self.words_num) * (self.length / self.words_num)

    def __lt__(self, other):
        # MMSeg preference order: longer total, longer average, smaller
        # variance (hence negated), larger entropy.
        own_key = (self.length, self.mean, -self.var, self.entropy)
        other_key = (other.length, other.mean, -other.var, other.entropy)
        return own_key < other_key
-
-
class MMSeg:
    """MMSeg Chinese word segmenter: complex maximum matching, ranking the
    candidate chunks via ``Chunk.__lt__``.
    """

    def __init__(self):
        # Load the word dictionary (pickled prefix trie on disk).
        trie = Trie()
        trie.load()
        self.words_dic = trie
        # Load the character-frequency table used by the entropy rule.
        self.chrs_dic = self._load_freq(filename="data/chars.dic")

    def _load_freq(self, filename):
        """Parse a "char freq" file into a dict mapping char -> int frequency.

        :param filename: path relative to this module's directory
        :return: plain dict (the original ``defaultdict()`` had no factory,
                 so it behaved exactly like a dict anyway)
        """
        chrs_dic = {}
        with open(add_curr_dir(filename), "r", encoding="utf-8") as f:
            for line in f:
                # split() tolerates blank lines and stray whitespace, which
                # would have crashed the old strip().split(" ") unpacking.
                parts = line.split()
                if len(parts) == 2:
                    key, value = parts
                    # setdefault keeps the first occurrence on duplicates,
                    # matching the original behavior.
                    chrs_dic.setdefault(key, int(value))
        return chrs_dic

    def __get_start_words(self, sentence):
        """Return all dictionary words starting at position 0 of *sentence*.

        Falls back to the single first character when nothing matches, and
        returns an empty list (falsy, like the original ``False``) when
        *sentence* is empty.
        """
        if not sentence:
            return []
        match_words = self.words_dic.get_matches(sentence)
        return match_words if match_words else [sentence[0]]

    def __get_chunks(self, sentence):
        """Yield every Chunk of one to three consecutive words from the
        start of *sentence* (each level falls back to a shorter chunk when
        the remainder is empty)."""
        for word_one in self.__get_start_words(sentence):
            offset_one = len(word_one)
            second_words = self.__get_start_words(sentence[offset_one:])
            if not second_words:
                yield Chunk([word_one], self.chrs_dic)
                continue
            for word_two in second_words:
                offset_two = offset_one + len(word_two)
                third_words = self.__get_start_words(sentence[offset_two:])
                if not third_words:
                    yield Chunk([word_one, word_two], self.chrs_dic)
                    continue
                for word_three in third_words:
                    yield Chunk([word_one, word_two, word_three], self.chrs_dic)

    def cws(self, sentence):
        """Segment *sentence* into words.

        :param sentence: input text
        :return: generator yielding one segmented word at a time
        """
        while sentence:
            # Best chunk per the MMSeg rules encoded in Chunk.__lt__;
            # only its first word is committed, then we re-chunk the rest.
            word = max(self.__get_chunks(sentence)).best_word
            sentence = sentence[len(word):]
            yield word
-
-
- if __name__ == "__main__":
- mmseg = MMSeg()
- print(list(mmseg.cws("武汉市长江大桥上的日落非常好看,很喜欢看日出日落。")))
- print(list(mmseg.cws("人要是行干一行行一行.")))
|