# -*- coding: UTF-8 -*-
"""
 Copyright 2020 Tianshu AI Platform. All Rights Reserved.

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 =============================================================
"""
import sys
sys.path.append('../service_utils')
from pathlib import Path
from utils.redis_utils import RedisInstance
from tbparser.log_parser import LogParser
from threading import Thread
import time
import json


def response(stateId, code, msg):
    """Publish a status message on the Redis list named ``stateId``.

    The payload is a JSON object with the keys ``code`` and ``msg`` and is
    pushed with ``lpush``.

    Returns:
        Whatever ``RedisInstance.lpush`` returns.
    """
    payload = json.dumps({'code': code, 'msg': msg})
    return RedisInstance.lpush(stateId, payload)


class Master:
    """Dispatches 'run' / 'kill' requests read from Redis to log parsers."""

    # uid -> LogParser. Deliberately class-level: the module-level
    # ``cleanup`` handler reaches the parsers through ``Master.fileParsers``.
    fileParsers = {}

    def __init__(self):
        # Start from an empty Redis database.
        RedisInstance.flushdb()

    def set_parser(self, uid, log_dir, cache_dir):
        """Start a LogParser for ``uid`` unless one is already registered.

        Progress is reported on the Redis list ``'parser_statu' + uid``:
        code 500 when ``log_dir`` does not exist, code 200 otherwise.
        """
        state_id = 'parser_statu' + uid

        # The log path does not exist.
        if not Path(log_dir).exists():
            return response(
                stateId=state_id,
                code=500,
                msg='User does not exist or log path not found error: {}'.format(log_dir))

        # This uid is already being parsed: report and skip.
        if uid in self.fileParsers:
            response(stateId=state_id, code=200,
                     msg="User {} has already started".format(uid))
            return None

        response(stateId=state_id, code=200,
                 msg='({}) starts successfully'.format(uid))
        parser = LogParser(uid, log_dir, cache_dir)
        parser.start()
        self.fileParsers[uid] = parser
        # NOTE(review): assumed to belong to the "new parser" path only;
        # confirm against the upstream file, the original indentation was lost.
        response(stateId=state_id, code=200,
                 msg='({}) is finished'.format(uid))
        return None

    def kill_parser(self, uid):
        """Unregister the parser for ``uid`` and close it if still alive."""
        parser = self.fileParsers.pop(uid, None)
        if parser is not None and parser.alive:
            parser.close()

    def _dispatch(self, request):
        """Route one decoded request dict to the matching handler."""
        kind = request['type']
        if kind == 'run':
            self.set_parser(uid=request['uid'],
                            log_dir=request['logdir'],
                            cache_dir=request['cachedir'])
        elif kind == 'kill':
            self.kill_parser(uid=request['uid'])
        else:
            print('Unrecognized request')

    def run_server(self):
        """Block forever, serving requests popped from the 'sessions' list."""
        while True:
            # Fetch the next notification message (sent by django).
            _, raw = RedisInstance.brpop('sessions')
            try:
                self._dispatch(json.loads(raw))
            except (ValueError, KeyError, TypeError) as err:
                # A malformed message must not kill the serving thread:
                # the main thread only sleeps and would never notice.
                print('Malformed request {!r}: {!r}'.format(raw, err))


def run():
    """Thread entry point: build a Master and serve requests."""
    Master().run_server()


def cleanup(signum=None, frame=None):
    """Signal handler: close every registered parser, then exit.

    Per the original author's note, closing a parser triggers its own
    cleanup, which clears its cache files.
    """
    # Snapshot the values: the server thread may still add or remove
    # entries while this handler runs in the main thread.
    for parser in list(Master.fileParsers.values()):
        parser.close()
    print('closing master ...')
    sys.exit()


if __name__ == '__main__':
    import signal
    # Bind the handler to the termination signals.
    signal.signal(signal.SIGINT, cleanup)   # exit on ctrl + c
    signal.signal(signal.SIGTERM, cleanup)  # exit on kill <pid>
    print("Master running...")
    p = Thread(target=run, daemon=True)
    p.start()
    # Keep the main thread alive so the signal handlers can run.
    while True:
        time.sleep(100)