# -*- coding: UTF-8 -*-
"""
Copyright 2021 Tianshu AI Platform. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
=============================================================
"""
import time
from multiprocessing import Process
from pathlib import Path
from shutil import rmtree

from python_io.dictionary_watcher import start_run_watcher
from python_io.lazy_load import LazyLoad
from utils.cache_io import CacheIO
from utils.logfile_utils import get_runinfo
from utils.redis_utils import RedisInstance


class ParserWorker(Process):
    def __init__(self, uid, logdir, cachedir):
        super(ParserWorker, self).__init__()
        self.uid = uid
        self._logdir = logdir
        self._cachedir = cachedir

    def run(self):
        if not Path(self._logdir).exists():
            raise FileNotFoundError("No such directory {}".format(self._logdir))
        run_dirs = get_runinfo(self._logdir)

        # Start watching the directory currently being parsed
        start_run_watcher(self.uid, '.', self._logdir, self._cachedir)

        # Parse the logs
        print(f'({self._logdir}) started parsing')
        start_time = time.time()
        run_logs = {}
        for _run, _dir in run_dirs.items():
            LazyLoad(self.uid, _run, _dir, run_logs).init_load(self._cachedir, is_init=True)

        # Wait for parsing to finish: a run is done once its pending log list
        # is empty. Give up after 30 seconds.
        assert len(run_logs) == len(run_dirs)
        while len(run_logs) > 0:
            runs = list(run_logs.keys())
            for run in runs:
                if len(run_logs[run]) == 0:
                    run_logs.pop(run)
            if time.time() - start_time >= 30:
                return
            else:
                time.sleep(0.5)


class LogParser:
    def __init__(self, uid, logdir, cachedir):
        super(LogParser, self).__init__()
        self.uid = uid
        self.logdir = logdir
        self.cachedir = Path(cachedir).absolute()
        self.r = RedisInstance
        self.alive = False

    def start(self):
        self.alive = True
        # Record the local cache path for the parsed logs
        self.r.set(self.uid, str(self.cachedir))
        if self.cachedir.exists():
            rmtree(self.cachedir)
        self.worker = ParserWorker(self.uid, self.logdir, self.cachedir)
        # start() spawns the worker as a separate process; calling run()
        # directly would execute it synchronously in this process, which
        # would make the is_alive()/terminate() cleanup in close() a no-op.
        self.worker.start()

    def close(self):
        self.alive = False
        print(f'({self.uid}) : clean up ... ')

        # Shut down the parser process if it is still running
        if self.worker.is_alive():
            self.worker.terminate()

        # Close the file handles this parser has opened
        files = list(CacheIO.file_io.keys())
        for file in files:
            if str(self.cachedir) in str(file):
                CacheIO.file_io[file].close()
                CacheIO.file_io.pop(file)

        # Clear this uid's entries from the Redis cache
        for key in self.r.keys(self.uid + '*'):
            self.r.delete(key)

        # Remove the cached files
        if self.cachedir.exists():
            rmtree(self.cachedir)

        try:
            # Try to remove empty parent directories, up to the cache root
            parent_cache = self.cachedir.parent
            while '__cache__' in str(parent_cache):
                parent_cache.rmdir()
                parent_cache = parent_cache.parent
        except OSError:
            pass
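

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module): drives a
# LogParser end to end. The uid and paths below are hypothetical, and this
# assumes RedisInstance is already configured to reach a running Redis.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    parser = LogParser(uid='demo_uid',
                       logdir='./logs',
                       cachedir='./__cache__/demo_uid')
    parser.start()      # spawns ParserWorker and begins parsing in the background
    try:
        time.sleep(35)  # give the worker its full 30-second parse window
    finally:
        parser.close()  # terminate the worker, close file IO, clear Redis and cache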