You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

master.py 3.5 kB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. # -*- coding: UTF-8 -*-
  2. """
  3. Copyright 2020 Tianshu AI Platform. All Rights Reserved.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. =============================================================
  14. """
  15. import sys
  16. sys.path.append('../service_utils')
  17. from pathlib import Path
  18. from utils.redis_utils import RedisInstance
  19. from tbparser.log_parser import LogParser
  20. from threading import Thread
  21. import time
  22. import json
  23. def response(stateId, code, msg):
  24. # 通过redis消息队列通知当前状态
  25. s = json.dumps({'code': code, 'msg': msg})
  26. RedisInstance.lpush(stateId, s)
  27. class Master:
  28. fileParsers = {}
  29. def __init__(self):
  30. RedisInstance.flushdb()
  31. def set_parser(self, uid, log_dir, cache_dir):
  32. # 日志路径不存在
  33. if not Path(log_dir).exists():
  34. return response(stateId='parser_statu' + uid,
  35. code = 500,
  36. msg = 'User does not exist or log path not found error: {}'.format(log_dir))
  37. # 若当前任务已经解析,则跳过
  38. if uid in self.fileParsers.keys():
  39. response(stateId='parser_statu' + uid,
  40. code=200,
  41. msg="User {} has already started".format(uid))
  42. else:
  43. response(stateId='parser_statu' + uid,
  44. code=200,
  45. msg='({}) starts successfully'.format(uid))
  46. parser = LogParser(uid, log_dir, cache_dir)
  47. parser.start()
  48. self.fileParsers[uid] = parser
  49. response(stateId='parser_statu' + uid,
  50. code=200,
  51. msg='({}) is finished'.format(uid))
  52. def kill_parser(self, uid):
  53. if uid in self.fileParsers.keys():
  54. parser = self.fileParsers.pop(uid)
  55. if parser.alive:
  56. parser.close()
  57. def run_server(self):
  58. while True:
  59. _, request = RedisInstance.brpop('sessions') #取出django的通知消息
  60. request = json.loads(request)
  61. if request['type'] == 'run':
  62. self.set_parser(uid = request['uid'],
  63. log_dir = request['logdir'],
  64. cache_dir = request['cachedir'])
  65. elif request['type'] == 'kill':
  66. self.kill_parser(uid=request['uid'])
  67. else:
  68. print('Unrecognized request')
  69. def run():
  70. Master().run_server()
  71. def cleanup(signum=None, frame=None):
  72. # 正常退出,触发每个parser的线程回收函数cleanup,清空所有的cache文件
  73. for parser in Master.fileParsers.values():
  74. parser.close()
  75. print('closing master ...')
  76. sys.exit()
  77. if __name__ == '__main__':
  78. import signal
  79. # 为响应信号绑定触发函数
  80. signal.signal(signal.SIGINT, cleanup) # ctrl + c 退出
  81. signal.signal(signal.SIGTERM, cleanup) # kill pids 退出
  82. print("Master running...")
  83. p = Thread(target=run, daemon=True)
  84. p.start()
  85. while True:
  86. time.sleep(100)

一站式算法开发平台、高性能分布式深度学习框架、先进算法模型库、视觉模型炼知平台、数据可视化分析平台等一系列平台及工具,在模型高效分布式训练、数据处理和可视分析、模型炼知和轻量化等技术上形成独特优势,目前已在产学研等各领域近千家单位及个人提供AI应用赋能