From ec3a8cab911f7913ff0e4f5eb0df0af570915ef2 Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Thu, 28 Aug 2025 18:32:23 +0800 Subject: [PATCH 01/21] init refactor, add Scheduler --- .gitignore | 2 + scheduler/app/database/__init__.py | 4 +- scheduler/app/faas_scheduler/models.py | 8 + scheduler/app/flask_server.py | 51 +++--- scheduler/app/scheduler.py | 213 ++++++++++++++++++++----- scheduler/app/timeout_setter.py | 56 +++++++ 6 files changed, 258 insertions(+), 76 deletions(-) create mode 100644 scheduler/app/timeout_setter.py diff --git a/.gitignore b/.gitignore index cb64eaa..701dcbb 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,5 @@ local_settings.py seatable-python-runner/ seatable-python-runner.zip + +.python-version diff --git a/scheduler/app/database/__init__.py b/scheduler/app/database/__init__.py index d12cfaa..fd18bb7 100644 --- a/scheduler/app/database/__init__.py +++ b/scheduler/app/database/__init__.py @@ -3,7 +3,7 @@ from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import sessionmaker, scoped_session DB_ROOT_USER = os.getenv("DB_ROOT_USER", "root") DB_ROOT_PASSWD = os.getenv("DB_ROOT_PASSWD") @@ -37,4 +37,4 @@ engine = create_engine(db_url, **db_kwargs) Base = declarative_base() -DBSession = sessionmaker(bind=engine) +DBSession = scoped_session(sessionmaker(bind=engine)) diff --git a/scheduler/app/faas_scheduler/models.py b/scheduler/app/faas_scheduler/models.py index e2d38cc..14495af 100644 --- a/scheduler/app/faas_scheduler/models.py +++ b/scheduler/app/faas_scheduler/models.py @@ -31,6 +31,11 @@ class ScriptLog(Base): return_code = Column(Integer, nullable=True) output = Column(Text, nullable=True) operate_from = Column(String(255)) + state = Column(String(10)) + + PENDING = 'pending' + RUNNING = 'running' + FINISHED = 'finished' def __init__( self, @@ -40,6 +45,7 @@ def __init__( script_name, context_data, started_at, + state, operate_from=None, ): self.dtable_uuid = dtable_uuid @@ -49,6 +55,7 @@ def __init__( self.context_data = context_data self.started_at = started_at self.operate_from = operate_from + self.state = state def to_dict(self): from faas_scheduler.utils import datetime_to_isoformat_timestr @@ -68,6 +75,7 @@ def to_dict(self): "return_code": self.return_code, "output": self.output, "operate_from": self.operate_from, + "state": self.state } diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py index 77feb16..e6df078 100644 --- a/scheduler/app/flask_server.py +++ b/scheduler/app/flask_server.py @@ -8,14 +8,11 @@ from datetime import datetime from flask import Flask, request, make_response from gevent.pywsgi import WSGIServer -from concurrent.futures import ThreadPoolExecutor from database import DBSession from faas_scheduler.utils import ( check_auth_token, - run_script, get_script, - add_script, get_run_script_statistics_by_month, hook_update_script, can_run_task, @@ -26,6 +23,7 @@ uuid_str_to_32_chars, basic_log, ) +from .scheduler import scheduler basic_log("scheduler.log") @@ -40,7 +38,11 @@ app = Flask(__name__) logger = logging.getLogger(__name__) -executor = ThreadPoolExecutor(max_workers=SCRIPT_WORKERS) + + +@app.teardown_appcontext +def shutdown_session(exception=None): + DBSession.remove() @app.route("/ping/", methods=["GET"]) @@ -74,8 +76,6 @@ def scripts_api(): context_data = data.get("context_data") owner = data.get("owner") org_id = data.get("org_id") - script_url = data.get("script_url") - 
temp_api_token = data.get("temp_api_token") scripts_running_limit = data.get("scripts_running_limit", -1) operate_from = data.get("operate_from", "manualy") if not dtable_uuid or not script_name or not owner: @@ -89,27 +89,16 @@ def scripts_api(): owner, org_id, db_session, scripts_running_limit=scripts_running_limit ): return make_response(("The number of runs exceeds the limit"), 400) - script = add_script( - db_session, - dtable_uuid, - owner, + script_log = scheduler.add_script_log( + uuid_str_to_32_chars(dtable_uuid), org_id, + owner, script_name, context_data, - operate_from, - ) - logger.debug("lets call the starter to fire up the runner...") - executor.submit( - run_script, - script.id, - dtable_uuid, - script_name, - script_url, - temp_api_token, - context_data, + operate_from ) - return make_response(({"script_id": script.id}, 200)) + return make_response(({"script_id": script_log.id}, 200)) except Exception as e: logger.exception(e) return make_response(("Internal server error", 500)) @@ -145,15 +134,15 @@ def script_api(script_id): if dtable_uuid != script.dtable_uuid or script_name != script.script_name: return make_response(("Bad request", 400)) - if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int): - now = datetime.now() - duration_seconds = (now - script.started_at).seconds - if duration_seconds > SUB_PROCESS_TIMEOUT: - script.success = False - script.return_code = -1 - script.finished_at = now - script.output = TIMEOUT_OUTPUT - db_session.commit() + # if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int): + # now = datetime.now() + # duration_seconds = (now - script.started_at).seconds + # if duration_seconds > SUB_PROCESS_TIMEOUT: + # script.success = False + # script.return_code = -1 + # script.finished_at = now + # script.output = TIMEOUT_OUTPUT + # db_session.commit() return make_response(({"script": script.to_dict()}, 200)) diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py index 7d6bfe1..008bcfc 100644 --- a/scheduler/app/scheduler.py +++ b/scheduler/app/scheduler.py @@ -1,56 +1,183 @@ +import json +import logging import os -import gc import time -import logging -from threading import Thread +from threading import Lock, Thread from database import DBSession +from faas_scheduler.models import ScriptLog from faas_scheduler.utils import ( - check_and_set_tasks_timeout, - delete_log_after_days, - delete_statistics_after_days, - basic_log, + add_script, + run_script, + get_script_file, + hook_update_script ) -basic_log("scheduler.log") +logger = logging.getLogger(__name__) + -SUB_PROCESS_TIMEOUT = int( - os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15) -) # 15 minutes +class ScriptQueue: -logger = logging.getLogger(__name__) + def __init__(self): + self.q = [] # a list of ScriptLog + self.script_logs_dict = {} # a dict of {id: ScriptLog} + self.lock = Lock() + self.running_count = {} + # a dict of + # { + # "": 0, + # "_": 0, + # "__": 0 + # } + try: + run_limit_per_team = os.environ.get('RUN_LIMIT_PER_TEAM', 0) + except: + run_limit_per_team = 0 + try: + run_limit_per_base = os.environ.get('RUN_LIMIT_PER_BASE', 0) + except: + run_limit_per_base = 0 + try: + run_limit_per_script = os.environ.get('RUN_LIMIT_PER_SCRIPT', 0) + except: + run_limit_per_script = 0 + self.config = { + 'run_limit_per_team': run_limit_per_team, + 'run_limit_per_base': run_limit_per_base, + 'run_limit_per_script': run_limit_per_script + } + + def can_run_script(self, script_log: ScriptLog): + if script_log.org_id != -1: + running_team_key = 
f'{script_log.org_id}' + else: + running_team_key = f'{script_log.owner}' + running_base_key = f'{running_team_key}_{script_log.dtable_uuid}' + running_script_key = f'{running_base_key}_{script_log.script_name}' + + if self.config['run_limit_per_team'] > 0 and self.config['run_limit_per_team'] <= self.running_count.get(running_team_key, 0): + return False + if self.config['run_limit_per_base'] > 0 and self.config['run_limit_per_base'] <= self.running_count.get(running_base_key, 0): + return False + if self.config['run_limit_per_script'] > 0 and self.config['run_limit_per_script'] <= self.running_count.get(running_script_key, 0): + return False + + return True + + def add_script_log(self, script_log: ScriptLog): + with self.lock: + self.q.append(script_log) + self.script_logs_dict[script_log.id] = script_log + + def get(self): + """get the first valid task from self.q + Return: an instance of ScriptTask or None + """ + with self.lock: + return_task = None -class FAASTaskTimeoutSetter(Thread): + index = 0 + while index < len(self.q): + script_log = self.q[index] + if self.can_run_script(script_log): + return_task = script_log + self.q.pop(index) + break + index += 1 + + return return_task + + def script_done_callback(self, script_log: ScriptLog): + with self.lock: + if script_log.org_id != -1: + running_team_key = f'{script_log.org_id}' + else: + running_team_key = f'{script_log.owner}' + running_base_key = f'{running_team_key}_{script_log.dtable_uuid}' + running_script_key = f'{running_base_key}_{script_log.script_name}' + if running_team_key in self.running_count: + self.running_count[running_team_key] -= 1 + if running_base_key in self.running_count: + self.running_count[running_base_key] -= 1 + if running_script_key in self.running_count: + self.running_count[running_script_key] -= 1 + + +class Scheduelr: def __init__(self): - super(FAASTaskTimeoutSetter, self).__init__() - self.interval = 60 * 30 # every half an hour - - def run(self): - if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int): - while True: - logger.info("Start automatic cleanup ...") - db_session = DBSession() - try: - check_and_set_tasks_timeout(db_session) - except Exception as e: - logger.exception("task cleaner error: %s", e) - finally: - db_session.close() - - # python garbage collection - logger.info("gc.collect: %s", str(gc.collect())) - - # remove old script_logs and statistics - delete_log_after_days(db_session) - delete_statistics_after_days(db_session) - - # sleep - logger.info("Sleep for %d seconds ...", self.interval) - time.sleep(self.interval) - - -if __name__ == "__main__": - task_timeout_setter = FAASTaskTimeoutSetter() - task_timeout_setter.start() + self.script_queue = ScriptQueue() + + def add_script_log( + self, + dtable_uuid, + org_id, + owner, + script_name, + context_data, + operate_from + ): + script_log = add_script( + DBSession(), + dtable_uuid, + owner, + org_id, + script_name, + context_data, + operate_from + ) + self.script_queue.add_script_log(script_log) + return script_log + + def schedule(self): + while True: + script_log = self.script_queue.get() + if not script_log: + time.sleep(0.5) + try: + script_file_info = get_script_file(script_log.dtable_uuid, script_log.script_name) + run_script( + script_log.script_id, + script_log.dtable_uuid, + script_log.script_name, + script_file_info['script_url'], + script_file_info['temp_api_token'], + json.loads(script_log.context_data) + ) + except Exception as e: + logger.exception(f'run script: {script_log} error {e}') + + def 
script_done_callback( + self, + script_id, + success, + return_code, + output, + spend_time + ): + script_log = self.script_queue.script_logs_dict.pop(script_id) + hook_update_script( + DBSession(), + script_id, + success, + return_code, + output, + spend_time + ) + if not script_log: # not counted in memory, only update db record + return + + def load_pending_script_logs(self): + """load pending script logs, should be called only when server start + """ + script_logs = DBSession.query(ScriptLog).filter_by(state=ScriptLog.PENDING).order_by(ScriptLog.id) + for script_log in script_logs: + self.script_queue.add_script_log(script_log) + + def start(self): + self.load_pending_script_logs() + Thread(target=self.schedule, daemon=True).start() + + +scheduler = Scheduelr() diff --git a/scheduler/app/timeout_setter.py b/scheduler/app/timeout_setter.py new file mode 100644 index 0000000..7d6bfe1 --- /dev/null +++ b/scheduler/app/timeout_setter.py @@ -0,0 +1,56 @@ +import os +import gc +import time +import logging +from threading import Thread + +from database import DBSession +from faas_scheduler.utils import ( + check_and_set_tasks_timeout, + delete_log_after_days, + delete_statistics_after_days, + basic_log, +) + +basic_log("scheduler.log") + +SUB_PROCESS_TIMEOUT = int( + os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15) +) # 15 minutes + +logger = logging.getLogger(__name__) + + +class FAASTaskTimeoutSetter(Thread): + + def __init__(self): + super(FAASTaskTimeoutSetter, self).__init__() + self.interval = 60 * 30 # every half an hour + + def run(self): + if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int): + while True: + logger.info("Start automatic cleanup ...") + db_session = DBSession() + try: + check_and_set_tasks_timeout(db_session) + except Exception as e: + logger.exception("task cleaner error: %s", e) + finally: + db_session.close() + + # python garbage collection + logger.info("gc.collect: %s", str(gc.collect())) + + # remove old script_logs and statistics + delete_log_after_days(db_session) + delete_statistics_after_days(db_session) + + # sleep + logger.info("Sleep for %d seconds ...", self.interval) + time.sleep(self.interval) + + +if __name__ == "__main__": + task_timeout_setter = FAASTaskTimeoutSetter() + task_timeout_setter.start() From 521135509e59b32115985da50139ffe87caf720b Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Fri, 29 Aug 2025 23:19:20 +0800 Subject: [PATCH 02/21] update --- scheduler/app/faas_scheduler/utils.py | 1 + scheduler/app/flask_server.py | 21 +++++++++++---------- scheduler/app/scheduler.py | 9 +++++---- starter/runner.py | 2 +- 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py index f43af3d..ffe3d24 100644 --- a/scheduler/app/faas_scheduler/utils.py +++ b/scheduler/app/faas_scheduler/utils.py @@ -172,6 +172,7 @@ def call_faas_func(script_url, temp_api_token, context_data, script_id=None): }, "context_data": context_data, "script_id": script_id, + "timeout": int(SUB_PROCESS_TIMEOUT) } headers = {"User-Agent": "python-scheduler/" + VERSION} logger.debug("I call starter at url %s", RUN_FUNC_URL) diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py index e6df078..c277da6 100644 --- a/scheduler/app/flask_server.py +++ b/scheduler/app/flask_server.py @@ -23,7 +23,7 @@ uuid_str_to_32_chars, basic_log, ) -from .scheduler import scheduler +from scheduler import scheduler basic_log("scheduler.log") @@ -134,15 +134,15 @@ def 
script_api(script_id): if dtable_uuid != script.dtable_uuid or script_name != script.script_name: return make_response(("Bad request", 400)) - # if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int): - # now = datetime.now() - # duration_seconds = (now - script.started_at).seconds - # if duration_seconds > SUB_PROCESS_TIMEOUT: - # script.success = False - # script.return_code = -1 - # script.finished_at = now - # script.output = TIMEOUT_OUTPUT - # db_session.commit() + if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int): + now = datetime.now() + duration_seconds = (now - script.started_at).seconds + if duration_seconds > SUB_PROCESS_TIMEOUT: + script.success = False + script.return_code = -1 + script.finished_at = now + script.output = TIMEOUT_OUTPUT + db_session.commit() return make_response(({"script": script.to_dict()}, 200)) @@ -375,5 +375,6 @@ def base_run_python_statistics(): if __name__ == "__main__": + scheduler.start() http_server = WSGIServer(("127.0.0.1", 5055), app) http_server.serve_forever() diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py index 008bcfc..b5940bb 100644 --- a/scheduler/app/scheduler.py +++ b/scheduler/app/scheduler.py @@ -30,15 +30,15 @@ def __init__(self): # "__": 0 # } try: - run_limit_per_team = os.environ.get('RUN_LIMIT_PER_TEAM', 0) + run_limit_per_team = int(os.environ.get('RUN_LIMIT_PER_TEAM', 0)) except: run_limit_per_team = 0 try: - run_limit_per_base = os.environ.get('RUN_LIMIT_PER_BASE', 0) + run_limit_per_base = int(os.environ.get('RUN_LIMIT_PER_BASE', 0)) except: run_limit_per_base = 0 try: - run_limit_per_script = os.environ.get('RUN_LIMIT_PER_SCRIPT', 0) + run_limit_per_script = int(os.environ.get('RUN_LIMIT_PER_SCRIPT', 0)) except: run_limit_per_script = 0 self.config = { @@ -135,10 +135,11 @@ def schedule(self): script_log = self.script_queue.get() if not script_log: time.sleep(0.5) + continue try: script_file_info = get_script_file(script_log.dtable_uuid, script_log.script_name) run_script( - script_log.script_id, + script_log.id, script_log.dtable_uuid, script_log.script_name, script_file_info['script_url'], diff --git a/starter/runner.py b/starter/runner.py index e4cb429..9787f85 100644 --- a/starter/runner.py +++ b/starter/runner.py @@ -459,4 +459,4 @@ def health_check(): if __name__ == "__main__": - app.run(port=8088, debug=False) + app.run(host='0.0.0.0', port=8088, debug=False) From a7b64f48ebddb60e0aa240fc4219a5beba1489aa Mon Sep 17 00:00:00 2001 From: AlexHappy <1223408988@qq.com> Date: Sat, 30 Aug 2025 16:38:12 +0800 Subject: [PATCH 03/21] add clean old data schedule task --- scheduler/app/faas_scheduler/models.py | 9 ++ scheduler/app/flask_server.py | 16 ++-- scheduler/app/scheduler.py | 125 ++++++++++++++++++++++--- scheduler/app/timeout_setter.py | 56 ----------- 4 files changed, 128 insertions(+), 78 deletions(-) delete mode 100644 scheduler/app/timeout_setter.py diff --git a/scheduler/app/faas_scheduler/models.py b/scheduler/app/faas_scheduler/models.py index 14495af..a1924b3 100644 --- a/scheduler/app/faas_scheduler/models.py +++ b/scheduler/app/faas_scheduler/models.py @@ -57,6 +57,15 @@ def __init__( self.operate_from = operate_from self.state = state + def get_info(self): + return { + 'id': self.id, + 'org_id': self.org_id, + 'owner': self.owner, + 'dtable_uuid': self.dtable_uuid, + 'script_name': self.script_name + } + def to_dict(self): from faas_scheduler.utils import datetime_to_isoformat_timestr diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py index 
c277da6..e13505c 100644 --- a/scheduler/app/flask_server.py +++ b/scheduler/app/flask_server.py @@ -131,7 +131,7 @@ def script_api(script_id): script = get_script(db_session, script_id) if not script: return make_response(("Not found", 404)) - if dtable_uuid != script.dtable_uuid or script_name != script.script_name: + if uuid_str_to_32_chars(dtable_uuid) != script.dtable_uuid or script_name != script.script_name: return make_response(("Bad request", 400)) if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int): @@ -277,21 +277,19 @@ def record_script_result(): output = data.get("output") spend_time = data.get("spend_time") script_id = data.get("script_id") - - db_session = DBSession() - # update script_log and run-time statistics try: if script_id: - hook_update_script( - db_session, script_id, success, return_code, output, spend_time + # hook_update_script( + # db_session, script_id, success, return_code, output, spend_time + # ) + scheduler.script_done_callback( + script_id, success, return_code, output, spend_time ) except Exception as e: logger.exception(e) return make_response(("Internal server error", 500)) - finally: - db_session.close() return "success" @@ -376,5 +374,5 @@ def base_run_python_statistics(): if __name__ == "__main__": scheduler.start() - http_server = WSGIServer(("127.0.0.1", 5055), app) + http_server = WSGIServer(("0.0.0.0", 5055), app) http_server.serve_forever() diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py index b5940bb..571886f 100644 --- a/scheduler/app/scheduler.py +++ b/scheduler/app/scheduler.py @@ -2,18 +2,23 @@ import logging import os import time +from datetime import datetime from threading import Lock, Thread from database import DBSession from faas_scheduler.models import ScriptLog from faas_scheduler.utils import ( add_script, + delete_log_after_days, + delete_statistics_after_days, run_script, get_script_file, hook_update_script ) logger = logging.getLogger(__name__) +SUB_PROCESS_TIMEOUT = int(os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15)) +TIMEOUT_OUTPUT = "Script running for too long time!" 
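# SUB_PROCESS_TIMEOUT is the per-script wall-clock budget in seconds, read
# from the PYTHON_PROCESS_TIMEOUT environment variable (default 60 * 15,
# i.e. 15 minutes); TIMEOUT_OUTPUT is the message written into the output
# field of a ScriptLog that exceeds that budget.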
class ScriptQueue: @@ -68,6 +73,7 @@ def add_script_log(self, script_log: ScriptLog): with self.lock: self.q.append(script_log) self.script_logs_dict[script_log.id] = script_log + self.inspect_queue_and_running(pre_msg=f'add script {script_log.get_info()} to queue') def get(self): """get the first valid task from self.q @@ -83,25 +89,83 @@ def get(self): if self.can_run_script(script_log): return_task = script_log self.q.pop(index) + self.increase_running(script_log) + self.inspect_queue_and_running(pre_msg=f'get script {script_log.get_info()} from queue') break index += 1 return return_task + def increase_running(self, script_log): + if script_log.org_id != -1: + running_team_key = f'{script_log.org_id}' + else: + running_team_key = f'{script_log.owner}' + running_base_key = f'{running_team_key}_{script_log.dtable_uuid}' + running_script_key = f'{running_base_key}_{script_log.script_name}' + self.running_count[running_team_key] = self.running_count[running_team_key] + 1 if self.running_count.get(running_team_key) else 1 + self.running_count[running_base_key] = self.running_count[running_base_key] + 1 if self.running_count.get(running_base_key) else 1 + self.running_count[running_script_key] = self.running_count[running_script_key] + 1 if self.running_count.get(running_script_key) else 1 + + def decrease_running(self, script_log): + if script_log.org_id != -1: + running_team_key = f'{script_log.org_id}' + else: + running_team_key = f'{script_log.owner}' + running_base_key = f'{running_team_key}_{script_log.dtable_uuid}' + running_script_key = f'{running_base_key}_{script_log.script_name}' + + if running_team_key in self.running_count: + self.running_count[running_team_key] -= 1 + if not self.running_count.get(running_team_key): + self.running_count.pop(running_team_key, None) + + if running_base_key in self.running_count: + self.running_count[running_base_key] -= 1 + if not self.running_count.get(running_base_key): + self.running_count.pop(running_base_key, None) + + if running_script_key in self.running_count: + self.running_count[running_script_key] -= 1 + if not self.running_count.get(running_script_key): + self.running_count.pop(running_script_key, None) + def script_done_callback(self, script_log: ScriptLog): with self.lock: - if script_log.org_id != -1: - running_team_key = f'{script_log.org_id}' - else: - running_team_key = f'{script_log.owner}' - running_base_key = f'{running_team_key}_{script_log.dtable_uuid}' - running_script_key = f'{running_base_key}_{script_log.script_name}' - if running_team_key in self.running_count: - self.running_count[running_team_key] -= 1 - if running_base_key in self.running_count: - self.running_count[running_base_key] -= 1 - if running_script_key in self.running_count: - self.running_count[running_script_key] -= 1 + self.script_logs_dict.pop(script_log.id, None) + self.decrease_running(script_log) + self.inspect_queue_and_running(pre_msg=f'script {script_log.get_info()} run done') + + def inspect_queue_and_running(self, pre_msg=None): + if logger.root.level != logging.DEBUG: + return + lines = ['\n'] + if pre_msg: + lines.append(pre_msg) + lines.append(f"{'>' * 10} running {'>' * 10}") + for key, value in self.running_count.items(): + lines.append(f'{key}: {value}') + lines.append(f"{'<' * 10} running {'<' * 10}") + + lines.append(f"{'>' * 10} queue {'>' * 10}") + for script_log in self.q: + lines.append(f"org_id: {script_log.org_id} owner: {script_log.owner} dtable_uuid: {script_log.dtable_uuid} script_name: {script_log.script_name}") + 
lines.append(f"{'<' * 10} queue {'<' * 10}") + logger.debug('\n'.join(lines)) + + def get_script_log_by_id(self, script_id): + return self.script_logs_dict.get(script_id) + + def get_timeout_scripts(self): + script_logs = [] + now_time = datetime.now() + with self.lock: + for index in range(len(self.q) - 1, -1, -1): + script_log = self.q[index] + if (now_time - script_log.started_at).seconds >= SUB_PROCESS_TIMEOUT: + script_logs.append(self.q.pop(index)) + self.script_logs_dict.pop(script_log.id, None) + return script_logs class Scheduelr: @@ -157,7 +221,6 @@ def script_done_callback( output, spend_time ): - script_log = self.script_queue.script_logs_dict.pop(script_id) hook_update_script( DBSession(), script_id, @@ -166,8 +229,10 @@ def script_done_callback( output, spend_time ) + script_log = self.script_queue.get_script_log_by_id(script_id) if not script_log: # not counted in memory, only update db record return + self.script_queue.script_done_callback(script_log) def load_pending_script_logs(self): """load pending script logs, should be called only when server start @@ -176,9 +241,43 @@ def load_pending_script_logs(self): for script_log in script_logs: self.script_queue.add_script_log(script_log) + def timeout_setter(self): + while True: + db_session = DBSession() + now_time = datetime.now() + try: + script_logs = self.script_queue.get_timeout_scripts() + if script_logs: + db_session.query(ScriptLog).filter(ScriptLog.id.in_([script_log.id for script_log in script_logs])).update( + { + ScriptLog.state: ScriptLog.FINISHED, + ScriptLog.finished_at: now_time, + ScriptLog.success: False, + ScriptLog.output: TIMEOUT_OUTPUT, + ScriptLog.return_code: -1 + } + ) + except Exception as e: + logger.exception(e) + finally: + DBSession.remove() + time.sleep(60) + + def statistic_cleaner(self): + while True: + db_session = DBSession() + try: + delete_log_after_days(db_session) + delete_statistics_after_days(db_session) + except Exception as e: + logger.exception(e) + time.sleep(24 * 60 * 60) + def start(self): self.load_pending_script_logs() Thread(target=self.schedule, daemon=True).start() + Thread(target=self.statistic_cleaner, daemon=True).start() + Thread(target=self.timeout_setter, daemon=True).start() scheduler = Scheduelr() diff --git a/scheduler/app/timeout_setter.py b/scheduler/app/timeout_setter.py deleted file mode 100644 index 7d6bfe1..0000000 --- a/scheduler/app/timeout_setter.py +++ /dev/null @@ -1,56 +0,0 @@ -import os -import gc -import time -import logging -from threading import Thread - -from database import DBSession -from faas_scheduler.utils import ( - check_and_set_tasks_timeout, - delete_log_after_days, - delete_statistics_after_days, - basic_log, -) - -basic_log("scheduler.log") - -SUB_PROCESS_TIMEOUT = int( - os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15) -) # 15 minutes - -logger = logging.getLogger(__name__) - - -class FAASTaskTimeoutSetter(Thread): - - def __init__(self): - super(FAASTaskTimeoutSetter, self).__init__() - self.interval = 60 * 30 # every half an hour - - def run(self): - if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int): - while True: - logger.info("Start automatic cleanup ...") - db_session = DBSession() - try: - check_and_set_tasks_timeout(db_session) - except Exception as e: - logger.exception("task cleaner error: %s", e) - finally: - db_session.close() - - # python garbage collection - logger.info("gc.collect: %s", str(gc.collect())) - - # remove old script_logs and statistics - delete_log_after_days(db_session) - 
delete_statistics_after_days(db_session) - - # sleep - logger.info("Sleep for %d seconds ...", self.interval) - time.sleep(self.interval) - - -if __name__ == "__main__": - task_timeout_setter = FAASTaskTimeoutSetter() - task_timeout_setter.start() From 383943d67e81c86abe29c4671f7ec78673c8fb18 Mon Sep 17 00:00:00 2001 From: AlexHappy <1223408988@qq.com> Date: Sat, 30 Aug 2025 17:20:19 +0800 Subject: [PATCH 04/21] update ScriptLog.state correctly --- scheduler/app/faas_scheduler/models.py | 2 +- scheduler/app/faas_scheduler/utils.py | 2 ++ scheduler/app/scheduler.py | 13 ++++++++++++- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/scheduler/app/faas_scheduler/models.py b/scheduler/app/faas_scheduler/models.py index a1924b3..bf9d019 100644 --- a/scheduler/app/faas_scheduler/models.py +++ b/scheduler/app/faas_scheduler/models.py @@ -54,8 +54,8 @@ def __init__( self.script_name = script_name self.context_data = context_data self.started_at = started_at - self.operate_from = operate_from self.state = state + self.operate_from = operate_from def get_info(self): return { diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py index ffe3d24..5f062d1 100644 --- a/scheduler/app/faas_scheduler/utils.py +++ b/scheduler/app/faas_scheduler/utils.py @@ -381,6 +381,7 @@ def add_script( script_name, context_data, datetime.now(), + ScriptLog.PENDING, operate_from, ) db_session.add(script) @@ -394,6 +395,7 @@ def update_script(db_session, script, success, return_code, output): script.success = success script.return_code = return_code script.output = output + script.state = ScriptLog.FINISHED db_session.commit() return script diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py index 571886f..ae61842 100644 --- a/scheduler/app/scheduler.py +++ b/scheduler/app/scheduler.py @@ -200,7 +200,13 @@ def schedule(self): if not script_log: time.sleep(0.5) continue + db_session = DBSession() try: + db_session.query(ScriptLog).filter(ScriptLog.id==script_log.id).update( + {ScriptLog.state: ScriptLog.RUNNING}, + synchronize_session=False + ) + db_session.commit() script_file_info = get_script_file(script_log.dtable_uuid, script_log.script_name) run_script( script_log.id, @@ -212,6 +218,8 @@ def schedule(self): ) except Exception as e: logger.exception(f'run script: {script_log} error {e}') + finally: + DBSession.remove() def script_done_callback( self, @@ -255,7 +263,8 @@ def timeout_setter(self): ScriptLog.success: False, ScriptLog.output: TIMEOUT_OUTPUT, ScriptLog.return_code: -1 - } + }, + synchronize_session=False ) except Exception as e: logger.exception(e) @@ -271,6 +280,8 @@ def statistic_cleaner(self): delete_statistics_after_days(db_session) except Exception as e: logger.exception(e) + finally: + DBSession.remove() time.sleep(24 * 60 * 60) def start(self): From 2e3f7f4f6cc1792c1046c538b3a94c79d36b2f87 Mon Sep 17 00:00:00 2001 From: AlexHappy <1223408988@qq.com> Date: Sat, 30 Aug 2025 17:30:11 +0800 Subject: [PATCH 05/21] black fix --- scheduler/app/faas_scheduler/models.py | 18 +-- scheduler/app/faas_scheduler/utils.py | 2 +- scheduler/app/flask_server.py | 7 +- scheduler/app/scheduler.py | 156 ++++++++++++++----------- starter/runner.py | 2 +- 5 files changed, 102 insertions(+), 83 deletions(-) diff --git a/scheduler/app/faas_scheduler/models.py b/scheduler/app/faas_scheduler/models.py index bf9d019..5b1d696 100644 --- a/scheduler/app/faas_scheduler/models.py +++ b/scheduler/app/faas_scheduler/models.py @@ -33,9 +33,9 @@ class ScriptLog(Base): 
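# ScriptLog.state lifecycle: rows are created as PENDING, set to RUNNING when
# the scheduler dispatches them to the starter, and set to FINISHED by the
# result callback or by the timeout handling.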
operate_from = Column(String(255)) state = Column(String(10)) - PENDING = 'pending' - RUNNING = 'running' - FINISHED = 'finished' + PENDING = "pending" + RUNNING = "running" + FINISHED = "finished" def __init__( self, @@ -59,11 +59,11 @@ def __init__( def get_info(self): return { - 'id': self.id, - 'org_id': self.org_id, - 'owner': self.owner, - 'dtable_uuid': self.dtable_uuid, - 'script_name': self.script_name + "id": self.id, + "org_id": self.org_id, + "owner": self.owner, + "dtable_uuid": self.dtable_uuid, + "script_name": self.script_name, } def to_dict(self): @@ -84,7 +84,7 @@ def to_dict(self): "return_code": self.return_code, "output": self.output, "operate_from": self.operate_from, - "state": self.state + "state": self.state, } diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py index 5f062d1..c858b1f 100644 --- a/scheduler/app/faas_scheduler/utils.py +++ b/scheduler/app/faas_scheduler/utils.py @@ -172,7 +172,7 @@ def call_faas_func(script_url, temp_api_token, context_data, script_id=None): }, "context_data": context_data, "script_id": script_id, - "timeout": int(SUB_PROCESS_TIMEOUT) + "timeout": int(SUB_PROCESS_TIMEOUT), } headers = {"User-Agent": "python-scheduler/" + VERSION} logger.debug("I call starter at url %s", RUN_FUNC_URL) diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py index e13505c..c3b8c5b 100644 --- a/scheduler/app/flask_server.py +++ b/scheduler/app/flask_server.py @@ -95,7 +95,7 @@ def scripts_api(): owner, script_name, context_data, - operate_from + operate_from, ) return make_response(({"script_id": script_log.id}, 200)) @@ -131,7 +131,10 @@ def script_api(script_id): script = get_script(db_session, script_id) if not script: return make_response(("Not found", 404)) - if uuid_str_to_32_chars(dtable_uuid) != script.dtable_uuid or script_name != script.script_name: + if ( + uuid_str_to_32_chars(dtable_uuid) != script.dtable_uuid + or script_name != script.script_name + ): return make_response(("Bad request", 400)) if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int): diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py index ae61842..11bd53c 100644 --- a/scheduler/app/scheduler.py +++ b/scheduler/app/scheduler.py @@ -13,7 +13,7 @@ delete_statistics_after_days, run_script, get_script_file, - hook_update_script + hook_update_script, ) logger = logging.getLogger(__name__) @@ -28,43 +28,49 @@ def __init__(self): self.script_logs_dict = {} # a dict of {id: ScriptLog} self.lock = Lock() self.running_count = {} - # a dict of + # a dict of # { # "": 0, # "_": 0, # "__": 0 # } try: - run_limit_per_team = int(os.environ.get('RUN_LIMIT_PER_TEAM', 0)) + run_limit_per_team = int(os.environ.get("RUN_LIMIT_PER_TEAM", 0)) except: run_limit_per_team = 0 try: - run_limit_per_base = int(os.environ.get('RUN_LIMIT_PER_BASE', 0)) + run_limit_per_base = int(os.environ.get("RUN_LIMIT_PER_BASE", 0)) except: run_limit_per_base = 0 try: - run_limit_per_script = int(os.environ.get('RUN_LIMIT_PER_SCRIPT', 0)) + run_limit_per_script = int(os.environ.get("RUN_LIMIT_PER_SCRIPT", 0)) except: run_limit_per_script = 0 self.config = { - 'run_limit_per_team': run_limit_per_team, - 'run_limit_per_base': run_limit_per_base, - 'run_limit_per_script': run_limit_per_script + "run_limit_per_team": run_limit_per_team, + "run_limit_per_base": run_limit_per_base, + "run_limit_per_script": run_limit_per_script, } def can_run_script(self, script_log: ScriptLog): if script_log.org_id != -1: - running_team_key = 
f'{script_log.org_id}' + running_team_key = f"{script_log.org_id}" else: - running_team_key = f'{script_log.owner}' - running_base_key = f'{running_team_key}_{script_log.dtable_uuid}' - running_script_key = f'{running_base_key}_{script_log.script_name}' + running_team_key = f"{script_log.owner}" + running_base_key = f"{running_team_key}_{script_log.dtable_uuid}" + running_script_key = f"{running_base_key}_{script_log.script_name}" - if self.config['run_limit_per_team'] > 0 and self.config['run_limit_per_team'] <= self.running_count.get(running_team_key, 0): + if self.config["run_limit_per_team"] > 0 and self.config[ + "run_limit_per_team" + ] <= self.running_count.get(running_team_key, 0): return False - if self.config['run_limit_per_base'] > 0 and self.config['run_limit_per_base'] <= self.running_count.get(running_base_key, 0): + if self.config["run_limit_per_base"] > 0 and self.config[ + "run_limit_per_base" + ] <= self.running_count.get(running_base_key, 0): return False - if self.config['run_limit_per_script'] > 0 and self.config['run_limit_per_script'] <= self.running_count.get(running_script_key, 0): + if self.config["run_limit_per_script"] > 0 and self.config[ + "run_limit_per_script" + ] <= self.running_count.get(running_script_key, 0): return False return True @@ -73,7 +79,9 @@ def add_script_log(self, script_log: ScriptLog): with self.lock: self.q.append(script_log) self.script_logs_dict[script_log.id] = script_log - self.inspect_queue_and_running(pre_msg=f'add script {script_log.get_info()} to queue') + self.inspect_queue_and_running( + pre_msg=f"add script {script_log.get_info()} to queue" + ) def get(self): """get the first valid task from self.q @@ -90,7 +98,9 @@ def get(self): return_task = script_log self.q.pop(index) self.increase_running(script_log) - self.inspect_queue_and_running(pre_msg=f'get script {script_log.get_info()} from queue') + self.inspect_queue_and_running( + pre_msg=f"get script {script_log.get_info()} from queue" + ) break index += 1 @@ -98,22 +108,34 @@ def get(self): def increase_running(self, script_log): if script_log.org_id != -1: - running_team_key = f'{script_log.org_id}' + running_team_key = f"{script_log.org_id}" else: - running_team_key = f'{script_log.owner}' - running_base_key = f'{running_team_key}_{script_log.dtable_uuid}' - running_script_key = f'{running_base_key}_{script_log.script_name}' - self.running_count[running_team_key] = self.running_count[running_team_key] + 1 if self.running_count.get(running_team_key) else 1 - self.running_count[running_base_key] = self.running_count[running_base_key] + 1 if self.running_count.get(running_base_key) else 1 - self.running_count[running_script_key] = self.running_count[running_script_key] + 1 if self.running_count.get(running_script_key) else 1 + running_team_key = f"{script_log.owner}" + running_base_key = f"{running_team_key}_{script_log.dtable_uuid}" + running_script_key = f"{running_base_key}_{script_log.script_name}" + self.running_count[running_team_key] = ( + self.running_count[running_team_key] + 1 + if self.running_count.get(running_team_key) + else 1 + ) + self.running_count[running_base_key] = ( + self.running_count[running_base_key] + 1 + if self.running_count.get(running_base_key) + else 1 + ) + self.running_count[running_script_key] = ( + self.running_count[running_script_key] + 1 + if self.running_count.get(running_script_key) + else 1 + ) def decrease_running(self, script_log): if script_log.org_id != -1: - running_team_key = f'{script_log.org_id}' + running_team_key = 
f"{script_log.org_id}" else: - running_team_key = f'{script_log.owner}' - running_base_key = f'{running_team_key}_{script_log.dtable_uuid}' - running_script_key = f'{running_base_key}_{script_log.script_name}' + running_team_key = f"{script_log.owner}" + running_base_key = f"{running_team_key}_{script_log.dtable_uuid}" + running_script_key = f"{running_base_key}_{script_log.script_name}" if running_team_key in self.running_count: self.running_count[running_team_key] -= 1 @@ -134,24 +156,28 @@ def script_done_callback(self, script_log: ScriptLog): with self.lock: self.script_logs_dict.pop(script_log.id, None) self.decrease_running(script_log) - self.inspect_queue_and_running(pre_msg=f'script {script_log.get_info()} run done') + self.inspect_queue_and_running( + pre_msg=f"script {script_log.get_info()} run done" + ) def inspect_queue_and_running(self, pre_msg=None): if logger.root.level != logging.DEBUG: return - lines = ['\n'] + lines = ["\n"] if pre_msg: lines.append(pre_msg) lines.append(f"{'>' * 10} running {'>' * 10}") for key, value in self.running_count.items(): - lines.append(f'{key}: {value}') + lines.append(f"{key}: {value}") lines.append(f"{'<' * 10} running {'<' * 10}") lines.append(f"{'>' * 10} queue {'>' * 10}") for script_log in self.q: - lines.append(f"org_id: {script_log.org_id} owner: {script_log.owner} dtable_uuid: {script_log.dtable_uuid} script_name: {script_log.script_name}") + lines.append( + f"org_id: {script_log.org_id} owner: {script_log.owner} dtable_uuid: {script_log.dtable_uuid} script_name: {script_log.script_name}" + ) lines.append(f"{'<' * 10} queue {'<' * 10}") - logger.debug('\n'.join(lines)) + logger.debug("\n".join(lines)) def get_script_log_by_id(self, script_id): return self.script_logs_dict.get(script_id) @@ -174,14 +200,8 @@ def __init__(self): self.script_queue = ScriptQueue() def add_script_log( - self, - dtable_uuid, - org_id, - owner, - script_name, - context_data, - operate_from - ): + self, dtable_uuid, org_id, owner, script_name, context_data, operate_from + ): script_log = add_script( DBSession(), dtable_uuid, @@ -189,7 +209,7 @@ def add_script_log( org_id, script_name, context_data, - operate_from + operate_from, ) self.script_queue.add_script_log(script_log) return script_log @@ -202,40 +222,31 @@ def schedule(self): continue db_session = DBSession() try: - db_session.query(ScriptLog).filter(ScriptLog.id==script_log.id).update( - {ScriptLog.state: ScriptLog.RUNNING}, - synchronize_session=False + db_session.query(ScriptLog).filter( + ScriptLog.id == script_log.id + ).update( + {ScriptLog.state: ScriptLog.RUNNING}, synchronize_session=False ) db_session.commit() - script_file_info = get_script_file(script_log.dtable_uuid, script_log.script_name) + script_file_info = get_script_file( + script_log.dtable_uuid, script_log.script_name + ) run_script( script_log.id, script_log.dtable_uuid, script_log.script_name, - script_file_info['script_url'], - script_file_info['temp_api_token'], - json.loads(script_log.context_data) + script_file_info["script_url"], + script_file_info["temp_api_token"], + json.loads(script_log.context_data), ) except Exception as e: - logger.exception(f'run script: {script_log} error {e}') + logger.exception(f"run script: {script_log} error {e}") finally: DBSession.remove() - def script_done_callback( - self, - script_id, - success, - return_code, - output, - spend_time - ): + def script_done_callback(self, script_id, success, return_code, output, spend_time): hook_update_script( - DBSession(), - script_id, - success, - 
return_code, - output, - spend_time + DBSession(), script_id, success, return_code, output, spend_time ) script_log = self.script_queue.get_script_log_by_id(script_id) if not script_log: # not counted in memory, only update db record @@ -243,9 +254,12 @@ def script_done_callback( self.script_queue.script_done_callback(script_log) def load_pending_script_logs(self): - """load pending script logs, should be called only when server start - """ - script_logs = DBSession.query(ScriptLog).filter_by(state=ScriptLog.PENDING).order_by(ScriptLog.id) + """load pending script logs, should be called only when server start""" + script_logs = ( + DBSession.query(ScriptLog) + .filter_by(state=ScriptLog.PENDING) + .order_by(ScriptLog.id) + ) for script_log in script_logs: self.script_queue.add_script_log(script_log) @@ -256,15 +270,17 @@ def timeout_setter(self): try: script_logs = self.script_queue.get_timeout_scripts() if script_logs: - db_session.query(ScriptLog).filter(ScriptLog.id.in_([script_log.id for script_log in script_logs])).update( + db_session.query(ScriptLog).filter( + ScriptLog.id.in_([script_log.id for script_log in script_logs]) + ).update( { ScriptLog.state: ScriptLog.FINISHED, ScriptLog.finished_at: now_time, ScriptLog.success: False, ScriptLog.output: TIMEOUT_OUTPUT, - ScriptLog.return_code: -1 + ScriptLog.return_code: -1, }, - synchronize_session=False + synchronize_session=False, ) except Exception as e: logger.exception(e) diff --git a/starter/runner.py b/starter/runner.py index 9787f85..7393016 100644 --- a/starter/runner.py +++ b/starter/runner.py @@ -459,4 +459,4 @@ def health_check(): if __name__ == "__main__": - app.run(host='0.0.0.0', port=8088, debug=False) + app.run(host="0.0.0.0", port=8088, debug=False) From 2de974888915211f056ac8cd406d3c028a270313 Mon Sep 17 00:00:00 2001 From: AlexHappy <1223408988@qq.com> Date: Sun, 31 Aug 2025 02:50:37 +0800 Subject: [PATCH 06/21] add created_at column in script_log --- scheduler/app/faas_scheduler/models.py | 10 ++- scheduler/app/faas_scheduler/utils.py | 20 ++++-- scheduler/app/flask_server.py | 29 ++++---- scheduler/app/scheduler.py | 99 ++++++++++++-------------- starter/runner.py | 8 ++- 5 files changed, 83 insertions(+), 83 deletions(-) diff --git a/scheduler/app/faas_scheduler/models.py b/scheduler/app/faas_scheduler/models.py index 5b1d696..947698c 100644 --- a/scheduler/app/faas_scheduler/models.py +++ b/scheduler/app/faas_scheduler/models.py @@ -32,6 +32,7 @@ class ScriptLog(Base): output = Column(Text, nullable=True) operate_from = Column(String(255)) state = Column(String(10)) + created_at = Column(DateTime, index=True) PENDING = "pending" RUNNING = "running" @@ -44,8 +45,8 @@ def __init__( org_id, script_name, context_data, - started_at, state, + created_at, operate_from=None, ): self.dtable_uuid = dtable_uuid @@ -53,8 +54,8 @@ def __init__( self.org_id = org_id self.script_name = script_name self.context_data = context_data - self.started_at = started_at self.state = state + self.created_at = created_at self.operate_from = operate_from def get_info(self): @@ -77,7 +78,8 @@ def to_dict(self): "context_data": ( json.loads(self.context_data) if self.context_data else None ), - "started_at": datetime_to_isoformat_timestr(self.started_at), + "started_at": self.started_at + and datetime_to_isoformat_timestr(self.started_at), "finished_at": self.finished_at and datetime_to_isoformat_timestr(self.finished_at), "success": self.success, @@ -85,6 +87,8 @@ def to_dict(self): "output": self.output, "operate_from": self.operate_from, 
"state": self.state, + "created_at": self.created_at + and datetime_to_isoformat_timestr(self.created_at), } diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py index c858b1f..c288c34 100644 --- a/scheduler/app/faas_scheduler/utils.py +++ b/scheduler/app/faas_scheduler/utils.py @@ -2,7 +2,7 @@ import json import logging import requests -from datetime import datetime +from datetime import datetime, timedelta from uuid import UUID from tzlocal import get_localzone @@ -380,8 +380,8 @@ def add_script( org_id, script_name, context_data, - datetime.now(), ScriptLog.PENDING, + datetime.now(), operate_from, ) db_session.add(script) @@ -390,8 +390,11 @@ def add_script( return script -def update_script(db_session, script, success, return_code, output): - script.finished_at = datetime.now() +def update_script( + db_session, script, success, return_code, output, started_at, finished_at +): + script.started_at = started_at + script.finished_at = finished_at script.success = success script.return_code = return_code script.output = output @@ -423,10 +426,15 @@ def run_script( return True -def hook_update_script(db_session, script_id, success, return_code, output, spend_time): +def hook_update_script( + db_session, script_id, success, return_code, output, started_at, spend_time +): script = db_session.query(ScriptLog).filter_by(id=script_id).first() if script: - update_script(db_session, script, success, return_code, output) + finished_at = started_at + timedelta(seconds=spend_time) + update_script( + db_session, script, success, return_code, output, started_at, finished_at + ) update_statistics( db_session, script.dtable_uuid, script.owner, script.org_id, spend_time ) diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py index c3b8c5b..9f901b4 100644 --- a/scheduler/app/flask_server.py +++ b/scheduler/app/flask_server.py @@ -14,7 +14,6 @@ check_auth_token, get_script, get_run_script_statistics_by_month, - hook_update_script, can_run_task, get_run_scripts_count_monthly, ping_starter, @@ -89,7 +88,7 @@ def scripts_api(): owner, org_id, db_session, scripts_running_limit=scripts_running_limit ): return make_response(("The number of runs exceeds the limit"), 400) - script_log = scheduler.add_script_log( + script_log = scheduler.add( uuid_str_to_32_chars(dtable_uuid), org_id, owner, @@ -137,23 +136,21 @@ def script_api(script_id): ): return make_response(("Bad request", 400)) - if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int): - now = datetime.now() - duration_seconds = (now - script.started_at).seconds - if duration_seconds > SUB_PROCESS_TIMEOUT: - script.success = False - script.return_code = -1 - script.finished_at = now - script.output = TIMEOUT_OUTPUT - db_session.commit() + # if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int): + # now = datetime.now() + # duration_seconds = (now - script.created_at).seconds + # if duration_seconds > SUB_PROCESS_TIMEOUT: + # script.success = False + # script.return_code = -1 + # script.finished_at = now + # script.output = TIMEOUT_OUTPUT + # db_session.commit() return make_response(({"script": script.to_dict()}, 200)) except Exception as e: logger.exception(e) return make_response(("Internal server error", 500)) - finally: - db_session.close() # get python script statistics logs... 
@@ -278,16 +275,14 @@ def record_script_result(): success = data.get("success", False) return_code = data.get("return_code") output = data.get("output") + started_at = datetime.fromisoformat(data.get("started_at")) spend_time = data.get("spend_time") script_id = data.get("script_id") # update script_log and run-time statistics try: if script_id: - # hook_update_script( - # db_session, script_id, success, return_code, output, spend_time - # ) scheduler.script_done_callback( - script_id, success, return_code, output, spend_time + script_id, success, return_code, output, started_at, spend_time ) except Exception as e: diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py index 11bd53c..0ceb5c0 100644 --- a/scheduler/app/scheduler.py +++ b/scheduler/app/scheduler.py @@ -2,7 +2,7 @@ import logging import os import time -from datetime import datetime +from datetime import datetime, timedelta from threading import Lock, Thread from database import DBSession @@ -24,8 +24,10 @@ class ScriptQueue: def __init__(self): - self.q = [] # a list of ScriptLog - self.script_logs_dict = {} # a dict of {id: ScriptLog} + self.script_queue = ( + [] + ) # a list of ScriptLog instances, but can not be used to update database records!!! + self.script_dict = {} # a dict of {id: ScriptLog} self.lock = Lock() self.running_count = {} # a dict of @@ -75,10 +77,10 @@ def can_run_script(self, script_log: ScriptLog): return True - def add_script_log(self, script_log: ScriptLog): + def add(self, script_log: ScriptLog): with self.lock: - self.q.append(script_log) - self.script_logs_dict[script_log.id] = script_log + self.script_queue.append(script_log) + self.script_dict[script_log.id] = script_log self.inspect_queue_and_running( pre_msg=f"add script {script_log.get_info()} to queue" ) @@ -92,11 +94,11 @@ def get(self): return_task = None index = 0 - while index < len(self.q): - script_log = self.q[index] + while index < len(self.script_queue): + script_log = self.script_queue[index] if self.can_run_script(script_log): return_task = script_log - self.q.pop(index) + self.script_queue.pop(index) self.increase_running(script_log) self.inspect_queue_and_running( pre_msg=f"get script {script_log.get_info()} from queue" @@ -154,7 +156,7 @@ def decrease_running(self, script_log): def script_done_callback(self, script_log: ScriptLog): with self.lock: - self.script_logs_dict.pop(script_log.id, None) + self.script_dict.pop(script_log.id, None) self.decrease_running(script_log) self.inspect_queue_and_running( pre_msg=f"script {script_log.get_info()} run done" @@ -172,7 +174,7 @@ def inspect_queue_and_running(self, pre_msg=None): lines.append(f"{'<' * 10} running {'<' * 10}") lines.append(f"{'>' * 10} queue {'>' * 10}") - for script_log in self.q: + for script_log in self.script_queue: lines.append( f"org_id: {script_log.org_id} owner: {script_log.owner} dtable_uuid: {script_log.dtable_uuid} script_name: {script_log.script_name}" ) @@ -180,17 +182,24 @@ def inspect_queue_and_running(self, pre_msg=None): logger.debug("\n".join(lines)) def get_script_log_by_id(self, script_id): - return self.script_logs_dict.get(script_id) + return self.script_dict.get(script_id) - def get_timeout_scripts(self): + def pop_timeout_scripts(self): script_logs = [] now_time = datetime.now() with self.lock: - for index in range(len(self.q) - 1, -1, -1): - script_log = self.q[index] - if (now_time - script_log.started_at).seconds >= SUB_PROCESS_TIMEOUT: - script_logs.append(self.q.pop(index)) - self.script_logs_dict.pop(script_log.id, None) + 
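            # iterate backwards so that popping by index does not shift the
            # positions of entries that have not been visited yet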
for index in range(len(self.script_queue) - 1, -1, -1): + script_log = self.script_queue[index] + if ( + script_log.state == ScriptLog.RUNNING + and (now_time - script_log.started_at).seconds + >= SUB_PROCESS_TIMEOUT + ): + script_logs.append(self.script_queue.pop(index)) + self.decrease_running(script_log) + self.inspect_queue_and_running( + pre_msg=f"set script {script_log.get_info()} timeout from queue" + ) return script_logs @@ -199,9 +208,7 @@ class Scheduelr: def __init__(self): self.script_queue = ScriptQueue() - def add_script_log( - self, dtable_uuid, org_id, owner, script_name, context_data, operate_from - ): + def add(self, dtable_uuid, org_id, owner, script_name, context_data, operate_from): script_log = add_script( DBSession(), dtable_uuid, @@ -211,7 +218,7 @@ def add_script_log( context_data, operate_from, ) - self.script_queue.add_script_log(script_log) + self.script_queue.add(script_log) return script_log def schedule(self): @@ -225,7 +232,11 @@ def schedule(self): db_session.query(ScriptLog).filter( ScriptLog.id == script_log.id ).update( - {ScriptLog.state: ScriptLog.RUNNING}, synchronize_session=False + { + ScriptLog.state: ScriptLog.RUNNING, + ScriptLog.started_at: script_log.started_at, + }, + synchronize_session=False, ) db_session.commit() script_file_info = get_script_file( @@ -244,49 +255,28 @@ def schedule(self): finally: DBSession.remove() - def script_done_callback(self, script_id, success, return_code, output, spend_time): + def script_done_callback( + self, script_id, success, return_code, output, started_at, spend_time + ): hook_update_script( - DBSession(), script_id, success, return_code, output, spend_time + DBSession(), script_id, success, return_code, output, started_at, spend_time ) script_log = self.script_queue.get_script_log_by_id(script_id) if not script_log: # not counted in memory, only update db record return self.script_queue.script_done_callback(script_log) - def load_pending_script_logs(self): + def load_pending_scripts(self): """load pending script logs, should be called only when server start""" - script_logs = ( + script_logs = list( DBSession.query(ScriptLog) .filter_by(state=ScriptLog.PENDING) + .filter(ScriptLog.created_at > (datetime.now() - timedelta(hours=1))) .order_by(ScriptLog.id) ) + logger.info(f"load {len(script_logs)} pending scripts created within 1 hour") for script_log in script_logs: - self.script_queue.add_script_log(script_log) - - def timeout_setter(self): - while True: - db_session = DBSession() - now_time = datetime.now() - try: - script_logs = self.script_queue.get_timeout_scripts() - if script_logs: - db_session.query(ScriptLog).filter( - ScriptLog.id.in_([script_log.id for script_log in script_logs]) - ).update( - { - ScriptLog.state: ScriptLog.FINISHED, - ScriptLog.finished_at: now_time, - ScriptLog.success: False, - ScriptLog.output: TIMEOUT_OUTPUT, - ScriptLog.return_code: -1, - }, - synchronize_session=False, - ) - except Exception as e: - logger.exception(e) - finally: - DBSession.remove() - time.sleep(60) + self.script_queue.add(script_log) def statistic_cleaner(self): while True: @@ -301,10 +291,9 @@ def statistic_cleaner(self): time.sleep(24 * 60 * 60) def start(self): - self.load_pending_script_logs() + self.load_pending_scripts() Thread(target=self.schedule, daemon=True).start() Thread(target=self.statistic_cleaner, daemon=True).start() - Thread(target=self.timeout_setter, daemon=True).start() scheduler = Scheduelr() diff --git a/starter/runner.py b/starter/runner.py index 7393016..83b99f7 100644 --- 
a/starter/runner.py +++ b/starter/runner.py @@ -7,6 +7,7 @@ import time import ast import sys +from datetime import datetime from concurrent.futures import ThreadPoolExecutor from uuid import uuid4 @@ -132,7 +133,9 @@ def to_python_bool(value): return value.lower() == "true" -def send_to_scheduler(success, return_code, output, spend_time, request_data): +def send_to_scheduler( + success, return_code, output, started_at, spend_time, request_data +): """ This function is used to send result of script to scheduler - success: whether script running successfully @@ -152,6 +155,7 @@ def send_to_scheduler(success, return_code, output, spend_time, request_data): "success": success, "return_code": return_code, "output": output, + "started_at": datetime.fromtimestamp(started_at).isoformat(), "spend_time": spend_time, } result_data.update( @@ -427,7 +431,7 @@ def run_python(data): spend_time, data, ) - send_to_scheduler(return_code == 0, return_code, output, spend_time, data) + send_to_scheduler(return_code == 0, return_code, output, start_at, spend_time, data) #################### From fc1a6b5ddd0b976e1c748a6c55f650e354fec5fc Mon Sep 17 00:00:00 2001 From: AlexHappy <1223408988@qq.com> Date: Sun, 31 Aug 2025 03:56:34 +0800 Subject: [PATCH 07/21] update --- scheduler/app/flask_server.py | 2 +- starter/runner.py | 22 ++++++++++++---------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py index 9f901b4..b2d2d4f 100644 --- a/scheduler/app/flask_server.py +++ b/scheduler/app/flask_server.py @@ -276,7 +276,7 @@ def record_script_result(): return_code = data.get("return_code") output = data.get("output") started_at = datetime.fromisoformat(data.get("started_at")) - spend_time = data.get("spend_time") + spend_time = data.get("spend_time") or 0 script_id = data.get("script_id") # update script_log and run-time statistics try: diff --git a/starter/runner.py b/starter/runner.py index 83b99f7..a176873 100644 --- a/starter/runner.py +++ b/starter/runner.py @@ -141,6 +141,7 @@ def send_to_scheduler( - success: whether script running successfully - return_code: return-code of subprocess - output: output of subprocess or error message + - started_at: start timestamp - spend_time: time subprocess took - request_data: data from request """ @@ -156,7 +157,7 @@ def send_to_scheduler( "return_code": return_code, "output": output, "started_at": datetime.fromtimestamp(started_at).isoformat(), - "spend_time": spend_time, + "spend_time": spend_time or 0, } result_data.update( { @@ -190,9 +191,11 @@ def run_python(data): logging.info("New python run initalized... 
(v%s)", VERSION) + started_at = time.time() + script_url = data.get("script_url") if not script_url: - send_to_scheduler(False, None, "Script URL is missing", None, data) + send_to_scheduler(False, None, "Script URL is missing", started_at, None, data) return if ( to_python_bool(USE_ALTERNATIVE_FILE_SERVER_ROOT) @@ -232,11 +235,11 @@ def run_python(data): logging.error( "Failed to get script from %s, response: %s", script_url, resp ) - send_to_scheduler(False, None, "Fail to get script", None, data) + send_to_scheduler(False, None, "Fail to get script", started_at, None, data) return except Exception as e: logging.error("Failed to get script from %s, error: %s", script_url, e) - send_to_scheduler(False, None, "Fail to get script", None, data) + send_to_scheduler(False, None, "Fail to get script", started_at, None, data) return logging.debug("Generate temporary random folder directory") @@ -343,8 +346,6 @@ def run_python(data): command.append("run") # override command logging.debug("command: %s", command) - start_at = time.time() - logging.debug("try to start the python runner image") try: result = subprocess.run( @@ -375,6 +376,7 @@ def run_python(data): False, -1, "The script's running time exceeded the limit and the execution was aborted.", + started_at, DEFAULT_SUB_PROCESS_TIMEOUT, data, ) @@ -382,7 +384,7 @@ def run_python(data): except Exception as e: logging.exception(e) logging.error("Failed to run file %s error: %s", script_url, e) - send_to_scheduler(False, None, None, None, data) + send_to_scheduler(False, None, None, started_at, None, data) return else: logging.debug( @@ -392,7 +394,7 @@ def run_python(data): if os.path.isfile(output_file_path): if os.path.islink(output_file_path): send_to_scheduler( - False, -1, "Script invalid!", time.time() - start_at, data + False, -1, "Script invalid!", started_at, time.time() - started_at, data ) return with open(output_file_path, "r") as f: @@ -422,7 +424,7 @@ def run_python(data): except Exception as e: logging.warning("Fail to remove container error: %s", e) - spend_time = time.time() - start_at + spend_time = time.time() - started_at logging.info("python run finished successful. duration was: %s", spend_time) logging.debug( "send this to the scheduler. 
return_code: %s, output: %s, spend_time: %s, data: %s", @@ -431,7 +433,7 @@ def run_python(data): spend_time, data, ) - send_to_scheduler(return_code == 0, return_code, output, start_at, spend_time, data) + send_to_scheduler(return_code == 0, return_code, output, started_at, spend_time, data) #################### From a107ed859d232bed906068155eb965f95d2069ba Mon Sep 17 00:00:00 2001 From: AlexHappy <1223408988@qq.com> Date: Sun, 31 Aug 2025 03:59:56 +0800 Subject: [PATCH 08/21] update --- starter/runner.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/starter/runner.py b/starter/runner.py index a176873..1cdd10d 100644 --- a/starter/runner.py +++ b/starter/runner.py @@ -394,7 +394,12 @@ def run_python(data): if os.path.isfile(output_file_path): if os.path.islink(output_file_path): send_to_scheduler( - False, -1, "Script invalid!", started_at, time.time() - started_at, data + False, + -1, + "Script invalid!", + started_at, + time.time() - started_at, + data, ) return with open(output_file_path, "r") as f: @@ -433,7 +438,9 @@ def run_python(data): spend_time, data, ) - send_to_scheduler(return_code == 0, return_code, output, started_at, spend_time, data) + send_to_scheduler( + return_code == 0, return_code, output, started_at, spend_time, data + ) #################### From 09850d5ce06c741f424319efd579869794c4df6f Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Mon, 1 Sep 2025 12:05:14 +0800 Subject: [PATCH 09/21] update --- scheduler/app/scheduler.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py index 0ceb5c0..060b744 100644 --- a/scheduler/app/scheduler.py +++ b/scheduler/app/scheduler.py @@ -232,10 +232,7 @@ def schedule(self): db_session.query(ScriptLog).filter( ScriptLog.id == script_log.id ).update( - { - ScriptLog.state: ScriptLog.RUNNING, - ScriptLog.started_at: script_log.started_at, - }, + {ScriptLog.state: ScriptLog.RUNNING}, synchronize_session=False, ) db_session.commit() @@ -252,6 +249,17 @@ def schedule(self): ) except Exception as e: logger.exception(f"run script: {script_log} error {e}") + db_session.query(ScriptLog).filter( + ScriptLog.id == script_log.id + ).update( + { + ScriptLog.started_at: datetime.now(), + ScriptLog.finished_at: datetime.now(), + ScriptLog.state: ScriptLog.FINISHED, + }, + synchronize_session=False, + ) + db_session.commit() finally: DBSession.remove() From 680b7cd7c722580c3d5b965fc4cffef58149c250 Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Mon, 1 Sep 2025 14:15:31 +0800 Subject: [PATCH 10/21] update stats when add script --- scheduler/app/faas_scheduler/utils.py | 187 ++++++++++++++++++-------- scheduler/app/flask_server.py | 4 +- 2 files changed, 134 insertions(+), 57 deletions(-) diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py index c288c34..427b8e8 100644 --- a/scheduler/app/faas_scheduler/utils.py +++ b/scheduler/app/faas_scheduler/utils.py @@ -7,7 +7,12 @@ from tzlocal import get_localzone from sqlalchemy import desc, text -from faas_scheduler.models import ScriptLog +from faas_scheduler.models import ( + ScriptLog, + UserRunScriptStatistics, + OrgRunScriptStatistics, + DTableRunScriptStatistics, +) import sys @@ -197,66 +202,134 @@ def call_faas_func(script_url, temp_api_token, context_data, script_id=None): return None -def update_statistics(db_session, dtable_uuid, owner, org_id, spend_time): - if not spend_time: - return - username = owner 
- - # dtable_run_script_statistcis - sqls = [ - """ - INSERT INTO dtable_run_script_statistics(dtable_uuid, run_date, total_run_count, total_run_time, update_at) VALUES - (:dtable_uuid, :run_date, 1, :spend_time, :update_at) - ON DUPLICATE KEY UPDATE - total_run_count=total_run_count+1, - total_run_time=total_run_time+:spend_time, - update_at=:update_at; - """ - ] +def update_stats_run_count(db_session, dtable_uuid, owner, org_id): + run_date = datetime.today().strftime("%Y-%m-%d") + try: + dtable_stats = ( + db_session.query(DTableRunScriptStatistics) + .filter_by(dtable_uuid=dtable_uuid, run_date=run_date) + .first() + ) + if not dtable_stats: + dtable_stats = DTableRunScriptStatistics( + dtable_uuid=dtable_uuid, + run_date=run_date, + total_run_count=1, + total_run_time=0, + update_at=datetime.now(), + ) + db_session.add(dtable_stats) + else: + dtable_stats.total_run_count += 1 + dtable_stats.update_at = datetime.now() + if org_id == -1: + if "@seafile_group" not in owner: + user_stats = ( + db_session.query(UserRunScriptStatistics) + .filter_by(username=owner, run_date=run_date) + .first() + ) + if not user_stats: + user_stats = UserRunScriptStatistics( + username=owner, + run_date=run_date, + total_run_count=1, + total_run_time=0, + update_at=datetime.now(), + ) + db_session.add(user_stats) + else: + user_stats.total_run_count += 1 + user_stats.update_at = datetime.now() + else: + org_stats = ( + db_session.query(OrgRunScriptStatistics) + .filter_by(org_id=org_id, run_date=run_date) + .first() + ) + if not org_stats: + org_stats = OrgRunScriptStatistics( + org_id=org_id, + run_date=run_date, + total_run_count=1, + total_run_time=0, + update_at=datetime.now(), + ) + db_session.add(org_stats) + else: + org_stats.total_run_count += 1 + org_stats.update_at = datetime.now() + db_session.commit() + except Exception as e: + logger.exception( + f"update stats for org_id {org_id} owner {owner} dtable {dtable_uuid} run count error {e}" + ) - # org_run_script_statistics - if org_id and org_id != -1: - sqls += [ - """ - INSERT INTO org_run_script_statistics(org_id, run_date, total_run_count, total_run_time, update_at) VALUES - (:org_id, :run_date, 1, :spend_time, :update_at) - ON DUPLICATE KEY UPDATE - total_run_count=total_run_count+1, - total_run_time=total_run_time+:spend_time, - update_at=:update_at; - """ - ] - - # user_run_script_statistics - if "@seafile_group" not in username: - sqls += [ - """ - INSERT INTO user_run_script_statistics(username, org_id, run_date, total_run_count, total_run_time, update_at) VALUES - (:username, :org_id, :run_date, 1, :spend_time, :update_at) - ON DUPLICATE KEY UPDATE - org_id=:org_id, - total_run_count=total_run_count+1, - total_run_time=total_run_time+:spend_time, - update_at=:update_at; - """ - ] +def update_stats_run_time(db_session, dtable_uuid, owner, org_id, spend_time): + if not spend_time: + return + run_date = datetime.today().strftime("%Y-%m-%d") try: - for sql in sqls: - db_session.execute( - text(sql), - { - "dtable_uuid": dtable_uuid, - "username": username, - "org_id": org_id, - "run_date": datetime.today(), - "spend_time": spend_time, - "update_at": datetime.now(), - }, + dtable_stats = ( + db_session.query(DTableRunScriptStatistics) + .filter_by(dtable_uuid=dtable_uuid, run_date=run_date) + .first() + ) + if not dtable_stats: + dtable_stats = DTableRunScriptStatistics( + dtable_uuid=dtable_uuid, + run_date=run_date, + total_run_count=1, + total_run_time=spend_time, + update_at=datetime.now(), + ) + db_session.add(dtable_stats) + else: + 
dtable_stats.total_run_time += spend_time + dtable_stats.update_at = datetime.now() + if org_id == -1: + if "@seafile_group" not in owner: + user_stats = ( + db_session.query(UserRunScriptStatistics) + .filter_by(username=owner, run_date=run_date) + .first() + ) + if not user_stats: + user_stats = UserRunScriptStatistics( + username=owner, + run_date=run_date, + total_run_count=1, + total_run_time=spend_time, + update_at=datetime.now(), + ) + db_session.add(user_stats) + else: + user_stats.total_run_time += spend_time + user_stats.update_at = datetime.now() + else: + org_stats = ( + db_session.query(OrgRunScriptStatistics) + .filter_by(org_id=org_id, run_date=run_date) + .first() ) + if not org_stats: + org_stats = OrgRunScriptStatistics( + org_id=org_id, + run_date=run_date, + total_run_count=1, + total_run_time=spend_time, + update_at=datetime.now(), + ) + db_session.add(org_stats) + else: + org_stats.total_run_time += spend_time + org_stats.update_at = datetime.now() db_session.commit() except Exception as e: - logger.exception("update statistics sql error: %s", e) + logger.exception( + f"update stats for org_id {org_id} owner {owner} dtable {dtable_uuid} run time error {e}" + ) # required to get "script logs" in dtable-web @@ -387,6 +460,8 @@ def add_script( db_session.add(script) db_session.commit() + update_stats_run_count(db_session, dtable_uuid, owner, org_id) + return script @@ -435,7 +510,7 @@ def hook_update_script( update_script( db_session, script, success, return_code, output, started_at, finished_at ) - update_statistics( + update_stats_run_time( db_session, script.dtable_uuid, script.owner, script.org_id, spend_time ) diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py index b2d2d4f..860a0d3 100644 --- a/scheduler/app/flask_server.py +++ b/scheduler/app/flask_server.py @@ -175,7 +175,9 @@ def task_logs_api(dtable_uuid, script_name): db_session = DBSession() try: - task_logs = list_task_logs(db_session, dtable_uuid, script_name, order_by) + task_logs = list_task_logs( + db_session, uuid_str_to_32_chars(dtable_uuid), script_name, order_by + ) count = task_logs.count() task_logs = task_logs[start:end] task_log_list = [task_log.to_dict() for task_log in task_logs] From e5805a5cb1bab6f86f986d040af374d571b95dd3 Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Mon, 1 Sep 2025 14:20:59 +0800 Subject: [PATCH 11/21] fix quality --- scheduler/app/faas_scheduler/utils.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py index 427b8e8..203c161 100644 --- a/scheduler/app/faas_scheduler/utils.py +++ b/scheduler/app/faas_scheduler/utils.py @@ -262,7 +262,11 @@ def update_stats_run_count(db_session, dtable_uuid, owner, org_id): db_session.commit() except Exception as e: logger.exception( - f"update stats for org_id {org_id} owner {owner} dtable {dtable_uuid} run count error {e}" + "update stats for org_id %s owner %s dtable %s run count error %s", + org_id, + owner, + dtable_uuid, + e, )
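Note: patches 10 and 11 replace the raw INSERT ... ON DUPLICATE KEY UPDATE statements with an ORM-level get-or-create: look up today's statistics row, insert it if missing, otherwise bump the counters in place. Condensed to its core, the pattern looks like this (a sketch only; it assumes the DTableRunScriptStatistics model imported in utils.py, and the helper name is illustrative):

    from datetime import datetime

    from faas_scheduler.models import DTableRunScriptStatistics

    def bump_run_count(db_session, dtable_uuid):
        run_date = datetime.today().strftime("%Y-%m-%d")
        stats = (
            db_session.query(DTableRunScriptStatistics)
            .filter_by(dtable_uuid=dtable_uuid, run_date=run_date)
            .first()
        )
        if not stats:
            # first run of the day: insert a fresh per-day row
            stats = DTableRunScriptStatistics(
                dtable_uuid=dtable_uuid,
                run_date=run_date,
                total_run_count=1,
                total_run_time=0,
                update_at=datetime.now(),
            )
            db_session.add(stats)
        else:
            # later runs: update the existing row in place
            stats.total_run_count += 1
            stats.update_at = datetime.now()
        db_session.commit()

Unlike the SQL upsert it replaces, this read-modify-write is not atomic: two concurrent runs can both miss the row and race on the insert, so the trade-off is simpler code against occasional lost increments (or an IntegrityError if the table has a unique key on dtable_uuid and run_date).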
From 455a4a5376373806340d48f57009af6cb20d0ff7 Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Mon, 1 Sep 2025 14:33:07 +0800 Subject: [PATCH 12/21] update --- scheduler/app/faas_scheduler/utils.py | 2 -- scheduler/app/scheduler.py | 11 ++--------- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py index 203c161..b438651 100644 --- a/scheduler/app/faas_scheduler/utils.py +++ b/scheduler/app/faas_scheduler/utils.py @@ -271,8 +271,6 @@ def update_stats_run_count(db_session, dtable_uuid, owner, org_id): def update_stats_run_time(db_session, dtable_uuid, owner, org_id, spend_time): - if not spend_time: - return run_date = datetime.today().strftime("%Y-%m-%d") try: dtable_stats = ( diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py index 060b744..5b9fadc 100644 --- a/scheduler/app/scheduler.py +++ b/scheduler/app/scheduler.py @@ -249,15 +249,8 @@ def schedule(self): ) except Exception as e: logger.exception(f"run script: {script_log} error {e}") - db_session.query(ScriptLog).filter( - ScriptLog.id == script_log.id - ).update( - { - ScriptLog.started_at: datetime.now(), - ScriptLog.finished_at: datetime.now(), - ScriptLog.state: ScriptLog.FINISHED, - }, - synchronize_session=False, + hook_update_script( + db_session, script_log.id, False, -1, "", datetime.now(), 0 ) db_session.commit() finally: DBSession.remove() From beb7f11f0a6cc4a10887602d6ea7f21f8e2c75b6 Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Mon, 1 Sep 2025 16:51:17 +0800 Subject: [PATCH 13/21] add return resp when some error --- starter/runner.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/starter/runner.py b/starter/runner.py index 1cdd10d..cbbeb83 100644 --- a/starter/runner.py +++ b/starter/runner.py @@ -271,6 +271,7 @@ def run_python(data): return_code, output = None, "" # init output except Exception as e: logging.error("Failed to save script %s, error: %s", script_url, e) + send_to_scheduler(False, -1, "", started_at, 0, data) return try: @@ -278,6 +279,7 @@ def run_python(data): os.chown(tmp_dir, SEATABLE_USER_UID, SEATABLE_USER_GID) except Exception as e: logging.error("Failed to chown %s, error: %s", tmp_dir, e) + send_to_scheduler(False, -1, "", started_at, 0, data) return logging.debug("prepare the command to start the python runner") From 53a6ed246fa8b446861b16f8625cfc2dcf17259c Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Mon, 1 Sep 2025 17:32:17 +0800 Subject: [PATCH 14/21] update --- scheduler/app/scheduler.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py index 5b9fadc..80d50d6 100644 --- a/scheduler/app/scheduler.py +++ b/scheduler/app/scheduler.py @@ -249,10 +249,9 @@ def schedule(self): ) except Exception as e: logger.exception(f"run script: {script_log} error {e}") - hook_update_script( - db_session, script_log.id, False, -1, "", datetime.now(), 0 + self.script_done_callback( + script_log.id, False, -1, "", datetime.now(), 0 ) finally: DBSession.remove() From ee272e75bef6cefd9e5ddd4aec4c145954c2a370 Mon Sep 17 00:00:00 2001 From: AlexHappy <1223408988@qq.com> Date: Thu, 11 Sep 2025 22:14:34 +0800 Subject: [PATCH 15/21] add scheduler/starter bind host env --- scheduler/app/flask_server.py | 4 ++-- starter/runner.py | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py index 860a0d3..aa5bc0c 100644 --- a/scheduler/app/flask_server.py +++ b/scheduler/app/flask_server.py @@ -33,6 +33,7 @@ 
TIMEOUT_OUTPUT = ( "The script's running time exceeded the limit and the execution was aborted." ) +HOST = os.environ.get("PYTHON_SCHEDULER_BIND_HOST", "127.0.0.1") app = Flask(__name__) @@ -373,6 +374,5 @@ def base_run_python_statistics(): if __name__ == "__main__": - scheduler.start() - http_server = WSGIServer(("0.0.0.0", 5055), app) + http_server = WSGIServer((HOST, 5055), app) http_server.serve_forever() diff --git a/starter/runner.py b/starter/runner.py index cbbeb83..a0d0520 100644 --- a/starter/runner.py +++ b/starter/runner.py @@ -71,6 +71,9 @@ SEATABLE_USER_UID = 1000 SEATABLE_USER_GID = 1000 +# bind host +HOST = os.environ.get("PYTHON_STARTER_BIND_HOST", "127.0.0.1") + def get_log_level(level): if level.lower() == "info": @@ -474,4 +477,4 @@ def health_check(): if __name__ == "__main__": - app.run(host="0.0.0.0", port=8088, debug=False) + app.run(host=HOST, port=8088, debug=False) From ef8104436d30e50f4f9f0115434a7b27ef1c7693 Mon Sep 17 00:00:00 2001 From: AlexHappy <1223408988@qq.com> Date: Fri, 12 Sep 2025 00:16:13 +0800 Subject: [PATCH 16/21] only fix start/finish fields --- scheduler/app/faas_scheduler/models.py | 8 - scheduler/app/faas_scheduler/utils.py | 4 +- scheduler/app/flask_server.py | 4 +- scheduler/app/scheduler.py | 252 ++----------------------- 4 files changed, 18 insertions(+), 250 deletions(-) diff --git a/scheduler/app/faas_scheduler/models.py b/scheduler/app/faas_scheduler/models.py index 947698c..e8435c1 100644 --- a/scheduler/app/faas_scheduler/models.py +++ b/scheduler/app/faas_scheduler/models.py @@ -31,13 +31,8 @@ class ScriptLog(Base): return_code = Column(Integer, nullable=True) output = Column(Text, nullable=True) operate_from = Column(String(255)) - state = Column(String(10)) created_at = Column(DateTime, index=True) - PENDING = "pending" - RUNNING = "running" - FINISHED = "finished" - def __init__( self, dtable_uuid, @@ -45,7 +40,6 @@ def __init__( org_id, script_name, context_data, - state, created_at, operate_from=None, ): @@ -54,7 +48,6 @@ def __init__( self.org_id = org_id self.script_name = script_name self.context_data = context_data - self.state = state self.created_at = created_at self.operate_from = operate_from @@ -86,7 +79,6 @@ def to_dict(self): "return_code": self.return_code, "output": self.output, "operate_from": self.operate_from, - "state": self.state, "created_at": self.created_at and datetime_to_isoformat_timestr(self.created_at), } diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py index b438651..dce0599 100644 --- a/scheduler/app/faas_scheduler/utils.py +++ b/scheduler/app/faas_scheduler/utils.py @@ -83,7 +83,7 @@ def ping_starter(): ## triggered from scheduler.py to remove old script_logs def delete_log_after_days(db_session): clean_script_logs = ( - "DELETE FROM `script_log` WHERE `started_at` < DATE_SUB(NOW(), INTERVAL %s DAY)" + "DELETE FROM `script_log` WHERE `created_at` < DATE_SUB(NOW(), INTERVAL %s DAY)" % DELETE_LOG_DAYS ) logger.debug(clean_script_logs) @@ -459,7 +459,6 @@ def add_script( org_id, script_name, context_data, - ScriptLog.PENDING, datetime.now(), operate_from, ) @@ -479,7 +478,6 @@ def update_script( script.success = success script.return_code = return_code script.output = output - script.state = ScriptLog.FINISHED db_session.commit() return script diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py index aa5bc0c..a0b516c 100644 --- a/scheduler/app/flask_server.py +++ b/scheduler/app/flask_server.py @@ -59,7 +59,7 @@ def ping(): # called from 
dtable-web to start the python run @app.route("/run-script/", methods=["POST"]) -def scripts_api(): +def run_script_api(): if not check_auth_token(request): return make_response(("Forbidden: the auth token is not correct.", 403)) @@ -108,7 +108,7 @@ def scripts_api(): # called from dtable-web to get the status of a specific run. @app.route("/run-script//", methods=["GET"]) -def script_api(script_id): +def get_script_api(script_id): if not check_auth_token(request): return make_response(("Forbidden: the auth token is not correct.", 403)) diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py index 80d50d6..a461b51 100644 --- a/scheduler/app/scheduler.py +++ b/scheduler/app/scheduler.py @@ -2,6 +2,7 @@ import logging import os import time +from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta from threading import Lock, Thread @@ -14,199 +15,17 @@ run_script, get_script_file, hook_update_script, + check_and_set_tasks_timeout, ) logger = logging.getLogger(__name__) SUB_PROCESS_TIMEOUT = int(os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15)) -TIMEOUT_OUTPUT = "Script running for too long time!" - - -class ScriptQueue: - - def __init__(self): - self.script_queue = ( - [] - ) # a list of ScriptLog instances, but can not be used to update database records!!! - self.script_dict = {} # a dict of {id: ScriptLog} - self.lock = Lock() - self.running_count = {} - # a dict of - # { - # "": 0, - # "_": 0, - # "__": 0 - # } - try: - run_limit_per_team = int(os.environ.get("RUN_LIMIT_PER_TEAM", 0)) - except: - run_limit_per_team = 0 - try: - run_limit_per_base = int(os.environ.get("RUN_LIMIT_PER_BASE", 0)) - except: - run_limit_per_base = 0 - try: - run_limit_per_script = int(os.environ.get("RUN_LIMIT_PER_SCRIPT", 0)) - except: - run_limit_per_script = 0 - self.config = { - "run_limit_per_team": run_limit_per_team, - "run_limit_per_base": run_limit_per_base, - "run_limit_per_script": run_limit_per_script, - } - - def can_run_script(self, script_log: ScriptLog): - if script_log.org_id != -1: - running_team_key = f"{script_log.org_id}" - else: - running_team_key = f"{script_log.owner}" - running_base_key = f"{running_team_key}_{script_log.dtable_uuid}" - running_script_key = f"{running_base_key}_{script_log.script_name}" - - if self.config["run_limit_per_team"] > 0 and self.config[ - "run_limit_per_team" - ] <= self.running_count.get(running_team_key, 0): - return False - if self.config["run_limit_per_base"] > 0 and self.config[ - "run_limit_per_base" - ] <= self.running_count.get(running_base_key, 0): - return False - if self.config["run_limit_per_script"] > 0 and self.config[ - "run_limit_per_script" - ] <= self.running_count.get(running_script_key, 0): - return False - - return True - - def add(self, script_log: ScriptLog): - with self.lock: - self.script_queue.append(script_log) - self.script_dict[script_log.id] = script_log - self.inspect_queue_and_running( - pre_msg=f"add script {script_log.get_info()} to queue" - ) - - def get(self): - """get the first valid task from self.q - - Return: an instance of ScriptTask or None - """ - with self.lock: - return_task = None - - index = 0 - while index < len(self.script_queue): - script_log = self.script_queue[index] - if self.can_run_script(script_log): - return_task = script_log - self.script_queue.pop(index) - self.increase_running(script_log) - self.inspect_queue_and_running( - pre_msg=f"get script {script_log.get_info()} from queue" - ) - break - index += 1 - - return return_task - - def 
increase_running(self, script_log): - if script_log.org_id != -1: - running_team_key = f"{script_log.org_id}" - else: - running_team_key = f"{script_log.owner}" - running_base_key = f"{running_team_key}_{script_log.dtable_uuid}" - running_script_key = f"{running_base_key}_{script_log.script_name}" - self.running_count[running_team_key] = ( - self.running_count[running_team_key] + 1 - if self.running_count.get(running_team_key) - else 1 - ) - self.running_count[running_base_key] = ( - self.running_count[running_base_key] + 1 - if self.running_count.get(running_base_key) - else 1 - ) - self.running_count[running_script_key] = ( - self.running_count[running_script_key] + 1 - if self.running_count.get(running_script_key) - else 1 - ) - - def decrease_running(self, script_log): - if script_log.org_id != -1: - running_team_key = f"{script_log.org_id}" - else: - running_team_key = f"{script_log.owner}" - running_base_key = f"{running_team_key}_{script_log.dtable_uuid}" - running_script_key = f"{running_base_key}_{script_log.script_name}" - - if running_team_key in self.running_count: - self.running_count[running_team_key] -= 1 - if not self.running_count.get(running_team_key): - self.running_count.pop(running_team_key, None) - - if running_base_key in self.running_count: - self.running_count[running_base_key] -= 1 - if not self.running_count.get(running_base_key): - self.running_count.pop(running_base_key, None) - - if running_script_key in self.running_count: - self.running_count[running_script_key] -= 1 - if not self.running_count.get(running_script_key): - self.running_count.pop(running_script_key, None) - - def script_done_callback(self, script_log: ScriptLog): - with self.lock: - self.script_dict.pop(script_log.id, None) - self.decrease_running(script_log) - self.inspect_queue_and_running( - pre_msg=f"script {script_log.get_info()} run done" - ) - - def inspect_queue_and_running(self, pre_msg=None): - if logger.root.level != logging.DEBUG: - return - lines = ["\n"] - if pre_msg: - lines.append(pre_msg) - lines.append(f"{'>' * 10} running {'>' * 10}") - for key, value in self.running_count.items(): - lines.append(f"{key}: {value}") - lines.append(f"{'<' * 10} running {'<' * 10}") - - lines.append(f"{'>' * 10} queue {'>' * 10}") - for script_log in self.script_queue: - lines.append( - f"org_id: {script_log.org_id} owner: {script_log.owner} dtable_uuid: {script_log.dtable_uuid} script_name: {script_log.script_name}" - ) - lines.append(f"{'<' * 10} queue {'<' * 10}") - logger.debug("\n".join(lines)) - - def get_script_log_by_id(self, script_id): - return self.script_dict.get(script_id) - - def pop_timeout_scripts(self): - script_logs = [] - now_time = datetime.now() - with self.lock: - for index in range(len(self.script_queue) - 1, -1, -1): - script_log = self.script_queue[index] - if ( - script_log.state == ScriptLog.RUNNING - and (now_time - script_log.started_at).seconds - >= SUB_PROCESS_TIMEOUT - ): - script_logs.append(self.script_queue.pop(index)) - self.decrease_running(script_log) - self.inspect_queue_and_running( - pre_msg=f"set script {script_log.get_info()} timeout from queue" - ) - return script_logs class Scheduelr: def __init__(self): - self.script_queue = ScriptQueue() + self.executor = ThreadPoolExecutor() def add(self, dtable_uuid, org_id, owner, script_name, context_data, operate_from): script_log = add_script( @@ -218,65 +37,26 @@ def add(self, dtable_uuid, org_id, owner, script_name, context_data, operate_fro context_data, operate_from, ) - self.script_queue.add(script_log) + 
script_file_info = get_script_file( + script_log.dtable_uuid, script_log.script_name + ) + self.executor.submit( + run_script, + script_log.id, + dtable_uuid, + script_name, + script_file_info["script_url"], + script_file_info["temp_api_token"], + context_data, + ) return script_log - def schedule(self): - while True: - script_log = self.script_queue.get() - if not script_log: - time.sleep(0.5) - continue - db_session = DBSession() - try: - db_session.query(ScriptLog).filter( - ScriptLog.id == script_log.id - ).update( - {ScriptLog.state: ScriptLog.RUNNING}, - synchronize_session=False, - ) - db_session.commit() - script_file_info = get_script_file( - script_log.dtable_uuid, script_log.script_name - ) - run_script( - script_log.id, - script_log.dtable_uuid, - script_log.script_name, - script_file_info["script_url"], - script_file_info["temp_api_token"], - json.loads(script_log.context_data), - ) - except Exception as e: - logger.exception(f"run script: {script_log} error {e}") - self.script_done_callback( - script_log.id, False, -1, "", datetime.now(), 0 - ) - finally: - DBSession.remove() - def script_done_callback( self, script_id, success, return_code, output, started_at, spend_time ): hook_update_script( DBSession(), script_id, success, return_code, output, started_at, spend_time ) - script_log = self.script_queue.get_script_log_by_id(script_id) - if not script_log: # not counted in memory, only update db record - return - self.script_queue.script_done_callback(script_log) - - def load_pending_scripts(self): - """load pending script logs, should be called only when server start""" - script_logs = list( - DBSession.query(ScriptLog) - .filter_by(state=ScriptLog.PENDING) - .filter(ScriptLog.created_at > (datetime.now() - timedelta(hours=1))) - .order_by(ScriptLog.id) - ) - logger.info(f"load {len(script_logs)} pending scripts created within 1 hour") - for script_log in script_logs: - self.script_queue.add(script_log) def statistic_cleaner(self): while True: @@ -291,8 +71,6 @@ def statistic_cleaner(self): time.sleep(24 * 60 * 60) def start(self): - self.load_pending_scripts() - Thread(target=self.schedule, daemon=True).start() Thread(target=self.statistic_cleaner, daemon=True).start() From af97917475c50dd1ff68d88f1acc8de30943138b Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Fri, 12 Sep 2025 15:14:03 +0800 Subject: [PATCH 17/21] update script log result when call starter failed --- scheduler/app/faas_scheduler/utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py index dce0599..253e49f 100644 --- a/scheduler/app/faas_scheduler/utils.py +++ b/scheduler/app/faas_scheduler/utils.py @@ -499,6 +499,8 @@ def run_script( call_faas_func(script_url, temp_api_token, context_data, script_id=script_id) except Exception as e: logger.exception("Run script %d error: %s", script_id, e) + now = datetime.now() + hook_update_script(db_session, script_id, False, -1, '', now, 0) finally: db_session.close() From 1c9c2e907650ef216bbd8b358f4301be03b07d71 Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Fri, 12 Sep 2025 15:15:15 +0800 Subject: [PATCH 18/21] update black --- scheduler/app/faas_scheduler/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py index 253e49f..d945f8c 100644 --- a/scheduler/app/faas_scheduler/utils.py +++ b/scheduler/app/faas_scheduler/utils.py @@ -500,7 +500,7 @@ def 
run_script( except Exception as e: logger.exception("Run script %d error: %s", script_id, e) now = datetime.now() - hook_update_script(db_session, script_id, False, -1, '', now, 0) + hook_update_script(db_session, script_id, False, -1, "", now, 0) finally: db_session.close() From 7df7f4c9e12df1068e0f83ea7a23950866093c9b Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Fri, 12 Sep 2025 17:57:22 +0800 Subject: [PATCH 19/21] add IS_CHOWN_SCRIPT_DIR env --- starter/runner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/starter/runner.py b/starter/runner.py index a0d0520..66f74e0 100644 --- a/starter/runner.py +++ b/starter/runner.py @@ -23,6 +23,7 @@ LOG_LEVEL = os.environ.get("PYTHON_STARTER_LOG_LEVEL", "INFO") TIME_ZONE = os.environ.get("TIME_ZONE", "") PYTHON_RUNNER_IMAGE = os.environ.get("PYTHON_RUNNER_IMAGE") +IS_CHOWN_SCRIPT_DIR = os.environ.get("IS_CHOWN_SCRIPT_DIR", "true").lower() == "true" THREAD_COUNT = int(os.environ.get("PYTHON_STARTER_THREAD_COUNT", 10)) SUB_PROCESS_TIMEOUT = int(os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15)) # 15 mins @@ -279,7 +280,8 @@ def run_python(data): try: logging.debug("Fix ownership of %s", tmp_dir) - os.chown(tmp_dir, SEATABLE_USER_UID, SEATABLE_USER_GID) + if IS_CHOWN_SCRIPT_DIR: + os.chown(tmp_dir, SEATABLE_USER_UID, SEATABLE_USER_GID) except Exception as e: logging.error("Failed to chown %s, error: %s", tmp_dir, e) send_to_scheduler(False, -1, "", started_at, 0, data) From f48513a8a2e6167fd91009e7868e3e6594806515 Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Thu, 18 Sep 2025 10:45:24 +0800 Subject: [PATCH 20/21] drop created_at in ScriptLog --- scheduler/app/database/__init__.py | 4 ++-- scheduler/app/faas_scheduler/models.py | 5 ----- scheduler/app/faas_scheduler/utils.py | 5 +---- scheduler/app/flask_server.py | 23 +++++++---------------- scheduler/app/monitor.sh | 13 ------------- scheduler/app/scheduler.py | 10 +++++----- scheduler/app/scheduler.sh | 4 ---- 7 files changed, 15 insertions(+), 49 deletions(-) diff --git a/scheduler/app/database/__init__.py b/scheduler/app/database/__init__.py index fd18bb7..d12cfaa 100644 --- a/scheduler/app/database/__init__.py +++ b/scheduler/app/database/__init__.py @@ -3,7 +3,7 @@ from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker, scoped_session +from sqlalchemy.orm import sessionmaker DB_ROOT_USER = os.getenv("DB_ROOT_USER", "root") DB_ROOT_PASSWD = os.getenv("DB_ROOT_PASSWD") @@ -37,4 +37,4 @@ engine = create_engine(db_url, **db_kwargs) Base = declarative_base() -DBSession = scoped_session(sessionmaker(bind=engine)) +DBSession = sessionmaker(bind=engine) diff --git a/scheduler/app/faas_scheduler/models.py b/scheduler/app/faas_scheduler/models.py index e8435c1..747b3b1 100644 --- a/scheduler/app/faas_scheduler/models.py +++ b/scheduler/app/faas_scheduler/models.py @@ -31,7 +31,6 @@ class ScriptLog(Base): return_code = Column(Integer, nullable=True) output = Column(Text, nullable=True) operate_from = Column(String(255)) - created_at = Column(DateTime, index=True) def __init__( self, @@ -40,7 +39,6 @@ def __init__( org_id, script_name, context_data, - created_at, operate_from=None, ): self.dtable_uuid = dtable_uuid @@ -48,7 +46,6 @@ def __init__( self.org_id = org_id self.script_name = script_name self.context_data = context_data - self.created_at = created_at self.operate_from = operate_from def get_info(self): @@ -79,8 +76,6 @@ def to_dict(self): "return_code": 
self.return_code, "output": self.output, "operate_from": self.operate_from, - "created_at": self.created_at - and datetime_to_isoformat_timestr(self.created_at), } diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py index d945f8c..4ab902c 100644 --- a/scheduler/app/faas_scheduler/utils.py +++ b/scheduler/app/faas_scheduler/utils.py @@ -80,10 +80,9 @@ def ping_starter(): return False -## triggered from scheduler.py to remove old script_logs def delete_log_after_days(db_session): clean_script_logs = ( - "DELETE FROM `script_log` WHERE `created_at` < DATE_SUB(NOW(), INTERVAL %s DAY)" + "DELETE FROM `script_log` WHERE `started_at` < DATE_SUB(NOW(), INTERVAL %s DAY)" % DELETE_LOG_DAYS ) logger.debug(clean_script_logs) @@ -99,7 +98,6 @@ def delete_log_after_days(db_session): db_session.close() -## triggered from scheduler.py to remove old statistics def delete_statistics_after_days(db_session): tables = [ "dtable_run_script_statistics", @@ -459,7 +457,6 @@ def add_script( org_id, script_name, context_data, - datetime.now(), operate_from, ) db_session.add(script) diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py index a0b516c..e3774cd 100644 --- a/scheduler/app/flask_server.py +++ b/scheduler/app/flask_server.py @@ -40,11 +40,6 @@ logger = logging.getLogger(__name__) -@app.teardown_appcontext -def shutdown_session(exception=None): - DBSession.remove() - - @app.route("/ping/", methods=["GET"]) def ping(): if not ping_starter(): @@ -90,6 +85,7 @@ def run_script_api(): ): return make_response(("The number of runs exceeds the limit"), 400) script_log = scheduler.add( + db_session, uuid_str_to_32_chars(dtable_uuid), org_id, owner, @@ -137,21 +133,13 @@ def get_script_api(script_id): ): return make_response(("Bad request", 400)) - # if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int): - # now = datetime.now() - # duration_seconds = (now - script.created_at).seconds - # if duration_seconds > SUB_PROCESS_TIMEOUT: - # script.success = False - # script.return_code = -1 - # script.finished_at = now - # script.output = TIMEOUT_OUTPUT - # db_session.commit() - return make_response(({"script": script.to_dict()}, 200)) except Exception as e: logger.exception(e) return make_response(("Internal server error", 500)) + finally: + db_session.close() # get python script statistics logs... 
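Note: with scoped_session gone, every handler in patch 20 owns its session explicitly: open one DBSession() per request, pass it down to the helpers, and close it in a finally block so the connection returns to the pool even when the handler raises. A minimal sketch of the discipline the hunks above and below follow (the function itself is illustrative, not part of the codebase):

    from database import DBSession
    from faas_scheduler.models import ScriptLog

    def load_script(script_id):
        db_session = DBSession()
        try:
            # all queries in the request share this one session
            return db_session.query(ScriptLog).filter_by(id=script_id).first()
        finally:
            # runs on success and on exceptions alike
            db_session.close()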
@@ -282,15 +270,18 @@ def record_script_result(): spend_time = data.get("spend_time") or 0 script_id = data.get("script_id") # update script_log and run-time statistics + db_session = DBSession() try: if script_id: scheduler.script_done_callback( - script_id, success, return_code, output, started_at, spend_time + db_session, script_id, success, return_code, output, started_at, spend_time ) except Exception as e: logger.exception(e) return make_response(("Internal server error", 500)) + finally: + db_session.close() return "success" diff --git a/scheduler/app/monitor.sh b/scheduler/app/monitor.sh index b5f6dde..079664f 100755 --- a/scheduler/app/monitor.sh +++ b/scheduler/app/monitor.sh @@ -39,24 +39,11 @@ function monitor_flask_server() { fi } -function monitor_scheduler() { - process_name="scheduler.py" - check_num=$(check_process $process_name) - if [ $check_num -eq 0 ]; then - log "Start $process_name" - sleep 0.2 - cd /opt/scheduler/ - python3 -u scheduler.py >> "${LOG_FILE}" 2>&1 & - sleep 0.2 - fi -} - log "Start Monitor" while [ 1 ]; do monitor_flask_server - monitor_scheduler sleep 30 done diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py index a461b51..8a5cb63 100644 --- a/scheduler/app/scheduler.py +++ b/scheduler/app/scheduler.py @@ -27,9 +27,9 @@ class Scheduelr: def __init__(self): self.executor = ThreadPoolExecutor() - def add(self, dtable_uuid, org_id, owner, script_name, context_data, operate_from): + def add(self, db_session, dtable_uuid, org_id, owner, script_name, context_data, operate_from): script_log = add_script( - DBSession(), + db_session, dtable_uuid, owner, org_id, @@ -52,10 +52,10 @@ def add(self, dtable_uuid, org_id, owner, script_name, context_data, operate_fro return script_log def script_done_callback( - self, script_id, success, return_code, output, started_at, spend_time + self, db_session, script_id, success, return_code, output, started_at, spend_time ): hook_update_script( - DBSession(), script_id, success, return_code, output, started_at, spend_time + db_session, script_id, success, return_code, output, started_at, spend_time ) def statistic_cleaner(self): @@ -67,7 +67,7 @@ def statistic_cleaner(self): except Exception as e: logger.exception(e) finally: - DBSession.remove() + db_session.close() time.sleep(24 * 60 * 60) def start(self): diff --git a/scheduler/app/scheduler.sh b/scheduler/app/scheduler.sh index 27a2988..4feac61 100755 --- a/scheduler/app/scheduler.sh +++ b/scheduler/app/scheduler.sh @@ -8,7 +8,6 @@ fi function stop_server() { pkill -9 -f flask_server.py - pkill -9 -f scheduler.py pkill -9 -f monitor rm -f /opt/scheduler/pids/*.pid } @@ -32,9 +31,6 @@ function start_server() { python3 -u flask_server.py >> "${LOG_FILE}" 2>&1 & sleep 0.2 - python3 -u scheduler.py >> "${LOG_FILE}" 2>&1 & - sleep 0.2 - ./monitor.sh & #&>>/opt/scheduler/logs/monitor.log & From b596cf60deeebb0d056e161f8f6ad5bb060e963d Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Thu, 18 Sep 2025 11:09:10 +0800 Subject: [PATCH 21/21] fix black --- scheduler/app/flask_server.py | 8 +++++++- scheduler/app/scheduler.py | 20 ++++++++++++++++++-- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py index e3774cd..8d28adf 100644 --- a/scheduler/app/flask_server.py +++ b/scheduler/app/flask_server.py @@ -274,7 +274,13 @@ def record_script_result(): try: if script_id: scheduler.script_done_callback( - db_session, script_id, success, return_code, output, started_at, spend_time 
+ db_session, + script_id, + success, + return_code, + output, + started_at, + spend_time, ) except Exception as e: diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py index 8a5cb63..5395c00 100644 --- a/scheduler/app/scheduler.py +++ b/scheduler/app/scheduler.py @@ -27,7 +27,16 @@ class Scheduelr: def __init__(self): self.executor = ThreadPoolExecutor() - def add(self, db_session, dtable_uuid, org_id, owner, script_name, context_data, operate_from): + def add( + self, + db_session, + dtable_uuid, + org_id, + owner, + script_name, + context_data, + operate_from, + ): script_log = add_script( db_session, dtable_uuid, @@ -52,7 +61,14 @@ def add(self, db_session, dtable_uuid, org_id, owner, script_name, context_data, return script_log def script_done_callback( - self, script_id, success, return_code, output, started_at, spend_time + self, + db_session, + script_id, + success, + return_code, + output, + started_at, + spend_time, ): hook_update_script( db_session, script_id, success, return_code, output, started_at, spend_time )
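Note: at the end of the series the scheduler keeps no in-memory queue or state machine at all. flask_server.py validates the request and calls add(), which writes the ScriptLog row, updates the run-count statistics, and hands the job straight to a ThreadPoolExecutor; the starter later POSTs the result back, and script_done_callback() records it. A compressed sketch of that end state (the class and the stub below are illustrative; the real methods live in scheduler.py and faas_scheduler/utils.py):

    from concurrent.futures import ThreadPoolExecutor

    def run_script(script_id, dtable_uuid, script_name,
                   script_url, temp_api_token, context_data):
        """Stand-in for faas_scheduler.utils.run_script, which calls the starter over HTTP."""

    class Scheduler:
        def __init__(self):
            # no max_workers given: defaults to min(32, os.cpu_count() + 4) on Python 3.8+
            self.executor = ThreadPoolExecutor()

        def add(self, script_log, script_file_info, context_data):
            # fire and forget; completion arrives later via the HTTP callback
            self.executor.submit(
                run_script,
                script_log.id,
                script_log.dtable_uuid,
                script_log.script_name,
                script_file_info["script_url"],
                script_file_info["temp_api_token"],
                context_data,
            )

Worth noting: the per-team, per-base, and per-script run limits that ScriptQueue enforced were removed in patch 16, and nothing in the rest of this series reinstates them, so the executor's thread count is the only remaining throttle.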